Files
ignis-core/app/core/scheduler.py
2026-05-15 23:12:28 +07:00

373 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import logging
import os
from datetime import datetime, timedelta
from uuid import uuid4
import pytz
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.base import STATE_STOPPED
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from dotenv import load_dotenv
from sqlalchemy import delete, select
from app.core.database import async_session, sync_engine
from app.models.event_log import EventLog
from app.models.schedule import ScheduleTask
# Runs at import time, before the os.getenv() calls below.
# Presumably loads variables from a local .env file into the environment.
load_dotenv()
logger = logging.getLogger(__name__)
# Name of the application time zone; "Asia/Novosibirsk" when APP_TIMEZONE is unset.
TZ_NAME = os.getenv("APP_TIMEZONE", "Asia/Novosibirsk")
# pytz zone object used for every trigger and datetime conversion in this module.
app_tz = pytz.timezone(TZ_NAME)
# Event-log retention window in days (default 30); read by cleanup_old_events().
RETENTION_DAYS = int(os.getenv("EVENT_LOG_RETENTION_DAYS", "30"))
# Job ids starting with one of these prefixes are treated as internal jobs.
INTERNAL_JOB_PREFIXES = ("cleanup_",)
# APScheduler job store backed by the project's synchronous SQLAlchemy engine.
jobstores = {"default": SQLAlchemyJobStore(engine=sync_engine)}
# Module-wide scheduler instance shared by all functions below.
scheduler = AsyncIOScheduler(jobstores=jobstores, timezone=app_tz)
def is_internal_job_id(job_id: str) -> bool:
    """Return True when *job_id* starts with any of INTERNAL_JOB_PREFIXES."""
    # str.startswith accepts a tuple of candidate prefixes and checks them all.
    return job_id.startswith(tuple(INTERNAL_JOB_PREFIXES))
def build_schedule_job_id(trigger_type: str) -> str:
    """Create a new job id shaped like ``<trigger_type>_<32 hex characters>``."""
    unique_suffix = uuid4().hex
    return "{}_{}".format(trigger_type, unique_suffix)
def _parse_once_run_at(trigger_args: dict) -> datetime:
    """Parse ``trigger_args["run_at"]`` (ISO 8601) into a datetime in app_tz.

    A naive value is interpreted as local time in app_tz; an aware value is
    converted to app_tz.
    """
    parsed = datetime.fromisoformat(trigger_args["run_at"])
    is_naive = parsed.tzinfo is None
    return app_tz.localize(parsed) if is_naive else parsed.astimezone(app_tz)
def build_trigger(task: ScheduleTask):
    """Build the APScheduler trigger described by *task*.

    Returns a DateTrigger for ``"once"`` tasks and a CronTrigger for ``"cron"``
    tasks, both pinned to app_tz.

    Raises:
        ValueError: if ``task.trigger_type`` is neither ``"once"`` nor ``"cron"``.
    """
    if task.trigger_type == "once":
        fire_at = _parse_once_run_at(task.trigger_args)
        return DateTrigger(run_date=fire_at, timezone=app_tz)

    if task.trigger_type != "cron":
        raise ValueError(f"Неизвестный тип триггера: {task.trigger_type}")

    # Copy the stored arguments so the task itself is not mutated, and force
    # the application time zone over whatever the stored arguments contain.
    cron_kwargs = dict(task.trigger_args)
    cron_kwargs.update(timezone=app_tz)
    return CronTrigger(**cron_kwargs)
def serialize_trigger_args(trigger) -> tuple[str, dict] | None:
    """Convert an APScheduler trigger into ``(trigger_type, trigger_args)``.

    Returns ``("once", {"run_at": <ISO string in app_tz>})`` for a DateTrigger,
    ``("cron", {<field name>: <field expression>})`` for a CronTrigger, and
    ``None`` for any other trigger class.
    """
    if isinstance(trigger, DateTrigger):
        moment = trigger.run_date
        if moment.tzinfo is None:
            moment = app_tz.localize(moment)
        else:
            moment = moment.astimezone(app_tz)
        return "once", {"run_at": moment.isoformat()}

    if isinstance(trigger, CronTrigger):
        # Only these cron fields are persisted; any other field is ignored.
        persisted_fields = frozenset(
            (
                "year",
                "month",
                "day",
                "week",
                "day_of_week",
                "hour",
                "minute",
                "second",
            )
        )
        cron_args = {
            cron_field.name: str(cron_field)
            for cron_field in trigger.fields
            if cron_field.name in persisted_fields
        }
        return "cron", cron_args

    return None
