Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/ocp_api_context #524

Merged
merged 4 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 25 additions & 26 deletions ovos_core/intent_services/ocp_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ def handle_play_intent(self, message: Message):
skills=skills, message=message)

# tell OCP to play
self.bus.emit(Message('ovos.common_play.reset'))
self.bus.emit(message.forward('ovos.common_play.reset'))
if not results:
self.speak_dialog("cant.play",
data={"phrase": query,
Expand All @@ -580,17 +580,14 @@ def handle_play_intent(self, message: Message):
LOG.debug(f"OCP Best match: {best}")
results = [r for r in results if r.as_dict != best.as_dict]
results.insert(0, best)
self.bus.emit(Message('add_context',
{'context': "Playing",
'word': "",
'origin': OCP_ID}))
self.set_context("Playing", origin=OCP_ID)

# ovos-PHAL-plugin-mk1 will display music icon in response to play message
player = self.get_player(message)
if not player.ocp_available:
self.legacy_play(results, query)
self.legacy_play(results, query, message=message)
else:
self.ocp_api.play(results, query)
self.ocp_api.play(results, query, source_message=message)

def handle_open_intent(self, message: Message):
LOG.info("Requesting OCP homescreen")
Expand All @@ -606,10 +603,10 @@ def handle_stop_intent(self, message: Message):
player = self.get_player(message)
if not player.ocp_available:
LOG.info("Requesting Legacy AudioService to stop")
self.legacy_api.stop()
self.legacy_api.stop(source_message=message)
else:
LOG.info("Requesting OCP to stop")
self.ocp_api.stop()
self.ocp_api.stop(source_message=message)
player = self.get_player(message)
player.player_state = PlayerState.STOPPED
self.update_player_proxy(player)
Expand All @@ -618,28 +615,28 @@ def handle_next_intent(self, message: Message):
player = self.get_player(message)
if not player.ocp_available:
LOG.info("Requesting Legacy AudioService to go to next track")
self.legacy_api.next()
self.legacy_api.next(source_message=message)
else:
LOG.info("Requesting OCP to go to next track")
self.ocp_api.next()
self.ocp_api.next(source_message=message)

def handle_prev_intent(self, message: Message):
player = self.get_player(message)
if not player.ocp_available:
LOG.info("Requesting Legacy AudioService to go to prev track")
self.legacy_api.prev()
self.legacy_api.prev(source_message=message)
else:
LOG.info("Requesting OCP to go to prev track")
self.ocp_api.prev()
self.ocp_api.prev(source_message=message)

def handle_pause_intent(self, message: Message):
player = self.get_player(message)
if not player.ocp_available:
LOG.info("Requesting Legacy AudioService to pause")
self.legacy_api.pause()
self.legacy_api.pause(source_message=message)
else:
LOG.info("Requesting OCP to go to pause")
self.ocp_api.pause()
self.ocp_api.pause(source_message=message)
player = self.get_player(message)
player.player_state = PlayerState.PAUSED
self.update_player_proxy(player)
Expand All @@ -648,10 +645,10 @@ def handle_resume_intent(self, message: Message):
player = self.get_player(message)
if not player.ocp_available:
LOG.info("Requesting Legacy AudioService to resume")
self.legacy_api.resume()
self.legacy_api.resume(source_message=message)
else:
LOG.info("Requesting OCP to go to resume")
self.ocp_api.resume()
self.ocp_api.resume(source_message=message)
player = self.get_player(message)
player.player_state = PlayerState.PLAYING
self.update_player_proxy(player)
Expand All @@ -662,10 +659,10 @@ def handle_search_error_intent(self, message: Message):
player = self.get_player(message)
if not player.ocp_available:
LOG.info("Requesting Legacy AudioService to stop")
self.legacy_api.stop()
self.legacy_api.stop(source_message=message)
else:
LOG.info("Requesting OCP to stop")
self.ocp_api.stop()
self.ocp_api.stop(source_message=message)

# NLP
def voc_match_media(self, query: str, lang: str) -> Tuple[MediaType, float]:
Expand Down Expand Up @@ -896,15 +893,16 @@ def filter_results(self, results: list, phrase: str, lang: str,
def _search(self, phrase: str, media_type: MediaType, lang: str,
skills: Optional[List[str]] = None,
message: Optional[Message] = None) -> list:
self.bus.emit(Message("ovos.common_play.search.start"))
self.bus.emit(message.reply("ovos.common_play.search.start"))
self.enclosure.mouth_think() # animate mk1 mouth during search

# Now we place a query on the messsagebus for anyone who wants to
# attempt to service a 'play.request' message.
results = []
for r in self._execute_query(phrase,
media_type=media_type,
skills=skills):
skills=skills,
message=message):
results += r["results"]

results = self.normalize_results(results)
Expand All @@ -917,18 +915,19 @@ def _search(self, phrase: str, media_type: MediaType, lang: str,
else: # no filtering if skill explicitly requested
LOG.debug(f"Got {len(results)} usable results from {skills}")

self.bus.emit(Message("ovos.common_play.search.end"))
self.bus.emit(message.reply("ovos.common_play.search.end"))
return results

def _execute_query(self, phrase: str,
media_type: MediaType = Union[int, MediaType],
skills: Optional[List[str]] = None) -> list:
skills: Optional[List[str]] = None,
message: Optional[Message] = None) -> list:
""" actually send the search to OCP skills"""
media_type = self._normalize_media_enum(media_type)

with self.search_lock:
# stop any search still happening
self.bus.emit(Message("ovos.common_play.search.stop"))
self.bus.emit(message.reply("ovos.common_play.search.stop"))

query = OCPQuery(query=phrase, media_type=media_type,
config=self.config, bus=self.bus)
Expand All @@ -940,7 +939,7 @@ def _execute_query(self, phrase: str,
LOG.debug(f"{skill_id} can't handle {media_type} queries")
continue
LOG.debug(f"Searching OCP Skill: {skill_id}")
query.send(skill_id)
query.send(skill_id, source_message=message)
query.wait()
results += query.results

Expand Down Expand Up @@ -1026,7 +1025,7 @@ def legacy_play(self, results: List[Union[MediaEntry, Playlist, PluginStream]],
# for legacy audio service we need to do stream extraction here
res.append(r.extract_uri(video=False))

self.legacy_api.play(res, utterance=phrase)
self.legacy_api.play(res, utterance=phrase, source_message=message)

player = self.get_player(message)
player.player_state = PlayerState.PLAYING
Expand Down
2 changes: 1 addition & 1 deletion requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ padacioso~=0.2, >=0.2.1
adapt-parser>=1.0.0, <2.0.0

ovos-utils>=0.0.38
ovos_bus_client<0.1.0, >=0.0.9a22
ovos_bus_client<0.1.0, >=0.0.9a28
ovos-plugin-manager<0.1.0, >=0.0.25
ovos-config~=0.0,>=0.0.13a8
ovos-lingua-franca>=0.4.7
Expand Down
102 changes: 102 additions & 0 deletions test/end2end/routing/test_session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
from time import sleep
from unittest import TestCase
from ovos_core.intent_services.ocp_service import PlayerState, MediaState, OCPPlayerProxy

from ovos_bus_client.message import Message
from ovos_bus_client.session import SessionManager, Session
Expand Down Expand Up @@ -81,3 +82,104 @@ def wait_for_n_messages(n):
else:
self.assertEqual(m.context["source"], "B")
self.assertEqual(m.context["destination"], "A")


class TestOCPRouting(TestCase):

def setUp(self):
self.skill_id = "skill-fake-fm.openvoiceos"
self.core = get_minicroft(self.skill_id)

def tearDown(self) -> None:
self.core.stop()

def test_no_session(self):
self.assertIsNotNone(self.core.intent_service.ocp)
messages = []

def new_msg(msg):
nonlocal messages
m = Message.deserialize(msg)
if m.msg_type in ["gui.status.request",
"ovos.common_play.status",
"ovos.skills.settings_changed"]:
return # skip these
messages.append(m)
print(len(messages), m.msg_type, m.context.get("source"), m.context.get("destination"))

def wait_for_n_messages(n):
nonlocal messages
t = time.time()
while len(messages) < n:
sleep(0.1)
if time.time() - t > 10:
raise RuntimeError("did not get the number of expected messages under 10 seconds")

self.core.bus.on("message", new_msg)

sess = Session("test-session",
pipeline=[
"converse",
"ocp_high"
])
self.core.intent_service.ocp.ocp_sessions[sess.session_id] = OCPPlayerProxy(
session_id=sess.session_id, available_extractors=[], ocp_available=True,
player_state=PlayerState.STOPPED, media_state=MediaState.NO_MEDIA)
utt = Message("recognizer_loop:utterance",
{"utterances": ["play some radio station"]},
{"session": sess.serialize(), # explicit
"source": "A", "destination": "B"})
self.core.bus.emit(utt)

# confirm all expected messages are sent
expected_messages = [
"recognizer_loop:utterance",
"intent.service.skills.activated",
"ovos.common_play.activate",
"ocp:play",
"enclosure.active_skill",
"speak",
"ovos.common_play.search.start",
"enclosure.mouth.think",
"ovos.common_play.search.stop", # any ongoing previous search
"ovos.common_play.query", # media type radio
# skill searching (radio)
"ovos.common_play.skill.search_start",
"ovos.common_play.query.response",
"ovos.common_play.query.response",
"ovos.common_play.query.response",
"ovos.common_play.query.response",
"ovos.common_play.query.response",
"ovos.common_play.skill.search_end",
"ovos.common_play.search.end",
# good results because of radio media type
"ovos.common_play.reset",
"add_context", # NowPlaying context
"ovos.common_play.play", # OCP api,
"ovos.utterance.handled" # handle_utterance returned (intent service)
]
wait_for_n_messages(len(expected_messages))

self.assertEqual(len(expected_messages), len(messages))

for idx, m in enumerate(messages):
self.assertEqual(m.msg_type, expected_messages[idx])

# verify that source and destination are swapped after utterance
for m in messages:
if m.msg_type in ["recognizer_loop:utterance"]:
self.assertEqual(m.context["source"], "A")
self.assertEqual(m.context["destination"], "B")
elif m.msg_type in ["ovos.common_play.play",
"ovos.common_play.reset",
"ovos.common_play.query"]:
# OCP messages that should make it to the client
self.assertEqual(m.context["source"], "B")
self.assertEqual(m.context["destination"], "A")
elif m.msg_type.startswith("ovos.common_play"):
# internal search messages, should not leak to external clients
self.assertEqual(messages[0].context["source"], "A")
self.assertEqual(messages[0].context["destination"], "B")
else:
self.assertEqual(m.context["source"], "B")
self.assertEqual(m.context["destination"], "A")
Loading