# File: ignis-core/tests/test_p0_security_and_control.py
# Listing timestamp: 2026-05-21 20:46:04 +07:00
# Listing size: 521 lines, 18 KiB (Python)

import asyncio
import os
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
from httpx import ASGITransport, AsyncClient
from sqlalchemy import delete, select
# SQLite file used by this test module; it is created next to the module itself.
TEST_DB_PATH = Path(__file__).with_name("test_ignis.db")
# Remove any leftover file from a previous run. ``missing_ok=True`` replaces the
# exists()-then-unlink() pair, which could raise if the file vanished in between.
TEST_DB_PATH.unlink(missing_ok=True)

# Master API key shared by every test in this module.
MASTER_KEY = "master-secret-for-tests"

# Configure the application through environment variables before the
# application imports that follow (presumably the app reads these at import
# time, which is why those imports are deliberately placed after this block).
os.environ["IGNIS_API_KEY"] = MASTER_KEY
os.environ["IGNIS_DATABASE_URL"] = f"sqlite+aiosqlite:///{TEST_DB_PATH}"
os.environ["IGNIS_SYNC_DATABASE_URL"] = f"sqlite:///{TEST_DB_PATH}"
import main # noqa: E402
from app.api.routes import control # noqa: E402
from app.core.database import async_session, engine, init_db, sync_engine # noqa: E402
from app.core.state import state_manager # noqa: E402
from app.drivers.wiz import WizResponse # noqa: E402
from app.models.api_key import ApiKeyModel # noqa: E402
from app.models.device import DeviceSchema # noqa: E402
from app.models.event_log import EventLog # noqa: E402
class SecurityAndControlApiTests(unittest.IsolatedAsyncioTestCase):
@classmethod
def tearDownClass(cls):
    """Dispose both database engines and delete the SQLite test file."""
    # Engines are disposed before the file is removed (presumably so that no
    # pooled connection still refers to it).
    sync_engine.dispose()
    # tearDownClass is synchronous, so the async engine's dispose coroutine is
    # driven to completion with asyncio.run().
    asyncio.run(engine.dispose())
    # missing_ok=True avoids the exists()/unlink() check-then-act race: the
    # cleanup no longer fails if the file has already disappeared.
    TEST_DB_PATH.unlink(missing_ok=True)
    super().tearDownClass()
async def asyncSetUp(self):
    """Reset environment, database and in-memory state, then open a client."""
    # Some tests change the master key; put the known value back first.
    os.environ["IGNIS_API_KEY"] = MASTER_KEY

    await init_db()
    await self._reset_database()

    # Empty the in-memory registries so every test starts without devices/groups.
    for registry in (state_manager.devices, state_manager.groups):
        registry.clear()

    # Requests go straight to the ASGI app through httpx's ASGI transport.
    in_process_transport = ASGITransport(app=main.app)
    self.client = AsyncClient(
        transport=in_process_transport,
        base_url="http://testserver",
    )
async def asyncTearDown(self):
    """Close the HTTP client and empty the in-memory device/group registries."""
    await self.client.aclose()
    for registry in (state_manager.devices, state_manager.groups):
        registry.clear()
async def _reset_database(self):
    """Delete every event-log row and every API-key row, then commit."""
    async with async_session() as db:
        # Same order as before: event log first, API keys second.
        for table in (EventLog, ApiKeyModel):
            await db.execute(delete(table))
        await db.commit()
async def _event_actions(self) -> list[str]:
    """Return the ``action`` column of all event-log rows, ordered by row id."""
    query = select(EventLog.action).order_by(EventLog.id)
    async with async_session() as db:
        rows = await db.execute(query)
        return list(rows.scalars().all())
def _master_headers(self) -> dict[str, str]:
    """Return a fresh header dict that authenticates with the master key."""
    headers = {"X-API-Key": MASTER_KEY}
    return headers
def _set_single_device_state(self):
    """Register one device, ``dev-1``, in the in-memory state manager."""
    lamp = DeviceSchema(
        id="dev-1",
        ip="192.168.1.10",
        name="Lamp 1",
        room="Office",
    )
    state_manager.devices["dev-1"] = lamp
