Skip to content

Commit

Permalink
Removed MOFKA_PROTOCOL from mofka pub/sub initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
ValHayot committed Dec 11, 2024
1 parent 32ee2aa commit e9180a4
Showing 1 changed file with 2 additions and 6 deletions.
8 changes: 2 additions & 6 deletions mofa/proxyqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from proxystore.ex.stream.shims.mofka import MofkaPublisher

assert (MOFKA_GROUPFILE := os.environ["MOFKA_GROUPFILE"])
assert (MOFKA_PROTOCOL := os.environ["MOFKA_PROTOCOL"])
assert os.environ["MOFKA_PROTOCOL"]


def oauth_cb(oauth_config):
Expand Down Expand Up @@ -124,9 +124,7 @@ def connect_request_producer(self):
stores={k: self.store for k in self.proxy_topics},
)
else:
publisher = MofkaPublisher(
protocol=MOFKA_PROTOCOL, group_file=MOFKA_GROUPFILE
)
publisher = MofkaPublisher(group_file=MOFKA_GROUPFILE)
self.request_producer = StreamProducer(publisher=publisher)

def connect_request_consumer(self):
Expand All @@ -143,7 +141,6 @@ def connect_request_consumer(self):
subscriber = KafkaSubscriber(client=consumer)
else:
subscriber = MofkaSubscriber(
protocol=MOFKA_PROTOCOL,
group_file=MOFKA_GROUPFILE,
topic_name=request_topic,
subscriber_name=str(f"MOFA-request-{uuid4()}"),
Expand All @@ -166,7 +163,6 @@ def connect_result_consumer(self, topic):
subscriber = KafkaSubscriber(client=consumer)
else:
subscriber = MofkaSubscriber(
protocol=MOFKA_PROTOCOL,
group_file=MOFKA_GROUPFILE,
topic_name=topic,
subscriber_name=str(f"MOFA-result-{uuid4()}"),
Expand Down

0 comments on commit e9180a4

Please sign in to comment.