-
Notifications
You must be signed in to change notification settings - Fork 19
/
carlo_gavazzi.py
125 lines (105 loc) · 3.38 KB
/
carlo_gavazzi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import device
import probe
from register import *
class Reg_ver(Reg, int):
    """Version register whose value is shown as three dotted numbers.

    The decoded value is a 3-tuple rendered with the '%d.%d.%d' text format.
    """

    def __init__(self, base, name):
        super().__init__(base, 1, name, text='%d.%d.%d')

    def __int__(self):
        """Pack the three decoded fields into one integer, 8 bits apart."""
        fields = self.value
        return (fields[0] << 16) | (fields[1] << 8) | fields[2]

    def decode(self, values):
        """Split the first raw value into three fields and store them.

        The fields are bits 12 and up, bits 8-11 and bits 0-7 of
        ``values[0]``.  Returns whatever ``self.update`` returns.
        """
        raw = values[0]
        high = raw >> 12
        middle = (raw >> 8) & 0xf
        low = raw & 0xff
        return self.update((high, middle, low))
# Lookup tables indexed by raw register values.

# Number of phases for each '/PhaseConfig' value; entries line up with
# phase_configs below.
nr_phases = [3, 3, 2, 1, 3]

# Display text for each '/PhaseConfig' value.
phase_configs = ['3P.n', '3P.1', '2P', '1P', '3P']

# Display text for each '/SwitchPos' value.
switch_positions = ['kVARh', '2', '1', 'Locked']
class EM24_Meter(device.CustomName, device.EnergyMeter):
    """Carlo Gavazzi EM24 Ethernet energy meter."""

    vendor_id = 'cg'
    vendor_name = 'Carlo Gavazzi'
    productid = 0xb017
    productname = 'Carlo Gavazzi EM24 Ethernet Energy Meter'
    min_timeout = 0.5

    def phase_regs(self, n):
        """Return the per-phase registers for phase ``n`` (counted from 1).

        The list holds voltage, current, power and forward energy, in that
        order.
        """
        # Each phase's registers sit two addresses after the previous one's.
        offset = 2 * (n - 1)
        voltage = Reg_s32l(0x0000 + offset, '/Ac/L%d/Voltage' % n, 10, '%.1f V')
        current = Reg_s32l(0x000c + offset, '/Ac/L%d/Current' % n, 1000, '%.1f A')
        power = Reg_s32l(0x0012 + offset, '/Ac/L%d/Power' % n, 10, '%.1f W')
        energy = Reg_s32l(0x0040 + offset, '/Ac/L%d/Energy/Forward' % n, 10,
                          '%.1f kWh')
        return [voltage, current, power, energy]

    def device_init(self):
        """Set up the info and data register lists for this meter.

        Forces the meter's application setting to H (value 7) first; if the
        meter does not accept it, an error is logged and initialisation
        stops before any data registers are defined.
        """
        self.info_regs = [
            Reg_ver(0x0302, '/HardwareVersion'),
            Reg_ver(0x0304, '/FirmwareVersion'),
            Reg_u16(0x1002, '/PhaseConfig', text=phase_configs, write=(0, 4)),
            Reg_text(0x5000, 7, '/Serial'),
        ]

        # make sure application is set to H
        application_h = 7
        app_reg = Reg_u16(0xa000)
        if self.read_register(app_reg) != application_h:
            self.write_register(app_reg, application_h)

            # read back the value in case the setting is not accepted
            # for some reason
            if self.read_register(app_reg) != application_h:
                self.log.error('%s: failed to set application to H', self)
                return

        self.read_info()

        # The phase configuration read above selects how many per-phase
        # register groups are polled.
        config = int(self.info['/PhaseConfig'])
        phase_count = nr_phases[config]

        data = [
            Reg_s32l(0x0028, '/Ac/Power', 10, '%.1f W'),
            Reg_u16(0x0033, '/Ac/Frequency', 10, '%.1f Hz'),
            Reg_s32l(0x0034, '/Ac/Energy/Forward', 10, '%.1f kWh'),
            Reg_s32l(0x004e, '/Ac/Energy/Reverse', 10, '%.1f kWh'),
            Reg_u16(0xa100, '/SwitchPos', text=switch_positions),
        ]

        # Phase sequence is only added for three-phase configurations.
        if phase_count == 3:
            data.append(Reg_mapu16(0x0032, '/PhaseSequence', {0: 0, 0xffff: 1}))

        for phase in range(1, phase_count + 1):
            data.extend(self.phase_regs(phase))

        self.data_regs = data
        self.nr_phases = phase_count

    def dbus_write_register(self, reg, path, val):
        """Write the register through the base class, then schedule a reinit.

        NOTE(review): presumably the reinit makes device_init run again so
        the register set follows a changed phase configuration -- confirm
        against sched_reinit.
        """
        super().dbus_write_register(reg, path, val)
        self.sched_reinit()
# Model table keyed by the numeric model ID; every listed variant is
# handled by EM24_Meter.
models = {
    model_id: {
        'model': model_name,
        'handler': EM24_Meter,
    }
    for model_id, model_name in (
        (1648, 'EM24DINAV23XE1X'),
        (1649, 'EM24DINAV23XE1PFA'),
        (1650, 'EM24DINAV23XE1PFB'),
        (1651, 'EM24DINAV53XE1X'),
        (1652, 'EM24DINAV53XE1PFA'),
        (1653, 'EM24DINAV53XE1PFB'),
    )
}

# Register the table with the probe module, using register 0x000b as the
# model register, restricted to the 'tcp' method and unit 1.
probe.add_handler(
    probe.ModelRegister(
        Reg_u16(0x000b),
        models,
        methods=['tcp'],
        units=[1],
    )
)