def _set_group_state(self):
    """Register devices ``dev-1`` and ``dev-2`` plus group ``grp-1`` containing both."""
    # (device id, ip address, display name); both lamps are in the "Office" room.
    lamps = (
        ("dev-1", "192.168.1.10", "Lamp 1"),
        ("dev-2", "192.168.1.11", "Lamp 2"),
    )
    for device_id, address, label in lamps:
        state_manager.devices[device_id] = DeviceSchema(
            id=device_id,
            ip=address,
            name=label,
            room="Office",
        )

    # The group is a plain namespace object carrying id, name and member ids.
    office_group = SimpleNamespace(
        id="grp-1",
        name="Office",
        device_ids=["dev-1", "dev-2"],
    )
    state_manager.groups["grp-1"] = office_group
async def test_auth_is_fail_closed_when_master_key_missing(self):
    """With an empty IGNIS_API_KEY, /auth/me answers 503 with a configuration hint."""
    # patch.dict restores the previous value on exit, so the empty key cannot
    # leak into the process environment after this test finishes.
    with patch.dict(os.environ, {"IGNIS_API_KEY": ""}):
        response = await self.client.get("/auth/me")

    self.assertEqual(response.status_code, 503)
    self.assertEqual(
        response.json()["detail"], "Сервер не настроен: задайте IGNIS_API_KEY"
    )
async def test_system_info_returns_installed_server_metadata_without_secrets(
    self,
):
    """Master key sees full /system/info metadata, and the key itself is not echoed."""
    env_overrides = {
        "IGNIS_INSTANCE_NAME": "Home Lab",
        "APP_TIMEZONE": "Europe/Moscow",
        "IGNIS_PUBLIC_BASE_URL": "https://ignis.example.local/",
        "IGNIS_BUILD_VERSION": "1.2.3",
        "IGNIS_BUILD_DATE": "2026-05-21T10:11:12Z",
        "IGNIS_GIT_SHA": "abcdef1234567890",
        "SCAN_NETWORK": "192.168.0.0/24",
    }
    with patch.dict(os.environ, env_overrides, clear=False):
        response = await self.client.get(
            "/system/info",
            headers=self._master_headers(),
        )

    self.assertEqual(response.status_code, 200)
    payload = response.json()

    # Top-level identity fields.
    expected_top_level = (
        ("app_name", "Ignis Core"),
        ("instance_name", "Home Lab"),
        ("timezone", "Europe/Moscow"),
    )
    for field, expected in expected_top_level:
        self.assertEqual(payload[field], expected)

    # Build metadata mirrors the IGNIS_BUILD_* / IGNIS_GIT_SHA variables.
    expected_build = (
        ("version", "1.2.3"),
        ("build_date", "2026-05-21T10:11:12Z"),
        ("git_sha", "abcdef1234567890"),
    )
    for field, expected in expected_build:
        self.assertEqual(payload["build"][field], expected)

    # The configured URL is expected without its trailing slash.
    expected_urls = (
        ("configured_public_base_url", "https://ignis.example.local"),
        ("effective_public_base_url", "https://ignis.example.local"),
        ("observed_base_url", "http://testserver"),
    )
    for field, expected in expected_urls:
        self.assertEqual(payload["urls"][field], expected)

    configuration_flags = (
        "configured",
        "master_key_configured",
        "scan_network_configured",
        "public_base_url_configured",
        "build_metadata_complete",
    )
    for flag in configuration_flags:
        self.assertTrue(payload["configuration"][flag])

    self.assertIn("started_at", payload)
    # The secret must not appear anywhere in the raw response body.
    self.assertNotIn(MASTER_KEY, response.text)
    self.assertNotIn("api_key", payload)
async def test_guest_key_can_read_system_info(self):
    """A key created without ``is_admin`` gets a reduced /system/info payload."""
    create_response = await self.client.post(
        "/api-keys",
        headers=self._master_headers(),
        params={"name": "wall-panel"},
    )
    # Check the precondition explicitly: without this, a failed key creation
    # would surface as an opaque KeyError on the next line.
    self.assertEqual(create_response.status_code, 200)
    guest_key = create_response.json()["key"]

    response = await self.client.get(
        "/system/info",
        headers={"X-API-Key": guest_key},
    )

    self.assertEqual(response.status_code, 200)
    payload = response.json()
    self.assertEqual(payload["app_name"], "Ignis Core")
    self.assertGreaterEqual(payload["uptime_seconds"], 0)
    self.assertFalse(payload["diagnostics_visible"])

    # Diagnostic sections must be absent from the guest view.
    hidden_fields = (
        "instance_name",
        "timezone",
        "started_at",
        "build",
        "urls",
        "configuration",
    )
    for hidden_field in hidden_fields:
        self.assertNotIn(hidden_field, payload)
