Skip to content

Commit

Permalink
v4 supported for encrypted format
Browse files Browse the repository at this point in the history
Added support for encrypted format
Added support for custom format
Ensured mqtt.conf file compability with old versions
Added troubleshooting for connection issues
  • Loading branch information
JsBergbau authored Oct 12, 2021
1 parent 08983ae commit a6b07aa
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 85 deletions.
234 changes: 162 additions & 72 deletions LYWSD03MMC.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
#!/usr/bin/python3 -u
#!/home/openhabian/Python3/Python-3.7.4/python -u
#-u to unbuffer output. Otherwise when calling with nohup or redirecting output things are printed very lately or would even mixup

print("---------------------------------------------")
print("MiTemperature2 / ATC Thermometer version 3.1")
print("MiTemperature2 / ATC Thermometer version 4.0")
print("---------------------------------------------")

readme="""
Please read README.md in this folder. Latest version is available at https://github.com/JsBergbau/MiTemperature2#readme
This file explains very detailed about the usage and covers everything you need to know as user.
"""

print(readme)


from bluepy import btle
import argparse
import os
Expand All @@ -22,6 +31,7 @@
import requests
import ssl


@dataclass
class Measurement:
temperature: float
Expand Down Expand Up @@ -309,7 +319,7 @@ def MQTTOnDisconnect(client, userdata,rc):
print("MQTT disconnected, Client:", client, "Userdata:", userdata, "RC:", rc)

# Main loop --------
parser=argparse.ArgumentParser(allow_abbrev=False)
parser=argparse.ArgumentParser(allow_abbrev=False,epilog=readme)
parser.add_argument("--device","-d", help="Set the device MAC-Address in format AA:BB:CC:DD:EE:FF",metavar='AA:BB:CC:DD:EE:FF')
parser.add_argument("--battery","-b", help="Get estimated battery level, in ATC-Mode: Get battery level from device", metavar='', type=int, nargs='?', const=1)
parser.add_argument("--count","-c", help="Read/Receive N measurements and then exit script", metavar='N', type=int)
Expand All @@ -319,7 +329,7 @@ def MQTTOnDisconnect(client, userdata,rc):


rounding = parser.add_argument_group("Rounding and debouncing")
rounding.add_argument("--round","-r", help="Round temperature to one decimal place",action='store_true')
rounding.add_argument("--round","-r", help="Round temperature to one decimal place (and in ATC mode humidity to whole numbers)",action='store_true')
rounding.add_argument("--debounce","-deb", help="Enable this option to get more stable temperature values, requires -r option",action='store_true')

offsetgroup = parser.add_argument_group("Offset calibration mode")
Expand Down Expand Up @@ -368,12 +378,12 @@ def MQTTOnDisconnect(client, userdata,rc):
port = int(mqttConfig["MQTT"]["port"])

# MQTTS parameters
tls = int(mqttConfig["MQTT"]["tls"])
cacerts = mqttConfig["MQTT"]["cacerts"] if mqttConfig["MQTT"]["cacerts"] else None
certificate = mqttConfig["MQTT"]["certificate"] if mqttConfig["MQTT"]["certificate"] else None
certificate_key = mqttConfig["MQTT"]["certificate_key"] if mqttConfig["MQTT"]["certificate_key"] else None
insecure = int(mqttConfig["MQTT"]["insecure"])

tls = int(mqttConfig["MQTT"]["tls"]) if "tls" in mqttConfig["MQTT"] else 0
if tls != 0:
cacerts = mqttConfig["MQTT"]["cacerts"] if mqttConfig["MQTT"]["cacerts"] else None
certificate = mqttConfig["MQTT"]["certificate"] if mqttConfig["MQTT"]["certificate"] else None
certificate_key = mqttConfig["MQTT"]["certificate_key"] if mqttConfig["MQTT"]["certificate_key"] else None
insecure = int(mqttConfig["MQTT"]["insecure"])
username = mqttConfig["MQTT"]["username"]
password = mqttConfig["MQTT"]["password"]
MQTTTopic = mqttConfig["MQTT"]["topic"]
Expand Down Expand Up @@ -402,7 +412,6 @@ def MQTTOnDisconnect(client, userdata,rc):
if len(lwt) > 0:
print("Using lastwill with topic:",lwt,"and message:",lastwill)
client.will_set(lwt,lastwill,qos=1)

