From 54425ddb520eb687f6edd0dc2ad943046f89bc22 Mon Sep 17 00:00:00 2001 From: Oleg Volkov Date: Sun, 28 Apr 2024 18:19:22 +0300 Subject: [PATCH] Test script updated --- test/ble_test.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ test/bt_echo.py | 19 ++++++++++++++++-- 2 files changed, 68 insertions(+), 2 deletions(-) create mode 100644 test/ble_test.py diff --git a/test/ble_test.py b/test/ble_test.py new file mode 100644 index 0000000..08b5029 --- /dev/null +++ b/test/ble_test.py @@ -0,0 +1,51 @@ +import sys +import time +import random +import serial +from serial import Serial + +serial_baud_rate = 19200 +serial_parity = serial.PARITY_EVEN + +max_len = 512 +msg_delay = .1 + +random.seed() + +def random_str(sz): + buff = '' + for i in range(sz): + v = random.randrange(ord('A'), ord('Z') + 1) + buff += chr(v) + return buff + +def read_resp(com, tout=1): + resp = b'' + dline = time.time() + tout + while True: + r = com.read(max_len) + if r: + resp += r + if resp[-1] == term_chr: + return resp + continue + if time.time() > dline: + return resp + +total_bytes = 0 +start = time.time() + +with Serial(sys.argv[1], baudrate=serial_baud_rate, parity=serial_parity, timeout=.1) as com: + try: + while True: + s = random_str(random.randrange(1, max_len)) + msg = s + '_' + s + '#' + com.write(msg) + resp = read_resp(com) + total_bytes += len(msg) + len(resp) + print '.', + time.sleep(msg_delay) + except KeyboardInterrupt: + now = time.time() + print '\n%u bytes transferred (%u bytes/sec)' % (total_bytes, int(total_bytes/(now-start))) + pass diff --git a/test/bt_echo.py b/test/bt_echo.py index 0d408da..d81f060 100644 --- a/test/bt_echo.py +++ b/test/bt_echo.py @@ -6,6 +6,7 @@ max_msg_len = 17*1024 max_msg_offset = 1024 msg_buff = None +idle_delay = .001 def msg_init(): global msg_buff @@ -33,7 +34,8 @@ def receive(sock, size, tout = 5.): deadline = time.time() + tout elif time.time() > deadline: raise RuntimeError('receive timeout, %u out of %u bytes received' % (size - sz, size)) - continue + time.sleep(idle_delay) + continue chunk_sz = len(data) if chunk_sz == size: return data @@ -42,6 +44,18 @@ def receive(sock, size, tout = 5.): sz -= chunk_sz return ''.join(chunks) +def chk_resp(msg, resp): + if msg == resp: + return True + print >> sys.stderr, 'bad response:' + cnt = 0 + for i, sent in enumerate(msg): + if sent != resp[i]: + print >> sys.stderr, '[%u] sent %02x, recv %02x' % (i, ord(sent), ord(resp[i])) + cnt += 1 + print >> sys.stderr, '%u of %u bytes mismatched' % (cnt, len(msg)) + return False + def do_test(addr): sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM) sock.connect((addr, 1)) @@ -59,7 +73,8 @@ def do_test(addr): msg_len = len(msg) resp = receive(sock, msg_len) assert len(resp) == msg_len - assert resp == msg + if not chk_resp(msg, resp): + break bytes += msg_len print >> sys.stderr, '.', if bytes > 1000000: