Files
ignis-core/app/api/routes/control.py
Артём Кокос 765bfd3201 Refactor: Clean main.py
2026-02-21 13:34:47 +07:00

92 lines
2.8 KiB
Python

import asyncio
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from app.core.state import state_manager
from app.drivers.wiz import WizDriver
from app.api.deps import verify_token
# Router for the lamp-control endpoints. The verify_token dependency is
# attached at router level, so it applies to every route registered below.
router = APIRouter(dependencies=[Depends(verify_token)])
# One module-level driver instance, used by all handlers in this module.
wiz = WizDriver()
@router.post("/device/{device_id}")
async def control_device(
    device_id: str,
    state: Optional[bool] = None,
    brightness: Optional[int] = None,
    scene: Optional[str] = None,
    temp: Optional[int] = None,
    r: Optional[int] = None,
    g: Optional[int] = None,
    b: Optional[int] = None,
):
    """Send a command to a single lamp looked up by ``device_id``.

    ``state`` and ``brightness`` are applied independently of everything
    else. On top of them at most one colour mode is chosen, in this order
    of precedence: a ``scene`` name present in ``wiz.SCENES``, then
    ``temp``, then an RGB colour (components that were not supplied are
    sent as 0). A ``scene`` name that is not in ``wiz.SCENES`` is ignored
    and the next mode in the chain is considered.

    Returns:
        A dict with the device id, the payload that was sent (``applied``)
        and whatever ``wiz.set_pilot`` returned (``result``).

    Raises:
        HTTPException: 404 when the device is not present in
            ``state_manager.devices``; 400 when the request produced an
            empty payload.
    """
    lamp = state_manager.devices.get(device_id)
    if not lamp:
        raise HTTPException(status_code=404, detail="Лампа не в сети")

    # Power and brightness first, so the key order of the payload stays
    # state -> dimming -> colour mode.
    pilot = {
        key: value
        for key, value in (("state", state), ("dimming", brightness))
        if value is not None
    }

    # Colour modes are mutually exclusive: the first applicable one wins.
    if scene and scene in wiz.SCENES:
        pilot["sceneId"] = wiz.SCENES[scene]
    elif temp is not None:
        pilot["temp"] = temp
    elif not (r is None and g is None and b is None):
        pilot.update(r=r or 0, g=g or 0, b=b or 0)

    if not pilot:
        raise HTTPException(status_code=400, detail="Никаких команд не передано")

    outcome = await wiz.set_pilot(lamp.ip, pilot)
    return {"device_id": device_id, "applied": pilot, "result": outcome}
@router.post("/group/{group_id}")
async def control_group(
    group_id: str,
    state: Optional[bool] = None,
    brightness: Optional[int] = None,
    scene: Optional[str] = None,
    temp: Optional[int] = None,
    r: Optional[int] = None,
    g: Optional[int] = None,
    b: Optional[int] = None,
):
    """Send the same command to every lamp address of a group.

    The payload is built exactly as in ``control_device``: ``state`` and
    ``brightness`` are independent, then at most one colour mode is picked
    in the order scene (only if the name is in ``wiz.SCENES``), ``temp``,
    RGB (missing components are sent as 0).

    The command is sent to all addresses concurrently on a best-effort
    basis: a failure for one lamp does not abort the others and does not
    fail the request.

    Returns:
        A dict with ``status`` (always ``"ok"``), the payload that was sent
        (``applied``), the addresses it was sent to (``sent_to``) and the
        addresses whose ``wiz.set_pilot`` call raised (``failed``).

    Raises:
        HTTPException: 404 when ``state_manager.get_group_ips`` returns
            nothing for the group; 400 when the request produced an empty
            payload.
    """
    ips = state_manager.get_group_ips(group_id)
    if not ips:
        raise HTTPException(status_code=404, detail="Группа не найдена или оффлайн")

    params = {}
    if state is not None:
        params["state"] = state
    if brightness is not None:
        params["dimming"] = brightness

    # Colour modes are mutually exclusive: the first applicable one wins.
    if scene and scene in wiz.SCENES:
        params["sceneId"] = wiz.SCENES[scene]
    elif temp is not None:
        params["temp"] = temp
    elif any(v is not None for v in (r, g, b)):
        params["r"], params["g"], params["b"] = r or 0, g or 0, b or 0

    # Same guard as control_device: without it an empty payload would be
    # sent to every lamp in the group and still be reported as "ok".
    if not params:
        raise HTTPException(status_code=400, detail="Никаких команд не передано")

    # Snapshot the addresses so results can be paired with them by position.
    targets = list(ips)
    outcomes = await asyncio.gather(
        *(wiz.set_pilot(ip, params) for ip in targets),
        return_exceptions=True,
    )
    # With return_exceptions=True failures come back as exception objects in
    # the result list; surface them instead of discarding them silently.
    failed = [
        ip
        for ip, outcome in zip(targets, outcomes)
        if isinstance(outcome, BaseException)
    ]
    return {"status": "ok", "applied": params, "sent_to": ips, "failed": failed}
@router.post("/device/{device_id}/blink")
async def blink_device(device_id: str):
    """Run a short blink sequence on one lamp.

    Sends scene id 34 at speed 100, waits three seconds, then sends
    ``state=False``. The response is returned only after the whole
    sequence has been sent.

    Raises:
        HTTPException: 404 when the device is not present in
            ``state_manager.devices``.
    """
    blink_payload = {"sceneId": 34, "speed": 100}
    off_payload = {"state": False}

    lamp = state_manager.devices.get(device_id)
    if not lamp:
        raise HTTPException(status_code=404, detail="Лампа оффлайн")

    await wiz.set_pilot(lamp.ip, blink_payload)
    # Let the scene run for a moment before switching the lamp off.
    await asyncio.sleep(3)
    await wiz.set_pilot(lamp.ip, off_payload)

    return {"status": "blink_sent"}