Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EventClientReceiver & MockEvent Service / Client #146

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 91 additions & 2 deletions py/farm_ng/core/event_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@
import argparse
import asyncio
import logging
from typing import TYPE_CHECKING, AsyncIterator, Protocol
from typing import TYPE_CHECKING, Any, AsyncGenerator, AsyncIterator, Protocol

import grpc
from farm_ng.core import event_service_pb2_grpc
from farm_ng.core.event_pb2 import Event
from farm_ng.core.event_service import add_service_parser, load_service_config
from farm_ng.core.event_service import (
add_service_parser,
load_service_config,
)
from farm_ng.core.event_service_pb2 import (
EventServiceConfig,
EventServiceConfigList,
ListUrisReply,
ListUrisRequest,
RequestReplyReply,
Expand All @@ -30,6 +34,7 @@
get_stamp_by_semantics_and_clock_type,
get_system_clock_now,
)
from farm_ng.core.uri import uri_query_to_dict

if TYPE_CHECKING:
from farm_ng.core.timestamp_pb2 import Timestamp
Expand All @@ -44,6 +49,26 @@
__all__ = ["EventClient"]


def make_event_and_payload(
path: str,
message: Message,
service_name: str,
sequence: int,
timestamps: list[Timestamp] | None = None,
):
uri = make_proto_uri(path=path, message=message, service_name=service_name)
payload = message.SerializeToString()
return (
Event(
uri=uri,
timestamps=timestamps or [],
payload_length=len(payload),
sequence=sequence,
),
payload,
)


class EventClientProtocol(Protocol):
"""Protocol for the gRPC streaming object.

Expand All @@ -63,6 +88,70 @@ def _check_valid_response(response: SubscribeReply | None) -> None:
raise grpc.RpcError(msg)


class EventClientReceiver:
def __init__(
self,
config_list: EventServiceConfigList,
service_config: EventServiceConfig,
):
self.config_list = config_list
self.service_config = service_config
# prepares the message subscribers
self.client_configs: dict[str, EventServiceConfig] = {}
for config in config_list.configs:
if config.port != 0:
self.client_configs[config.name] = config

async def receive(
self,
decode: bool = True,
) -> AsyncGenerator[tuple[Event, Any], None]:
clients: dict[str, EventClient] = {}
message_queue: asyncio.Queue = asyncio.Queue()
tasks = []

# subscribe to the topics
subscription: SubscribeRequest
for subscription in self.service_config.subscriptions:
query: dict[str, str] = uri_query_to_dict(uri=subscription.uri)
query_service_name: str = query["service_name"]
if query_service_name not in self.client_configs:
msg = f"Unknown service_name in subscription: {subscription}\n{self.client_configs}"
raise ValueError(
msg,
)

if query_service_name not in clients:
clients[query_service_name] = EventClient(
self.client_configs[query_service_name],
)

client: EventClient = clients[query_service_name]

async def subscribe(client=client, subscription=subscription):
async for event, message in client.subscribe(
subscription,
decode=decode,
):
await message_queue.put((event, message))
await message_queue.put((None, None)) # sentinel value

tasks.append(asyncio.create_task(subscribe()))

done_count = 0
n_subscriptions = len(tasks)
event: Event | None
message: Any | None
while done_count < n_subscriptions:
event, message = await message_queue.get()
if event is not None and message is not None:
yield event, message
else:
done_count += 1

await asyncio.gather(*tasks)


class EventClient:
"""Generic client class to connect with the Amiga brain services.

Expand Down
137 changes: 137 additions & 0 deletions py/farm_ng/core/event_mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from __future__ import annotations

import asyncio
import time
from collections import defaultdict
from typing import TYPE_CHECKING, Callable

from farm_ng.core import stamp as stamp_module
from farm_ng.core.event_client import make_event_and_payload
from farm_ng.core.event_service_metrics import Singleton
from farm_ng.core.events_file_reader import payload_to_protobuf
from farm_ng.core.stamp import StampSemantics
from farm_ng.core.timestamp_pb2 import Timestamp

if TYPE_CHECKING:
from farm_ng.core.event_pb2 import Event
from farm_ng.core.event_service_pb2 import RequestReplyReply
from google.protobuf.message import Message


class MockEventClient:
def __init__(self, req_rep_handler):
self._req_rep_handler = req_rep_handler

async def request_reply(
self,
path: str,
message: Message,
timestamps: list[Timestamp] | None = None,
) -> RequestReplyReply:
return await self._req_rep_handler(path, message, timestamps)


class MockEventClientReceiver:
def __init__(self):
self._message_queue: asyncio.Queue | None = None

async def send(self, event: Event, payload: bytes):
if self._message_queue is not None:
await self._message_queue.put((event, payload))

async def receive(self, decode=True):
self._message_queue = asyncio.Queue()
while True:
event, payload = await self._message_queue.get()
if decode:
yield event, payload_to_protobuf(event, payload=payload)
else:
yield event, payload


class MockEventService:
def __init__(self, service_name: str, receivers: list[MockEventClientReceiver]):
self._sequence_tracker: dict[str, int] = defaultdict(lambda: 0)
self._service_name: str = service_name
self._receivers = receivers
self._request_reply_handler: Callable | None = None

@property
def request_reply_handler(self) -> Callable | None:
"""Returns the request/reply handler."""
return self._request_reply_handler

@request_reply_handler.setter
def request_reply_handler(self, handler: Callable) -> None:
"""Sets the request/reply handler."""
self._request_reply_handler = handler

async def publish(self, path, message: Message):
timestamps = []
timestamps.append(
SimTime().get_monotonic_now(semantics=StampSemantics.SERVICE_SEND),
)
timestamps.append(
SimTime().get_system_clock_now(semantics=StampSemantics.SERVICE_SEND),
)
seq = self._sequence_tracker[path]
self._sequence_tracker[path] = seq + 1
event, payload = make_event_and_payload(
path,
message,
self._service_name,
seq,
timestamps,
)
for receiver in self._receivers:
await receiver.send(event, payload)


class SimTime(metaclass=Singleton):
def __init__(self):
self._monotonic = 0
self._system = time.time()
self._increment_event = asyncio.Event()
self._timeslice_lock = asyncio.Lock()

@property
def lock(self):
return self._timeslice_lock

async def tick(self, dt):
async with self._timeslice_lock:
self._monotonic += dt
self._system += dt
self._increment_event.set()
self._increment_event.clear()
await asyncio.sleep(0)

def monotonic(self):
return self._monotonic

def time(self):
return self._system

def localtime(self):
return time.localtime(self._system)

async def sleep_until(self, deadline) -> asyncio.Lock:
while self._monotonic < deadline:
await self._increment_event.wait()

return self._timeslice_lock

def get_monotonic_now(self, semantics: str) -> Timestamp:
return Timestamp(
stamp=self.monotonic(),
clock_name=stamp_module.get_host_name() + "/monotonic",
semantics=semantics,
)

def get_system_clock_now(self, semantics: str) -> Timestamp:
return Timestamp(
stamp=self.time(),
clock_name=stamp_module.get_host_name()
+ f"/system_clock/{self.localtime().tm_zone}",
semantics=semantics,
)