Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/killable_converse #221

Merged
merged 4 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions ovos_workshop/decorators/killable.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ def killable_event(msg: str = "mycroft.skills.abort_execution",
exc: Type[Exception] = AbortEvent,
callback: Optional[callable] = None,
react_to_stop: bool = False, call_stop: bool = False,
stop_tts: bool = False):
stop_tts: bool = False,
check_skill_id: bool = False):
"""
Decorator to mark a method that can be terminated during execution.
@param msg: Message name to terminate on
Expand All @@ -50,6 +51,7 @@ def killable_event(msg: str = "mycroft.skills.abort_execution",
@param react_to_stop: If true, also terminate on `stop` Messages
@param call_stop: If true, also call `Class.stop` method
@param stop_tts: If true, emit message to stop TTS audio playback
@param check_skill_id: If true, require skill_id in message.data to match this skill
"""
# Begin wrapper
def create_killable(func):
Expand All @@ -68,17 +70,27 @@ def abort(m: Message):
if sess.session_id != sess2.session_id:
LOG.debug(f"ignoring '{msg}' kill event, event listener not created by this session")
return
if check_skill_id:
skill_id = m.data.get("skill_id", "")
if skill_id and skill_id != skill.skill_id:
LOG.debug(f"ignoring '{msg}' kill event, event targeted to {skill_id}")
return

if stop_tts:
skill.bus.emit(Message("mycroft.audio.speech.stop"))
if call_stop:
# call stop on parent skill
skill.stop()

# ensure no orphan get_response daemons
# this is the only killable daemon that core itself will
# create, users should also account for this condition with
# callbacks if using the decorator for other purposes
skill._handle_killed_wait_response()
LOG.debug(f"killing {func} - callback {callback}")

def cb():
if callback is not None:
if len(signature(callback).parameters) == 1:
# class method, needs self
callback(skill)
else:
callback()

try:
while t.is_alive():
Expand All @@ -88,12 +100,14 @@ def abort(m: Message):
pass # already killed
except AssertionError:
pass # could not determine thread id ?
if callback is not None:
if len(signature(callback).parameters) == 1:
# class method, needs self
callback(skill)
else:
callback()
except exc:
# this is the exception we raised ourselves to kill the thread
# usually it doesnt propagate this far, if it does we need to re-raise it
# (reproducible with killable get_response)
LOG.debug(f"Killed thread {t}")
cb()
raise
cb()

# save reference to threads so they can be killed later
if not hasattr(skill, "_threads"):
Expand Down
4 changes: 3 additions & 1 deletion ovos_workshop/skills/fallback.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,14 @@ def _handle_fallback_ack(self, message: Message):
context={"skill_id": self.skill_id}))

def _on_timeout(self):
"""_handle_fallback_request timed out and was forcefully killed by ovos-core"""
message = dig_for_message()
self.bus.emit(message.forward(
f"ovos.skills.fallback.{self.skill_id}.killed",
data={"error": "timed out"}))

@killable_event("ovos.skills.fallback.force_timeout", callback=_on_timeout)
@killable_event("ovos.skills.fallback.force_timeout",
callback=_on_timeout, check_skill_id=True)
def _handle_fallback_request(self, message: Message):
"""
Handle a fallback request, calling any registered handlers in priority
Expand Down
23 changes: 21 additions & 2 deletions ovos_workshop/skills/ovos.py
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,15 @@ def on_deac(message):
self.bus.remove("intent.service.skills.deactivate", on_deac)
return result

def _on_timeout(self):
"""_handle_converse_request timed out and was forcefully killed by ovos-core"""
message = dig_for_message()
self.bus.emit(message.forward(
f"{self.skill_id}.converse.killed",
data={"error": "timed out"}))

@killable_event("ovos.skills.converse.force_timeout",
callback=_on_timeout, check_skill_id=True)
def _handle_converse_request(self, message: Message):
"""
If this skill is requested and supports converse, handle the user input
Expand Down Expand Up @@ -1256,10 +1265,16 @@ def _handle_converse_request(self, message: Message):
self.bus.emit(message.reply('skill.converse.response',
{"skill_id": self.skill_id,
"result": result}))
except (AbortQuestion, AbortEvent):
self.bus.emit(message.reply('skill.converse.response',
{"skill_id": self.skill_id,
"error": "killed",
"result": False}))
except Exception as e:
LOG.error(e)
self.bus.emit(message.reply('skill.converse.response',
{"skill_id": self.skill_id,
"error": repr(e),
"result": False}))

def _handle_stop_ack(self, message: Message):
Expand Down Expand Up @@ -2050,9 +2065,12 @@ def _handle_killed_wait_response(self):
self.__responses = {k: None for k in self.__responses}
self.__validated_responses = {k: None for k in self.__validated_responses}
self.converse = self._original_converse
message = dig_for_message()
self.bus.emit(message.forward(f"{self.skill_id}.get_response.killed"))

@killable_event("mycroft.skills.abort_question", exc=AbortQuestion,
callback=_handle_killed_wait_response, react_to_stop=True)
callback=_handle_killed_wait_response, react_to_stop=True,
check_skill_id=True)
def _real_wait_response(self, is_cancel, validator, on_fail, num_retries,
message: Message):
"""
Expand All @@ -2064,10 +2082,11 @@ def _real_wait_response(self, is_cancel, validator, on_fail, num_retries,

Arguments:
is_cancel (callable): function checking cancel criteria
validator (callbale): function checking for a valid response
validator (callable): function checking for a valid response
on_fail (callable): function handling retries

"""
self.bus.emit(message.forward(f"{self.skill_id}.get_response.waiting"))
sess = SessionManager.get(message)

num_fails = 0
Expand Down
Loading