diff --git a/.github/workflows/python-check.yml b/.github/workflows/python-check.yml new file mode 100644 index 0000000..b5b6b8f --- /dev/null +++ b/.github/workflows/python-check.yml @@ -0,0 +1,39 @@ +name: Python check + +on: + push: + branches: [ "main", "refactor" ] + pull_request: + branches: [ "main" ] + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.10", "3.11"] + + steps: + - uses: actions/checkout@v3 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v3 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install -r requirements.txt + python -m pip install flake8 pylint black + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics + # - name: Analysing the code with pylint + # run: | + # pylint --max-line-length 88 $(git ls-files '*.py') + - name: Check black style + run: | + black . --check diff --git a/README.md b/README.md index 46cfb42..b28df60 100644 --- a/README.md +++ b/README.md @@ -47,28 +47,28 @@ settings: ### List devices ```python import asyncio -from pyhon import HonConnection +from pyhon import Hon async def devices_example(): - async with HonConnection(USER, PASSWORD) as hon: - for device in hon.devices: - print(device.nick_name) + async with Hon(USER, PASSWORD) as hon: + for appliance in hon.appliances: + print(appliance.nick_name) asyncio.run(devices_example()) ``` ### Execute a command ```python -async with HonConnection(USER, PASSWORD) as hon: - washing_machine = hon.devices[0] +async with Hon(USER, PASSWORD) as hon: + washing_machine = hon.appliances[0] pause_command = washing_machine.commands["pauseProgram"] await pause_command.send() ``` ### Set command parameter ```python -async with HonConnection(USER, PASSWORD) as hon: - washing_machine = hon.devices[0] +async with Hon(USER, PASSWORD) as hon: + washing_machine = hon.appliances[0] start_command = washing_machine.commands["startProgram"] for name, setting in start_command.settings: print("Setting", name) diff --git a/pyhon/__init__.py b/pyhon/__init__.py index b13705c..93728b7 100644 --- a/pyhon/__init__.py +++ b/pyhon/__init__.py @@ -1 +1,4 @@ -from .api import HonConnection +from .connection.api import HonAPI +from .hon import Hon + +__all__ = ["Hon", "HonAPI"] diff --git a/pyhon/__main__.py b/pyhon/__main__.py index e5843d4..38e6958 100755 --- a/pyhon/__main__.py +++ b/pyhon/__main__.py @@ -6,12 +6,11 @@ import logging import sys from getpass import getpass from pathlib import Path -from pprint import pprint if __name__ == "__main__": sys.path.insert(0, str(Path(__file__).parent.parent)) -from pyhon import HonConnection +from pyhon import Hon, HonAPI _LOGGER = logging.getLogger(__name__) @@ -25,8 +24,12 @@ def get_arguments(): keys = subparser.add_parser("keys", help="print as key format") keys.add_argument("keys", help="print as key format", action="store_true") keys.add_argument("--all", help="print also full keys", action="store_true") - translate = subparser.add_parser("translate", help="print available translation keys") - translate.add_argument("translate", help="language (de, en, fr...)", metavar="LANGUAGE") + translate = subparser.add_parser( + "translate", help="print available translation keys" + ) + translate.add_argument( + "translate", help="language (de, en, fr...)", metavar="LANGUAGE" + ) translate.add_argument("--json", help="print as json", action="store_true") return vars(parser.parse_args()) @@ -51,7 +54,9 @@ def pretty_print(data, key="", intend=0, is_list=False): else: pretty_print(value, key=key, intend=intend) else: - print(f"{' ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}") + print( + f"{' ' * intend}{'- ' if is_list else ''}{key}{': ' if key else ''}{data}" + ) def key_print(data, key="", start=True): @@ -85,12 +90,17 @@ def create_command(commands, concat=False): async def translate(language, json_output=False): - async with HonConnection() as hon: + async with HonAPI(anonymous=True) as hon: keys = await hon.translation_keys(language) if json_output: print(json.dumps(keys, indent=4)) else: - clean_keys = json.dumps(keys).replace("\\n", "\\\\n").replace("\\\\r", "").replace("\\r", "") + clean_keys = ( + json.dumps(keys) + .replace("\\n", "\\\\n") + .replace("\\\\r", "") + .replace("\\r", "") + ) keys = json.loads(clean_keys) pretty_print(keys) @@ -104,8 +114,8 @@ async def main(): user = input("User for hOn account: ") if not (password := args["password"]): password = getpass("Password for hOn account: ") - async with HonConnection(user, password) as hon: - for device in hon.devices: + async with Hon(user, password) as hon: + for device in hon.appliances: print("=" * 10, device.appliance_type, "-", device.nick_name, "=" * 10) if args.get("keys"): data = device.data.copy() @@ -126,5 +136,5 @@ def start(): print("Aborted.") -if __name__ == '__main__': +if __name__ == "__main__": start() diff --git a/pyhon/api.py b/pyhon/api.py deleted file mode 100644 index 4a72274..0000000 --- a/pyhon/api.py +++ /dev/null @@ -1,196 +0,0 @@ -import asyncio -import json -import logging -import secrets -from datetime import datetime -from typing import List - -import aiohttp as aiohttp - -from pyhon import const -from pyhon.auth import HonAuth -from pyhon.device import HonDevice - -_LOGGER = logging.getLogger() - - -class HonConnection: - def __init__(self, email="", password="", session=None) -> None: - super().__init__() - self._email = email - self._password = password - self._request_headers = {"Content-Type": "application/json"} - self._session = session - self._devices = [] - self._mobile_id = secrets.token_hex(8) - - async def __aenter__(self): - self._session = aiohttp.ClientSession() - if self._email and self._password: - await self.setup() - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self._session.close() - - @property - def devices(self) -> List[HonDevice]: - return self._devices - - @property - async def _headers(self): - if "cognito-token" not in self._request_headers or "id-token" not in self._request_headers: - auth = HonAuth() - if await auth.authorize(self._email, self._password, self._mobile_id): - self._request_headers["cognito-token"] = auth.cognito_token - self._request_headers["id-token"] = auth.id_token - else: - raise PermissionError("Can't Login") - return self._request_headers - - async def setup(self): - async with aiohttp.ClientSession() as session: - async with session.get(f"{const.API_URL}/commands/v1/appliance", - headers=await self._headers) as resp: - try: - appliances = (await resp.json())["payload"]["appliances"] - for appliance in appliances: - device = HonDevice(self, appliance) - if device.mac_address is None: - continue - await asyncio.gather(*[ - device.load_attributes(), - device.load_commands(), - device.load_statistics()]) - self._devices.append(device) - except json.JSONDecodeError: - _LOGGER.error("No JSON Data after GET: %s", await resp.text()) - return False - return True - - async def load_commands(self, device: HonDevice): - params = { - "applianceType": device.appliance_type, - "code": device.appliance["code"], - "applianceModelId": device.appliance_model_id, - "firmwareId": device.appliance["eepromId"], - "macAddress": device.mac_address, - "fwVersion": device.appliance["fwVersion"], - "os": const.OS, - "appVersion": const.APP_VERSION, - "series": device.appliance["series"], - } - url = f"{const.API_URL}/commands/v1/retrieve" - async with self._session.get(url, params=params, headers=await self._headers) as response: - result = (await response.json()).get("payload", {}) - if not result or result.pop("resultCode") != "0": - return {} - return result - - async def command_history(self, device: HonDevice): - url = f"{const.API_URL}/commands/v1/appliance/{device.mac_address}/history" - async with self._session.get(url, headers=await self._headers) as response: - result = await response.json() - if not result or not result.get("payload"): - return {} - return result["payload"]["history"] - - async def last_activity(self, device: HonDevice): - url = f"{const.API_URL}/commands/v1/retrieve-last-activity" - params = {"macAddress": device.mac_address} - async with self._session.get(url, params=params, headers=await self._headers) as response: - result = await response.json() - if result and (activity := result.get("attributes")): - return activity - return {} - - async def appliance_configuration(self): - url = f"{const.API_URL}/config/v1/appliance-configuration" - headers = {"x-api-key": const.API_KEY, "content-type": "application/json"} - async with self._session.get(url, headers=headers) as response: - result = await response.json() - if result and (data := result.get("payload")): - return data - return {} - - async def app_config(self, language="en", beta=True): - headers = {"x-api-key": const.API_KEY, "content-type": "application/json"} - url = f"{const.API_URL}/app-config" - payload = { - "languageCode": language, - "beta": beta, - "appVersion": const.APP_VERSION, - "os": const.OS - } - payload = json.dumps(payload, separators=(',', ':')) - async with self._session.post(url, headers=headers, data=payload) as response: - if (result := await response.json()) and (data := result.get("payload")): - return data - return {} - - async def translation_keys(self, language="en"): - headers = {"x-api-key": const.API_KEY, "content-type": "application/json"} - config = await self.app_config(language=language) - if url := config.get("language", {}).get("jsonPath"): - async with self._session.get(url, headers=headers) as response: - if result := await response.json(): - return result - return {} - - async def load_attributes(self, device: HonDevice, loop=False): - params = { - "macAddress": device.mac_address, - "applianceType": device.appliance_type, - "category": "CYCLE" - } - url = f"{const.API_URL}/commands/v1/context" - async with self._session.get(url, params=params, headers=await self._headers) as response: - if response.status == 403 and not loop: - _LOGGER.error("%s - Error %s - %s", url, response.status, await response.text()) - self._request_headers.pop("cognito-token", None) - self._request_headers.pop("id-token", None) - return await self.load_attributes(device, loop=True) - return (await response.json()).get("payload", {}) - - async def load_statistics(self, device: HonDevice): - params = { - "macAddress": device.mac_address, - "applianceType": device.appliance_type - } - url = f"{const.API_URL}/commands/v1/statistics" - async with self._session.get(url, params=params, headers=await self._headers) as response: - return (await response.json()).get("payload", {}) - - async def send_command(self, device, command, parameters, ancillary_parameters): - now = datetime.utcnow().isoformat() - data = { - "macAddress": device.mac_address, - "timestamp": f"{now[:-3]}Z", - "commandName": command, - "transactionId": f"{device.mac_address}_{now[:-3]}Z", - "applianceOptions": device.commands_options, - "device": { - "mobileId": self._mobile_id, - "mobileOs": const.OS, - "osVersion": const.OS_VERSION, - "appVersion": const.APP_VERSION, - "deviceModel": const.DEVICE_MODEL - }, - "attributes": { - "channel": "mobileApp", - "origin": "standardProgram", - "energyLabel": "0" - }, - "ancillaryParameters": ancillary_parameters, - "parameters": parameters, - "applianceType": device.appliance_type - } - url = f"{const.API_URL}/commands/v1/send" - async with self._session.post(url, headers=await self._headers, json=data) as resp: - try: - json_data = await resp.json() - except json.JSONDecodeError: - return False - if json_data["payload"]["resultCode"] == "0": - return True - return False diff --git a/pyhon/device.py b/pyhon/appliance.py similarity index 69% rename from pyhon/device.py rename to pyhon/appliance.py index 711b7e1..5ce7db3 100644 --- a/pyhon/device.py +++ b/pyhon/appliance.py @@ -5,12 +5,12 @@ from pyhon.commands import HonCommand from pyhon.parameter import HonParameterFixed -class HonDevice: - def __init__(self, connector, appliance): - if attributes := appliance.get("attributes"): - appliance["attributes"] = {v["parName"]: v["parValue"] for v in attributes} - self._appliance = appliance - self._connector = connector +class HonAppliance: + def __init__(self, api, info): + if attributes := info.get("attributes"): + info["attributes"] = {v["parName"]: v["parValue"] for v in attributes} + self._info = info + self._api = api self._appliance_model = {} self._commands = {} @@ -18,7 +18,9 @@ class HonDevice: self._attributes = {} try: - self._extra = importlib.import_module(f'pyhon.appliances.{self.appliance_type.lower()}').Appliance() + self._extra = importlib.import_module( + f"pyhon.appliances.{self.appliance_type.lower()}" + ).Appliance() except ModuleNotFoundError: self._extra = None @@ -36,7 +38,7 @@ class HonDevice: return self.data[item] if item in self.attributes["parameters"]: return self.attributes["parameters"].get(item) - return self.appliance[item] + return self.info[item] def get(self, item, default=None): try: @@ -46,23 +48,23 @@ class HonDevice: @property def appliance_model_id(self): - return self._appliance.get("applianceModelId") + return self._info.get("applianceModelId") @property def appliance_type(self): - return self._appliance.get("applianceTypeName") + return self._info.get("applianceTypeName") @property def mac_address(self): - return self._appliance.get("macAddress") + return self._info.get("macAddress") @property def model_name(self): - return self._appliance.get("modelName") + return self._info.get("modelName") @property def nick_name(self): - return self._appliance.get("nickName") + return self._info.get("nickName") @property def commands_options(self): @@ -81,13 +83,20 @@ class HonDevice: return self._statistics @property - def appliance(self): - return self._appliance + def info(self): + return self._info async def _recover_last_command_states(self, commands): - command_history = await self._connector.command_history(self) + command_history = await self._api.command_history(self) for name, command in commands.items(): - last = next((index for (index, d) in enumerate(command_history) if d.get("command", {}).get("commandName") == name), None) + last = next( + ( + index + for (index, d) in enumerate(command_history) + if d.get("command", {}).get("commandName") == name + ), + None, + ) if last is None: continue parameters = command_history[last].get("command", {}).get("parameters", {}) @@ -95,24 +104,29 @@ class HonDevice: command.set_program(parameters.pop("program").split(".")[-1].lower()) command = self.commands[name] for key, data in command.settings.items(): - if not isinstance(data, HonParameterFixed) and parameters.get(key) is not None: + if ( + not isinstance(data, HonParameterFixed) + and parameters.get(key) is not None + ): with suppress(ValueError): data.value = parameters.get(key) async def load_commands(self): - raw = await self._connector.load_commands(self) + raw = await self._api.load_commands(self) self._appliance_model = raw.pop("applianceModel") for item in ["settings", "options", "dictionaryId"]: raw.pop(item) commands = {} for command, attr in raw.items(): if "parameters" in attr: - commands[command] = HonCommand(command, attr, self._connector, self) + commands[command] = HonCommand(command, attr, self._api, self) elif "parameters" in attr[list(attr)[0]]: multi = {} for program, attr2 in attr.items(): program = program.split(".")[-1].lower() - cmd = HonCommand(command, attr2, self._connector, self, multi=multi, program=program) + cmd = HonCommand( + command, attr2, self._api, self, multi=multi, program=program + ) multi[program] = cmd commands[command] = cmd self._commands = commands @@ -137,20 +151,24 @@ class HonDevice: return result async def load_attributes(self): - self._attributes = await self._connector.load_attributes(self) + self._attributes = await self._api.load_attributes(self) for name, values in self._attributes.pop("shadow").get("parameters").items(): self._attributes.setdefault("parameters", {})[name] = values["parNewVal"] async def load_statistics(self): - self._statistics = await self._connector.load_statistics(self) + self._statistics = await self._api.load_statistics(self) async def update(self): await self.load_attributes() @property def data(self): - result = {"attributes": self.attributes, "appliance": self.appliance, "statistics": self.statistics, - **self.parameters} + result = { + "attributes": self.attributes, + "appliance": self.info, + "statistics": self.statistics, + **self.parameters, + } if self._extra: return self._extra.data(result) return result diff --git a/pyhon/auth.py b/pyhon/auth.py deleted file mode 100644 index 1baca58..0000000 --- a/pyhon/auth.py +++ /dev/null @@ -1,157 +0,0 @@ -import json -import logging -import re -import secrets -import urllib -from urllib import parse - -import aiohttp as aiohttp -from yarl import URL - -from pyhon import const - -_LOGGER = logging.getLogger() - - -class HonAuth: - def __init__(self) -> None: - self._access_token = "" - self._refresh_token = "" - self._cognito_token = "" - self._id_token = "" - - @property - def cognito_token(self): - return self._cognito_token - - @property - def id_token(self): - return self._id_token - - @property - def access_token(self): - return self._access_token - - @property - def refresh_token(self): - return self._refresh_token - - async def _load_login(self, session): - nonce = secrets.token_hex(16) - nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" - params = { - "response_type": "token+id_token", - "client_id": const.CLIENT_ID, - "redirect_uri": urllib.parse.quote(f"{const.APP}://mobilesdk/detect/oauth/done"), - "display": "touch", - "scope": "api openid refresh_token web", - "nonce": nonce - } - params = "&".join([f"{k}={v}" for k, v in params.items()]) - async with session.get(f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}") as resp: - if not (login_url := re.findall("url = '(.+?)'", await resp.text())): - return False - async with session.get(login_url[0], allow_redirects=False) as redirect1: - if not (url := redirect1.headers.get("Location")): - return False - async with session.get(url, allow_redirects=False) as redirect2: - if not (url := redirect2.headers.get("Location") + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn"): - return False - async with session.get(URL(url, encoded=True)) as login_screen: - if context := re.findall('"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text()): - fw_uid, loaded_str = context[0] - loaded = json.loads(loaded_str) - login_url = login_url[0].replace("/".join(const.AUTH_API.split("/")[:-1]), "") - return fw_uid, loaded, login_url - return False - - async def _login(self, session, email, password, fw_uid, loaded, login_url): - data = { - "message": { - "actions": [ - { - "id": "79;a", - "descriptor": "apex://LightningLoginCustomController/ACTION$login", - "callingDescriptor": "markup://c:loginForm", - "params": { - "username": email, - "password": password, - "startUrl": parse.unquote(login_url.split("startURL=")[-1]).split("%3D")[0] - } - } - ] - }, - "aura.context": { - "mode": "PROD", - "fwuid": fw_uid, - "app": "siteforce:loginApp2", - "loaded": loaded, - "dn": [], - "globals": {}, - "uad": False}, - "aura.pageURI": login_url, - "aura.token": None} - - params = {"r": 3, "other.LightningLoginCustom.login": 1} - async with session.post( - const.AUTH_API + "/s/sfsites/aura", - headers={"Content-Type": "application/x-www-form-urlencoded"}, - data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), - params=params - ) as response: - if response.status == 200: - try: - return (await response.json())["events"][0]["attributes"]["values"]["url"] - except json.JSONDecodeError: - pass - _LOGGER.error("Unable to login: %s\n%s", response.status, await response.text()) - return "" - - async def _get_token(self, session, url): - async with session.get(url) as resp: - if resp.status != 200: - _LOGGER.error("Unable to get token: %s", resp.status) - return False - url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) - async with session.get(url[0]) as resp: - if resp.status != 200: - _LOGGER.error("Unable to get token: %s", resp.status) - return False - url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) - url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] - async with session.get(url) as resp: - if resp.status != 200: - _LOGGER.error("Unable to connect to the login service: %s", resp.status) - return False - text = await resp.text() - if access_token := re.findall("access_token=(.*?)&", text): - self._access_token = access_token[0] - if refresh_token := re.findall("refresh_token=(.*?)&", text): - self._refresh_token = refresh_token[0] - if id_token := re.findall("id_token=(.*?)&", text): - self._id_token = id_token[0] - return True - - async def authorize(self, email, password, mobile_id): - headers = {"user-agent": const.USER_AGENT} - async with aiohttp.ClientSession(headers=headers) as session: - if login_site := await self._load_login(session): - fw_uid, loaded, login_url = login_site - else: - return False - if not (url := await self._login(session, email, password, fw_uid, loaded, login_url)): - return False - if not await self._get_token(session, url): - return False - - post_headers = {"Content-Type": "application/json", "id-token": self._id_token} - data = {"appVersion": const.APP_VERSION, "mobileId": mobile_id, "osVersion": const.OS_VERSION, - "os": const.OS, "deviceModel": const.DEVICE_MODEL} - async with session.post(f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data) as resp: - try: - json_data = await resp.json() - except json.JSONDecodeError: - _LOGGER.error("No JSON Data after POST: %s", await resp.text()) - return False - self._cognito_token = json_data["cognitoUser"]["Token"] - return True diff --git a/pyhon/commands.py b/pyhon/commands.py index 7c1e9ee..b12493a 100644 --- a/pyhon/commands.py +++ b/pyhon/commands.py @@ -1,4 +1,9 @@ -from pyhon.parameter import HonParameterFixed, HonParameterEnum, HonParameterRange, HonParameterProgram +from pyhon.parameter import ( + HonParameterFixed, + HonParameterEnum, + HonParameterRange, + HonParameterProgram, +) class HonCommand: @@ -10,7 +15,9 @@ class HonCommand: self._program = program self._description = attributes.get("description", "") self._parameters = self._create_parameters(attributes.get("parameters", {})) - self._ancillary_parameters = self._create_parameters(attributes.get("ancillaryParameters", {})) + self._ancillary_parameters = self._create_parameters( + attributes.get("ancillaryParameters", {}) + ) def __repr__(self): return f"{self._name} command" @@ -35,11 +42,18 @@ class HonCommand: @property def ancillary_parameters(self): - return {key: parameter.value for key, parameter in self._ancillary_parameters.items()} + return { + key: parameter.value + for key, parameter in self._ancillary_parameters.items() + } async def send(self): - parameters = {name: parameter.value for name, parameter in self._parameters.items()} - return await self._connector.send_command(self._device, self._name, parameters, self.ancillary_parameters) + parameters = { + name: parameter.value for name, parameter in self._parameters.items() + } + return await self._connector.send_command( + self._device, self._name, parameters, self.ancillary_parameters + ) def get_programs(self): return self._multi @@ -61,11 +75,16 @@ class HonCommand: def setting_keys(self): if not self._multi: return self._get_settings_keys() - result = [key for cmd in self._multi.values() for key in self._get_settings_keys(cmd)] + result = [ + key for cmd in self._multi.values() for key in self._get_settings_keys(cmd) + ] return list(set(result + ["program"])) @property def settings(self): """Parameters with typology enum and range""" - return {s: self._parameters.get(s) for s in self.setting_keys if self._parameters.get(s) is not None} - + return { + s: self._parameters.get(s) + for s in self.setting_keys + if self._parameters.get(s) is not None + } diff --git a/pyhon/connection/__init__.py b/pyhon/connection/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pyhon/connection/api.py b/pyhon/connection/api.py new file mode 100644 index 0000000..dfa2b65 --- /dev/null +++ b/pyhon/connection/api.py @@ -0,0 +1,156 @@ +import json +import logging +from datetime import datetime + +from pyhon import const +from pyhon.appliance import HonAppliance +from pyhon.connection.handler import HonConnectionHandler, HonAnonymousConnectionHandler + +_LOGGER = logging.getLogger() + + +class HonAPI: + def __init__(self, email="", password="", anonymous=False, session=None) -> None: + super().__init__() + self._email = email + self._password = password + self._anonymous = anonymous + self._hon = None + self._hon_anonymous = None + self._session = session + + async def __aenter__(self): + return await self.create() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def create(self): + self._hon_anonymous = await HonAnonymousConnectionHandler( + self._session + ).create() + if not self._anonymous: + self._hon = await HonConnectionHandler( + self._email, self._password, self._session + ).create() + return self + + async def load_appliances(self): + async with self._hon.get(f"{const.API_URL}/commands/v1/appliance") as resp: + return await resp.json() + + async def load_commands(self, appliance: HonAppliance): + params = { + "applianceType": appliance.appliance_type, + "code": appliance.info["code"], + "applianceModelId": appliance.appliance_model_id, + "firmwareId": appliance.info["eepromId"], + "macAddress": appliance.mac_address, + "fwVersion": appliance.info["fwVersion"], + "os": const.OS, + "appVersion": const.APP_VERSION, + "series": appliance.info["series"], + } + url = f"{const.API_URL}/commands/v1/retrieve" + async with self._hon.get(url, params=params) as response: + result = (await response.json()).get("payload", {}) + if not result or result.pop("resultCode") != "0": + return {} + return result + + async def command_history(self, appliance: HonAppliance): + url = f"{const.API_URL}/commands/v1/appliance/{appliance.mac_address}/history" + async with self._hon.get(url) as response: + result = await response.json() + if not result or not result.get("payload"): + return {} + return result["payload"]["history"] + + async def last_activity(self, appliance: HonAppliance): + url = f"{const.API_URL}/commands/v1/retrieve-last-activity" + params = {"macAddress": appliance.mac_address} + async with self._hon.get(url, params=params) as response: + result = await response.json() + if result and (activity := result.get("attributes")): + return activity + return {} + + async def load_attributes(self, appliance: HonAppliance): + params = { + "macAddress": appliance.mac_address, + "applianceType": appliance.appliance_type, + "category": "CYCLE", + } + url = f"{const.API_URL}/commands/v1/context" + async with self._hon.get(url, params=params) as response: + return (await response.json()).get("payload", {}) + + async def load_statistics(self, appliance: HonAppliance): + params = { + "macAddress": appliance.mac_address, + "applianceType": appliance.appliance_type, + } + url = f"{const.API_URL}/commands/v1/statistics" + async with self._hon.get(url, params=params) as response: + return (await response.json()).get("payload", {}) + + async def send_command(self, appliance, command, parameters, ancillary_parameters): + now = datetime.utcnow().isoformat() + data = { + "macAddress": appliance.mac_address, + "timestamp": f"{now[:-3]}Z", + "commandName": command, + "transactionId": f"{appliance.mac_address}_{now[:-3]}Z", + "applianceOptions": appliance.commands_options, + "appliance": self._hon.device.get(), + "attributes": { + "channel": "mobileApp", + "origin": "standardProgram", + "energyLabel": "0", + }, + "ancillaryParameters": ancillary_parameters, + "parameters": parameters, + "applianceType": appliance.appliance_type, + } + url = f"{const.API_URL}/commands/v1/send" + async with self._hon.post(url, json=data) as resp: + json_data = await resp.json() + if json_data.get("payload", {}).get("resultCode") == "0": + return True + return False + + async def appliance_configuration(self): + url = f"{const.API_URL}/config/v1/appliance-configuration" + async with self._hon_anonymous.get(url) as response: + result = await response.json() + if result and (data := result.get("payload")): + return data + return {} + + async def app_config(self, language="en", beta=True): + url = f"{const.API_URL}/app-config" + payload = { + "languageCode": language, + "beta": beta, + "appVersion": const.APP_VERSION, + "os": const.OS, + } + payload = json.dumps(payload, separators=(",", ":")) + async with self._hon_anonymous.post(url, data=payload) as response: + if (result := await response.json()) and (data := result.get("payload")): + return data + return {} + + async def translation_keys(self, language="en"): + config = await self.app_config(language=language) + if url := config.get("language", {}).get("jsonPath"): + async with self._hon_anonymous.get(url) as response: + if result := await response.json(): + return result + return {} + + async def close(self): + if self._hon: + await self._hon.close() + if self._hon_anonymous: + await self._hon_anonymous.close() diff --git a/pyhon/connection/auth.py b/pyhon/connection/auth.py new file mode 100644 index 0000000..b6e0d5e --- /dev/null +++ b/pyhon/connection/auth.py @@ -0,0 +1,200 @@ +import json +import logging +import re +import secrets +import urllib +from pprint import pformat +from urllib import parse + +from yarl import URL + +from pyhon import const + +_LOGGER = logging.getLogger() + + +class HonAuth: + def __init__(self, session, email, password, device) -> None: + self._session = session + self._email = email + self._password = password + self._access_token = "" + self._refresh_token = "" + self._cognito_token = "" + self._id_token = "" + self._device = device + + @property + def cognito_token(self): + return self._cognito_token + + @property + def id_token(self): + return self._id_token + + @property + def access_token(self): + return self._access_token + + @property + def refresh_token(self): + return self._refresh_token + + async def _load_login(self): + nonce = secrets.token_hex(16) + nonce = f"{nonce[:8]}-{nonce[8:12]}-{nonce[12:16]}-{nonce[16:20]}-{nonce[20:]}" + params = { + "response_type": "token+id_token", + "client_id": const.CLIENT_ID, + "redirect_uri": urllib.parse.quote( + f"{const.APP}://mobilesdk/detect/oauth/done" + ), + "display": "touch", + "scope": "api openid refresh_token web", + "nonce": nonce, + } + params = "&".join([f"{k}={v}" for k, v in params.items()]) + async with self._session.get( + f"{const.AUTH_API}/services/oauth2/authorize/expid_Login?{params}" + ) as resp: + if not (login_url := re.findall("url = '(.+?)'", await resp.text())): + return False + async with self._session.get(login_url[0], allow_redirects=False) as redirect1: + if not (url := redirect1.headers.get("Location")): + return False + async with self._session.get(url, allow_redirects=False) as redirect2: + if not ( + url := redirect2.headers.get("Location") + + "&System=IoT_Mobile_App&RegistrationSubChannel=hOn" + ): + return False + async with self._session.get(URL(url, encoded=True)) as login_screen: + if context := re.findall( + '"fwuid":"(.*?)","loaded":(\\{.*?})', await login_screen.text() + ): + fw_uid, loaded_str = context[0] + loaded = json.loads(loaded_str) + login_url = login_url[0].replace( + "/".join(const.AUTH_API.split("/")[:-1]), "" + ) + return fw_uid, loaded, login_url + return False + + async def _login(self, fw_uid, loaded, login_url): + data = { + "message": { + "actions": [ + { + "id": "79;a", + "descriptor": "apex://LightningLoginCustomController/ACTION$login", + "callingDescriptor": "markup://c:loginForm", + "params": { + "username": self._email, + "password": self._password, + "startUrl": parse.unquote( + login_url.split("startURL=")[-1] + ).split("%3D")[0], + }, + } + ] + }, + "aura.context": { + "mode": "PROD", + "fwuid": fw_uid, + "app": "siteforce:loginApp2", + "loaded": loaded, + "dn": [], + "globals": {}, + "uad": False, + }, + "aura.pageURI": login_url, + "aura.token": None, + } + params = {"r": 3, "other.LightningLoginCustom.login": 1} + async with self._session.post( + const.AUTH_API + "/s/sfsites/aura", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data="&".join(f"{k}={json.dumps(v)}" for k, v in data.items()), + params=params, + ) as response: + if response.status == 200: + try: + data = await response.json() + return data["events"][0]["attributes"]["values"]["url"] + except json.JSONDecodeError: + pass + except KeyError: + _LOGGER.error( + "Can't get login url - %s", pformat(await response.json()) + ) + _LOGGER.error( + "Unable to login: %s\n%s", response.status, await response.text() + ) + return "" + + async def _get_token(self, url): + async with self._session.get(url) as resp: + if resp.status != 200: + _LOGGER.error("Unable to get token: %s", resp.status) + return False + url = re.findall("href\\s*=\\s*[\"'](http.+?)[\"']", await resp.text()) + if not url: + _LOGGER.error("Can't get login url - \n%s", await resp.text()) + raise PermissionError + async with self._session.get(url[0]) as resp: + if resp.status != 200: + _LOGGER.error("Unable to get token: %s", resp.status) + return False + url = re.findall("href\\s*=\\s*[\"'](.*?)[\"']", await resp.text()) + url = "/".join(const.AUTH_API.split("/")[:-1]) + url[0] + async with self._session.get(url) as resp: + if resp.status != 200: + _LOGGER.error("Unable to connect to the login service: %s", resp.status) + return False + text = await resp.text() + if access_token := re.findall("access_token=(.*?)&", text): + self._access_token = access_token[0] + if refresh_token := re.findall("refresh_token=(.*?)&", text): + self._refresh_token = refresh_token[0] + if id_token := re.findall("id_token=(.*?)&", text): + self._id_token = id_token[0] + return True + + async def authorize(self): + if login_site := await self._load_login(): + fw_uid, loaded, login_url = login_site + else: + return False + if not (url := await self._login(fw_uid, loaded, login_url)): + return False + if not await self._get_token(url): + return False + + post_headers = {"id-token": self._id_token} + data = self._device.get() + async with self._session.post( + f"{const.API_URL}/auth/v1/login", headers=post_headers, json=data + ) as resp: + try: + json_data = await resp.json() + except json.JSONDecodeError: + _LOGGER.error("No JSON Data after POST: %s", await resp.text()) + return False + self._cognito_token = json_data["cognitoUser"]["Token"] + return True + + async def refresh(self): + params = { + "client_id": const.CLIENT_ID, + "refresh_token": self._refresh_token, + "grant_type": "refresh_token", + } + async with self._session.post( + f"{const.AUTH_API}/services/oauth2/token", params=params + ) as resp: + if resp.status >= 400: + return False + data = await resp.json() + self._id_token = data["id_token"] + self._access_token = data["access_token"] + return True diff --git a/pyhon/connection/device.py b/pyhon/connection/device.py new file mode 100644 index 0000000..f7399d9 --- /dev/null +++ b/pyhon/connection/device.py @@ -0,0 +1,41 @@ +import secrets + +from pyhon import const + + +class HonDevice: + def __init__(self): + self._app_version = const.APP_VERSION + self._os_version = const.OS_VERSION + self._os = const.OS + self._device_model = const.DEVICE_MODEL + self._mobile_id = secrets.token_hex(8) + + @property + def app_version(self): + return self._app_version + + @property + def os_version(self): + return self._os_version + + @property + def os(self): + return self._os + + @property + def device_model(self): + return self._device_model + + @property + def mobile_id(self): + return self._mobile_id + + def get(self): + return { + "appVersion": self.app_version, + "mobileId": self.mobile_id, + "osVersion": self.os_version, + "os": self.os, + "deviceModel": self.device_model, + } diff --git a/pyhon/connection/handler.py b/pyhon/connection/handler.py new file mode 100644 index 0000000..b16f48a --- /dev/null +++ b/pyhon/connection/handler.py @@ -0,0 +1,127 @@ +import json +from contextlib import asynccontextmanager + +import aiohttp + +from pyhon import const +from pyhon.connection.auth import HonAuth, _LOGGER +from pyhon.connection.device import HonDevice + + +class HonBaseConnectionHandler: + _HEADERS = {"user-agent": const.USER_AGENT, "Content-Type": "application/json"} + + def __init__(self, session=None): + self._session = session + self._auth = None + + async def __aenter__(self): + return await self.create() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def create(self): + self._session = aiohttp.ClientSession(headers=self._HEADERS) + return self + + @asynccontextmanager + async def _intercept(self, method, *args, loop=0, **kwargs): + raise NotImplementedError + + @asynccontextmanager + async def get(self, *args, **kwargs): + async with self._intercept(self._session.get, *args, **kwargs) as response: + yield response + + @asynccontextmanager + async def post(self, *args, **kwargs): + async with self._intercept(self._session.post, *args, **kwargs) as response: + yield response + + async def close(self): + await self._session.close() + + +class HonConnectionHandler(HonBaseConnectionHandler): + def __init__(self, email, password, session=None): + super().__init__(session=session) + self._device = HonDevice() + self._email = email + self._password = password + if not self._email: + raise PermissionError("Login-Error - An email address must be specified") + if not self._password: + raise PermissionError("Login-Error - A password address must be specified") + self._request_headers = {} + + @property + def device(self): + return self._device + + async def create(self): + await super().create() + self._auth = HonAuth(self._session, self._email, self._password, self._device) + return self + + async def _check_headers(self, headers): + if ( + "cognito-token" not in self._request_headers + or "id-token" not in self._request_headers + ): + if await self._auth.authorize(): + self._request_headers["cognito-token"] = self._auth.cognito_token + self._request_headers["id-token"] = self._auth.id_token + else: + raise PermissionError("Can't Login") + return {h: v for h, v in self._request_headers.items() if h not in headers} + + @asynccontextmanager + async def _intercept(self, method, *args, loop=0, **kwargs): + kwargs["headers"] = await self._check_headers(kwargs.get("headers", {})) + async with method(*args, **kwargs) as response: + if response.status == 403 and not loop: + _LOGGER.info("Try refreshing token...") + await self._auth.refresh() + yield await self._intercept(method, *args, loop=loop + 1, **kwargs) + elif response.status == 403 and loop < 2: + _LOGGER.warning( + "%s - Error %s - %s", + response.request_info.url, + response.status, + await response.text(), + ) + await self.create() + yield await self._intercept(method, *args, loop=loop + 1, **kwargs) + elif loop >= 2: + _LOGGER.error( + "%s - Error %s - %s", + response.request_info.url, + response.status, + await response.text(), + ) + raise PermissionError("Login failure") + else: + try: + await response.json() + yield response + except json.JSONDecodeError: + _LOGGER.warning( + "%s - JsonDecodeError %s - %s", + response.request_info.url, + response.status, + await response.text(), + ) + yield {} + + +class HonAnonymousConnectionHandler(HonBaseConnectionHandler): + _HEADERS = HonBaseConnectionHandler._HEADERS | {"x-api-key": const.API_KEY} + + @asynccontextmanager + async def _intercept(self, method, *args, loop=0, **kwargs): + kwargs["headers"] = kwargs.pop("headers", {}) | self._HEADERS + async with method(*args, **kwargs) as response: + if response.status == 403: + print("Can't authorize") + yield response diff --git a/pyhon/hon.py b/pyhon/hon.py new file mode 100644 index 0000000..322b077 --- /dev/null +++ b/pyhon/hon.py @@ -0,0 +1,48 @@ +import asyncio +from typing import List + +from pyhon import HonAPI +from pyhon.appliance import HonAppliance + + +class Hon: + def __init__(self, email, password, session=None): + self._email = email + self._password = password + self._session = session + self._appliances = [] + self._api = None + + async def __aenter__(self): + return await self.create() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def create(self): + self._api = await HonAPI( + self._email, self._password, session=self._session + ).create() + await self.setup() + return self + + @property + def appliances(self) -> List[HonAppliance]: + return self._appliances + + async def setup(self): + for appliance in (await self._api.load_appliances())["payload"]["appliances"]: + appliance = HonAppliance(self._api, appliance) + if appliance.mac_address is None: + continue + await asyncio.gather( + *[ + appliance.load_attributes(), + appliance.load_commands(), + appliance.load_statistics(), + ] + ) + self._appliances.append(appliance) + + async def close(self): + await self._api.close() diff --git a/pyhon/parameter.py b/pyhon/parameter.py index 2e2d25b..bbb269b 100644 --- a/pyhon/parameter.py +++ b/pyhon/parameter.py @@ -82,7 +82,9 @@ class HonParameterRange(HonParameter): if self._min <= value <= self._max and not value % self._step: self._value = value else: - raise ValueError(f"Allowed: min {self._min} max {self._max} step {self._step}") + raise ValueError( + f"Allowed: min {self._min} max {self._max} step {self._step}" + ) class HonParameterEnum(HonParameter): diff --git a/setup.py b/setup.py index df6672c..83d4176 100644 --- a/setup.py +++ b/setup.py @@ -7,11 +7,11 @@ with open("README.md", "r") as f: setup( name="pyhOn", - version="0.5.0", + version="0.6.0", author="Andre Basche", description="Control hOn devices with python", long_description=long_description, - long_description_content_type='text/markdown', + long_description_content_type="text/markdown", project_urls={ "GitHub": "https://github.com/Andre0512/pyhOn", "PyPI": "https://pypi.org/project/pyhOn", @@ -33,8 +33,8 @@ setup( "Topic :: Software Development :: Libraries :: Python Modules", ], entry_points={ - 'console_scripts': [ - 'pyhOn = pyhon.__main__:start', + "console_scripts": [ + "pyhOn = pyhon.__main__:start", ] - } + }, )