Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor/bus_utils #68

Merged
merged 2 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ovos_bus_client/apis/gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from os.path import join, splitext, isfile
from typing import List, Union, Optional, Callable

from ovos_utils import resolve_ovos_resource_file, resolve_resource_file
from ovos_utils.file_utils import resolve_ovos_resource_file, resolve_resource_file
from ovos_utils.log import LOG, log_deprecation
from ovos_utils.messagebus import get_mycroft_bus
from ovos_bus_client.util import get_mycroft_bus
from ovos_utils.gui import can_use_gui

from ovos_bus_client.message import Message
Expand Down
2 changes: 1 addition & 1 deletion ovos_bus_client/apis/ocp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from os.path import abspath

from ovos_utils.messagebus import get_mycroft_bus
from ovos_bus_client.util import get_mycroft_bus
from ovos_bus_client.message import Message, dig_for_message


Expand Down
3 changes: 2 additions & 1 deletion ovos_bus_client/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from ovos_bus_client.conf import load_message_bus_config, MessageBusClientConf, load_gui_message_bus_config
from ovos_bus_client.message import Message, CollectionMessage, GUIMessage
from ovos_bus_client.session import SessionManager, Session
from ovos_bus_client.util import create_echo_function

try:
from mycroft_bus_client import MessageBusClient as _MessageBusClientBase
Expand Down Expand Up @@ -450,6 +449,8 @@ def echo():
"""
Echo function repeating all input from a user.
"""

from ovos_bus_client.util import create_echo_function
# TODO: Deprecate in 0.1.0
message_bus_client = MessageBusClient()

Expand Down
4 changes: 3 additions & 1 deletion ovos_bus_client/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from copy import deepcopy
from typing import Optional
from binascii import hexlify, unhexlify
from ovos_utils.gui import _GUIDict
from ovos_utils.log import LOG, deprecated
from ovos_utils.security import encrypt, decrypt
from ovos_config.config import Configuration
Expand Down Expand Up @@ -131,6 +130,9 @@ def as_dict(self) -> dict:

@staticmethod
def _json_dump(value):

from ovos_bus_client.apis.gui import _GUIDict

def serialize_item(x):
try:
if hasattr(x, "serialize"):
Expand Down
162 changes: 158 additions & 4 deletions ovos_bus_client/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@
"""
Tools and constructs that are useful together with the messagebus.
"""
import json

from ovos_config.config import read_mycroft_config
from ovos_config.locale import get_default_lang
from ovos_utils.json_helper import merge_dict
from ovos_bus_client import MessageBusClient
from ovos_bus_client.message import dig_for_message, Message
from ovos_bus_client.session import SessionManager
from ovos_bus_client.util.scheduler import EventScheduler
from ovos_bus_client.util.utils import create_echo_function
from ovos_bus_client.message import dig_for_message
from ovos_bus_client.session import SessionManager

_DEFAULT_WS_CONFIG = {"host": "0.0.0.0",
"port": 8181,
"route": "/core",
"ssl": False}


def get_message_lang(message=None):
Expand All @@ -31,7 +42,150 @@ def get_message_lang(message=None):
# new style session lang
if not lang and "session_id" in message.context or "session" in message.context:
sess = SessionManager.get(message)
lang = sess.lang
return sess.lang

return get_default_lang()


def get_websocket(host, port, route='/', ssl=False, threaded=True):
"""
Returns a connection to a websocket
"""

client = MessageBusClient(host, port, route, ssl)
if threaded:
client.run_in_thread()
return client


def get_mycroft_bus(host: str = None, port: int = None, route: str = None,
ssl: bool = None):
"""
Returns a connection to the mycroft messagebus
"""
config = read_mycroft_config().get('websocket') or dict()
host = host or config.get('host') or _DEFAULT_WS_CONFIG['host']
port = port or config.get('port') or _DEFAULT_WS_CONFIG['port']
route = route or config.get('route') or _DEFAULT_WS_CONFIG['route']
if ssl is None:
ssl = config.get('ssl') if 'ssl' in config else \
_DEFAULT_WS_CONFIG['ssl']
return get_websocket(host, port, route, ssl)


