-
Notifications
You must be signed in to change notification settings - Fork 1
/
coolrs_wt310.py
executable file
·244 lines (192 loc) · 5.48 KB
/
coolrs_wt310.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#!/usr/bin/env python3
#
# A script to read the Yokogawa WT310 power meter via the usbtmc interface
#
# This code requires the usbtmc driver to read the power measurements.
#
# /dev/usbtmc[0-9] needs to be readable and writable for this code
#
# Contact: Kazutomo Yoshii <[email protected]>
#
import re, os, sys
import time
import smq
import keypress
import json
import math
class wt310_reader:
    """Minimal driver for a Yokogawa WT310 attached through a usbtmc node.

    The instance owns a raw file descriptor (``self.fd``; -1 while closed).
    ``read()`` and ``write()`` are the low-level primitives; every other
    method is built on top of them and sends WT310 text commands.
    """

    # Maximum number of bytes fetched by a single read() call.
    READ_SIZE = 256

    def __init__(self):
        self.fd = -1

    def open(self, devfn='/dev/usbtmc0'):
        """Open the device node read/write.

        Returns 0 on success and -1 on failure.  os.open() reports errors
        by raising OSError (it never returns a negative descriptor), so the
        exception is translated into the -1 return code callers test for.
        """
        try:
            self.fd = os.open(devfn, os.O_RDWR)
        except OSError:
            self.fd = -1
            return -1
        return 0

    def close(self):
        """Close the device node if it is open; safe to call repeatedly."""
        if self.fd >= 0:
            os.close(self.fd)
            self.fd = -1

    # read() and write() are the low-level methods

    def read(self):
        """Read up to READ_SIZE raw bytes; returns -1 if not open."""
        if self.fd < 0:
            return -1
        return os.read(self.fd, self.READ_SIZE)

    def write(self, buf):
        """Write a command and return the number of bytes written.

        ``buf`` may be ``str`` or ``bytes``.  os.write() only accepts
        bytes-like objects on Python 3, so text commands are encoded as
        ASCII first.  Returns -1 if the device is not open.
        """
        if self.fd < 0:
            return -1
        if isinstance(buf, str):
            buf = buf.encode('ascii')
        return os.write(self.fd, buf)

    # wt310 methods should call read() and write()

    def set(self, cmd):
        """Send a command that produces no reply."""
        self.write(cmd)

    def get(self, cmd):
        """Send a query and return the reply decoded to text.

        Returns -1 (the read() error code) when the device is not open.
        """
        self.write(cmd)
        reply = self.read()
        if isinstance(reply, bytes):
            return reply.decode('ascii', errors='replace')
        return reply

    # send wt310 commands

    def readvals(self):
        """Query the numeric items and return them as a list of strings.

        Raises OSError if the device is not open.
        """
        reply = self.get(":NUM:NORM:VAL?\n")  # is \n required?
        if not isinstance(reply, str):
            raise OSError('wt310 device is not open')
        return reply.strip().split(',')

    def sample(self):
        """Return one measurement as a dict with keys 'J', 'P' and 'PF'.

        Item layout (default setting; query with e.g. :NUM:NORM:ITEM1?):
          1: Watt hour, 2: Current, 3: Active Power, 4: Apparent Power,
          5: Reactive Power, 6: Power factor
        The list index is the item number minus one.

        Raises ValueError if the reply has fewer than six items or an item
        is not a number.
        """
        vals = self.readvals()
        if len(vals) < 6:
            raise ValueError('unexpected wt310 reply: %r' % (vals,))
        return {
            'J': float(vals[0]) * 3600.0,  # watt-hours -> joules
            'P': float(vals[2]),
            'PF': float(vals[5]),
        }

    def start(self):
        """Configure the meter and start energy integration."""
        # set the update interval 100ms (fastest)
        self.set(":RATE 100MS")
        # set item1 watt-hour
        self.set(":NUM:NORM:ITEM1 WH,1")
        # start integration
        self.set(":INTEG:MODE NORM")
        self.set(":INTEG:TIM 2,0,0")  # 2 hours
        self.set(":INTEG:START")
        # ":INTEG:RESET" resets integration

    def reset(self):
        """Initialize the meter settings and switch it to remote mode."""
        self.set('*RST')  # initialize the settings
        self.set(':COMM:REM ON')  # set remote mode

    def stop(self):
        """Stop energy integration."""
        self.set(":INTEG:STOP")
