-
-
Notifications
You must be signed in to change notification settings - Fork 30.5k
/
__init__.py
426 lines (349 loc) · 13.9 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
"""The Elexa Guardian integration."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
from dataclasses import dataclass
from typing import Any
from aioguardian import Client
from aioguardian.errors import GuardianError
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
from homeassistant.const import (
ATTR_DEVICE_ID,
CONF_DEVICE_ID,
CONF_FILENAME,
CONF_IP_ADDRESS,
CONF_PORT,
CONF_URL,
Platform,
)
from homeassistant.core import HomeAssistant, ServiceCall, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.entity import EntityDescription
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import (
API_SENSOR_PAIR_DUMP,
API_SENSOR_PAIRED_SENSOR_STATUS,
API_SYSTEM_DIAGNOSTICS,
API_SYSTEM_ONBOARD_SENSOR_STATUS,
API_VALVE_STATUS,
API_WIFI_STATUS,
CONF_UID,
DOMAIN,
LOGGER,
SIGNAL_PAIRED_SENSOR_COORDINATOR_ADDED,
)
from .coordinator import GuardianDataUpdateCoordinator
# Key name for the paired sensor manager.
DATA_PAIRED_SENSOR_MANAGER = "paired_sensor_manager"

# Names of the services this integration registers under DOMAIN.
SERVICE_NAME_PAIR_SENSOR = "pair_sensor"
SERVICE_NAME_UNPAIR_SENSOR = "unpair_sensor"
SERVICE_NAME_UPGRADE_FIRMWARE = "upgrade_firmware"

# Every registered service name; iterated when the services are removed on unload.
SERVICES = (
    SERVICE_NAME_PAIR_SENSOR,
    SERVICE_NAME_UNPAIR_SENSOR,
    SERVICE_NAME_UPGRADE_FIRMWARE,
)

# Minimal service schema: only the target device ID is required.
SERVICE_BASE_SCHEMA = vol.Schema(
    {
        vol.Required(ATTR_DEVICE_ID): cv.string,
    }
)

# Schema shared by the pair/unpair services: target device plus the sensor UID.
SERVICE_PAIR_UNPAIR_SENSOR_SCHEMA = vol.Schema(
    {
        vol.Required(ATTR_DEVICE_ID): cv.string,
        vol.Required(CONF_UID): cv.string,
    }
)

# Schema for the firmware upgrade service. The URL, port and filename are
# optional and declare no default, so any of them may be absent from the
# validated service data.
SERVICE_UPGRADE_FIRMWARE_SCHEMA = vol.Schema(
    {
        vol.Required(ATTR_DEVICE_ID): cv.string,
        vol.Optional(CONF_URL): cv.url,
        vol.Optional(CONF_PORT): cv.port,
        vol.Optional(CONF_FILENAME): cv.string,
    },
)

# Entity platforms forwarded during entry setup and unloaded on entry unload.
PLATFORMS = [
    Platform.BINARY_SENSOR,
    Platform.BUTTON,
    Platform.SENSOR,
    Platform.SWITCH,
    Platform.VALVE,
]
@dataclass
class GuardianData:
    """Define an object to be stored in `hass.data`.

    One instance is stored per config entry, under
    ``hass.data[DOMAIN][entry.entry_id]``.
    """

    # The config entry this data belongs to.
    entry: ConfigEntry
    # The aioguardian client used to talk to the valve controller.
    client: Client
    # Valve controller coordinators, keyed by API name.
    valve_controller_coordinators: dict[str, GuardianDataUpdateCoordinator]
    # Manager that tracks paired sensors and their coordinators.
    paired_sensor_manager: PairedSensorManager
@callback
def async_get_entry_id_for_service_call(hass: HomeAssistant, call: ServiceCall) -> str:
    """Resolve the Guardian config entry ID targeted by a service call.

    The device ID is read from the service data and looked up in the device
    registry; the first config entry attached to that device whose domain is
    this integration's domain is returned.

    Raises:
        ValueError: If the device ID is unknown to the device registry, or if
            none of the device's config entries belongs to this integration.
    """
    # NOTE(review): the service schemas validate ATTR_DEVICE_ID while this reads
    # CONF_DEVICE_ID - presumably both name the same key; confirm.
    device_id = call.data[CONF_DEVICE_ID]

    device_entry = dr.async_get(hass).async_get(device_id)
    if device_entry is None:
        raise ValueError(f"Invalid Guardian device ID: {device_id}")

    for candidate_id in device_entry.config_entries:
        config_entry = hass.config_entries.async_get_entry(candidate_id)
        # Skip IDs that no longer resolve to an entry, and entries that belong
        # to other integrations.
        if config_entry is not None and config_entry.domain == DOMAIN:
            return candidate_id

    raise ValueError(f"No config entry for device ID: {device_id}")
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up Elexa Guardian from a config entry.

    Creates the client and one data update coordinator per valve controller
    API, initializes the paired sensor manager, stores everything in
    ``hass.data[DOMAIN][entry.entry_id]``, forwards the entry to the entity
    platforms and registers the integration's services (once per domain).

    Returns:
        True once setup has completed.
    """
    client = Client(entry.data[CONF_IP_ADDRESS], port=entry.data[CONF_PORT])

    # The valve controller's UDP-based API can't handle concurrent requests very well,
    # so we use a lock to ensure that only one API request is reaching it at a time:
    api_lock = asyncio.Lock()

    async def async_init_coordinator(
        coordinator: GuardianDataUpdateCoordinator,
    ) -> None:
        """Initialize a GuardianDataUpdateCoordinator and run its first refresh."""
        await coordinator.async_initialize()
        await coordinator.async_config_entry_first_refresh()

    # Set up GuardianDataUpdateCoordinators for the valve controller:
    valve_controller_coordinators: dict[str, GuardianDataUpdateCoordinator] = {}
    init_valve_controller_tasks = []
    for api, api_coro in (
        (API_SENSOR_PAIR_DUMP, client.sensor.pair_dump),
        (API_SYSTEM_DIAGNOSTICS, client.system.diagnostics),
        (API_SYSTEM_ONBOARD_SENSOR_STATUS, client.system.onboard_sensor_status),
        (API_VALVE_STATUS, client.valve.status),
        (API_WIFI_STATUS, client.wifi.status),
    ):
        coordinator = valve_controller_coordinators[api] = (
            GuardianDataUpdateCoordinator(
                hass,
                entry=entry,
                client=client,
                api_name=api,
                api_coro=api_coro,
                api_lock=api_lock,
                valve_controller_uid=entry.data[CONF_UID],
            )
        )
        init_valve_controller_tasks.append(async_init_coordinator(coordinator))

    await asyncio.gather(*init_valve_controller_tasks)

    # Set up an object to evaluate each batch of paired sensor UIDs and add/remove
    # devices as appropriate:
    paired_sensor_manager = PairedSensorManager(
        hass,
        entry,
        client,
        api_lock,
        valve_controller_coordinators[API_SENSOR_PAIR_DUMP],
    )
    await paired_sensor_manager.async_initialize()

    hass.data.setdefault(DOMAIN, {})
    hass.data[DOMAIN][entry.entry_id] = GuardianData(
        entry=entry,
        client=client,
        valve_controller_coordinators=valve_controller_coordinators,
        paired_sensor_manager=paired_sensor_manager,
    )

    # Set up all of the Guardian entity platforms:
    await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

    @callback
    def call_with_data(
        func: Callable[[ServiceCall, GuardianData], Coroutine[Any, Any, None]],
    ) -> Callable[[ServiceCall], Coroutine[Any, Any, None]]:
        """Hydrate a service call with the appropriate GuardianData object."""

        async def wrapper(call: ServiceCall) -> None:
            """Look up the targeted entry's data and run the service with it."""
            entry_id = async_get_entry_id_for_service_call(hass, call)
            data = hass.data[DOMAIN][entry_id]

            try:
                # Run the service inside the client's async context manager.
                async with data.client:
                    await func(call, data)
            except GuardianError as err:
                # Surface device/API failures as a Home Assistant error.
                raise HomeAssistantError(
                    f"Error while executing {func.__name__}: {err}"
                ) from err

        return wrapper

    @call_with_data
    async def async_pair_sensor(call: ServiceCall, data: GuardianData) -> None:
        """Add a new paired sensor."""
        uid = call.data[CONF_UID]
        await data.client.sensor.pair_sensor(uid)
        await data.paired_sensor_manager.async_pair_sensor(uid)

    @call_with_data
    async def async_unpair_sensor(call: ServiceCall, data: GuardianData) -> None:
        """Remove a paired sensor."""
        uid = call.data[CONF_UID]
        await data.client.sensor.unpair_sensor(uid)
        await data.paired_sensor_manager.async_unpair_sensor(uid)

    @call_with_data
    async def async_upgrade_firmware(call: ServiceCall, data: GuardianData) -> None:
        """Upgrade the device firmware."""
        # The URL, port and filename are optional in the service schema and
        # carry no default, so indexing call.data directly raises KeyError when
        # the caller omits one. Forward only the options that were supplied.
        # NOTE(review): assumes the client's upgrade_firmware falls back to its
        # own defaults for omitted keyword arguments - confirm against aioguardian.
        firmware_options = {
            param: call.data[key]
            for param, key in (
                ("url", CONF_URL),
                ("port", CONF_PORT),
                ("filename", CONF_FILENAME),
            )
            if key in call.data
        }
        await data.client.system.upgrade_firmware(**firmware_options)

    for service_name, schema, method in (
        (
            SERVICE_NAME_PAIR_SENSOR,
            SERVICE_PAIR_UNPAIR_SENSOR_SCHEMA,
            async_pair_sensor,
        ),
        (
            SERVICE_NAME_UNPAIR_SENSOR,
            SERVICE_PAIR_UNPAIR_SENSOR_SCHEMA,
            async_unpair_sensor,
        ),
        (
            SERVICE_NAME_UPGRADE_FIRMWARE,
            SERVICE_UPGRADE_FIRMWARE_SCHEMA,
            async_upgrade_firmware,
        ),
    ):
        # Services are domain-wide; skip any that are already registered.
        if hass.services.has_service(DOMAIN, service_name):
            continue
        hass.services.async_register(DOMAIN, service_name, method, schema=schema)

    return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload a Guardian config entry.

    Unloads the entry's platforms and, on success, drops the entry's stored
    data from ``hass.data``. When exactly one Guardian entry is in the loaded
    state, the integration's services are removed as well.

    Returns:
        Whether the platforms unloaded successfully.
    """
    unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
    if unload_ok:
        hass.data[DOMAIN].pop(entry.entry_id)

    loaded_count = sum(
        1
        for config_entry in hass.config_entries.async_entries(DOMAIN)
        if config_entry.state == ConfigEntryState.LOADED
    )
    if loaded_count == 1:
        # If this is the last loaded instance of Guardian, deregister any services
        # defined during integration setup:
        for service_name in SERVICES:
            hass.services.async_remove(DOMAIN, service_name)

    return unload_ok
class PairedSensorManager:
    """Define an object that manages the addition/removal of paired sensors.

    The manager tracks the set of currently paired sensor UIDs and keeps one
    data update coordinator per paired sensor in ``coordinators``.
    """

    def __init__(
        self,
        hass: HomeAssistant,
        entry: ConfigEntry,
        client: Client,
        api_lock: asyncio.Lock,
        sensor_pair_dump_coordinator: GuardianDataUpdateCoordinator,
    ) -> None:
        """Initialize.

        Args:
            hass: The Home Assistant instance.
            entry: The config entry of the valve controller.
            client: The client used to query paired sensor status.
            api_lock: Lock handed to every coordinator this manager creates.
            sensor_pair_dump_coordinator: Coordinator whose data carries the
                latest ``paired_uids`` list.
        """
        self._api_lock = api_lock
        self._client = client
        self._entry = entry
        self._hass = hass
        self._paired_uids: set[str] = set()
        self._sensor_pair_dump_coordinator = sensor_pair_dump_coordinator
        self.coordinators: dict[str, GuardianDataUpdateCoordinator] = {}

    async def async_initialize(self) -> None:
        """Initialize the manager.

        Subscribes to the pair dump coordinator so each update schedules a
        reconciliation of the paired sensor UIDs; the subscription is released
        when the config entry unloads.
        """

        @callback
        def async_create_process_task() -> None:
            """Define a callback for when new paired sensor data is received."""
            self._hass.async_create_task(
                self.async_process_latest_paired_sensor_uids()
            )

        self._entry.async_on_unload(
            self._sensor_pair_dump_coordinator.async_add_listener(
                async_create_process_task
            )
        )

    async def async_pair_sensor(self, uid: str) -> None:
        """Add a new paired sensor coordinator.

        Records the UID, creates and refreshes a coordinator for the sensor,
        then dispatches the "coordinator added" signal with the UID.
        """
        LOGGER.debug("Adding paired sensor: %s", uid)

        self._paired_uids.add(uid)

        coordinator = self.coordinators[uid] = GuardianDataUpdateCoordinator(
            self._hass,
            entry=self._entry,
            client=self._client,
            api_name=f"{API_SENSOR_PAIRED_SENSOR_STATUS}_{uid}",
            api_coro=lambda: self._client.sensor.paired_sensor_status(uid),
            api_lock=self._api_lock,
            valve_controller_uid=self._entry.data[CONF_UID],
        )
        await coordinator.async_request_refresh()

        async_dispatcher_send(
            self._hass,
            SIGNAL_PAIRED_SENSOR_COORDINATOR_ADDED.format(self._entry.data[CONF_UID]),
            uid,
        )

    async def async_process_latest_paired_sensor_uids(self) -> None:
        """Reconcile tracked sensors against the latest list of paired UIDs.

        Newly seen UIDs are paired and UIDs that disappeared are unpaired.
        """
        try:
            uids = set(self._sensor_pair_dump_coordinator.data["paired_uids"])
        except KeyError:
            # Sometimes the paired_uids key can fail to exist; the user can't do anything
            # about it, so in this case, we quietly abort and return:
            return

        if uids == self._paired_uids:
            return

        old = self._paired_uids
        # `uids` is already a fresh set, so it can be adopted without copying.
        new = self._paired_uids = uids

        tasks = [self.async_pair_sensor(uid) for uid in new.difference(old)]
        tasks += [self.async_unpair_sensor(uid) for uid in old.difference(new)]

        if tasks:
            await asyncio.gather(*tasks)

    async def async_unpair_sensor(self, uid: str) -> None:
        """Remove a paired sensor coordinator and its registry device."""
        LOGGER.debug("Removing paired sensor: %s", uid)

        # Clear out objects related to this paired sensor. When this runs from
        # async_process_latest_paired_sensor_uids, the tracked set has already
        # been replaced by the new UIDs and no longer contains `uid`, so a
        # strict remove() would raise KeyError; tolerate an absent UID instead.
        self._paired_uids.discard(uid)
        self.coordinators.pop(uid, None)

        # Remove the paired sensor device from the device registry (which will
        # clean up entities and the entity registry):
        dev_reg = dr.async_get(self._hass)
        device = dev_reg.async_get_or_create(
            config_entry_id=self._entry.entry_id, identifiers={(DOMAIN, uid)}
        )
        dev_reg.async_remove_device(device.id)
class GuardianEntity(CoordinatorEntity[GuardianDataUpdateCoordinator]):
    """Define a base Guardian entity.

    Binds the entity to a GuardianDataUpdateCoordinator and stores the entity
    description it was created from.
    """

    _attr_has_entity_name = True

    def __init__(
        self, coordinator: GuardianDataUpdateCoordinator, description: EntityDescription
    ) -> None:
        """Initialize.

        Args:
            coordinator: The coordinator that supplies this entity's data.
            description: The description stored as ``entity_description``.
        """
        super().__init__(coordinator)

        self.entity_description = description
class PairedSensorEntity(GuardianEntity):
    """Represent an entity that belongs to a Guardian paired sensor."""

    def __init__(
        self,
        entry: ConfigEntry,
        coordinator: GuardianDataUpdateCoordinator,
        description: EntityDescription,
    ) -> None:
        """Set up device info and the unique ID from the coordinator's data.

        Args:
            entry: Config entry of the valve controller the sensor is linked
                through (its UID is used for ``via_device``).
            coordinator: Coordinator whose data provides the sensor's ``uid``
                and ``codename``.
            description: Description of the entity; its key is part of the
                unique ID.
        """
        super().__init__(coordinator, description)

        sensor_payload = coordinator.data
        sensor_uid = sensor_payload["uid"]
        device_identifiers = {(DOMAIN, sensor_uid)}

        self._attr_device_info = DeviceInfo(
            identifiers=device_identifiers,
            manufacturer="Elexa",
            model=sensor_payload["codename"],
            name=f"Guardian paired sensor {sensor_uid}",
            via_device=(DOMAIN, entry.data[CONF_UID]),
        )
        self._attr_unique_id = f"{sensor_uid}_{description.key}"
@dataclass(frozen=True, kw_only=True)
class ValveControllerEntityDescription(EntityDescription):
    """Describe a Guardian valve controller entity."""

    # Key used to pick the entity's coordinator out of the valve controller
    # coordinators mapping.
    api_category: str
class ValveControllerEntity(GuardianEntity):
    """Represent an entity that belongs to the Guardian valve controller."""

    def __init__(
        self,
        entry: ConfigEntry,
        coordinators: dict[str, GuardianDataUpdateCoordinator],
        description: ValveControllerEntityDescription,
    ) -> None:
        """Bind the entity to its coordinator and build its device info.

        Args:
            entry: Config entry holding the valve controller's UID.
            coordinators: Valve controller coordinators; the one matching
                ``description.api_category`` drives this entity.
            description: Description of the entity; its key is part of the
                unique ID.
        """
        primary_coordinator = coordinators[description.api_category]
        super().__init__(primary_coordinator, description)

        # The diagnostics coordinator is kept separately; its "firmware" value
        # is what populates the device's model field below.
        self._diagnostics_coordinator = coordinators[API_SYSTEM_DIAGNOSTICS]

        controller_uid = entry.data[CONF_UID]
        self._attr_device_info = DeviceInfo(
            identifiers={(DOMAIN, controller_uid)},
            manufacturer="Elexa",
            model=self._diagnostics_coordinator.data["firmware"],
            name=f"Guardian valve controller {controller_uid}",
        )
        self._attr_unique_id = f"{controller_uid}_{description.key}"