"""Handle authentication to Afero API."""
from __future__ import annotations
__all__ = ["AferoAuth", "TokenData", "passthrough"]
import asyncio
import base64
from contextlib import contextmanager
import datetime
import hashlib
import logging
import os
import re
from typing import Final, NamedTuple
from urllib.parse import parse_qs, urlparse
import aiohttp
from aiohttp import ClientResponseError, ContentTypeError, web_exceptions
from bs4 import BeautifulSoup
from securelogging import LogRedactorMessage, add_secret, remove_secret
from aioafero.errors import InvalidAuth, InvalidOTP, InvalidResponse, OTPRequired
from . import v1_const
logger = logging.getLogger(__name__)
DEFAULT_TOKEN_TIMEOUT: Final[int] = 118
TOKEN_EXPIRY_BUFFER: Final[int] = 2
STATUS_CODE: Final[str] = "Status Code: %s"
class AuthChallenge(NamedTuple):
"""Data used to perform the initial authentication."""
challenge: str
verifier: str
[docs]
class TokenData(NamedTuple):
"""Data related to the current token."""
token: str | None
access_token: str | None
refresh_token: str
expiration: float
class AuthSessionData(NamedTuple):
"""Data related to current attempt to login."""
session_code: str
execution: str
tab_id: str
[docs]
@contextmanager
def passthrough():
"""Do nothing."""
yield
def _token_expiration(resp_json: dict, *, now: float | None = None) -> float:
"""Compute bearer token expiration from an OAuth token response."""
timestamp = now if now is not None else datetime.datetime.now().timestamp()
expires_in = resp_json.get("expires_in")
if expires_in is not None:
return timestamp + float(expires_in) - TOKEN_EXPIRY_BUFFER
return timestamp + DEFAULT_TOKEN_TIMEOUT
def _remove_secrets_not_in(old: TokenData, new: TokenData) -> None:
"""Remove old token secrets that are not reused in the replacement data."""
new_secrets = {new.token, new.access_token, new.refresh_token}
for secret in (old.token, old.access_token, old.refresh_token):
if secret and secret not in new_secrets:
remove_secret(secret)
[docs]
class AferoAuth:
"""Authentication against the Afero IoT API.
This class follows the Afero IoT authentication workflow and utilizes
refresh tokens.
Construct runtime instances with ``AferoAuth(session, username, refresh_token, ...)``
or use :meth:`for_login` for credential-based login.
"""
[docs]
def __init__(
self,
session: aiohttp.ClientSession,
username: str,
refresh_token: str,
*,
token: str | None = None,
token_expiration: float | None = None,
hide_secrets: bool = True,
afero_client: str = "hubspace",
client_name: str = "aioafero",
) -> None:
"""Initialize auth for refresh-token runtime flows; use :meth:`for_login` for credentials.
:param session: Shared ``aiohttp.ClientSession`` for OpenID and token requests.
:param username: Afero-backed account username.
:param refresh_token: OAuth refresh token from login or storage.
:param token: Optional non-expired bearer token to skip the initial refresh.
:param token_expiration: Unix timestamp when ``token`` expires; omit with ``token``
to refresh on first API use.
:param hide_secrets: Redact sensitive values from logs.
:param afero_client: Afero client identifier (default ``hubspace``).
:param client_name: User-Agent token for auth requests.
"""
if session is None:
raise ValueError("session is required")
if not refresh_token:
raise ValueError("refresh_token is required")
self._init_state(
session,
username,
hide_secrets=hide_secrets,
afero_client=afero_client,
client_name=client_name,
)
add_secret(refresh_token)
now = datetime.datetime.now().timestamp()
if token:
add_secret(token)
if token_expiration is not None:
expiration = token_expiration
else:
# No expiry supplied: treat bearer as stale so token() refreshes on first use.
expiration = now
self._token_data = TokenData(token, None, refresh_token, expiration)
else:
self._token_data = TokenData(None, None, refresh_token, now)
def _init_state(
self,
session: aiohttp.ClientSession,
username: str,
*,
hide_secrets: bool,
afero_client: str,
client_name: str,
) -> None:
"""Shared setup for runtime and login instances."""
self.logger = logging.getLogger(f"{__package__}[{username}]")
if hide_secrets:
self.secret_logger = LogRedactorMessage
else:
self.secret_logger = passthrough
self._hide_secrets = hide_secrets
self._async_lock = asyncio.Lock()
self._session = session
self._username = username
self._password = None
self._afero_client = afero_client
self._client_name = client_name
self._token_data = None
self._token_headers = {
"Content-Type": "application/x-www-form-urlencoded",
"accept-encoding": "gzip",
"host": v1_const.AFERO_CLIENTS[afero_client]["AUTH_OPENID_HOST"],
}
self._otp_data = {}
[docs]
@classmethod
def for_login(
cls,
session: aiohttp.ClientSession,
username: str,
password: str,
*,
afero_client: str = "hubspace",
hide_secrets: bool = True,
client_name: str = "aioafero",
) -> AferoAuth:
"""Create an auth instance for credential-based login flows.
:param session: Shared ``aiohttp.ClientSession`` for OpenID and token requests.
:param username: Afero-backed account username.
:param password: Account password (cleared after successful login).
"""
if session is None:
raise ValueError("session is required")
auth = cls.__new__(cls)
auth._init_state( # noqa: SLF001
session,
username,
hide_secrets=hide_secrets,
afero_client=afero_client,
client_name=client_name,
)
auth._password = password # noqa: SLF001
if hide_secrets and password:
add_secret(password)
return auth
def _clear_password(self) -> None:
"""Drop the in-memory password and remove it from the log redactor."""
password = self._password
self._password = None
if password and self._hide_secrets:
remove_secret(password)
@property
async def is_expired(self) -> bool:
"""Determine if the token is expired."""
if not self._token_data:
return True
return datetime.datetime.now().timestamp() >= self._token_data.expiration
async def _needs_token_refresh(self) -> bool:
"""Return True when the bearer token is missing or past its expiration."""
if not self._token_data or not self._token_data.token:
return True
return await self.is_expired
@property
def refresh_token(self) -> str | None:
"""Get the current refresh token."""
if not self._token_data:
return None
return self._token_data.refresh_token
[docs]
def generate_auth_url(self, endpoint: str) -> str:
"""Generate an auth URL for the Afero API."""
endpoint = endpoint.removeprefix("/")
return f"https://{v1_const.AFERO_CLIENTS[self._afero_client]['AUTH_OPENID_HOST']}/auth/realms/{v1_const.AFERO_CLIENTS[self._afero_client]['AUTH_REALM']}/{endpoint}"
[docs]
def set_token_data(self, data: TokenData) -> None:
"""Set the current token data.
When ``hide_secrets`` is True (default), updates the securelogging registry:
removes old token values not reused in ``data``, then registers new ones.
When ``hide_secrets`` is False, only ``_token_data`` is replaced (no registry
changes); DEBUG logs may expose secrets.
"""
if self._hide_secrets and self._token_data is not None:
_remove_secrets_not_in(self._token_data, data)
self._token_data = data
if self._hide_secrets:
for secret in (data.token, data.access_token, data.refresh_token):
if secret:
add_secret(secret)
async def _auth_request(
self, method: str, url: str, **kwargs
) -> aiohttp.ClientResponse:
"""Make an unauthenticated request to the OpenID/token endpoints."""
headers = {
"user-agent": v1_const.AFERO_GENERICS["DEFAULT_USERAGENT"].safe_substitute(
client_name=self._client_name
),
"accept-encoding": "gzip",
}
headers.update(kwargs.pop("headers", {}))
kwargs["headers"] = headers
kwargs.setdefault("ssl", True)
response = await self._session.request(method, url, **kwargs)
await response.read()
if response.status == 403:
raise web_exceptions.HTTPForbidden
return response
[docs]
async def webapp_login(self, challenge: AuthChallenge) -> str:
"""Perform login to the webapp for a code.
Login to the webapp and generate a code used for generating tokens.
:param challenge: Challenge data for connection and approving
:return: Code used for generating a refresh token
"""
code_params: dict[str, str] = {
"response_type": "code",
"client_id": v1_const.AFERO_CLIENTS[self._afero_client][
"AUTH_DEFAULT_CLIENT_ID"
],
"redirect_uri": v1_const.AFERO_CLIENTS[self._afero_client][
"AUTH_DEFAULT_REDIRECT_URI"
],
"code_challenge": challenge.challenge,
"code_challenge_method": "S256",
"scope": "openid offline_access",
}
url = self.generate_auth_url(v1_const.AFERO_GENERICS["AUTH_OPENID_ENDPOINT"])
self.logger.debug(
"URL: %s\n\tparams: %s",
url,
code_params,
)
try:
response = await self._auth_request(
"GET",
url,
params=code_params,
allow_redirects=False,
)
except aiohttp.web_exceptions.HTTPError as err:
raise InvalidResponse("Unable to query login page") from err
if response.status == 200:
contents = await response.text()
login_data = await extract_login_data(contents, "kc-form-login")
self.logger.debug(
("WebApp Login:\n\tSession Code: %s\n\tExecution: %s\n\tTab ID:%s"),
login_data.session_code,
login_data.execution,
login_data.tab_id,
)
return await self.generate_code(login_data, challenge)
if response.status == 302:
self.logger.debug("Hubspace returned an active session")
return await AferoAuth.parse_code(response)
try:
response.raise_for_status()
except ClientResponseError as err:
raise InvalidResponse("Unable to query login page") from err
[docs]
@staticmethod
async def generate_challenge_data() -> AuthChallenge:
"""Generate data to send to Afero API when logging into the system."""
code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode("utf-8")
code_verifier = re.sub("[^a-zA-Z0-9]+", "", code_verifier)
code_challenge = hashlib.sha256(code_verifier.encode("utf-8")).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge).decode("utf-8")
code_challenge = code_challenge.replace("=", "")
chal = AuthChallenge(code_challenge, code_verifier)
with LogRedactorMessage():
logger.debug(
"Challenge information: %s",
AuthChallenge(code_challenge, "<redacted>"),
)
return chal
[docs]
async def generate_code(
self, data: AuthSessionData, challenge: AuthChallenge
) -> str:
"""Finalize login to Afero IoT page.
:param session_code: Session code during form interaction
:param execution: Session code during form interaction
:param tab_id: Session code during form interaction
:return: code for generating tokens
"""
self.logger.debug("Generating code")
params = extract_login_codes(data, self._afero_client)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"x-requested-with": "io.afero.partner.hubspace",
}
auth_data = {
"username": self._username,
"password": self._password,
"credentialId": "",
}
url = self.generate_auth_url(v1_const.AFERO_GENERICS["AUTH_CODE_ENDPOINT"])
self.logger.debug(
"URL: %s\n\tparams: %s\n\theaders: %s",
url,
params,
headers,
)
try:
response = await self._auth_request(
"POST",
url,
params=params,
data=auth_data,
headers=headers,
allow_redirects=False,
)
self.logger.debug(STATUS_CODE, response.status)
# If OTP is required, a 200 will be returned. If a 302 is returned, then
# there is no OTP required.
content = await response.text()
if await AferoAuth.requires_otp(content):
login_data = await extract_login_data(content, "kc-otp-login-form")
otp_params = extract_login_codes(login_data, self._afero_client)
self._otp_data = {
"params": otp_params,
"headers": headers,
"challenge": challenge,
}
raise OTPRequired
if response.status != 302:
raise InvalidAuth(
"Unable to authenticate with the supplied username / password"
)
return await AferoAuth.parse_code(response)
finally:
self._clear_password()
[docs]
@staticmethod
async def requires_otp(content: str) -> bool:
"""Determine if the user requires otp."""
return "kc-otp-login-form" in content
[docs]
@staticmethod
async def parse_code(response: aiohttp.ClientResponse) -> str:
"""Parse the code for generating tokens."""
try:
parsed_url = urlparse(response.headers["location"])
code = parse_qs(parsed_url.query)["code"][0]
except KeyError as err:
raise InvalidResponse(
f"Unable to process the result from {response.url}: {response.status}"
) from err
add_secret(code)
location = response.headers.get("location")
with LogRedactorMessage():
logger.debug("Location: %s", location)
logger.debug("Code: %s", code)
return code
[docs]
async def generate_refresh_token(
self,
challenge: AuthChallenge | None = None,
code: str | None = None,
) -> TokenData:
"""Generate a refresh token.
If a challenge is provided, it will send the correct data. If no challenge is
required, it will use the existing refresh token.
:param challenge: Challenge data for connection and approving.
:param code: Code used for generating refresh token.
:return: Token data including refresh and bearer tokens.
"""
self.logger.debug("Generating refresh token")
if code:
add_secret(code)
if challenge:
add_secret(challenge.verifier)
data = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": v1_const.AFERO_CLIENTS[self._afero_client][
"AUTH_DEFAULT_REDIRECT_URI"
],
"code_verifier": challenge.verifier,
"client_id": v1_const.AFERO_CLIENTS[self._afero_client][
"AUTH_DEFAULT_CLIENT_ID"
],
}
else:
data = {
"grant_type": "refresh_token",
"refresh_token": self._token_data.refresh_token,
"scope": "openid email offline_access profile",
"client_id": v1_const.AFERO_CLIENTS[self._afero_client][
"AUTH_DEFAULT_CLIENT_ID"
],
}
url = self.generate_auth_url(v1_const.AFERO_GENERICS["AUTH_TOKEN_ENDPOINT"])
with self.secret_logger():
self.logger.debug(
"URL: %s\n\tdata: %s\n\theaders: %s",
url,
data,
self._token_headers,
)
try:
response = await self._auth_request(
"POST",
url,
headers=self._token_headers,
data=data,
)
self.logger.debug(STATUS_CODE, response.status)
try:
resp_json = await response.json()
except (ValueError, ContentTypeError) as err:
raise InvalidResponse(
"Unexpected data returned during token refresh"
) from err
if response.status != 200:
if resp_json and resp_json.get("error") == "invalid_grant":
raise InvalidAuth
try:
response.raise_for_status()
except ClientResponseError as err:
raise InvalidResponse(
"Unexpected data returned during token refresh"
) from err
try:
refresh_token = resp_json["refresh_token"]
access_token = resp_json["access_token"]
id_token = resp_json["id_token"]
except KeyError as err:
raise InvalidResponse("Unable to extract refresh token") from err
if not refresh_token or not access_token or not id_token:
raise InvalidResponse("Unable to extract refresh token")
add_secret(refresh_token)
add_secret(access_token)
add_secret(id_token)
with self.secret_logger():
self.logger.debug("JSON response: %s", resp_json)
return TokenData(
id_token,
access_token,
refresh_token,
_token_expiration(resp_json),
)
finally:
if code:
remove_secret(code)
if challenge:
remove_secret(challenge.verifier)
[docs]
async def login(self) -> TokenData:
"""Perform credential-based login and return token data."""
return await self.perform_initial_login()
[docs]
async def submit_otp(self, otp_code: str) -> TokenData:
"""Submit OTP code and complete login.
:param otp_code: OTP code provided by the user
:return: Token data for the authenticated session
"""
if self._otp_data == {}:
raise OTPRequired("No OTP data available to perform login")
self.logger.debug("Performing otp login")
code: str = await self._submit_otp_code(otp_code)
token_data = await self.generate_refresh_token(
code=code, challenge=self._otp_data["challenge"]
)
self.logger.debug("Successfully generated a refresh token")
self._token_data = token_data
self._otp_data = {}
return token_data
async def _submit_otp_code(self, otp_code: str) -> str:
"""Submit OTP code to obtain an authorization code."""
otp_data = {
"action": "submit",
"flowName": "doLogIn",
"emailCode": otp_code,
}
url = self.generate_auth_url(v1_const.AFERO_GENERICS["AUTH_CODE_ENDPOINT"])
response = await self._auth_request(
"POST",
url,
params=self._otp_data["params"],
data=otp_data,
headers=self._otp_data["headers"],
allow_redirects=False,
)
if response.status != 302:
self.logger.warning("OTP code was invalid. Re-enter the OTP code.")
content = await response.text()
kc_error = get_kc_error(content)
login_data = await extract_login_data(content, "kc-otp-login-form")
otp_params = extract_login_codes(login_data, self._afero_client)
self._otp_data["params"] = otp_params
raise InvalidOTP(kc_error)
return await AferoAuth.parse_code(response)
[docs]
async def token(self, retry: bool = True) -> str:
"""Generate the token required to make Afero API calls."""
retry_refresh = False
async with self._async_lock:
if not self._token_data:
raise InvalidAuth("No token data available")
if await self._needs_token_refresh():
self.logger.debug("Token has not been generated or is expired")
try:
old_data = self._token_data
new_data = await self.generate_refresh_token()
_remove_secrets_not_in(old_data, new_data)
self._token_data = new_data
except InvalidAuth:
self.logger.debug("Provided refresh token is no longer valid.")
if not retry:
raise
retry_refresh = True
else:
self.logger.debug("Token has been successfully generated")
if retry_refresh:
return await self.token(retry=False)
bearer = self._token_data.token
if not bearer:
raise InvalidAuth("No token data available")
return bearer
async def extract_login_data(page: str, form_login_element: str) -> AuthSessionData:
"""Extract the required login data from the auth page.
:param page: the response from performing a GET against
v1_const.AFERO_CLIENTS[self._afero_client]['OPENID_URL']
"""
auth_page = BeautifulSoup(page, features="html.parser")
login_form = auth_page.find("form", id=form_login_element)
if login_form is None:
raise InvalidResponse("Unable to parse login page")
try:
login_url: str = login_form.attrs["action"]
except KeyError as err:
raise InvalidResponse("Unable to extract login url") from err
parsed_url = urlparse(login_url)
login_data = parse_qs(parsed_url.query)
try:
return AuthSessionData(
login_data["session_code"][0],
login_data["execution"][0],
login_data["tab_id"][0],
)
except (KeyError, IndexError) as err:
raise InvalidResponse("Unable to parse login url") from err
def extract_login_codes(data: AuthSessionData, client: str) -> dict:
return {
"session_code": data.session_code,
"execution": data.execution,
"client_id": v1_const.AFERO_CLIENTS[client]["AUTH_DEFAULT_CLIENT_ID"],
"tab_id": data.tab_id,
}
def get_kc_error(page: str) -> str:
"""Extract the error message from the otp page."""
auth_page = BeautifulSoup(page, features="html.parser")
error_div = auth_page.find("span", class_="kc-feedback-text")
if error_div is None:
return "Unknown error"
return error_div.text.strip()