-
Notifications
You must be signed in to change notification settings - Fork 0
/
gpib.py
339 lines (278 loc) · 12.6 KB
/
gpib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import string
import serial, sys, time
import numpy as np
import pickle
from datetime import datetime
import matplotlib.pyplot as plt
import os.path
import matplotlib.backends.backend_pdf
# DEFINE TERMINAL PRINT COLORS
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
pdf = matplotlib.backends.backend_pdf.PdfPages("output.pdf")
debug_flag = False # becomes a bit more verbose
filename = None # set to the pickle file we want to load instead of capturing a new trace
GPIB_delay = 0.5 # delay in seconds after each serial command
# GPIB addressing table
address_talk = list('@' + string.ascii_uppercase) # '@ABCDEFGHIJKLMNOPQRSTUVWXYZ'
address_listen = list(''.join([chr(i) for i in range(32,63)]) ) # ' !"#$%&\'()*+,-./0123456789:;<=>'
# the addresses of my devices
control_index = 10 # Arduino GPIB-USB serial converter (Jacek Greniger GitHub)
fsas_index = 20 # Rohde-Schwarz Spectrum Analyzer 100Hz-2GHz
smcp_index = 1 # Rohde-Schwarz Signal Generator 5kHz-1.3GHz
# verify if an active serial port is present
serial_ports = ["/dev/ttyUSB%d"%i for i in xrange(4)]
serial_open_flag = False
buff = "" # define a global buffer
def cprint(string, color):
print color + string + bcolors.ENDC
def delay(t):
time.sleep(float(t))
# send a serial command followed by a LF CR "\r\n"
def write(cmd):
string = cmd+"\r\n"
s.write(string)
if debug_flag:
cprint(string, bcolors.OKBLUE)
time.sleep(GPIB_delay)
# send a serial command followed by a LF CR "\r\n and listen for a reply"
def query(cmd):
string = cmd+"\r\n"
s.write(string)
time.sleep(GPIB_delay)
# if s.inWaiting():
# tmp = s.readline()
tmp = ""
while s.inWaiting():
tmp += s.read(1)
if debug_flag:
cprint(string+" "+tmp , bcolors.OKBLUE)
# obtain the talker or listener index from a specific address as specified in the GPIB addressing table
def set_listener_talker(listener, talker):
listener_ = address_listen[int(listener)]
talker_ = address_talk[int(talker)]
string = "C_?%s%s" % (listener_, talker_)
if debug_flag:
p = "Listens: %d <- Talks: %d -- CMD: %s" % (listener, talker, string)
cprint(p , bcolors.OKBLUE)
write(string)
def read_trace_data(trace_name):
TRACE_SIZE = 2190 # FSAS dumps 2190 bytes when asked for a trace complete with the header (the first 1802 bytes for 901 points (one int16 each))
query('DTRACE:BLOCK:T_1?') # ask for a trace (1) dump
# switch control interface to listen to commands and FSAS to talk
set_listener_talker(control_index, fsas_index)
p = "Flushing the serial queue..."
cprint(p , bcolors.OKGREEN)
while s.inWaiting(): # flush the serial interface before beginning
s.read(1)
p = "---- NOW RECEIVING TRACE DUMP DATA FROM FSAS ----"
cprint(p , bcolors.WARNING)
buff = ""
while len(buff) < TRACE_SIZE + 10: # we add some margin since we're adding control strings
tmp = ""
write("Y") # read a full buffer of data from the instrument
time.sleep(GPIB_delay) # wait a bit for the data transmission to finish
while s.inWaiting()>0: # is any character present in the serial buff
tmp += s.read(1) # read one byte
buff += tmp
buff += "XXX" # we add a marker to locate different dumps (~395 bytes each)
# remove garbage from buff and return
buff = buff.replace("OK\r\n\xfe", "").replace("XXX\xfe", "")[1:]
# Save binary buffer to disk
with open(trace_name+gpib_buff_file, 'wb') as f:
pickle.dump(buff,f)
# let's decode the uncorrected trace information:
# val = uint16 >> 4
raw_trace_data = np.right_shift(np.fromstring(buff[:], dtype=np.uint16, count=901), 4)
# let's decode the header information: we don't need everything!
# this dictionary contains low level data types for each FSAS property at its byte offset
data_type_dict = {1802:'<B',1803:'<B',1804:'<B', 1805:'<i2', 1807:'<i2', 1809:'<i2', 1811:'<i2',1813:'<B',1814:'<B', 1815:'<i2', 1817:'<B', 1819:'<B', 1853:'<i2',
1896:'<i4', 1914:'<i4', 1920:'<i4', 1926:'<i4', 1930:'<i4'}
# each offset has its full description
data_descript_dict = {1802:'Mode (3: Freq. Analyzer, 4: Scalar Network Analyzer, 6:Communication Analyzer, 7: Receiver)',1803:'keys',1804:'keys', 1805:'Ref. Level (dB * 0.01)', 1807:'Ref. Level Offset (dB * 0.01)', 1809:'Lev. Tracking Gen. (dB * 0.01)', 1811:'Lev. Offset Tracking Gen. (dB * 0.01)',
1813:'RF Attenuator (dB)',1814:'Tracking Generator Attenuation (dB)', 1815:'Mixer Level (dB * 0.01)', 1817:"Range etc", 1819:"Units", 1853:'Averaging Samples',
1896:'Center Freq. (Hz)', 1914:'Start Freq. (Hz)', 1920:'Stop Freq. (Hz)', 1926:'Sweep Time (0.1 ms)', 1930:'RBW (Hz)'}
# each property has a concise name linked to the byte offset
data_name_dict = {'mode':1802,'keys1':1803,'keys2':1804, 'ref_level':1805, 'ref_level_offset':1807, 'level_tracking_gen':1809, 'level_offset_tracking_gen':1811,
'rf_att':1813,'tracking_gen_att':1814, 'mixer_level':1815, "range":1817, "units":1819, 'avg_samples':1853,
'ceter_freq':1896, 'start_freq':1914, 'stop_freq':1920, 'sweep_time':1926, 'rbw':1930}
data_values_dict = {}
# read in the values
cprint("------------------------------", bcolors.OKGREEN)
cprint("A few parameters from the FSAS", bcolors.OKGREEN)
cprint("------------------------------\n", bcolors.OKGREEN)
for off in data_type_dict:
data_values_dict[off] = np.fromstring(buff[off:], dtype=data_type_dict[off], count=1)
print "byte offset:", off, "value:", data_values_dict[off][0],"\t\t", data_descript_dict[off]
############ Visualization
font = {'weight' : 'bold',
'size' : 15}
plt.rc('font', **font)
fig = plt.figure(figsize=(16,16))
start_freq, stop_freq = np.fromstring(buff[1914:], dtype=data_type_dict[1914], count=1)/1e6, np.fromstring(buff[1920:], dtype=data_type_dict[1920], count=1)/1e6
ref_level = np.fromstring(buff[1805:], dtype=data_type_dict[1805], count=1)[0]/100.
ref_level_offset = np.fromstring(buff[1807:], dtype=data_type_dict[1807], count=1)[0]/100.
rbw = np.fromstring(buff[1930:], dtype=data_type_dict[1930], count=1)[0]/1e4
sweep_time = np.fromstring(buff[1926:], dtype=data_type_dict[1926], count=1)[0]/1e4
rf_att = np.fromstring(buff[1813:], dtype=data_type_dict[1813], count=1)[0]
range_ = data_values_dict[ data_name_dict['range'] ]
mask = int('00000111', 2)
if range_&mask == 0:
range_range = 110.
elif range_&mask == 1:
range_range = 100.
elif range_&mask == 2:
range_range = 50.
elif range_&mask == 2:
range_range = 50.
elif range_&mask == 3:
range_range = 20.
elif range_&mask == 4:
range_range = 10.
elif range_&mask == 5:
range_range = 1.
mask = int('00001000', 2)
if range_&mask:
range_scaling = "lin"
else:
range_scaling = "log"
mask = int('00010000', 2)
if range_&mask:
range_axis = "db"
else:
range_axis = "percent" # not implemented yet
mask = int('00100000', 2)
if range_&mask:
range_grid = "abs"
else:
range_grid = "rel" # not implemented yet
mask = int('10000000', 2)
if range_&mask:
range_freq = "lin"
else:
range_freq = "log"
ax = fig.add_subplot(111)#, axisbg='blue')
ax.set_facecolor('blue')
fig.patch.set_facecolor('blue')
if range_scaling == 'log':
trace_data = (raw_trace_data-3938.)/3938.*(range_range)+ref_level
plt.plot(np.linspace(start_freq, stop_freq, 901), trace_data[:], c='lightgreen')
plt.ylim(-range_range+ref_level+ref_level_offset, ref_level+ref_level_offset)
plt.yticks(np.linspace(-range_range+ref_level+ref_level_offset, ref_level+ref_level_offset, 12), color="w")
else: # linear scaling incomplete!
trace_data = (2000. * np.log10(raw_trace_data/3938.)+ref_level)
plt.plot(np.linspace(start_freq, stop_freq, 901), trace_data[:], c='lightgreen')
#plt.yscale('symlog')
# plt.ylim(-range_range+ref_level+ref_level_offset, ref_level+ref_level_offset)
# plt.yticks(np.linspace(-range_range+ref_level+ref_level_offset, ref_level+ref_level_offset, 12), color="w")
plt.grid(color="w")
plt.xlabel("Freq. (MHz)", color="w", fontweight='bold')
plt.ylabel("Power (dBm)", color="w", fontweight='bold')
plt.xlim(start_freq, stop_freq)
plt.xticks(np.linspace(start_freq, stop_freq, 11), color="w")
plt.title("ROHDE & SCHWARZ FSAS "+trace_name, color='w', fontsize=20)
rbw_text = "RBW: "+str(rbw)+" kHz"
rf_att_text = "RF Att: "+str(rf_att)+" dB"
sweep_text = "Sweep: "+str(sweep_time)+" s"
plt.text(0.8,1.08, rbw_text,
horizontalalignment='left',
verticalalignment='center', transform=ax.transAxes, color="w")
plt.text(0.8,1.06, rf_att_text,
horizontalalignment='left',
verticalalignment='center', transform=ax.transAxes, color="w")
plt.text(0.8,1.04, sweep_text,
horizontalalignment='left',
verticalalignment='center', transform=ax.transAxes, color="w")
pdf.savefig(fig, frameon=True, facecolor='b', edgecolor='b',)
print "---------------------------"
print "--- Rohde-Schwarz ---"
print "--- GPIB-SPCI COMMAND ---"
print "--- INTERPRETER ---"
print "---------------------------"
print "--- marco cogoni IS0KYB ---"
print "---------------------------"
print
if len(sys.argv) == 1: # no command line options
print "Command line options:\n"
print "-p commands.txt\t -> load a command sequence from an external file and process it"
exit()
elif len(sys.argv) == 3: # don't connect to the FSAS, just load an old tracedump from file
if sys.argv[1] == "-p": # load program from file
prog_name = sys.argv[2]
with open(prog_name, "rb") as fd:
buff_program = fd.readlines()
print "... loading program <%s> to run! ...\n" % prog_name
else:
exit("Attention! Unknown option!")
else:
exit("Attention! Wrong number of options!")
for p in serial_ports:
if os.path.exists(p):
#s = serial.Serial(port=p, baudrate=115200, bytesize=8, parity='N', stopbits=1, timeout=None, xonxoff=0, rtscts=0)
s = serial.Serial(port=p, baudrate=115200)
time.sleep(3)
if s.isOpen():
serial_open_flag = True
print "Active Serial Connection:", p
break
if not serial_open_flag:
print "Serial connection not working!!!"
exit()
gpib_buff_file = "GPIB_tracedump.%s.pickle" % str(datetime.now()).split(".")[0].replace(" ", "_")
#########################################
# Start writing on the serial interface #
#########################################
print "Flushing the serial queue..."
while s.inWaiting(): # flush the serial interface before beginning
s.read(1)
skipping_until = False
# Meta GPIB-SPCI INTERPRETER
user_dict = {}
meta_func_dict = {"set_listener_talker":set_listener_talker, "read_trace_data":read_trace_data, "delay":delay}
for row in buff_program:
print row.rstrip()
if row[0] == "#":
continue
if skipping_until and row[0] == ":":
if row[1:] == skipping_until:
skipping_until = False
elif skipping_until and row[0] != ":":
print "SKIP! ",
continue
elif "#" in row:
row = row.split("#")[0].rstrip()
elif row[0]=="$" and ":=" in row:
var, val = row[1:].split(":=")[0], (row[1:].split(":=")[1])
user_dict[var] = val
print "DEFINED NEW USER VARIABLE:", var, "=", user_dict[var]
elif row[0]=="!":
func, args = row[1:].split("(")[0], [el for el in row[1:].split("(")[1].split(",") ]
args = [el.replace(")", "").strip() for el in args] # remove trailing and preceding spaces
args_ = []
for el in args:
if el in user_dict.keys():
tmp = user_dict[el].strip()
args_.append(tmp)
else:
args_.append(el)
if "" in args_:
args_.remove("")
print "CALLING METAFUNCTION:", func, args_
if len(args_)>=1:
meta_func_dict[func](*args_)
else:
meta_func_dict[func]()
elif row[0:5]=="GOTO:":
skipping_until = row.split(":")[1] #.strip()
print "SKIP TIL", skipping_until
else: # run generic GPIB-SPCI command
query(row)
s.close() # close serial connection
pdf.close()