From 708c72be04148d55ef9925512bb3c8f57fb26113 Mon Sep 17 00:00:00 2001 From: sdb9696 Date: Sat, 30 Dec 2023 11:11:23 +0000 Subject: [PATCH 1/3] Add known smart requests to dump_devinfo --- devtools/dump_devinfo.py | 141 ++++++++++++---- kasa/smartrequests.py | 350 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 462 insertions(+), 29 deletions(-) create mode 100644 kasa/smartrequests.py diff --git a/devtools/dump_devinfo.py b/devtools/dump_devinfo.py index c03e97d55..3c380e0a4 100644 --- a/devtools/dump_devinfo.py +++ b/devtools/dump_devinfo.py @@ -17,11 +17,14 @@ import asyncclick as click -from kasa import AuthenticationException, Credentials, Discover, SmartDevice +from kasa import AuthenticationException, Credentials, Discover from kasa.discover import DiscoveryResult +from kasa.exceptions import SmartErrorCode +from kasa.smartrequests import COMPONENT_REQUESTS, SmartRequest from kasa.tapo.tapodevice import TapoDevice Call = namedtuple("Call", "module method") +SmartCall = namedtuple("SmartCall", "module request should_succeed") def scrub(res): @@ -46,11 +49,19 @@ def scrub(res): "oem_id", "nickname", "alias", + "bssid", + "channel", ] for k, v in res.items(): if isinstance(v, collections.abc.Mapping): res[k] = scrub(res.get(k)) + elif ( + isinstance(v, list) + and len(v) > 0 + and isinstance(v[0], collections.abc.Mapping) + ): + res[k] = [scrub(vi) for vi in v] else: if k in keys_to_scrub: if k in ["latitude", "latitude_i", "longitude", "longitude_i"]: @@ -64,6 +75,8 @@ def scrub(res): v = base64.b64encode(b"#MASKED_NAME#").decode() elif k in ["alias"]: v = "#MASKED_NAME#" + elif isinstance(res[k], int): + v = 0 else: v = re.sub(r"\w", "0", v) @@ -179,7 +192,7 @@ async def get_legacy_fixture(device): ) ) - if device._discovery_info: + if device._discovery_info and not device._discovery_info.get("system"): # Need to recreate a DiscoverResult here because we don't want the aliases # in the fixture, we want the actual field names as returned by the device. dr = DiscoveryResult(**device._discovery_info) @@ -200,33 +213,88 @@ async def get_legacy_fixture(device): return save_filename, copy_folder, final -async def get_smart_fixture(device: SmartDevice): +async def get_smart_fixture(device: TapoDevice): """Get fixture for new TAPO style protocol.""" - items = [ - Call(module="component_nego", method="component_nego"), - Call(module="device_info", method="get_device_info"), - Call(module="device_usage", method="get_device_usage"), - Call(module="device_time", method="get_device_time"), - Call(module="energy_usage", method="get_energy_usage"), - Call(module="current_power", method="get_current_power"), - Call(module="temp_humidity_records", method="get_temp_humidity_records"), - Call(module="child_device_list", method="get_child_device_list"), - Call( - module="trigger_logs", - method={"get_trigger_logs": {"page_size": 5, "start_id": 0}}, + extra_test_calls = [ + SmartCall( + module="temp_humidity_records", + request=SmartRequest.get_raw_request("get_temp_humidity_records"), + should_succeed=False, ), - Call( + SmartCall( + module="child_device_list", + request=SmartRequest.get_raw_request("get_child_device_list"), + should_succeed=False, + ), + SmartCall( module="child_device_component_list", - method="get_child_device_component_list", + request=SmartRequest.get_raw_request("get_child_device_component_list"), + should_succeed=False, + ), + SmartCall( + module="trigger_logs", + request=SmartRequest.get_raw_request( + "get_trigger_logs", SmartRequest.GetTriggerLogsParams(5, 0) + ), + should_succeed=False, ), ] successes = [] - for test_call in items: + try: + click.echo("Testing component_nego call ..", nl=False) + component_info_response = await device.protocol.query( + SmartRequest._create_request_dict(SmartRequest.component_nego()) + ) + click.echo(click.style("OK", fg="green")) + successes.append( + SmartCall( + module="component_nego", + request=SmartRequest("component_nego"), + should_succeed=True, + ) + ) + except AuthenticationException as ex: + click.echo( + click.style( + f"Unable to query the device due to an authentication error: {ex}", + bold=True, + fg="red", + ) + ) + exit(1) + except Exception: + click.echo( + click.style("CRITICAL FAIL on component_nego call, exiting", fg="red") + ) + exit(1) + + test_calls = [] + should_succeed = [] + + for item in component_info_response["component_nego"]["component_list"]: + component_id = item["id"] + if requests := COMPONENT_REQUESTS.get(component_id): + component_test_calls = [ + SmartCall(module=component_id, request=request, should_succeed=True) + for request in requests + ] + test_calls.extend(component_test_calls) + should_succeed.extend(component_test_calls) + elif component_id not in COMPONENT_REQUESTS: + click.echo(f"Skipping {component_id}..", nl=False) + click.echo(click.style("UNSUPPORTED", fg="yellow")) + + test_calls.extend(extra_test_calls) + + for test_call in test_calls: + click.echo(f"Testing {test_call.module}..", nl=False) try: click.echo(f"Testing {test_call}..", nl=False) - response = await device.protocol.query(test_call.method) + response = await device.protocol.query( + SmartRequest._create_request_dict(test_call.request) + ) except AuthenticationException as ex: click.echo( click.style( @@ -237,22 +305,40 @@ async def get_smart_fixture(device: SmartDevice): ) exit(1) except Exception as ex: - click.echo(click.style(f"FAIL {ex}", fg="red")) + if ( + not test_call.should_succeed + and hasattr(ex, "error_code") + and ex.error_code == SmartErrorCode.UNKNOWN_METHOD_ERROR + ): + click.echo(click.style("FAIL - EXPECTED", fg="green")) + else: + click.echo(click.style(f"FAIL {ex}", fg="red")) else: if not response: - click.echo(click.style("FAIL not suported", fg="red")) + click.echo(click.style("FAIL no response", fg="red")) else: - click.echo(click.style("OK", fg="green")) + if not test_call.should_succeed: + click.echo(click.style("OK - EXPECTED FAIL", fg="red")) + else: + click.echo(click.style("OK", fg="green")) successes.append(test_call) requests = [] for succ in successes: - requests.append({"method": succ.method}) - - final_query = {"multipleRequest": {"requests": requests}} + requests.append(succ.request) + final = {} try: - responses = await device.protocol.query(final_query) + end = len(requests) + step = 10 # Break the requests down as there seems to be a size limit + for i in range(0, end, step): + x = i + requests_step = requests[x : x + step] + responses = await device.protocol.query( + SmartRequest._create_request_dict(requests_step) + ) + for method, result in responses.items(): + final[method] = result except AuthenticationException as ex: click.echo( click.style( @@ -269,9 +355,6 @@ async def get_smart_fixture(device: SmartDevice): ) ) exit(1) - final = {} - for method, result in responses.items(): - final[method] = result # Need to recreate a DiscoverResult here because we don't want the aliases # in the fixture, we want the actual field names as returned by the device. diff --git a/kasa/smartrequests.py b/kasa/smartrequests.py new file mode 100644 index 000000000..4bb50ae3d --- /dev/null +++ b/kasa/smartrequests.py @@ -0,0 +1,350 @@ +"""SmartRequest helper classes and functions for new SMART/TAPO devices. + +List of known requests with associated parameter classes. + +Other requests that are known but not currently implemented +or tested are: + +get_child_device_component_list +get_child_device_list +control_child +get_device_running_info - seems to be a subset of get_device_info + +get_tss_info +get_raw_dvi +get_homekit_info + +fw_download + +sync_env +account_sync + +device_reset +close_device_ble +heart_beat + +""" + +import logging +from dataclasses import asdict, dataclass +from typing import List, Optional, Union + +_LOGGER = logging.getLogger(__name__) +logging.getLogger("httpx").propagate = False + + +class SmartRequest: + """Class to represent a smart protocol request.""" + + def __init__(self, method_name: str, params: Optional["SmartRequestParams"] = None): + self.method_name = method_name + if params: + self.params = params.to_dict() + else: + self.params = None + + def __repr__(self): + return f"SmartRequest({self.method_name})" + + def to_dict(self): + """Return the request as a dict suitable for passing to query().""" + return {self.method_name: self.params} + + @dataclass + class SmartRequestParams: + """Base class for Smart request params. + + The to_dict() method of this class omits null values which + is required by the devices. + """ + + def to_dict(self): + """Return the params as a dict with values of None ommited.""" + return asdict( + self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None} + ) + + @dataclass + class DeviceOnParams(SmartRequestParams): + """Get Rules Params.""" + + device_on: bool + + @dataclass + class GetRulesParams(SmartRequestParams): + """Get Rules Params.""" + + start_index: int = 0 + + @dataclass + class GetTriggerLogsParams(SmartRequestParams): + """Trigger Logs params.""" + + page_size: int = 5 + start_id: int = 0 + + @dataclass + class LedStatusParams(SmartRequestParams): + """LED Status params.""" + + led_rule: Optional[str] = None + + @staticmethod + def from_bool(state: bool): + """Set the led_rule from the state.""" + rule = "always" if state else "never" + return SmartRequest.LedStatusParams(led_rule=rule) + + @dataclass + class LightInfoParams(SmartRequestParams): + """LightInfo params.""" + + brightness: Optional[int] = None + color_temp: Optional[int] = None + hue: Optional[int] = None + saturation: Optional[int] = None + + @dataclass + class DynamicLightEffectParams(SmartRequestParams): + """LightInfo params.""" + + enable: bool + id: Optional[str] = None + + @staticmethod + def get_raw_request( + method: str, params: Optional[SmartRequestParams] = None + ) -> "SmartRequest": + """Send a raw request to the device.""" + return SmartRequest(method, params) + + @staticmethod + def component_nego() -> "SmartRequest": + """Get quick setup component info.""" + return SmartRequest("component_nego") + + @staticmethod + def get_device_info() -> "SmartRequest": + """Get device info.""" + return SmartRequest("get_device_info") + + @staticmethod + def get_device_usage() -> "SmartRequest": + """Get device usage.""" + return SmartRequest("get_device_usage") + + @staticmethod + def device_info_list() -> List["SmartRequest"]: + """Get device info list.""" + return [ + SmartRequest.get_device_info(), + SmartRequest.get_device_usage(), + ] + + @staticmethod + def get_auto_update_info() -> "SmartRequest": + """Get auto update info.""" + return SmartRequest("get_auto_update_info") + + @staticmethod + def firmware_info_list() -> List["SmartRequest"]: + """Get info list.""" + return [ + SmartRequest.get_auto_update_info(), + SmartRequest.get_raw_request("get_fw_download_state"), + SmartRequest.get_raw_request("get_latest_fw"), + ] + + @staticmethod + def qs_component_nego() -> "SmartRequest": + """Get quick setup component info.""" + return SmartRequest("qs_component_nego") + + @staticmethod + def get_device_time() -> "SmartRequest": + """Get device time.""" + return SmartRequest("get_device_time") + + @staticmethod + def get_wireless_scan_info() -> "SmartRequest": + """Get wireless scan info.""" + return SmartRequest("get_wireless_scan_info") + + @staticmethod + def get_schedule_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get schedule rules.""" + return SmartRequest( + "get_schedule_rules", params or SmartRequest.GetRulesParams() + ) + + @staticmethod + def get_next_event(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get next scheduled event.""" + return SmartRequest("get_next_event", params or SmartRequest.GetRulesParams()) + + @staticmethod + def schedule_info_list() -> List["SmartRequest"]: + """Get schedule info list.""" + return [ + SmartRequest.get_schedule_rules(), + SmartRequest.get_next_event(), + ] + + @staticmethod + def get_countdown_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get countdown rules.""" + return SmartRequest( + "get_countdown_rules", params or SmartRequest.GetRulesParams() + ) + + @staticmethod + def get_antitheft_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get antitheft rules.""" + return SmartRequest( + "get_antitheft_rules", params or SmartRequest.GetRulesParams() + ) + + @staticmethod + def get_led_info(params: Optional[LedStatusParams] = None) -> "SmartRequest": + """Get led info.""" + return SmartRequest("get_led_info", params or SmartRequest.LedStatusParams()) + + @staticmethod + def get_auto_off_config(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get auto off config.""" + return SmartRequest( + "get_auto_off_config", params or SmartRequest.GetRulesParams() + ) + + @staticmethod + def get_delay_action_info() -> "SmartRequest": + """Get delay action info.""" + return SmartRequest("get_delay_action_info") + + @staticmethod + def auto_off_list() -> List["SmartRequest"]: + """Get energy usage.""" + return [ + SmartRequest.get_auto_off_config(), + SmartRequest.get_delay_action_info(), # May not live here + ] + + @staticmethod + def get_energy_usage() -> "SmartRequest": + """Get energy usage.""" + return SmartRequest("get_energy_usage") + + @staticmethod + def energy_monitoring_list() -> List["SmartRequest"]: + """Get energy usage.""" + return [ + SmartRequest("get_energy_usage"), + SmartRequest.get_raw_request("get_electricity_price_config"), + ] + + @staticmethod + def get_current_power() -> "SmartRequest": + """Get current power.""" + return SmartRequest("get_current_power") + + @staticmethod + def power_protection_list() -> List["SmartRequest"]: + """Get power protection info list.""" + return [ + SmartRequest.get_current_power(), + SmartRequest.get_raw_request("get_max_power"), + SmartRequest.get_raw_request("get_protection_power"), + ] + + @staticmethod + def get_preset_rules(params: Optional[GetRulesParams] = None) -> "SmartRequest": + """Get preset rules.""" + return SmartRequest("get_preset_rules", params or SmartRequest.GetRulesParams()) + + @staticmethod + def get_auto_light_info() -> "SmartRequest": + """Get auto light info.""" + return SmartRequest("get_auto_light_info") + + @staticmethod + def get_dynamic_light_effect_rules( + params: Optional[GetRulesParams] = None + ) -> "SmartRequest": + """Get dynamic light effect rules.""" + return SmartRequest( + "get_dynamic_light_effect_rules", params or SmartRequest.GetRulesParams() + ) + + @staticmethod + def set_device_on(params: DeviceOnParams) -> "SmartRequest": + """Set device on state.""" + return SmartRequest("set_device_info", params) + + @staticmethod + def set_light_info(params: LightInfoParams) -> "SmartRequest": + """Set color temperature.""" + return SmartRequest("set_device_info", params) + + @staticmethod + def set_dynamic_light_effect_rule_enable( + params: DynamicLightEffectParams + ) -> "SmartRequest": + """Enable dynamic light effect rule.""" + return SmartRequest("set_dynamic_light_effect_rule_enable", params) + + @staticmethod + def get_component_info_requests(component_nego_response) -> List["SmartRequest"]: + """Get a list of requests based on the component info response.""" + request_list = [] + for component in component_nego_response["component_list"]: + if requests := COMPONENT_REQUESTS.get(component["id"]): + request_list.extend(requests) + return request_list + + @staticmethod + def _create_request_dict( + smart_request: Union["SmartRequest", List["SmartRequest"]] + ) -> dict: + """Create request dict to be passed to SmartProtocol.query().""" + if isinstance(smart_request, list): + request = {} + for sr in smart_request: + request[sr.method_name] = sr.params + else: + request = smart_request.to_dict() + return request + + +COMPONENT_REQUESTS = { + "device": SmartRequest.device_info_list(), + "firmware": SmartRequest.firmware_info_list(), + "quick_setup": [SmartRequest.qs_component_nego()], + "inherit": [SmartRequest.get_raw_request("get_inherit_info")], + "time": [SmartRequest.get_device_time()], + "wireless": [SmartRequest.get_wireless_scan_info()], + "schedule": SmartRequest.schedule_info_list(), + "countdown": [SmartRequest.get_countdown_rules()], + "antitheft": [SmartRequest.get_antitheft_rules()], + "account": None, + "synchronize": None, # sync_env + "sunrise_sunset": None, # for schedules + "led": [SmartRequest.get_led_info()], + "cloud_connect": [SmartRequest.get_raw_request("get_connect_cloud_state")], + "iot_cloud": None, + "device_local_time": None, + "default_states": None, # in device_info + "auto_off": [SmartRequest.get_auto_off_config()], + "localSmart": None, + "energy_monitoring": SmartRequest.energy_monitoring_list(), + "power_protection": SmartRequest.power_protection_list(), + "current_protection": None, # overcurrent in device_info + "matter": None, + "preset": [SmartRequest.get_preset_rules()], + "brightness": None, # in device_info + "color": None, # in device_info + "color_temperature": None, # in device_info + "auto_light": [SmartRequest.get_auto_light_info()], + "light_effect": [SmartRequest.get_dynamic_light_effect_rules()], + "bulb_quick_control": None, + "on_off_gradually": [SmartRequest.get_raw_request("get_on_off_gradually_info")], +} From 113c2b5eb40b59d046c9cfb60b4614b2d3fa993a Mon Sep 17 00:00:00 2001 From: sdb9696 Date: Sun, 31 Dec 2023 15:15:44 +0000 Subject: [PATCH 2/3] Move smartrequest.py to devtools --- devtools/dump_devinfo.py | 3 ++- {kasa => devtools}/smartrequests.py | 0 2 files changed, 2 insertions(+), 1 deletion(-) rename {kasa => devtools}/smartrequests.py (100%) diff --git a/devtools/dump_devinfo.py b/devtools/dump_devinfo.py index 3c380e0a4..54e68294f 100644 --- a/devtools/dump_devinfo.py +++ b/devtools/dump_devinfo.py @@ -20,9 +20,10 @@ from kasa import AuthenticationException, Credentials, Discover from kasa.discover import DiscoveryResult from kasa.exceptions import SmartErrorCode -from kasa.smartrequests import COMPONENT_REQUESTS, SmartRequest from kasa.tapo.tapodevice import TapoDevice +from .smartrequests import COMPONENT_REQUESTS, SmartRequest + Call = namedtuple("Call", "module method") SmartCall = namedtuple("SmartCall", "module request should_succeed") diff --git a/kasa/smartrequests.py b/devtools/smartrequests.py similarity index 100% rename from kasa/smartrequests.py rename to devtools/smartrequests.py From 6349613694d58ec4b1df34bea6a39e59bad0b0a5 Mon Sep 17 00:00:00 2001 From: sdb9696 Date: Tue, 2 Jan 2024 12:52:42 +0000 Subject: [PATCH 3/3] Update post-review --- devtools/__init__.py | 1 + devtools/dump_devinfo.py | 107 +++++++++++------------- devtools/helpers/__init__.py | 1 + devtools/{ => helpers}/smartrequests.py | 0 4 files changed, 51 insertions(+), 58 deletions(-) create mode 100644 devtools/__init__.py create mode 100644 devtools/helpers/__init__.py rename devtools/{ => helpers}/smartrequests.py (100%) diff --git a/devtools/__init__.py b/devtools/__init__.py new file mode 100644 index 000000000..49189835e --- /dev/null +++ b/devtools/__init__.py @@ -0,0 +1 @@ +"""Devtools package.""" diff --git a/devtools/dump_devinfo.py b/devtools/dump_devinfo.py index 54e68294f..985ce669f 100644 --- a/devtools/dump_devinfo.py +++ b/devtools/dump_devinfo.py @@ -14,16 +14,16 @@ import re from collections import defaultdict, namedtuple from pprint import pprint +from typing import Dict, List import asyncclick as click -from kasa import AuthenticationException, Credentials, Discover +from devtools.helpers.smartrequests import COMPONENT_REQUESTS, SmartRequest +from kasa import AuthenticationException, Credentials, Discover, SmartDevice from kasa.discover import DiscoveryResult from kasa.exceptions import SmartErrorCode from kasa.tapo.tapodevice import TapoDevice -from .smartrequests import COMPONENT_REQUESTS, SmartRequest - Call = namedtuple("Call", "module method") SmartCall = namedtuple("SmartCall", "module request should_succeed") @@ -214,6 +214,38 @@ async def get_legacy_fixture(device): return save_filename, copy_folder, final +async def _make_requests_or_exit( + device: SmartDevice, requests: List[SmartRequest], name: str +) -> Dict[str, Dict]: + final = {} + try: + end = len(requests) + step = 10 # Break the requests down as there seems to be a size limit + for i in range(0, end, step): + x = i + requests_step = requests[x : x + step] + responses = await device.protocol.query( + SmartRequest._create_request_dict(requests_step) + ) + for method, result in responses.items(): + final[method] = result + return final + except AuthenticationException as ex: + click.echo( + click.style( + f"Unable to query the device due to an authentication error: {ex}", + bold=True, + fg="red", + ) + ) + exit(1) + except Exception as ex: + click.echo( + click.style(f"Unable to query {name} at once: {ex}", bold=True, fg="red") + ) + exit(1) + + async def get_smart_fixture(device: TapoDevice): """Get fixture for new TAPO style protocol.""" extra_test_calls = [ @@ -243,38 +275,24 @@ async def get_smart_fixture(device: TapoDevice): successes = [] - try: - click.echo("Testing component_nego call ..", nl=False) - component_info_response = await device.protocol.query( - SmartRequest._create_request_dict(SmartRequest.component_nego()) - ) - click.echo(click.style("OK", fg="green")) - successes.append( - SmartCall( - module="component_nego", - request=SmartRequest("component_nego"), - should_succeed=True, - ) - ) - except AuthenticationException as ex: - click.echo( - click.style( - f"Unable to query the device due to an authentication error: {ex}", - bold=True, - fg="red", - ) - ) - exit(1) - except Exception: - click.echo( - click.style("CRITICAL FAIL on component_nego call, exiting", fg="red") + click.echo("Testing component_nego call ..", nl=False) + responses = await _make_requests_or_exit( + device, [SmartRequest.component_nego()], "component_nego call" + ) + component_info_response = responses["component_nego"] + click.echo(click.style("OK", fg="green")) + successes.append( + SmartCall( + module="component_nego", + request=SmartRequest("component_nego"), + should_succeed=True, ) - exit(1) + ) test_calls = [] should_succeed = [] - for item in component_info_response["component_nego"]["component_list"]: + for item in component_info_response["component_list"]: component_id = item["id"] if requests := COMPONENT_REQUESTS.get(component_id): component_test_calls = [ @@ -328,34 +346,7 @@ async def get_smart_fixture(device: TapoDevice): for succ in successes: requests.append(succ.request) - final = {} - try: - end = len(requests) - step = 10 # Break the requests down as there seems to be a size limit - for i in range(0, end, step): - x = i - requests_step = requests[x : x + step] - responses = await device.protocol.query( - SmartRequest._create_request_dict(requests_step) - ) - for method, result in responses.items(): - final[method] = result - except AuthenticationException as ex: - click.echo( - click.style( - f"Unable to query the device due to an authentication error: {ex}", - bold=True, - fg="red", - ) - ) - exit(1) - except Exception as ex: - click.echo( - click.style( - f"Unable to query all successes at once: {ex}", bold=True, fg="red" - ) - ) - exit(1) + final = await _make_requests_or_exit(device, requests, "all successes at once") # Need to recreate a DiscoverResult here because we don't want the aliases # in the fixture, we want the actual field names as returned by the device. diff --git a/devtools/helpers/__init__.py b/devtools/helpers/__init__.py new file mode 100644 index 000000000..182958c66 --- /dev/null +++ b/devtools/helpers/__init__.py @@ -0,0 +1 @@ +"""Helpers package.""" diff --git a/devtools/smartrequests.py b/devtools/helpers/smartrequests.py similarity index 100% rename from devtools/smartrequests.py rename to devtools/helpers/smartrequests.py