372 lines
12 KiB
Python
Executable File
372 lines
12 KiB
Python
Executable File
"""Helper functions for Wall Panel."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from copy import deepcopy
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from .const import (
|
|
CONF_CONFIG,
|
|
CONF_FRONTEND_URL_PATH,
|
|
CONF_PANEL_URL,
|
|
CONF_REQUIRE_ADMIN,
|
|
CONF_SIDEBAR_ICON,
|
|
CONF_SIDEBAR_TITLE,
|
|
CONF_SYNC_TOKEN,
|
|
DEFAULT_FRONTEND_URL_PATH,
|
|
DEFAULT_PANEL_URL,
|
|
DEFAULT_SIDEBAR_ICON,
|
|
DEFAULT_SIDEBAR_TITLE,
|
|
)
|
|
|
|
|
|
def shared_config_path() -> Path:
|
|
override = os.getenv("WALL_PANEL_SHARED_CONFIG_PATH", "").strip()
|
|
if override:
|
|
return Path(override)
|
|
return Path("/config/wall_panel/wall_panel_config.json")
|
|
|
|
|
|
def default_config() -> dict[str, Any]:
|
|
return {
|
|
"app": {
|
|
"title": "Striker Panel",
|
|
"poll_interval_ms": 5000,
|
|
"main_room_name": "Главная",
|
|
"main_room_icon": "mdi:home",
|
|
"edit_mode": False,
|
|
"battery_history_hours": 4320,
|
|
},
|
|
"home_assistant": {
|
|
"base_url": "",
|
|
"token": "",
|
|
"verify_ssl": True,
|
|
"sync_url": "",
|
|
"sync_token": "",
|
|
"sync_timeout": 10,
|
|
"sync_verify_ssl": True,
|
|
"sync_cache_seconds": 30,
|
|
"weather_entity_id": "",
|
|
"auto_label": "auto",
|
|
"auto_entity_ids": [],
|
|
},
|
|
"camera": {
|
|
"rtsp_url": "",
|
|
"stream_url": "",
|
|
"stream_mode": "hls",
|
|
"poster_url": "",
|
|
"popup_timeout_minutes": 3,
|
|
"trigger_entities": [],
|
|
},
|
|
"rooms": [],
|
|
}
|
|
|
|
|
|
def normalize_config(value: Any) -> dict[str, Any]:
|
|
config = deepcopy(default_config())
|
|
if not isinstance(value, dict):
|
|
return config
|
|
|
|
_deep_merge(config, value)
|
|
if not isinstance(config.get("rooms"), list):
|
|
config["rooms"] = []
|
|
return config
|
|
|
|
|
|
def config_to_json(config: dict[str, Any]) -> str:
|
|
return json.dumps(config, ensure_ascii=False, indent=2, sort_keys=False)
|
|
|
|
|
|
def parse_config_json(raw: str) -> dict[str, Any]:
|
|
data = json.loads(raw)
|
|
if not isinstance(data, dict):
|
|
raise ValueError("Config JSON must be an object")
|
|
return normalize_config(data)
|
|
|
|
|
|
def current_entry_config(entry) -> dict[str, Any]:
|
|
path = shared_config_path()
|
|
options_config = normalize_config(entry.options.get(CONF_CONFIG))
|
|
|
|
def _config_score(config: dict[str, Any]) -> int:
|
|
score = 0
|
|
rooms = config.get("rooms", [])
|
|
if isinstance(rooms, list):
|
|
score += len(rooms) * 25
|
|
for room in rooms:
|
|
if not isinstance(room, dict):
|
|
continue
|
|
score += len([value for value in (
|
|
room.get("name"),
|
|
room.get("icon"),
|
|
room.get("area_id"),
|
|
room.get("floor_id"),
|
|
room.get("temperature_sensor_entity_id"),
|
|
) if isinstance(value, str) and value.strip()])
|
|
entity_ids = room.get("entity_ids", [])
|
|
if isinstance(entity_ids, list):
|
|
score += len([item for item in entity_ids if str(item).strip()])
|
|
overrides = room.get("entity_overrides", {})
|
|
if isinstance(overrides, dict):
|
|
score += len(overrides) * 3
|
|
layout_items = room.get("layout_items", [])
|
|
if isinstance(layout_items, list):
|
|
score += len(layout_items) * 5
|
|
|
|
app = config.get("app", {})
|
|
if isinstance(app, dict):
|
|
for key in ("title", "main_room_name", "main_room_icon"):
|
|
value = app.get(key)
|
|
if isinstance(value, str) and value.strip():
|
|
score += 3
|
|
for key in ("main_boiler", "main_print"):
|
|
value = app.get(key)
|
|
if isinstance(value, dict) and value:
|
|
score += len(value) * 2
|
|
actions = app.get("main_weather_actions", [])
|
|
if isinstance(actions, list):
|
|
score += len(actions) * 2
|
|
|
|
camera = config.get("camera", {})
|
|
if isinstance(camera, dict):
|
|
for key in ("rtsp_url", "stream_url", "poster_url"):
|
|
value = camera.get(key)
|
|
if isinstance(value, str) and value.strip():
|
|
score += 5
|
|
trigger_entities = camera.get("trigger_entities", [])
|
|
if isinstance(trigger_entities, list):
|
|
score += len([item for item in trigger_entities if str(item).strip()])
|
|
|
|
return score
|
|
|
|
chosen_config = options_config
|
|
try:
|
|
if path.is_file():
|
|
raw = path.read_text(encoding="utf-8")
|
|
if raw.strip():
|
|
data = json.loads(raw)
|
|
if isinstance(data, dict):
|
|
file_config = normalize_config(data)
|
|
if _config_score(file_config) >= _config_score(options_config):
|
|
chosen_config = file_config
|
|
except (OSError, json.JSONDecodeError, ValueError):
|
|
pass
|
|
|
|
try:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(config_to_json(chosen_config) + "\n", encoding="utf-8")
|
|
except OSError:
|
|
pass
|
|
return chosen_config
|
|
|
|
|
|
def current_entry_panel(entry) -> dict[str, Any]:
|
|
return {
|
|
CONF_PANEL_URL: str(entry.options.get(CONF_PANEL_URL, DEFAULT_PANEL_URL) or ""),
|
|
CONF_SYNC_TOKEN: str(entry.options.get(CONF_SYNC_TOKEN, "") or ""),
|
|
CONF_SIDEBAR_TITLE: str(entry.options.get(CONF_SIDEBAR_TITLE, DEFAULT_SIDEBAR_TITLE) or DEFAULT_SIDEBAR_TITLE),
|
|
CONF_SIDEBAR_ICON: str(entry.options.get(CONF_SIDEBAR_ICON, DEFAULT_SIDEBAR_ICON) or DEFAULT_SIDEBAR_ICON),
|
|
# The HA panel path must remain canonical and independent from the PHP proxy URL.
|
|
CONF_FRONTEND_URL_PATH: DEFAULT_FRONTEND_URL_PATH,
|
|
CONF_REQUIRE_ADMIN: bool(entry.options.get(CONF_REQUIRE_ADMIN, False)),
|
|
}
|
|
|
|
|
|
def save_settings(config: dict[str, Any], payload: dict[str, Any]) -> dict[str, Any]:
|
|
app = config.setdefault("app", {})
|
|
if "edit_mode" in payload:
|
|
app["edit_mode"] = bool(payload["edit_mode"])
|
|
if isinstance(payload.get("title"), str) and payload["title"].strip():
|
|
app["title"] = payload["title"].strip()
|
|
return config
|
|
|
|
|
|
def update_entity_override(config: dict[str, Any], room_id: str, entity_id: str, patch: dict[str, Any]) -> dict[str, Any]:
|
|
room = _ensure_room(config, room_id)
|
|
overrides = room.setdefault("entity_overrides", {})
|
|
current = overrides.get(entity_id, {})
|
|
if not isinstance(current, dict):
|
|
current = {}
|
|
|
|
merged = deepcopy(current)
|
|
for key, value in patch.items():
|
|
if value is not None:
|
|
merged[key] = value
|
|
overrides[entity_id] = merged
|
|
return config
|
|
|
|
|
|
def update_room_override(config: dict[str, Any], room_id: str, patch: dict[str, Any]) -> dict[str, Any]:
|
|
room = _ensure_room(config, room_id)
|
|
for key, value in patch.items():
|
|
if value is not None:
|
|
room[key] = value
|
|
return config
|
|
|
|
|
|
def update_room_layout_item(config: dict[str, Any], room_id: str, layout_item_id: str, patch: dict[str, Any]) -> dict[str, Any]:
|
|
room = _ensure_room(config, room_id)
|
|
items = room.setdefault("layout_items", [])
|
|
if not isinstance(items, list):
|
|
items = []
|
|
room["layout_items"] = items
|
|
|
|
current = None
|
|
for item in items:
|
|
if isinstance(item, dict) and str(item.get("id", "")) == layout_item_id:
|
|
current = item
|
|
break
|
|
|
|
if current is None:
|
|
current = {
|
|
"id": layout_item_id,
|
|
"type": "ghost",
|
|
}
|
|
items.append(current)
|
|
|
|
for key, value in patch.items():
|
|
if value is not None:
|
|
current[key] = value
|
|
|
|
_sort_room_layout_items(room)
|
|
return config
|
|
|
|
|
|
def create_room_layout_item(config: dict[str, Any], room_id: str, layout_item_id: str, order: int | None = None) -> dict[str, Any]:
|
|
return update_room_layout_item(config, room_id, layout_item_id, {
|
|
"order": order,
|
|
"type": "ghost",
|
|
})
|
|
|
|
|
|
def delete_room_layout_item(config: dict[str, Any], room_id: str, layout_item_id: str) -> dict[str, Any]:
|
|
room = _ensure_room(config, room_id)
|
|
items = room.get("layout_items", [])
|
|
if not isinstance(items, list):
|
|
room["layout_items"] = []
|
|
return config
|
|
|
|
room["layout_items"] = [
|
|
item for item in items
|
|
if not isinstance(item, dict) or str(item.get("id", "")) != layout_item_id
|
|
]
|
|
return config
|
|
|
|
|
|
def reorder_room_grid(config: dict[str, Any], room_id: str, entries: list[Any]) -> dict[str, Any]:
|
|
room = _ensure_room(config, room_id)
|
|
normalized: list[dict[str, str]] = []
|
|
for entry in entries:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
kind = str(entry.get("kind", "")).strip()
|
|
item_id = str(entry.get("id", "")).strip()
|
|
if kind not in {"entity", "layout"} or not item_id:
|
|
continue
|
|
normalized.append({"kind": kind, "id": item_id})
|
|
|
|
entity_overrides = room.setdefault("entity_overrides", {})
|
|
if not isinstance(entity_overrides, dict):
|
|
entity_overrides = {}
|
|
room["entity_overrides"] = entity_overrides
|
|
|
|
layout_items = room.setdefault("layout_items", [])
|
|
if not isinstance(layout_items, list):
|
|
layout_items = []
|
|
room["layout_items"] = layout_items
|
|
|
|
layout_by_id = {
|
|
str(item.get("id", "")): item
|
|
for item in layout_items
|
|
if isinstance(item, dict) and str(item.get("id", "")).strip()
|
|
}
|
|
|
|
order = 10000
|
|
for entry in normalized:
|
|
if entry["kind"] == "entity":
|
|
current = entity_overrides.get(entry["id"], {})
|
|
if not isinstance(current, dict):
|
|
current = {}
|
|
merged = deepcopy(current)
|
|
merged["order"] = order
|
|
entity_overrides[entry["id"]] = merged
|
|
else:
|
|
current = layout_by_id.get(entry["id"], {
|
|
"id": entry["id"],
|
|
"type": "ghost",
|
|
})
|
|
merged = deepcopy(current)
|
|
merged["id"] = entry["id"]
|
|
merged["type"] = "ghost"
|
|
merged["order"] = order
|
|
layout_by_id[entry["id"]] = merged
|
|
order += 10
|
|
|
|
room["layout_items"] = sorted(
|
|
layout_by_id.values(),
|
|
key=lambda item: (int(item.get("order", 9999) or 9999), str(item.get("id", ""))),
|
|
)
|
|
return config
|
|
|
|
|
|
def build_patch_payload(payload: dict[str, Any], keys: list[str]) -> dict[str, Any]:
|
|
result: dict[str, Any] = {}
|
|
for key in keys:
|
|
if key in payload:
|
|
result[key] = payload[key]
|
|
return result
|
|
|
|
|
|
def save_shared_config(config: dict[str, Any]) -> None:
|
|
path = shared_config_path()
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(config_to_json(normalize_config(config)) + "\n", encoding="utf-8")
|
|
|
|
|
|
def _deep_merge(target: dict[str, Any], source: dict[str, Any]) -> None:
|
|
for key, value in source.items():
|
|
if isinstance(value, dict) and isinstance(target.get(key), dict):
|
|
_deep_merge(target[key], value)
|
|
else:
|
|
target[key] = deepcopy(value)
|
|
|
|
|
|
def _ensure_room(config: dict[str, Any], room_id: str) -> dict[str, Any]:
|
|
rooms = config.setdefault("rooms", [])
|
|
if not isinstance(rooms, list):
|
|
rooms = []
|
|
config["rooms"] = rooms
|
|
|
|
for room in rooms:
|
|
if isinstance(room, dict) and str(room.get("id", "")) == room_id:
|
|
room.setdefault("visible", True)
|
|
room.setdefault("entity_ids", [])
|
|
room.setdefault("entity_overrides", {})
|
|
room.setdefault("layout_items", [])
|
|
return room
|
|
|
|
room = {
|
|
"id": room_id,
|
|
"visible": True,
|
|
"entity_ids": [],
|
|
"entity_overrides": {},
|
|
"layout_items": [],
|
|
}
|
|
rooms.append(room)
|
|
return room
|
|
|
|
|
|
def _sort_room_layout_items(room: dict[str, Any]) -> None:
|
|
items = room.get("layout_items", [])
|
|
if not isinstance(items, list):
|
|
room["layout_items"] = []
|
|
return
|
|
|
|
room["layout_items"] = sorted(
|
|
[item for item in items if isinstance(item, dict)],
|
|
key=lambda item: (int(item.get("order", 9999) or 9999), str(item.get("id", ""))),
|
|
)
|