Skip to content

Commit

Permalink
Test script updated
Browse files Browse the repository at this point in the history
  • Loading branch information
olegv142 committed Apr 28, 2024
1 parent 4e81e28 commit 54425dd
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 2 deletions.
51 changes: 51 additions & 0 deletions test/ble_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import sys
import time
import random
import serial
from serial import Serial

serial_baud_rate = 19200
serial_parity = serial.PARITY_EVEN

max_len = 512
msg_delay = .1

random.seed()

def random_str(sz):
buff = ''
for i in range(sz):
v = random.randrange(ord('A'), ord('Z') + 1)
buff += chr(v)
return buff

def read_resp(com, tout=1):
resp = b''
dline = time.time() + tout
while True:
r = com.read(max_len)
if r:
resp += r
if resp[-1] == term_chr:
return resp
continue
if time.time() > dline:
return resp

total_bytes = 0
start = time.time()

with Serial(sys.argv[1], baudrate=serial_baud_rate, parity=serial_parity, timeout=.1) as com:
try:
while True:
s = random_str(random.randrange(1, max_len))
msg = s + '_' + s + '#'
com.write(msg)
resp = read_resp(com)
total_bytes += len(msg) + len(resp)
print '.',
time.sleep(msg_delay)
except KeyboardInterrupt:
now = time.time()
print '\n%u bytes transferred (%u bytes/sec)' % (total_bytes, int(total_bytes/(now-start)))
pass
19 changes: 17 additions & 2 deletions test/bt_echo.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
max_msg_len = 17*1024
max_msg_offset = 1024
msg_buff = None
idle_delay = .001

def msg_init():
global msg_buff
Expand Down Expand Up @@ -33,7 +34,8 @@ def receive(sock, size, tout = 5.):
deadline = time.time() + tout
elif time.time() > deadline:
raise RuntimeError('receive timeout, %u out of %u bytes received' % (size - sz, size))
continue
time.sleep(idle_delay)
continue
chunk_sz = len(data)
if chunk_sz == size:
return data
Expand All @@ -42,6 +44,18 @@ def receive(sock, size, tout = 5.):
sz -= chunk_sz
return ''.join(chunks)

def chk_resp(msg, resp):
if msg == resp:
return True
print >> sys.stderr, 'bad response:'
cnt = 0
for i, sent in enumerate(msg):
if sent != resp[i]:
print >> sys.stderr, '[%u] sent %02x, recv %02x' % (i, ord(sent), ord(resp[i]))
cnt += 1
print >> sys.stderr, '%u of %u bytes mismatched' % (cnt, len(msg))
return False

def do_test(addr):
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
sock.connect((addr, 1))
Expand All @@ -59,7 +73,8 @@ def do_test(addr):
msg_len = len(msg)
resp = receive(sock, msg_len)
assert len(resp) == msg_len
assert resp == msg
if not chk_resp(msg, resp):
break
bytes += msg_len
print >> sys.stderr, '.',
if bytes > 1000000:
Expand Down

0 comments on commit 54425dd

Please sign in to comment.