"""Controller holding and managing Afero IoT resources of type `security-system`."""
import asyncio
import copy
from aioafero.device import (
AferoCapability,
AferoDevice,
AferoState,
get_function_from_device,
)
from aioafero.errors import DeviceNotFound, SecuritySystemError
from aioafero.util import process_function
from aioafero.v1.models import SecuritySystem, SecuritySystemPut, features
from aioafero.v1.models.resource import DeviceInformation, ResourceTypes
from .base import AferoBinarySensor, AferoSensor, BaseResourcesController, NumbersName
from .event import CallbackResponse
# Marker placed between the parent device ID and the sensor number when a
# sensor is split out into its own device.
SENSOR_SPLIT_IDENTIFIER = "sensor"

# Display names for the raw 0/1 flags (tampered, triggered, chirpMode).
GENERIC_MODES = {0: "Off", 1: "On"}

# Display names for the raw ``triggerType`` values of a sensor.
TRIGGER_MODES = {
    0: "Off",
    1: "Home",
    2: "Away",
    3: "Home/Away",
}

# Display names for the raw ``deviceType`` values of a sensor.
KNOWN_SENSOR_MODELS = {
    1: "Motion Sensor",
    2: "Door/Window Sensor",
}

# Display names for the raw ``bypassType`` values of a sensor.
BYPASS_MODES = {
    0: "Off",
    1: "Manual",
    4: "On",
}

UPDATE_TIME = 3  # Seconds after performing an action to refresh state
def get_sensor_ids(device) -> set[int]:
    """Collect the numeric IDs of every sensor that reports a value.

    A state belongs to a sensor when its ``functionInstance`` starts with
    ``sensor-``; the ID is the integer that follows the first dash. States
    without a ``functionInstance`` or whose value is ``None`` are ignored.

    :param device: Object exposing a ``states`` iterable
    :return: Unique sensor IDs found in the states
    """
    return {
        int(entry.functionInstance.split("-", 1)[1])
        for entry in device.states
        if entry.functionInstance is not None
        and entry.functionInstance.startswith("sensor-")
        and entry.value is not None
    }
def generate_sensor_name(afero_device, sensor_id: int) -> str:
    """Build the ID used for a sensor split out of a security system.

    The result is ``<parent id>-<SENSOR_SPLIT_IDENTIFIER>-<sensor id>``.

    :param afero_device: Parent device; only its ``id`` is read
    :param sensor_id: Numeric ID of the sensor
    :return: ID for the split sensor device
    """
    parent_id = afero_device.id
    return f"{parent_id}-{SENSOR_SPLIT_IDENTIFIER}-{sensor_id}"
def get_valid_states(afero_states: list, sensor_id: int) -> list:
    """Flatten one sensor's nested states into individual states.

    Only ``sensor-state`` and ``sensor-config`` states with a value are
    considered, and only when the number after the last dash of their
    ``functionInstance`` equals ``sensor_id``. The state value is read as a
    mapping whose first key wraps the actual payload.

    * ``sensor-state`` yields ``battery-level``, ``tampered``, ``triggered``
      and ``available`` (the negation of the payload's ``missing`` flag).
    * ``sensor-config`` yields ``chirpMode``, ``triggerType``, ``bypassType``
      and ``top-level-key`` (the wrapping key itself).

    Raw numeric flags are translated to their display names. Every produced
    state has ``functionInstance`` set to ``None``.

    :param afero_states: States of the parent security system
    :param sensor_id: Numeric ID of the sensor to extract
    :return: Flattened states for the requested sensor
    """
    collected: list = []

    def emit(function_class: str, value) -> None:
        """Append a flattened state without a function instance."""
        collected.append(
            AferoState(
                functionClass=function_class,
                functionInstance=None,
                value=value,
            )
        )

    for raw_state in afero_states:
        is_sensor_class = raw_state.functionClass in ["sensor-state", "sensor-config"]
        if not is_sensor_class or raw_state.value is None:
            continue
        if int(raw_state.functionInstance.rsplit("-", 1)[1]) != sensor_id:
            continue
        top_level_key = list(raw_state.value.keys())[0]
        payload = raw_state.value[top_level_key]
        if raw_state.functionClass == "sensor-state":
            emit("battery-level", payload["batteryLevel"])
            emit("tampered", GENERIC_MODES[payload["tampered"]])
            emit("triggered", GENERIC_MODES[payload["triggered"]])
            emit("available", not bool(payload["missing"]))
        else:
            emit("chirpMode", GENERIC_MODES[payload["chirpMode"]])
            emit("triggerType", TRIGGER_MODES[payload["triggerType"]])
            emit("bypassType", BYPASS_MODES[payload["bypassType"]])
            # Kept so the wrapping key can be recovered from the split device.
            emit("top-level-key", top_level_key)
    return collected
