From 0e2ee03e0f64d6f55824e7f05c70cada55f5b283 Mon Sep 17 00:00:00 2001 From: wheelly Date: Tue, 13 Feb 2024 14:29:17 +0200 Subject: [PATCH] #138 - kafka.read watermark commit --- .../datayoga_core/blocks/kafka/read/block.py | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/core/src/datayoga_core/blocks/kafka/read/block.py b/core/src/datayoga_core/blocks/kafka/read/block.py index 01504367..b4e4b1b3 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.py +++ b/core/src/datayoga_core/blocks/kafka/read/block.py @@ -27,21 +27,20 @@ class Block(DyProducer, metaclass=ABCMeta): def init(self, context: Optional[Context] = None): logger.debug(f"Initializing {self.get_block_name()}") connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context) - - self.port = int(connection_details.get("port", 9092)) logger.debug(f"Connection details: {json.dumps(connection_details)}") self.bootstrap_servers = connection_details.get("bootstrap_servers") self.group = self.properties.get("group") self.topic = self.properties["topic"] self.seek_to_beginning = self.properties.get("seek_to_beginning", False) self.snapshot = self.properties.get("snapshot", False) - self.consumer = Consumer({ + + + async def produce(self) -> AsyncGenerator[List[Message], None]: + consumer = Consumer({ 'bootstrap.servers': self.bootstrap_servers, 'group.id': self.group, 'enable.auto.commit': 'false' }) - - async def produce(self) -> AsyncGenerator[List[Message], None]: logger.debug(f"Producing {self.get_block_name()}") if self.seek_to_beginning: @@ -49,14 +48,15 @@ def on_assign(c, ps): for p in ps: p.offset = -2 c.assign(ps) - self.consumer.subscribe([self.topic], on_assign) + consumer.subscribe([self.topic], on_assign) else: - self.consumer.subscribe([self.topic]) + consumer.subscribe([self.topic]) try: while True: # Poll for messages - msg = self.consumer.poll(3.0 if self.snapshot else None) + msg = consumer.poll(3.0 if self.snapshot else None) + counter = next(count()) if msg is None: assert self.snapshot logger.warning(f"Snapshot defined quitting on topic {self.topic}"'') @@ -73,14 +73,12 @@ def on_assign(c, ps): # Process the message message = orjson.loads(msg.value()) yield [{self.MSG_ID_FIELD: msg.offset(), **message}] + if counter % self.MIN_COMMIT_COUNT == 0: + consumer.commit(asynchronous=False) finally: - self.consumer.close() + consumer.close() + - def ack(self, msg_ids: List[str]): - try: - self.consumer.commit(asynchronous=False) - except Exception as e: - logger.error(f"Cannot commit: {e}")