# Source code for aioafero.v1.auth

"""Handle authentication to Afero API."""

from __future__ import annotations

__all__ = ["AferoAuth", "TokenData", "passthrough"]

import asyncio
import base64
from contextlib import contextmanager
import datetime
import hashlib
import logging
import os
import re
from typing import Final, NamedTuple
from urllib.parse import parse_qs, urlparse

import aiohttp
from aiohttp import ClientResponseError, ContentTypeError, web_exceptions
from bs4 import BeautifulSoup
from securelogging import LogRedactorMessage, add_secret, remove_secret

from aioafero.errors import InvalidAuth, InvalidOTP, InvalidResponse, OTPRequired

from . import v1_const

# Module-level logger; used by the static helpers (generate_challenge_data,
# parse_code) that have no per-user logger available.
logger = logging.getLogger(__name__)

# Fallback bearer-token lifetime in seconds, applied by _token_expiration when
# the token response carries no ``expires_in`` value.
DEFAULT_TOKEN_TIMEOUT: Final[int] = 118
# Seconds subtracted from a server-supplied ``expires_in`` so the token is
# treated as expired slightly before the server's own deadline.
TOKEN_EXPIRY_BUFFER: Final[int] = 2
# Shared debug-log format for HTTP response status codes.
STATUS_CODE: Final[str] = "Status Code: %s"


class AuthChallenge(NamedTuple):
    """Data used to perform the initial authentication.

    Produced by ``AferoAuth.generate_challenge_data``: the challenge is sent
    with the login request (``code_challenge``) and the verifier is sent
    later with the token request (``code_verifier``).
    """

    # URL-safe base64 of the SHA-256 digest of ``verifier``, "=" padding removed.
    challenge: str
    # Random alphanumeric string; registered as a log secret while in use.
    verifier: str


class TokenData(NamedTuple):
    """Data related to the current token."""

    # Bearer token returned by ``AferoAuth.token``; filled from the token
    # response's ``id_token``. ``None`` until a refresh has happened.
    token: str | None
    # ``access_token`` from the token response, if one has been fetched.
    access_token: str | None
    # Refresh token used to obtain new bearer tokens.
    refresh_token: str
    # Unix timestamp at or after which ``token`` is considered expired.
    expiration: float
class AuthSessionData(NamedTuple):
    """Data related to current attempt to login.

    All three values are read from the query string of the login form's
    ``action`` URL by ``extract_login_data``.
    """

    # ``session_code`` query parameter of the form action URL.
    session_code: str
    # ``execution`` query parameter of the form action URL.
    execution: str
    # ``tab_id`` query parameter of the form action URL.
    tab_id: str
@contextmanager
def passthrough():
    """Do nothing.

    No-op context manager used in place of the log redactor when secrets are
    not being hidden, so call sites can always use ``with ...():``.
    """
    yield
def _token_expiration(resp_json: dict, *, now: float | None = None) -> float:
    """Compute bearer token expiration from an OAuth token response.

    :param resp_json: Decoded token response; ``expires_in`` (seconds) is optional.
    :param now: Reference Unix timestamp; defaults to the current time.
    :return: Unix timestamp at which the bearer token should be treated as expired.
    """
    timestamp = now if now is not None else datetime.datetime.now().timestamp()
    expires_in = resp_json.get("expires_in")
    if expires_in is not None:
        try:
            lifetime = float(expires_in)
        except (TypeError, ValueError):
            # A malformed lifetime must not abort an otherwise successful
            # token exchange; fall back to the conservative default below.
            logger.warning(
                "Ignoring non-numeric expires_in in token response; using default"
            )
        else:
            return timestamp + lifetime - TOKEN_EXPIRY_BUFFER
    return timestamp + DEFAULT_TOKEN_TIMEOUT


def _remove_secrets_not_in(old: TokenData, new: TokenData) -> None:
    """Remove old token secrets that are not reused in the replacement data.

    :param old: Token data being replaced.
    :param new: Token data taking its place; values shared with ``old`` stay
        registered with the log redactor.
    """
    new_secrets = {new.token, new.access_token, new.refresh_token}
    for secret in (old.token, old.access_token, old.refresh_token):
        # Skip empty/None fields and anything the new data still needs redacted.
        if secret and secret not in new_secrets:
            remove_secret(secret)
