kioubit-dn42-v6-canvas-send/a.py
2023-03-10 16:10:49 +08:00

267 lines
7.7 KiB
Python

import struct
class ICMP6:
_ICMP_ECHO_REQUEST = 128
def __init__(self, destination, id, sequence, payload=None,
payload_size=56, ttl=64, traffic_class=0):
if payload is not None:
payload_size = len(payload)
else:
payload = b'\x00'*payload_size
self._destination = destination
self._id = id & 0xffff
self._sequence = sequence & 0xffff
self._payload = payload
self._payload_size = payload_size
self._ttl = ttl
self._traffic_class = traffic_class
self._time = 0
def _checksum(self, data):
'''
Compute the checksum of an ICMP packet. Checksums are used to
verify the integrity of packets.
'''
sum = 0
data += b'\x00'
for i in range(0, len(data) - 1, 2):
sum += (data[i] << 8) + data[i + 1]
sum = (sum & 0xffff) + (sum >> 16)
sum = ~sum & 0xffff
return sum
def _create_packet(self):
'''
Build an ICMP packet from an identifier, a sequence number and
a payload.
This method returns the newly created ICMP header concatenated
to the payload passed in parameters.
'''
id, sequence, payload = self._id, self._sequence, self._payload
checksum = 0
# Temporary ICMP header to compute the checksum
header = struct.pack('!2B3H', self._ICMP_ECHO_REQUEST, 0, checksum,
id, sequence)
checksum = self._checksum(header + payload)
# Definitive ICMP header
header = struct.pack('!2B3H', self._ICMP_ECHO_REQUEST, 0, checksum,
id, sequence)
return header + payload
import sys
skip = int(sys.argv[1]) if sys.argv[1:] else 0
slp = float(sys.argv[2]) if sys.argv[2:] else 0.0001
slp1 = float(sys.argv[3]) if sys.argv[3:] else 0.5
#draw_img = int(sys.argv[4]) if sys.argv[4:] else 0
X=512
Y=512
import socket, traceback, time
from PIL import Image
#img = Image.open('1.png')
#img = img.resize((X, Y), Image.Resampling.LANCZOS)
#img.save('out.png')
#img = img.convert('RGBA')
imgdvd = Image.open('dvd.png')
imgdvd = imgdvd.convert('RGBA')
icmp6pkt = ICMP6(None, 0xad, 0, b'')._create_packet()
import concurrent.futures
def send_icmp_mt(addrs, workers=10):
def sendto(addr):
with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_ICMPV6) as s:
s.sendto(icmp6pkt, (addr, 0))
time.sleep(slp)
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
for addr in addrs:
executor.submit(sendto, addr)
executor.shutdown(wait=True, cancel_futures=False)
def send_icmp_st(addrs):
try:
for addr in addrs:
with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_ICMPV6) as s:
s.sendto(icmp6pkt, (addr, 0))
time.sleep(slp)
except Exception:
traceback.print_exc()
send_icmp = send_icmp_st
headers = {'User-Agent': 'curl/7.88.1)'}
mkreq = lambda u: urllib.request.Request(u, data=None, headers=headers)
timeout = 60
import urllib.request
import contextlib
import base64
import io
import threading
img = None
imglock = threading.Lock()
def get_bg():
global img
while True:
try:
with contextlib.closing(urllib.request.urlopen(mkreq('http://us2.g-load.eu:9090/stream'), timeout=timeout)) as resp:
for line in resp:
if line.startswith(b'data:'):
with imglock:
imgn = Image.open(io.BytesIO(base64.b64decode(line[5:-1])))
assert imgn.size == (X, Y)
#print(imgn.mode)
if img is None:
img = imgn.convert(mode='RGBA')
else:
assert imgn.mode == 'RGBA'
img.paste(imgn, (0, 0), imgn)
except Exception:
traceback.print_exc()
print('get_bg exit')
time.sleep(3)
def start_bg_task():
threading.Thread(target=get_bg, daemon=True).start()
while img is None:
time.sleep(1)
print('connection established')
import random
Dx, Dy = imgdvd.size
sx = random.randint(0, X-1-Dx)
sy = random.randint(0, Y-1-Dy)
fx = random.choice((1, -1))
fy = random.choice((1, -1))
#import numpy as np
#draw_seq = np.arange(X*Y)
#np.random.shuffle(draw_seq)
#def _spr(rangex):
# maxidx = len(rangex) - 1
# ret = []
# mid = maxidx // 2
# dl = mid
# dr = mid
# ret.append(mid)
# while True:
# if dr < maxidx:
# dr += 1
# ret.append(dr)
# if dl > 0:
# dl -= 1
# ret.append(dl)
# if dl <= 0 and dr >= maxidx:
# assert len(ret) - 1 == maxidx
# return ret
#draw_seq = [(i // X, i % X) for i in range(X*Y)]
#draw_seq = [(x, y) for x in _spr(range(X)) for y in _spr(range(Y))]
draw_seq = [(x, y) for x in range(X) for y in range(Y)]
#random.shuffle(draw_seq)
start_bg_task()
ldraw = Image.new(mode="RGBA", size=(X, Y))
def draw(img):
global ldraw
send_icmp_q = []
start = time.time()
print('draw 1 frame in ', end='')
#for x in range(X):
for x, y in draw_seq:
#for y in range(Y):
#if x < skip:
# continue
newpix = img.getpixel((x, y))
r, g, b, a = newpix
if a and newpix != ldraw.getpixel((x, y)):
send_icmp_q.append(f"fdcf:8538:9ad5:3333:{x:x}:{y:x}:11{r:02x}:{g:02x}{b:02x}")
#print(f"fdcf:8538:9ad5:3333:{x:x}:{y:x}:11{r:02x}:{g:02x}{b:02x}")
#time.sleep(slp)
startsend = time.time()
send_icmp(send_icmp_q)
ldraw = img.copy()
end = time.time()
print(f"{1000*(end-start):.1f}ms {1000*(end-startsend):.1f}ms")
D_RGB = [0, 0, 0]
imgdvdc = None
def update_drgb():
global imgdvdc
D_RGB.clear()
D_RGB.extend([random.randint(50, 250), random.randint(50, 250), random.randint(50, 250)])
imgdvdc = imgdvd.copy()
for x in range(Dx):
for y in range(Dy):
_, _, _, a = imgdvd.getpixel((x, y))
imgdvdc.putpixel((x, y), (*D_RGB, a))
update_drgb()
import collections
oldn = collections.deque()
for _ in range(5):
oldn.append(((0, 0), Image.new(mode="RGBA", size=(Dx, Dy))))
assert len(oldn) >= 3
def blend(doexit=False):
global img
with imglock:
for old1 in oldn:
img.paste(old1[1], old1[0], old1[1])
new = img.crop((sx, sy, sx+Dx, sy+Dy))
oldn.append(((sx, sy), new.copy()))
oldn.popleft()
new.paste(imgdvdc, (0, 0), imgdvdc)
new1 = Image.new(mode="RGBA", size=(X, Y))
#new1 = img.copy()
old2 = oldn[-2]
new1.paste(old2[1], old2[0], old2[1])
if doexit:
old3 = oldn[-3]
new1.paste(old3[1], old3[0], old3[1])
return new1
new1.paste(new, (sx, sy), new)
#new1.save('/tmp/2.png')
#time.sleep(0.5)
return new1
STEP=5
#if draw_img:
# draw(img)
try:
while True:
draw(blend())
if fx > 0:
if sx + Dx >= X:
fx = -fx
update_drgb()
else:
if sx <= 0:
fx = -fx
update_drgb()
if fy > 0:
if sy + Dy >= Y:
fy = -fy
update_drgb()
else:
if sy <= 0:
fy = -fy
update_drgb()
sx += STEP * fx
sy += STEP * fy
sx = sx if sx >= 0 else 0
sx = sx if sx + Dx < X else X - Dx
sy = sy if sy >= 0 else 0
sy = sy if sy + Dy < Y else Y - Dy
time.sleep(slp1)
except KeyboardInterrupt:
print('exit...')
ldraw = Image.new(mode="RGBA", size=(X, Y))
draw(blend(doexit=True))
time.sleep(1)