Skip to content

Commit

Permalink
refactor/pipeline_shutdown (#484)
Browse files Browse the repository at this point in the history
* refactor/pipeline_shutdown

adds shutdown method to pipeline components, allowing to deregister bus events etc

removes fallback V1 pipeline, that was only to help in transition, ovos-workshop only uses fallbackV1 if it detects ovos-core <= 0.0.7 it is impossible to use with ovos-core 0.0.8 in a natural way, standalone skills were introduced in core 0.0.8 and also use V2

see https://github.com/OpenVoiceOS/OVOS-workshop/blob/dev/ovos_workshop/skills/fallback.py#L74

when used, it only introduces 10 seconds latency for no good reason

streamlines main.py

* drop old test

* update test

* update test

* add a shutdown test

* add a shutdown test

* add a shutdown test

* tests not depending on padatious installed
  • Loading branch information
JarbasAl authored May 28, 2024
1 parent 0cead56 commit 87b756b
Show file tree
Hide file tree
Showing 17 changed files with 237 additions and 347 deletions.
1 change: 0 additions & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ jobs:
pip install ./test/unittests/common_query/ovos_tskill_fakewiki
pip install ./test/end2end/skill-ovos-hello-world
pip install ./test/end2end/skill-ovos-fallback-unknown
pip install ./test/end2end/skill-ovos-fallback-unknownv1
pip install ./test/end2end/skill-converse_test
pip install ./test/end2end/skill-ovos-schedule
pip install ./test/end2end/skill-new-stop
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ jobs:
pip install ./test/unittests/common_query/ovos_tskill_fakewiki
pip install ./test/end2end/skill-ovos-hello-world
pip install ./test/end2end/skill-ovos-fallback-unknown
pip install ./test/end2end/skill-ovos-fallback-unknownv1
pip install ./test/end2end/skill-converse_test
pip install ./test/end2end/skill-ovos-schedule
pip install ./test/end2end/skill-new-stop
Expand Down
46 changes: 11 additions & 35 deletions ovos_core/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,20 @@
#
"""Daemon launched at startup to handle skill activities.
In this repo, you will not find an entry called mycroft-skills in the bin
directory. The executable gets added to the bin directory when installed
The executable gets added to the bin directory when installed
(see setup.py)
"""

from ovos_config.locale import setup_locale

from ovos_bus_client import MessageBusClient
from ovos_bus_client.util.scheduler import EventScheduler
from ovos_workshop.skills.api import SkillApi
from ovos_config.locale import setup_locale
from ovos_core.intent_services import IntentService
from ovos_core.skill_installer import SkillsStore
from ovos_core.skill_manager import SkillManager, on_error, on_stopping, on_ready, on_alive, on_started
from ovos_utils import wait_for_exit_signal
from ovos_utils.log import LOG, init_service_logger
from ovos_utils.process_utils import reset_sigint_handler
from ovos_core.skill_installer import SkillsStore
from ovos_workshop.skills.fallback import FallbackSkill
from ovos_workshop.skills.api import SkillApi


def main(alive_hook=on_alive, started_hook=on_started, ready_hook=on_ready,
Expand All @@ -49,7 +46,9 @@ def main(alive_hook=on_alive, started_hook=on_started, ready_hook=on_ready,
bus = MessageBusClient()
bus.run_in_thread()
bus.connected_event.wait()
_register_intent_services(bus)

intents = IntentService(bus)

event_scheduler = EventScheduler(bus, autostart=False)
event_scheduler.daemon = True
event_scheduler.start()
Expand All @@ -68,34 +67,11 @@ def main(alive_hook=on_alive, started_hook=on_started, ready_hook=on_ready,

wait_for_exit_signal()

shutdown(skill_manager, event_scheduler, osm)
intents.shutdown()
osm.shutdown()
skill_manager.stop()
event_scheduler.shutdown()


def _register_intent_services(bus):
"""Start up the all intent services and connect them as needed.
Args:
bus: messagebus client to register the services on
"""
service = IntentService(bus)
# Register handler to trigger fallback system
bus.on(
'mycroft.skills.fallback',
FallbackSkill.make_intent_failure_handler(bus)
)
return service


def shutdown(skill_manager, event_scheduler, osm):
LOG.info('Shutting down Skills service')
if event_scheduler is not None:
event_scheduler.shutdown()
# Terminate all running threads that update skills
if skill_manager is not None:
skill_manager.stop()
skill_manager.join()
if osm is not None:
osm.shutdown()
LOG.info('Skills service shutdown complete!')


Expand Down
57 changes: 43 additions & 14 deletions ovos_core/intent_services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from ovos_utils.metrics import Stopwatch
from ovos_workshop.intents import open_intent_envelope


# Intent match response tuple containing
# intent_service: Name of the service that matched the intent
# intent_type: intent name (used to call intent handler over the message bus)
Expand Down Expand Up @@ -60,12 +59,16 @@ def __init__(self, bus, config=None):

# TODO - replace with plugins
self.adapt_service = AdaptService()
self.padatious_service = None
try:
from ovos_core.intent_services.padatious_service import PadatiousService
self.padatious_service = PadatiousService(bus, self.config["padatious"])
if self.config["padatious"].get("disabled"):
LOG.info("padatious forcefully disabled in config")
else:
from ovos_core.intent_services.padatious_service import PadatiousService
self.padatious_service = PadatiousService(bus, self.config["padatious"])
except ImportError:
LOG.error(f'Failed to create padatious intent handlers, padatious not installed')
self.padatious_service = None

self.padacioso_service = PadaciosoService(bus, self.config["padatious"])
self.fallback = FallbackService(bus)
self.converse = ConverseService(bus)
Expand Down Expand Up @@ -99,16 +102,11 @@ def __init__(self, bus, config=None):
self.bus.on('intent.service.intent.get', self.handle_get_intent)
self.bus.on('intent.service.skills.get', self.handle_get_skills)
self.bus.on('intent.service.adapt.get', self.handle_get_adapt)
self.bus.on('intent.service.adapt.manifest.get',
self.handle_adapt_manifest)
self.bus.on('intent.service.adapt.vocab.manifest.get',
self.handle_vocab_manifest)
self.bus.on('intent.service.padatious.get',
self.handle_get_padatious)
self.bus.on('intent.service.padatious.manifest.get',
self.handle_padatious_manifest)
self.bus.on('intent.service.padatious.entities.manifest.get',
self.handle_entity_manifest)
self.bus.on('intent.service.adapt.manifest.get', self.handle_adapt_manifest)
self.bus.on('intent.service.adapt.vocab.manifest.get', self.handle_vocab_manifest)
self.bus.on('intent.service.padatious.get', self.handle_get_padatious)
self.bus.on('intent.service.padatious.manifest.get', self.handle_padatious_manifest)
self.bus.on('intent.service.padatious.entities.manifest.get', self.handle_entity_manifest)

def _load_ocp_pipeline(self):
"""EXPERIMENTAL: this feature is not yet ready for end users"""
Expand Down Expand Up @@ -603,6 +601,37 @@ def handle_entity_manifest(self, message):
"intent.service.padatious.entities.manifest",
{"entities": self.padacioso_service.registered_entities}))

def shutdown(self):
self.utterance_plugins.shutdown()
self.metadata_plugins.shutdown()
self.adapt_service.shutdown()
self.padacioso_service.shutdown()
if self.padatious_service:
self.padatious_service.shutdown()
self.common_qa.shutdown()
self.converse.shutdown()
self.fallback.shutdown()
if self.ocp:
self.ocp.shutdown()

self.bus.remove('register_vocab', self.handle_register_vocab)
self.bus.remove('register_intent', self.handle_register_intent)
self.bus.remove('recognizer_loop:utterance', self.handle_utterance)
self.bus.remove('detach_intent', self.handle_detach_intent)
self.bus.remove('detach_skill', self.handle_detach_skill)
self.bus.remove('add_context', self.handle_add_context)
self.bus.remove('remove_context', self.handle_remove_context)
self.bus.remove('clear_context', self.handle_clear_context)
self.bus.remove('mycroft.skills.loaded', self.update_skill_name_dict)
self.bus.remove('intent.service.intent.get', self.handle_get_intent)
self.bus.remove('intent.service.skills.get', self.handle_get_skills)
self.bus.remove('intent.service.adapt.get', self.handle_get_adapt)
self.bus.remove('intent.service.adapt.manifest.get', self.handle_adapt_manifest)
self.bus.remove('intent.service.adapt.vocab.manifest.get', self.handle_vocab_manifest)
self.bus.remove('intent.service.padatious.get', self.handle_get_padatious)
self.bus.remove('intent.service.padatious.manifest.get', self.handle_padatious_manifest)
self.bus.remove('intent.service.padatious.entities.manifest.get', self.handle_entity_manifest)


def _is_old_style_keyword_message(message):
"""Simple check that the message is not using the updated format.
Expand Down
5 changes: 5 additions & 0 deletions ovos_core/intent_services/adapt_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,8 @@ def detach_intent(self, intent_name):
p for p in self.engines[lang].intent_parsers if p.name != intent_name
]
self.engines[lang].intent_parsers = new_parsers

def shutdown(self):
for lang in self.engines:
parsers = self.engines[lang].intent_parsers
self.engines[lang].drop_intent_parser(parsers)
5 changes: 5 additions & 0 deletions ovos_core/intent_services/commonqa_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,8 @@ def _query_timeout(self, message: Message):
else:
query.answered = False
query.completed.set()

def shutdown(self):
self.bus.remove('question:query.response', self.handle_query_response)
self.bus.remove('common_query.question', self.handle_question)
self.bus.remove('ovos.common_query.pong', self.handle_skill_pong)
9 changes: 9 additions & 0 deletions ovos_core/intent_services/converse_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,3 +378,12 @@ def handle_get_active_skills(self, message):
"""
self.bus.emit(message.reply("intent.service.active_skills.reply",
{"skills": self.get_active_skills(message)}))

def shutdown(self):
self.bus.remove('mycroft.speech.recognition.unknown', self.reset_converse)
self.bus.remove('intent.service.skills.deactivate', self.handle_deactivate_skill_request)
self.bus.remove('intent.service.skills.activate', self.handle_activate_skill_request)
self.bus.remove('active_skill_request', self.handle_activate_skill_request) # TODO backwards compat, deprecate
self.bus.remove('intent.service.active_skills.get', self.handle_get_active_skills)
self.bus.remove("skill.converse.get_response.enable", self.handle_get_response_enable)
self.bus.remove("skill.converse.get_response.disable", self.handle_get_response_disable)
17 changes: 4 additions & 13 deletions ovos_core/intent_services/fallback_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,6 @@ def _fallback_range(self, utterances, lang, message, fb_range):
result = self.attempt_fallback(utterances, skill_id, lang, message)
if result:
return ovos_core.intent_services.IntentMatch('Fallback', None, {}, skill_id, utterances[0])

# old style deprecated fallback skill singleton class
LOG.debug("checking for FallbackSkillsV1")
msg = message.reply(
'mycroft.skills.fallback',
data={'utterance': utterances[0],
'lang': lang,
'fallback_range': (fb_range.start, fb_range.stop)}
)
response = self.bus.wait_for_response(msg, timeout=10)

if response and response.data['handled']:
return ovos_core.intent_services.IntentMatch('Fallback', None, {}, None, utterances[0])
return None

def high_prio(self, utterances, lang, message):
Expand All @@ -195,3 +182,7 @@ def low_prio(self, utterances, lang, message):
"""Low prio fallbacks with general matching such as chat-bot."""
return self._fallback_range(utterances, lang, message,
FallbackRange(90, 101))

def shutdown(self):
self.bus.remove("ovos.skills.fallback.register", self.handle_register_fallback)
self.bus.remove("ovos.skills.fallback.deregister", self.handle_deregister_fallback)
27 changes: 27 additions & 0 deletions ovos_core/intent_services/ocp_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,33 @@ def handle_legacy_cps(self, message: Message):
self.bus.emit(message.forward("mycroft.audio.play_sound",
{"uri": "snd/error.mp3"}))

def shutdown(self):
self.mycroft_cps.shutdown()
self.bus.remove("ovos.common_play.search", self.handle_search_query)
self.bus.remove("ovos.common_play.play_search", self.handle_play_search)
self.bus.remove('ovos.common_play.status.response', self.handle_player_state_update)
self.bus.remove('ovos.common_play.track.state', self.handle_track_state_update)
self.bus.remove('ovos.common_play.SEI.get.response', self.handle_get_SEIs)
self.bus.remove('ovos.common_play.register_keyword', self.handle_skill_keyword_register)
self.bus.remove('ovos.common_play.deregister_keyword', self.handle_skill_keyword_deregister)
self.bus.remove('ovos.common_play.announce', self.handle_skill_register)
self.bus.remove("mycroft.audio.playing_track", self._handle_legacy_audio_start)
self.bus.remove("mycroft.audio.queue_end", self._handle_legacy_audio_end)
self.bus.remove("mycroft.audio.service.pause", self._handle_legacy_audio_pause)
self.bus.remove("mycroft.audio.service.resume", self._handle_legacy_audio_resume)
self.bus.remove("mycroft.audio.service.stop", self._handle_legacy_audio_stop)
self.bus.remove("ocp:play", self.handle_play_intent)
self.bus.remove("ocp:play_favorites", self.handle_play_favorites_intent)
self.bus.remove("ocp:open", self.handle_open_intent)
self.bus.remove("ocp:next", self.handle_next_intent)
self.bus.remove("ocp:prev", self.handle_prev_intent)
self.bus.remove("ocp:pause", self.handle_pause_intent)
self.bus.remove("ocp:resume", self.handle_resume_intent)
self.bus.remove("ocp:media_stop", self.handle_stop_intent)
self.bus.remove("ocp:search_error", self.handle_search_error_intent)
self.bus.remove("ocp:like_song", self.handle_like_intent)
self.bus.remove("ocp:legacy_cps", self.handle_legacy_cps)


class LegacyCommonPlay:
""" interface for mycroft common play
Expand Down
6 changes: 6 additions & 0 deletions ovos_core/intent_services/padacioso_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ def calc_intent(self, utterances: List[str], lang: str = None) -> Optional[Padac
if intents:
return max(intents, key=lambda k: k.conf)

def shutdown(self):
self.bus.remove('padatious:register_intent', self.register_intent)
self.bus.remove('padatious:register_entity', self.register_entity)
self.bus.remove('detach_intent', self.handle_detach_intent)
self.bus.remove('detach_skill', self.handle_detach_skill)


@lru_cache(maxsize=3) # repeat calls under different conf levels wont re-run code
def _calc_padacioso_intent(utt, intent_container) -> \
Expand Down
7 changes: 7 additions & 0 deletions ovos_core/intent_services/padatious_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ def calc_intent(self, utterances: List[str], lang: str = None) -> Optional[Padat
if intents:
return max(intents, key=lambda k: k.conf)

def shutdown(self):
self.bus.remove('padatious:register_intent', self.register_intent)
self.bus.remove('padatious:register_entity', self.register_entity)
self.bus.remove('detach_intent', self.handle_detach_intent)
self.bus.remove('detach_skill', self.handle_detach_skill)
self.bus.remove('mycroft.skills.initialized', self.train)


@lru_cache(maxsize=3) # repeat calls under different conf levels wont re-run code
def _calc_padatious_intent(utt, intent_container) -> Optional[PadatiousIntent]:
Expand Down
54 changes: 5 additions & 49 deletions test/end2end/session/test_complete_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,7 @@ def wait_for_n_messages(n):
# Converse
f"{self.skill_id}.converse.ping",
"skill.converse.pong",
# FallbackV1
"mycroft.skills.fallback",
"mycroft.skill.handler.start",
"mycroft.skill.handler.complete",
"mycroft.skills.fallback.response",

"mycroft.skills.fallback",
"mycroft.skill.handler.start",
"mycroft.skill.handler.complete",
"mycroft.skills.fallback.response",

"mycroft.skills.fallback",
"mycroft.skill.handler.start",
"mycroft.skill.handler.complete",
"mycroft.skills.fallback.response",
# complete intent failure
"mycroft.audio.play_sound",
"complete_intent_failure",
Expand All @@ -104,44 +90,14 @@ def wait_for_n_messages(n):
self.assertEqual(messages[2].context["skill_id"], self.skill_id)
self.assertFalse(messages[2].data["can_handle"])

# high prio fallback
self.assertEqual(messages[3].msg_type, "mycroft.skills.fallback")
self.assertEqual(messages[3].data["fallback_range"], [0, 5])
self.assertEqual(messages[4].msg_type, "mycroft.skill.handler.start")
self.assertEqual(messages[4].data["handler"], "fallback")
self.assertEqual(messages[5].msg_type, "mycroft.skill.handler.complete")
self.assertEqual(messages[5].data["handler"], "fallback")
self.assertEqual(messages[6].msg_type, "mycroft.skills.fallback.response")
self.assertFalse(messages[6].data["handled"])

# medium prio fallback
self.assertEqual(messages[7].msg_type, "mycroft.skills.fallback")
self.assertEqual(messages[7].data["fallback_range"], [5, 90])
self.assertEqual(messages[8].msg_type, "mycroft.skill.handler.start")
self.assertEqual(messages[8].data["handler"], "fallback")
self.assertEqual(messages[9].msg_type, "mycroft.skill.handler.complete")
self.assertEqual(messages[9].data["handler"], "fallback")
self.assertEqual(messages[10].msg_type, "mycroft.skills.fallback.response")
self.assertFalse(messages[10].data["handled"])

# low prio fallback
self.assertEqual(messages[11].msg_type, "mycroft.skills.fallback")
self.assertEqual(messages[11].data["fallback_range"], [90, 101])
self.assertEqual(messages[12].msg_type, "mycroft.skill.handler.start")
self.assertEqual(messages[12].data["handler"], "fallback")
self.assertEqual(messages[13].msg_type, "mycroft.skill.handler.complete")
self.assertEqual(messages[13].data["handler"], "fallback")
self.assertEqual(messages[14].msg_type, "mycroft.skills.fallback.response")
self.assertFalse(messages[14].data["handled"])

# complete intent failure
self.assertEqual(messages[15].msg_type, "mycroft.audio.play_sound")
self.assertEqual(messages[15].data["uri"], "snd/error.mp3")
self.assertEqual(messages[16].msg_type, "complete_intent_failure")
self.assertEqual(messages[3].msg_type, "mycroft.audio.play_sound")
self.assertEqual(messages[3].data["uri"], "snd/error.mp3")
self.assertEqual(messages[4].msg_type, "complete_intent_failure")

# verify default session is now updated
self.assertEqual(messages[17].msg_type, "ovos.session.update_default")
self.assertEqual(messages[17].data["session_data"]["session_id"], "default")
self.assertEqual(messages[5].msg_type, "ovos.session.update_default")
self.assertEqual(messages[5].data["session_data"]["session_id"], "default")

@skip("TODO works if run standalone, otherwise has side effects in other tests")
def test_complete_failure_lang_detect(self):
Expand Down
Loading

0 comments on commit 87b756b

Please sign in to comment.