"""Integration tests for API-key security and the lamp control endpoints.

The tests drive ``main.app`` in-process through an ASGI transport, against a
throwaway SQLite database file that lives next to this module.
"""

import asyncio
import os
import unittest
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch

from httpx import ASGITransport, AsyncClient
from sqlalchemy import delete, select

TEST_DB_PATH = Path(__file__).with_name("test_ignis.db")
# Drop any database file left behind by a previous (possibly aborted) run.
TEST_DB_PATH.unlink(missing_ok=True)

MASTER_KEY = "master-secret-for-tests"

# The environment is configured *before* the application modules are imported
# (hence the E402 suppressions below).
# NOTE(review): presumably the app reads the database URLs at import time.
os.environ["IGNIS_API_KEY"] = MASTER_KEY
os.environ["IGNIS_DATABASE_URL"] = f"sqlite+aiosqlite:///{TEST_DB_PATH}"
os.environ["IGNIS_SYNC_DATABASE_URL"] = f"sqlite:///{TEST_DB_PATH}"

import main  # noqa: E402
from app.api.routes import control  # noqa: E402
from app.core.database import async_session, engine, init_db, sync_engine  # noqa: E402
from app.core.state import state_manager  # noqa: E402
from app.drivers.wiz import WizResponse  # noqa: E402
from app.models.api_key import ApiKeyModel  # noqa: E402
from app.models.device import DeviceSchema  # noqa: E402
from app.models.event_log import EventLog  # noqa: E402

# Addresses of the two fake lamps registered by the state helpers.
_DEVICE_1_IP = "192.168.1.10"
_DEVICE_2_IP = "192.168.1.11"


def _timeout_response(ip: str) -> WizResponse:
    """Build the driver response used to simulate a lamp that did not answer."""
    return WizResponse(
        ok=False,
        ip=ip,
        kind="timeout",
        message="Таймаут ответа от лампы",
    )


def _ok_response(ip: str, result: dict) -> WizResponse:
    """Build a successful driver response whose payload wraps *result*."""
    return WizResponse(
        ok=True,
        ip=ip,
        kind="ok",
        payload={"result": result},
    )