class AferoAuth:
    """Authentication against the Afero IoT API.

    This class follows the Afero IoT authentication workflow and utilizes
    refresh tokens. Construct runtime instances with
    ``AferoAuth(session, username, refresh_token, ...)`` or use
    :meth:`for_login` for credential-based login.
    """

    def __init__(
        self,
        session: aiohttp.ClientSession,
        username: str,
        refresh_token: str,
        *,
        token: str | None = None,
        token_expiration: float | None = None,
        hide_secrets: bool = True,
        afero_client: str = "hubspace",
        client_name: str = "aioafero",
    ) -> None:
        """Initialize auth for refresh-token runtime flows; use :meth:`for_login` for credentials.

        :param session: Shared ``aiohttp.ClientSession`` for OpenID and token requests.
        :param username: Afero-backed account username.
        :param refresh_token: OAuth refresh token from login or storage.
        :param token: Optional non-expired bearer token to skip the initial refresh.
        :param token_expiration: Unix timestamp when ``token`` expires; omit with
            ``token`` to refresh on first API use.
        :param hide_secrets: Redact sensitive values from logs.
        :param afero_client: Afero client identifier (default ``hubspace``).
        :param client_name: User-Agent token for auth requests.
        :raises ValueError: If ``session`` is None or ``refresh_token`` is empty.
        """
        if session is None:
            raise ValueError("session is required")
        if not refresh_token:
            raise ValueError("refresh_token is required")
        self._init_state(
            session,
            username,
            hide_secrets=hide_secrets,
            afero_client=afero_client,
            client_name=client_name,
        )
        add_secret(refresh_token)
        now = datetime.datetime.now().timestamp()
        if token:
            add_secret(token)
            if token_expiration is not None:
                expiration = token_expiration
            else:
                # No expiry supplied: treat bearer as stale so token() refreshes on first use.
                expiration = now
            self._token_data = TokenData(token, None, refresh_token, expiration)
        else:
            self._token_data = TokenData(None, None, refresh_token, now)

    def _init_state(
        self,
        session: aiohttp.ClientSession,
        username: str,
        *,
        hide_secrets: bool,
        afero_client: str,
        client_name: str,
    ) -> None:
        """Shared setup for runtime and login instances."""
        self.logger = logging.getLogger(f"{__package__}[{username}]")
        # secret_logger is a context-manager factory: the redactor when
        # hiding secrets, otherwise a no-op.
        if hide_secrets:
            self.secret_logger = LogRedactorMessage
        else:
            self.secret_logger = passthrough
        self._hide_secrets = hide_secrets
        self._async_lock = asyncio.Lock()
        self._session = session
        self._username = username
        self._password = None
        self._afero_client = afero_client
        self._client_name = client_name
        self._token_data = None
        self._token_headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "accept-encoding": "gzip",
            "host": v1_const.AFERO_CLIENTS[afero_client]["AUTH_OPENID_HOST"],
        }
        # Populated by generate_code when an OTP is requested; consumed by submit_otp.
        self._otp_data = {}

    @classmethod
    def for_login(
        cls,
        session: aiohttp.ClientSession,
        username: str,
        password: str,
        *,
        afero_client: str = "hubspace",
        hide_secrets: bool = True,
        client_name: str = "aioafero",
    ) -> AferoAuth:
        """Create an auth instance for credential-based login flows.

        :param session: Shared ``aiohttp.ClientSession`` for OpenID and token requests.
        :param username: Afero-backed account username.
        :param password: Account password (cleared after successful login).
        :param afero_client: Afero client identifier (default ``hubspace``).
        :param hide_secrets: Redact sensitive values from logs.
        :param client_name: User-Agent token for auth requests.
        :raises ValueError: If ``session`` is None.
        """
        if session is None:
            raise ValueError("session is required")
        # Bypass __init__, which requires a refresh token that does not exist yet.
        auth = cls.__new__(cls)
        auth._init_state(  # noqa: SLF001
            session,
            username,
            hide_secrets=hide_secrets,
            afero_client=afero_client,
            client_name=client_name,
        )
        auth._password = password  # noqa: SLF001
        if hide_secrets and password:
            add_secret(password)
        return auth

    def _clear_password(self) -> None:
        """Drop the in-memory password and remove it from the log redactor."""
        password = self._password
        self._password = None
        if password and self._hide_secrets:
            remove_secret(password)

    @property
    async def is_expired(self) -> bool:
        """Determine if the token is expired.

        This is an async property: accessing it yields a coroutine that must
        be awaited (``await auth.is_expired``).
        """
        if not self._token_data:
            return True
        return datetime.datetime.now().timestamp() >= self._token_data.expiration

    async def _needs_token_refresh(self) -> bool:
        """Return True when the bearer token is missing or past its expiration."""
        if not self._token_data or not self._token_data.token:
            return True
        return await self.is_expired

    @property
    def refresh_token(self) -> str | None:
        """Get the current refresh token."""
        if not self._token_data:
            return None
        return self._token_data.refresh_token

    def generate_auth_url(self, endpoint: str) -> str:
        """Generate an auth URL for the Afero API.

        :param endpoint: Path below the realm; a single leading ``/`` is ignored.
        :return: Absolute ``https`` URL on the client's OpenID host and realm.
        """
        endpoint = endpoint.removeprefix("/")
        client = v1_const.AFERO_CLIENTS[self._afero_client]
        return (
            f"https://{client['AUTH_OPENID_HOST']}"
            f"/auth/realms/{client['AUTH_REALM']}/{endpoint}"
        )

    def set_token_data(self, data: TokenData) -> None:
        """Set the current token data.

        When ``hide_secrets`` is True (default), updates the securelogging
        registry: removes old token values not reused in ``data``, then
        registers new ones. When ``hide_secrets`` is False, only
        ``_token_data`` is replaced (no registry changes); DEBUG logs may
        expose secrets.
        """
        if self._hide_secrets and self._token_data is not None:
            _remove_secrets_not_in(self._token_data, data)
        self._token_data = data
        if self._hide_secrets:
            for secret in (data.token, data.access_token, data.refresh_token):
                if secret:
                    add_secret(secret)

    async def _auth_request(
        self, method: str, url: str, **kwargs
    ) -> aiohttp.ClientResponse:
        """Make an unauthenticated request to the OpenID/token endpoints.

        Caller-supplied ``headers`` are merged over the default user-agent and
        encoding headers; ``ssl`` defaults to True. The body is read before
        returning.

        :raises web_exceptions.HTTPForbidden: If the server answers 403.
        """
        headers = {
            "user-agent": v1_const.AFERO_GENERICS["DEFAULT_USERAGENT"].safe_substitute(
                client_name=self._client_name
            ),
            "accept-encoding": "gzip",
        }
        headers.update(kwargs.pop("headers", {}))
        kwargs["headers"] = headers
        kwargs.setdefault("ssl", True)
        response = await self._session.request(method, url, **kwargs)
        await response.read()
        if response.status == 403:
            raise web_exceptions.HTTPForbidden
        return response

    async def webapp_login(self, challenge: AuthChallenge) -> str:
        """Perform login to the webapp for a code.

        Login to the webapp and generate a code used for generating tokens.

        :param challenge: Challenge data for connection and approving
        :return: Code used for generating a refresh token
        :raises InvalidResponse: If the login page cannot be queried or answers
            with a status other than 200 or 302.
        """
        code_params: dict[str, str] = {
            "response_type": "code",
            "client_id": v1_const.AFERO_CLIENTS[self._afero_client][
                "AUTH_DEFAULT_CLIENT_ID"
            ],
            "redirect_uri": v1_const.AFERO_CLIENTS[self._afero_client][
                "AUTH_DEFAULT_REDIRECT_URI"
            ],
            "code_challenge": challenge.challenge,
            "code_challenge_method": "S256",
            "scope": "openid offline_access",
        }
        url = self.generate_auth_url(v1_const.AFERO_GENERICS["AUTH_OPENID_ENDPOINT"])
        self.logger.debug(
            "URL: %s\n\tparams: %s",
            url,
            code_params,
        )
        try:
            response = await self._auth_request(
                "GET",
                url,
                params=code_params,
                allow_redirects=False,
            )
        except web_exceptions.HTTPError as err:
            raise InvalidResponse("Unable to query login page") from err
        if response.status == 200:
            # A login form was served: scrape it and post the credentials.
            contents = await response.text()
            login_data = await extract_login_data(contents, "kc-form-login")
            self.logger.debug(
                ("WebApp Login:\n\tSession Code: %s\n\tExecution: %s\n\tTab ID:%s"),
                login_data.session_code,
                login_data.execution,
                login_data.tab_id,
            )
            return await self.generate_code(login_data, challenge)
        if response.status == 302:
            # Redirect without a form: the code is already in the Location header.
            self.logger.debug("Hubspace returned an active session")
            return await AferoAuth.parse_code(response)
        try:
            response.raise_for_status()
        except ClientResponseError as err:
            raise InvalidResponse("Unable to query login page") from err
        # raise_for_status() only raises for error statuses; any other status
        # carries no usable code, so fail instead of returning None.
        raise InvalidResponse(
            f"Unable to query login page: unexpected status {response.status}"
        )

    @staticmethod
    async def generate_challenge_data() -> AuthChallenge:
        """Generate data to send to Afero API when logging into the system.

        :return: A random alphanumeric verifier and its SHA-256 challenge
            (URL-safe base64, padding removed).
        """
        code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode("utf-8")
        code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
        code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
        code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
        code_challenge = code_challenge.replace("=", "")
        chal = AuthChallenge(code_challenge, code_verifier)
        with LogRedactorMessage():
            # Log a copy with the verifier masked; the real one is returned.
            logger.debug(
                "Challenge information: %s",
                AuthChallenge(code_challenge, "<redacted>"),
            )
        return chal

    async def generate_code(
        self, data: AuthSessionData, challenge: AuthChallenge
    ) -> str:
        """Finalize login to Afero IoT page.

        Posts the username and password to the login form. The stored
        password is cleared afterwards regardless of the outcome.

        :param data: Session values scraped from the login form.
        :param challenge: Challenge data; stored for a later OTP submission
            when the server asks for an OTP.
        :return: code for generating tokens
        :raises OTPRequired: If the response contains the OTP form.
        :raises InvalidAuth: If the server does not redirect (status != 302).
        """
        self.logger.debug("Generating code")
        params = extract_login_codes(data, self._afero_client)
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "x-requested-with": "io.afero.partner.hubspace",
        }
        auth_data = {
            "username": self._username,
            "password": self._password,
            "credentialId": "",
        }
        url = self.generate_auth_url(v1_const.AFERO_GENERICS["AUTH_CODE_ENDPOINT"])
        self.logger.debug(
            "URL: %s\n\tparams: %s\n\theaders: %s",
            url,
            params,
            headers,
        )
        try:
            response = await self._auth_request(
                "POST",
                url,
                params=params,
                data=auth_data,
                headers=headers,
                allow_redirects=False,
            )
            self.logger.debug(STATUS_CODE, response.status)
            # If OTP is required, a 200 will be returned. If a 302 is returned, then
            # there is no OTP required.
            content = await response.text()
            if await AferoAuth.requires_otp(content):
                login_data = await extract_login_data(content, "kc-otp-login-form")
                otp_params = extract_login_codes(login_data, self._afero_client)
                self._otp_data = {
                    "params": otp_params,
                    "headers": headers,
                    "challenge": challenge,
                }
                raise OTPRequired
            if response.status != 302:
                raise InvalidAuth(
                    "Unable to authenticate with the supplied username / password"
                )
            return await AferoAuth.parse_code(response)
        finally:
            self._clear_password()

    @staticmethod
    async def requires_otp(content: str) -> bool:
        """Determine if the user requires otp."""
        return "kc-otp-login-form" in content

    @staticmethod
    async def parse_code(response: aiohttp.ClientResponse) -> str:
        """Parse the code for generating tokens.

        :param response: Redirect response whose ``location`` header carries
            a ``code`` query parameter.
        :return: The authorization code (also registered as a log secret).
        :raises InvalidResponse: If the header or the ``code`` parameter is missing.
        """
        try:
            parsed_url = urlparse(response.headers["location"])
            code = parse_qs(parsed_url.query)["code"][0]
        except KeyError as err:
            raise InvalidResponse(
                f"Unable to process the result from {response.url}: {response.status}"
            ) from err
        add_secret(code)
        location = response.headers.get("location")
        with LogRedactorMessage():
            logger.debug("Location: %s", location)
            logger.debug("Code: %s", code)
        return code

    async def generate_refresh_token(
        self,
        challenge: AuthChallenge | None = None,
        code: str | None = None,
    ) -> TokenData:
        """Generate a refresh token.

        If a challenge is provided, it will send the correct data. If no
        challenge is required, it will use the existing refresh token.

        :param challenge: Challenge data for connection and approving.
        :param code: Code used for generating refresh token.
        :return: Token data including refresh and bearer tokens.
        :raises InvalidAuth: If the server reports ``invalid_grant``, or no
            challenge is given and there is no stored refresh token.
        :raises InvalidResponse: If the response is not JSON, has an error
            status, or lacks any of the expected tokens.
        """
        self.logger.debug("Generating refresh token")
        if not challenge and not self._token_data:
            # Login-only instances have no token data until a login completes;
            # fail with a domain error instead of an AttributeError on None.
            raise InvalidAuth("No token data available")
        if code:
            add_secret(code)
        if challenge:
            add_secret(challenge.verifier)
            data = {
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": v1_const.AFERO_CLIENTS[self._afero_client][
                    "AUTH_DEFAULT_REDIRECT_URI"
                ],
                "code_verifier": challenge.verifier,
                "client_id": v1_const.AFERO_CLIENTS[self._afero_client][
                    "AUTH_DEFAULT_CLIENT_ID"
                ],
            }
        else:
            data = {
                "grant_type": "refresh_token",
                "refresh_token": self._token_data.refresh_token,
                "scope": "openid email offline_access profile",
                "client_id": v1_const.AFERO_CLIENTS[self._afero_client][
                    "AUTH_DEFAULT_CLIENT_ID"
                ],
            }
        url = self.generate_auth_url(v1_const.AFERO_GENERICS["AUTH_TOKEN_ENDPOINT"])
        with self.secret_logger():
            self.logger.debug(
                "URL: %s\n\tdata: %s\n\theaders: %s",
                url,
                data,
                self._token_headers,
            )
        try:
            response = await self._auth_request(
                "POST",
                url,
                headers=self._token_headers,
                data=data,
            )
            self.logger.debug(STATUS_CODE, response.status)
            try:
                resp_json = await response.json()
            except (ValueError, ContentTypeError) as err:
                raise InvalidResponse(
                    "Unexpected data returned during token refresh"
                ) from err
            if response.status != 200:
                if resp_json and resp_json.get("error") == "invalid_grant":
                    raise InvalidAuth
                try:
                    response.raise_for_status()
                except ClientResponseError as err:
                    raise InvalidResponse(
                        "Unexpected data returned during token refresh"
                    ) from err
            try:
                refresh_token = resp_json["refresh_token"]
                access_token = resp_json["access_token"]
                id_token = resp_json["id_token"]
            except KeyError as err:
                raise InvalidResponse("Unable to extract refresh token") from err
            if not refresh_token or not access_token or not id_token:
                raise InvalidResponse("Unable to extract refresh token")
            add_secret(refresh_token)
            add_secret(access_token)
            add_secret(id_token)
            with self.secret_logger():
                self.logger.debug("JSON response: %s", resp_json)
            return TokenData(
                id_token,
                access_token,
                refresh_token,
                _token_expiration(resp_json),
            )
        finally:
            # The code and verifier are single-use; stop redacting them afterwards.
            if code:
                remove_secret(code)
            if challenge:
                remove_secret(challenge.verifier)

    async def perform_initial_login(self) -> TokenData:
        """Login to generate a refresh token.

        The stored password is cleared when this returns or raises.

        :return: Token data for the authenticated session
        """
        challenge: AuthChallenge | None = None
        try:
            challenge = await AferoAuth.generate_challenge_data()
            code: str = await self.webapp_login(challenge)
            self.logger.debug("Successfully generated an auth code")
            refresh_token = await self.generate_refresh_token(
                code=code, challenge=challenge
            )
            self.logger.debug("Successfully generated a refresh token")
            return refresh_token
        finally:
            self._clear_password()

    async def login(self) -> TokenData:
        """Perform credential-based login and return token data."""
        return await self.perform_initial_login()

    async def perform_otp_login(self, otp_code: str) -> TokenData:
        """Perform otp login to generate a refresh token.

        Thin alias for :meth:`submit_otp`.

        :param otp_code: OTP code provided by the user
        :return: Token data for the authenticated session
        """
        return await self.submit_otp(otp_code)

    async def submit_otp(self, otp_code: str) -> TokenData:
        """Submit OTP code and complete login.

        :param otp_code: OTP code provided by the user
        :return: Token data for the authenticated session
        :raises OTPRequired: If no OTP challenge is pending on this instance.
        """
        if not self._otp_data:
            raise OTPRequired("No OTP data available to perform login")
        self.logger.debug("Performing otp login")
        code: str = await self._submit_otp_code(otp_code)
        token_data = await self.generate_refresh_token(
            code=code, challenge=self._otp_data["challenge"]
        )
        self.logger.debug("Successfully generated a refresh token")
        self._token_data = token_data
        self._otp_data = {}
        return token_data

    async def _submit_otp_code(self, otp_code: str) -> str:
        """Submit OTP code to obtain an authorization code.

        :raises InvalidOTP: If the server does not redirect; the pending OTP
            parameters are refreshed from the returned page first so the user
            can retry.
        """
        otp_data = {
            "action": "submit",
            "flowName": "doLogIn",
            "emailCode": otp_code,
        }
        url = self.generate_auth_url(v1_const.AFERO_GENERICS["AUTH_CODE_ENDPOINT"])
        response = await self._auth_request(
            "POST",
            url,
            params=self._otp_data["params"],
            data=otp_data,
            headers=self._otp_data["headers"],
            allow_redirects=False,
        )
        if response.status != 302:
            self.logger.warning("OTP code was invalid. Re-enter the OTP code.")
            content = await response.text()
            kc_error = get_kc_error(content)
            login_data = await extract_login_data(content, "kc-otp-login-form")
            otp_params = extract_login_codes(login_data, self._afero_client)
            self._otp_data["params"] = otp_params
            raise InvalidOTP(kc_error)
        return await AferoAuth.parse_code(response)

    async def token(self, retry: bool = True) -> str:
        """Generate the token required to make Afero API calls.

        :param retry: Retry the refresh once if the first attempt raises
            ``InvalidAuth``.
        :return: The current bearer token.
        :raises InvalidAuth: If there is no token data, the refresh fails, or
            the bearer token is empty.
        """
        retry_refresh = False
        async with self._async_lock:
            if not self._token_data:
                raise InvalidAuth("No token data available")
            if await self._needs_token_refresh():
                self.logger.debug("Token has not been generated or is expired")
                try:
                    old_data = self._token_data
                    new_data = await self.generate_refresh_token()
                    _remove_secrets_not_in(old_data, new_data)
                    self._token_data = new_data
                except InvalidAuth:
                    self.logger.debug("Provided refresh token is no longer valid.")
                    if not retry:
                        raise
                    retry_refresh = True
                else:
                    self.logger.debug("Token has been successfully generated")
        # Retry outside the lock: asyncio.Lock is not reentrant.
        if retry_refresh:
            return await self.token(retry=False)
        bearer = self._token_data.token
        if not bearer:
            raise InvalidAuth("No token data available")
        return bearer
