feat: Guests API keys. Closes #3

This commit is contained in:
Artem Kokos
2026-03-28 21:20:55 +07:00
parent d024ba78ab
commit 3d8939a6aa
7 changed files with 297 additions and 400 deletions

View File

@@ -1,27 +1,74 @@
import os
import logging
from fastapi import Depends, HTTPException, Security
from dataclasses import dataclass
from typing import Optional
from fastapi import Depends, HTTPException
from fastapi.security import APIKeyHeader
from starlette.status import HTTP_403_FORBIDDEN
from dotenv import load_dotenv
from sqlalchemy import select
from app.core.database import async_session
from app.models.api_key import ApiKeyModel
load_dotenv()
logger = logging.getLogger(__name__)
API_KEY = os.getenv("IGNIS_API_KEY")
if not API_KEY:
MASTER_KEY = os.getenv("IGNIS_API_KEY")
if not MASTER_KEY:
logger.warning("IGNIS_API_KEY не задан -- авторизация отключена!")
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
async def verify_token(header_value: str = Depends(api_key_header)):
if not API_KEY:
return None
if header_value == API_KEY:
return header_value
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
)
@dataclass
class AuthContext:
"""Результат авторизации -- передаётся в роуты через Depends."""
is_master: bool # мастер-ключ из .env
is_admin: bool # право на CRUD групп, расписания, ресканирование
key_name: str # имя ключа (для логов)
async def verify_token(header_value: str = Depends(api_key_header)) -> AuthContext:
"""
Проверка API-ключа:
1. Если IGNIS_API_KEY не задан -- авторизация отключена, полный доступ
2. Мастер-ключ из .env -- полный доступ
3. Ключ из БД (api_keys) -- проверяем active и is_admin
4. Иначе -- 403
"""
# Авторизация отключена
if not MASTER_KEY:
return AuthContext(is_master=True, is_admin=True, key_name="no-auth")
if not header_value:
raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="API-ключ не передан")
# Мастер-ключ
if header_value == MASTER_KEY:
return AuthContext(is_master=True, is_admin=True, key_name="master")
# Ищем в БД
async with async_session() as session:
result = await session.execute(
select(ApiKeyModel).where(ApiKeyModel.key == header_value)
)
api_key = result.scalar_one_or_none()
if api_key and api_key.active:
return AuthContext(
is_master=False,
is_admin=api_key.is_admin,
key_name=api_key.name,
)
raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="Неверный или деактивированный ключ")
def require_admin(auth: AuthContext = Depends(verify_token)) -> AuthContext:
"""Dependency для роутов, требующих админских прав."""
if not auth.is_admin:
raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="Недостаточно прав")
return auth

View File

@@ -0,0 +1,85 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from app.core.database import async_session
from app.models.api_key import ApiKeyModel
from app.api.deps import require_admin, AuthContext
# Все операции с ключами -- только для админов (мастер-ключ)
router = APIRouter(dependencies=[Depends(require_admin)])
@router.get("")
async def list_keys():
"""Список всех гостевых ключей."""
async with async_session() as session:
result = await session.execute(select(ApiKeyModel))
keys = result.scalars().all()
return [
{
"key": k.key,
"name": k.name,
"is_admin": k.is_admin,
"active": k.active,
"created_at": k.created_at,
}
for k in keys
]
@router.post("")
async def create_key(name: str, is_admin: bool = False):
"""Создать гостевой ключ. Возвращает сгенерированный токен."""
new_key = ApiKeyModel(
key=ApiKeyModel.generate_key(),
name=name,
is_admin=is_admin,
)
async with async_session() as session:
session.add(new_key)
await session.commit()
return {
"key": new_key.key,
"name": new_key.name,
"is_admin": new_key.is_admin,
"message": "Сохраните ключ -- он больше не будет показан полностью",
}
@router.delete("/{key}")
async def revoke_key(key: str):
"""Деактивировать (отозвать) гостевой ключ."""
async with async_session() as session:
result = await session.execute(
select(ApiKeyModel).where(ApiKeyModel.key == key)
)
api_key = result.scalar_one_or_none()
if not api_key:
raise HTTPException(status_code=404, detail="Ключ не найден")
api_key.active = False
session.add(api_key)
await session.commit()
return {"status": "revoked", "name": api_key.name}
@router.post("/{key}/activate")
async def activate_key(key: str):
"""Повторно активировать ключ."""
async with async_session() as session:
result = await session.execute(
select(ApiKeyModel).where(ApiKeyModel.key == key)
)
api_key = result.scalar_one_or_none()
if not api_key:
raise HTTPException(status_code=404, detail="Ключ не найден")
api_key.active = True
session.add(api_key)
await session.commit()
return {"status": "activated", "name": api_key.name}

View File

@@ -3,7 +3,7 @@ from sqlalchemy import select
from app.core.state import state_manager, discovery_service
from app.core.database import async_session
from app.models.device import GroupModel, GroupCreateSchema
from app.api.deps import verify_token
from app.api.deps import verify_token, require_admin
from app.drivers.wiz import WizDriver
# Создаем роутер с защитой
@@ -26,7 +26,7 @@ async def get_scenes():
return wiz.SCENES
@router.post("/groups")
@router.post("/groups", dependencies=[Depends(require_admin)])
async def create_group(data: GroupCreateSchema):
async with async_session() as session:
existing = await session.get(GroupModel, data.id)
@@ -40,7 +40,7 @@ async def create_group(data: GroupCreateSchema):
return {"status": "created", "group": data.name}
@router.delete("/groups/{group_id}")
@router.delete("/groups/{group_id}", dependencies=[Depends(require_admin)])
async def delete_group(group_id: str):
async with async_session() as session:
result = await session.execute(
@@ -56,7 +56,7 @@ async def delete_group(group_id: str):
return {"status": "deleted", "id": group_id}
@router.post("/rescan")
@router.post("/rescan", dependencies=[Depends(require_admin)])
async def rescan_network():
found_devices = await discovery_service.scan_network()
for dev_data in found_devices:

View File

@@ -8,11 +8,11 @@ from apscheduler.triggers.date import DateTrigger
from app.core.scheduler import app_tz, scheduler
from app.core.state import state_manager
from app.drivers.wiz import WizDriver
from app.api.deps import verify_token
from app.api.deps import require_admin
logger = logging.getLogger(__name__)
router = APIRouter(dependencies=[Depends(verify_token)])
router = APIRouter(dependencies=[Depends(require_admin)])
async def run_group_command(target_id: str, is_group: bool, params: dict):