Files
ignis-core/app/core/state.py
2026-05-21 21:47:33 +07:00

135 lines
4.4 KiB
Python

from dataclasses import asdict, dataclass
from datetime import datetime
import logging
from typing import Dict, List
from app.models.device import DeviceSchema, GroupModel
from app.core.discovery import DiscoveryService
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class DiscoveryApplyResult:
found: int
added: int
updated: int
removed_offline: int
pending_removal: int
online: int
def to_dict(self) -> dict:
return asdict(self)
@dataclass(frozen=True)
class DiscoverySnapshot:
last_scan_at: str
last_scan_mode: str
summary: DiscoveryApplyResult
def to_dict(self) -> dict:
payload = asdict(self)
payload["summary"] = self.summary.to_dict()
return payload
class StateManager:
def __init__(self):
# Храним устройства как Pydantic объекты
self.devices: Dict[str, DeviceSchema] = {}
# Группы как модели SQLAlchemy
self.groups: Dict[str, GroupModel] = {}
# Сколько подряд циклов discovery устройство не видно
self._missing_scan_counts: Dict[str, int] = {}
self.discovery_snapshot: DiscoverySnapshot | None = None
def update_device(self, device_data: dict):
"""Обновляет или добавляет устройство в состояние."""
mac = device_data["mac"]
current = self.devices.get(mac)
device = DeviceSchema(
id=mac,
ip=device_data["ip"],
name=current.name if current else f"WiZ {mac[-4:]}",
room=current.room if current else "Default",
)
self.devices[mac] = device
self._missing_scan_counts.pop(mac, None)
def apply_discovery_snapshot(
self,
found_devices: list[dict],
*,
remove_missing: bool,
missing_threshold: int = 1,
) -> DiscoveryApplyResult:
found_by_mac = {device["mac"]: device for device in found_devices}
added = 0
updated = 0
for mac, device_data in found_by_mac.items():
if mac in self.devices:
updated += 1
else:
added += 1
self.update_device(device_data)
removed_offline = 0
if remove_missing:
for mac in list(self.devices):
if mac in found_by_mac:
continue
missed_scans = self._missing_scan_counts.get(mac, 0) + 1
self._missing_scan_counts[mac] = missed_scans
if missed_scans < missing_threshold:
logger.info(
"Устройство %s не ответило (%s/%s), оставляю до следующего цикла",
mac,
missed_scans,
missing_threshold,
)
continue
self.devices.pop(mac, None)
self._missing_scan_counts.pop(mac, None)
removed_offline += 1
logger.info("Устройство %s не ответило -- убрано из списка", mac)
return DiscoveryApplyResult(
found=len(found_by_mac),
added=added,
updated=updated,
removed_offline=removed_offline,
pending_removal=len(self._missing_scan_counts),
online=len(self.devices),
)
def record_discovery(self, mode: str, result: DiscoveryApplyResult):
self.discovery_snapshot = DiscoverySnapshot(
last_scan_at=datetime.now().isoformat(),
last_scan_mode=mode,
summary=result,
)
def get_discovery_snapshot(self) -> dict | None:
if not self.discovery_snapshot:
return None
return self.discovery_snapshot.to_dict()
def get_group_ips(self, group_id: str) -> List[str]:
"""Возвращает список IP всех ламп, входящих в группу."""
group = self.groups.get(group_id)
if not group:
return []
# Извлекаем IP по MAC-адресам, которые хранятся в группе
return [
self.devices[d_id].ip for d_id in group.device_ids if d_id in self.devices
]
state_manager = StateManager()
discovery_service = DiscoveryService()