async def cleanup_old_events():
    """Delete event_log rows older than RETENTION_DAYS."""
    # The cutoff is an ISO-8601 string built from the naive local clock and is
    # compared against EventLog.timestamp as-is.
    oldest_kept = datetime.now() - timedelta(days=RETENTION_DAYS)
    cutoff = oldest_kept.isoformat()
    purge_stmt = delete(EventLog).where(EventLog.timestamp < cutoff)

    async with async_session() as session:
        result = await session.execute(purge_stmt)
        await session.commit()
        # Stay quiet when nothing was removed.
        if result.rowcount:
            logger.info(
                f"Очистка лога: удалено {result.rowcount} записей старше {RETENTION_DAYS} дней"
            )
def ensure_internal_jobs():
    """Register (or replace) the built-in maintenance jobs on the scheduler."""
    # Event-log cleanup fires daily at 03:00 in the application time zone.
    nightly = CronTrigger(hour=3, minute=0, timezone=app_tz)
    scheduler.add_job(
        cleanup_old_events,
        trigger=nightly,
        id="cleanup_event_log",
        name="Очистка старых событий",
        replace_existing=True,
    )
async def execute_schedule_job(job_id: str):
    """Unified entry point for executing schedule jobs.

    A runtime job in APScheduler is always addressed by ``job_id`` only; all
    domain information (target, action parameters, trigger type) is loaded
    from the main database.

    Args:
        job_id: Identifier shared by the APScheduler job and its ScheduleTask row.

    Behavior:
        * If no active ScheduleTask exists for ``job_id``, a warning is logged,
          the runtime job is removed on a best-effort basis, and nothing runs.
        * Otherwise the command is executed, and a ``"once"`` task is deleted
          afterwards.
    """
    # Imported locally so the file's top-level import block stays untouched.
    from apscheduler.jobstores.base import JobLookupError

    async with async_session() as session:
        result = await session.execute(
            select(ScheduleTask).where(
                ScheduleTask.job_id == job_id,
                ScheduleTask.is_active.is_(True),
            )
        )
        task = result.scalar_one_or_none()

    if not task:
        logger.warning(f"Расписание {job_id} не найдено или деактивировано")
        try:
            scheduler.remove_job(job_id)
        except JobLookupError:
            # The job is already gone from the job store; nothing to clean up.
            pass
        except Exception:
            # Cleanup remains best-effort, but an unexpected job-store failure
            # is recorded instead of being silently discarded.
            logger.exception(
                "Не удалось удалить задачу %s из планировщика", job_id
            )
        return

    # Deferred import, presumably to avoid a circular import with the routes
    # module — TODO confirm.
    from app.api.routes.schedules import run_group_command

    await run_group_command(
        task.target_id,
        task.target_type == "group",
        dict(task.action_params),
    )

    # One-shot schedules are removed once they have fired.
    if task.trigger_type == "once":
        await delete_schedule_task(job_id, suppress_missing=True)
def add_runtime_job(task: ScheduleTask):
    """Register (or replace) the APScheduler job that executes *task*.

    The job calls execute_schedule_job with the task's job_id as its only
    argument. Missed runs are coalesced for cron tasks only.
    """
    # Build the trigger first so an invalid task fails before anything else.
    trigger = build_trigger(task)
    job_options = {
        "trigger": trigger,
        "args": [task.job_id],
        "id": task.job_id,
        "name": f"{task.trigger_type.upper()}: {task.target_id}",
        "replace_existing": True,
        "max_instances": 1,
        "misfire_grace_time": 300,
        "coalesce": task.trigger_type == "cron",
    }
    scheduler.add_job(execute_schedule_job, **job_options)