async def extract_login_data(page: str, form_login_element: str) -> AuthSessionData:
    """Extract the required login data from the auth page.

    :param page: HTML of the auth page that contains the form.
    :param form_login_element: ``id`` attribute of the ``<form>`` to read.
    :return: Session values taken from the query string of the form's
        ``action`` URL.
    :raises InvalidResponse: If the form, its ``action`` attribute, or any of
        the expected query parameters is missing.
    """
    auth_page = BeautifulSoup(page, features="html.parser")
    login_form = auth_page.find("form", id=form_login_element)
    if login_form is None:
        raise InvalidResponse("Unable to parse login page")
    try:
        login_url: str = login_form.attrs["action"]
    except KeyError as err:
        raise InvalidResponse("Unable to extract login url") from err
    parsed_url = urlparse(login_url)
    login_data = parse_qs(parsed_url.query)
    try:
        return AuthSessionData(
            login_data["session_code"][0],
            login_data["execution"][0],
            login_data["tab_id"][0],
        )
    except (KeyError, IndexError) as err:
        raise InvalidResponse("Unable to parse login url") from err


def extract_login_codes(data: AuthSessionData, client: str) -> dict[str, str]:
    """Build the query parameters for posting to the login/OTP form.

    :param data: Session values scraped from the form.
    :param client: Key into ``v1_const.AFERO_CLIENTS`` selecting the client id.
    :return: Mapping with ``session_code``, ``execution``, ``client_id`` and
        ``tab_id``.
    """
    return {
        "session_code": data.session_code,
        "execution": data.execution,
        "client_id": v1_const.AFERO_CLIENTS[client]["AUTH_DEFAULT_CLIENT_ID"],
        "tab_id": data.tab_id,
    }


def get_kc_error(page: str) -> str:
    """Extract the error message from the otp page.

    :param page: HTML of the OTP page.
    :return: Stripped text of the first ``span`` with class
        ``kc-feedback-text``, or ``"Unknown error"`` when there is none.
    """
    auth_page = BeautifulSoup(page, features="html.parser")
    error_div = auth_page.find("span", class_="kc-feedback-text")
    if error_div is None:
        return "Unknown error"
    return error_div.text.strip()