"""Handle connecting to Afero IoT and distribute events."""
import asyncio
from asyncio.coroutines import iscoroutinefunction
from collections.abc import Callable
import contextlib
import datetime
from enum import Enum
from types import NoneType
from typing import TYPE_CHECKING, Any, NamedTuple, NotRequired, TypedDict
from aiohttp.client_exceptions import ClientError
from aiohttp.web_exceptions import HTTPForbidden, HTTPTooManyRequests
from aioafero.device import AferoDevice, get_afero_device
from aioafero.errors import InvalidAuth
from aioafero.types import EventType
from aioafero.v1.models import ResourceTypes
from aioafero.v1.v1_const import VERSION_POLL_INTERVAL_SECONDS
if TYPE_CHECKING: # pragma: no cover
from aioafero.v1 import AferoBridgeV1
[docs]
class BackoffException(Exception):
"""Exception raised when a backoff is required."""
[docs]
class CallbackResponse(NamedTuple):
"""Callback response for DEVICE_SPLIT_CALLBACKS.
:param split_devices: New devices that should be added to the overall list
:param remove_original: Remove the original device from the list of devices
"""
split_devices: list[AferoDevice] = []
remove_original: bool = False
[docs]
class EventStreamStatus(Enum):
"""Status options of EventStream."""
CONNECTING = 0
CONNECTED = 1
DISCONNECTED = 2
[docs]
class AferoEvent(TypedDict):
"""Afero IoT Event message as emitted by the EventStream."""
type: EventType # = EventType (add, update, delete)
device_id: NotRequired[str] # ID for interacting with the device
device: NotRequired[AferoDevice] # Afero Device
polled_data: NotRequired[Any] # All data polled from the API
polled_devices: NotRequired[Any] # All devices after the device split callbacks
force_forward: NotRequired[bool]
EventCallBackType = Callable[[EventType, dict | None], None]
EventSubscriptionType = tuple[
EventCallBackType,
"tuple[EventType] | None",
"tuple[ResourceTypes] | None",
]
[docs]
class EventStream:
"""Data gatherer and eventer.
Polls Afero IoT API, converts the response into devices, and notifies subscribers
of the event.
"""
[docs]
def __init__(
self,
bridge: "AferoBridgeV1",
polling_interval: int,
poll_version: bool,
discovery_interval: int = 3600,
) -> None:
"""Initialize instance."""
self._bridge = bridge
self._listeners = set()
self._event_queue = asyncio.Queue()
self._status = EventStreamStatus.DISCONNECTED
self._scheduled_tasks: list[asyncio.Task] = []
self._subscribers: list[EventSubscriptionType] = []
self._logger = bridge.logger.getChild("events")
self._polling_interval: int = polling_interval
self._discovery_interval: int = discovery_interval
self._multiple_device_finder: dict[str, callable] = {}
self._version_poll_time: datetime.datetime | None = None
self._version_poll_enabled: bool = poll_version
self._first_poll_completed: bool = False
@property
def connected(self) -> bool:
"""Return bool if we're connected."""
return self._status == EventStreamStatus.CONNECTED
@property
def status(self) -> EventStreamStatus:
"""Return connection status."""
return self._status
@property
def registered_multiple_devices(self) -> dict[str, Callable]:
"""Get all registered callbacks for splitting devices."""
return self._multiple_device_finder
@property
def polling_interval(self) -> int:
"""Number of seconds between polling."""
return self._polling_interval
@polling_interval.setter
def polling_interval(self, polling_interval: int) -> None:
"""Set the time between polling Afero API."""
self._polling_interval = polling_interval
@property
def poll_version(self) -> bool:
"""Determine if version polling should occur."""
if not self._version_poll_enabled:
return False
now = datetime.datetime.now(datetime.UTC)
if self._version_poll_time is None:
self._version_poll_time = now
return True
if (
now - self._version_poll_time
).total_seconds() >= VERSION_POLL_INTERVAL_SECONDS:
self._version_poll_time = now
return True
return False
[docs]
async def wait_for_first_poll(self) -> None:
"""Wait until the first poll has completed."""
while not self._first_poll_completed:
await asyncio.sleep(0.05)
[docs]
async def initialize(self) -> None:
"""Start the polling processes."""
if len(self._scheduled_tasks) == 0:
await self.initialize_discovery()
await self.initialize_device_polling()
await self.initialize_processor()
[docs]
async def initialize_discovery(self) -> None:
"""Initialize gathering data from Afero API."""
self._scheduled_tasks.append(asyncio.create_task(self.__event_discovery()))
[docs]
async def initialize_device_polling(self) -> None:
"""Initialize gathering device state data from Afero API."""
self._scheduled_tasks.append(asyncio.create_task(self.__device_polling()))
[docs]
async def initialize_processor(self) -> None:
"""Initialize the processor."""
self._scheduled_tasks.append(asyncio.create_task(self.__event_processor()))
[docs]
def register_multi_device(self, name: str, generate_devices: callable):
"""Register a callable to find multi-devices within the payload.
The callable must return a list of tracked AferoDevices
"""
self._multiple_device_finder[name] = generate_devices
[docs]
async def stop(self) -> None:
"""Stop listening for events."""
with contextlib.suppress(asyncio.CancelledError):
for task in self._scheduled_tasks:
task.cancel()
self._status = EventStreamStatus.DISCONNECTED
self._scheduled_tasks = []
[docs]
def subscribe(
self,
callback: EventCallBackType,
event_filter: EventType | tuple[EventType] | None = None,
resource_filter: tuple[str] | None = None,
) -> Callable:
"""Subscribe to events emitted.
:param callback: callback function to call when an event emits.
:param event_filter: Optionally provide an EventType as filter.
:param resource_filter: Optionally provide a ResourceType as filter.
Returns:
function to unsubscribe.
"""
if not isinstance(event_filter, NoneType | tuple):
event_filter = (event_filter,)
if not isinstance(resource_filter, NoneType | tuple):
resource_filter = (resource_filter,)
subscription = (callback, event_filter, resource_filter)
def unsubscribe():
self._subscribers.remove(subscription)
self._subscribers.append(subscription)
return unsubscribe
[docs]
def add_job(self, event: AferoEvent) -> None:
"""Manually add a job to be processed."""
self._event_queue.put_nowait(event)
[docs]
async def async_block_until_done(self):
"""Blocking call until everything has finished."""
attempt = 0
while not self._event_queue.empty():
self._logger.debug(
"Number of events in queue: %d", self._event_queue.qsize()
)
await asyncio.sleep(0.01)
attempt += 1
if attempt > 100:
self._logger.warning(
"Queue did not empty within a second. Breaking out of the wait"
)
break
[docs]
def emit(self, event_type: EventType, data: AferoEvent = None) -> None:
"""Emit event to all listeners."""
for callback, event_filter, resource_filter in self._subscribers:
try:
if event_filter is not None and event_type not in event_filter:
continue
if (
resource_filter is not None
and data is not None
and (
"device" in data
and hasattr(data["device"], "device_class")
and not any(
data["device"].device_class == res_filter
for res_filter in resource_filter
)
)
):
continue
if iscoroutinefunction(callback):
self._bridge.add_job(
asyncio.create_task(callback(event_type, data))
)
else:
callback(event_type, data)
except Exception:
self._logger.exception("Unhandled exception. Please open a bug report")
[docs]
async def process_backoff(self, attempt: int) -> None:
"""Handle backoff timer for Afero IoT API.
:param attempt: Number of attempts
:param reason: Reason why the backoff is occurring
"""
backoff_time = min(attempt * self.polling_interval, 600)
debug_message = f"Waiting {backoff_time} seconds before next poll"
if attempt == 1:
self._logger.info("Lost connection to the Afero IoT API.")
self._logger.debug(debug_message)
if self._status != EventStreamStatus.DISCONNECTED:
self._status = EventStreamStatus.DISCONNECTED
self.emit(EventType.DISCONNECTED)
await asyncio.sleep(backoff_time)
[docs]
async def gather_discovery_data(self) -> list[dict[Any, str]]:
"""Gather all data from the Afero IoT API."""
consecutive_http_errors = 0
while True:
try:
data = await self._bridge.fetch_discovery_data(
version_poll=self.poll_version
)
except TimeoutError:
self._logger.warning("Timeout when contacting Afero IoT API.")
await self.process_backoff(consecutive_http_errors)
except InvalidAuth:
consecutive_http_errors += 1
self._logger.warning("Invalid credentials provided.")
await self.process_backoff(consecutive_http_errors)
except (HTTPForbidden, HTTPTooManyRequests, ClientError):
consecutive_http_errors += 1
await self.process_backoff(consecutive_http_errors)
except TypeError as err:
self._logger.warning(
"Unexpected data from Afero IoT API, %s.", err.args[0]
)
consecutive_http_errors += 1
await self.process_backoff(consecutive_http_errors)
except Exception:
self._logger.exception(
"Unknown error occurred. Please open a bug report."
)
raise
else:
# Successful connection
if consecutive_http_errors > 0:
self._logger.info("Reconnected to the Afero IoT API")
self.emit(EventType.RECONNECTED)
elif self._status != EventStreamStatus.CONNECTED:
self._status = EventStreamStatus.CONNECTED
self.emit(EventType.CONNECTED)
return data
[docs]
async def generate_devices_from_data(
self, data: list[dict[Any, str]]
) -> list[AferoDevice]:
"""Generate all devices from a given payload.
Generating devices will attempt to split devices where required and remove
devices that are no longer needed, as identified by the callback.
"""
devices = [
get_afero_device(dev)
for dev in data
if dev.get("typeId") == ResourceTypes.DEVICE.value
and dev.get("description", {}).get("device", {}).get("deviceClass")
]
for device in devices:
self._bridge.add_afero_dev(device)
return await self.split_devices(devices)
[docs]
async def split_devices(self, devices: list[AferoDevice]) -> list[AferoDevice]:
"""Split Afero devices into multiple devices where required."""
for name, multi_dev_callable in self._multiple_device_finder.items():
for dev in devices[:]:
split_devs: CallbackResponse = multi_dev_callable(dev)
if split_devs.remove_original:
with contextlib.suppress(KeyError):
devices.remove(dev)
if split_devs.split_devices:
self._logger.debug(
"Found %s devices from %s", len(split_devs.split_devices), name
)
for split_dev in split_devs.split_devices:
if split_dev.split_identifier:
# Cache each split clone under its own id; parent stays at dev.id.
self._bridge.add_afero_dev(split_dev)
if split_dev.id != dev.id and split_dev.id not in dev.children:
dev.children.append(split_dev.id)
devices.extend(split_devs.split_devices)
self._logger.debug("Total number of devices (post split): %s", len(devices))
return devices
[docs]
async def generate_events_from_update(self, dev: AferoDevice) -> None:
"""Generate updates for a single device update."""
devices = await self.split_devices([dev])
self._logger.debug(
"Received update for device %s. Generating %d events",
dev.device_class,
len(devices),
)
for device in devices:
self._event_queue.put_nowait(
AferoEvent(
type=EventType.RESOURCE_UPDATE_RESPONSE,
device_id=device.id,
device=device,
force_forward=False,
)
)
[docs]
async def generate_events_from_data(self, data: list[dict[Any, str]]) -> None:
"""Process the raw Afero IoT data for emitting.
:param data: Raw data from Afero IoT
"""
processed_ids = []
skipped_ids = []
devices = await self.generate_devices_from_data(data)
self._event_queue.put_nowait(
AferoEvent(
type=EventType.POLLED_DATA,
polled_data=data,
force_forward=False,
)
)
self._event_queue.put_nowait(
AferoEvent(
type=EventType.POLLED_DEVICES,
polled_devices=devices,
force_forward=False,
)
)
for device in devices:
event_type = EventType.RESOURCE_UPDATED
if device.id not in self._bridge.tracked_devices:
event_type = EventType.RESOURCE_ADDED
self._event_queue.put_nowait(
AferoEvent(
type=event_type,
device_id=device.id,
device=device,
force_forward=False,
)
)
processed_ids.append(device.id)
# Handle devices that did not report in from the API
for dev_id in self._bridge.tracked_devices:
if dev_id not in processed_ids + skipped_ids:
self._event_queue.put_nowait(
AferoEvent(type=EventType.RESOURCE_DELETED, device_id=dev_id)
)
self._bridge.remove_device(dev_id)
async def __event_discovery(self) -> None:
"""Poll the current states."""
self._status = EventStreamStatus.CONNECTING
while True:
await self.perform_discovery_poll()
await asyncio.sleep(self._discovery_interval)
async def __device_polling(self) -> None:
"""Poll the current device states."""
while True:
await asyncio.sleep(self._polling_interval)
if not self._first_poll_completed:
continue
try:
devices = await self._bridge.fetch_all_device_states()
except Exception:
self._logger.exception("Unable to poll device states")
continue
for device in await self.split_devices(devices):
self._event_queue.put_nowait(
AferoEvent(
type=EventType.RESOURCE_UPDATE_RESPONSE,
device_id=device.id,
device=device,
force_forward=False,
)
)
[docs]
async def process_event(self):
"""Process a single event in the queue."""
try:
event: AferoEvent = await self._event_queue.get()
self.emit(event["type"], event)
except Exception:
self._logger.exception("Unhandled exception. Please open a bug report")
async def __event_processor(self) -> None:
"""Process the Afero IoT devices."""
while True:
await self.process_event()