def get_model_type(states: list[AferoState], sensor_id: int) -> str:
    """Look up the human-readable model of a sensor.

    The first ``sensor-state`` state with a value whose ``functionInstance``
    ends in ``-<sensor_id>`` decides the result: its ``deviceType`` is
    translated through ``KNOWN_SENSOR_MODELS``.

    :param states: States of the parent security system
    :param sensor_id: Numeric ID of the sensor
    :return: Model name, or ``"Unknown"`` when the type is not recognised or
        no matching state exists
    """
    for candidate in states:
        if candidate.functionClass == "sensor-state" and candidate.value is not None:
            if int(candidate.functionInstance.rsplit("-", 1)[1]) == sensor_id:
                top_level_key = list(candidate.value.keys())[0]
                device_type = int(candidate.value[top_level_key]["deviceType"])
                return KNOWN_SENSOR_MODELS.get(device_type, "Unknown")
    return "Unknown"
def get_valid_functions(afero_functions: list, sensor_id: int) -> list:
    """Build the function definitions exposed by a split sensor.

    For every ``sensor-config`` function whose ``functionInstance`` ends in
    ``-<sensor_id>``, three ``category`` functions are generated
    (``chirpMode``, ``triggerType``, ``bypassType``), each listing the display
    names it accepts. ``sensor-state`` functions produce nothing.

    :param afero_functions: Function definitions of the parent security system
    :param sensor_id: Numeric ID of the sensor
    :return: Generated function definitions for the sensor
    """
    category_modes = (
        ("chirpMode", GENERIC_MODES),
        ("triggerType", TRIGGER_MODES),
        ("bypassType", BYPASS_MODES),
    )
    selected: list = []
    for definition in afero_functions:
        function_class = definition["functionClass"]
        if function_class not in ["sensor-state", "sensor-config"]:
            continue
        if int(definition["functionInstance"].rsplit("-", 1)[1]) != sensor_id:
            continue
        if function_class != "sensor-config":
            continue
        selected.extend(
            {
                "functionClass": name,
                "functionInstance": definition["functionInstance"],
                "type": "category",
                "values": [{"name": label} for label in modes.values()],
            }
            for name, modes in category_modes
        )
    return selected
def get_sensor_name(afero_capabilities: list[AferoCapability], sensor_id: int) -> str:
    """Resolve the display name of a sensor.

    The name comes from the ``name`` option of the first ``sensor-state``
    capability whose ``functionInstance`` is ``sensor-<sensor_id>`` and that
    actually carries a name.

    :param afero_capabilities: Capabilities of the parent security system
    :param sensor_id: Numeric ID of the sensor
    :return: Configured name, or ``"Sensor <sensor_id>"`` when none is set
    """
    wanted_instance = f"sensor-{sensor_id}"
    for cap in afero_capabilities:
        if cap.functionClass != "sensor-state":
            continue
        if cap.functionInstance != wanted_instance:
            continue
        if cap.options.get("name") is not None:
            return cap.options["name"]
    return f"Sensor {sensor_id}"
