Files
ignis-core/app/core/discovery.py
Артём Кокос c7adc24b07 Initial commit
2026-02-12 21:52:24 +07:00

89 lines
3.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
import socket
import logging
import subprocess
import ipaddress
from typing import List, Dict
logger = logging.getLogger(__name__)
class DiscoveryService:
def __init__(self, port: int = 38899):
self.port = port
self.discover_msg = {"method": "getPilot", "params": {}}
def _get_local_subnet(self) -> str:
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
network = ipaddress.IPv4Network(f"{local_ip}/24", strict=False)
return str(network)
except Exception as e:
logger.error(f"Linux Discovery Error: Не удалось определить подсеть: {e}")
return "192.168.1.0/24"
async def scan_network(self, timeout: float = 1.5) -> List[Dict]:
subnet = self._get_local_subnet()
network = ipaddress.IPv4Network(subnet)
found_devices = []
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(False)
loop = asyncio.get_running_loop()
message = json.dumps(self.discover_msg).encode()
logger.info(f"🚀 Начинаю сканирование сети {subnet}...")
# Рассылаем запросы
for ip in network.hosts():
try:
sock.sendto(message, (str(ip), self.port))
except Exception:
continue
start_time = loop.time()
while (loop.time() - start_time) < timeout:
try:
data, addr = await asyncio.wait_for(
loop.run_in_executor(None, sock.recvfrom, 1024), timeout=0.1
)
resp = json.loads(data.decode())
if "result" in resp:
res = resp["result"]
mac = res.get("mac")
ip_from_socket = addr[0]
if mac:
device_info = {
"mac": mac,
"ip": ip_from_socket,
"state": {
"on": res.get("state"),
"dimming": res.get("dimming"),
"temp": res.get("temp"),
},
}
found_devices.append(device_info)
# Красивый лог с IP и MAC
logger.info(
f" [+] Найдена лампа: {ip_from_socket} | MAC: {mac}"
)
except (asyncio.TimeoutError, json.JSONDecodeError):
continue
except Exception as e:
# На Linux Errno 11 (EAGAIN) тут может выскочить, если буфер пуст
await asyncio.sleep(0.01)
continue
sock.close()
# Фильтруем дубликаты (если лампа ответила несколько раз)
unique_devices = list({d["mac"]: d for d in found_devices}.values())
return unique_devices