class SecurityAndControlApiTests(unittest.IsolatedAsyncioTestCase):
    """End-to-end checks of authentication, API-key management and control."""

    @classmethod
    def tearDownClass(cls):
        """Dispose both engines and remove the temporary database file."""
        sync_engine.dispose()
        asyncio.run(engine.dispose())
        TEST_DB_PATH.unlink(missing_ok=True)
        super().tearDownClass()

    async def asyncSetUp(self):
        """Reset environment, database and in-memory state; open a client."""
        os.environ["IGNIS_API_KEY"] = MASTER_KEY
        await init_db()
        await self._reset_database()
        state_manager.devices.clear()
        state_manager.groups.clear()
        self.client = AsyncClient(
            transport=ASGITransport(app=main.app),
            base_url="http://testserver",
        )

    async def asyncTearDown(self):
        """Close the HTTP client and drop the in-memory devices and groups."""
        await self.client.aclose()
        state_manager.devices.clear()
        state_manager.groups.clear()

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    async def _reset_database(self):
        """Delete every event-log row and every stored API key."""
        async with async_session() as session:
            await session.execute(delete(EventLog))
            await session.execute(delete(ApiKeyModel))
            await session.commit()

    async def _event_actions(self) -> list[str]:
        """Return the ``action`` of every logged event, ordered by row id."""
        async with async_session() as session:
            result = await session.execute(
                select(EventLog.action).order_by(EventLog.id)
            )
            return list(result.scalars().all())

    def _master_headers(self) -> dict[str, str]:
        """Return request headers that authenticate with the master key."""
        return {"X-API-Key": MASTER_KEY}

    async def _create_api_key(self, **params) -> dict:
        """Create an API key as master and return the decoded response body.

        Asserts the 200 up front so a broken creation endpoint fails here
        with a clear message instead of as a ``KeyError`` in the caller.
        """
        response = await self.client.post(
            "/api-keys",
            headers=self._master_headers(),
            params=params,
        )
        self.assertEqual(response.status_code, 200)
        return response.json()

    def _add_device(self, device_id: str, ip: str, name: str) -> None:
        """Register a fake lamp in the ``Office`` room in the state manager."""
        state_manager.devices[device_id] = DeviceSchema(
            id=device_id,
            ip=ip,
            name=name,
            room="Office",
        )

    def _set_single_device_state(self):
        """Register the single lamp ``dev-1``."""
        self._add_device("dev-1", _DEVICE_1_IP, "Lamp 1")

    def _set_group_state(self):
        """Register lamps ``dev-1``/``dev-2`` and the group ``grp-1``."""
        self._set_single_device_state()
        self._add_device("dev-2", _DEVICE_2_IP, "Lamp 2")
        state_manager.groups["grp-1"] = SimpleNamespace(
            id="grp-1",
            name="Office",
            device_ids=["dev-1", "dev-2"],
        )

    # ------------------------------------------------------------------
    # Authentication and API-key management
    # ------------------------------------------------------------------

    async def test_auth_is_fail_closed_when_master_key_missing(self):
        """An empty master key must yield 503 rather than open access."""
        # patch.dict restores the variable on exit, so the cleared key cannot
        # leak into other tests even if the request below raises.
        with patch.dict(os.environ, {"IGNIS_API_KEY": ""}):
            response = await self.client.get("/auth/me")

        self.assertEqual(response.status_code, 503)
        self.assertEqual(
            response.json()["detail"], "Сервер не настроен: задайте IGNIS_API_KEY"
        )

    async def test_master_can_create_key_and_list_endpoint_returns_public_id(self):
        """Listing exposes the public key id, never the secret key itself."""
        created = await self._create_api_key(name="tablet", is_admin=True)
        self.assertIn("key", created)
        self.assertIn("key_id", created)
        self.assertNotEqual(created["key"], created["key_id"])

        list_response = await self.client.get(
            "/api-keys",
            headers=self._master_headers(),
        )
        self.assertEqual(list_response.status_code, 200)

        listed = list_response.json()
        self.assertEqual(len(listed), 1)
        entry = listed[0]
        self.assertEqual(entry["name"], "tablet")
        self.assertEqual(entry["key"], created["key_id"])
        self.assertNotEqual(entry["key"], created["key"])
        self.assertIn("display_key", entry)

    async def test_admin_guest_cannot_manage_api_keys(self):
        """An admin (non-master) key authenticates but cannot list keys."""
        created = await self._create_api_key(name="guest-admin", is_admin=True)
        guest_headers = {"X-API-Key": created["key"]}

        auth_response = await self.client.get("/auth/me", headers=guest_headers)
        self.assertEqual(auth_response.status_code, 200)
        self.assertEqual(
            auth_response.json(),
            {"is_admin": True, "is_master": False, "name": "guest-admin"},
        )

        forbidden_response = await self.client.get(
            "/api-keys",
            headers=guest_headers,
        )
        self.assertEqual(forbidden_response.status_code, 403)
        self.assertEqual(
            forbidden_response.json()["detail"], "Требуется мастер-ключ"
        )

    async def test_master_can_revoke_and_activate_by_public_key_id(self):
        """Revoking by public id blocks the secret; activating restores it."""
        created = await self._create_api_key(name="wall-panel")
        public_id = created["key_id"]
        secret_headers = {"X-API-Key": created["key"]}

        revoke_response = await self.client.post(
            "/api-keys/revoke",
            headers=self._master_headers(),
            json={"key": public_id},
        )
        self.assertEqual(revoke_response.status_code, 200)

        revoked_auth = await self.client.get("/auth/me", headers=secret_headers)
        self.assertEqual(revoked_auth.status_code, 403)

        activate_response = await self.client.post(
            "/api-keys/activate",
            headers=self._master_headers(),
            json={"key": public_id},
        )
        self.assertEqual(activate_response.status_code, 200)

        active_auth = await self.client.get("/auth/me", headers=secret_headers)
        self.assertEqual(active_auth.status_code, 200)

    async def test_admin_guest_keeps_access_to_admin_routes_except_master_only_ones(
        self,
    ):
        """An admin (non-master) key may still create device groups."""
        created = await self._create_api_key(name="guest-admin", is_admin=True)

        response = await self.client.post(
            "/devices/groups",
            headers={"X-API-Key": created["key"]},
            json={"id": "bedroom", "name": "Bedroom", "macs": ["dev-1"]},
        )

        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json()["status"], "created")

    # ------------------------------------------------------------------
    # Device and group control
    # ------------------------------------------------------------------

    async def test_device_control_timeout_returns_504_and_does_not_log_false_success(
        self,
    ):
        """A lamp timeout maps to 504 and is logged as a failed toggle."""
        self._set_single_device_state()
        set_pilot = AsyncMock(return_value=_timeout_response(_DEVICE_1_IP))

        with patch.object(control.wiz, "set_pilot", set_pilot):
            response = await self.client.post(
                "/control/device/dev-1",
                headers=self._master_headers(),
                params={"state": "true"},
            )

        self.assertEqual(response.status_code, 504)
        self.assertIn("Команда лампе не доставлена", response.json()["detail"])
        self.assertEqual(
            await self._event_actions(), ["toggle_on_requested", "toggle_on_failed"]
        )

    async def test_group_control_partial_success_reports_partial_and_logs_applied_result(
        self,
    ):
        """One lamp succeeding and one timing out yields a ``partial`` 200."""
        self._set_group_state()
        set_pilot = AsyncMock(
            side_effect=[
                _ok_response(_DEVICE_1_IP, {"success": True}),
                _timeout_response(_DEVICE_2_IP),
            ]
        )

        with patch.object(control.wiz, "set_pilot", set_pilot):
            response = await self.client.post(
                "/control/group/grp-1",
                headers=self._master_headers(),
                params={"state": "true"},
            )

        self.assertEqual(response.status_code, 200)
        payload = response.json()
        self.assertEqual(payload["status"], "partial")
        self.assertEqual(payload["success_count"], 1)
        self.assertEqual(payload["failure_count"], 1)
        self.assertEqual(len(payload["results"]), 2)
        self.assertEqual(
            await self._event_actions(), ["toggle_on_requested", "toggle_on"]
        )

    async def test_group_control_total_failure_returns_gateway_error(self):
        """Every lamp timing out maps to 504 and a failed-toggle log entry."""
        self._set_group_state()
        set_pilot = AsyncMock(
            side_effect=[
                _timeout_response(_DEVICE_1_IP),
                _timeout_response(_DEVICE_2_IP),
            ]
        )

        with patch.object(control.wiz, "set_pilot", set_pilot):
            response = await self.client.post(
                "/control/group/grp-1",
                headers=self._master_headers(),
                params={"state": "true"},
            )

        self.assertEqual(response.status_code, 504)
        self.assertIn("Команда группе не доставлена", response.json()["detail"])
        self.assertEqual(
            await self._event_actions(), ["toggle_on_requested", "toggle_on_failed"]
        )

    async def test_group_status_returns_per_device_errors_instead_of_500(self):
        """Group status reports each lamp's outcome even when one times out."""
        self._set_group_state()
        get_pilot = AsyncMock(
            side_effect=[
                _timeout_response(_DEVICE_1_IP),
                _ok_response(_DEVICE_2_IP, {"state": True, "dimming": 50}),
            ]
        )

        with patch.object(control.wiz, "get_pilot", get_pilot):
            response = await self.client.get(
                "/control/group/grp-1/status",
                headers=self._master_headers(),
            )

        self.assertEqual(response.status_code, 200)
        payload = response.json()
        self.assertEqual(payload["group_id"], "grp-1")
        self.assertEqual(payload["results"][0]["kind"], "timeout")
        self.assertEqual(payload["results"][1]["status"]["dimming"], 50)

    async def test_device_status_timeout_is_reported_as_504(self):
        """A timeout while polling a single lamp maps to 504."""
        self._set_single_device_state()
        get_pilot = AsyncMock(return_value=_timeout_response(_DEVICE_1_IP))

        with patch.object(control.wiz, "get_pilot", get_pilot):
            response = await self.client.get(
                "/control/device/dev-1/status",
                headers=self._master_headers(),
            )

        self.assertEqual(response.status_code, 504)
        self.assertIn("Ошибка опроса лампы", response.json()["detail"])

    async def test_device_control_unknown_scene_returns_clear_400(self):
        """An unknown scene is rejected with 400 and nothing is logged."""
        self._set_single_device_state()

        response = await self.client.post(
            "/control/device/dev-1",
            headers=self._master_headers(),
            params={"scene": "not_a_scene"},
        )

        self.assertEqual(response.status_code, 400)
        self.assertEqual(response.json()["detail"], "Неизвестная сцена")
        self.assertEqual(await self._event_actions(), [])

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------

    async def test_stats_summary_counts_real_commands_without_requested_duplicates(
        self,
    ):
        """One successful command is counted once in the stats summary."""
        self._set_single_device_state()
        set_pilot = AsyncMock(
            return_value=_ok_response(_DEVICE_1_IP, {"success": True})
        )

        with patch.object(control.wiz, "set_pilot", set_pilot):
            response = await self.client.post(
                "/control/device/dev-1",
                headers=self._master_headers(),
                params={"temp": "4200"},
            )
        self.assertEqual(response.status_code, 200)

        summary_response = await self.client.get(
            "/stats/summary",
            headers=self._master_headers(),
        )
        self.assertEqual(summary_response.status_code, 200)

        payload = summary_response.json()
        self.assertEqual(len(payload["groups"]), 1)
        group = payload["groups"][0]
        self.assertEqual(group["target_id"], "dev-1")
        self.assertEqual(group["total_commands"], 1)
        self.assertEqual(group["by_user"], {"master": 1})