import asyncio import ipaddress import json import logging import os import socket import struct from dataclasses import dataclass from typing import Dict, List try: import fcntl except ImportError: # pragma: no cover - не на Linux fcntl = None logger = logging.getLogger(__name__) ENV_MIN_PREFIX_LEN = 16 AUTO_MIN_PREFIX_LEN = 24 DEFAULT_DISCOVERY_INTERVAL_SECONDS = 600 DEFAULT_BACKGROUND_MISSING_THRESHOLD = 2 EXCLUDED_INTERFACE_PREFIXES = ( "lo", "docker", "br-", "veth", "virbr", "tun", "tap", "wg", "tailscale", "zt", "utun", "ppp", ) SIOCGIFADDR = 0x8915 SIOCGIFNETMASK = 0x891B @dataclass(frozen=True) class InterfaceSubnet: name: str address: ipaddress.IPv4Address network: ipaddress.IPv4Network class DiscoveryService: def __init__(self, port: int = 38899): self.port = port self.discover_msg = {"method": "getPilot", "params": {}} self._scan_lock = asyncio.Lock() def _env_min_prefix_len(self) -> int: return int(os.getenv("DISCOVERY_ENV_MIN_PREFIX_LEN", ENV_MIN_PREFIX_LEN)) def _auto_min_prefix_len(self) -> int: return int(os.getenv("DISCOVERY_AUTO_MIN_PREFIX_LEN", AUTO_MIN_PREFIX_LEN)) def _background_interval_seconds(self) -> int: return int( os.getenv( "DISCOVERY_INTERVAL_SECONDS", DEFAULT_DISCOVERY_INTERVAL_SECONDS ) ) def _background_missing_threshold(self) -> int: return int( os.getenv( "DISCOVERY_BACKGROUND_MISSING_THRESHOLD", DEFAULT_BACKGROUND_MISSING_THRESHOLD, ) ) def _parse_env_subnets(self, value: str) -> List[str]: subnets: list[str] = [] min_prefix_len = self._env_min_prefix_len() for raw_subnet in value.split(","): subnet = raw_subnet.strip() if not subnet: continue try: network = ipaddress.IPv4Network(subnet, strict=False) except ValueError as exc: logger.error("Неверный формат подсети %s: %s", subnet, exc) continue if network.prefixlen < min_prefix_len: logger.warning( "Подсеть %s слишком большая (/%s), ограничиваю до /%s", subnet, network.prefixlen, min_prefix_len, ) network = ipaddress.IPv4Network( f"{network.network_address}/{min_prefix_len}", strict=False ) subnets.append(str(network)) return subnets def _interface_subnets(self) -> list[InterfaceSubnet]: if fcntl is None: return [] candidates: list[InterfaceSubnet] = [] with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: for _, interface_name in socket.if_nameindex(): ifreq = struct.pack("256s", interface_name.encode("utf-8")[:15]) try: address = socket.inet_ntoa( fcntl.ioctl(sock.fileno(), SIOCGIFADDR, ifreq)[20:24] ) netmask = socket.inet_ntoa( fcntl.ioctl(sock.fileno(), SIOCGIFNETMASK, ifreq)[20:24] ) except OSError: continue ipv4 = ipaddress.IPv4Address(address) if ipv4.is_loopback or ipv4.is_link_local: continue network = ipaddress.IPv4Network(f"{address}/{netmask}", strict=False) candidates.append( InterfaceSubnet( name=interface_name, address=ipv4, network=network, ) ) return candidates def _is_excluded_interface(self, interface_name: str) -> bool: lowered = interface_name.lower() return lowered.startswith(EXCLUDED_INTERFACE_PREFIXES) def _normalize_auto_network( self, candidate: InterfaceSubnet ) -> ipaddress.IPv4Network: min_prefix_len = self._auto_min_prefix_len() target_prefix_len = max(candidate.network.prefixlen, min_prefix_len) if target_prefix_len != candidate.network.prefixlen: logger.info( "Авто-discovery: подсеть %s (%s) шире /%s, сканирую локальный сегмент /%s", candidate.network, candidate.name, min_prefix_len, target_prefix_len, ) return ipaddress.IPv4Network( f"{candidate.address}/{target_prefix_len}", strict=False ) def _collect_auto_subnets(self) -> list[str]: candidates = self._interface_subnets() if not candidates: return [] private_candidates = [candidate for candidate in candidates if candidate.address.is_private] usable_candidates = private_candidates or candidates preferred_candidates = [ candidate for candidate in usable_candidates if not self._is_excluded_interface(candidate.name) ] selected_candidates = preferred_candidates or usable_candidates subnets: list[str] = [] seen: set[str] = set() for candidate in selected_candidates: normalized = str(self._normalize_auto_network(candidate)) if normalized in seen: continue seen.add(normalized) subnets.append(normalized) if subnets: logger.info( "Авто-discovery: выбраны подсети %s", ", ".join(subnets), ) return subnets def _fallback_subnet(self) -> list[str]: try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.connect(("8.8.8.8", 80)) local_ip = sock.getsockname()[0] except Exception as exc: logger.error( "Discovery Error: Не удалось определить подсеть автоматически: %s", exc, ) return ["192.168.1.0/24"] network = ipaddress.IPv4Network( f"{local_ip}/{self._auto_min_prefix_len()}", strict=False, ) logger.info( "Авто-discovery fallback: использую локальный сегмент %s", network ) return [str(network)] def _get_target_subnets(self) -> List[str]: env_network = os.getenv("SCAN_NETWORK", "").strip() if env_network: subnets = self._parse_env_subnets(env_network) return subnets if subnets else ["192.168.1.0/24"] auto_subnets = self._collect_auto_subnets() if auto_subnets: return auto_subnets return self._fallback_subnet() async def scan_network(self, timeout: float = 2.0) -> List[Dict]: subnets = self._get_target_subnets() found_devices = [] sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setblocking(False) loop = asyncio.get_running_loop() message = json.dumps(self.discover_msg).encode() logger.debug("Начинаю сканирование сетей: %s...", ", ".join(subnets)) try: for subnet in subnets: try: network = ipaddress.IPv4Network(subnet) for ip in network.hosts(): try: sock.sendto(message, (str(ip), self.port)) except Exception: continue except ValueError as exc: logger.error("Неверный формат подсети %s: %s", subnet, exc) start_time = loop.time() while (loop.time() - start_time) < timeout: try: data, addr = await asyncio.wait_for( loop.run_in_executor(None, sock.recvfrom, 1024), timeout=0.2 ) resp = json.loads(data.decode()) if "result" not in resp: continue result = resp["result"] mac = result.get("mac") if not mac: continue found_devices.append( { "mac": mac, "ip": addr[0], "state": { "on": result.get("state"), "dimming": result.get("dimming"), "temp": result.get("temp"), }, } ) logger.info(" [+] Найдена лампа: %s | MAC: %s", addr[0], mac) except (asyncio.TimeoutError, json.JSONDecodeError): continue except Exception: await asyncio.sleep(0.01) continue finally: sock.close() return list({device["mac"]: device for device in found_devices}.values()) async def _refresh_devices( self, state_manager, *, mode: str, remove_missing: bool, missing_threshold: int, timeout: float = 2.0, ): async with self._scan_lock: found_devices = await self.scan_network(timeout=timeout) result = state_manager.apply_discovery_snapshot( found_devices, remove_missing=remove_missing, missing_threshold=missing_threshold, ) logger.info( "Discovery (%s): found=%s added=%s updated=%s removed=%s pending_removal=%s online=%s", mode, result.found, result.added, result.updated, result.removed_offline, result.pending_removal, result.online, ) return result async def startup_refresh(self, state_manager, timeout: float = 2.0): return await self._refresh_devices( state_manager, mode="startup", remove_missing=True, missing_threshold=1, timeout=timeout, ) async def manual_refresh(self, state_manager, timeout: float = 2.0): return await self._refresh_devices( state_manager, mode="manual", remove_missing=True, missing_threshold=1, timeout=timeout, ) async def background_refresh(self, state_manager, timeout: float = 2.0): return await self._refresh_devices( state_manager, mode="background", remove_missing=True, missing_threshold=self._background_missing_threshold(), timeout=timeout, ) async def start_background_discovery(self, state_manager, interval: int | None = None): interval_seconds = interval or self._background_interval_seconds() while True: await asyncio.sleep(interval_seconds) try: await self.background_refresh(state_manager) except Exception as exc: logger.error("Discovery background error: %s", exc)