async def test_master_can_create_key_and_list_endpoint_returns_public_id(self):
    """Creating a key returns secret + public id; the listing exposes only the id."""
    create_response = await self.client.post(
        "/api-keys",
        headers=self._master_headers(),
        params={"name": "tablet", "is_admin": True},
    )
    self.assertEqual(create_response.status_code, 200)

    created = create_response.json()
    for field in ("key", "key_id"):
        self.assertIn(field, created)
    secret, public_id = created["key"], created["key_id"]
    self.assertNotEqual(secret, public_id)

    list_response = await self.client.get(
        "/api-keys",
        headers=self._master_headers(),
    )
    self.assertEqual(list_response.status_code, 200)

    listed = list_response.json()
    self.assertEqual(len(listed), 1)
    entry = listed[0]
    self.assertEqual(entry["name"], "tablet")
    # In the listing, the "key" field carries the public id, never the secret.
    self.assertEqual(entry["key"], public_id)
    self.assertNotEqual(entry["key"], secret)
    self.assertIn("display_key", entry)
async def test_admin_guest_cannot_manage_api_keys(self):
    """An admin guest key authenticates but is refused on the key-listing route."""
    create_response = await self.client.post(
        "/api-keys",
        headers=self._master_headers(),
        params={"name": "guest-admin", "is_admin": True},
    )
    # Check the precondition explicitly: without this, a failed key creation
    # would surface as an opaque KeyError on the next line.
    self.assertEqual(create_response.status_code, 200)
    guest_headers = {"X-API-Key": create_response.json()["key"]}

    auth_response = await self.client.get("/auth/me", headers=guest_headers)
    self.assertEqual(auth_response.status_code, 200)
    self.assertEqual(
        auth_response.json(),
        {"is_admin": True, "is_master": False, "name": "guest-admin"},
    )

    # Listing API keys is reserved for the master key.
    forbidden_response = await self.client.get("/api-keys", headers=guest_headers)
    self.assertEqual(forbidden_response.status_code, 403)
    self.assertEqual(forbidden_response.json()["detail"], "Требуется мастер-ключ")
async def test_master_can_revoke_and_activate_by_public_key_id(self):
    """Revoking by public id blocks the secret (403); activating restores it (200)."""
    create_response = await self.client.post(
        "/api-keys",
        headers=self._master_headers(),
        params={"name": "wall-panel"},
    )
    # Check the precondition explicitly: without this, a failed key creation
    # would surface as an opaque KeyError when the payload is read.
    self.assertEqual(create_response.status_code, 200)
    payload = create_response.json()
    public_id = payload["key_id"]
    secret_headers = {"X-API-Key": payload["key"]}

    # Revoke using the public id, then confirm the secret no longer works.
    revoke_response = await self.client.post(
        "/api-keys/revoke",
        headers=self._master_headers(),
        json={"key": public_id},
    )
    self.assertEqual(revoke_response.status_code, 200)

    revoked_auth = await self.client.get("/auth/me", headers=secret_headers)
    self.assertEqual(revoked_auth.status_code, 403)

    # Re-activate using the same public id and confirm access is back.
    activate_response = await self.client.post(
        "/api-keys/activate",
        headers=self._master_headers(),
        json={"key": public_id},
    )
    self.assertEqual(activate_response.status_code, 200)

    active_auth = await self.client.get("/auth/me", headers=secret_headers)
    self.assertEqual(active_auth.status_code, 200)
async def test_admin_guest_keeps_access_to_admin_routes_except_master_only_ones(
    self,
):
    """An admin guest key is allowed to create a device group."""
    create_response = await self.client.post(
        "/api-keys",
        headers=self._master_headers(),
        params={"name": "guest-admin", "is_admin": True},
    )
    # Check the precondition explicitly: without this, a failed key creation
    # would surface as an opaque KeyError on the next line.
    self.assertEqual(create_response.status_code, 200)
    guest_key = create_response.json()["key"]

    response = await self.client.post(
        "/devices/groups",
        headers={"X-API-Key": guest_key},
        json={"id": "bedroom", "name": "Bedroom", "macs": ["dev-1"]},
    )

    self.assertEqual(response.status_code, 200)
    self.assertEqual(response.json()["status"], "created")