async def create_schedule_task(
    *,
    trigger_type: str,
    target_id: str,
    target_type: str,
    trigger_args: dict,
    action_params: dict,
) -> ScheduleTask:
    """Persist a new active schedule and register its runtime job.

    The ScheduleTask row is committed first. If registering the APScheduler
    job then fails, the row is deleted again and the original exception is
    re-raised.

    Returns:
        The refreshed ScheduleTask instance.
    """
    new_task = ScheduleTask(
        job_id=build_schedule_job_id(trigger_type),
        trigger_type=trigger_type,
        target_id=target_id,
        target_type=target_type,
        trigger_args=trigger_args,
        action_params=action_params,
        is_active=True,
    )

    async with async_session() as session:
        session.add(new_task)
        await session.commit()
        await session.refresh(new_task)

    try:
        add_runtime_job(new_task)
    except Exception:
        # Compensate: remove the row that was just committed, then propagate.
        lookup = select(ScheduleTask).where(ScheduleTask.job_id == new_task.job_id)
        async with async_session() as session:
            persisted = (await session.execute(lookup)).scalar_one_or_none()
            if persisted:
                await session.delete(persisted)
                await session.commit()
        raise
    else:
        return new_task
async def list_schedule_tasks() -> list[dict]:
    """Return all active schedules as plain dicts, oldest first.

    Each entry combines the stored ScheduleTask fields with runtime data from
    the scheduler: ``next_run`` (ISO string in app_tz, or None) and
    ``job_present`` (whether a runtime job exists for the task).
    """
    query = (
        select(ScheduleTask)
        .where(ScheduleTask.is_active.is_(True))
        .order_by(ScheduleTask.created_at.asc(), ScheduleTask.id.asc())
    )
    async with async_session() as session:
        tasks = (await session.execute(query)).scalars().all()

    items = []
    for task in tasks:
        job = scheduler.get_job(task.job_id)

        if job and job.next_run_time:
            next_run = job.next_run_time.astimezone(app_tz).isoformat()
        else:
            next_run = None

        # The cron convenience fields stay None for non-cron schedules.
        is_cron = task.trigger_type == "cron"
        hour = task.trigger_args.get("hour") if is_cron else None
        minute = task.trigger_args.get("minute") if is_cron else None
        day_of_week = task.trigger_args.get("day_of_week", "*") if is_cron else None

        entry = {
            "id": task.job_id,
            "target_id": task.target_id,
            "is_group": task.target_type == "group",
            "state": task.action_params.get("state"),
            "action_params": task.action_params,
            "trigger_type": task.trigger_type,
            "next_run": next_run,
            "hour": hour,
            "minute": minute,
            "day_of_week": day_of_week,
            "job_present": job is not None,
        }
        items.append(entry)
    return items
async def delete_schedule_task(job_id: str, *, suppress_missing: bool = False) -> bool:
    """Delete a schedule from both the database and the scheduler.

    Args:
        job_id: Identifier of the schedule to delete.
        suppress_missing: When True, a schedule found in neither place is
            reported by returning False instead of raising.

    Returns:
        True if a ScheduleTask row or a runtime job (or both) was removed.

    Raises:
        ValueError: if ``job_id`` belongs to an internal job.
        KeyError: if nothing was removed and ``suppress_missing`` is False.
    """
    # Imported locally so the file's top-level import block stays untouched.
    from apscheduler.jobstores.base import JobLookupError

    if is_internal_job_id(job_id):
        raise ValueError("Нельзя удалить служебную задачу")

    deleted = False
    async with async_session() as session:
        result = await session.execute(
            select(ScheduleTask).where(ScheduleTask.job_id == job_id)
        )
        task = result.scalar_one_or_none()
        if task:
            await session.delete(task)
            await session.commit()
            deleted = True

    try:
        scheduler.remove_job(job_id)
    except JobLookupError:
        # No runtime job with this id: an expected outcome, not an error.
        pass
    except Exception:
        # Runtime cleanup remains best-effort, but an unexpected job-store
        # failure is logged rather than silently treated as "job not found".
        logger.exception("Не удалось удалить задачу %s из планировщика", job_id)
    else:
        deleted = True

    if not deleted and not suppress_missing:
        raise KeyError(job_id)
    return deleted
