Files
ignis-core/app/core/discovery.py
2026-02-24 21:21:20 +07:00

116 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import socket
import logging
import os
import ipaddress
from typing import List, Dict
logger = logging.getLogger(__name__)
class DiscoveryService:
def __init__(self, port: int = 38899):
self.port = port
self.discover_msg = {"method": "getPilot", "params": {}}
def _get_target_subnets(self) -> List[str]:
"""
Определяет список подсетей для сканирования.
Приоритет:
1. Переменная окружения SCAN_NETWORK (можно через запятую: "192.168.0.0/24,192.168.1.0/24")
2. Автоопределение по дефолтному шлюзу
"""
env_network = os.getenv("SCAN_NETWORK")
if env_network:
return [s.strip() for s in env_network.split(",")]
# Автоопределение (твой старый метод)
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
# Коннект не создает трафика, но заставляет ОС выбрать нужный интерфейс
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
network = ipaddress.IPv4Network(f"{local_ip}/24", strict=False)
return [str(network)]
except Exception as e:
logger.error(
f"Discovery Error: Не удалось определить подсеть автоматически: {e}"
)
return ["192.168.1.0/24"]
async def scan_network(self, timeout: float = 2.0) -> List[Dict]:
subnets = self._get_target_subnets()
found_devices = []
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(False)
loop = asyncio.get_running_loop()
message = json.dumps(self.discover_msg).encode()
logger.debug(f"🚀 Начинаю сканирование сетей: {', '.join(subnets)}...")
# Рассылаем запросы по всем целевым сетям
for subnet in subnets:
try:
network = ipaddress.IPv4Network(subnet)
for ip in network.hosts():
try:
sock.sendto(message, (str(ip), self.port))
except Exception:
continue
except ValueError as e:
logger.error(f"❌ Неверный формат подсети {subnet}: {e}")
# Собираем ответы
start_time = loop.time()
while (loop.time() - start_time) < timeout:
try:
# Используем небольшой таймаут на чтение, чтобы успевать выходить из цикла
data, addr = await asyncio.wait_for(
loop.run_in_executor(None, sock.recvfrom, 1024), timeout=0.2
)
resp = json.loads(data.decode())
if "result" in resp:
res = resp["result"]
mac = res.get("mac")
if mac:
found_devices.append(
{
"mac": mac,
"ip": addr[0],
"state": {
"on": res.get("state"),
"dimming": res.get("dimming"),
"temp": res.get("temp"),
},
}
)
logger.info(f" [+] Найдена лампа: {addr[0]} | MAC: {mac}")
except (asyncio.TimeoutError, json.JSONDecodeError):
continue
except Exception:
await asyncio.sleep(0.01)
continue
sock.close()
# Фильтруем дубликаты
return list({d["mac"]: d for d in found_devices}.values())
async def start_background_discovery(self, state_manager, interval=600):
"""Запускает бесконечный цикл сканирования."""
while True:
try:
found_devices = await self.scan_network()
for dev_data in found_devices:
state_manager.update_device(dev_data)
logger.info(
f"📡 Discovery: онлайн {len(state_manager.devices)} устройств"
)
except Exception as e:
logger.error(f"❌ Discovery background error: {e}")
await asyncio.sleep(interval)