Files
ignis-core/main.py
Артём Кокос b30ce9ce2a Static API Key for security
2026-02-21 13:06:02 +07:00

442 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import logging
import os
from typing import Optional, List
from datetime import datetime, timedelta
from contextlib import asynccontextmanager
from dotenv import load_dotenv
from fastapi import FastAPI, Depends, HTTPException, Security, APIRouter
from fastapi.staticfiles import StaticFiles
from fastapi.security import APIKeyHeader
from starlette.status import HTTP_403_FORBIDDEN
from sqlalchemy import select
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from app.core.discovery import DiscoveryService
from app.core.state import state_manager
from app.core.scheduler import start_scheduler, scheduler, execute_lamp_command
from app.drivers.wiz import WizDriver
from app.core.database import init_db, async_session
from app.models.device import GroupModel, DeviceModel, GroupCreateSchema
from app.models.schedule import ScheduleTask
load_dotenv()
API_KEY = os.getenv("IGNIS_API_KEY")
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger(__name__)
discovery = DiscoveryService()
wiz = WizDriver()
async def verify_token(header_value: str = Depends(api_key_header)):
# Если ключ не задан в .env, пускаем (для локальной разработки),
# либо строго требуем, если мы в продакшене
if not API_KEY:
return None
if header_value == API_KEY:
return header_value
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
)
api_router = APIRouter(dependencies=[Depends(verify_token)])
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Жизненный цикл приложения."""
# 1. Инициализация БД (создание таблиц)
await init_db()
logger.info("🗄️ База данных инициализирована")
await start_scheduler()
# 2. Загрузка групп из БД в память
async with async_session() as session:
result = await session.execute(select(GroupModel))
groups = result.scalars().all()
for g in groups:
state_manager.groups[g.id] = g
logger.info(f"📂 Загружена группа: {g.name} (MACs: {g.device_ids})")
async def periodic_discovery():
logger.info("🚀 Запущена фоновая служба Discovery")
while True:
try:
found_devices = await discovery.scan_network()
for dev_data in found_devices:
state_manager.update_device(dev_data)
logger.info(
f"📡 Сеть просканирована. Устройств онлайн: {len(state_manager.devices)}"
)
except Exception as e:
logger.error(f"❌ Ошибка в цикле Discovery: {e}")
await asyncio.sleep(600)
discovery_task = asyncio.create_task(periodic_discovery())
yield
discovery_task.cancel()
logger.info("🛑 Фоновая служба Discovery остановлена")
app = FastAPI(title="Ignis Core API", lifespan=lifespan)
# --- Эндпоинты Устройств ---
@api_router.get("/devices")
async def get_all_devices():
"""Список всех обнаруженных ламп."""
return state_manager.devices
@api_router.get("/groups")
async def get_groups():
"""Список всех созданных люстр/групп."""
return state_manager.groups
@api_router.get("/scenes")
async def get_scenes():
return wiz.SCENES
@api_router.post("/groups")
async def create_group(data: GroupCreateSchema):
async with async_session() as session:
existing = await session.get(GroupModel, data.id)
if existing:
raise HTTPException(status_code=400, detail="Группа с таким ID уже есть")
# Берем данные из схемы
new_group = GroupModel(id=data.id, name=data.name, device_ids=data.macs)
session.add(new_group)
await session.commit()
state_manager.groups[data.id] = new_group
return {"status": "created", "group": data.name}
@api_router.post("/control/device/{device_id}")
async def control_device(
device_id: str,
state: Optional[bool] = None,
brightness: Optional[int] = None,
scene: Optional[str] = None,
temp: Optional[int] = None,
r: Optional[int] = None,
g: Optional[int] = None,
b: Optional[int] = None,
):
device = state_manager.devices.get(device_id)
if not device:
raise HTTPException(status_code=404, detail="Лампа не в сети")
params = {}
if state is not None:
params["state"] = state
if brightness is not None:
params["dimming"] = brightness
if scene and scene in wiz.SCENES:
params["sceneId"] = wiz.SCENES[scene]
elif temp is not None:
params["temp"] = temp
elif r is not None or g is not None or b is not None:
params["r"], params["g"], params["b"] = r or 0, g or 0, b or 0
if not params:
raise HTTPException(status_code=400, detail="Никаких команд не передано")
result = await wiz.set_pilot(device.ip, params)
return {"device_id": device_id, "applied": params, "result": result}
@api_router.post("/control/group/{group_id}")
async def control_group(
group_id: str,
state: Optional[bool] = None,
brightness: Optional[int] = None,
scene: Optional[str] = None,
temp: Optional[int] = None,
r: Optional[int] = None,
g: Optional[int] = None,
b: Optional[int] = None,
):
ips = state_manager.get_group_ips(group_id)
if not ips:
raise HTTPException(status_code=404, detail="Группа не найдена или оффлайн")
params = {}
if state is not None:
params["state"] = state
if brightness is not None:
params["dimming"] = brightness
if scene and scene in wiz.SCENES:
params["sceneId"] = wiz.SCENES[scene]
elif temp is not None:
params["temp"] = temp
elif r is not None or g is not None or b is not None:
params["r"], params["g"], params["b"] = r or 0, g or 0, b or 0
if not params:
raise HTTPException(status_code=400, detail="Никаких команд не передано")
# Используем return_exceptions=True, чтобы ошибки одной лампы не ломали всё
tasks = [wiz.set_pilot(ip, params) for ip in ips]
await asyncio.gather(*tasks, return_exceptions=True)
return {"status": "ok", "applied": params, "sent_to": ips}
@api_router.delete("/groups/{group_id}")
async def delete_group(group_id: str):
async with async_session() as session:
# Ищем в базе
result = await session.execute(
select(GroupModel).where(GroupModel.id == group_id)
)
group = result.scalar_one_or_none()
if not group:
raise HTTPException(status_code=404, detail="Группа не найдена")
await session.delete(group)
await session.commit()
# Удаляем из оперативной памяти
if group_id in state_manager.groups:
del state_manager.groups[group_id]
return {"status": "deleted", "id": group_id}
@api_router.post("/discovery/rescan")
async def rescan_network():
logger.info("🔄 Ручной перезапуск сканирования сети...")
found_devices = await discovery.scan_network()
for dev_data in found_devices:
state_manager.update_device(dev_data)
return {"status": "ok", "found": len(state_manager.devices)}
@api_router.post("/control/device/{device_id}/blink")
async def blink_device(device_id: str):
device = state_manager.devices.get(device_id)
if not device:
raise HTTPException(status_code=404, detail="Лампа оффлайн")
# Сцена 34 в WiZ — это пульсация/мигание
await wiz.set_pilot(device.ip, {"sceneId": 34, "speed": 100})
# Через 3 секунды выключаем, чтобы не мигала вечно
await asyncio.sleep(3)
await wiz.set_pilot(device.ip, {"state": False})
return {"status": "blink_sent"}
@api_router.post("/schedules/once")
async def add_once_task(device_id: str, minutes: int, state: bool):
device = state_manager.devices.get(device_id)
if not device:
raise HTTPException(status_code=404, detail="Лампа не найдена")
# Получаем TZ из самого шедулера, чтобы они были синхронны
run_time = datetime.now(scheduler.timezone) + timedelta(minutes=minutes)
job = scheduler.add_job(
execute_lamp_command,
"date",
run_date=run_time,
args=[device.ip, {"state": state}],
)
return {"status": "scheduled", "job_id": job.id, "run_at": run_time.isoformat()}
@api_router.post("/schedules/group/once")
async def add_group_once_task(group_id: str, minutes: int, state: bool):
group = state_manager.groups.get(group_id)
if not group:
raise HTTPException(status_code=404, detail="Группа не найдена")
# Собираем все IP ламп группы
ips = [
state_manager.devices[mac].ip
for mac in group.device_ids
if mac in state_manager.devices
]
run_time = datetime.now(scheduler.timezone) + timedelta(minutes=minutes)
for ip in ips:
scheduler.add_job(
execute_lamp_command, "date", run_date=run_time, args=[ip, {"state": state}]
)
return {
"status": "scheduled",
"group": group_id,
"lamps": len(ips),
"run_at": run_time,
}
@api_router.get("/schedules/active")
async def get_active_jobs():
jobs = []
for job in scheduler.get_jobs():
jobs.append(
{
"id": job.id,
"next_run": job.next_run_time,
"func": job.func_ref,
"args": str(job.args),
}
)
return {"active_jobs": jobs}
@api_router.delete("/schedules/{job_id}")
async def cancel_task(job_id: str):
try:
scheduler.remove_job(job_id)
return {"status": "deleted"}
except:
raise HTTPException(status_code=404, detail="Задача не найдена")
@api_router.post("/schedules/at")
async def add_task_at_time(
target_id: str,
run_at: datetime,
is_group: bool = False,
state: Optional[bool] = None,
brightness: Optional[int] = None,
scene: Optional[str] = None,
):
"""Запуск команды в конкретную дату и время (ISO формат)"""
# Собираем параметры команды
params = {}
if state is not None:
params["state"] = state
if brightness is not None:
params["dimming"] = brightness
if scene and scene in wiz.SCENES:
params["sceneId"] = wiz.SCENES[scene]
# Определяем список IP
ips = []
if is_group:
ips = state_manager.get_group_ips(target_id)
else:
dev = state_manager.devices.get(target_id)
if dev:
ips = [dev.ip]
if not ips:
raise HTTPException(status_code=404, detail="Цель не найдена или оффлайн")
job_ids = []
for ip in ips:
job = scheduler.add_job(
execute_lamp_command,
DateTrigger(run_date=run_at, timezone=scheduler.timezone),
args=[ip, params],
name=f"{'Group' if is_group else 'Device'}: {target_id} | Action: {params}",
)
job_ids.append(job.id)
return {"status": "scheduled", "jobs": job_ids, "run_at": run_at}
@api_router.post("/schedules/cron")
async def add_cron_task(
target_id: str,
hour: str,
minute: str,
day_of_week: str = "*",
is_group: bool = True,
state: bool = True,
):
"""
Многоразовая задача (Cron).
Пример: hour="7", minute="30", day_of_week="mon-fri"
"""
ips = state_manager.get_group_ips(target_id) if is_group else []
if not is_group:
dev = state_manager.devices.get(target_id)
if dev:
ips = [dev.ip]
if not ips:
raise HTTPException(status_code=404, detail="Цель не найдена")
params = {"state": state}
trigger = CronTrigger(
hour=hour, minute=minute, day_of_week=day_of_week, timezone=scheduler.timezone
)
job_ids = []
for ip in ips:
job = scheduler.add_job(
execute_lamp_command,
trigger,
args=[ip, params],
name=f"CRON: {target_id} | {hour}:{minute} | {day_of_week}",
)
job_ids.append(job.id)
return {"status": "cron_scheduled", "jobs": job_ids}
@api_router.get("/schedules/tasks")
async def get_all_tasks():
"""Список задач для бота или фронта"""
jobs = []
for job in scheduler.get_jobs():
jobs.append(
{
"id": job.id,
"name": job.name, # Мы сохранили описание в поле name выше
"next_run": (
job.next_run_time.isoformat() if job.next_run_time else None
),
"params": str(job.args[1]) if len(job.args) > 1 else None,
}
)
return {"tasks": jobs}
app.include_router(api_router)
# --- МОНТИРОВАНИЕ СТАТИКИ (ДОЛЖНО БЫТЬ ПОСЛЕ ВСЕХ API МАРШРУТОВ) ---
app.mount("/", StaticFiles(directory="static", html=True), name="static")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)