import asyncio import json import socket import logging import os import ipaddress from typing import List, Dict logger = logging.getLogger(__name__) class DiscoveryService: def __init__(self, port: int = 38899): self.port = port self.discover_msg = {"method": "getPilot", "params": {}} def _get_target_subnets(self) -> List[str]: """ Определяет список подсетей для сканирования. Приоритет: 1. Переменная окружения SCAN_NETWORK (можно через запятую: "192.168.0.0/24,192.168.1.0/24") 2. Автоопределение по дефолтному шлюзу """ env_network = os.getenv("SCAN_NETWORK") if env_network: return [s.strip() for s in env_network.split(",")] # Автоопределение (твой старый метод) try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: # Коннект не создает трафика, но заставляет ОС выбрать нужный интерфейс s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] network = ipaddress.IPv4Network(f"{local_ip}/24", strict=False) return [str(network)] except Exception as e: logger.error( f"Discovery Error: Не удалось определить подсеть автоматически: {e}" ) return ["192.168.1.0/24"] async def scan_network(self, timeout: float = 2.0) -> List[Dict]: subnets = self._get_target_subnets() found_devices = [] sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setblocking(False) loop = asyncio.get_running_loop() message = json.dumps(self.discover_msg).encode() logger.debug(f"🚀 Начинаю сканирование сетей: {', '.join(subnets)}...") # Рассылаем запросы по всем целевым сетям for subnet in subnets: try: network = ipaddress.IPv4Network(subnet) for ip in network.hosts(): try: sock.sendto(message, (str(ip), self.port)) except Exception: continue except ValueError as e: logger.error(f"❌ Неверный формат подсети {subnet}: {e}") # Собираем ответы start_time = loop.time() while (loop.time() - start_time) < timeout: try: # Используем небольшой таймаут на чтение, чтобы успевать выходить из цикла data, addr = await asyncio.wait_for( loop.run_in_executor(None, sock.recvfrom, 1024), timeout=0.2 ) resp = json.loads(data.decode()) if "result" in resp: res = resp["result"] mac = res.get("mac") if mac: found_devices.append( { "mac": mac, "ip": addr[0], "state": { "on": res.get("state"), "dimming": res.get("dimming"), "temp": res.get("temp"), }, } ) logger.info(f" [+] Найдена лампа: {addr[0]} | MAC: {mac}") except (asyncio.TimeoutError, json.JSONDecodeError): continue except Exception: await asyncio.sleep(0.01) continue sock.close() # Фильтруем дубликаты return list({d["mac"]: d for d in found_devices}.values()) async def start_background_discovery(self, state_manager, interval=600): """Запускает бесконечный цикл сканирования.""" while True: try: found_devices = await self.scan_network() for dev_data in found_devices: state_manager.update_device(dev_data) logger.info( f"📡 Discovery: онлайн {len(state_manager.devices)} устройств" ) except Exception as e: logger.error(f"❌ Discovery background error: {e}") await asyncio.sleep(interval)