Skip to content

Commit

Permalink
Add missing mofa switch statement
Browse files Browse the repository at this point in the history
  • Loading branch information
ValHayot committed Dec 11, 2024
1 parent b3e9be0 commit eeac24e
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions mofa/proxyqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,23 @@ def connect_request_producer(self):

def connect_request_consumer(self):
if not isinstance(self.request_consumer, StreamConsumer):
consumer = Consumer(
confluent_consumer_conf(self.group_id, self.auto_offset_reset)
)
request_topic = f"{self.prefix}_requests"
# consumer.subscribe([request_topic])
topic_partition = TopicPartition(request_topic, partition=0)
consumer.assign([topic_partition])
subscriber = KafkaSubscriber(client=consumer)

if ENGINE == "octopus":
consumer = Consumer(
confluent_consumer_conf(self.group_id, self.auto_offset_reset)
)
request_topic = f"{self.prefix}_requests"
# consumer.subscribe([request_topic])
topic_partition = TopicPartition(request_topic, partition=0)
consumer.assign([topic_partition])
subscriber = KafkaSubscriber(client=consumer)
else:
subscriber = MofkaSubscriber(
protocol=MOFKA_PROTOCOL,
group_file=MOFKA_GROUPFILE,
topic_name=request_topic,
subscriber_name=str(f"MOFA-request-{uuid4()}"),
)
self.request_consumer = StreamConsumer(
subscriber=subscriber,
)
Expand All @@ -160,7 +169,7 @@ def connect_result_consumer(self, topic):
protocol=MOFKA_PROTOCOL,
group_file=MOFKA_GROUPFILE,
topic_name=topic,
subscriber_name=str(f"MOFA-{uuid4()}"),
subscriber_name=str(f"MOFA-result-{uuid4()}"),
)

oconsumer = StreamConsumer(
Expand Down

0 comments on commit eeac24e

Please sign in to comment.