-
Notifications
You must be signed in to change notification settings - Fork 5
/
basic_awareness.py
152 lines (115 loc) · 4.06 KB
/
basic_awareness.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import qi
import argparse
import sys
import time
class HumanTrackedEventWatcher(object):
""" A class to react to HumanTracked and PeopleLeft events """
def __init__(self, app):
"""
Initialisation of qi framework and event detection.
"""
super(HumanTrackedEventWatcher, self).__init__()
try:
app.start()
except RuntimeError:
print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " +
str(args.port) + ".\n")
sys.exit(1)
session = app.session
self.subscribers_list = []
self.is_speech_reco_started = False
# SUBSCRIBING SERVICES
self.tts = session.service("ALTextToSpeech")
self.memory = session.service("ALMemory")
self.motion = session.service("ALMotion")
self.speech_reco = session.service("ALSpeechRecognition")
self.basic_awareness = session.service("ALBasicAwareness")
self.posture_service = session.service("ALRobotPosture")
def create_callbacks(self):
self.connect_callback("ALBasicAwareness/HumanTracked",
self.on_human_tracked)
self.connect_callback("ALBasicAwareness/HumanLost",
self.on_people_left)
return
def connect_callback(self, event_name, callback_func):
""" connect a callback for a given event """
print "Callback connection"
subscriber = self.memory.subscriber(event_name)
subscriber.signal.connect(callback_func)
self.subscribers_list.append(subscriber)
return
def on_human_tracked(self, value):
""" callback for event HumanTracked """
print "Got HumanTracked: detected person with ID:", str(value)
if value >= 0: # found a new person
self.pepper_speak("Ohh I found a new person, let me tag you with id {}".format(value))
position_human = self.get_people_perception_data(value)
[x, y, z] = position_human
print "The tracked person with ID", value, "is at the position:", \
"x=", x, "/ y=", y, "/ z=", z
return
def on_people_left(self, value):
""" callback for event PeopleLeft """
print "Got PeopleLeft: lost person", str(value)
self.pepper_speak("Ohh No ! I lost the person")
def pepper_speak(self, msg):
sentence = "\RSPD="+ str( 100 ) + "\ "
sentence += "\VCT="+ str( 100 ) + "\ "
sentence += msg
sentence += "\RST\ "
self.tts.say(str(sentence))
return
def get_people_perception_data(self, id_person_tracked):
"""
return information related to the person who has the id
"id_person_tracked" from People Perception
"""
memory_key = "PeoplePerception/Person/" + str(id_person_tracked) + \
"/PositionInWorldFrame"
return self.memory.getData(memory_key)
def start_basic_awareness(self):
print "Starting BasicAwareness with the fully engaged mode"
self.basic_awareness.setEngagementMode("FullyEngaged")
self.basic_awareness.setEnabled(True)
return
def stop_basic_awareness(self):
print "Stopping BasicAwareness"
self.basic_awareness.setEnabled(False)
self.posture_service.goToPosture("StandInit", 0.1)
return
def run(self):
"""
this example uses the setEngagementMode, startAwareness and
stopAwareness methods
"""
# start
print "Waiting for the robot to be in wake up position"
self.motion.wakeUp()
self.create_callbacks()
self.start_basic_awareness()
# loop on, wait for events until manual interruption
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print "Interrupted by user, shutting down"
# stop
self.stop_basic_awareness()
print "Waiting for the robot to be in rest position"
self.motion.rest()
sys.exit(0)
return
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ip", type=str, default="10.9.45.11",
help="Robot IP address. On robot or Local Naoqi: use \
'127.0.0.1'.")
parser.add_argument("--port", type=int, default=9559,
help="Naoqi port number")
args = parser.parse_args()
# Initialize qi framework.
connection_url = "tcp://" + args.ip + ":" + str(args.port)
app = qi.Application(["HumanTrackedEventWatcher",
"--qi-url=" + connection_url])
human_tracked_event_watcher = HumanTrackedEventWatcher(app)
human_tracked_event_watcher.run()