-
Notifications
You must be signed in to change notification settings - Fork 1
/
util.py
78 lines (69 loc) · 2.59 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# -*- coding: utf-8 -*-
import numpy as np
import time
from pylsl import StreamInlet, resolve_stream
from pythonosc.udp_client import SimpleUDPClient
def osc_single_sample_test(samples, sleepTime, address, client):
    """Send the values 1.0 .. ``samples`` over OSC, one message per value.

    After each message the function sleeps for ``sleepTime`` seconds and
    prints a progress line.

    Args:
        samples: Number of test values to send; values run from 1.0 up to
            and including ``samples``.
        sleepTime: Pause after each message, in seconds.
        address: OSC address every message is sent to.
        client: Object exposing ``send_message(address, value)``.
    """
    # tolist() turns the NumPy array into plain Python floats, so the
    # client receives builtin floats rather than NumPy scalars.
    values = np.arange(1, samples + 1).astype('float').tolist()
    for i, sample in enumerate(values):
        print(samples)
        client.send_message(address, sample)
        time.sleep(sleepTime)
        print('Message %s sent' % i)
def send_osc_singleSampleTest(ip, port, samples, sleepTime, address):
    """Create a UDP OSC client for ``ip``:``port`` and run the single-sample test.

    Args:
        ip: Host passed to ``SimpleUDPClient``.
        port: Port passed to ``SimpleUDPClient``.
        samples: Forwarded to ``osc_single_sample_test``.
        sleepTime: Forwarded to ``osc_single_sample_test``.
        address: Forwarded to ``osc_single_sample_test``.
    """
    osc_single_sample_test(
        samples,
        sleepTime,
        address,
        SimpleUDPClient(ip, port),
    )
def read_lsl(prop, val):
    """Resolve an LSL stream and print its samples until interrupted.

    The first stream returned by ``resolve_stream(prop, val)`` is opened
    and every pulled sample is printed together with its timestamp. The
    loop has no exit condition of its own.

    Args:
        prop: Stream property to match, passed to ``resolve_stream``.
        val: Required value of that property, passed to ``resolve_stream``.

    Raises:
        KeyboardInterrupt: Re-raised after printing "Ending program" when
            the user interrupts the loop.
    """
    try:
        # Resolve a matching stream on the network.
        print("Looking for EEG stream...")
        streams = resolve_stream(prop, val)
        print('EEG stream found')
        print(f'stream is {streams}')

        # Create a new inlet to read from the first resolved stream.
        print("Opening an inlet")
        inlet = StreamInlet(streams[0])
        print("inlet opened")

        while True:
            print("Pulling sample")
            sample, timestamp = inlet.pull_sample()
            print(timestamp, sample)
    # The except clause must name the exception class; an instance
    # (``KeyboardInterrupt()``) makes Python raise TypeError on Ctrl-C.
    except KeyboardInterrupt:
        print("Ending program")
        raise
def soloPerformer(ip, port, address='/eeg'):
    """Forward samples from an LSL EEG stream to an OSC receiver.

    Resolves a stream with ``type == 'EEG'``, opens an inlet on the first
    match, and sends every pulled sample to ``ip``:``port`` as an OSC
    message. The loop has no exit condition of its own.

    Args:
        ip: Host passed to ``SimpleUDPClient``.
        port: Port passed to ``SimpleUDPClient``.
        address: OSC address each sample is sent to.
            NOTE(review): the previous body used an undefined name
            ``address``; '/eeg' is a placeholder default - confirm the
            address the receiver expects.

    Raises:
        KeyboardInterrupt: Re-raised after printing "Ending program" when
            the user interrupts the loop.
    """
    try:
        # Resolve an EEG stream on the network.
        print("Looking for EEG stream...")
        streams = resolve_stream('type', 'EEG')
        print('EEG stream found')
        print(f'stream is {streams}')

        # Create a new inlet to read from the first resolved stream.
        print("Opening an inlet")
        inlet = StreamInlet(streams[0])
        print("inlet opened")

        # Create osc client
        client = SimpleUDPClient(ip, port)

        while True:
            print("Pulling sample")
            # StreamInlet's method is pull_sample(); pullsample() does not exist.
            sample, timestamp = inlet.pull_sample()
            print(timestamp, sample)
            client.send_message(address, sample)
    # The except clause must name the exception class; an instance
    # (``KeyboardInterrupt()``) makes Python raise TypeError on Ctrl-C.
    except KeyboardInterrupt:
        print("Ending program")
        raise