2023-03-10 16:10:49 +08:00
import argparse
# Command-line interface of the script.
# NOTE(review): the bare timestamp lines interleaved in this section are not
# valid Python; they look like extraction residue — confirm against the real file.
parser = argparse.ArgumentParser()
# Both of the next two options describe themselves as unused in their help text.
parser.add_argument('--skip', type=int, default=0, help="skip x lines (unused)")
parser.add_argument('--drawfirst', action='store_true', help="draw image first (unused)")
2023-03-10 17:29:43 +08:00
# Read later through `args.keepdrawing`.
parser.add_argument('--keepdrawing', action='store_true', help="keepdrawing, sleep for slp1")
2023-03-10 16:10:49 +08:00
# Pacing knobs, both in seconds (floats).
parser.add_argument('--slp', type=float, default=0.0001, help="sleep x seconds after each ping")
parser.add_argument('--slp1', type=float, default=0.5, help="sleep x seconds after each frame")
2023-03-10 17:29:43 +08:00
# Rotation in degrees, applied to the input image when it is loaded (`args.rotate`).
parser.add_argument('--rotate', type=int, default=0, help="rotate image for x deg")
2023-03-10 16:17:46 +08:00
# Path of the image to draw (`args.image`).
parser.add_argument('-i', '--image', type=str, default='1.png', help="input image")
2023-03-10 16:10:49 +08:00
# Parsing happens at import time, so sys.argv is consumed as soon as the module loads.
args = parser.parse_args()
# Short module-level aliases; note that `draw_img` holds the value of --drawfirst.
skip, slp, slp1, draw_img = args.skip, args.slp, args.slp1, args.drawfirst
import struct
class ICMP6:
    """Builder for an ICMPv6 Echo Request packet (header + payload).

    The instance only stores the request parameters and serialises them
    with :meth:`_create_packet`; it does not open sockets or send anything.
    """

    # ICMPv6 message type of an Echo Request (RFC 4443, section 4.1).
    _ICMP_ECHO_REQUEST = 128

    def __init__(self, destination, id, sequence, payload=None,
                 payload_size=56, ttl=64, traffic_class=0):
        """Store the parameters of one echo request.

        Args:
            destination: Target address; stored as-is.
            id: Echo identifier, truncated to 16 bits.
            sequence: Echo sequence number, truncated to 16 bits.
            payload: Bytes to carry. When given, ``payload_size`` is
                replaced by ``len(payload)``.
            payload_size: Number of zero bytes to generate when no
                ``payload`` is supplied.
            ttl: Stored as-is; not used when building the packet.
            traffic_class: Stored as-is; not used when building the packet.
        """
        if payload is not None:
            payload_size = len(payload)
        else:
            # Only synthesise a zero-filled payload when the caller gave
            # none; a supplied payload must not be overwritten.
            payload = b'\x00' * payload_size

        self._destination = destination
        self._id = id & 0xffff
        self._sequence = sequence & 0xffff
        self._payload = payload
        self._payload_size = payload_size
        self._ttl = ttl
        self._traffic_class = traffic_class
        self._time = 0

    def _checksum(self, data):
        """Compute the 16-bit one's-complement checksum of ``data``.

        Checksums are used to verify the integrity of packets.

        Args:
            data: Bytes-like packet contents (header followed by payload).

        Returns:
            The checksum as an int in the range 0..0xffff.
        """
        # Pad with one zero byte so an odd-length input has a partner for
        # its last byte. For even lengths the pad is never read because the
        # loop stops at len - 1. Concatenating (instead of ``+=``) avoids
        # mutating a caller-owned bytearray.
        padded = data + b'\x00'

        total = 0
        for i in range(0, len(padded) - 1, 2):
            total += (padded[i] << 8) + padded[i + 1]
            # Fold the carry back in after every 16-bit word.
            total = (total & 0xffff) + (total >> 16)

        return ~total & 0xffff

    def _create_packet(self):
        """Build the echo request from the stored id, sequence and payload.

        Returns:
            The 8-byte ICMP header (type, code, checksum, id, sequence in
            network byte order) concatenated with the payload.
        """
        # NOTE(review): the checksum covers only the ICMP header and
        # payload, not an IPv6 pseudo-header; presumably the kernel fills
        # in the real ICMPv6 checksum on send — confirm.
        identifier, sequence, payload = self._id, self._sequence, self._payload

        # Temporary ICMP header with a zero checksum field, used only to
        # compute the checksum.
        header = struct.pack('!2B3H', self._ICMP_ECHO_REQUEST, 0, 0,
                             identifier, sequence)
        checksum = self._checksum(header + payload)

        # Definitive ICMP header.
        header = struct.pack('!2B3H', self._ICMP_ECHO_REQUEST, 0, checksum,
                             identifier, sequence)
        return header + payload
import socket, traceback, time
from PIL import Image
# Echo request built once at import time (id 0xad, sequence 0, empty payload);
# the send_icmp_* helpers send this same packet to every address.
icmp6pkt = ICMP6(None, 0xad, 0, b'')._create_packet()
import concurrent.futures
def send_icmp_mt(addrs, workers=10):
    """Send the prebuilt echo request to every address using a thread pool.

    Args:
        addrs: Iterable of IPv6 address strings.
        workers: Maximum number of worker threads.

    The call returns once every submitted send has finished. The futures
    returned by ``submit`` are never inspected, so an exception raised by
    an individual send is not surfaced to the caller.
    """
    def sendto(addr):
        # Each task uses its own short-lived socket, so no socket object is
        # shared between threads.
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_ICMPV6) as s:
            s.sendto(icmp6pkt, (addr, 0))

    # Leaving the ``with`` block already performs shutdown(wait=True), so no
    # explicit shutdown call (and no Python 3.9-only ``cancel_futures``
    # keyword) is needed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        for addr in addrs:
            executor.submit(sendto, addr)
