Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add coils read/write feature #59

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions modbus4mqtt/modbus_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ def __init__(self,
self._port = port
# This is a dict of sets. Each key represents one table of modbus registers.
# At the moment it has 'input' and 'holding'
self._tables = {'input': set(), 'holding': set()}
self._tables = {'input': set(), 'holding': set(), 'coils': set()}

# This is a dicts of dicts. These hold the current values of the interesting registers
self._values = {'input': {}, 'holding': {}}
self._values = {'input': {}, 'holding': {}, 'coils': {}}

self._planned_writes = Queue()
self._writing = False
Expand Down Expand Up @@ -144,20 +144,26 @@ def get_value(self, table, addr, type='uint16'):
raise ValueError("Unsupported table type. Please only use: {}".format(self._values.keys()))
if addr not in self._values[table]:
raise ValueError("Unpolled address. Use add_monitor_register(addr, table) to add a register to the polled list.")
# Read sequential addresses to get enough bytes to satisfy the type of this register.
# Note: Each address provides 2 bytes of data.
value = bytes(0)
type_len = type_length(type)
for i in range(type_len):
if self._word_order == WordOrder.HighLow:
data = self._values[table][addr + i]
else:
data = self._values[table][addr + (type_len-i-1)]
value += data.to_bytes(2, 'big')
value = _convert_from_bytes_to_type(value, type)
if table == 'coils':
value = self._values[table][addr]
else:
# Read sequential addresses to get enough bytes to satisfy the type of this register.
# Note: Each address provides 2 bytes of data.
value = bytes(0)
type_len = type_length(type)
for i in range(type_len):
if self._word_order == WordOrder.HighLow:
data = self._values[table][addr + i]
else:
data = self._values[table][addr + (type_len-i-1)]
value += data.to_bytes(2, 'big')
value = _convert_from_bytes_to_type(value, type)
return value

def set_value(self, table, addr, value, mask=0xFFFF, type='uint16'):
if table == 'coils':
self._mb.write_coil(addr, value)
return
if table != 'holding':
# I'm not sure if this is true for all devices. I might support writing to coils later,
# so leave this door open.
Expand Down Expand Up @@ -226,13 +232,17 @@ def _scan_value_range(self, table, start, count):
result = self._mb.read_input_registers(start, count, unit=self._unit)
elif table == 'holding':
result = self._mb.read_holding_registers(start, count, unit=self._unit)
elif table == 'coils':
result = self._mb.read_coils(start, count, unit=self._unit)
try:
return result.registers
if table in ['input', 'holding']:
return result.registers
else:
return result.bits
except:
# The result doesn't have a registers attribute, something has gone wrong!
raise ValueError("Failed to read {} {} table registers starting from {}: {}".format(count, table, start, result))


def type_length(type):
# Return the number of addresses needed for the type.
# Note: Each address provides 2 bytes of data.
Expand Down
Loading