-
Notifications
You must be signed in to change notification settings - Fork 3
/
monitor.py
190 lines (147 loc) · 5.26 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# monitor.py
# Monitor an asynchronous program by sending single bytes down an interface.
# Copyright (c) 2021-2024 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# V0.2 Supports monitoring dual-core applications on RP2
import asyncio
from machine import UART, SPI, Pin, disable_irq, enable_irq
from time import sleep_us
from sys import exit
_lck = False
try:
import _thread
_lock = _thread.allocate_lock()
_acquire = _lock.acquire
_release = _lock.release
except ImportError: # Only source of hard concurrency is IRQ
def _acquire(): # Crude blocking lock. Held only for short periods.
global _lck
istate = disable_irq()
while _lck:
pass
_lck = True
enable_irq(istate)
def _release():
global _lck
_lck = False
# Quit with an error message rather than throw.
def _quit(s):
print("Monitor", s)
exit(0)
_write = lambda _: _quit("must run set_device")
_ifrst = lambda: None # Reset interface. If UART do nothing.
# For UART pass initialised UART. Baudrate must be 1_000_000.
# For SPI pass initialised instance SPI. Can be any baudrate, but
# must be default in other respects.
def set_device(dev, cspin=None):
global _write, _ifrst
if isinstance(dev, UART) and cspin is None: # UART
def uwrite(data, buf=bytearray(1)):
_acquire()
buf[0] = data
dev.write(buf)
_release()
_write = uwrite
elif isinstance(dev, SPI) and isinstance(cspin, Pin):
cspin(1)
def spiwrite(data, buf=bytearray(1)):
_acquire()
buf[0] = data
cspin(0)
dev.write(buf)
cspin(1)
_release()
_write = spiwrite
def clear_sm(): # Set Pico SM to its initial state
cspin(1)
dev.write(b"\0") # SM is now waiting for CS low.
_ifrst = clear_sm
else:
_quit("set_device: invalid args.")
# Valid idents are 0..21
# Looping: some idents may be repeatedly instantiated. This can occur
# if decorator is run in looping code. A CM is likely to be used in a
# loop. In these cases only validate on first use.
def _validate(ident, num=1, looping=False, loopers=set(), available=set(range(0, 22))):
if ident >= 0 and ident + num < 22:
try:
for x in range(ident, ident + num):
if looping:
if x not in loopers:
available.remove(x)
loopers.add(x)
else:
available.remove(x)
except KeyError:
_quit("error - ident {:02d} already allocated.".format(x))
else:
_quit("error - ident {:02d} out of range.".format(ident))
# /mnt/qnap2/data/Projects/Python/AssortedTechniques/decorators
# asynchronous monitor
def asyn(ident, max_instances=1, verbose=True, looping=False):
def decorator(coro):
_validate(ident, max_instances, looping)
instance = 0
async def wrapped_coro(*args, **kwargs):
nonlocal instance
d = 0x40 + ident + min(instance, max_instances - 1)
instance += 1
if verbose and instance > max_instances: # Warning only.
print("Monitor ident: {:02d} instances: {}.".format(ident, instance))
_write(d)
try:
res = await coro(*args, **kwargs)
except asyncio.CancelledError:
raise # Other exceptions produce traceback.
finally:
_write(d | 0x20)
instance -= 1
return res
return wrapped_coro
return decorator
# If SPI, clears the state machine in case prior test resulted in the DUT
# crashing. It does this by sending a byte with CS\ False (high).
def init():
_ifrst() # Reset interface. Does nothing if UART.
_write(ord("z")) # Clear Pico's instance counters etc.
# Optionally run this to show up periods of blocking behaviour
async def hog_detect(s=(0x40, 0x60)):
while True:
for v in s:
_write(v)
await asyncio.sleep_ms(0)
# Monitor a synchronous function definition
def sync(ident, looping=False):
def decorator(func):
_validate(ident, 1, looping)
def wrapped_func(*args, **kwargs):
_write(0x40 + ident)
res = func(*args, **kwargs)
_write(0x60 + ident)
return res
return wrapped_func
return decorator
# Monitor a function call
class mon_call:
def __init__(self, ident):
# looping: a CM may be instantiated many times
_validate(ident, 1, True)
self.ident = ident
def __enter__(self):
_write(0x40 + self.ident)
return self
def __exit__(self, type, value, traceback):
_write(0x60 + self.ident)
return False # Don't silence exceptions
# Either cause pico ident n to produce a brief (~80μs) pulse or turn it
# on or off on demand. No looping: docs suggest instantiating at start.
def trigger(ident):
_validate(ident)
def wrapped(state=None):
if state is None:
_write(0x40 + ident)
sleep_us(20)
_write(0x60 + ident)
else:
_write(ident + (0x40 if state else 0x60))
return wrapped