# MQTTS parameters
if tls:
client.tls_set(cacerts, certificate, certificate_key, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
Expand All @@ -411,6 +420,7 @@ def MQTTOnDisconnect(client, userdata,rc):
client.connect_async(broker,port)
MQTTClient=client


if args.device:
if re.match("[0-9a-fA-F]{2}([:]?)[0-9a-fA-F]{2}(\\1[0-9a-fA-F]{2}){4}$",args.device):
adress=args.device
Expand Down Expand Up @@ -530,19 +540,21 @@ def MQTTOnDisconnect(client, userdata,rc):
print("----------------------------")
print("In this mode all devices within reach are read out, unless a devicelistfile and --onlydevicelist is specified.")
print("Also --name Argument is ignored, if you require names, please use --devicelistfile.")
print("In this mode rounding and debouncing are not available, since ATC firmware sends out only one decimal place.")
print("In this mode debouncing is not available. Rounding option will round humidity and temperature to one decimal place.")
print("ATC mode usually requires root rights. If you want to use it with normal user rights, \nplease execute \"sudo setcap cap_net_raw,cap_net_admin+eip $(eval readlink -f `which python3`)\"")
print("You have to redo this step if you upgrade your python version.")
print("----------------------------")

import sys
import bluetooth._bluetooth as bluez
import cryptoFunctions

from bluetooth_utils import (toggle_device,
enable_le_scan, parse_le_advertising_events,
disable_le_scan, raw_packet_to_str)

advCounter=dict()
advCounter=dict()
#encryptedPacketStore=dict()
sensors = dict()
if args.devicelistfile:
#import configparser
Expand All @@ -557,6 +569,15 @@ def MQTTOnDisconnect(client, userdata,rc):
sensorsnew[key.upper()] = sensors[key]
sensors = sensorsnew

#loop through sensors to generate key
sensorsnew=sensors
for sensor in sensors:
if "decryption" in sensors[sensor]:
if sensors[sensor]["decryption"][0] == "k":
sensorsnew[sensor]["key"] = sensors[sensor]["decryption"][1:]
#print(sensorsnew[sensor]["key"])
sensors = sensorsnew

if args.onlydevicelist and not args.devicelistfile:
print("Error: --onlydevicelist requires --devicelistfile <devicelistfile>")
os._exit(1)
Expand All @@ -581,83 +602,152 @@ def le_advertise_packet_handler(mac, adv_type, data, rssi):
lastBLEPaketReceived = time.time()
lastBLEPaketReceived = time.time()
data_str = raw_packet_to_str(data)
preeamble = "10161a18"
preeamble = "161a18"
paketStart = data_str.find(preeamble)
offset = paketStart + len(preeamble)
#print("reveived BLE packet")+
atcData_str = data_str[offset:offset+26]
atcData_str = data_str[offset:offset+26] #if shorter will just be shorter then 13 Bytes
atcData_str = data_str[offset:] #if shorter will just be shorter then 13 Bytes
customFormat_str = data_str[offset:offset+29]
ATCPaketMAC = atcData_str[0:12].upper()
macStr = mac.replace(":","").upper()
atcIdentifier = data_str[(offset-4):offset].upper()

if(atcIdentifier == "1A18" and ATCPaketMAC == macStr) and not args.onlydevicelist or (atcIdentifier == "1A18" and mac in sensors) and len(atcData_str) == 26: #only Data from ATC devices, double checked
advNumber = atcData_str[-2:]
# if (atcIdentifier == "1A18" ) and mac == "A4:C1:38:92:E3:BD" : #debug
# print("BLE packet: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
# print("raw:",data_str)


batteryVoltage=None
if(atcIdentifier == "1A18" ) and not args.onlydevicelist or (atcIdentifier == "1A18" and mac in sensors) and (len(atcData_str) == 26 or len(atcData_str) == 16 or len(atcData_str) == 22): #only Data from ATC devices
global measurements
measurement = Measurement(0,0,0,0,0,0,0,0)
if len(atcData_str) == 30: #custom format, next-to-last ist adv number
advNumber = atcData_str[-4:-2]
else:
advNumber = atcData_str[-2:] #last data in paket is adv number

if macStr in advCounter:
lastAdvNumber = advCounter[macStr]
else:
lastAdvNumber = None
if lastAdvNumber == None or lastAdvNumber != advNumber:
advCounter[macStr] = advNumber
print("BLE packet: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
#print("AdvNumber: ", advNumber)
#temp = data_str[22:26].encode('utf-8')
#temperature = int.from_bytes(bytearray.fromhex(data_str[22:26]),byteorder='big') / 10.
global measurements
measurement = Measurement(0,0,0,0,0,0,0,0)
if args.influxdb == 1:
measurement.timestamp = int((time.time() // 10) * 10)
else:
measurement.timestamp = int(time.time())

if len(atcData_str) == 26: #ATC1441 Format
#print("atc14441") #debug
advCounter[macStr] = advNumber
print("BLE packet: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
#print("AdvNumber: ", advNumber)
#temp = data_str[22:26].encode('utf-8')
#temperature = int.from_bytes(bytearray.fromhex(data_str[22:26]),byteorder='big') / 10.
#temperature = int(data_str[22:26],16) / 10.
temperature = int.from_bytes(bytearray.fromhex(atcData_str[12:16]),byteorder='big',signed=True) / 10.
# print("Temperature: ", temperature)
humidity = int(atcData_str[16:18], 16)
# print("Humidity: ", humidity)
batteryVoltage = int(atcData_str[20:24], 16) / 1000
# print ("Battery voltage:", batteryVoltage,"V")
# print ("RSSI:", rssi, "dBm")

#if args.battery:
batteryPercent = int(atcData_str[18:20], 16)
#print ("Battery:", batteryPercent,"%")

elif len(atcData_str) == 30: #custom format
#print("custom:", atcData_str)
print("BLE packet: %s %02x %s %d" % (mac, adv_type, data_str, rssi))
temperature = int.from_bytes(bytearray.fromhex(atcData_str[12:16]),byteorder='little',signed=True) / 100.
humidity = int.from_bytes(bytearray.fromhex(atcData_str[16:20]),byteorder='little',signed=False) / 100.
batteryVoltage = int.from_bytes(bytearray.fromhex(atcData_str[20:24]),byteorder='little',signed=False) / 1000.
batteryPercent = int.from_bytes(bytearray.fromhex(atcData_str[24:26]),byteorder='little',signed=False)



elif len(atcData_str) == 22 or len(atcData_str) == 16: #encrypted: length 22/11 Bytes on custom format, 16/8 Bytes on ATC1441 Format
#print("enc") # debug
#if macStr in encryptedPacketStore:
if macStr in advCounter:
lastData = advCounter[macStr]
else:
lastData = None

if lastData == None or lastData != atcData_str:
print("Encrypted BLE packet: %s %02x %s %d, length: %d" % (mac, adv_type, data_str, rssi, len(atcData_str)/2))
if mac in sensors and "key" in sensors[mac]:
bindkey = bytes.fromhex(sensors[mac]["key"])
macReversed=""
for x in range(-1,-len(macStr),-2):
macReversed += macStr[x-1] + macStr[x]
macReversed = bytes.fromhex(macReversed.lower())
#print("New encrypted format, MAC:" , macStr, "Reversed: ", macReversed)
lengthHex=data_str[offset-8:offset-6]
#lengthHex="0b"
ret = cryptoFunctions.decrypt_aes_ccm(bindkey,macReversed,bytes.fromhex(lengthHex + "161a18" + atcData_str))
if ret == None: #Error decrypting
print("\n")
return
#temperature, humidity, batteryPercent = cryptoFunctions.decrypt_aes_ccm(bindkey,macReversed,bytes.fromhex(lengthHex + "161a18" + atcData_str))
temperature, humidity, batteryPercent = ret
else:
print("Warning: No key provided for sensor:", mac,"\n")
return
else: #no fitting paket
return

else: #Packet is just repeated
return

if args.influxdb == 1:
measurement.timestamp = int((time.time() // 10) * 10)
else:
measurement.timestamp = int(time.time())

#temperature = int(data_str[22:26],16) / 10.
temperature = int.from_bytes(bytearray.fromhex(atcData_str[12:16]),byteorder='big',signed=True) / 10.
print("Temperature: ", temperature)
humidity = int(atcData_str[16:18], 16)
print("Humidity: ", humidity)
batteryVoltage = int(atcData_str[20:24], 16) / 1000
if args.round:
temperature=round(temperature,1)
humidity=round(humidity,1)

measurement.battery = batteryPercent
measurement.humidity = humidity
measurement.temperature = temperature
measurement.voltage = batteryVoltage if batteryVoltage != None else 0
measurement.rssi = rssi

print("Temperature: ", temperature)
print("Humidity: ", humidity)
if batteryVoltage != None:
print ("Battery voltage:", batteryVoltage,"V")
print ("RSSI:", rssi, "dBm")

#if args.battery:
batteryPercent = int(atcData_str[18:20], 16)
print ("Battery:", batteryPercent,"%")
measurement.battery = batteryPercent
measurement.humidity = humidity
measurement.temperature = temperature
measurement.voltage = batteryVoltage
measurement.rssi = rssi

currentMQTTTopic = MQTTTopic
if mac in sensors:
try:
measurement.sensorname = sensors[mac]["sensorname"]
except:
measurement.sensorname = mac
if "offset1" in sensors[mac] and "offset2" in sensors[mac] and "calpoint1" in sensors[mac] and "calpoint2" in sensors[mac]:
measurement.humidity = calibrateHumidity2Points(humidity,int(sensors[mac]["offset1"]),int(sensors[mac]["offset2"]),int(sensors[mac]["calpoint1"]),int(sensors[mac]["calpoint2"]))
print ("Humidity calibrated (2 points calibration): ", measurement.humidity)
elif "humidityOffset" in sensors[mac]:
measurement.humidity = humidity + int(sensors[mac]["humidityOffset"])
print ("Humidity calibrated (offset calibration): ", measurement.humidity)
if "topic" in sensors[mac]:
currentMQTTTopic=sensors[mac]["topic"]
else:
print ("RSSI:", rssi, "dBm")
print ("Battery:", batteryPercent,"%")

currentMQTTTopic = MQTTTopic
if mac in sensors:
try:
measurement.sensorname = sensors[mac]["sensorname"]
except:
measurement.sensorname = mac

if measurement.calibratedHumidity == 0:
measurement.calibratedHumidity = measurement.humidity
if "offset1" in sensors[mac] and "offset2" in sensors[mac] and "calpoint1" in sensors[mac] and "calpoint2" in sensors[mac]:
measurement.humidity = calibrateHumidity2Points(humidity,int(sensors[mac]["offset1"]),int(sensors[mac]["offset2"]),int(sensors[mac]["calpoint1"]),int(sensors[mac]["calpoint2"]))
print ("Humidity calibrated (2 points calibration): ", measurement.humidity)
elif "humidityOffset" in sensors[mac]:
measurement.humidity = humidity + int(sensors[mac]["humidityOffset"])
print ("Humidity calibrated (offset calibration): ", measurement.humidity)
if "topic" in sensors[mac]:
currentMQTTTopic=sensors[mac]["topic"]
else:
measurement.sensorname = mac

if measurement.calibratedHumidity == 0:
measurement.calibratedHumidity = measurement.humidity

if args.callback or args.httpcallback:
measurements.append(measurement)
if args.callback or args.httpcallback:
measurements.append(measurement)

if args.mqttconfigfile:
jsonString=buildJSONString(measurement)
myMQTTPublish(currentMQTTTopic,jsonString)
#MQTTClient.publish(currentMQTTTopic,jsonString,1)
if args.mqttconfigfile:
jsonString=buildJSONString(measurement)
myMQTTPublish(currentMQTTTopic,jsonString)
#MQTTClient.publish(currentMQTTTopic,jsonString,1)

#print("Length:", len(measurements))
print("")
#print("Length:", len(measurements))
print("")

if args.watchdogtimer:
keepingLEScanRunningThread = threading.Thread(target=keepingLEScanRunning)
Expand Down
Loading

0 comments on commit a6b07aa

Please sign in to comment.