Files
ignis-core/app/api/routes/devices.py
Artem Kokos b6b25fa2a1 BBFbyOpus
2026-04-01 22:51:24 +07:00

86 lines
2.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from app.core.state import state_manager, discovery_service
from app.core.database import async_session
from app.models.device import GroupModel, GroupCreateSchema
from app.api.deps import verify_token, require_admin
from app.drivers.wiz import WizDriver
logger = logging.getLogger(__name__)
# Создаем роутер с защитой
router = APIRouter(dependencies=[Depends(verify_token)])
wiz = WizDriver()
@router.get("")
async def get_all_devices():
return state_manager.devices
@router.get("/groups")
async def get_groups():
return state_manager.groups
@router.get("/scenes")
async def get_scenes():
return wiz.SCENES
@router.post("/groups", dependencies=[Depends(require_admin)])
async def create_group(data: GroupCreateSchema):
async with async_session() as session:
existing = await session.get(GroupModel, data.id)
if existing:
raise HTTPException(status_code=400, detail="Группа с таким ID уже есть")
new_group = GroupModel(id=data.id, name=data.name, device_ids=data.macs)
session.add(new_group)
await session.commit()
# Обновляем атрибуты из БД, чтобы избежать DetachedInstanceError
await session.refresh(new_group)
state_manager.groups[data.id] = new_group
return {"status": "created", "group": data.name}
@router.delete("/groups/{group_id}", dependencies=[Depends(require_admin)])
async def delete_group(group_id: str):
async with async_session() as session:
result = await session.execute(
select(GroupModel).where(GroupModel.id == group_id)
)
group = result.scalar_one_or_none()
if not group:
raise HTTPException(status_code=404, detail="Группа не найдена")
await session.delete(group)
await session.commit()
state_manager.groups.pop(group_id, None)
return {"status": "deleted", "id": group_id}
@router.post("/rescan", dependencies=[Depends(require_admin)])
async def rescan_network():
found_devices = await discovery_service.scan_network()
# MAC-адреса найденных ламп
found_macs = {dev["mac"] for dev in found_devices}
# Удаляем устройства, которые не ответили (оффлайн)
offline_macs = [mac for mac in state_manager.devices if mac not in found_macs]
for mac in offline_macs:
del state_manager.devices[mac]
logger.info(f"Устройство {mac} не ответило -- убрано из списка")
# Обновляем/добавляем найденные
for dev_data in found_devices:
state_manager.update_device(dev_data)
return {
"status": "ok",
"found": len(found_macs),
"removed_offline": len(offline_macs),
}