394 lines
15 KiB
Python
Executable File
394 lines
15 KiB
Python
Executable File
"""HTTP views for Wall Panel."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import secrets
|
|
import logging
|
|
from urllib.parse import urljoin
|
|
from typing import Any
|
|
|
|
from aiohttp import web
|
|
from homeassistant.components.http import HomeAssistantView
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
from yarl import URL
|
|
|
|
from .const import CONF_CONFIG, CONF_PANEL_URL, CONF_SYNC_TOKEN, DOMAIN
|
|
from .helpers import (
|
|
build_patch_payload,
|
|
config_to_json,
|
|
create_room_layout_item,
|
|
current_entry_config,
|
|
current_entry_panel,
|
|
delete_room_layout_item,
|
|
normalize_config,
|
|
reorder_room_grid,
|
|
save_settings,
|
|
update_entity_override,
|
|
update_room_layout_item,
|
|
update_room_override,
|
|
)
|
|
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def _entry_from_hass(hass: HomeAssistant, entry_id: str):
    """Return the config entry stored for ``entry_id``, or ``None``.

    Reads ``hass.data[DOMAIN][entry_id]["entry"]``. A missing level, or a
    stored value that is not a dict, yields ``None`` rather than raising.
    ``entry_id`` arrives from the request URL, so it may name a slot that
    does not hold a per-entry dict.
    """
    record = hass.data.get(DOMAIN, {}).get(entry_id)
    if not isinstance(record, dict):
        return None
    return record.get("entry")
|
|
|
|
|
|
def _default_entry_from_hass(hass: HomeAssistant):
    """Return the first config entry found under ``hass.data[DOMAIN]``.

    Values that are not dicts, and dicts without a non-``None`` ``"entry"``
    item, are skipped. Returns ``None`` when nothing qualifies.
    """
    records = hass.data.get(DOMAIN, {}).values()
    candidates = (
        record.get("entry") for record in records if isinstance(record, dict)
    )
    return next((found for found in candidates if found is not None), None)
|
|
|
|
|
|
def _current_entry(hass: HomeAssistant, entry_id: str | None = None):
    """Resolve a config entry, preferring ``entry_id`` when it is known.

    Falls back to the default entry when ``entry_id`` is empty or does not
    resolve to an entry.
    """
    requested = _entry_from_hass(hass, entry_id) if entry_id else None
    if requested is None:
        return _default_entry_from_hass(hass)
    return requested
|
|
|
|
|
|
def _request_token(request: web.Request) -> str:
    """Extract the caller's token from the request.

    Sources are tried in order: the ``X-Wall-Panel-Token`` header, an
    ``Authorization: Bearer ...`` header, then the ``token`` query
    parameter. The result is whitespace-stripped; it is ``""`` when no
    source supplies a value.
    """
    if explicit := request.headers.get("X-Wall-Panel-Token", "").strip():
        return explicit

    scheme = "bearer "
    authorization = request.headers.get("Authorization", "").strip()
    if authorization.lower().startswith(scheme):
        return authorization[len(scheme):].strip()

    return request.query.get("token", "").strip()
|
|
|
|
|
|
def _authorized(entry, request: web.Request) -> bool:
    """Return whether the request carries the entry's sync token.

    The expected token is read from ``entry.options[CONF_SYNC_TOKEN]``. An
    entry with no (or a blank) token never authorizes anything.

    The comparison is constant-time and done on UTF-8 bytes:
    ``secrets.compare_digest`` raises ``TypeError`` for ``str`` arguments
    containing non-ASCII characters, which would turn a bad token into a
    server error instead of a plain rejection.
    """
    expected = str(entry.options.get(CONF_SYNC_TOKEN, "") or "").strip()
    if not expected:
        return False

    supplied = _request_token(request)
    # "surrogatepass" keeps encoding total even for lone surrogates that a
    # header decoder may have produced; distinct strings stay distinct.
    return secrets.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )
|
|
|
|
|
|
def _response(data: Any, status: int = 200) -> web.Response:
    """Build an HTTP response from ``data``.

    Args:
        data: A ``str`` is sent as UTF-8 ``text/plain``; anything else is
            serialized with ``web.json_response``.
        status: HTTP status code, 200 by default.

    Returns:
        The constructed ``web.Response``.
    """
    if isinstance(data, str):
        # aiohttp rejects a content_type that embeds "charset"; the charset
        # has to be passed through its own argument.
        return web.Response(
            text=data,
            status=status,
            content_type="text/plain",
            charset="utf-8",
        )

    return web.json_response(data, status=status)
|
|
|
|
|
|
async def _handle_config_request(request: web.Request, entry_id: str) -> web.Response:
    """Handle canonical config requests for a specific config entry.

    The entry is looked up by ``entry_id`` (404 when unknown) and the
    request must carry the entry's sync token (401 otherwise).

    ``GET`` returns the current config and panel info. Any other method is
    treated as a JSON command: the body must be an object with an
    ``action`` and, optionally, a ``payload`` object (when ``payload`` is
    absent or not an object, the body itself is used as the action
    payload). Supported actions are ``save-settings``,
    ``save-entity-override``, ``save-space-override``,
    ``create-room-layout-item``, ``save-room-layout-item``,
    ``delete-room-layout-item``, ``reorder-room-grid`` and
    ``register-panel``. A body without a recognised action but with a
    ``config`` object replaces the config after normalization.

    Malformed JSON, a non-object body, missing required fields, or a
    non-integer ``order`` produce a 400 response; an unrecognised action
    produces 404.
    """
    hass = request.app["hass"]
    entry = _entry_from_hass(hass, entry_id)
    if entry is None:
        return _response({"ok": False, "error": "Unknown entry"}, 404)
    if not _authorized(entry, request):
        return _response({"ok": False, "error": "Unauthorized"}, 401)

    if request.method == "GET":
        config = current_entry_config(entry)
        return _response({
            "ok": True,
            "config": config,
            "panel": current_entry_panel(entry),
        })

    try:
        payload = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and body decoding errors: a client
        # mistake, not a server fault.
        return _response({"ok": False, "error": "Invalid payload"}, 400)
    if not isinstance(payload, dict):
        return _response({"ok": False, "error": "Invalid payload"}, 400)

    config = current_entry_config(entry)
    action = str(payload.get("action", "") or "").strip().lower()
    action_payload = payload.get("payload")
    if not isinstance(action_payload, dict):
        # Flat form: the action's fields sit next to "action" itself.
        action_payload = payload

    if action == "save-settings":
        config = save_settings(config, action_payload)
    elif action == "save-entity-override":
        room_id = str(action_payload.get("room_id", "") or "").strip()
        entity_id = str(action_payload.get("entity_id", "") or "").strip()
        if not room_id or not entity_id:
            return _response({"ok": False, "error": "room_id and entity_id are required"}, 400)
        patch = build_patch_payload(
            action_payload, ["visible", "order", "card_type", "title", "icon"]
        )
        config = update_entity_override(config, room_id, entity_id, patch)
    elif action == "save-space-override":
        room_id = str(action_payload.get("room_id", "") or "").strip()
        if not room_id:
            return _response({"ok": False, "error": "room_id is required"}, 400)
        patch = build_patch_payload(
            action_payload,
            ["visible", "order", "name", "icon", "temperature_sensor_entity_id"],
        )
        config = update_room_override(config, room_id, patch)
    elif action == "create-room-layout-item":
        room_id = str(action_payload.get("room_id", "") or "").strip()
        if not room_id:
            return _response({"ok": False, "error": "room_id is required"}, 400)
        layout_item_id = str(action_payload.get("layout_item_id", "") or "").strip()
        if not layout_item_id:
            # No id supplied by the client: generate a random one.
            layout_item_id = f"slot_{secrets.token_hex(12)}"
        raw_order = action_payload.get("order")
        try:
            order = int(raw_order) if raw_order is not None else None
        except (TypeError, ValueError, OverflowError):
            return _response({"ok": False, "error": "order must be an integer"}, 400)
        config = create_room_layout_item(config, room_id, layout_item_id, order)
        _save_entry_config(hass, entry, config)
        # This action reports the (possibly generated) id back to the caller.
        return _response({
            "ok": True,
            "layout_item_id": layout_item_id,
            "config": config,
        })
    elif action == "save-room-layout-item":
        room_id = str(action_payload.get("room_id", "") or "").strip()
        layout_item_id = str(action_payload.get("layout_item_id", "") or "").strip()
        if not room_id or not layout_item_id:
            return _response({"ok": False, "error": "room_id and layout_item_id are required"}, 400)
        patch = build_patch_payload(action_payload, ["order"])
        config = update_room_layout_item(config, room_id, layout_item_id, patch)
    elif action == "delete-room-layout-item":
        room_id = str(action_payload.get("room_id", "") or "").strip()
        layout_item_id = str(action_payload.get("layout_item_id", "") or "").strip()
        if not room_id or not layout_item_id:
            return _response({"ok": False, "error": "room_id and layout_item_id are required"}, 400)
        config = delete_room_layout_item(config, room_id, layout_item_id)
    elif action == "reorder-room-grid":
        room_id = str(action_payload.get("room_id", "") or "").strip()
        entries = action_payload.get("entries", [])
        if not room_id or not isinstance(entries, list):
            return _response({"ok": False, "error": "room_id and entries are required"}, 400)
        config = reorder_room_grid(config, room_id, entries)
    elif action == "register-panel":
        panel_url = str(action_payload.get("panel_url", "") or "").strip()
        if not panel_url:
            return _response({"ok": False, "error": "panel_url is required"}, 400)
        _save_entry_panel_url(hass, entry, panel_url)
        # Only the panel URL changed; the config is not re-saved.
        return _response({
            "ok": True,
            "panel": current_entry_panel(entry),
        })
    elif isinstance(payload.get("config"), dict):
        # Whole-config replacement.
        config = normalize_config(payload["config"])
    else:
        return _response({"ok": False, "error": "Unknown action"}, 404)

    _save_entry_config(hass, entry, config)
    return _response({
        "ok": True,
        "config": config,
        "panel": current_entry_panel(entry),
    })
|
|
|
|
|
|
async def handle_config_request(request: web.Request) -> web.Response:
    """Route-style entry point for config requests.

    Reads ``entry_id`` from the URL match info (blank when absent) and
    delegates to ``_handle_config_request``.
    """
    raw_entry_id = request.match_info.get("entry_id", "")
    entry_id = str(raw_entry_id or "").strip()
    return await _handle_config_request(request, entry_id)
|
|
|
|
|
|
async def handle_panel_request(request: web.Request) -> web.Response:
    """Handle the panel config endpoint without requiring an entry id.

    Operates on the default entry (404 when there is none) and requires the
    entry's sync token (401 otherwise).

    ``GET`` returns the current config and panel info. Any other method must
    carry a JSON object whose ``action`` is ``register-panel`` and which
    provides a non-empty ``panel_url``, either inside a ``payload`` object or
    at the top level. Malformed JSON, a non-object body, or a missing
    ``panel_url`` produce a 400 response; any other action produces 404.
    """
    hass = request.app["hass"]
    entry = _default_entry_from_hass(hass)
    if entry is None:
        return _response({"ok": False, "error": "Unknown entry"}, 404)
    if not _authorized(entry, request):
        return _response({"ok": False, "error": "Unauthorized"}, 401)

    if request.method == "GET":
        config = current_entry_config(entry)
        return _response({
            "ok": True,
            "config": config,
            "panel": current_entry_panel(entry),
        })

    try:
        payload = await request.json()
    except ValueError:
        # Covers json.JSONDecodeError and body decoding errors: a client
        # mistake, not a server fault.
        return _response({"ok": False, "error": "Invalid payload"}, 400)
    if not isinstance(payload, dict):
        return _response({"ok": False, "error": "Invalid payload"}, 400)

    action = str(payload.get("action", "") or "").strip().lower()
    if action != "register-panel":
        return _response({"ok": False, "error": "Unknown action"}, 404)

    action_payload = payload.get("payload")
    if not isinstance(action_payload, dict):
        # Flat form: panel_url sits next to "action" itself.
        action_payload = payload

    panel_url = str(action_payload.get("panel_url", "") or "").strip()
    if not panel_url:
        return _response({"ok": False, "error": "panel_url is required"}, 400)

    _save_entry_panel_url(hass, entry, panel_url)
    return _response({
        "ok": True,
        "panel": current_entry_panel(entry),
    })
|
|
|
|
|
|
async def _handle_proxy_request(request: web.Request, entry_id: str, path: str, method: str) -> web.StreamResponse:
    """Handle reverse-proxy requests to the PHP panel.

    Args:
        request: Incoming request; its query string, selected headers and
            (for methods other than GET/HEAD) body are forwarded.
        entry_id: Config entry whose ``CONF_PANEL_URL`` option is the
            upstream base URL.
        path: Path below the base URL to fetch.
        method: HTTP method to use upstream.

    Returns:
        The upstream status, body and headers, minus hop-by-hop and
        body-framing headers, with ``Location`` rewritten to stay behind
        the proxy. 404 for an unknown entry; 400 when no panel URL is
        configured or when ``path`` resolves outside the panel URL.
    """
    hass = request.app["hass"]
    entry = _entry_from_hass(hass, entry_id)
    if entry is None:
        return _response({"ok": False, "error": "Unknown entry"}, 404)

    base_url = str(entry.options.get(CONF_PANEL_URL, "") or "").strip()
    if not base_url:
        _LOGGER.warning("Wall Panel proxy denied for %s: panel_url is empty", entry_id)
        return _response({"ok": False, "error": "PHP panel URL is not configured"}, 400)
    if not base_url.endswith("/"):
        base_url += "/"

    upstream_url = urljoin(base_url, path or "")
    # urljoin lets an absolute URL, "//host/..." or ".." segments in ``path``
    # replace or climb out of the base. Refuse anything that does not stay
    # under the configured panel URL so the proxy cannot be pointed at
    # arbitrary hosts. The base is passed through urljoin as well so both
    # sides of the comparison are normalized the same way.
    allowed_prefix = urljoin(base_url, "./")
    if not upstream_url.startswith(allowed_prefix):
        _LOGGER.warning("Wall Panel proxy denied for %s: path escapes panel_url", entry_id)
        return _response({"ok": False, "error": "Invalid proxy path"}, 400)
    if request.query_string:
        upstream_url = upstream_url + "?" + request.query_string

    _LOGGER.warning("Wall Panel proxy %s %s -> %s", method, request.path_qs, upstream_url)

    body = None
    if method not in {"GET", "HEAD"}:
        body = await request.read()

    # Headers that must not be copied from the upstream response: hop-by-hop
    # headers, plus the ones describing the upstream body framing.
    # NOTE(review): assumes the shared client session decodes compressed
    # bodies (aiohttp's default); the upstream Content-Length would then not
    # match the bytes sent here, so it is dropped and left to aiohttp.
    dropped_headers = frozenset({
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailers",
        "transfer-encoding",
        "upgrade",
        "content-encoding",
        "content-length",
    })

    session = async_get_clientsession(hass)
    async with session.request(
        method,
        upstream_url,
        headers=_forward_headers(request),
        data=body,
        allow_redirects=False,
    ) as upstream:
        # A list of pairs (not a dict) so repeated headers such as
        # Set-Cookie all survive.
        response_headers: list[tuple[str, str]] = []
        for key, value in upstream.headers.items():
            lower = key.lower()
            if lower in dropped_headers:
                continue
            if lower == "location":
                value = _rewrite_location(value, base_url, _proxy_root(entry_id))
            response_headers.append((key, value))

        response_body = await upstream.read()
        _LOGGER.warning("Wall Panel upstream %s %s", upstream.status, upstream_url)
        return web.Response(
            body=response_body,
            status=upstream.status,
            headers=response_headers,
        )
|
|
|
|
|
|
async def handle_proxy_request(request: web.Request) -> web.StreamResponse:
    """Route-style entry point for reverse-proxy requests.

    Pulls ``entry_id`` and ``path`` from the URL match info (blank when
    absent), upper-cases the request method, and delegates to
    ``_handle_proxy_request``.
    """
    match_info = request.match_info
    entry_id = str(match_info.get("entry_id", "") or "").strip()
    path = str(match_info.get("path", "") or "").strip()
    return await _handle_proxy_request(request, entry_id, path, request.method.upper())
|
|
|
|
|
|
def _save_entry_config(hass: HomeAssistant, entry, config: dict[str, Any]) -> None:
    """Store ``config`` in the entry's options and in ``hass.data``.

    The entry is updated with a copy of its options in which
    ``CONF_CONFIG`` is set to ``config``; the same object is then placed
    under ``hass.data[DOMAIN][entry.entry_id]["config"]``.
    """
    updated_options = {**entry.options, CONF_CONFIG: config}
    hass.config_entries.async_update_entry(entry, options=updated_options)

    domain_store = hass.data.setdefault(DOMAIN, {})
    entry_store = domain_store.setdefault(entry.entry_id, {})
    entry_store["config"] = config
|
|
|
|
|
|
def _save_entry_panel_url(hass: HomeAssistant, entry, panel_url: str) -> None:
    """Store ``panel_url`` in the entry's options and in ``hass.data``.

    The entry is updated with a copy of its options in which
    ``CONF_PANEL_URL`` is set to ``panel_url``; the value is then placed
    under ``hass.data[DOMAIN][entry.entry_id]["panel_url"]`` and the change
    is logged.
    """
    updated_options = {**entry.options, CONF_PANEL_URL: panel_url}
    hass.config_entries.async_update_entry(entry, options=updated_options)

    domain_store = hass.data.setdefault(DOMAIN, {})
    entry_store = domain_store.setdefault(entry.entry_id, {})
    entry_store["panel_url"] = panel_url

    _LOGGER.warning("Wall Panel panel_url updated for %s: %s", entry.entry_id, panel_url)
|
|
|
|
|
|
def _proxy_root(entry_id: str) -> str:
    """Return the URL path prefix under which ``entry_id`` is proxied."""
    return "/api/wall_panel/proxy/{}/".format(entry_id)
|
|
|
|
|
|
def _forward_headers(request: web.Request) -> dict[str, str]:
    """Pick the request headers that are passed on to the upstream panel.

    Only a fixed allow-list is considered, and a header is included only
    when the request carries a non-empty value for it.
    """
    allow_list = (
        "Accept",
        "Content-Type",
        "Range",
        "If-Modified-Since",
        "If-None-Match",
        "Origin",
        "Referer",
        "User-Agent",
    )
    looked_up = ((name, request.headers.get(name)) for name in allow_list)
    return {name: value for name, value in looked_up if value}
|
|
|
|
|
|
def _rewrite_location(location: str, base_url: str, proxy_root: str) -> str:
    """Rewrite an upstream ``Location`` header so it points at the proxy.

    Args:
        location: Raw ``Location`` value from the upstream response.
        base_url: Upstream base URL the proxy forwards to.
        proxy_root: Path prefix of the proxy that maps to ``base_url``.

    Returns:
        * ``proxy_root`` + remainder for an absolute URL under ``base_url``
          and for a protocol-relative (``//host/...``) URL that resolves
          under ``base_url``.
        * ``proxy_root`` + path for a host-absolute path (``/...``). When
          ``base_url`` has a path prefix and the location lies under it,
          that prefix is removed first, because ``proxy_root`` already
          stands for it.
        * The stripped value unchanged in every other case (empty, relative,
          or pointing at another host).
    """
    # Local import: the module itself only imports ``urljoin``.
    from urllib.parse import urlsplit

    location = location.strip()
    if not location:
        return location

    if location.startswith(base_url):
        return proxy_root + location[len(base_url):].lstrip("/")

    base = urlsplit(base_url)

    if location.startswith("//"):
        # Protocol-relative reference: it names a host, so it is only ours
        # when it resolves under base_url with the base's scheme.
        absolute = f"{base.scheme}:{location}"
        if base.scheme and absolute.startswith(base_url):
            return proxy_root + absolute[len(base_url):].lstrip("/")
        return location

    if location.startswith("/"):
        base_path = base.path.rstrip("/")
        if base_path and (location == base_path or location.startswith(base_path + "/")):
            # Avoid doubling the prefix once the proxy re-adds base_url.
            location = location[len(base_path):]
        return proxy_root + location.lstrip("/")

    return location
|
|
|
|
|
|
class WallPanelConfigView(HomeAssistantView):
    """Serve and update the canonical Wall Panel config of one entry.

    Home Assistant's own authentication is switched off for this view; the
    shared handler checks the entry's sync token instead.
    """

    name = "api:wall_panel:config"
    url = "/api/wall_panel/config/{entry_id}"
    requires_auth = False

    async def get(self, request: web.Request, entry_id: str) -> web.Response:
        """Handle ``GET`` by delegating to the shared config handler."""
        return await _handle_config_request(request=request, entry_id=entry_id)

    async def post(self, request: web.Request, entry_id: str) -> web.Response:
        """Handle ``POST`` by delegating to the shared config handler."""
        return await _handle_config_request(request=request, entry_id=entry_id)
|
|
|
|
|
|
class WallPanelPanelView(HomeAssistantView):
    """Serve the active Wall Panel config without an entry id.

    Home Assistant's own authentication is switched off for this view; the
    shared handler checks the default entry's sync token instead.
    """

    name = "api:wall_panel:panel"
    url = "/api/wall_panel/panel"
    requires_auth = False

    async def get(self, request: web.Request) -> web.Response:
        """Handle ``GET`` by delegating to the shared panel handler."""
        return await handle_panel_request(request=request)

    async def post(self, request: web.Request) -> web.Response:
        """Handle ``POST`` by delegating to the shared panel handler."""
        return await handle_panel_request(request=request)
|
|
|
|
|
|
class WallPanelProxyView(HomeAssistantView):
    """Reverse proxy the PHP panel through Home Assistant.

    Unlike the config views, this one keeps Home Assistant authentication
    enabled. Each HTTP verb forwards to the shared proxy handler with its
    own method name.
    """

    name = "api:wall_panel:proxy"
    url = "/api/wall_panel/proxy/{entry_id}/{path:.*}"
    requires_auth = True

    async def _relay(self, request: web.Request, entry_id: str, path: str, method: str) -> web.StreamResponse:
        """Forward one request upstream using ``method``."""
        return await _handle_proxy_request(request, entry_id, path, method)

    async def get(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse:
        """Proxy a ``GET`` request."""
        return await self._relay(request, entry_id, path, "GET")

    async def post(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse:
        """Proxy a ``POST`` request."""
        return await self._relay(request, entry_id, path, "POST")

    async def put(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse:
        """Proxy a ``PUT`` request."""
        return await self._relay(request, entry_id, path, "PUT")

    async def patch(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse:
        """Proxy a ``PATCH`` request."""
        return await self._relay(request, entry_id, path, "PATCH")

    async def delete(self, request: web.Request, entry_id: str, path: str = "") -> web.StreamResponse:
        """Proxy a ``DELETE`` request."""
        return await self._relay(request, entry_id, path, "DELETE")
|