109 lines
3.6 KiB
Python
109 lines
3.6 KiB
Python
from dataclasses import asdict, dataclass
|
|
import logging
|
|
from typing import Dict, List
|
|
|
|
from app.models.device import DeviceSchema, GroupModel
|
|
from app.core.discovery import DiscoveryService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class DiscoveryApplyResult:
|
|
found: int
|
|
added: int
|
|
updated: int
|
|
removed_offline: int
|
|
pending_removal: int
|
|
online: int
|
|
|
|
def to_dict(self) -> dict:
|
|
return asdict(self)
|
|
|
|
|
|
class StateManager:
|
|
def __init__(self):
|
|
# Храним устройства как Pydantic объекты
|
|
self.devices: Dict[str, DeviceSchema] = {}
|
|
# Группы как модели SQLAlchemy
|
|
self.groups: Dict[str, GroupModel] = {}
|
|
# Сколько подряд циклов discovery устройство не видно
|
|
self._missing_scan_counts: Dict[str, int] = {}
|
|
|
|
def update_device(self, device_data: dict):
|
|
"""Обновляет или добавляет устройство в состояние."""
|
|
mac = device_data["mac"]
|
|
current = self.devices.get(mac)
|
|
device = DeviceSchema(
|
|
id=mac,
|
|
ip=device_data["ip"],
|
|
name=current.name if current else f"WiZ {mac[-4:]}",
|
|
room=current.room if current else "Default",
|
|
)
|
|
self.devices[mac] = device
|
|
self._missing_scan_counts.pop(mac, None)
|
|
|
|
def apply_discovery_snapshot(
|
|
self,
|
|
found_devices: list[dict],
|
|
*,
|
|
remove_missing: bool,
|
|
missing_threshold: int = 1,
|
|
) -> DiscoveryApplyResult:
|
|
found_by_mac = {device["mac"]: device for device in found_devices}
|
|
|
|
added = 0
|
|
updated = 0
|
|
for mac, device_data in found_by_mac.items():
|
|
if mac in self.devices:
|
|
updated += 1
|
|
else:
|
|
added += 1
|
|
self.update_device(device_data)
|
|
|
|
removed_offline = 0
|
|
if remove_missing:
|
|
for mac in list(self.devices):
|
|
if mac in found_by_mac:
|
|
continue
|
|
|
|
missed_scans = self._missing_scan_counts.get(mac, 0) + 1
|
|
self._missing_scan_counts[mac] = missed_scans
|
|
if missed_scans < missing_threshold:
|
|
logger.info(
|
|
"Устройство %s не ответило (%s/%s), оставляю до следующего цикла",
|
|
mac,
|
|
missed_scans,
|
|
missing_threshold,
|
|
)
|
|
continue
|
|
|
|
self.devices.pop(mac, None)
|
|
self._missing_scan_counts.pop(mac, None)
|
|
removed_offline += 1
|
|
logger.info("Устройство %s не ответило -- убрано из списка", mac)
|
|
|
|
return DiscoveryApplyResult(
|
|
found=len(found_by_mac),
|
|
added=added,
|
|
updated=updated,
|
|
removed_offline=removed_offline,
|
|
pending_removal=len(self._missing_scan_counts),
|
|
online=len(self.devices),
|
|
)
|
|
|
|
def get_group_ips(self, group_id: str) -> List[str]:
|
|
"""Возвращает список IP всех ламп, входящих в группу."""
|
|
group = self.groups.get(group_id)
|
|
if not group:
|
|
return []
|
|
|
|
# Извлекаем IP по MAC-адресам, которые хранятся в группе
|
|
return [
|
|
self.devices[d_id].ip for d_id in group.device_ids if d_id in self.devices
|
|
]
|
|
|
|
|
|
state_manager = StateManager()
|
|
discovery_service = DiscoveryService()
|