from dataclasses import asdict, dataclass from datetime import datetime import logging from typing import Dict, List from app.models.device import DeviceSchema, GroupModel from app.core.discovery import DiscoveryService logger = logging.getLogger(__name__) @dataclass(frozen=True) class DiscoveryApplyResult: found: int added: int updated: int removed_offline: int pending_removal: int online: int def to_dict(self) -> dict: return asdict(self) @dataclass(frozen=True) class DiscoverySnapshot: last_scan_at: str last_scan_mode: str summary: DiscoveryApplyResult def to_dict(self) -> dict: payload = asdict(self) payload["summary"] = self.summary.to_dict() return payload class StateManager: def __init__(self): # Храним устройства как Pydantic объекты self.devices: Dict[str, DeviceSchema] = {} # Группы как модели SQLAlchemy self.groups: Dict[str, GroupModel] = {} # Сколько подряд циклов discovery устройство не видно self._missing_scan_counts: Dict[str, int] = {} self.discovery_snapshot: DiscoverySnapshot | None = None def update_device(self, device_data: dict): """Обновляет или добавляет устройство в состояние.""" mac = device_data["mac"] current = self.devices.get(mac) device = DeviceSchema( id=mac, ip=device_data["ip"], name=current.name if current else f"WiZ {mac[-4:]}", room=current.room if current else "Default", ) self.devices[mac] = device self._missing_scan_counts.pop(mac, None) def apply_discovery_snapshot( self, found_devices: list[dict], *, remove_missing: bool, missing_threshold: int = 1, ) -> DiscoveryApplyResult: found_by_mac = {device["mac"]: device for device in found_devices} added = 0 updated = 0 for mac, device_data in found_by_mac.items(): if mac in self.devices: updated += 1 else: added += 1 self.update_device(device_data) removed_offline = 0 if remove_missing: for mac in list(self.devices): if mac in found_by_mac: continue missed_scans = self._missing_scan_counts.get(mac, 0) + 1 self._missing_scan_counts[mac] = missed_scans if missed_scans < missing_threshold: logger.info( "Устройство %s не ответило (%s/%s), оставляю до следующего цикла", mac, missed_scans, missing_threshold, ) continue self.devices.pop(mac, None) self._missing_scan_counts.pop(mac, None) removed_offline += 1 logger.info("Устройство %s не ответило -- убрано из списка", mac) return DiscoveryApplyResult( found=len(found_by_mac), added=added, updated=updated, removed_offline=removed_offline, pending_removal=len(self._missing_scan_counts), online=len(self.devices), ) def record_discovery(self, mode: str, result: DiscoveryApplyResult): self.discovery_snapshot = DiscoverySnapshot( last_scan_at=datetime.now().isoformat(), last_scan_mode=mode, summary=result, ) def get_discovery_snapshot(self) -> dict | None: if not self.discovery_snapshot: return None return self.discovery_snapshot.to_dict() def get_group_ips(self, group_id: str) -> List[str]: """Возвращает список IP всех ламп, входящих в группу.""" group = self.groups.get(group_id) if not group: return [] # Извлекаем IP по MAC-адресам, которые хранятся в группе return [ self.devices[d_id].ip for d_id in group.device_ids if d_id in self.devices ] state_manager = StateManager() discovery_service = DiscoveryService()