"""HTTP views for Wall Panel.""" from __future__ import annotations import secrets from urllib.parse import urljoin from typing import Any from aiohttp import web from homeassistant.components.http import HomeAssistantView from homeassistant.core import HomeAssistant from homeassistant.helpers.aiohttp_client import async_get_clientsession from yarl import URL from .const import CONF_CONFIG, CONF_PANEL_URL, CONF_SYNC_TOKEN, DOMAIN from .helpers import ( build_patch_payload, config_to_json, create_room_layout_item, current_entry_config, current_entry_panel, delete_room_layout_item, normalize_config, reorder_room_grid, save_settings, update_entity_override, update_room_layout_item, update_room_override, ) def _entry_from_hass(hass: HomeAssistant, entry_id: str): return hass.data.get(DOMAIN, {}).get(entry_id, {}).get("entry") def _default_entry_from_hass(hass: HomeAssistant): entries = hass.data.get(DOMAIN, {}) for value in entries.values(): entry = value.get("entry") if isinstance(value, dict) else None if entry is not None: return entry return None def _current_entry(hass: HomeAssistant, entry_id: str | None = None): if entry_id: entry = _entry_from_hass(hass, entry_id) if entry is not None: return entry return _default_entry_from_hass(hass) def _request_token(request: web.Request) -> str: header = request.headers.get("X-Wall-Panel-Token", "").strip() if header: return header auth = request.headers.get("Authorization", "").strip() if auth.lower().startswith("bearer "): return auth[7:].strip() return request.query.get("token", "").strip() def _authorized(entry, request: web.Request) -> bool: expected = str(entry.options.get(CONF_SYNC_TOKEN, "") or "").strip() if not expected: return False return secrets.compare_digest(_request_token(request), expected) def _response(data: Any, status: int = 200) -> web.Response: if isinstance(data, str): return web.Response(text=data, status=status, content_type="text/plain; charset=utf-8") return web.json_response(data, status=status) def _save_entry_config(hass: HomeAssistant, entry, config: dict[str, Any]) -> None: options = dict(entry.options) options[CONF_CONFIG] = config hass.config_entries.async_update_entry(entry, options=options) hass.data.setdefault(DOMAIN, {}).setdefault(entry.entry_id, {})["config"] = config def _save_entry_panel_url(hass: HomeAssistant, entry, panel_url: str) -> None: options = dict(entry.options) options[CONF_PANEL_URL] = panel_url hass.config_entries.async_update_entry(entry, options=options) hass.data.setdefault(DOMAIN, {}).setdefault(entry.entry_id, {})["panel_url"] = panel_url def _proxy_root(entry_id: str) -> str: return f"/api/wall_panel/proxy/{entry_id}/" def _forward_headers(request: web.Request) -> dict[str, str]: headers: dict[str, str] = {} for key in ("Accept", "Content-Type", "Range", "If-Modified-Since", "If-None-Match", "Origin", "Referer", "User-Agent"): value = request.headers.get(key) if value: headers[key] = value return headers def _rewrite_location(location: str, base_url: str, proxy_root: str) -> str: location = location.strip() if not location: return location if location.startswith(base_url): suffix = location[len(base_url):].lstrip("/") return proxy_root + suffix if location.startswith("/"): return proxy_root + location.lstrip("/") return location class WallPanelConfigView(HomeAssistantView): """Serve and update the canonical Wall Panel config.""" url = "/api/wall_panel/config/{entry_id}" name = "api:wall_panel:config" requires_auth = False async def async_get(self, request: web.Request, entry_id: str) -> web.Response: hass = request.app["hass"] entry = _entry_from_hass(hass, entry_id) if entry is None: return _response({"ok": False, "error": "Unknown entry"}, 404) if not _authorized(entry, request): return _response({"ok": False, "error": "Unauthorized"}, 401) config = current_entry_config(entry) return _response({ "ok": True, "config": config, "panel": current_entry_panel(entry), }) async def async_post(self, request: web.Request, entry_id: str) -> web.Response: hass = request.app["hass"] entry = _entry_from_hass(hass, entry_id) if entry is None: return _response({"ok": False, "error": "Unknown entry"}, 404) if not _authorized(entry, request): return _response({"ok": False, "error": "Unauthorized"}, 401) payload = await request.json() if not isinstance(payload, dict): return _response({"ok": False, "error": "Invalid payload"}, 400) config = current_entry_config(entry) action = str(payload.get("action", "") or "").strip().lower() action_payload = payload.get("payload") if not isinstance(action_payload, dict): action_payload = payload if action == "save-settings": config = save_settings(config, action_payload) elif action == "save-entity-override": room_id = str(action_payload.get("room_id", "") or "").strip() entity_id = str(action_payload.get("entity_id", "") or "").strip() if not room_id or not entity_id: return _response({"ok": False, "error": "room_id and entity_id are required"}, 400) patch = build_patch_payload(action_payload, ["visible", "order", "card_type", "title", "icon"]) config = update_entity_override(config, room_id, entity_id, patch) elif action == "save-space-override": room_id = str(action_payload.get("room_id", "") or "").strip() if not room_id: return _response({"ok": False, "error": "room_id is required"}, 400) patch = build_patch_payload(action_payload, ["visible", "order", "name", "icon", "temperature_sensor_entity_id"]) config = update_room_override(config, room_id, patch) elif action == "create-room-layout-item": room_id = str(action_payload.get("room_id", "") or "").strip() if not room_id: return _response({"ok": False, "error": "room_id is required"}, 400) layout_item_id = str(action_payload.get("layout_item_id", "") or "").strip() if not layout_item_id: layout_item_id = f"slot_{secrets.token_hex(12)}" order = action_payload.get("order") config = create_room_layout_item(config, room_id, layout_item_id, int(order) if order is not None else None) _save_entry_config(hass, entry, config) return _response({ "ok": True, "layout_item_id": layout_item_id, "config": config, }) elif action == "save-room-layout-item": room_id = str(action_payload.get("room_id", "") or "").strip() layout_item_id = str(action_payload.get("layout_item_id", "") or "").strip() if not room_id or not layout_item_id: return _response({"ok": False, "error": "room_id and layout_item_id are required"}, 400) patch = build_patch_payload(action_payload, ["order"]) config = update_room_layout_item(config, room_id, layout_item_id, patch) elif action == "delete-room-layout-item": room_id = str(action_payload.get("room_id", "") or "").strip() layout_item_id = str(action_payload.get("layout_item_id", "") or "").strip() if not room_id or not layout_item_id: return _response({"ok": False, "error": "room_id and layout_item_id are required"}, 400) config = delete_room_layout_item(config, room_id, layout_item_id) elif action == "reorder-room-grid": room_id = str(action_payload.get("room_id", "") or "").strip() entries = action_payload.get("entries", []) if not room_id or not isinstance(entries, list): return _response({"ok": False, "error": "room_id and entries are required"}, 400) config = reorder_room_grid(config, room_id, entries) elif action == "register-panel": panel_url = str(action_payload.get("panel_url", "") or "").strip() if not panel_url: return _response({"ok": False, "error": "panel_url is required"}, 400) _save_entry_panel_url(hass, entry, panel_url) return _response({ "ok": True, "panel": current_entry_panel(entry), }) elif isinstance(payload.get("config"), dict): config = normalize_config(payload["config"]) else: return _response({"ok": False, "error": "Unknown action"}, 404) _save_entry_config(hass, entry, config) return _response({ "ok": True, "config": config, "panel": current_entry_panel(entry), }) class WallPanelPanelView(HomeAssistantView): """Serve the active Wall Panel config without an entry id.""" url = "/api/wall_panel/panel" name = "api:wall_panel:panel" requires_auth = False async def async_get(self, request: web.Request) -> web.Response: hass = request.app["hass"] entry = _default_entry_from_hass(hass) if entry is None: return _response({"ok": False, "error": "Unknown entry"}, 404) if not _authorized(entry, request): return _response({"ok": False, "error": "Unauthorized"}, 401) config = current_entry_config(entry) return _response({ "ok": True, "config": config, "panel": current_entry_panel(entry), }) async def async_post(self, request: web.Request) -> web.Response: hass = request.app["hass"] entry = _default_entry_from_hass(hass) if entry is None: return _response({"ok": False, "error": "Unknown entry"}, 404) payload = await request.json() if not isinstance(payload, dict): return _response({"ok": False, "error": "Invalid payload"}, 400) action = str(payload.get("action", "") or "").strip().lower() if action != "register-panel": return _response({"ok": False, "error": "Unknown action"}, 404) action_payload = payload.get("payload") if not isinstance(action_payload, dict): action_payload = payload panel_url = str(action_payload.get("panel_url", "") or "").strip() if not panel_url: return _response({"ok": False, "error": "panel_url is required"}, 400) _save_entry_panel_url(hass, entry, panel_url) return _response({ "ok": True, "panel": current_entry_panel(entry), }) class WallPanelProxyView(HomeAssistantView): """Reverse proxy the PHP panel through Home Assistant.""" url = "/api/wall_panel/proxy/{entry_id}/{path:.*}" name = "api:wall_panel:proxy" requires_auth = True async def _proxy(self, request: web.Request, entry_id: str, path: str, method: str) -> web.StreamResponse: hass = request.app["hass"] entry = _entry_from_hass(hass, entry_id) if entry is None: return _response({"ok": False, "error": "Unknown entry"}, 404) base_url = str(entry.options.get(CONF_PANEL_URL, "") or "").strip() if not base_url: return _response({"ok": False, "error": "PHP panel URL is not configured"}, 400) if not base_url.endswith("/"): base_url += "/" upstream_url = urljoin(base_url, path or "") if request.query_string: upstream_url = upstream_url + "?" + request.query_string body = None if method not in {"GET", "HEAD"}: body = await request.read() session = async_get_clientsession(hass) async with session.request( method, upstream_url, headers=_forward_headers(request), data=body, allow_redirects=False, ) as upstream: response_headers: dict[str, str] = {} for key, value in upstream.headers.items(): lower = key.lower() if lower in { "connection", "keep-alive", "proxy-authenticate", "proxy-authorization", "te", "trailers", "transfer-encoding", "upgrade", "content-encoding", }: continue if lower == "location": response_headers[key] = _rewrite_location(value, base_url, _proxy_root(entry_id)) continue response_headers[key] = value response_body = await upstream.read() return web.Response( body=response_body, status=upstream.status, headers=response_headers, ) async def async_get(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse: return await self._proxy(request, entry_id, path, "GET") async def async_post(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse: return await self._proxy(request, entry_id, path, "POST") async def async_put(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse: return await self._proxy(request, entry_id, path, "PUT") async def async_patch(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse: return await self._proxy(request, entry_id, path, "PATCH") async def async_delete(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse: return await self._proxy(request, entry_id, path, "DELETE")