def listen_for_message(msg_type, handler, bus=None):
"""
Continuously listens and reacts to a specific messagetype on the mycroft messagebus

NOTE: when finished you should call bus.remove(msg_type, handler)
"""
bus = bus or get_mycroft_bus()
bus.on(msg_type, handler)
return bus


def listen_once_for_message(msg_type, handler, bus=None):
"""
listens and reacts once to a specific messagetype on the mycroft messagebus
"""
auto_close = bus is None
bus = bus or get_mycroft_bus()

def _handler(message):
handler(message)
if auto_close:
bus.close()

bus.once(msg_type, _handler)
return bus


def wait_for_reply(message, reply_type=None, timeout=3.0, bus=None):
"""Send a message and wait for a response.

Args:
message (FakeMessage or str or dict): message object or type to send
reply_type (str): the message type of the expected reply.
Defaults to "<message.type>.response".
timeout: seconds to wait before timeout, defaults to 3
Returns:
The received message or None if the response timed out
"""
auto_close = bus is None
bus = bus or get_mycroft_bus()
if isinstance(message, str):
try:
message = json.loads(message)
except:
pass
if isinstance(message, str):
message = Message(message)
elif isinstance(message, dict):
message = Message(message["type"],
message.get("data"),
message.get("context"))
elif not isinstance(message, Message):
raise ValueError
response = bus.wait_for_response(message, reply_type, timeout)
if auto_close:
bus.close()
return response


def send_message(message, data=None, context=None, bus=None):
auto_close = bus is None
bus = bus or get_mycroft_bus()
if isinstance(message, str):
if isinstance(data, dict) or isinstance(context, dict):
message = Message(message, data, context)
else:
try:
message = json.loads(message)
except:
message = Message(message)
if isinstance(message, dict):
message = Message(message["type"],
message.get("data"),
message.get("context"))
if not isinstance(message, Message):
raise ValueError
bus.emit(message)
if auto_close:
bus.close()


def send_binary_data_message(binary_data, msg_type="mycroft.binary.data",
msg_data=None, msg_context=None, bus=None):
msg_data = msg_data or {}
msg = {
"type": msg_type,
"data": merge_dict(msg_data, {"binary": binary_data.hex()}),
"context": msg_context or None
}
send_message(msg, bus=bus)


def send_binary_file_message(filepath, msg_type="mycroft.binary.file",
msg_context=None, bus=None):
with open(filepath, 'rb') as f:
binary_data = f.read()
msg_data = {"path": filepath}
send_binary_data_message(binary_data, msg_type=msg_type, msg_data=msg_data,
msg_context=msg_context, bus=bus)

return lang

def decode_binary_message(message):
if isinstance(message, str):
try: # json string
message = json.loads(message)
binary_data = message.get("binary") or message["data"]["binary"]
except: # hex string
binary_data = message
elif isinstance(message, dict):
# data field or serialized message
binary_data = message.get("binary") or message["data"]["binary"]
else:
# message object
binary_data = message.data["binary"]
# decode hex string
return bytearray.fromhex(binary_data)
2 changes: 0 additions & 2 deletions ovos_bus_client/util/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
from ovos_config.config import Configuration
from ovos_config.locations import get_xdg_data_save_path, get_xdg_config_save_path
from ovos_utils.log import LOG, log_deprecation
from ovos_utils.messagebus import FakeBus
from ovos_utils.events import create_basic_wrapper
from ovos_utils.events import EventContainer as _EventContainer
from ovos_utils.events import EventSchedulerInterface as _SchedulerInterface
from ovos_bus_client.message import Message
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ovos-config >= 0.0.8, < 0.1.0
ovos-utils >= 0.0.36, < 0.1.0
ovos-config >= 0.0.12a6, < 0.1.0
ovos-utils >= 0.0.37a3, < 0.1.0
websocket-client>=0.54.0
pyee>=8.1.0, < 9.0.0
2 changes: 1 addition & 1 deletion test/unittests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_build_url(self):
self.assertEqual(ssl_url, "wss://sslhost:443/core")

def test_create_client(self):
self.assertEqual(self.client.client.url, "ws://0.0.0.0:8181/core")
self.assertEqual(self.client.client.url, "ws://127.0.0.1:8181/core")
self.assertIsInstance(self.client.emitter, ExecutorEventEmitter)

mock_emitter = Mock()
Expand Down
Loading