import getopt
def usage():
    """Print the command-line help text to stdout.

    Lists every option the option parser accepts, including -h and
    --sample, followed by a few example invocations.
    """
    lines = (
        '',
        'Usage: coolrs_wt310.py [options]',
        '',
        '[options]',
        '',
        '-h : show this help',
        '-i int : sampling interval in sec',
        '-o str : output filename',
        '-s str : start the mq producer. str is ip address',
        '--sample : print one power sample and exit',
        '--set str : issue command',
        '--get str : query value',
        '',
        'Examples:',
        '$ coolrs_wt310.py -i 0.2 # sample every 0.2 sec',
        '$ coolrs_wt310.py --set=:INTEG:RESET # reset the energy integration',
        '$ coolrs_wt310.py --get=:NUM:NORM:VAL? # query values',
        '',
    )
    print('\n'.join(lines))
if __name__ == '__main__':
    # Command-line entry point.  Depending on the options this either
    #   * prints a single power sample (--sample),
    #   * sends one command / query to the meter (--set / --get), or
    #   * samples energy and power periodically until 'q' is pressed.
    interval_sec = .2
    outputfn = ''
    smqflag = False
    ipaddr = ''
    cmdmode = ''
    cmd = ''
    samplemode = False

    shortopt = "hi:o:s:"
    longopt = ["set=", "get=", "sample"]
    try:
        opts, args = getopt.getopt(sys.argv[1:], shortopt, longopt)
    except getopt.GetoptError as err:
        print(err)
        usage()
        sys.exit(1)

    # Options are compared with ==.  The previous "o in ('-s')" form was a
    # substring test against a plain string, so "-s" matched "--sample".
    for o, a in opts:
        if o == '-h':
            usage()
            sys.exit(0)
        elif o == '-i':
            try:
                interval_sec = float(a)
            except ValueError:
                print('Error: invalid interval:', a)
                usage()
                sys.exit(1)
        elif o == '--sample':
            samplemode = True
        elif o == '--set':
            cmdmode = 'set'
            cmd = a
        elif o == '--get':
            cmdmode = 'get'
            cmd = a
        elif o == '-o':
            outputfn = a
        elif o == '-s':
            smqflag = True
            ipaddr = a
        else:
            print('Unknown:', o, a)

    wt310 = wt310_reader()
    try:
        open_failed = wt310.open()
    except OSError as err:
        # os.open() reports failures by raising rather than returning -1.
        sys.stderr.write('Error: %s\n' % err)
        open_failed = True
    if open_failed:
        sys.stderr.write('Error: failed to open the wt310 device\n')
        sys.exit(1)

    # One-shot mode: print a single power reading and exit.
    if samplemode:
        s = wt310.sample()
        ts = time.time()
        line = '# {"sample":"wt310", "time":%.2lf, "power":%.2lf}' % \
            (ts, s['P'])
        print(line)
        sys.exit(0)

    # Command mode: send one command (or query and print the reply).
    if cmd:
        if cmdmode == 'set':
            wt310.set(cmd)
        else:
            print(wt310.get(cmd))
        sys.exit(0)

    f = sys.stdout
    if outputfn:
        try:
            f = open(outputfn, 'w')
        except OSError:
            print('Error: failed to open', outputfn)
            sys.exit(1)
        print('Writing to', outputfn)

    # Column description handed to the message queue producer.
    cfg = {}
    cfg["c1"] = {"label": "Time", "unit": "Sec"}
    cfg["c2"] = {"label": "Energy", "unit": "Joules"}
    cfg["c3"] = {"label": "Power", "unit": "Watt"}

    # XXX: smq is not tested
    if smqflag:
        mq = smq.producer(ipaddr)
        mq.start()
        mq.dict = cfg
        print('Message queue is started:', ipaddr)

    kp = keypress.keypress()
    kp.enable()
    try:
        wt310.sample()  # initial read; the result is discarded
        time.sleep(1)
        sys.stderr.write('Press "q" to terminate\n')

        while True:
            # NOTE(review): start() re-sends the rate/integration setup on
            # every iteration, as the original did -- confirm whether it
            # was meant to run once before the loop.
            wt310.start()
            s = wt310.sample()
            ts = time.time()
            ev = s['J']  # energy
            pv = s['P']  # power
            line = '%.2lf %.2lf %.2lf' % (ts, ev, pv)
            # Invalid (NaN) readings are written as comment lines.
            if math.isnan(ev) or math.isnan(pv):
                line = '#' + line
            print(line, file=f)
            f.flush()
            if smqflag:
                mq.append(line)
            time.sleep(interval_sec)
            if kp.available() and kp.readkey() == 'q':
                break

        wt310.stop()
    finally:
        # Always undo kp.enable() and close the output file, even when
        # sampling raises.
        kp.disable()
        if f is not sys.stdout:
            f.close()
    sys.stderr.write('done\n')