def security_system_callback(afero_device: AferoDevice) -> CallbackResponse:
    """Split a security system into one device per sensor.

    Devices whose class is not ``security-system`` produce no split devices.
    Each split device is a deep copy of the parent with its IDs, class, name,
    model, states and functions replaced by the sensor-specific values. The
    parent device is always kept.

    :param afero_device: Device to inspect
    :return: Split sensor devices, with ``remove_original`` set to ``False``
    """
    sensor_devices: list[AferoDevice] = []
    is_security_system = afero_device.device_class == "security-system"
    for sensor_id in get_sensor_ids(afero_device) if is_security_system else ():
        sensor_device = copy.deepcopy(afero_device)
        split_id = generate_sensor_name(afero_device, sensor_id)
        sensor_label = get_sensor_name(afero_device.capabilities, sensor_id)
        sensor_device.device_id = split_id
        sensor_device.id = split_id
        sensor_device.split_identifier = SENSOR_SPLIT_IDENTIFIER
        sensor_device.device_class = ResourceTypes.SECURITY_SYSTEM_SENSOR.value
        sensor_device.friendly_name = f"{afero_device.friendly_name} - {sensor_label}"
        sensor_device.states = get_valid_states(afero_device.states, sensor_id)
        sensor_device.functions = get_valid_functions(
            afero_device.functions, sensor_id
        )
        model_type = get_model_type(afero_device.states, sensor_id)
        sensor_device.model = f"{afero_device.model} - {model_type}"
        sensor_devices.append(sensor_device)
    return CallbackResponse(
        split_devices=sensor_devices,
        remove_original=False,
    )
