wallpanell/custom_components/wall_panel/views.py
2026-03-25 16:23:11 +03:00

394 lines
15 KiB
Python
Executable File

"""HTTP views for Wall Panel."""
from __future__ import annotations
import secrets
import logging
from urllib.parse import urljoin
from typing import Any
from aiohttp import web
from homeassistant.components.http import HomeAssistantView
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from yarl import URL
from .const import CONF_CONFIG, CONF_PANEL_URL, CONF_SYNC_TOKEN, DOMAIN
from .helpers import (
build_patch_payload,
config_to_json,
create_room_layout_item,
current_entry_config,
current_entry_panel,
delete_room_layout_item,
normalize_config,
reorder_room_grid,
save_settings,
update_entity_override,
update_room_layout_item,
update_room_override,
)
_LOGGER = logging.getLogger(__name__)
def _entry_from_hass(hass: HomeAssistant, entry_id: str):
return hass.data.get(DOMAIN, {}).get(entry_id, {}).get("entry")
def _default_entry_from_hass(hass: HomeAssistant):
entries = hass.data.get(DOMAIN, {})
for value in entries.values():
entry = value.get("entry") if isinstance(value, dict) else None
if entry is not None:
return entry
return None
def _current_entry(hass: HomeAssistant, entry_id: str | None = None):
if entry_id:
entry = _entry_from_hass(hass, entry_id)
if entry is not None:
return entry
return _default_entry_from_hass(hass)
def _request_token(request: web.Request) -> str:
header = request.headers.get("X-Wall-Panel-Token", "").strip()
if header:
return header
auth = request.headers.get("Authorization", "").strip()
if auth.lower().startswith("bearer "):
return auth[7:].strip()
return request.query.get("token", "").strip()
def _authorized(entry, request: web.Request) -> bool:
expected = str(entry.options.get(CONF_SYNC_TOKEN, "") or "").strip()
if not expected:
return False
return secrets.compare_digest(_request_token(request), expected)
def _response(data: Any, status: int = 200) -> web.Response:
if isinstance(data, str):
return web.Response(text=data, status=status, content_type="text/plain; charset=utf-8")
return web.json_response(data, status=status)
async def _handle_config_request(request: web.Request, entry_id: str) -> web.Response:
"""Handle canonical config requests for a specific config entry."""
hass = request.app["hass"]
entry = _entry_from_hass(hass, entry_id)
if entry is None:
return _response({"ok": False, "error": "Unknown entry"}, 404)
if not _authorized(entry, request):
return _response({"ok": False, "error": "Unauthorized"}, 401)
if request.method == "GET":
config = current_entry_config(entry)
return _response({
"ok": True,
"config": config,
"panel": current_entry_panel(entry),
})
payload = await request.json()
if not isinstance(payload, dict):
return _response({"ok": False, "error": "Invalid payload"}, 400)
config = current_entry_config(entry)
action = str(payload.get("action", "") or "").strip().lower()
action_payload = payload.get("payload")
if not isinstance(action_payload, dict):
action_payload = payload
if action == "save-settings":
config = save_settings(config, action_payload)
elif action == "save-entity-override":
room_id = str(action_payload.get("room_id", "") or "").strip()
entity_id = str(action_payload.get("entity_id", "") or "").strip()
if not room_id or not entity_id:
return _response({"ok": False, "error": "room_id and entity_id are required"}, 400)
patch = build_patch_payload(action_payload, ["visible", "order", "card_type", "title", "icon"])
config = update_entity_override(config, room_id, entity_id, patch)
elif action == "save-space-override":
room_id = str(action_payload.get("room_id", "") or "").strip()
if not room_id:
return _response({"ok": False, "error": "room_id is required"}, 400)
patch = build_patch_payload(action_payload, ["visible", "order", "name", "icon", "temperature_sensor_entity_id"])
config = update_room_override(config, room_id, patch)
elif action == "create-room-layout-item":
room_id = str(action_payload.get("room_id", "") or "").strip()
if not room_id:
return _response({"ok": False, "error": "room_id is required"}, 400)
layout_item_id = str(action_payload.get("layout_item_id", "") or "").strip()
if not layout_item_id:
layout_item_id = f"slot_{secrets.token_hex(12)}"
order = action_payload.get("order")
config = create_room_layout_item(config, room_id, layout_item_id, int(order) if order is not None else None)
_save_entry_config(hass, entry, config)
return _response({
"ok": True,
"layout_item_id": layout_item_id,
"config": config,
})
elif action == "save-room-layout-item":
room_id = str(action_payload.get("room_id", "") or "").strip()
layout_item_id = str(action_payload.get("layout_item_id", "") or "").strip()
if not room_id or not layout_item_id:
return _response({"ok": False, "error": "room_id and layout_item_id are required"}, 400)
patch = build_patch_payload(action_payload, ["order"])
config = update_room_layout_item(config, room_id, layout_item_id, patch)
elif action == "delete-room-layout-item":
room_id = str(action_payload.get("room_id", "") or "").strip()
layout_item_id = str(action_payload.get("layout_item_id", "") or "").strip()
if not room_id or not layout_item_id:
return _response({"ok": False, "error": "room_id and layout_item_id are required"}, 400)
config = delete_room_layout_item(config, room_id, layout_item_id)
elif action == "reorder-room-grid":
room_id = str(action_payload.get("room_id", "") or "").strip()
entries = action_payload.get("entries", [])
if not room_id or not isinstance(entries, list):
return _response({"ok": False, "error": "room_id and entries are required"}, 400)
config = reorder_room_grid(config, room_id, entries)
elif action == "register-panel":
panel_url = str(action_payload.get("panel_url", "") or "").strip()
if not panel_url:
return _response({"ok": False, "error": "panel_url is required"}, 400)
_save_entry_panel_url(hass, entry, panel_url)
return _response({
"ok": True,
"panel": current_entry_panel(entry),
})
elif isinstance(payload.get("config"), dict):
config = normalize_config(payload["config"])
else:
return _response({"ok": False, "error": "Unknown action"}, 404)
_save_entry_config(hass, entry, config)
return _response({
"ok": True,
"config": config,
"panel": current_entry_panel(entry),
})
async def handle_config_request(request: web.Request) -> web.Response:
"""Handle canonical config requests for a specific config entry."""
entry_id = str(request.match_info.get("entry_id", "") or "").strip()
return await _handle_config_request(request, entry_id)
async def handle_panel_request(request: web.Request) -> web.Response:
"""Handle the panel config endpoint without requiring an entry id."""
hass = request.app["hass"]
entry = _default_entry_from_hass(hass)
if entry is None:
return _response({"ok": False, "error": "Unknown entry"}, 404)
if not _authorized(entry, request):
return _response({"ok": False, "error": "Unauthorized"}, 401)
if request.method == "GET":
config = current_entry_config(entry)
return _response({
"ok": True,
"config": config,
"panel": current_entry_panel(entry),
})
payload = await request.json()
if not isinstance(payload, dict):
return _response({"ok": False, "error": "Invalid payload"}, 400)
action = str(payload.get("action", "") or "").strip().lower()
if action != "register-panel":
return _response({"ok": False, "error": "Unknown action"}, 404)
action_payload = payload.get("payload")
if not isinstance(action_payload, dict):
action_payload = payload
panel_url = str(action_payload.get("panel_url", "") or "").strip()
if not panel_url:
return _response({"ok": False, "error": "panel_url is required"}, 400)
_save_entry_panel_url(hass, entry, panel_url)
return _response({
"ok": True,
"panel": current_entry_panel(entry),
})
async def _handle_proxy_request(request: web.Request, entry_id: str, path: str, method: str) -> web.StreamResponse:
"""Handle reverse-proxy requests to the PHP panel."""
hass = request.app["hass"]
entry = _entry_from_hass(hass, entry_id)
if entry is None:
return _response({"ok": False, "error": "Unknown entry"}, 404)
base_url = str(entry.options.get(CONF_PANEL_URL, "") or "").strip()
if not base_url:
_LOGGER.warning("Wall Panel proxy denied for %s: panel_url is empty", entry_id)
return _response({"ok": False, "error": "PHP panel URL is not configured"}, 400)
if not base_url.endswith("/"):
base_url += "/"
upstream_url = urljoin(base_url, path or "")
if request.query_string:
upstream_url = upstream_url + "?" + request.query_string
_LOGGER.warning("Wall Panel proxy %s %s -> %s", method, request.path_qs, upstream_url)
body = None
if method not in {"GET", "HEAD"}:
body = await request.read()
session = async_get_clientsession(hass)
async with session.request(
method,
upstream_url,
headers=_forward_headers(request),
data=body,
allow_redirects=False,
) as upstream:
response_headers: dict[str, str] = {}
for key, value in upstream.headers.items():
lower = key.lower()
if lower in {
"connection",
"keep-alive",
"proxy-authenticate",
"proxy-authorization",
"te",
"trailers",
"transfer-encoding",
"upgrade",
"content-encoding",
}:
continue
if lower == "location":
response_headers[key] = _rewrite_location(value, base_url, _proxy_root(entry_id))
continue
response_headers[key] = value
response_body = await upstream.read()
_LOGGER.warning("Wall Panel upstream %s %s", upstream.status, upstream_url)
return web.Response(
body=response_body,
status=upstream.status,
headers=response_headers,
)
async def handle_proxy_request(request: web.Request) -> web.StreamResponse:
"""Handle reverse-proxy requests to the PHP panel."""
entry_id = str(request.match_info.get("entry_id", "") or "").strip()
path = str(request.match_info.get("path", "") or "").strip()
method = request.method.upper()
return await _handle_proxy_request(request, entry_id, path, method)
def _save_entry_config(hass: HomeAssistant, entry, config: dict[str, Any]) -> None:
options = dict(entry.options)
options[CONF_CONFIG] = config
hass.config_entries.async_update_entry(entry, options=options)
hass.data.setdefault(DOMAIN, {}).setdefault(entry.entry_id, {})["config"] = config
def _save_entry_panel_url(hass: HomeAssistant, entry, panel_url: str) -> None:
options = dict(entry.options)
options[CONF_PANEL_URL] = panel_url
hass.config_entries.async_update_entry(entry, options=options)
hass.data.setdefault(DOMAIN, {}).setdefault(entry.entry_id, {})["panel_url"] = panel_url
_LOGGER.warning("Wall Panel panel_url updated for %s: %s", entry.entry_id, panel_url)
def _proxy_root(entry_id: str) -> str:
return f"/api/wall_panel/proxy/{entry_id}/"
def _forward_headers(request: web.Request) -> dict[str, str]:
headers: dict[str, str] = {}
for key in ("Accept", "Content-Type", "Range", "If-Modified-Since", "If-None-Match", "Origin", "Referer", "User-Agent"):
value = request.headers.get(key)
if value:
headers[key] = value
return headers
def _rewrite_location(location: str, base_url: str, proxy_root: str) -> str:
location = location.strip()
if not location:
return location
if location.startswith(base_url):
suffix = location[len(base_url):].lstrip("/")
return proxy_root + suffix
if location.startswith("/"):
return proxy_root + location.lstrip("/")
return location
class WallPanelConfigView(HomeAssistantView):
"""Serve and update the canonical Wall Panel config."""
url = "/api/wall_panel/config/{entry_id}"
name = "api:wall_panel:config"
requires_auth = False
async def get(self, request: web.Request, entry_id: str) -> web.Response:
return await _handle_config_request(request, entry_id)
async def post(self, request: web.Request, entry_id: str) -> web.Response:
return await _handle_config_request(request, entry_id)
class WallPanelPanelView(HomeAssistantView):
"""Serve the active Wall Panel config without an entry id."""
url = "/api/wall_panel/panel"
name = "api:wall_panel:panel"
requires_auth = False
async def get(self, request: web.Request) -> web.Response:
return await handle_panel_request(request)
async def post(self, request: web.Request) -> web.Response:
return await handle_panel_request(request)
class WallPanelProxyView(HomeAssistantView):
"""Reverse proxy the PHP panel through Home Assistant."""
url = "/api/wall_panel/proxy/{entry_id}/{path:.*}"
name = "api:wall_panel:proxy"
requires_auth = True
async def get(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse:
return await _handle_proxy_request(request, entry_id, path, "GET")
async def post(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse:
return await _handle_proxy_request(request, entry_id, path, "POST")
async def put(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse:
return await _handle_proxy_request(request, entry_id, path, "PUT")
async def patch(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse:
return await _handle_proxy_request(request, entry_id, path, "PATCH")
async def delete(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse:
return await _handle_proxy_request(request, entry_id, path, "DELETE")