Source code for aioafero.v1.controllers.device

"""Controller that holds top-level devices."""

from aioafero.device import AferoDevice
from aioafero.v1.models.device import Device
from aioafero.v1.models.resource import DeviceInformation, ResourceTypes

from .base import AferoBinarySensor, AferoSensor, BaseResourcesController
from .event import AferoEvent, EventType


[docs] class DeviceController(BaseResourcesController[Device]): """Top-level / parent devices on ``bridge.devices`` (read-only).""" ITEM_TYPE_ID = ResourceTypes.DEVICE ITEM_TYPES = [] ITEM_CLS = Device # Sensors map functionClass -> Unit ITEM_SENSORS: dict[str, str] = { "battery-level": "%", "wifi-rssi": "dB", } # Binary sensors map key -> alerting value ITEM_BINARY_SENSORS: dict[str, str] = { "error": "alerting", }
[docs] def __init__(self, *args, **kwargs): """Initialize instance.""" super().__init__(*args, **kwargs) self._known_parents: dict[str, str] = {}
[docs] async def initialize_elem(self, afero_device: AferoDevice) -> Device: """Initialize the element. :param afero_device: Afero Device that contains the updated states :return: Newly initialized resource """ available: bool = False sensors: dict[str, AferoSensor] = {} binary_sensors: dict[str, AferoBinarySensor] = {} wifi_mac: str | None = None ble_mac: str | None = None for state in afero_device.states: if state.functionClass == "available": available = state.value elif sensor := await self.initialize_sensor(state, afero_device.id): if isinstance(sensor, AferoBinarySensor): binary_sensors[sensor.id] = sensor else: sensors[sensor.id] = sensor elif state.functionClass == "wifi-mac-address": wifi_mac = state.value elif state.functionClass == "ble-mac-address": ble_mac = state.value self._items[afero_device.id] = Device( _id=afero_device.id, available=available, sensors=sensors, binary_sensors=binary_sensors, device_information=DeviceInformation( device_class=afero_device.device_class, default_image=afero_device.default_image, default_name=afero_device.default_name, manufacturer=afero_device.manufacturerName, model=afero_device.model, name=afero_device.friendly_name, parent_id=afero_device.device_id, wifi_mac=wifi_mac, ble_mac=ble_mac, version_data=getattr(afero_device, "version_data", {}), children=afero_device.children, functions=afero_device.functions, ), ) return self._items[afero_device.id]
[docs] async def initialize(self) -> None: """Initialize controller by fetching all items for this resource type from bridge.""" if self._initialized: return # Subscribe to polled data to find all top-level devices self._bridge.events.subscribe( self._process_polled_devices, event_filter=EventType.POLLED_DEVICES, ) # Subscribe to updates for existing devices self._bridge.events.subscribe( self._process_update_response, event_filter=EventType.RESOURCE_UPDATE_RESPONSE, ) self._initialized = True
async def _process_update_response( self, evt_type: EventType, evt_data: AferoEvent | None ) -> None: """Process the response of an update.""" dev = evt_data["device"] if evt_data["device"].device_id in self._known_parents: evt = AferoEvent( type=EventType.RESOURCE_UPDATED, device_id=dev.id, device=dev, ) await self._handle_event(evt["type"], evt) async def _process_polled_devices( self, evt_type: EventType, evt_data: AferoEvent | None ) -> None: """Find all top-level devices within the payload.""" devices: list[AferoDevice] = evt_data["polled_devices"] parent_devices: list[AferoDevice] = self.get_filtered_devices(devices) processed: set[str] = set() for parent_device in parent_devices: evt = AferoEvent( type=EventType.RESOURCE_ADDED, device_id=parent_device.id, device=parent_device, ) if parent_device.device_id not in self._known_parents: self._known_parents[parent_device.device_id] = parent_device.id else: evt["type"] = EventType.RESOURCE_UPDATED processed.add(parent_device.device_id) await self._handle_event(evt["type"], evt) for known_id in list(self._known_parents): if known_id not in processed: device_id = self._known_parents.pop(known_id) self._logger.info("Device %s was not polled. Removing", known_id) evt = AferoEvent( type=EventType.RESOURCE_DELETED, device_id=device_id, ) await self._handle_event(evt["type"], evt)
[docs] def get_filtered_devices(self, devices: list[AferoDevice]) -> list[AferoDevice]: """Find parent devices.""" parents: dict = {} potential_parents: dict = {} for device in devices: if device.children: parents[device.device_id] = device elif device.device_id not in parents and ( device.device_id not in parents and device.device_id not in potential_parents ): potential_parents[device.device_id] = device else: self._logger.debug("skipping %s as its tracked", device.device_id) for potential_parent in potential_parents.values(): if potential_parent.device_id not in parents: parents[potential_parent.device_id] = potential_parent return list(parents.values())
[docs] async def update_elem(self, afero_device: AferoDevice) -> set: """Update the Device with the latest API data. :param afero_device: Afero Device that contains the updated states :return: States that have been modified """ cur_item = self.get_device(afero_device.id) updated_keys = set() for state in afero_device.states: if state.functionClass == "available": if cur_item.available != state.value: cur_item.available = state.value updated_keys.add(state.functionClass) elif update_key := await self.update_sensor(state, cur_item): updated_keys.add(update_key) return updated_keys