class SecuritySystemController(BaseResourcesController[SecuritySystem]):
    """Security systems on ``bridge.security_systems``.

    Builds :class:`SecuritySystem` resources from Afero devices, applies state
    updates to them, and sends arm / disarm / siren commands.
    """

    ITEM_TYPE_ID = ResourceTypes.DEVICE
    ITEM_TYPES = [ResourceTypes.SECURITY_SYSTEM]
    ITEM_CLS = SecuritySystem
    ITEM_MAPPING = {"alarm_state": "alarm-state"}
    # Sensors map functionClass -> Unit
    ITEM_SENSORS: dict[str, str | None] = {
        "alarm-state": None,
        "history-event": None,
        "disarmed-by": None,
    }
    # Binary sensors map key -> alerting value
    ITEM_BINARY_SENSORS: dict[str, str] = {
        "battery-powered": "battery-powered",
    }
    # Elements that map to numbers. func class / func instance to NumbersName
    ITEM_NUMBERS: dict[tuple[str, str | None], NumbersName] = {
        ("arm-exit-delay", "away"): NumbersName(
            unit="seconds", display_name="Arm Exit Delay Away"
        ),
        ("arm-exit-delay", "stay"): NumbersName(
            unit="seconds", display_name="Arm Exit Delay Home"
        ),
        ("temporary-bypass-time", None): NumbersName(
            unit="seconds", display_name="Bypass Time"
        ),
        ("disarm-entry-delay", None): NumbersName(
            unit="seconds", display_name="Disarm Entry Delay"
        ),
        ("siren-alarm-timeout", None): NumbersName(
            unit="seconds", display_name="Siren Timeout"
        ),
    }
    # Elements that map to Select. func class / func instance to name
    ITEM_SELECTS = {
        ("song-id", "alarm"): "Alarm Noise",
        ("volume", "siren"): "Alarm Volume",
        ("bypass-allowed", None): "Enable Temporary Bypass",
        ("song-id", "chime"): "Chime Noise",
        ("volume", "chime"): "Chime Volume",
        ("volume", "entry-delay"): "Entry Delay Volume",
        ("volume", "exit-delay-away"): "Exit Delay Volume Away",
        ("volume", "exit-delay-stay"): "Exit Delay Volume Home",
        # ("dark-mode", None): "KeyPad Dark Mode",
        # ("song-id", "beep"): "KeyPad Noise",
    }
    # Split sensors from the primary payload
    DEVICE_SPLIT_CALLBACKS: dict[str, callable] = {
        ResourceTypes.SECURITY_SYSTEM_SENSOR.value: security_system_callback
    }
    # Panel command codes sent through ``set_state(command=...)``
    _COMMAND_ARM_AWAY = 2
    _COMMAND_ARM_HOME = 4
    _COMMAND_ALARM_TRIGGER = 5
    # Arm command -> sensor trigger types that take part in that arm mode
    _ARM_TRIGGER_TYPES = {
        _COMMAND_ARM_AWAY: ["Away", "Home/Away"],
        _COMMAND_ARM_HOME: ["Home", "Home/Away"],
    }

    async def disarm(self, device_id: str, disarm_pin: int) -> None:
        """Disarm the security system.

        Args:
            device_id: Device ID from this controller.
            disarm_pin: Numeric PIN configured on the panel.

        Raises:
            SecuritySystemError: The panel did not switch to ``disarmed``.
        """
        await self.set_state(device_id, disarm_pin=disarm_pin)

    async def arm_home(self, device_id: str) -> None:
        """Arm the system in home mode.

        Args:
            device_id: Device ID from this controller.

        Raises:
            SecuritySystemError: The sensors do not allow arming.
        """
        await self.set_state(device_id, command=self._COMMAND_ARM_HOME)

    async def arm_away(self, device_id: str) -> None:
        """Arm the system in away mode.

        Args:
            device_id: Device ID from this controller.

        Raises:
            SecuritySystemError: The sensors do not allow arming.
        """
        await self.set_state(device_id, command=self._COMMAND_ARM_AWAY)

    async def alarm_trigger(self, device_id: str) -> None:
        """Manually trigger the alarm siren.

        Args:
            device_id: Device ID from this controller.
        """
        await self.set_state(device_id, command=self._COMMAND_ALARM_TRIGGER)

    @staticmethod
    def _parse_siren_action(value) -> tuple:
        """Extract ``(resultCode, command)`` from a ``siren-action`` value.

        A value that cannot be subscripted or that lacks the expected keys
        yields ``(None, None)`` instead of raising.
        """
        try:
            action = value["security-siren-action"]
            return action["resultCode"], action["command"]
        except (TypeError, KeyError):
            return None, None

    async def initialize_elem(self, afero_device: AferoDevice) -> SecuritySystem:
        """Initialize the element.

        :param afero_device: Afero Device that contains the updated states
        :return: Newly initialized resource
        """
        available: bool = False
        alarm_state: features.ModeFeature | None = None
        siren_action: features.SecuritySensorSirenFeature | None = None
        numbers: dict[tuple[str, str | None], features.NumbersFeature] = {}
        selects: dict[tuple[str, str | None], features.SelectFeature] = {}
        sensors: dict[str, AferoSensor] = {}
        binary_sensors: dict[str, AferoBinarySensor] = {}
        for state in afero_device.states:
            func_def = get_function_from_device(
                afero_device.functions, state.functionClass, state.functionInstance
            )
            # NOTE(review): "alarm-state" is also listed in ITEM_SENSORS, but
            # this branch runs before initialize_sensor - confirm intended.
            if state.functionClass == "available":
                available = state.value
            elif state.functionClass == "alarm-state":
                alarm_state = features.ModeFeature(
                    mode=state.value,
                    modes=set(
                        process_function(afero_device.functions, state.functionClass)
                    ),
                )
            elif sensor := await self.initialize_sensor(state, afero_device.device_id):
                if isinstance(sensor, AferoBinarySensor):
                    binary_sensors[sensor.id] = sensor
                else:
                    sensors[sensor.id] = sensor
            elif number := await self.initialize_number(func_def, state):
                numbers[number[0]] = number[1]
            elif select := await self.initialize_select(afero_device.functions, state):
                selects[select[0]] = select[1]
            elif state.functionClass == "siren-action":
                result_code, command = self._parse_siren_action(state.value)
                siren_action = features.SecuritySensorSirenFeature(
                    result_code=result_code,
                    command=command,
                )
        self._items[afero_device.id] = SecuritySystem(
            _id=afero_device.id,
            available=available,
            sensors=sensors,
            binary_sensors=binary_sensors,
            numbers=numbers,
            selects=selects,
            device_information=DeviceInformation(
                device_class=afero_device.device_class,
                default_image=afero_device.default_image,
                default_name=afero_device.default_name,
                manufacturer=afero_device.manufacturerName,
                model=afero_device.model,
                name=afero_device.friendly_name,
                parent_id=afero_device.device_id,
                children=afero_device.children,
                functions=afero_device.functions,
            ),
            alarm_state=alarm_state,
            siren_action=siren_action,
        )
        return self._items[afero_device.id]

    async def update_elem(self, afero_device: AferoDevice) -> set:
        """Update the Security System with the latest API data.

        :param afero_device: Afero Device that contains the updated states
        :return: States that have been modified
        """
        cur_item = self.get_device(afero_device.id)
        updated_keys = set()
        for state in afero_device.states:
            if state.functionClass == "available":
                if cur_item.available != state.value:
                    updated_keys.add("available")
                cur_item.available = state.value
            elif state.functionClass == "alarm-state":
                if cur_item.alarm_state is None:
                    # The initial payload had no alarm-state, so there is no
                    # feature to mutate yet; create it now.
                    cur_item.alarm_state = features.ModeFeature(
                        mode=state.value,
                        modes=set(
                            process_function(
                                afero_device.functions, state.functionClass
                            )
                        ),
                    )
                    updated_keys.add(state.functionClass)
                else:
                    if cur_item.alarm_state.mode != state.value:
                        updated_keys.add(state.functionClass)
                    cur_item.alarm_state.mode = state.value
            elif (
                (update_key := await self.update_sensor(state, cur_item))
                or (update_key := await self.update_number(state, cur_item))
                or (update_key := await self.update_select(state, cur_item))
            ):
                updated_keys.add(update_key)
            elif state.functionClass == "siren-action":
                result_code, command = self._parse_siren_action(state.value)
                if cur_item.siren_action is None:
                    # The initial payload had no siren-action, so there is no
                    # feature to mutate yet; create it now.
                    cur_item.siren_action = features.SecuritySensorSirenFeature(
                        result_code=result_code,
                        command=command,
                    )
                    updated_keys.add("siren-action")
                elif (
                    result_code != cur_item.siren_action.result_code
                    or command != cur_item.siren_action.command
                ):
                    cur_item.siren_action.result_code = result_code
                    cur_item.siren_action.command = command
                    updated_keys.add("siren-action")
        return updated_keys

    async def set_state(
        self,
        device_id: str,
        disarm_pin: int | None = None,
        command: int | None = None,
        numbers: dict[tuple[str, str | None], float] | None = None,
        selects: dict[tuple[str, str | None], str] | None = None,
    ) -> None:
        """Update security system state in the cloud.

        An unknown ``device_id`` is logged and ignored. Number and select keys
        the device does not have are skipped.

        Args:
            device_id: Device ID from this controller.
            disarm_pin: PIN for disarm commands.
            command: Panel command code (``2`` away, ``4`` home, ``5`` alarm trigger).
            numbers: Number features keyed by ``(functionClass, functionInstance)``.
            selects: Select features keyed by ``(functionClass, functionInstance)``.

        Raises:
            SecuritySystemError: The sensors do not allow arming, or the panel
                did not switch to ``disarmed`` after a PIN was sent.
        """
        update_obj = SecuritySystemPut()
        force_mode = False
        try:
            cur_item = self.get_device(device_id)
        except DeviceNotFound:
            self._logger.info("Unable to find device %s", device_id)
            return
        if command is not None:
            force_mode = True
            # Triggering the siren is always allowed; arming needs sane sensors.
            if command != self._COMMAND_ALARM_TRIGGER:
                await self.validate_arm_state(device_id, command)
            update_obj.siren_action = features.SecuritySensorSirenFeature(
                result_code=0,
                command=command,
            )
        if disarm_pin is not None:
            force_mode = True
            update_obj.disarm_pin = features.SecuritySystemDisarmPin(pin=disarm_pin)
        if numbers:
            for key, val in numbers.items():
                current = cur_item.numbers.get(key)
                if current is None:
                    continue
                update_obj.numbers[key] = features.NumbersFeature(
                    value=val,
                    min=current.min,
                    max=current.max,
                    step=current.step,
                    name=current.name,
                    unit=current.unit,
                )
        if selects:
            for key, val in selects.items():
                current = cur_item.selects.get(key)
                if current is None:
                    continue
                update_obj.selects[key] = features.SelectFeature(
                    selected=val,
                    selects=current.selects,
                    name=current.name,
                )
        await self.update(
            device_id, obj_in=update_obj, send_duplicate_states=force_mode
        )
        # Ensure the correct pin was used. Compare against None (as above) so
        # a PIN or command of 0 is still followed up.
        if disarm_pin is not None:
            await self.validate_disarm_pin(device_id)
        elif command is not None:
            await self.refresh_alarm_state(device_id)

    async def _publish_states(self, device_id: str, states) -> None:
        """Store freshly fetched states on the device and emit update events."""
        device = self._bridge.get_afero_device(device_id)
        device.states = states
        await self._bridge.events.generate_events_from_update(device)

    async def refresh_alarm_state(self, device_id: str) -> None:
        """Refresh the alarm state after alarm state change command."""
        await asyncio.sleep(UPDATE_TIME)
        states = await self._bridge.fetch_device_states(device_id)
        await self._publish_states(device_id, states)

    async def validate_disarm_pin(self, device_id: str) -> None:
        """Ensure the system has switched to disarmed.

        :raises SecuritySystemError: The alarm state is not ``disarmed``
        """
        # alarm-state does not update immediately, so wait and recheck
        await asyncio.sleep(UPDATE_TIME)
        states = await self._bridge.fetch_device_states(device_id)
        for state in states:
            if state.functionClass != "alarm-state":
                continue
            if state.value != "disarmed":
                raise SecuritySystemError("Disarm PIN was not accepted")
        await self._publish_states(device_id, states)

    @classmethod
    def _is_sensor_bypassed(cls, sensor, arm_type: int) -> bool:
        """Tell whether a sensor is excluded from the requested arm mode."""
        # Generic bypass
        if sensor.selects.get(("bypassType", None)).selected in ["Manual", "On"]:
            return True
        # Arm away / arm home bypass; other commands have no trigger filter
        accepted = cls._ARM_TRIGGER_TYPES.get(arm_type)
        if accepted is None:
            return False
        return sensor.selects.get(("triggerType", None)).selected not in accepted

    async def validate_arm_state(self, device_id: str, arm_type: int) -> bool:
        """Ensure the system can arm.

        Every child sensor that is not bypassed for ``arm_type`` must be
        available, not triggered and not tampered, and at least one such
        sensor must exist.

        :param device_id: Device ID of the security system
        :param arm_type: Arm command (``2`` away, ``4`` home)
        :return: ``True`` when arming is allowed
        :raises SecuritySystemError: A sensor has an issue, or no sensor takes
            part in the requested mode
        """
        dev = self._bridge.get_afero_device(device_id)
        sensors_with_issues = []
        num_sensors = 0
        for child in dev.children:
            controller = self._bridge.get_device_controller(child)
            # Matched by class name rather than isinstance - presumably to
            # avoid importing the sensor controller here; confirm.
            if type(controller).__name__ != "SecuritySystemSensorController":
                continue
            sensor = controller[child]
            if self._is_sensor_bypassed(sensor, arm_type):
                self._logger.debug("Bypassing sensor %s", sensor.id)
                continue
            if sensor.available is False:
                sensors_with_issues.append(
                    f"{sensor.device_information.name} (Unavailable)"
                )
            for sensor_key, label in (
                ("triggered|None", "Triggered"),
                ("tampered|None", "Tampered"),
            ):
                if sensor.binary_sensors.get(sensor_key).current_value == "On":
                    sensors_with_issues.append(
                        f"{sensor.device_information.name} ({label})"
                    )
            num_sensors += 1
        if sensors_with_issues:
            raise SecuritySystemError(
                f"Sensors are open or unavailable: {', '.join(sensors_with_issues)}"
            )
        if num_sensors == 0:
            raise SecuritySystemError(
                "No sensors are configured for the requested mode."
            )
        return True