diff --git a/mofa/proxyqueue.py b/mofa/proxyqueue.py index aac1f9c6..a9b64fe1 100644 --- a/mofa/proxyqueue.py +++ b/mofa/proxyqueue.py @@ -131,14 +131,23 @@ def connect_request_producer(self): def connect_request_consumer(self): if not isinstance(self.request_consumer, StreamConsumer): - consumer = Consumer( - confluent_consumer_conf(self.group_id, self.auto_offset_reset) - ) - request_topic = f"{self.prefix}_requests" - # consumer.subscribe([request_topic]) - topic_partition = TopicPartition(request_topic, partition=0) - consumer.assign([topic_partition]) - subscriber = KafkaSubscriber(client=consumer) + + if ENGINE == "octopus": + consumer = Consumer( + confluent_consumer_conf(self.group_id, self.auto_offset_reset) + ) + request_topic = f"{self.prefix}_requests" + # consumer.subscribe([request_topic]) + topic_partition = TopicPartition(request_topic, partition=0) + consumer.assign([topic_partition]) + subscriber = KafkaSubscriber(client=consumer) + else: + subscriber = MofkaSubscriber( + protocol=MOFKA_PROTOCOL, + group_file=MOFKA_GROUPFILE, + topic_name=request_topic, + subscriber_name=str(f"MOFA-request-{uuid4()}"), + ) self.request_consumer = StreamConsumer( subscriber=subscriber, ) @@ -160,7 +169,7 @@ def connect_result_consumer(self, topic): protocol=MOFKA_PROTOCOL, group_file=MOFKA_GROUPFILE, topic_name=topic, - subscriber_name=str(f"MOFA-{uuid4()}"), + subscriber_name=str(f"MOFA-result-{uuid4()}"), ) oconsumer = StreamConsumer(