diff --git a/kasa/discover.py b/kasa/discover.py index 9625f7c38..bfc84bfb7 100755 --- a/kasa/discover.py +++ b/kasa/discover.py @@ -444,20 +444,23 @@ def _get_device_instance( f"Unable to read response from device: {ip}: {ex}" ) from ex - if ( - discovery_result.mgt_encrypt_schm.encrypt_type == "KLAP" - and discovery_result.mgt_encrypt_schm.lv is None + # AES only included here to test whether an EP25 might use klap lv2. Unlikely. + if discovery_result.mgt_encrypt_schm.encrypt_type in ("KLAP", "AES") and ( + discovery_result.mgt_encrypt_schm.lv is None + or discovery_result.mgt_encrypt_schm.lv == 2 ): type_ = discovery_result.device_type device_class = None - if type_.upper() == "IOT.SMARTPLUGSWITCH": + if type_.upper() in ("IOT.SMARTPLUGSWITCH", "SMART.KASAPLUG"): device_class = SmartPlug if device_class: _LOGGER.debug("[DISCOVERY] %s << %s", ip, info) device = device_class(ip, port=port, credentials=credentials) device.update_from_discover_info(discovery_result.get_dict()) - device.protocol = TPLinkKlap(ip, credentials=credentials) + device.protocol = TPLinkKlap( + ip, credentials=credentials, lv=discovery_result.mgt_encrypt_schm.lv + ) return device else: raise UnsupportedDeviceException( diff --git a/kasa/klapprotocol.py b/kasa/klapprotocol.py index 36a42c589..c6641ba5d 100755 --- a/kasa/klapprotocol.py +++ b/kasa/klapprotocol.py @@ -64,7 +64,10 @@ def _sha256(payload: bytes) -> bytes: - return hashlib.sha256(payload).digest() + digest = hashes.Hash(hashes.SHA256()) # noqa: S303 + digest.update(payload) + hash = digest.finalize() + return hash def _md5(payload: bytes) -> bytes: @@ -74,6 +77,12 @@ def _md5(payload: bytes) -> bytes: return hash +def _sha1(payload: bytes) -> bytes: + digest = hashes.Hash(hashes.SHA1()) # noqa: S303 + digest.update(payload) + return digest.finalize() + + class TPLinkKlap(TPLinkProtocol): """Implementation of the KLAP encryption protocol. @@ -94,6 +103,7 @@ def __init__( *, credentials: Optional[Credentials] = None, timeout: Optional[int] = None, + lv: Optional[int] = None, ) -> None: super().__init__(host=host, port=self.DEFAULT_PORT) @@ -103,8 +113,9 @@ def __init__( else Credentials(username="", password="") ) + self.lv = lv self._local_seed: Optional[bytes] = None - self.local_auth_hash = self.generate_auth_hash(self.credentials) + self.local_auth_hash = self.generate_auth_hash(self.credentials, self.lv) self.local_auth_owner = self.generate_owner_hash(self.credentials).hex() self.kasa_setup_auth_hash = None self.blank_auth_hash = None @@ -183,7 +194,12 @@ async def perform_handshake1(self) -> Tuple[bytes, bytes, bytes]: server_hash.hex(), ) - local_seed_auth_hash = _sha256(local_seed + self.local_auth_hash) + if self.lv == 2: + local_seed_auth_hash = _sha256( + local_seed + remote_seed + self.local_auth_hash + ) + else: + local_seed_auth_hash = _sha256(local_seed + self.local_auth_hash) # Check the response from the device with local credentials if local_seed_auth_hash == server_hash: @@ -196,11 +212,18 @@ async def perform_handshake1(self) -> Tuple[bytes, bytes, bytes]: username=TPLinkKlap.KASA_SETUP_EMAIL, password=TPLinkKlap.KASA_SETUP_PASSWORD, ) - self.kasa_setup_auth_hash = TPLinkKlap.generate_auth_hash(kasa_setup_creds) + self.kasa_setup_auth_hash = TPLinkKlap.generate_auth_hash( + kasa_setup_creds, self.lv + ) - kasa_setup_seed_auth_hash = _sha256( - local_seed + self.kasa_setup_auth_hash # type: ignore - ) + if self.lv == 2: + kasa_setup_seed_auth_hash = _sha256( + local_seed + remote_seed + self.kasa_setup_auth_hash # type: ignore + ) + else: + kasa_setup_seed_auth_hash = _sha256( + local_seed + self.kasa_setup_auth_hash # type: ignore + ) if kasa_setup_seed_auth_hash == server_hash: _LOGGER.debug( "Server response doesn't match our expected hash on ip %s" @@ -212,8 +235,15 @@ async def perform_handshake1(self) -> Tuple[bytes, bytes, bytes]: # Finally check against blank credentials if not already blank if self.credentials != (blank_creds := Credentials(username="", password="")): if not self.blank_auth_hash: - self.blank_auth_hash = TPLinkKlap.generate_auth_hash(blank_creds) - blank_seed_auth_hash = _sha256(local_seed + self.blank_auth_hash) # type: ignore + self.blank_auth_hash = TPLinkKlap.generate_auth_hash( + blank_creds, self.lv + ) + if self.lv == 2: + blank_seed_auth_hash = _sha256( + local_seed + remote_seed + self.blank_auth_hash # type: ignore + ) + else: + blank_seed_auth_hash = _sha256(local_seed + self.blank_auth_hash) # type: ignore if blank_seed_auth_hash == server_hash: _LOGGER.debug( "Server response doesn't match our expected hash on ip %s" @@ -235,7 +265,10 @@ async def perform_handshake2( url = f"http://{self.host}/app/handshake2" - payload = _sha256(remote_seed + auth_hash) + if self.lv == 2: + payload = _sha256(remote_seed + local_seed + auth_hash) + else: + payload = _sha256(remote_seed + auth_hash) response_status, response_data = await self.client_post(url, data=payload) @@ -288,10 +321,13 @@ def handshake_session_expired(self): ) @staticmethod - def generate_auth_hash(creds: Credentials): + def generate_auth_hash(creds: Credentials, lv: Optional[int] = None): """Generate an md5 auth hash for the protocol on the supplied credentials.""" un = creds.username or "" pw = creds.password or "" + if lv == 2: + return _sha256(_sha1(un.encode()) + _sha1(pw.encode())) + return _md5(_md5(un.encode()) + _md5(pw.encode())) @staticmethod