Files
ignis-core/app/api/routes/schedules.py
2026-03-28 20:11:23 +07:00

165 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from app.core.scheduler import app_tz, scheduler
from app.core.state import state_manager
from app.drivers.wiz import WizDriver
from app.api.deps import verify_token
logger = logging.getLogger(__name__)
router = APIRouter(dependencies=[Depends(verify_token)])
async def run_group_command(target_id: str, is_group: bool, params: dict):
"""
Универсальное выполнение команды по расписанию.
IP резолвится в момент выполнения, а не создания задачи --
корректно работает при смене IP (DHCP) и изменении состава группы.
"""
if is_group:
ips = state_manager.get_group_ips(target_id)
else:
dev = state_manager.devices.get(target_id)
ips = [dev.ip] if dev else []
if not ips:
logger.warning(f"Расписание: цель {target_id} не найдена (0 IP)")
return
local_wiz = WizDriver()
for ip in ips:
try:
await local_wiz.set_pilot(ip, params)
logger.info(f"⏰ Расписание: {target_id} -> {ip}: {params}")
except Exception as e:
logger.error(f"⏰ Расписание: ошибка {ip}: {e}")
@router.post("/once")
async def schedule_once(
target_id: str,
state: bool,
run_at: Optional[datetime] = None,
hours_from_now: Optional[int] = None,
is_group: bool = True,
):
# 1. Определяем время запуска в правильной таймзоне
if hours_from_now is not None:
exec_time = datetime.now(app_tz) + timedelta(hours=hours_from_now)
elif run_at:
if run_at.tzinfo is None:
exec_time = app_tz.localize(run_at)
else:
exec_time = run_at
else:
raise HTTPException(status_code=400, detail="Нужно время или отступ в часах")
# 2. Проверяем что цель существует (но IP резолвится при выполнении)
if is_group:
if target_id not in state_manager.groups:
raise HTTPException(status_code=404, detail="Группа не найдена")
else:
if target_id not in state_manager.devices:
raise HTTPException(status_code=404, detail="Устройство не найдено")
# 3. Регаем задачу
job_id = f"once_{target_id}_{int(exec_time.timestamp())}"
scheduler.add_job(
run_group_command,
trigger=DateTrigger(run_date=exec_time, timezone=app_tz),
args=[target_id, is_group, {"state": state}],
id=job_id,
name=f"Once: {target_id} | {state}",
replace_existing=True,
)
return {"status": "scheduled", "run_at": exec_time.isoformat()}
@router.post("/cron")
async def add_cron_task(
target_id: str,
hour: str,
minute: str,
day_of_week: str = "*",
is_group: bool = True,
state: bool = True,
):
# Проверяем что цель существует
if is_group:
if target_id not in state_manager.groups:
raise HTTPException(status_code=404, detail="Группа не найдена")
else:
if target_id not in state_manager.devices:
raise HTTPException(status_code=404, detail="Устройство не найдено")
# Одна задача на всю группу -- IP резолвятся при каждом срабатывании
trigger = CronTrigger(
hour=hour, minute=minute, day_of_week=day_of_week, timezone=app_tz
)
job_id = f"cron_{target_id}_{hour}_{minute}"
scheduler.add_job(
run_group_command,
trigger,
args=[target_id, is_group, {"state": state}],
id=job_id,
name=f"CRON: {target_id} | {hour}:{minute} | {state}",
replace_existing=True,
)
return {"status": "cron_scheduled", "job_id": job_id}
@router.get("/tasks")
async def get_all_tasks():
jobs = []
for job in scheduler.get_jobs():
# Парсим имя
name_parts = job.name.split("|")
target = name_parts[0].replace("CRON:", "").replace("Once:", "").strip()
is_on = "True" in job.name or "true" in job.name.lower()
h, m = None, None
next_run_str = None
if job.next_run_time:
# Переводим из UTC в локальную таймзону для вывода
local_time = job.next_run_time.astimezone(app_tz)
h = str(local_time.hour).zfill(2)
m = str(local_time.minute).zfill(2)
next_run_str = local_time.isoformat()
# Если это крон, подтягиваем значения из триггера
if hasattr(job.trigger, "fields"):
h = str(job.trigger.fields[5])
m = str(job.trigger.fields[6])
jobs.append(
{
"id": job.id,
"target_id": target,
"state": is_on,
"next_run": next_run_str,
"hour": h,
"minute": m,
}
)
return {"tasks": jobs}
@router.delete("/{job_id}")
async def cancel_task(job_id: str):
try:
scheduler.remove_job(job_id)
return {"status": "deleted"}
except Exception:
raise HTTPException(status_code=404, detail="Задача не найдена")