async def test_device_control_timeout_returns_504_and_does_not_log_false_success(
    self,
):
    """A lamp timeout on device control yields 504 and a ``*_failed`` log entry."""
    self._set_single_device_state()

    lamp_timeout = WizResponse(
        ok=False,
        ip="192.168.1.10",
        kind="timeout",
        message="Таймаут ответа от лампы",
    )
    fake_set_pilot = AsyncMock(return_value=lamp_timeout)

    with patch.object(control.wiz, "set_pilot", fake_set_pilot):
        response = await self.client.post(
            "/control/device/dev-1",
            headers=self._master_headers(),
            json={"state": True},
        )

    self.assertEqual(response.status_code, 504)
    self.assertIn("Команда лампе не доставлена", response.json()["detail"])
    # The request is logged, followed by the failure and not by a success entry.
    logged_actions = await self._event_actions()
    self.assertEqual(logged_actions, ["toggle_on_requested", "toggle_on_failed"])
async def test_group_control_partial_success_reports_partial_and_logs_applied_result(
    self,
):
    """One lamp succeeding and one timing out gives a 200 "partial" group result."""
    self._set_group_state()

    lamp_ok = WizResponse(
        ok=True,
        ip="192.168.1.10",
        kind="ok",
        payload={"result": {"success": True}},
    )
    lamp_timeout = WizResponse(
        ok=False,
        ip="192.168.1.11",
        kind="timeout",
        message="Таймаут ответа от лампы",
    )
    # Successive set_pilot calls return the two responses in this order.
    fake_set_pilot = AsyncMock(side_effect=[lamp_ok, lamp_timeout])

    with patch.object(control.wiz, "set_pilot", fake_set_pilot):
        response = await self.client.post(
            "/control/group/grp-1",
            headers=self._master_headers(),
            json={"state": True},
        )

    self.assertEqual(response.status_code, 200)
    payload = response.json()
    self.assertEqual(payload["status"], "partial")
    self.assertEqual(payload["success_count"], 1)
    self.assertEqual(payload["failure_count"], 1)
    self.assertEqual(len(payload["results"]), 2)

    logged_actions = await self._event_actions()
    self.assertEqual(logged_actions, ["toggle_on_requested", "toggle_on"])
async def test_group_control_total_failure_returns_gateway_error(self):
    """When every lamp in the group times out, the group command answers 504."""
    self._set_group_state()

    # One timeout response per group member, in member order.
    all_timeouts = [
        WizResponse(
            ok=False,
            ip=address,
            kind="timeout",
            message="Таймаут ответа от лампы",
        )
        for address in ("192.168.1.10", "192.168.1.11")
    ]
    fake_set_pilot = AsyncMock(side_effect=all_timeouts)

    with patch.object(control.wiz, "set_pilot", fake_set_pilot):
        response = await self.client.post(
            "/control/group/grp-1",
            headers=self._master_headers(),
            json={"state": True},
        )

    self.assertEqual(response.status_code, 504)
    self.assertIn("Команда группе не доставлена", response.json()["detail"])

    logged_actions = await self._event_actions()
    self.assertEqual(logged_actions, ["toggle_on_requested", "toggle_on_failed"])
async def test_group_status_returns_per_device_errors_instead_of_500(self):
    """Group status answers 200 and reports each lamp's outcome individually."""
    self._set_group_state()

    lamp_timeout = WizResponse(
        ok=False,
        ip="192.168.1.10",
        kind="timeout",
        message="Таймаут ответа от лампы",
    )
    lamp_ok = WizResponse(
        ok=True,
        ip="192.168.1.11",
        kind="ok",
        payload={"result": {"state": True, "dimming": 50}},
    )
    # Successive get_pilot calls return the timeout first, then the success.
    fake_get_pilot = AsyncMock(side_effect=[lamp_timeout, lamp_ok])

    with patch.object(control.wiz, "get_pilot", fake_get_pilot):
        response = await self.client.get(
            "/control/group/grp-1/status",
            headers=self._master_headers(),
        )

    self.assertEqual(response.status_code, 200)
    payload = response.json()
    self.assertEqual(payload["group_id"], "grp-1")
    first_result, second_result = payload["results"][0], payload["results"][1]
    self.assertEqual(first_result["kind"], "timeout")
    self.assertEqual(second_result["status"]["dimming"], 50)
