Skip to content

Commit

Permalink
fix: Changed api in Client to use SignalIdentifier instead of separat…
Browse files Browse the repository at this point in the history
…e namespace and signal names
  • Loading branch information
jrask committed Apr 9, 2024
1 parent 7ef2c23 commit 882b084
Showing 1 changed file with 22 additions and 48 deletions.
70 changes: 22 additions & 48 deletions python/remotivelabs-broker/remotivelabs/broker/sync/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

import grpc

from ..generated.sync import network_api_pb2 as network_api
from ..generated.sync import network_api_pb2_grpc, system_api_pb2_grpc, traffic_api_pb2_grpc
from . import helper as br
from .signalcreator import SignalCreator
from ..generated.sync import network_api_pb2 as network_api
from ..generated.sync import network_api_pb2_grpc, system_api_pb2_grpc, traffic_api_pb2_grpc


class SignalValue:
Expand Down Expand Up @@ -124,6 +124,13 @@ def __init__(self, name: str, namespace: str):
self.name = name
self.namespace = namespace

@staticmethod
def parse(signal_id: str) -> SignalIdentifier:
s = signal_id.split(":")
if len(s) != 2:
raise BrokerException("signal names must have format namespace:signal_name")
return SignalIdentifier(s[1], s[0])


class BrokerException(Exception):
pass
Expand All @@ -133,8 +140,8 @@ class Client:
def __init__(self, client_id: str = "broker_client"):
self._signal_creator: SignalCreator
self._traffic_stub: traffic_api_pb2_grpc.TrafficServiceStub
self._system_stub: system_api_pb2_grpc.SystemServiceStub
self._network_stub: network_api_pb2_grpc.NetworkServiceStub
self._system_stub: system_api_pb2_grpc.SystemServiceStub = None
self._network_stub: network_api_pb2_grpc.NetworkServiceStub = None
self._intercept_channel: grpc.Channel
self.client_id = client_id
self.url: Optional[str] = None
Expand All @@ -159,69 +166,36 @@ def connect(self, url: str, api_key: Union[str, None] = None):
if self.on_connect is not None:
self.on_connect(self)

def _validate_and_get_subscribed_signals(
self, subscribed_namespaces: List[str], subscribed_signals: List[str]
) -> List[SignalIdentifier]:
# Since we cannot know which List[signals] belongs to which namespace we need to fetch
# all signals from the broker and find the proper signal with namespace. Finally, we
# also filter out namespaces that we do not need since we might have duplicated signal names
# over namespaces
# Begin

def verify_namespace(available_signal: SignalIdentifier):
return list(
filter(
lambda namespace: available_signal.namespace == namespace,
subscribed_namespaces,
)
)

def find_subscribed_signal(available_signal: SignalIdentifier):
return list(filter(lambda s: available_signal.name == s, subscribed_signals))

available_signals: List[SignalIdentifier] = list(filter(verify_namespace, self.list_signal_names()))
signals_to_subscribe_to: List[SignalIdentifier] = list(filter(find_subscribed_signal, available_signals))

# Check if subscription is done on signal that is not in any of these namespaces
signals_subscribed_to_but_does_not_exist = set(subscribed_signals) - set(map(lambda s: s.name, signals_to_subscribe_to))
def subscribe(self, signals_to_subscribe_to: Union[List[SignalIdentifier], List[str]],
on_signals: Optional[Callable[[SignalsInFrame], None]] = None,
changed_values_only: bool = True):

if len(signals_subscribed_to_but_does_not_exist) > 0:
raise BrokerException(f"One or more signals you subscribed to does not exist " f", {signals_subscribed_to_but_does_not_exist}")

return list(map(lambda s: SignalIdentifier(s.name, s.namespace), signals_to_subscribe_to))

def subscribe(
self,
signal_names: List[str],
namespaces: List[str],
on_signals: Optional[Callable[[SignalsInFrame], None]] = None,
changed_values_only: bool = True,
):
client_id = br.common_pb2.ClientId(id="subscribe-sample")
if on_signals is None and self.on_signals is None:
raise BrokerException(
"You have not specified global client.on_signals nor client.subscribe(on_signals=callback), "
"or you are invoking subscribe() before client.on_signals which is not allowed"
)

client_id = br.common_pb2.ClientId(id=self.client_id)
signals_to_subscribe_to: List[SignalIdentifier] = self._validate_and_get_subscribed_signals(namespaces, signal_names)

def to_protobuf_signal(s: SignalIdentifier):
def to_protobuf_signal(s: Union[SignalIdentifier, str]):
if isinstance(s, str):
s = SignalIdentifier.parse(s)
return self._signal_creator.signal(s.name, s.namespace)

signals_to_subscribe_on = list(map(to_protobuf_signal, signals_to_subscribe_to))
_signals_to_subscribe_on = list(map(to_protobuf_signal, signals_to_subscribe_to))
wait_for_subscription_queue: queue.Queue = queue.Queue()
Thread(
target=br.act_on_signal,
args=(
client_id,
self._network_stub,
signals_to_subscribe_on,
_signals_to_subscribe_on,
changed_values_only, # True: only report when signal changes
lambda frame: self._on_signals(frame, on_signals),
lambda frame: self._on_signals(frame, self.on_signals),
lambda sub: (wait_for_subscription_queue.put((self.client_id, sub))),
),
).start()
# Wait for subscription
client_id, subscription = wait_for_subscription_queue.get()
return subscription

Expand Down

0 comments on commit 882b084

Please sign in to comment.