-
Notifications
You must be signed in to change notification settings - Fork 2
/
dbusdevice.py
54 lines (45 loc) · 1.79 KB
/
dbusdevice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import logging
import dbus
logger = logging.getLogger(__name__)
class DbusDevice(object):
## The constructor processes the tree of dbus-items.
# @param bus Session/System bus object
# @param name the dbus-service-name.
def __init__(self, bus, name, eventCallback):
self._dbus_name = name
self._dbus_conn = bus
self._eventCallback = eventCallback
self._service_id = self._dbus_conn.get_name_owner(name)
self._dbus_conn.add_signal_receiver(self._on_dbus_value_changed,
dbus_interface='com.victronenergy.BusItem', signal_name='PropertiesChanged', path_keyword='path',
sender_keyword='service_id')
self._dbus_conn.add_signal_receiver(self._on_dbus_items_changed,
dbus_interface='com.victronenergy.BusItem',
signal_name='ItemsChanged', path='/', sender_keyword='service_id')
def __del__(self):
logger.debug('__del__ %s' % self)
self._dbus_name = None
self._value = None
self._eventCallback = None
def _on_dbus_value_changed(self, changes, path=None, service_id=None):
if service_id == self._service_id:
self._eventCallback(self._dbus_name, path, changes)
def _on_dbus_items_changed(self, items, service_id=None):
if service_id == self._service_id:
for path, changes in items.items():
self._eventCallback(self._dbus_name, path, changes)
## Returns the dbus-service-name which represents the Victron-device.
def __str__(self):
return "DbusDevice=%s" % self._dbus_name
def getBusName(self):
return self._dbus_name
def getValues(self):
data = self._dbus_conn.call_blocking(self._dbus_name, '/', None, 'GetValue', '', [])
texts = self._dbus_conn.call_blocking(self._dbus_name, '/', None, 'GetText', '', [])
values = {}
for p, v in data.items():
values['/' + p] = {
'Value': v,
'Valid': bool(v != dbus.Array([])),
'Text': texts[p]}
return values