async def test_device_status_timeout_is_reported_as_504(self):
    """A lamp timeout while polling a single device's status yields 504."""
    self._set_single_device_state()

    lamp_timeout = WizResponse(
        ok=False,
        ip="192.168.1.10",
        kind="timeout",
        message="Таймаут ответа от лампы",
    )
    fake_get_pilot = AsyncMock(return_value=lamp_timeout)

    with patch.object(control.wiz, "get_pilot", fake_get_pilot):
        response = await self.client.get(
            "/control/device/dev-1/status",
            headers=self._master_headers(),
        )

    self.assertEqual(response.status_code, 504)
    self.assertIn("Ошибка опроса лампы", response.json()["detail"])
async def test_device_control_unknown_scene_returns_clear_400(self):
    """An unknown scene name is rejected with 400 and nothing is logged."""
    self._set_single_device_state()

    request_body = {"scene": "not_a_scene"}
    response = await self.client.post(
        "/control/device/dev-1",
        headers=self._master_headers(),
        json=request_body,
    )

    self.assertEqual(response.status_code, 400)
    self.assertEqual(response.json()["detail"], "Неизвестная сцена")
    logged_actions = await self._event_actions()
    self.assertEqual(logged_actions, [])
async def test_device_control_rejects_conflicting_scene_and_temp_in_body(self):
    """Sending both ``scene`` and ``temp`` is a 422 validation error."""
    self._set_single_device_state()

    request_body = {"scene": "party", "temp": 3200}
    response = await self.client.post(
        "/control/device/dev-1",
        headers=self._master_headers(),
        json=request_body,
    )

    self.assertEqual(response.status_code, 422)
    # The message is searched for in the stringified body, wherever it is nested.
    rendered_body = str(response.json())
    self.assertIn(
        "Можно передать только один режим из scene, temp или rgb",
        rendered_body,
    )
async def test_device_control_rejects_partial_rgb_triplet(self):
    """Sending ``r`` and ``g`` without ``b`` is a 422 validation error."""
    self._set_single_device_state()

    request_body = {"r": 255, "g": 128}
    response = await self.client.post(
        "/control/device/dev-1",
        headers=self._master_headers(),
        json=request_body,
    )

    self.assertEqual(response.status_code, 422)
    rendered_body = str(response.json())
    self.assertIn("Поля r, g и b нужно передавать вместе", rendered_body)
async def test_device_control_rejects_legacy_query_only_contract(self):
    """A control request carrying only query parameters (no JSON body) gets 422."""
    self._set_single_device_state()

    query_only = {"state": "true"}
    response = await self.client.post(
        "/control/device/dev-1",
        headers=self._master_headers(),
        params=query_only,
    )

    self.assertEqual(response.status_code, 422)
async def test_stats_summary_counts_real_commands_without_requested_duplicates(self):
    """One successful device command is counted exactly once in /stats/summary."""
    self._set_single_device_state()

    lamp_ok = WizResponse(
        ok=True,
        ip="192.168.1.10",
        kind="ok",
        payload={"result": {"success": True}},
    )
    fake_set_pilot = AsyncMock(return_value=lamp_ok)

    with patch.object(control.wiz, "set_pilot", fake_set_pilot):
        response = await self.client.post(
            "/control/device/dev-1",
            headers=self._master_headers(),
            json={"temp": 4200},
        )
    self.assertEqual(response.status_code, 200)

    summary_response = await self.client.get(
        "/stats/summary",
        headers=self._master_headers(),
    )
    self.assertEqual(summary_response.status_code, 200)

    payload = summary_response.json()
    self.assertEqual(len(payload["groups"]), 1)
    only_group = payload["groups"][0]
    self.assertEqual(only_group["target_id"], "dev-1")
    # A single command, attributed to the master key.
    self.assertEqual(only_group["total_commands"], 1)
    self.assertEqual(only_group["by_user"], {"master": 1})