# coding: utf-8 from bluepy import btle import time import logging import globals import struct from multiconnect import Connector from notification import Notification from Cryptodome.Cipher import AES H_STRUCT = struct.Struct("H', bd[2:4]) # check data is present if not (framectrl & 0x4000): logging.info('LYWSD03------No Data') return action if framectrl & 0x0800: logging.info('LYWSD03------Encrypted') nonce = b"".join([mac_reversed, bd[4:6], bd[6:7]]) if mac.upper() in keys: key = bytes.fromhex(keys[mac.upper()].lower()) else: logging.error("No decryption key for %s" % mac) return action decrypted_payload = self.decrypt_payload(bd[13:], key, nonce) typecode = decrypted_payload[:2] value_length = decrypted_payload[2] value = decrypted_payload[3:3 + value_length] if typecode == b'\x06\x10': (humi,) = H_STRUCT.unpack(value) action["moisture"] = humi / 10 elif typecode == b'\x04\x10': (temp,) = T_STRUCT.unpack(value) action["temperature"]= temp / 10 elif typecode == b'\x0A\x10': action["battery"] = value[0] else: logging.info('LYWSD03------adv could not match typecode %s' % str(typecode)) else: logging.info('LYWSD03------adv no framectrl %s' % framectrl) return action def decrypt_payload(self, encrypted_payload, key, nonce): """Decrypt payload.""" aad = b"\x11" token = encrypted_payload[-4:] payload_counter = encrypted_payload[-7:-4] nonce = b"".join([nonce, payload_counter]) cipherpayload = encrypted_payload[:-7] cipher = AES.new(key, AES.MODE_CCM, nonce=nonce, mac_len=4) cipher.update(aad) plaindata = None try: plaindata = cipher.decrypt_and_verify(cipherpayload, token) except ValueError as error: logging.error("Decryption failed: %s", error) logging.error("token: %s", token.hex()) logging.error("nonce: %s", nonce.hex()) logging.error("encrypted_payload: %s", encrypted_payload.hex()) logging.error("cipherpayload: %s", cipherpayload.hex()) return None return plaindata def read(self,mac): result={} try: conn = Connector(mac) conn.connect() if not conn.isconnected: conn.connect() if not conn.isconnected: return batt = bytearray(conn.readCharacteristic('0x3a')) battery = batt[0] notification = Notification(conn,Lywsd03) notification.subscribe(10) result['battery'] = battery result['id'] = mac logging.debug('LYWSD03------'+str(result)) return result except Exception as e: logging.error(str(e)) return result def handlenotification(self,conn,handle,data,action={}): result={} if hex(handle) == '0x36': received = bytearray(data) temperature = float(received[1] * 256 + received[0]) / 100 moisture = received[2] result['moisture'] = moisture result['temperature'] = temperature result['id'] = conn.mac result['source'] = globals.daemonname globals.JEEDOM_COM.add_changes('devices::'+conn.mac,result) globals.COMPATIBILITY.append(Lywsd03)