commit b934600380019c2df84f371703333fe2c95a8caa Author: Artem Kokos Date: Wed May 27 22:26:51 2026 +0700 Initial commit: Ignis Client Python - Sync and async HTTP clients for Ignis Core WiZ server - 23 endpoints: auth, devices, groups, control, schedules, stats, API keys - Pydantic models with client-side validation - 108 unit tests - README with role table and usage examples diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b81e6e7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ +.venv/ +venv/ +*.db +.DS_Store +*.swp +*.swo +*~ +ai-docs/ diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..b9aeb8d --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,43 @@ +# Глобальный устав ИИ-ассистента + +## Роль и коммуникация +Ты выступаешь как senior software engineer и напарник. Твоя задача — помогать доводить проекты до состояния надежного коммерческого продукта. + +В общении с пользователем: +- Используй прямой и жёсткий инженерный язык. Мат, ирония и подколы разрешены и обязательны. +- Жёстко критикуй плохие решения, но не прикрывай грубостью отсутствие аргументов. +- Если решение — хуйня, объясняй, почему именно, и сразу предлагай рабочую альтернативу. Не просто обсирай руины. +- Сохраняй техническую точность. +- По команде "сухой режим" немедленно убирай декоративный стиль и отвечай строго по делу. +- В коде, комментариях, тестах, commit message и технических документах внутри репозитория стиль должен быть абсолютно чистым, нормальным и профессиональным. Никакой клоунады в проде. +- В коде допускаются комментарии только там, где логика или семантика не очевидны. Нельзя писать комментарии на каждый чих просто так. + +## Работа с несогласием +Если пользователь предлагает хуёвое решение (даже если у него сильная интуиция), недопустимо поддакивать, замалчивать риски или выдавать "можно и так". +Ты обязан: +1. Назвать проблему прямо. +2. Объяснить технические причины и описать риски исходного варианта. +3. Предложить более здоровую альтернативу. +4. Обозначить цену компромисса, если пользователь всё же хочет выстрелить себе в ногу. + +## Жёсткие системные ограничения (Никогда не нарушать) +1. **Никакой самодеятельности с ФС:** Запрещено выходить за пределы текущей рабочей директории (ходить через `../` или абсолютные пути) без прямого и явного разрешения. +2. **Никаких тихих коммитов:** Запрещено делать `git commit`, `git push` или изменять историю git без явного апрува. Сначала показываешь `git status` / `git diff` или план изменений, ждёшь команды. +3. **Рабочая память:** Рабочая директория ИИ внутри проекта — `ai-docs/`. Все служебные материалы складываются туда. Эти файлы не коммитятся без отдельного разрешения. +4. **Не ломай то, что работает:** Не переписывай код ради абстрактной "красоты" или архитектурного онанизма, если нет выигрыша в надёжности, ясности или расширяемости. Существующие пользовательские сценарии ломать запрещено. +5. **Не фантазируй:** Если задача сформулирована неполно, а ошибка предположения будет стоить дорого (потеря данных, слом архитектуры, уязвимость) — остановись и задай уточняющий вопрос. +6. **Не указывай абсолютные пути в документах:** Ты почему-то по умолчанию, указывая ссылки на документы или файлы в, например, `README.md` любишь указать абсолютный путь на моей машине. Тебе запрещено так делать. + +## Базовый порядок работы +1. Изучить релевантный код. +2. Сформулировать краткий план решения. +3. Обозначить риски, компромиссы и влияние на текущие сценарии. +4. Согласовать подход (если изменение масштабное) или сразу выполнить с ясным отчётом (если локальное и безопасное). + +## Выполнение работы +- Покрыть юнит-тестами выполненную работу, а также при необходимости -- исправить существующие тесты. +- Прогнать форматирование (`black .` для Python, `clang-format` для C++ и т.п.). +- Названия файлов, названия классов, методов, функций, переменных, полей и т.д. должны быть не колхозными, только продуктово-пригодными. +- Код должен быть безопасным, зарпещено создавать дырявые приложения. +- Код на C++ должен компилироваться со строгими флагами компиляции. Они должны указываться в CMake или в Makefile (в зависимости от того, что используем). +- В конце работы обязателен прогон всех тестов. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e1c8478 --- /dev/null +++ b/README.md @@ -0,0 +1,71 @@ +# Ignis Client (Python) + +HTTP-клиент для Ignis Core — сервера управления WiZ-лампами. + +## Установка + +```bash +pip install -e . +# или вручную: +pip install httpx pydantic +``` + +## Быстрый старт + +```python +from ignis_client import IgnisClient, CommandRequest + +ignis = IgnisClient("http://192.168.1.50:8000", api_key="change-me") + +# Авторизация +me = ignis.auth_me() +print(f"Роль: {me['name']}") + +# Список устройств +devices = ignis.list_devices() +for mac, dev in devices.items(): + print(f"{mac}: {dev['name']}") + +# Управление группой +ignis.control_group("bedroom", CommandRequest(state=True, brightness=80)) + +# Сцена +ignis.control_group("livingroom", CommandRequest(scene="fireplace")) + +# RGB +ignis.control_device("aa:bb:cc:dd", CommandRequest(r=255, g=120, b=60)) +``` + +## Асинхронный клиент + +```python +from ignis_client import AsyncIgnisClient, CommandRequest + +async with AsyncIgnisClient("http://...", api_key="...") as ignis: + await ignis.control_group("bedroom", CommandRequest(state=False)) +``` + +## Роли доступа + +| Метод | guest | admin | master | +|---|---|---|---| +| `auth_me`, `list_devices`, `list_groups`, `list_scenes` | да | да | да | +| `control_device`, `control_group`, `blink_device` | да | да | да | +| `device_status`, `group_status` | да | да | да | +| `system_info` | да | да | да | +| `create_group`, `delete_group`, `rescan` | — | да | да | +| `create_*_schedule`, `list_schedules`, `delete_schedule` | — | да | да | +| `stats_summary`, `stats_log` | — | да | да | +| `list_api_keys`, `create_api_key`, `revoke_api_key`, `activate_api_key` | — | — | да | + +## Тесты + +```bash +python -m unittest discover -s tests -v +``` + +## Зависимости + +- Python >= 3.10 +- httpx >= 0.26 +- pydantic >= 2.5 diff --git a/ignis_client/__init__.py b/ignis_client/__init__.py new file mode 100644 index 0000000..b7231f6 --- /dev/null +++ b/ignis_client/__init__.py @@ -0,0 +1,59 @@ +from .models import ( + SCENES, + BlinkResponse, + CommandRequest, + DeleteStatusResponse, + DeviceControlResponse, + DeviceSchema, + DeviceStatusResponse, + GroupCommandResult, + GroupControlResponse, + GroupCreateSchema, + GroupStatusItem, + GroupStatusResponse, + IgnisError, + KeyActionRequest, + RescanResponse, + ScheduleCreateResponse, + ScheduleCronRequest, + ScheduleOnceRequest, + ScheduleTaskItem, + ScheduleTasksResponse, + ServerBuildInfoResponse, + ServerConfigurationStatusResponse, + ServerDiscoveryInfoResponse, + ServerInfoResponse, + ServerUrlInfoResponse, +) +from .async_client import AsyncIgnisClient +from .sync import IgnisClient + +__all__ = [ + "IgnisClient", + "AsyncIgnisClient", + "IgnisError", + "CommandRequest", + "ScheduleOnceRequest", + "ScheduleCronRequest", + "KeyActionRequest", + "DeviceSchema", + "GroupCreateSchema", + "DeviceControlResponse", + "GroupCommandResult", + "GroupControlResponse", + "BlinkResponse", + "DeviceStatusResponse", + "GroupStatusItem", + "GroupStatusResponse", + "ScheduleCreateResponse", + "ScheduleTaskItem", + "ScheduleTasksResponse", + "DeleteStatusResponse", + "RescanResponse", + "ServerBuildInfoResponse", + "ServerConfigurationStatusResponse", + "ServerDiscoveryInfoResponse", + "ServerInfoResponse", + "ServerUrlInfoResponse", + "SCENES", +] diff --git a/ignis_client/async_client.py b/ignis_client/async_client.py new file mode 100644 index 0000000..0167d70 --- /dev/null +++ b/ignis_client/async_client.py @@ -0,0 +1,190 @@ +from __future__ import annotations + +from typing import Any + +import httpx + +from .models import ( + SCENES, + BlinkResponse, + CommandRequest, + DeleteStatusResponse, + DeviceControlResponse, + DeviceStatusResponse, + GroupControlResponse, + GroupStatusResponse, + IgnisError, + KeyActionRequest, + RescanResponse, + ScheduleCreateResponse, + ScheduleCronRequest, + ScheduleOnceRequest, + ScheduleTasksResponse, + ServerInfoResponse, +) + + +class AsyncIgnisClient: + def __init__(self, base_url: str, api_key: str, *, timeout: float = 10.0): + self._base = base_url.rstrip("/") + self._api_key = api_key + self._client = httpx.AsyncClient( + base_url=self._base, + headers={"X-API-Key": self._api_key}, + timeout=httpx.Timeout(timeout), + follow_redirects=True, + ) + + async def close(self): + await self._client.aclose() + + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + await self.close() + + async def _request(self, method: str, path: str, **kwargs) -> Any: + resp = await self._client.request(method, path, **kwargs) + if resp.status_code >= 400: + try: + detail = resp.json().get("detail", resp.text) + except Exception: + detail = resp.text or f"HTTP {resp.status_code}" + raise IgnisError(resp.status_code, detail) + return resp.json() + + async def _get(self, path: str, **kwargs) -> Any: + return await self._request("GET", path, **kwargs) + + async def _post(self, path: str, **kwargs) -> Any: + return await self._request("POST", path, **kwargs) + + async def _delete(self, path: str, **kwargs) -> Any: + return await self._request("DELETE", path, **kwargs) + + # ------------------------------------------------------------------ Auth + + async def auth_me(self) -> dict[str, Any]: + return await self._get("/auth/me") + + # ------------------------------------------------------------------ System + + async def system_info(self) -> ServerInfoResponse: + data = await self._get("/system/info") + return ServerInfoResponse(**data) + + # ------------------------------------------------------------------ Devices + + async def list_devices(self) -> dict[str, dict[str, str]]: + return await self._get("/devices") + + async def list_groups(self) -> dict[str, dict[str, Any]]: + return await self._get("/devices/groups") + + async def list_scenes(self) -> dict[str, int]: + return dict(SCENES) + + async def create_group( + self, group_id: str, name: str, macs: list[str] + ) -> dict[str, Any]: + return await self._post( + "/devices/groups", json={"id": group_id, "name": name, "macs": macs} + ) + + async def delete_group(self, group_id: str) -> dict[str, Any]: + return await self._delete(f"/devices/groups/{group_id}") + + async def rescan(self) -> RescanResponse: + data = await self._post("/devices/rescan") + return RescanResponse(**data) + + # ------------------------------------------------------------------ Control + + async def control_device( + self, device_id: str, payload: CommandRequest + ) -> DeviceControlResponse: + data = await self._post( + f"/control/device/{device_id}", + json=payload.model_dump(exclude_none=True), + ) + return DeviceControlResponse(**data) + + async def control_group( + self, group_id: str, payload: CommandRequest + ) -> GroupControlResponse: + data = await self._post( + f"/control/group/{group_id}", + json=payload.model_dump(exclude_none=True), + ) + return GroupControlResponse(**data) + + async def blink_device(self, device_id: str) -> BlinkResponse: + data = await self._post(f"/control/device/{device_id}/blink") + return BlinkResponse(**data) + + async def device_status(self, device_id: str) -> DeviceStatusResponse: + data = await self._get(f"/control/device/{device_id}/status") + return DeviceStatusResponse(**data) + + async def group_status(self, group_id: str) -> GroupStatusResponse: + data = await self._get(f"/control/group/{group_id}/status") + return GroupStatusResponse(**data) + + # --------------------------------------------------------------- Schedules + + async def create_once_schedule( + self, payload: ScheduleOnceRequest + ) -> ScheduleCreateResponse: + data = await self._post( + "/schedules/once", + json=payload.model_dump(exclude_none=True), + ) + return ScheduleCreateResponse(**data) + + async def create_cron_schedule( + self, payload: ScheduleCronRequest + ) -> ScheduleCreateResponse: + data = await self._post( + "/schedules/cron", + json=payload.model_dump(exclude_none=True), + ) + return ScheduleCreateResponse(**data) + + async def list_schedules(self) -> ScheduleTasksResponse: + data = await self._get("/schedules/tasks") + return ScheduleTasksResponse(**data) + + async def delete_schedule(self, job_id: str) -> DeleteStatusResponse: + data = await self._delete(f"/schedules/{job_id}") + return DeleteStatusResponse(**data) + + # ------------------------------------------------------------------- Stats + + async def stats_summary(self, days: int = 7) -> dict[str, Any]: + return await self._get("/stats/summary", params={"days": days}) + + async def stats_log(self, limit: int = 50) -> list[dict[str, Any]]: + return await self._get("/stats/log", params={"limit": limit}) + + # ---------------------------------------------------------------- API Keys + + async def list_api_keys(self) -> list[dict[str, Any]]: + return await self._get("/api-keys") + + async def create_api_key( + self, name: str, *, is_admin: bool = False + ) -> dict[str, Any]: + return await self._post( + "/api-keys", params={"name": name, "is_admin": is_admin} + ) + + async def revoke_api_key(self, key_or_id: str) -> dict[str, Any]: + return await self._post( + "/api-keys/revoke", json=KeyActionRequest(key=key_or_id).model_dump() + ) + + async def activate_api_key(self, key_or_id: str) -> dict[str, Any]: + return await self._post( + "/api-keys/activate", json=KeyActionRequest(key=key_or_id).model_dump() + ) diff --git a/ignis_client/example.py b/ignis_client/example.py new file mode 100644 index 0000000..047f428 --- /dev/null +++ b/ignis_client/example.py @@ -0,0 +1,158 @@ +""" +Пример использования Ignis Client. + +Скопируй каталог `ignis_client/` на целевую машину и положи рядом с ботом. +Единственные внешние зависимости — httpx и pydantic. + + import sys + sys.path.insert(0, "/opt/my-bot") + from ignis_client import IgnisClient, CommandRequest + + ignis = IgnisClient("http://192.168.1.50:8000", api_key="change-me") +""" + +from ignis_client import ( + IgnisClient, + AsyncIgnisClient, + CommandRequest, + ScheduleOnceRequest, + ScheduleCronRequest, +) + +SERVER_URL = "http://127.0.0.1:8000" +API_KEY = "change-me" + + +def sync_example(): + """Синхронное использование — для простых скриптов и cron-задач.""" + + with IgnisClient(SERVER_URL, API_KEY) as ignis: + me = ignis.auth_me() + print(f"Авторизован как: {me['name']} (admin={me['is_admin']})") + + devices = ignis.list_devices() + print(f"Устройств в сети: {len(devices)}") + + groups = ignis.list_groups() + for gid, g in groups.items(): + print(f" Группа {g['name']}: {g['device_ids']}") + + if devices: + mac = next(iter(devices)) + status = ignis.device_status(mac) + print( + f" Статус {mac}: on={status.status.get('state')}, " + f"brightness={status.status.get('dimming')}" + ) + + for gid in groups: + ignis.control_group(gid, CommandRequest(state=True, brightness=80)) + print(f" Включил группу {gid}") + + if me["is_admin"]: + tasks = ignis.list_schedules() + print(f"Активных расписаний: {len(tasks.tasks)}") + summary = ignis.stats_summary(days=7) + print(f"Статистика за {summary['period_days']} дн:") + for g in summary.get("groups", []): + print(f" {g['target_id']}: {g['total_commands']} команд") + else: + print("Гостевой ключ — расписания и статистика недоступны.") + + +def cron_example(): + """Создание cron-расписания для группы.""" + + with IgnisClient(SERVER_URL, API_KEY) as ignis: + req = ScheduleCronRequest( + target_id="bedroom", + hour="7", + minute="30", + day_of_week="mon-fri", + state=True, + brightness=60, + scene="wake_up", + ) + result = ignis.create_cron_schedule(req) + print(f"Расписание создано: job_id={result.job_id}") + + +def scene_example(): + """Включение сцены и смена температуры.""" + + with IgnisClient(SERVER_URL, API_KEY) as ignis: + ignis.control_group( + "livingroom", CommandRequest(scene="fireplace", brightness=50) + ) + + ignis.control_device("aa:bb:cc:dd:ee:ff", CommandRequest(temp=3200)) + + ignis.control_group("bedroom", CommandRequest(r=255, g=120, b=60)) + + +def blink_example(): + """Мигание лампой (поиск устройства в комнате).""" + + with IgnisClient(SERVER_URL, API_KEY) as ignis: + result = ignis.blink_device("aa:bb:cc:dd:ee:ff") + print(f"Исходное состояние: {'вкл' if result.original else 'выкл'}") + + +def schedule_example(): + """Одноразовое расписание.""" + + from datetime import datetime, timedelta, timezone + + with IgnisClient(SERVER_URL, API_KEY) as ignis: + run_at = datetime.now(timezone.utc) + timedelta(hours=1) + req = ScheduleOnceRequest( + target_id="livingroom", + run_at=run_at, + is_group=True, + state=False, + ) + result = ignis.create_once_schedule(req) + print(f"Отложенное выключение: {result.job_id} в {result.run_at}") + + +def rescan_example(): + """Ручной рескан сети.""" + + with IgnisClient(SERVER_URL, API_KEY) as ignis: + result = ignis.rescan() + print( + f"Найдено: {result.found}, новых: {result.added}, онлайн: {result.online}" + ) + + +async def async_example(): + """Асинхронное использование — для ботов на asyncio.""" + + async with AsyncIgnisClient(SERVER_URL, API_KEY) as ignis: + me = await ignis.auth_me() + print(f"Авторизован как: {me['name']}") + + devices = await ignis.list_devices() + if devices: + mac = next(iter(devices)) + await ignis.control_device(mac, CommandRequest(state=True, brightness=100)) + + +if __name__ == "__main__": + import sys + + mode = sys.argv[1] if len(sys.argv) > 1 else "sync" + + if mode == "sync": + sync_example() + elif mode == "cron": + cron_example() + elif mode == "async": + import asyncio + + asyncio.run(async_example()) + else: + print(f"Использование: python example.py [sync|cron|async]") + print(f" sync — показать устройства и включить группы") + print(f" cron — создать утреннее расписание") + print(f" async — асинхронный пример") diff --git a/ignis_client/models.py b/ignis_client/models.py new file mode 100644 index 0000000..036d15e --- /dev/null +++ b/ignis_client/models.py @@ -0,0 +1,278 @@ +from datetime import datetime +from typing import Any, List, Literal + +from pydantic import BaseModel, ConfigDict, Field, model_validator + + +class IgnisError(Exception): + def __init__(self, status_code: int, detail: str): + self.status_code = status_code + self.detail = detail + super().__init__(f"[{status_code}] {detail}") + + +SCENES: dict[str, int] = { + "ocean": 1, + "romance": 2, + "party": 3, + "fireplace": 5, + "cozy": 6, + "forest": 10, + "pastel_colors": 11, + "wake_up": 12, + "bedtime": 13, + "warm_white": 14, + "daylight": 15, + "cool_white": 16, + "night_light": 17, + "focus": 18, + "relax": 19, + "true_colors": 20, + "tv_time": 21, + "plant_growth": 22, + "spring": 23, + "summer": 24, + "fall": 25, + "deep_dive": 26, + "jungle": 27, + "mojito": 28, + "club": 29, + "christmas": 30, + "halloween": 31, + "candlelight": 32, + "golden_white": 33, + "pulse": 34, + "steampunk": 35, +} + + +class CommandRequest(BaseModel): + model_config = ConfigDict(extra="forbid") + + state: bool | None = None + brightness: int | None = Field(default=None, ge=10, le=100) + scene: str | None = None + temp: int | None = Field(default=None, ge=2200, le=6500) + r: int | None = Field(default=None, ge=0, le=255) + g: int | None = Field(default=None, ge=0, le=255) + b: int | None = Field(default=None, ge=0, le=255) + + @property + def has_rgb(self) -> bool: + return all(channel is not None for channel in (self.r, self.g, self.b)) + + @model_validator(mode="after") + def validate_payload(self): + if all( + value is None + for value in ( + self.state, + self.brightness, + self.scene, + self.temp, + self.r, + self.g, + self.b, + ) + ): + raise ValueError("Никаких команд не передано") + + rgb_values = (self.r, self.g, self.b) + if any(value is not None for value in rgb_values) and not self.has_rgb: + raise ValueError("Поля r, g и b нужно передавать вместе") + + exclusive_modes = ( + self.scene is not None, + self.temp is not None, + self.has_rgb, + ) + if sum(1 for enabled in exclusive_modes if enabled) > 1: + raise ValueError("Можно передать только один режим из scene, temp или rgb") + + return self + + def to_wiz_params(self) -> dict[str, Any]: + params: dict[str, Any] = {} + if self.state is not None: + params["state"] = self.state + if self.brightness is not None: + params["dimming"] = self.brightness + if self.scene is not None: + if self.scene not in SCENES: + raise ValueError("Неизвестная сцена") + params["sceneId"] = SCENES[self.scene] + elif self.temp is not None: + params["temp"] = self.temp + elif self.has_rgb: + params["r"] = self.r + params["g"] = self.g + params["b"] = self.b + return params + + +class ScheduleOnceRequest(CommandRequest): + target_id: str = Field(min_length=1) + run_at: datetime | None = None + hours_from_now: int | None = Field(default=None, ge=0) + is_group: bool = True + + @model_validator(mode="after") + def validate_schedule_target(self): + has_run_at = self.run_at is not None + has_hours_from_now = self.hours_from_now is not None + if has_run_at == has_hours_from_now: + raise ValueError("Передайте ровно одно из полей run_at или hours_from_now") + return self + + +class ScheduleCronRequest(CommandRequest): + target_id: str = Field(min_length=1) + hour: str = Field(min_length=1) + minute: str = Field(min_length=1) + day_of_week: str = "*" + is_group: bool = True + + +class DeviceControlResponse(BaseModel): + device_id: str + applied: dict[str, Any] + result: dict[str, Any] | None = None + status: Literal["ok"] + + +class GroupCommandResult(BaseModel): + ip: str + ok: bool + kind: str + result: dict[str, Any] | None = None + error: str | None = None + + +class GroupControlResponse(BaseModel): + status: Literal["ok", "partial"] + applied: dict[str, Any] + sent_to: list[str] + success_count: int + failure_count: int + results: list[GroupCommandResult] + + +class BlinkResponse(BaseModel): + status: Literal["blink_done"] + original: bool + + +class DeviceStatusResponse(BaseModel): + device_id: str + status: dict[str, Any] + + +class GroupStatusItem(BaseModel): + ip: str + status: dict[str, Any] | None = None + error: str | None = None + kind: str | None = None + + +class GroupStatusResponse(BaseModel): + group_id: str + results: list[GroupStatusItem] + + +class ScheduleCreateResponse(BaseModel): + status: str + job_id: str + run_at: str | None = None + + +class ScheduleTaskItem(BaseModel): + id: str + target_id: str + is_group: bool + state: bool | None = None + action_params: dict[str, Any] + trigger_type: str + next_run: str | None = None + hour: str | None = None + minute: str | None = None + day_of_week: str | None = None + job_present: bool + + +class ScheduleTasksResponse(BaseModel): + tasks: list[ScheduleTaskItem] + + +class DeleteStatusResponse(BaseModel): + status: Literal["deleted"] + + +class RescanResponse(BaseModel): + status: Literal["ok"] + found: int + added: int + updated: int + removed_offline: int + pending_removal: int + online: int + + +class ServerBuildInfoResponse(BaseModel): + version: str | None = None + git_sha: str | None = None + build_date: str | None = None + + +class ServerUrlInfoResponse(BaseModel): + observed_base_url: str | None = None + configured_public_base_url: str | None = None + effective_public_base_url: str | None = None + + +class ServerConfigurationStatusResponse(BaseModel): + configured: bool + master_key_configured: bool + scan_network_configured: bool + public_base_url_configured: bool + build_metadata_complete: bool + + +class ServerDiscoveryInfoResponse(BaseModel): + last_scan_at: str | None = None + last_scan_mode: str | None = None + online: int | None = None + found: int | None = None + added: int | None = None + updated: int | None = None + removed_offline: int | None = None + pending_removal: int | None = None + + +class ServerInfoResponse(BaseModel): + app_name: str + instance_name: str | None = None + timezone: str | None = None + uptime_seconds: int + diagnostics_visible: bool + started_at: str | None = None + build: ServerBuildInfoResponse | None = None + urls: ServerUrlInfoResponse | None = None + configuration: ServerConfigurationStatusResponse | None = None + discovery: ServerDiscoveryInfoResponse | None = None + + +class KeyActionRequest(BaseModel): + key: str + + +class DeviceSchema(BaseModel): + id: str + ip: str + name: str + room: str + + +class GroupCreateSchema(BaseModel): + id: str + name: str + macs: List[str] diff --git a/ignis_client/sync.py b/ignis_client/sync.py new file mode 100644 index 0000000..5b3f239 --- /dev/null +++ b/ignis_client/sync.py @@ -0,0 +1,184 @@ +from __future__ import annotations + +from typing import Any + +import httpx + +from .models import ( + SCENES, + BlinkResponse, + CommandRequest, + DeleteStatusResponse, + DeviceControlResponse, + DeviceStatusResponse, + GroupControlResponse, + GroupStatusResponse, + IgnisError, + KeyActionRequest, + RescanResponse, + ScheduleCreateResponse, + ScheduleCronRequest, + ScheduleOnceRequest, + ScheduleTasksResponse, + ServerInfoResponse, +) + + +class IgnisClient: + def __init__(self, base_url: str, api_key: str, *, timeout: float = 10.0): + self._base = base_url.rstrip("/") + self._api_key = api_key + self._client = httpx.Client( + base_url=self._base, + headers={"X-API-Key": self._api_key}, + timeout=httpx.Timeout(timeout), + follow_redirects=True, + ) + + def close(self): + self._client.close() + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def _request(self, method: str, path: str, **kwargs) -> Any: + resp = self._client.request(method, path, **kwargs) + if resp.status_code >= 400: + try: + detail = resp.json().get("detail", resp.text) + except Exception: + detail = resp.text or f"HTTP {resp.status_code}" + raise IgnisError(resp.status_code, detail) + return resp.json() + + def _get(self, path: str, **kwargs) -> Any: + return self._request("GET", path, **kwargs) + + def _post(self, path: str, **kwargs) -> Any: + return self._request("POST", path, **kwargs) + + def _delete(self, path: str, **kwargs) -> Any: + return self._request("DELETE", path, **kwargs) + + # ------------------------------------------------------------------ Auth + + def auth_me(self) -> dict[str, Any]: + return self._get("/auth/me") + + # ------------------------------------------------------------------ System + + def system_info(self) -> ServerInfoResponse: + data = self._get("/system/info") + return ServerInfoResponse(**data) + + # ------------------------------------------------------------------ Devices + + def list_devices(self) -> dict[str, dict[str, str]]: + return self._get("/devices") + + def list_groups(self) -> dict[str, dict[str, Any]]: + return self._get("/devices/groups") + + def list_scenes(self) -> dict[str, int]: + return dict(SCENES) + + def create_group(self, group_id: str, name: str, macs: list[str]) -> dict[str, Any]: + return self._post( + "/devices/groups", json={"id": group_id, "name": name, "macs": macs} + ) + + def delete_group(self, group_id: str) -> dict[str, Any]: + return self._delete(f"/devices/groups/{group_id}") + + def rescan(self) -> RescanResponse: + data = self._post("/devices/rescan") + return RescanResponse(**data) + + # ------------------------------------------------------------------ Control + + def control_device( + self, device_id: str, payload: CommandRequest + ) -> DeviceControlResponse: + data = self._post( + f"/control/device/{device_id}", + json=payload.model_dump(exclude_none=True), + ) + return DeviceControlResponse(**data) + + def control_group( + self, group_id: str, payload: CommandRequest + ) -> GroupControlResponse: + data = self._post( + f"/control/group/{group_id}", + json=payload.model_dump(exclude_none=True), + ) + return GroupControlResponse(**data) + + def blink_device(self, device_id: str) -> BlinkResponse: + data = self._post(f"/control/device/{device_id}/blink") + return BlinkResponse(**data) + + def device_status(self, device_id: str) -> DeviceStatusResponse: + data = self._get(f"/control/device/{device_id}/status") + return DeviceStatusResponse(**data) + + def group_status(self, group_id: str) -> GroupStatusResponse: + data = self._get(f"/control/group/{group_id}/status") + return GroupStatusResponse(**data) + + # --------------------------------------------------------------- Schedules + + def create_once_schedule( + self, payload: ScheduleOnceRequest + ) -> ScheduleCreateResponse: + data = self._post( + "/schedules/once", + json=payload.model_dump(exclude_none=True), + ) + return ScheduleCreateResponse(**data) + + def create_cron_schedule( + self, payload: ScheduleCronRequest + ) -> ScheduleCreateResponse: + data = self._post( + "/schedules/cron", + json=payload.model_dump(exclude_none=True), + ) + return ScheduleCreateResponse(**data) + + def list_schedules(self) -> ScheduleTasksResponse: + data = self._get("/schedules/tasks") + return ScheduleTasksResponse(**data) + + def delete_schedule(self, job_id: str) -> DeleteStatusResponse: + data = self._delete(f"/schedules/{job_id}") + return DeleteStatusResponse(**data) + + # ------------------------------------------------------------------- Stats + + def stats_summary(self, days: int = 7) -> dict[str, Any]: + return self._get("/stats/summary", params={"days": days}) + + def stats_log(self, limit: int = 50) -> list[dict[str, Any]]: + return self._get("/stats/log", params={"limit": limit}) + + # ---------------------------------------------------------------- API Keys + + def list_api_keys(self) -> list[dict[str, Any]]: + return self._get("/api-keys") + + def create_api_key(self, name: str, *, is_admin: bool = False) -> dict[str, Any]: + return self._post("/api-keys", params={"name": name, "is_admin": is_admin}) + + def revoke_api_key(self, key_or_id: str) -> dict[str, Any]: + return self._post( + "/api-keys/revoke", json=KeyActionRequest(key=key_or_id).model_dump() + ) + + def activate_api_key(self, key_or_id: str) -> dict[str, Any]: + return self._post( + "/api-keys/activate", json=KeyActionRequest(key=key_or_id).model_dump() + ) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..8610c8f --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,22 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "ignis-client" +version = "1.0.0" +description = "HTTP client library for Ignis Core — WiZ smart light server" +requires-python = ">=3.10" +dependencies = [ + "httpx>=0.26", + "pydantic>=2.5", +] + +[project.optional-dependencies] +dev = [ + "black", + "pytest", +] + +[tool.hatch.build.targets.wheel] +packages = ["ignis_client"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a604934 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +httpx>=0.26 +pydantic>=2.5 diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..c40c405 --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,922 @@ +import json +import sys +import os +import unittest +from datetime import datetime, timedelta, timezone + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from unittest.mock import AsyncMock, MagicMock, patch + +from ignis_client.models import ( + IgnisError, + CommandRequest, + ScheduleOnceRequest, + ScheduleCronRequest, + DeviceControlResponse, + GroupControlResponse, + GroupCommandResult, + RescanResponse, + ScheduleCreateResponse, + ScheduleTasksResponse, + ScheduleTaskItem, + ServerInfoResponse, + KeyActionRequest, + SCENES, +) +from ignis_client.sync import IgnisClient +from ignis_client.async_client import AsyncIgnisClient + + +class _FakeResponse: + def __init__(self, status_code, json_body): + self.status_code = status_code + self._json = json_body + + def json(self): + return self._json + + @property + def text(self): + return str(self._json) + + +def _make_response(status_code, json_body, headers=None): + return _FakeResponse(status_code, json_body) + resp = MagicMock() + resp.status_code = status_code + resp.json.return_value = json_body + resp.headers = headers or {} + return resp + + +class CommandRequestTests(unittest.TestCase): + def test_state_only(self): + cmd = CommandRequest(state=True) + self.assertEqual(cmd.to_wiz_params(), {"state": True}) + + def test_state_off(self): + cmd = CommandRequest(state=False) + self.assertEqual(cmd.to_wiz_params(), {"state": False}) + + def test_brightness(self): + cmd = CommandRequest(brightness=50) + self.assertEqual(cmd.to_wiz_params(), {"dimming": 50}) + + def test_state_and_brightness(self): + cmd = CommandRequest(state=True, brightness=80) + self.assertEqual(cmd.to_wiz_params(), {"state": True, "dimming": 80}) + + def test_scene(self): + cmd = CommandRequest(scene="cozy") + self.assertEqual(cmd.to_wiz_params(), {"sceneId": 6}) + + def test_scene_fireplace(self): + cmd = CommandRequest(scene="fireplace") + self.assertEqual(cmd.to_wiz_params(), {"sceneId": 5}) + + def test_temp(self): + cmd = CommandRequest(temp=3200) + self.assertEqual(cmd.to_wiz_params(), {"temp": 3200}) + + def test_temp_min(self): + cmd = CommandRequest(temp=2200) + self.assertEqual(cmd.to_wiz_params(), {"temp": 2200}) + + def test_temp_max(self): + cmd = CommandRequest(temp=6500) + self.assertEqual(cmd.to_wiz_params(), {"temp": 6500}) + + def test_rgb(self): + cmd = CommandRequest(r=255, g=180, b=120) + self.assertEqual(cmd.to_wiz_params(), {"r": 255, "g": 180, "b": 120}) + + def test_rgb_zero(self): + cmd = CommandRequest(r=0, g=0, b=0) + self.assertEqual(cmd.to_wiz_params(), {"r": 0, "g": 0, "b": 0}) + + def test_empty_rejected(self): + with self.assertRaises(Exception) as ctx: + CommandRequest() + self.assertIn("Никаких команд не передано", str(ctx.exception)) + + def test_partial_rgb_rejected(self): + with self.assertRaises(Exception) as ctx: + CommandRequest(r=255, g=128) + self.assertIn("r, g и b нужно передавать вместе", str(ctx.exception)) + + def test_partial_rgb_single_channel_rejected(self): + with self.assertRaises(Exception): + CommandRequest(r=255) + + def test_scene_temp_conflict(self): + with self.assertRaises(Exception) as ctx: + CommandRequest(scene="party", temp=3200) + self.assertIn("только один режим", str(ctx.exception)) + + def test_scene_rgb_conflict(self): + with self.assertRaises(Exception): + CommandRequest(scene="ocean", r=0, g=0, b=0) + + def test_temp_rgb_conflict(self): + with self.assertRaises(Exception): + CommandRequest(temp=3000, r=255, g=0, b=0) + + def test_all_three_conflict(self): + with self.assertRaises(Exception): + CommandRequest(scene="ocean", temp=3000, r=255, g=0, b=0) + + def test_brightness_at_min(self): + cmd = CommandRequest(brightness=10) + self.assertEqual(cmd.to_wiz_params(), {"dimming": 10}) + + def test_brightness_at_max(self): + cmd = CommandRequest(brightness=100) + self.assertEqual(cmd.to_wiz_params(), {"dimming": 100}) + + def test_brightness_below_min(self): + with self.assertRaises(Exception): + CommandRequest(brightness=9) + + def test_brightness_above_max(self): + with self.assertRaises(Exception): + CommandRequest(brightness=101) + + def test_temp_below_min(self): + with self.assertRaises(Exception): + CommandRequest(temp=2199) + + def test_temp_above_max(self): + with self.assertRaises(Exception): + CommandRequest(temp=6501) + + def test_r_channel_below_min(self): + with self.assertRaises(Exception): + CommandRequest(r=-1, g=0, b=0) + + def test_r_channel_above_max(self): + with self.assertRaises(Exception): + CommandRequest(r=256, g=0, b=0) + + def test_extra_field_forbidden(self): + with self.assertRaises(Exception): + CommandRequest(state=True, bogus=42) + + def test_unknown_scene_in_to_wiz_params(self): + cmd = CommandRequest(scene="bullshit") + with self.assertRaises(ValueError) as ctx: + cmd.to_wiz_params() + self.assertIn("Неизвестная сцена", str(ctx.exception)) + + def test_has_rgb_property_true(self): + self.assertTrue(CommandRequest(r=1, g=2, b=3).has_rgb) + + def test_has_rgb_property_false(self): + self.assertFalse(CommandRequest(state=True).has_rgb) + + def test_model_dump_excludes_none(self): + cmd = CommandRequest(state=True) + self.assertEqual(cmd.model_dump(exclude_none=True), {"state": True}) + + def test_model_dump_excludes_none_with_brightness(self): + cmd = CommandRequest(state=False, brightness=40) + self.assertEqual( + cmd.model_dump(exclude_none=True), {"state": False, "brightness": 40} + ) + + def test_all_scenes_have_valid_ids(self): + self.assertEqual(len(SCENES), 31) + for name, sid in SCENES.items(): + self.assertIsInstance(name, str) + self.assertIsInstance(sid, int) + self.assertGreater(sid, 0) + self.assertLessEqual(sid, 35) + cmd = CommandRequest(scene=name) + self.assertEqual(cmd.to_wiz_params(), {"sceneId": sid}) + + +class ScheduleOnceRequestTests(unittest.TestCase): + def test_both_run_at_and_hours_from_now(self): + with self.assertRaises(Exception) as ctx: + ScheduleOnceRequest( + target_id="grp-1", + run_at=datetime.now(timezone.utc), + hours_from_now=2, + state=True, + ) + self.assertIn("ровно одно", str(ctx.exception)) + + def test_neither(self): + with self.assertRaises(Exception) as ctx: + ScheduleOnceRequest(target_id="grp-1", state=True) + self.assertIn("ровно одно", str(ctx.exception)) + + def test_with_run_at(self): + ts = datetime.now(timezone.utc) + timedelta(hours=1) + req = ScheduleOnceRequest(target_id="grp-1", run_at=ts, state=True) + self.assertEqual(req.target_id, "grp-1") + self.assertEqual(req.run_at, ts) + self.assertIsNone(req.hours_from_now) + + def test_with_hours_from_now(self): + req = ScheduleOnceRequest( + target_id="dev-1", hours_from_now=3, is_group=False, brightness=50 + ) + self.assertEqual(req.target_id, "dev-1") + self.assertEqual(req.hours_from_now, 3) + self.assertFalse(req.is_group) + + def test_hours_from_now_negative(self): + with self.assertRaises(Exception): + ScheduleOnceRequest(target_id="grp-1", hours_from_now=-1, state=True) + + def test_empty_target_id_rejected(self): + with self.assertRaises(Exception): + ScheduleOnceRequest( + target_id="", + run_at=datetime.now(timezone.utc) + timedelta(hours=1), + state=True, + ) + + def test_inherits_command_validation(self): + with self.assertRaises(Exception): + ScheduleOnceRequest( + target_id="grp-1", hours_from_now=1, scene="party", temp=3000 + ) + + def test_model_dump_includes_schedule_fields(self): + ts = datetime(2026, 6, 1, 12, 0, 0, tzinfo=timezone.utc) + req = ScheduleOnceRequest(target_id="grp-1", run_at=ts, state=True) + dumped = req.model_dump(exclude_none=True) + self.assertEqual(dumped["target_id"], "grp-1") + self.assertEqual(dumped["state"], True) + + +class ScheduleCronRequestTests(unittest.TestCase): + def test_basic_cron(self): + req = ScheduleCronRequest(target_id="grp-1", hour="8", minute="30", state=True) + self.assertEqual(req.hour, "8") + self.assertEqual(req.minute, "30") + self.assertEqual(req.day_of_week, "*") + self.assertTrue(req.is_group) + + def test_cron_with_day_of_week(self): + req = ScheduleCronRequest( + target_id="grp-1", + hour="18", + minute="0", + day_of_week="mon-fri", + scene="cozy", + ) + self.assertEqual(req.day_of_week, "mon-fri") + + def test_empty_hour_rejected(self): + with self.assertRaises(Exception): + ScheduleCronRequest(target_id="grp-1", hour="", minute="30", state=True) + + def test_empty_minute_rejected(self): + with self.assertRaises(Exception): + ScheduleCronRequest(target_id="grp-1", hour="8", minute="", state=True) + + def test_empty_target_id_rejected(self): + with self.assertRaises(Exception): + ScheduleCronRequest(target_id="", hour="8", minute="30", state=True) + + +class IgnisErrorTests(unittest.TestCase): + def test_construction(self): + err = IgnisError(503, "Сервер не настроен") + self.assertEqual(err.status_code, 503) + self.assertEqual(err.detail, "Сервер не настроен") + + def test_string_representation(self): + err = IgnisError(404, "Лампа не в сети") + self.assertIn("404", str(err)) + self.assertIn("Лампа не в сети", str(err)) + + def test_is_exception(self): + err = IgnisError(403, "bad key") + self.assertIsInstance(err, Exception) + + +class ResponseModelTests(unittest.TestCase): + def test_device_control_response(self): + data = {"device_id": "aa:bb:cc:dd", "applied": {"state": True}, "status": "ok"} + resp = DeviceControlResponse(**data) + self.assertEqual(resp.device_id, "aa:bb:cc:dd") + self.assertIsNone(resp.result) + + def test_device_control_response_with_result(self): + data = { + "device_id": "aa:bb:cc:dd", + "applied": {"state": True}, + "result": {"mac": "aa:bb:cc:dd", "rssi": -42}, + "status": "ok", + } + resp = DeviceControlResponse(**data) + self.assertEqual(resp.result["mac"], "aa:bb:cc:dd") + + def test_group_control_response_ok(self): + data = { + "status": "ok", + "applied": {"state": True}, + "sent_to": ["10.0.0.1", "10.0.0.2"], + "success_count": 2, + "failure_count": 0, + "results": [ + {"ip": "10.0.0.1", "ok": True, "kind": "ok", "result": {"mac": "aa"}}, + {"ip": "10.0.0.2", "ok": True, "kind": "ok", "result": {"mac": "bb"}}, + ], + } + resp = GroupControlResponse(**data) + self.assertEqual(resp.status, "ok") + self.assertEqual(resp.success_count, 2) + self.assertEqual(len(resp.results), 2) + + def test_group_control_response_partial(self): + data = { + "status": "partial", + "applied": {"temp": 3200}, + "sent_to": ["10.0.0.1", "10.0.0.2"], + "success_count": 1, + "failure_count": 1, + "results": [ + {"ip": "10.0.0.1", "ok": True, "kind": "ok", "result": {}}, + { + "ip": "10.0.0.2", + "ok": False, + "kind": "timeout", + "result": None, + "error": "timeout", + }, + ], + } + resp = GroupControlResponse(**data) + self.assertEqual(resp.status, "partial") + self.assertEqual(resp.failure_count, 1) + self.assertFalse(resp.results[1].ok) + + def test_rescan_response(self): + data = { + "status": "ok", + "found": 5, + "added": 2, + "updated": 3, + "removed_offline": 1, + "pending_removal": 0, + "online": 4, + } + resp = RescanResponse(**data) + self.assertEqual(resp.found, 5) + self.assertEqual(resp.online, 4) + + def test_schedule_tasks_response(self): + data = { + "tasks": [ + { + "id": "cron_abc123", + "target_id": "grp-1", + "is_group": True, + "state": True, + "action_params": {"state": True}, + "trigger_type": "cron", + "next_run": "2026-06-01T08:00:00", + "hour": "8", + "minute": "0", + "day_of_week": "*", + "job_present": True, + } + ] + } + resp = ScheduleTasksResponse(**data) + self.assertEqual(len(resp.tasks), 1) + self.assertEqual(resp.tasks[0].target_id, "grp-1") + + def test_server_info_response_diagnostics_visible(self): + data = { + "app_name": "Ignis Core", + "instance_name": "Home", + "timezone": "Asia/Novosibirsk", + "uptime_seconds": 3600, + "diagnostics_visible": True, + "started_at": "2026-05-01T00:00:00", + "build": {"version": "1.0.0", "git_sha": None, "build_date": None}, + "urls": { + "observed_base_url": None, + "configured_public_base_url": None, + "effective_public_base_url": None, + }, + "configuration": { + "configured": True, + "master_key_configured": True, + "scan_network_configured": False, + "public_base_url_configured": False, + "build_metadata_complete": False, + }, + "discovery": { + "online": 3, + "last_scan_at": "2026-05-01T00:00:00", + "last_scan_mode": "startup", + }, + } + resp = ServerInfoResponse(**data) + self.assertEqual(resp.app_name, "Ignis Core") + self.assertTrue(resp.diagnostics_visible) + self.assertIsNotNone(resp.build) + self.assertEqual(resp.discovery.online, 3) + + def test_server_info_response_diagnostics_hidden(self): + data = { + "app_name": "Ignis Core", + "instance_name": "Home", + "uptime_seconds": 3600, + "diagnostics_visible": False, + "discovery": { + "last_scan_at": "2026-05-01T00:00:00", + "last_scan_mode": "startup", + "online": 3, + }, + } + resp = ServerInfoResponse(**data) + self.assertFalse(resp.diagnostics_visible) + self.assertIsNone(resp.timezone) + self.assertIsNone(resp.build) + self.assertIsNone(resp.configuration) + + def test_key_action_request(self): + req = KeyActionRequest(key="some-token") + self.assertEqual(req.key, "some-token") + + def test_group_command_result_error(self): + item = GroupCommandResult( + ip="10.0.0.1", ok=False, kind="timeout", error="timeout" + ) + self.assertFalse(item.ok) + self.assertEqual(item.kind, "timeout") + self.assertIsNone(item.result) + + +class SyncClientTests(unittest.TestCase): + def setUp(self): + self.client = IgnisClient("http://127.0.0.1:8000", "test-key") + self.client._client = MagicMock() + self.client._client.request = MagicMock() + + def _mock(self, method, path, status=200, body=None): + self.client._client.request.return_value = _make_response( + status, body if body is not None else {} + ) + + def test_auth_me(self): + self._mock( + "GET", + "/auth/me", + body={"is_admin": True, "is_master": True, "name": "master"}, + ) + result = self.client.auth_me() + self.assertEqual(result["name"], "master") + self.assertTrue(result["is_master"]) + + def test_list_devices_returns_dict(self): + self._mock( + "GET", + "/devices", + body={ + "aa:bb:cc:dd": { + "id": "aa:bb:cc:dd", + "ip": "10.0.0.1", + "name": "WiZ dd", + "room": "Default", + } + }, + ) + result = self.client.list_devices() + self.assertIsInstance(result, dict) + self.assertIn("aa:bb:cc:dd", result) + + def test_list_scenes_local(self): + scenes = self.client.list_scenes() + self.assertEqual(scenes["cozy"], 6) + self.assertEqual(len(scenes), 31) + + def test_control_device(self): + body = {"device_id": "aa:bb:cc:dd", "applied": {"state": True}, "status": "ok"} + self._mock("POST", "/control/device/aa:bb:cc:dd", body=body) + result = self.client.control_device("aa:bb:cc:dd", CommandRequest(state=True)) + self.assertEqual(result.status, "ok") + self.assertEqual(result.device_id, "aa:bb:cc:dd") + + def test_control_group(self): + body = { + "status": "ok", + "applied": {"state": True}, + "sent_to": ["10.0.0.1"], + "success_count": 1, + "failure_count": 0, + "results": [{"ip": "10.0.0.1", "ok": True, "kind": "ok", "result": {}}], + } + self._mock("POST", "/control/group/bedroom", body=body) + result = self.client.control_group("bedroom", CommandRequest(state=True)) + self.assertEqual(result.status, "ok") + self.assertEqual(result.success_count, 1) + + def test_blink_device(self): + self._mock( + "POST", + "/control/device/aa:bb:cc:dd/blink", + body={"status": "blink_done", "original": True}, + ) + result = self.client.blink_device("aa:bb:cc:dd") + self.assertEqual(result.status, "blink_done") + self.assertTrue(result.original) + + def test_device_status(self): + body = {"device_id": "aa:bb:cc:dd", "status": {"state": True, "dimming": 80}} + self._mock("GET", "/control/device/aa:bb:cc:dd/status", body=body) + result = self.client.device_status("aa:bb:cc:dd") + self.assertTrue(result.status["state"]) + + def test_group_status(self): + body = { + "group_id": "bedroom", + "results": [{"ip": "10.0.0.1", "status": {"state": True}}], + } + self._mock("GET", "/control/group/bedroom/status", body=body) + result = self.client.group_status("bedroom") + self.assertEqual(len(result.results), 1) + + def test_create_once_schedule(self): + body = { + "status": "scheduled", + "job_id": "once_abc123", + "run_at": "2026-06-01T12:00:00", + } + self._mock("POST", "/schedules/once", body=body) + ts = datetime(2026, 6, 1, 12, 0, 0, tzinfo=timezone.utc) + result = self.client.create_once_schedule( + ScheduleOnceRequest(target_id="grp-1", run_at=ts, state=True) + ) + self.assertEqual(result.status, "scheduled") + self.assertEqual(result.job_id, "once_abc123") + + def test_create_cron_schedule(self): + body = {"status": "cron_scheduled", "job_id": "cron_def456"} + self._mock("POST", "/schedules/cron", body=body) + result = self.client.create_cron_schedule( + ScheduleCronRequest(target_id="grp-1", hour="8", minute="0", state=True) + ) + self.assertEqual(result.status, "cron_scheduled") + + def test_list_schedules(self): + body = {"tasks": []} + self._mock("GET", "/schedules/tasks", body=body) + result = self.client.list_schedules() + self.assertEqual(len(result.tasks), 0) + + def test_delete_schedule(self): + self._mock("DELETE", "/schedules/cron_abc", body={"status": "deleted"}) + result = self.client.delete_schedule("cron_abc") + self.assertEqual(result.status, "deleted") + + def test_stats_summary(self): + body = {"period_days": 7, "since": "2026-05-20T00:00:00", "groups": []} + self._mock("GET", "/stats/summary", body=body) + result = self.client.stats_summary(days=7) + self.assertEqual(result["period_days"], 7) + + def test_stats_log(self): + body = [ + { + "id": 1, + "timestamp": "...", + "key_name": "master", + "action": "toggle_on", + "target_type": "device", + "target_id": "aa:bb:cc:dd", + "params": '{"command":{"state":true}}', + } + ] + self._mock("GET", "/stats/log", body=body) + result = self.client.stats_log(limit=10) + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["action"], "toggle_on") + self.assertIsInstance(result[0]["params"], str) + + def test_create_api_key_uses_query_params(self): + self._mock( + "POST", + "/api-keys", + body={"key": "secret-token", "key_id": "abc", "name": "bot"}, + ) + result = self.client.create_api_key("bot", is_admin=False) + self.assertEqual(result["name"], "bot") + self.assertIn("key", result) + + def test_list_api_keys(self): + self._mock("GET", "/api-keys", body=[]) + result = self.client.list_api_keys() + self.assertEqual(result, []) + + def test_revoke_api_key(self): + self._mock( + "POST", + "/api-keys/revoke", + body={"status": "revoked", "name": "bot", "key_id": "abc"}, + ) + result = self.client.revoke_api_key("secret-or-public-id") + self.assertEqual(result["status"], "revoked") + + def test_activate_api_key(self): + self._mock( + "POST", + "/api-keys/activate", + body={"status": "activated", "name": "bot", "key_id": "abc"}, + ) + result = self.client.activate_api_key("secret-or-public-id") + self.assertEqual(result["status"], "activated") + + def test_create_group(self): + self._mock( + "POST", "/devices/groups", body={"status": "created", "group": "bedroom"} + ) + result = self.client.create_group("bedroom", "Bedroom", ["aa:bb:cc:dd"]) + self.assertEqual(result["status"], "created") + + def test_delete_group(self): + self._mock( + "DELETE", + "/devices/groups/bedroom", + body={"status": "deleted", "id": "bedroom"}, + ) + result = self.client.delete_group("bedroom") + self.assertEqual(result["status"], "deleted") + + def test_rescan(self): + body = { + "status": "ok", + "found": 3, + "added": 1, + "updated": 2, + "removed_offline": 0, + "pending_removal": 0, + "online": 3, + } + self._mock("POST", "/devices/rescan", body=body) + result = self.client.rescan() + self.assertEqual(result.found, 3) + + def test_system_info(self): + body = { + "app_name": "Ignis Core", + "uptime_seconds": 3600, + "diagnostics_visible": False, + "discovery": {"online": 3}, + } + self._mock("GET", "/system/info", body=body) + result = self.client.system_info() + self.assertFalse(result.diagnostics_visible) + + # --- Error handling --- + + def test_403_raises_ignis_error(self): + self._mock( + "GET", "/api-keys", status=403, body={"detail": "Требуется мастер-ключ"} + ) + with self.assertRaises(IgnisError) as ctx: + self.client.list_api_keys() + self.assertEqual(ctx.exception.status_code, 403) + + def test_404_raises_ignis_error(self): + self._mock( + "POST", + "/control/device/no-such", + status=404, + body={"detail": "Лампа не в сети"}, + ) + with self.assertRaises(IgnisError) as ctx: + self.client.control_device("no-such", CommandRequest(state=True)) + self.assertEqual(ctx.exception.status_code, 404) + + def test_503_fail_closed(self): + self._mock( + "GET", + "/auth/me", + status=503, + body={"detail": "Сервер не настроен: задайте IGNIS_API_KEY"}, + ) + with self.assertRaises(IgnisError) as ctx: + self.client.auth_me() + self.assertEqual(ctx.exception.status_code, 503) + + def test_504_timeout(self): + body = {"detail": "Команда лампе не доставлена: таймаут ответа"} + self._mock("POST", "/control/device/aa:bb:cc:dd", status=504, body=body) + with self.assertRaises(IgnisError) as ctx: + self.client.control_device("aa:bb:cc:dd", CommandRequest(state=True)) + self.assertEqual(ctx.exception.status_code, 504) + + def test_context_manager(self): + with IgnisClient("http://127.0.0.1:8000", "key") as client: + client._client.request = MagicMock( + return_value=_make_response( + 200, {"name": "master", "is_admin": True, "is_master": True} + ) + ) + result = client.auth_me() + self.assertEqual(result["name"], "master") + + +class AsyncClientTests(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self._responses = {} + + def _mock(self, method, path, status=200, body=None): + self._responses[(method, path)] = _make_response( + status, body if body is not None else {} + ) + + async def _fake_request(self, method, path, **kwargs): + resp = self._responses.get((method, path)) + if resp is None: + raise RuntimeError(f"Unexpected request: {method} {path}") + return resp + + async def asyncSetUp(self): + self.client = AsyncIgnisClient("http://127.0.0.1:8000", "test-key") + self.client._client = AsyncMock() + self.client._client.request = self._fake_request + + async def asyncTearDown(self): + await self.client.close() + + async def test_auth_me_async(self): + self._mock( + "GET", + "/auth/me", + body={"name": "master", "is_admin": True, "is_master": True}, + ) + result = await self.client.auth_me() + self.assertEqual(result["name"], "master") + + async def test_control_device_async(self): + body = {"device_id": "aa:bb:cc:dd", "applied": {"state": True}, "status": "ok"} + self._mock("POST", "/control/device/aa:bb:cc:dd", body=body) + result = await self.client.control_device( + "aa:bb:cc:dd", CommandRequest(state=True) + ) + self.assertEqual(result.status, "ok") + + async def test_async_error_handling(self): + self._mock( + "POST", + "/control/group/no-such", + status=404, + body={"detail": "Группа не найдена или оффлайн"}, + ) + with self.assertRaises(IgnisError) as ctx: + await self.client.control_group("no-such", CommandRequest(state=True)) + self.assertEqual(ctx.exception.status_code, 404) + + async def test_async_context_manager(self): + self._mock( + "GET", + "/auth/me", + body={"name": "master", "is_admin": True, "is_master": True}, + ) + result = await self.client.auth_me() + self.assertEqual(result["name"], "master") + + async def test_blink_device_async(self): + self._mock( + "POST", + "/control/device/aa:bb:cc:dd/blink", + body={"status": "blink_done", "original": False}, + ) + result = await self.client.blink_device("aa:bb:cc:dd") + self.assertEqual(result.status, "blink_done") + self.assertFalse(result.original) + + async def test_create_api_key_async(self): + self._mock( + "POST", "/api-keys", body={"key": "tok", "key_id": "abc", "name": "bot"} + ) + result = await self.client.create_api_key("bot", is_admin=True) + self.assertEqual(result["name"], "bot") + + +class EdgeCaseTests(unittest.TestCase): + def test_request_handles_non_json_error(self): + client = IgnisClient("http://127.0.0.1:8000", "key") + client._client = MagicMock() + resp = MagicMock() + resp.status_code = 502 + resp.json.side_effect = ValueError("not json") + resp.text = "502 Bad Gateway" + client._client.request.return_value = resp + + with self.assertRaises(IgnisError) as ctx: + client.auth_me() + self.assertEqual(ctx.exception.status_code, 502) + self.assertIn("502 Bad Gateway", ctx.exception.detail) + + def test_request_handles_empty_non_json_error(self): + client = IgnisClient("http://127.0.0.1:8000", "key") + client._client = MagicMock() + resp = MagicMock() + resp.status_code = 500 + resp.json.side_effect = ValueError("not json") + resp.text = "" + client._client.request.return_value = resp + + with self.assertRaises(IgnisError) as ctx: + client.auth_me() + self.assertEqual(ctx.exception.status_code, 500) + self.assertEqual(ctx.exception.detail, "HTTP 500") + + def test_list_scenes_returns_all_scenes(self): + client = IgnisClient("http://127.0.0.1:8000", "key") + client._client = MagicMock() + scenes = client.list_scenes() + self.assertIsInstance(scenes, dict) + self.assertEqual(len(scenes), 31) + self.assertEqual(scenes["ocean"], 1) + self.assertEqual(scenes["steampunk"], 35) + self.assertNotIn(4, scenes.values()) + + def test_command_request_temp_min_boundary(self): + cmd = CommandRequest(temp=2200) + self.assertEqual(cmd.to_wiz_params(), {"temp": 2200}) + + def test_command_request_temp_max_boundary(self): + cmd = CommandRequest(temp=6500) + self.assertEqual(cmd.to_wiz_params(), {"temp": 6500}) + + def test_command_request_state_false_with_brightness(self): + cmd = CommandRequest(state=False, brightness=10) + self.assertEqual(cmd.to_wiz_params(), {"state": False, "dimming": 10}) + + def test_group_command_result_without_error_field(self): + item = GroupCommandResult( + ip="10.0.0.1", ok=True, kind="ok", result={"mac": "aa:bb"} + ) + self.assertTrue(item.ok) + self.assertIsNone(item.error) + + def test_group_command_result_with_error(self): + item = GroupCommandResult( + ip="10.0.0.2", ok=False, kind="timeout", error="timed out" + ) + self.assertFalse(item.ok) + self.assertEqual(item.error, "timed out") + + def test_fake_response_text_property(self): + resp = _make_response(400, {"detail": "bad request"}) + self.assertEqual(resp.status_code, 400) + self.assertEqual(resp.json(), {"detail": "bad request"}) + self.assertIn("bad request", resp.text) + + def test_fake_response_text_with_list_body(self): + resp = _make_response(200, []) + self.assertEqual(resp.text, "[]") + + def test_schedule_cron_request_default_day_of_week(self): + req = ScheduleCronRequest(target_id="grp-1", hour="8", minute="0", state=True) + self.assertEqual(req.day_of_week, "*") + + def test_schedule_once_request_is_group_default(self): + from datetime import datetime, timedelta, timezone + + ts = datetime.now(timezone.utc) + timedelta(hours=1) + req = ScheduleOnceRequest(target_id="grp-1", run_at=ts, state=True) + self.assertTrue(req.is_group) + + def test_scenes_contains_expected_entries(self): + self.assertIn("cozy", SCENES) + self.assertIn("fireplace", SCENES) + self.assertIn("wake_up", SCENES) + self.assertIn("bedtime", SCENES) + self.assertIn("night_light", SCENES) + self.assertIn("ocean", SCENES) + self.assertIn("club", SCENES) + self.assertIn("steampunk", SCENES) + + def test_ignis_error_is_exception(self): + err = IgnisError(418, "I'm a teapot") + try: + raise err + except IgnisError as caught: + self.assertEqual(caught.status_code, 418) + self.assertEqual(caught.detail, "I'm a teapot") + + def test_group_create_schema(self): + from ignis_client.models import GroupCreateSchema + + g = GroupCreateSchema(id="test", name="Test Group", macs=["aa:bb", "cc:dd"]) + self.assertEqual(g.id, "test") + self.assertEqual(len(g.macs), 2) + + def test_device_schema(self): + from ignis_client.models import DeviceSchema + + d = DeviceSchema(id="aa:bb:cc:dd", ip="10.0.0.1", name="WiZ dd", room="Kitchen") + self.assertEqual(d.room, "Kitchen")