def send_icmp_st(addrs):
    """Send the prebuilt echo request to every address, one after another.

    Args:
        addrs: Iterable of IPv6 address strings.

    A single datagram ICMPv6 socket is opened for the whole batch and closed
    when the batch is done, instead of creating one socket per packet. The
    first failing ``sendto`` raises and aborts the remaining sends.
    """
    # Socket creation is loop-invariant, so it is done once up front.
    with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_ICMPV6) as s:
        for addr in addrs:
            s.sendto(icmp6pkt, (addr, 0))
# NOTE(review): the `try:` this handler belongs to is not visible in this
# excerpt — confirm what it guards before editing.
except Exception:
# Use the single-threaded sender as `send_icmp`.
send_icmp = send_icmp_st
# Timeout passed to urlopen() by the stream reader.
timeout = 60

# Request headers sent with every HTTP request built by mkreq().
# NOTE(review): the User-Agent value ends with a stray ')' — confirm whether
# that is intentional before changing it.
headers = {'User-Agent': 'curl/7.88.1)'}


def mkreq(u):
    """Return a body-less urllib Request for URL ``u`` carrying ``headers``."""
    return urllib.request.Request(u, data=None, headers=headers)
import urllib.request
import contextlib
import base64
import io
import threading
# Latest canvas image; stays None until get_bg() has decoded the first frame.
img = None
# Held by get_bg() while it decodes a frame and updates `img`.
imglock = threading.Lock()
# Worker that reads the canvas stream over HTTP and keeps the module-level
# `img` up to date.
# NOTE(review): indentation is lost in this excerpt and the `try:` matching the
# `except` below is not visible — confirm the real nesting before editing.
def get_bg():
global img
while True:
# Open the stream endpoint using the module-level `timeout`; the response is
# closed when the `with` block is left.
with contextlib.closing(urllib.request.urlopen(mkreq('http://us2.g-load.eu:9090/stream'), timeout=timeout)) as resp:
for line in resp:
# Only lines starting with b'data:' are decoded; everything else is ignored.
if line.startswith(b'data:'):
with imglock:
# The base64 text sits after the 5-byte prefix; the final byte of the line
# (presumably the newline) is dropped before decoding.
imgn = Image.open(io.BytesIO(base64.b64decode(line[5:-1])))
# X and Y are defined outside this excerpt.
assert imgn.size == (X, Y)
if img is None:
# First frame: becomes the base image, converted to RGBA.
img = imgn.convert(mode='RGBA')
# NOTE(review): the next two lines look like the body of an `else:` branch
# that is not visible here — confirm.
assert imgn.mode == 'RGBA'
# Paste the update at the origin, passing the update itself as the mask.
img.paste(imgn, (0, 0), imgn)
except Exception:
print('get_bg exit')
# Start get_bg() on a daemon thread, loop while `img` is still None, then
# report that the connection is up.
# NOTE(review): the body of the `while` loop is not visible in this excerpt
# (presumably a short sleep) — confirm.
def start_bg_task():
threading.Thread(target=get_bg, daemon=True).start()
while img is None:
print('connection established')
import random
# Every (x, y) coordinate for x in range(X), y in range(Y), with y varying
# fastest; draw() walks this list. X and Y are defined outside this excerpt.
draw_seq = [(x, y) for x in range(X) for y in range(Y)]
# Image that draw() compares each frame against; starts as a new RGBA image
# with Pillow's default fill.
ldraw = Image.new(mode="RGBA", size=(X, Y))
# Walk draw_seq over the given frame, compare each pixel with `ldraw`, then
# store a copy of the frame in `ldraw` and print two elapsed times in ms.
# The `img` parameter shadows the module-level `img` inside this function.
# NOTE(review): indentation is lost and lines appear to be missing here — the
# body of the `if`, any use of `send_icmp_q`, and the actual send call are not
# visible in this excerpt. Confirm against the real file.
def draw(img):
global ldraw
# Initialised empty; nothing in the visible lines appends to it.
send_icmp_q = []
start = time.time()
# No newline: the timing printed at the end completes this line.
print('draw 1 frame in ', end='')
for x, y in draw_seq:
newpix = img.getpixel((x, y))
# Pixels are unpacked as 4-tuples, so the frame is expected to be RGBA.
r, g, b, a = newpix
# Selects pixels with non-zero alpha that differ from the same position in ldraw.
if a and newpix != ldraw.getpixel((x, y)):
# NOTE(review): presumably taken once after the scan, just before sending — confirm placement.
startsend = time.time()
# Remember this frame for the next comparison.
ldraw = img.copy()
end = time.time()
# First figure: time since `start`; second: time since `startsend`.
print(f"{1000*(end-start):.1f}ms {1000*(end-startsend):.1f}ms")
2023-03-10 16:52:44 +08:00
# Use Image.Resampling.LANCZOS when this Pillow exposes `Image.Resampling`,
# otherwise fall back to the module-level Image.LANCZOS constant.
LANCZOS = Image.Resampling.LANCZOS if getattr(Image, 'Resampling', None) else Image.LANCZOS
2023-03-10 17:29:43 +08:00
# Load the --image file, convert to RGBA, rotate by --rotate degrees, then
# resize to (X, Y); X and Y are defined outside this excerpt.
to_draw = Image.open(args.image).convert('RGBA').rotate(args.rotate).resize((X, Y), LANCZOS)
while True:
2023-03-10 17:42:41 +08:00
ldraw = img
2023-03-10 17:29:43 +08:00
if not args.keepdrawing: