Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add MH-Z19 sensor module #365

Merged
merged 3 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ sensor_modules:
type: DS18S20
address: 000803702e49

- name: mhz19
module: mhz19
device: "/dev/ttyS1"
range: 5000

digital_inputs:
- name: button
module: raspberrypi
Expand Down Expand Up @@ -76,3 +81,10 @@ sensor_inputs:
module: ds18b22
interval: 10
digits: 2

- name: co2_mhz19
module: mhz19
interval: 30
ha_discovery:
name: CO2 MH-Z19
device_class: carbon_dioxide
63 changes: 63 additions & 0 deletions mqtt_io/modules/sensor/mhz19.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""
MH-Z19 NDIR CO2 sensor
"""

from mqtt_io.modules.sensor import GenericSensor
from mqtt_io.types import CerberusSchemaType, ConfigType, SensorValueType

REQUIREMENTS = ("pyserial",)
CONFIG_SCHEMA: CerberusSchemaType = {
"device": dict(type="string", required=True, empty=False),
"range": dict(type="integer", required=False, empty=False, default=5000,
allowed=[2000, 5000, 10000]),
}

class Sensor(GenericSensor):
"""
Implementation of Sensor class for the MH-Z19 CO2 sensor using UART/serial.
"""

@staticmethod
def _calc_checksum(data: bytes) -> bytes:
value = sum(data[1:]) % 0x100
value = (0xff - value + 1) % 0x100
return value.to_bytes(1, "big")

@staticmethod
def _check_checksum(data: bytes) -> bool:
return Sensor._calc_checksum(data[:-1]) == data[-1:]

@staticmethod
def _add_checksum(data: bytes) -> bytes:
return data + Sensor._calc_checksum(data)

def setup_module(self) -> None:
# pylint: disable=import-error,import-outside-toplevel
import serial # type: ignore

self.ser = serial.Serial(
port=self.config["device"],
baudrate=9600,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
)

# setup detection range
cmd = Sensor._add_checksum(b"\xff\x01\x99\x00\x00\x00" +
self.config["range"].to_bytes(2, "big"))
self.ser.write(cmd)
# no response

def cleanup(self) -> None:
self.ser.close()

def get_value(self, sens_conf: ConfigType) -> SensorValueType:
self.ser.write(Sensor._add_checksum(b"\xff\x01\x86\x00\x00\x00\x00\x00"))
resp = self.ser.read(9)

if len(resp) == 9:
if resp[0:2] == b"\xff\x86" and Sensor._check_checksum(resp):
return int.from_bytes(resp[2:4], "big")

return None
Loading