-
Notifications
You must be signed in to change notification settings - Fork 6
/
run.py
executable file
·123 lines (105 loc) · 5.2 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from Logger import *
from Experiment import *
from Stimulus import *
import sys
from datetime import datetime, timedelta
# Module-level initialisation: these statements run once, in this order, before any task is dispatched.
logg = RPLogger()  # setup logger & timer; also bound as the default `logger` argument of train() and calibrate()
logg.log_setup()  # publish IP and make setup available (as noted by the original author; implemented in Logger)
stim = Stimulus(logg)  # stimulus object built on the logger
stim.setup()
stim.unshow([0, 0, 0])  # presumably clears the display to black (RGB 0, 0, 0) -- TODO confirm in Stimulus
def _run_window(now, start_offset, stop_offset):
    """Return the ``(start, stop)`` datetimes of the daily run window relevant to `now`.

    `start_offset` and `stop_offset` are added to midnight of `now`'s date
    (presumably ``timedelta`` values read from the Task table -- TODO confirm).
    A stop that falls before the start describes a window crossing midnight.
    """
    # Zero the microseconds too, otherwise the boundaries drift with `now`.
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    start = start_offset + midnight
    stop = stop_offset + midnight
    if stop < start:
        if now <= stop:
            # Early morning: we are still inside the window that opened yesterday.
            start = start - timedelta(days=1)
        else:
            # Otherwise the window that opens today closes tomorrow.
            stop = stop + timedelta(days=1)
    return start, stop


def train(logger=logg):
    """ Run training experiment

    Repeats sessions for as long as the setup state is 'running'.  Each
    session fetches the task parameters, instantiates the experiment class
    named by ``params['exp_type']`` and loops over trials while
    ``exprmt.run()`` is true.  Outside the daily window given by
    ``params['start_time']`` / ``params['stop_time']`` the setup is switched to
    'offtime' and polled every 5 seconds; once the window reopens the state is
    set back to 'running' and the current session is ended so that a new one
    starts.

    :param logger: logger object used for setup state, session logging and pings
    """

    # # # # # Global Run # # # # #
    while logger.get_setup_state() == 'running':

        # # # # # Prepare # # # # #
        logger.init_params()  # clear settings from previous session
        logger.log_session()  # start session
        params = (Task() & dict(task_idx=logger.task_idx)).fetch1()  # get parameters
        timer = Timer()  # main timer for trials
        # NOTE(review): eval() executes whatever string is stored in 'exp_type';
        # only safe as long as the Task table is trusted.
        exprmt = eval(params['exp_type'])(logger, timer, params)  # get experiment & init
        exprmt.prepare()  # prepare stuff

        # # # # # Session Run # # # # #
        while exprmt.run():

            # # # # # PAUSE # # # # #
            now = datetime.now()
            start, stop = _run_window(now, params['start_time'], params['stop_time'])
            if now < start or now > stop:
                logger.update_setup_state('offtime')
                exprmt.stim.unshow([0, 0, 0])
            # Wait until the window opens, or until the state is changed externally.
            while (now < start or now > stop) and logger.get_setup_state() == 'offtime':
                logger.ping()
                now = datetime.now()
                start, stop = _run_window(now, params['start_time'], params['stop_time'])
                time.sleep(5)
            if logger.get_setup_state() == 'offtime':
                # Window reopened while still 'offtime': resume and end this
                # session so the outer loop starts a fresh one.
                logger.update_setup_state('running')
                exprmt.stim.unshow()
                break

            # # # # # Pre-Trial period # # # # #
            break_trial = exprmt.pre_trial()
            if break_trial:
                break

            # # # # # Trial period # # # # #
            timer.start()  # Start countdown for response
            while timer.elapsed_time() < params['trial_duration']*1000:  # response period
                break_trial = exprmt.trial()  # get appropriate response
                if break_trial:
                    break  # break if experiment calls for it

            # # # # # Post-Trial Period # # # # #
            exprmt.post_trial()

            # # # # # Intertrial period # # # # #
            timer.start()
            while timer.elapsed_time() < params['intertrial_duration']*1000:
                exprmt.inter_trial()

        # # # # # Cleanup # # # # #
        exprmt.cleanup()
def calibrate(logger=logg):
    """ Lickspout liquid delivery calibration

    Reads the calibration task assigned to this setup, delivers ``pulse_num``
    pulses of ``pulse_dur`` on every listed probe while showing the progress
    on screen, and logs the pulse count per probe when the task's ``save``
    field equals 'yes'.  The valve object is always cleaned up, even if the
    calibration fails part-way.

    :param logger: logger object providing the setup name and pulse logging
    """
    task_idx = (SetupInfo() & dict(setup=logger.setup)).fetch1('task_idx')
    duration, probes, pulsenum, pulse_interval, save, probe_control = \
        (CalibrationTask() & dict(task_idx=task_idx)).fetch1(
            'pulse_dur', 'probe', 'pulse_num', 'pulse_interval', 'save', 'probe_control')
    # NOTE(review): both eval() calls execute strings read from the database;
    # only safe as long as the CalibrationTask table is trusted.
    probes = eval(probes)
    valve = eval(probe_control)(logger)  # get valve object
    try:
        print('Running calibration')
        stim = Stimulus(logger)
        stim.setup()
        font = pygame.font.SysFont("comicsansms", 100)

        def show_message(message):
            """Draw `message` in green on a white background and flip the display."""
            stim.screen.fill((255, 255, 255))
            # NOTE(review): the x position is derived from size[1] as in the
            # original code -- confirm that size[0] was not intended.
            stim.screen.blit(font.render(message, True, (0, 128, 0)),
                             (stim.size[1]/4, stim.size[1]/2))
            stim.flip()

        # pulse_dur and pulse_interval are divided by 1000 before sleeping
        # (i.e. treated as milliseconds).
        pulse_period = duration / 1000 + pulse_interval / 1000
        pulse = 0
        while pulse < pulsenum:
            show_message('Pulse %d/%d' % (pulse + 1, pulsenum))
            for probe in probes:
                valve.give_liquid(probe, duration, False)  # release liquid
            time.sleep(pulse_period)  # wait for next pulse
            pulse += 1  # update trial
        if save == 'yes':
            for probe in probes:
                logger.log_pulse_weight(duration, probe, pulsenum)  # insert
        show_message('Done calibrating')
    finally:
        # Release the valve even when a pulse or the display raised.
        valve.cleanup()
# # # # # Waiting for instructions loop # # # # #
# Serve tasks until the setup state becomes 'stopped'.
while logg.get_setup_state() != 'stopped':

    # Idle, pinging once per second, until the state leaves 'ready' (remote start).
    while logg.get_setup_state() == 'ready':
        time.sleep(1)
        logg.ping()

    if logg.get_setup_state() == 'stopped':
        continue  # stopped while waiting: let the outer condition end the loop

    # The task name reported by the logger is resolved to a callable with
    # eval() and invoked with the logger.  No exception handler is active
    # here, so an error raised by the task propagates and ends the process.
    eval(logg.get_setup_task())(logg)
    logg.update_setup_state('ready')  # task returned: wait for the next start

# # # # # Exit # # # # #
sys.exit(0)