async def migrate_legacy_scheduler_jobs():
    """Move legacy APScheduler-only jobs into the schedules table.

    Needed for a safe upgrade of existing installations whose jobs already
    live in apscheduler_jobs but have no metadata in the main database yet.
    Jobs with an unsupported trigger or an unexpected argument signature are
    skipped with a warning.
    """
    async with async_session() as session:
        id_rows = await session.execute(select(ScheduleTask.job_id))
        known_job_ids = set(id_rows.scalars().all())

        for job in scheduler.get_jobs():
            # Internal jobs and jobs that already have metadata need no work.
            if is_internal_job_id(job.id) or job.id in known_job_ids:
                continue

            serialized = serialize_trigger_args(job.trigger)
            if not serialized:
                logger.warning(f"Пропускаю неподдерживаемую legacy-задачу {job.id}")
                continue

            # Legacy jobs are expected to carry (target_id, is_group, action_params).
            if len(job.args) != 3:
                logger.warning(f"Legacy-задача {job.id} имеет неожиданные args")
                continue

            target_id, is_group, action_params = job.args
            signature_ok = (
                isinstance(target_id, str)
                and isinstance(is_group, bool)
                and isinstance(action_params, dict)
            )
            if not signature_ok:
                logger.warning(
                    f"Legacy-задача {job.id} имеет неподдерживаемую сигнатуру"
                )
                continue

            trigger_type, trigger_args = serialized
            migrated = ScheduleTask(
                job_id=job.id,
                trigger_type=trigger_type,
                target_id=target_id,
                target_type="group" if is_group else "device",
                trigger_args=trigger_args,
                action_params=action_params,
                is_active=True,
            )
            session.add(migrated)
            logger.info(f"Мигрирована legacy-задача {job.id} в таблицу schedules")

        # A single commit covers every row added in the loop.
        await session.commit()
async def rebuild_runtime_jobs_from_metadata():
    """Recreate every non-internal runtime job from the active ScheduleTask rows.

    All non-internal jobs are removed from the scheduler first. One-shot
    tasks whose run time is not in the future are deleted instead of being
    re-registered.
    """
    stale_ids = [
        job.id for job in scheduler.get_jobs() if not is_internal_job_id(job.id)
    ]
    for stale_id in stale_ids:
        scheduler.remove_job(stale_id)

    active_query = select(ScheduleTask).where(ScheduleTask.is_active.is_(True))
    async with async_session() as session:
        tasks = (await session.execute(active_query)).scalars().all()

    now = datetime.now(app_tz)
    for task in tasks:
        # The run time is only parsed for "once" tasks.
        expired_once = (
            task.trigger_type == "once"
            and _parse_once_run_at(task.trigger_args) <= now
        )
        if expired_once:
            logger.info(f"Удаляю просроченную одноразовую задачу {task.job_id}")
            await delete_schedule_task(task.job_id, suppress_missing=True)
            continue
        add_runtime_job(task)
async def reconcile_schedule_jobs():
    """Migrate legacy jobs, then rebuild runtime jobs from stored metadata."""
    # Order matters: migration must finish before the runtime jobs are rebuilt.
    for step in (migrate_legacy_scheduler_jobs, rebuild_runtime_jobs_from_metadata):
        await step()
async def start_scheduler():
    """Start the shared scheduler on the running event loop and sync its jobs.

    If the scheduler reports itself as running while bound to no loop, a
    closed loop, or a loop other than the current one, it is stopped (or its
    state is force-reset) and re-bound to the current loop. The scheduler is
    then started if it is not running, internal jobs are registered, and user
    schedules are reconciled with the database.
    """
    current_loop = asyncio.get_running_loop()
    # _eventloop is a private scheduler attribute; getattr tolerates its absence.
    bound_loop = getattr(scheduler, "_eventloop", None)
    if scheduler.running and (
        bound_loop is None or bound_loop.is_closed() or bound_loop is not current_loop
    ):
        if bound_loop is not None and not bound_loop.is_closed():
            # The previously bound loop is still open: request a normal shutdown.
            # NOTE(review): shutdown may be dispatched onto the bound loop rather
            # than run synchronously — confirm `running` is already False below.
            scheduler.shutdown(wait=False)
        else:
            # No usable loop to shut down on: overwrite the state flag directly.
            scheduler.state = STATE_STOPPED
        # Re-bind the scheduler to the loop this coroutine is running on.
        scheduler._eventloop = current_loop
    if not scheduler.running:
        scheduler.start()
        logger.info(f"Планировщик запущен. Таймзона: {TZ_NAME}")
    # Runs on every call, whether or not the scheduler was just started.
    ensure_internal_jobs()
    await reconcile_schedule_jobs()