import json import asyncio import socket class WizDriver: PORT = 38899 SCENES = { # Динамические (меняют цвет/яркость во времени) "ocean": 1, "romance": 2, "party": 3, "fireplace": 5, "cozy": 6, "forest": 10, "pastel_colors": 11, "wake_up": 12, "bedtime": 13, "warm_white": 14, "daylight": 15, "cool_white": 16, "night_light": 17, "focus": 18, "relax": 19, "true_colors": 20, "tv_time": 21, "plant_growth": 22, "spring": 23, "summer": 24, "fall": 25, "deep_dive": 26, "jungle": 27, "mojito": 28, "club": 29, "christmas": 30, "halloween": 31, "candlelight": 32, "golden_white": 33, "pulse": 34, "steampunk": 35, } async def send_udp(self, ip: str, payload: dict): loop = asyncio.get_event_loop() with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.settimeout(2.0) data = json.dumps(payload).encode() await loop.run_in_executor(None, sock.sendto, data, (ip, self.PORT)) try: resp, _ = sock.recvfrom(1024) return json.loads(resp.decode()) except socket.timeout: return None async def set_pilot(self, ip: str, params: dict): payload = {"method": "setPilot", "params": params} return await self.send_udp(ip, payload) async def get_pilot(self, ip: str): payload = {"method": "getPilot", "params": {}} return await self.send_udp(ip, payload)