From 942ba375b87643ac8eb5e9d75825a5a69d37f48a Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 6 Sep 2023 09:45:00 -0400 Subject: [PATCH 1/3] Move @ethanrublee EventClientReceiver to farm-ng-core --- py/farm_ng/core/event_client.py | 73 ++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 2 deletions(-) diff --git a/py/farm_ng/core/event_client.py b/py/farm_ng/core/event_client.py index f674c592..e539c49b 100644 --- a/py/farm_ng/core/event_client.py +++ b/py/farm_ng/core/event_client.py @@ -7,14 +7,18 @@ import argparse import asyncio import logging -from typing import TYPE_CHECKING, AsyncIterator, Protocol +from typing import TYPE_CHECKING, Any, AsyncGenerator, AsyncIterator, Protocol import grpc from farm_ng.core import event_service_pb2_grpc from farm_ng.core.event_pb2 import Event -from farm_ng.core.event_service import add_service_parser, load_service_config +from farm_ng.core.event_service import ( + add_service_parser, + load_service_config, +) from farm_ng.core.event_service_pb2 import ( EventServiceConfig, + EventServiceConfigList, ListUrisReply, ListUrisRequest, RequestReplyReply, @@ -30,6 +34,7 @@ get_stamp_by_semantics_and_clock_type, get_system_clock_now, ) +from farm_ng.core.uri import uri_query_to_dict if TYPE_CHECKING: from farm_ng.core.timestamp_pb2 import Timestamp @@ -63,6 +68,70 @@ def _check_valid_response(response: SubscribeReply | None) -> None: raise grpc.RpcError(msg) +class EventClientReceiver: + def __init__( + self, + config_list: EventServiceConfigList, + service_config: EventServiceConfig, + ): + self.config_list = config_list + self.service_config = service_config + # prepares the message subscribers + self.client_configs: dict[str, EventServiceConfig] = {} + for config in config_list.configs: + if config.port != 0: + self.client_configs[config.name] = config + + async def receive( + self, + decode: bool = True, + ) -> AsyncGenerator[tuple[Event, Any], None]: + clients: dict[str, EventClient] = {} + message_queue: asyncio.Queue = asyncio.Queue() + tasks = [] + + # subscribe to the topics + subscription: SubscribeRequest + for subscription in self.service_config.subscriptions: + query: dict[str, str] = uri_query_to_dict(uri=subscription.uri) + query_service_name: str = query["service_name"] + if query_service_name not in self.client_configs: + msg = f"Unknown service_name in subscription: {subscription}\n{self.client_configs}" + raise ValueError( + msg, + ) + + if query_service_name not in clients: + clients[query_service_name] = EventClient( + self.client_configs[query_service_name], + ) + + client: EventClient = clients[query_service_name] + + async def subscribe(client=client, subscription=subscription): + async for event, message in client.subscribe( + subscription, + decode=decode, + ): + await message_queue.put((event, message)) + await message_queue.put((None, None)) # sentinel value + + tasks.append(asyncio.create_task(subscribe())) + + done_count = 0 + n_subscriptions = len(tasks) + event: Event | None + message: Any | None + while done_count < n_subscriptions: + event, message = await message_queue.get() + if event is not None and message is not None: + yield event, message + else: + done_count += 1 + + await asyncio.gather(*tasks) + + class EventClient: """Generic client class to connect with the Amiga brain services. From e048610b6374b962de3b0336fa2ae2c3f0cab056 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 6 Sep 2023 09:54:40 -0400 Subject: [PATCH 2/3] Move @ethanrublee make_event_and_payload to farm-ng-core --- py/farm_ng/core/event_client.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/py/farm_ng/core/event_client.py b/py/farm_ng/core/event_client.py index e539c49b..0df06bec 100644 --- a/py/farm_ng/core/event_client.py +++ b/py/farm_ng/core/event_client.py @@ -49,6 +49,26 @@ __all__ = ["EventClient"] +def make_event_and_payload( + path: str, + message: Message, + service_name: str, + sequence: int, + timestamps: list[Timestamp] | None = None, +): + uri = make_proto_uri(path=path, message=message, service_name=service_name) + payload = message.SerializeToString() + return ( + Event( + uri=uri, + timestamps=timestamps or [], + payload_length=len(payload), + sequence=sequence, + ), + payload, + ) + + class EventClientProtocol(Protocol): """Protocol for the gRPC streaming object. From 20a4a2c2979d4b1a3700747e2fd74e488a86e732 Mon Sep 17 00:00:00 2001 From: Kyle Coble Date: Wed, 6 Sep 2023 09:58:22 -0400 Subject: [PATCH 3/3] Move @ethanrublee MockEvent to farm-ng-core --- py/farm_ng/core/event_mock.py | 137 ++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 py/farm_ng/core/event_mock.py diff --git a/py/farm_ng/core/event_mock.py b/py/farm_ng/core/event_mock.py new file mode 100644 index 00000000..938da4ee --- /dev/null +++ b/py/farm_ng/core/event_mock.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +import asyncio +import time +from collections import defaultdict +from typing import TYPE_CHECKING, Callable + +from farm_ng.core import stamp as stamp_module +from farm_ng.core.event_client import make_event_and_payload +from farm_ng.core.event_service_metrics import Singleton +from farm_ng.core.events_file_reader import payload_to_protobuf +from farm_ng.core.stamp import StampSemantics +from farm_ng.core.timestamp_pb2 import Timestamp + +if TYPE_CHECKING: + from farm_ng.core.event_pb2 import Event + from farm_ng.core.event_service_pb2 import RequestReplyReply + from google.protobuf.message import Message + + +class MockEventClient: + def __init__(self, req_rep_handler): + self._req_rep_handler = req_rep_handler + + async def request_reply( + self, + path: str, + message: Message, + timestamps: list[Timestamp] | None = None, + ) -> RequestReplyReply: + return await self._req_rep_handler(path, message, timestamps) + + +class MockEventClientReceiver: + def __init__(self): + self._message_queue: asyncio.Queue | None = None + + async def send(self, event: Event, payload: bytes): + if self._message_queue is not None: + await self._message_queue.put((event, payload)) + + async def receive(self, decode=True): + self._message_queue = asyncio.Queue() + while True: + event, payload = await self._message_queue.get() + if decode: + yield event, payload_to_protobuf(event, payload=payload) + else: + yield event, payload + + +class MockEventService: + def __init__(self, service_name: str, receivers: list[MockEventClientReceiver]): + self._sequence_tracker: dict[str, int] = defaultdict(lambda: 0) + self._service_name: str = service_name + self._receivers = receivers + self._request_reply_handler: Callable | None = None + + @property + def request_reply_handler(self) -> Callable | None: + """Returns the request/reply handler.""" + return self._request_reply_handler + + @request_reply_handler.setter + def request_reply_handler(self, handler: Callable) -> None: + """Sets the request/reply handler.""" + self._request_reply_handler = handler + + async def publish(self, path, message: Message): + timestamps = [] + timestamps.append( + SimTime().get_monotonic_now(semantics=StampSemantics.SERVICE_SEND), + ) + timestamps.append( + SimTime().get_system_clock_now(semantics=StampSemantics.SERVICE_SEND), + ) + seq = self._sequence_tracker[path] + self._sequence_tracker[path] = seq + 1 + event, payload = make_event_and_payload( + path, + message, + self._service_name, + seq, + timestamps, + ) + for receiver in self._receivers: + await receiver.send(event, payload) + + +class SimTime(metaclass=Singleton): + def __init__(self): + self._monotonic = 0 + self._system = time.time() + self._increment_event = asyncio.Event() + self._timeslice_lock = asyncio.Lock() + + @property + def lock(self): + return self._timeslice_lock + + async def tick(self, dt): + async with self._timeslice_lock: + self._monotonic += dt + self._system += dt + self._increment_event.set() + self._increment_event.clear() + await asyncio.sleep(0) + + def monotonic(self): + return self._monotonic + + def time(self): + return self._system + + def localtime(self): + return time.localtime(self._system) + + async def sleep_until(self, deadline) -> asyncio.Lock: + while self._monotonic < deadline: + await self._increment_event.wait() + + return self._timeslice_lock + + def get_monotonic_now(self, semantics: str) -> Timestamp: + return Timestamp( + stamp=self.monotonic(), + clock_name=stamp_module.get_host_name() + "/monotonic", + semantics=semantics, + ) + + def get_system_clock_now(self, semantics: str) -> Timestamp: + return Timestamp( + stamp=self.time(), + clock_name=stamp_module.get_host_name() + + f"/system_clock/{self.localtime().tm_zone}", + semantics=semantics, + )