Files
ignis-core/app/api/routes/schedules.py
Artem Kokos b6b25fa2a1 BBFbyOpus
2026-04-01 22:51:24 +07:00

183 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from app.core.scheduler import app_tz, scheduler
from app.core.state import state_manager
from app.drivers.wiz import WizDriver
from app.api.deps import require_admin
logger = logging.getLogger(__name__)
router = APIRouter(dependencies=[Depends(require_admin)])
# Префиксы служебных задач -- не показываем на фронте
_INTERNAL_JOB_PREFIXES = ("cleanup_",)
async def run_group_command(target_id: str, is_group: bool, params: dict):
"""
Универсальное выполнение команды по расписанию.
IP резолвится в момент выполнения, а не создания задачи --
корректно работает при смене IP (DHCP) и изменении состава группы.
"""
if is_group:
ips = state_manager.get_group_ips(target_id)
else:
dev = state_manager.devices.get(target_id)
ips = [dev.ip] if dev else []
if not ips:
logger.warning(f"Расписание: цель {target_id} не найдена (0 IP)")
return
local_wiz = WizDriver()
for ip in ips:
try:
await local_wiz.set_pilot(ip, params)
logger.info(f"Расписание: {target_id} -> {ip}: {params}")
except Exception as e:
logger.error(f"Расписание: ошибка {ip}: {e}")
# Логируем toggle в event_log
# Импорт здесь, чтобы избежать циклической зависимости
from app.api.routes.control import log_toggle_by_name
target_type = "group" if is_group else "device"
await log_toggle_by_name("scheduler", target_type, target_id, params)
@router.post("/once")
async def schedule_once(
target_id: str,
state: bool,
run_at: Optional[datetime] = None,
hours_from_now: Optional[int] = None,
is_group: bool = True,
):
# 1. Определяем время запуска в правильной таймзоне
if hours_from_now is not None:
exec_time = datetime.now(app_tz) + timedelta(hours=hours_from_now)
elif run_at:
if run_at.tzinfo is None:
exec_time = app_tz.localize(run_at)
else:
exec_time = run_at
else:
raise HTTPException(status_code=400, detail="Нужно время или отступ в часах")
# 2. Проверяем что цель существует (но IP резолвится при выполнении)
if is_group:
if target_id not in state_manager.groups:
raise HTTPException(status_code=404, detail="Группа не найдена")
else:
if target_id not in state_manager.devices:
raise HTTPException(status_code=404, detail="Устройство не найдено")
# 3. Регаем задачу
job_id = f"once_{target_id}_{int(exec_time.timestamp())}"
scheduler.add_job(
run_group_command,
trigger=DateTrigger(run_date=exec_time, timezone=app_tz),
args=[target_id, is_group, {"state": state}],
id=job_id,
name=f"Once: {target_id} | {state}",
replace_existing=True,
)
return {"status": "scheduled", "run_at": exec_time.isoformat()}
@router.post("/cron")
async def add_cron_task(
target_id: str,
hour: str,
minute: str,
day_of_week: str = "*",
is_group: bool = True,
state: bool = True,
):
# Проверяем что цель существует
if is_group:
if target_id not in state_manager.groups:
raise HTTPException(status_code=404, detail="Группа не найдена")
else:
if target_id not in state_manager.devices:
raise HTTPException(status_code=404, detail="Устройство не найдено")
# Одна задача на всю группу -- IP резолвятся при каждом срабатывании
trigger = CronTrigger(
hour=hour, minute=minute, day_of_week=day_of_week, timezone=app_tz
)
job_id = f"cron_{target_id}_{hour}_{minute}"
scheduler.add_job(
run_group_command,
trigger,
args=[target_id, is_group, {"state": state}],
id=job_id,
name=f"CRON: {target_id} | {hour}:{minute} | {state}",
replace_existing=True,
)
return {"status": "cron_scheduled", "job_id": job_id}
@router.get("/tasks")
async def get_all_tasks():
jobs = []
for job in scheduler.get_jobs():
# Пропускаем служебные задачи
if any(job.id.startswith(prefix) for prefix in _INTERNAL_JOB_PREFIXES):
continue
# Парсим имя
name_parts = job.name.split("|")
target = name_parts[0].replace("CRON:", "").replace("Once:", "").strip()
is_on = "True" in job.name or "true" in job.name.lower()
h, m = None, None
next_run_str = None
if job.next_run_time:
# Переводим из UTC в локальную таймзону для вывода
local_time = job.next_run_time.astimezone(app_tz)
h = str(local_time.hour).zfill(2)
m = str(local_time.minute).zfill(2)
next_run_str = local_time.isoformat()
# Если это крон, подтягиваем значения из триггера
if hasattr(job.trigger, "fields"):
h = str(job.trigger.fields[5])
m = str(job.trigger.fields[6])
jobs.append(
{
"id": job.id,
"target_id": target,
"state": is_on,
"next_run": next_run_str,
"hour": h,
"minute": m,
}
)
return {"tasks": jobs}
@router.delete("/{job_id}")
async def cancel_task(job_id: str):
# Запрещаем удалять служебные задачи через API
if any(job_id.startswith(prefix) for prefix in _INTERNAL_JOB_PREFIXES):
raise HTTPException(status_code=403, detail="Нельзя удалить служебную задачу")
try:
scheduler.remove_job(job_id)
return {"status": "deleted"}
except Exception:
raise HTTPException(status_code=404, detail="Задача не найдена")