From edcc0ea0acefac75e34dfb594c4b2e1833d9fc27 Mon Sep 17 00:00:00 2001 From: miro Date: Tue, 15 Oct 2024 03:03:12 +0100 Subject: [PATCH] port extra tests from ovos-core --- test/unittests/test_common_query_skill.py | 61 ++++++ test/unittests/test_context.py | 40 ++++ .../test_intent_service_interface.py | 145 +++++++++++++ test/unittests/test_skill_api.py | 152 ++++++++++++++ test/unittests/test_skill_loader.py | 192 ++++++++++++++++++ 5 files changed, 590 insertions(+) create mode 100644 test/unittests/test_common_query_skill.py create mode 100644 test/unittests/test_context.py create mode 100644 test/unittests/test_intent_service_interface.py create mode 100644 test/unittests/test_skill_api.py create mode 100644 test/unittests/test_skill_loader.py diff --git a/test/unittests/test_common_query_skill.py b/test/unittests/test_common_query_skill.py new file mode 100644 index 00000000..f1cf9bad --- /dev/null +++ b/test/unittests/test_common_query_skill.py @@ -0,0 +1,61 @@ +from unittest import TestCase, mock + +from ovos_bus_client.message import Message + +from ovos_workshop.skills.common_query_skill import CommonQuerySkill + + +class AnyCallable: + """Class matching any callable. + + Useful for assert_called_with arguments. + """ + def __eq__(self, other): + return callable(other) + + + +class TestCommonQuerySkill(TestCase): + def setUp(self): + self.skill = CQSTest() + self.bus = mock.Mock(name='bus') + self.skill.bind(self.bus) + self.skill.config_core = {'enclosure': {'platform': 'mycroft_mark_1'}} + + def test_lifecycle(self): + """Test startup and shutdown.""" + skill = CQSTest() + bus = mock.Mock(name='bus') + skill.bind(bus) + bus.on.assert_any_call('question:query', AnyCallable()) + bus.on.assert_any_call('question:action', AnyCallable()) + skill.shutdown() + + def test_common_test_skill_action(self): + """Test that the optional action is triggered.""" + query_action = self.bus.on.call_args_list[-2][0][1] + query_action(Message('query:action', data={ + 'phrase': 'What\'s the meaning of life', + 'skill_id': 'asdf'})) + self.skill.CQS_action.assert_not_called() + query_action(Message('query:action', data={ + 'phrase': 'What\'s the meaning of life', + 'skill_id': 'CQSTest'})) + self.skill.CQS_action.assert_called_once_with( + 'What\'s the meaning of life', {}) + + +class CQSTest(CommonQuerySkill): + """Simple skill for testing the CommonQuerySkill""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.CQS_match_query_phrase = mock.Mock(name='match_phrase') + self.CQS_action = mock.Mock(name='selected_action') + self.skill_id = 'CQSTest' + + def CQS_match_query_phrase(self, phrase): + pass + + def CQS_action(self, phrase, data): + pass diff --git a/test/unittests/test_context.py b/test/unittests/test_context.py new file mode 100644 index 00000000..549fce6b --- /dev/null +++ b/test/unittests/test_context.py @@ -0,0 +1,40 @@ +from unittest import TestCase, mock + +# TODO - move to ovos-workshop +from ovos_workshop.decorators import adds_context, removes_context + + +class ContextSkillMock(mock.Mock): + """Mock class to apply decorators on.""" + @adds_context('DestroyContext') + def handler_adding_context(self): + pass + + @adds_context('DestroyContext', 'exterminate') + def handler_adding_context_with_words(self): + pass + + @removes_context('DestroyContext') + def handler_removing_context(self): + pass + + +class TestContextDecorators(TestCase): + def test_adding_context(self): + """Check that calling handler adds the correct Keyword.""" + skill = ContextSkillMock() + skill.handler_adding_context() + skill.set_context.assert_called_once_with('DestroyContext', '') + + def test_adding_context_with_words(self): + """Ensure that decorated handler adds Keyword and content.""" + skill = ContextSkillMock() + skill.handler_adding_context_with_words() + skill.set_context.assert_called_once_with('DestroyContext', + 'exterminate') + + def test_removing_context(self): + """Make sure the decorated handler removes the specified context.""" + skill = ContextSkillMock() + skill.handler_removing_context() + skill.remove_context.assert_called_once_with('DestroyContext') diff --git a/test/unittests/test_intent_service_interface.py b/test/unittests/test_intent_service_interface.py new file mode 100644 index 00000000..61fea26e --- /dev/null +++ b/test/unittests/test_intent_service_interface.py @@ -0,0 +1,145 @@ +import unittest +# TODO - move test to ovos-workshop +from ovos_workshop.intents import IntentBuilder, IntentServiceInterface + + +class MockEmitter: + def __init__(self): + self.reset() + + def emit(self, message): + self.types.append(message.msg_type) + self.results.append(message.data) + + def get_types(self): + return self.types + + def get_results(self): + return self.results + + def on(self, event, f): + pass + + def reset(self): + self.types = [] + self.results = [] + + +class KeywordRegistrationTest(unittest.TestCase): + def check_emitter(self, expected_message_data): + """Verify that the registration messages matches the expected.""" + for msg_type in self.emitter.get_types(): + self.assertEqual(msg_type, 'register_vocab') + self.assertEqual( + sorted(self.emitter.get_results(), + key=lambda d: sorted(d.items())), + sorted(expected_message_data, key=lambda d: sorted(d.items()))) + self.emitter.reset() + + def setUp(self): + self.emitter = MockEmitter() + + def test_register_keyword(self): + intent_service = IntentServiceInterface(self.emitter) + intent_service.register_adapt_keyword('test_intent', 'test', lang='en-US') + entity_data = {'entity_value': 'test', 'entity_type': 'test_intent', 'lang': 'en-US'} + compatibility_data = {'start': 'test', 'end': 'test_intent'} + expected_data = {**entity_data, **compatibility_data} + self.check_emitter([expected_data]) + + def test_register_keyword_with_aliases(self): + # TODO 22.02: Remove compatibility data + intent_service = IntentServiceInterface(self.emitter) + intent_service.register_adapt_keyword('test_intent', 'test', + ['test2', 'test3'], + lang='en-US') + + entity_data = {'entity_value': 'test', 'entity_type': 'test_intent', 'lang': 'en-US'} + compatibility_data = {'start': 'test', 'end': 'test_intent'} + expected_initial_vocab = {**entity_data, **compatibility_data} + + alias_data = { + 'entity_value': 'test2', + 'entity_type': 'test_intent', + 'alias_of': 'test', + 'lang': 'en-US' + } + alias_compatibility = {'start': 'test2', 'end': 'test_intent'} + expected_alias1 = {**alias_data, **alias_compatibility} + + alias_data2 = { + 'entity_value': 'test3', + 'entity_type': 'test_intent', + 'alias_of': 'test', + 'lang': 'en-US' + } + alias_compatibility2 = {'start': 'test3', 'end': 'test_intent'} + expected_alias2 = {**alias_data2, **alias_compatibility2} + + self.check_emitter([expected_initial_vocab, + expected_alias1, + expected_alias2]) + + def test_register_regex(self): + intent_service = IntentServiceInterface(self.emitter) + intent_service.register_adapt_regex('.*', lang="en-us") + self.check_emitter([{'regex': '.*', 'lang': 'en-us'}]) + + +class KeywordIntentRegistrationTest(unittest.TestCase): + def check_emitter(self, expected_message_data): + """Verify that the registration messages matches the expected.""" + for msg_type in self.emitter.get_types(): + self.assertEqual(msg_type, 'register_intent') + self.assertEqual( + sorted(self.emitter.get_results(), + key=lambda d: sorted(d.items())), + sorted(expected_message_data, key=lambda d: sorted(d.items()))) + self.emitter.reset() + + def setUp(self): + self.emitter = MockEmitter() + + def test_register_intent(self): + intent_service = IntentServiceInterface(self.emitter) + intent_service.register_adapt_keyword('testA', 'testA', lang='en-US') + intent_service.register_adapt_keyword('testB', 'testB', lang='en-US') + self.emitter.reset() + + intent = IntentBuilder("test").require("testA").optionally("testB") + intent_service.register_adapt_intent("test", intent) + expected_data = {'at_least_one': [], + 'name': 'test', + 'excludes': [], + 'optional': [('testB', 'testB')], + 'requires': [('testA', 'testA')]} + self.check_emitter([expected_data]) + + + +class UtteranceIntentRegistrationTest(unittest.TestCase): + def check_emitter(self, expected_message_data): + """Verify that the registration messages matches the expected.""" + for msg_type in self.emitter.get_types(): + self.assertEqual(msg_type, 'padatious:register_intent') + + self.assertEqual( + sorted(self.emitter.get_results(), + key=lambda d: sorted(d.items())), + sorted(expected_message_data, key=lambda d: sorted(d.items()))) + self.emitter.reset() + + def setUp(self): + self.emitter = MockEmitter() + + def test_register_intent(self): + intent_service = IntentServiceInterface(self.emitter) + filename = "/tmp/test.intent" + with open(filename, "w") as f: + f.write("this is a test\ntest the intent") + + intent_service.register_padatious_intent('test', filename, lang='en-US') + expected_data = {'file_name': '/tmp/test.intent', 'lang': 'en-US', 'name': 'test', + 'samples': ['this is a test', 'test the intent']} + self.check_emitter([expected_data]) + diff --git a/test/unittests/test_skill_api.py b/test/unittests/test_skill_api.py new file mode 100644 index 00000000..3f23cf04 --- /dev/null +++ b/test/unittests/test_skill_api.py @@ -0,0 +1,152 @@ +from unittest import TestCase, mock + +from ovos_workshop.skills.ovos import OVOSSkill +from ovos_bus_client.message import Message +from ovos_workshop.decorators import skill_api_method +from ovos_workshop.skills.api import SkillApi + + +class Skill(OVOSSkill): + """Test skill with registered API methods.""" + def __init__(self, *args, **kwargs): + self.registered_methods = {} + super().__init__(*args, **kwargs) + + def add_event(self, event_type, func, **kwargs): + """Mock handler of add_event, simply storing type and method. + + Used in testing to verify the wrapped methods + """ + self.registered_methods[event_type] = func + + @skill_api_method + def test_method(self): + """Documentation.""" + return True + + @skill_api_method + def test_method2(self, arg): + """Documentation.""" + return 'TestResult' + + +def load_test_skill(): + """Helper for setting up the test skill. + + Returns: + (MycroftSkill): created test skill + """ + bus = mock.Mock() + test_skill = Skill(skill_id = 'test_skill', bus=bus) + return test_skill + + +def create_skill_api_from_skill(skill): + """Helper creating an api from a skill. + + Args: + skill (MycroftSkill): Skill to create api from. + + Returns: + (SkillApi): API for the skill. + """ + SkillApi.connect_bus(skill.bus) + return SkillApi(skill.public_api) + + +class testSkillMethod(TestCase): + """Tests for the MycroftSkill class API setup.""" + def test_public_api_event(self): + """Test that public api event handler is created.""" + test_skill = load_test_skill() + self.assertTrue( + 'test_skill.public_api' in test_skill.registered_methods + ) + + def test_public_api(self): + """Test that the public_api structure matches the decorators.""" + test_skill = load_test_skill() + # Check that methods has been added + self.assertTrue('test_method' in test_skill.public_api) + self.assertTrue('test_method2' in test_skill.public_api) + # Test docstring + self.assertEqual(test_skill.public_api['test_method']['help'], + 'Documentation.') + # Test type + self.assertEqual(test_skill.public_api['test_method']['type'], + '{}.{}'.format(test_skill.skill_id, 'test_method')) + + def test_public_api_method(self): + """Verify message from wrapped api method.""" + test_skill = load_test_skill() + api_method = test_skill.registered_methods['test_skill.test_method'] + + # Call method + call_msg = Message('test_skill.test_method', + data={'args': [], 'kwargs': {}}) + api_method(call_msg) + # Check response sent on the bus is the same as the method's return + # value + response = test_skill.bus.emit.call_args[0][0] + self.assertEqual(response.data['result'], test_skill.test_method()) + + def test_public_api_request(self): + """Test public api request handling. + + Ensures that a request for the skill's available public api returns + expected content. + """ + test_skill = load_test_skill() + sent_message = None + + def capture_sent_message(message): + """Capture sent message.""" + nonlocal sent_message + sent_message = message + + test_skill.bus.emit.side_effect = capture_sent_message + get_api_method = test_skill.registered_methods['test_skill.public_api'] + request_api_msg = Message('test_skill.public_api') + + # Ensure that the sent public api contains the correct items + get_api_method(request_api_msg) + public_api = sent_message.data + self.assertTrue('test_method' in public_api) + self.assertTrue('test_method2' in public_api) + self.assertEqual(len(public_api), 2) + + +class TestApiObject(TestCase): + """Tests for the generated SkillApi objects.""" + def test_create_api_object(self): + """Check that expected methods are available.""" + test_skill = load_test_skill() + test_api = create_skill_api_from_skill(test_skill) + + hasattr(test_api, 'test_method') + hasattr(test_api, 'test_method2') + + def test_call_api_method(self): + """Ensure that calling the methods works as expected.""" + test_skill = load_test_skill() + test_api = create_skill_api_from_skill(test_skill) + + expected_response = 'all is good' + sent_message = None + + def capture_sent_message(message, timeout=3): + """Capture sent message and return expected response message.""" + nonlocal sent_message + sent_message = message + return Message('', data={'result': expected_response}) + + test_api.bus.wait_for_response.side_effect = capture_sent_message + + response = test_api.test_method('hello', person='you') + + # Verify response + self.assertEqual(response, expected_response) + # Verify sent message + self.assertEqual(sent_message.msg_type, 'test_skill.test_method') + self.assertEqual(sent_message.data['args'], ('hello',)) + self.assertEqual(sent_message.data['kwargs'], {'person': 'you'}) diff --git a/test/unittests/test_skill_loader.py b/test/unittests/test_skill_loader.py new file mode 100644 index 00000000..001299f3 --- /dev/null +++ b/test/unittests/test_skill_loader.py @@ -0,0 +1,192 @@ +# Copyright 2019 Mycroft AI Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Unit tests for the SkillLoader class.""" +import json +import unittest +from pathlib import Path +from unittest.mock import Mock + +from ovos_utils import classproperty +from ovos_utils.messagebus import FakeBus +from ovos_utils.process_utils import RuntimeRequirements +from ovos_workshop.skill_launcher import SkillLoader + +from ovos_workshop.skills.ovos import OVOSSkill + +ONE_MINUTE = 60 + + +class OfflineSkill(OVOSSkill): + @classproperty + def runtime_requirements(self): + return RuntimeRequirements(internet_before_load=False, + network_before_load=False, + requires_internet=False, + requires_network=False, + no_internet_fallback=True, + no_network_fallback=True) + + +class LANSkill(OVOSSkill): + @classproperty + def runtime_requirements(self): + scans_on_init = True + return RuntimeRequirements(internet_before_load=False, + network_before_load=scans_on_init, + requires_internet=False, + requires_network=True, + no_internet_fallback=True, + no_network_fallback=False) + + +class TestSkillNetwork(unittest.TestCase): + + def test_class_property(self): + self.assertEqual(OfflineSkill.runtime_requirements, + RuntimeRequirements(internet_before_load=False, + network_before_load=False, + requires_internet=False, + requires_network=False, + no_internet_fallback=True, + no_network_fallback=True) + ) + self.assertEqual(LANSkill.runtime_requirements, + RuntimeRequirements(internet_before_load=False, + network_before_load=True, + requires_internet=False, + requires_network=True, + no_internet_fallback=True, + no_network_fallback=False) + ) + self.assertEqual(OVOSSkill.runtime_requirements, + RuntimeRequirements() + ) + + +msgs = [] +bus = FakeBus() +bus.msgs = [] + + +def _handle(msg): + global bus + bus.msgs.append(json.loads(msg)) + + +bus.on("message", _handle) + + +class TestSkillLoader(unittest.TestCase): + skill_directory = Path('/tmp/test_skill') + skill_directory.mkdir(exist_ok=True) + for file_name in ('__init__.py', 'bar.py', '.foobar', 'bar.pyc'): + skill_directory.joinpath(file_name).touch() + + def test_skill_reload(self): + """Test reloading a skill that was modified.""" + bus.msgs = [] + loader = SkillLoader(bus, str(self.skill_directory)) + loader.instance = Mock() + loader.loaded = True + loader.load_attempted = False + loader.last_loaded = 10 + loader.instance.reload_skill = True + loader.instance.name = "MySkill" + loader.skill_id = 'test_skill' + + # Mock to return a known (Mock) skill instance + real_create_skill_instance = loader._create_skill_instance + + def _update_skill_instance(*args, **kwargs): + loader.instance = Mock() + loader.loaded = True + loader.load_attempted = True + loader.last_loaded = 100 + loader.skill_id = 'test_skill' + loader.instance.name = "MySkill" + return True + + loader._create_skill_instance = _update_skill_instance + + loader.reload() + + self.assertTrue(loader.load_attempted) + self.assertTrue(loader.loaded) + + self.assertListEqual( + ['mycroft.skills.shutdown', 'mycroft.skills.loaded'], + [m["type"] for m in bus.msgs] + ) + loader._create_skill_instance = real_create_skill_instance + + def test_skill_load(self): + loader = SkillLoader(bus, str(self.skill_directory)) + bus.msgs = [] + loader.instance = None + loader.loaded = False + loader.last_loaded = 0 + + # Mock to return a known (Mock) skill instance + real_create_skill_instance = loader._create_skill_instance + + def _update_skill_instance(*args, **kwargs): + loader.instance = Mock() + loader.loaded = True + loader.last_loaded = 100 + loader.skill_id = 'test_skill' + loader.instance.name = "MySkill" + return True + + loader._create_skill_instance = _update_skill_instance + + loader.load() + + self.assertTrue(loader.load_attempted) + self.assertTrue(loader.loaded) + + self.assertListEqual( + ['mycroft.skills.loaded'], + [m["type"] for m in bus.msgs] + ) + loader._create_skill_instance = real_create_skill_instance + + def test_skill_load_blacklisted(self): + """Skill should not be loaded if it is blacklisted""" + loader = SkillLoader(bus, str(self.skill_directory)) + loader.instance = Mock() + loader.loaded = False + loader.last_loaded = 0 + loader.skill_id = 'test_skill' + loader.name = "MySkill" + bus.msgs = [] + + config = dict(loader.config) + config['skills']['blacklisted_skills'] = ['test_skill'] + loader.config = config + self.assertEqual(loader.config['skills']['blacklisted_skills'], + ['test_skill']) + loader.skill_id = 'test_skill' + + loader.load() + + self.assertTrue(loader.load_attempted) + self.assertFalse(loader.loaded) + + self.assertListEqual( + ['mycroft.skills.loading_failure'], + [m["type"] for m in bus.msgs] + ) + + loader.config['skills']['blacklisted_skills'].remove('test_skill')