Skip to content

Commit

Permalink
#138 - kafka.read watermark commit
Browse files (browse the repository at this point in the history)
  • Loading branch information
wheelly committed Feb 13, 2024
1 parent 69f1235 commit 0e2ee03
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions core/src/datayoga_core/blocks/kafka/read/block.py
Original file line number | Diff line number | Diff line change
@@ -27,36 +27,36 @@ class Block(DyProducer, metaclass=ABCMeta):
def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")
connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context)

self.port = int(connection_details.get("port", 9092))
logger.debug(f"Connection details: {json.dumps(connection_details)}")
self.bootstrap_servers = connection_details.get("bootstrap_servers")
self.group = self.properties.get("group")
self.topic = self.properties["topic"]
self.seek_to_beginning = self.properties.get("seek_to_beginning", False)
self.snapshot = self.properties.get("snapshot", False)
self.consumer = Consumer({


async def produce(self) -> AsyncGenerator[List[Message], None]:
consumer = Consumer({
'bootstrap.servers': self.bootstrap_servers,
'group.id': self.group,
'enable.auto.commit': 'false'
})

async def produce(self) -> AsyncGenerator[List[Message], None]:
logger.debug(f"Producing {self.get_block_name()}")

if self.seek_to_beginning:
def on_assign(c, ps):
for p in ps:
p.offset = -2
c.assign(ps)
self.consumer.subscribe([self.topic], on_assign)
consumer.subscribe([self.topic], on_assign)
else:
self.consumer.subscribe([self.topic])
consumer.subscribe([self.topic])

try:
while True:
# Poll for messages
msg = self.consumer.poll(3.0 if self.snapshot else None)
msg = consumer.poll(3.0 if self.snapshot else None)
counter = next(count())
if msg is None:
assert self.snapshot
logger.warning(f"Snapshot defined quitting on topic {self.topic}"'')
@@ -73,14 +73,12 @@ def on_assign(c, ps):
# Process the message
message = orjson.loads(msg.value())
yield [{self.MSG_ID_FIELD: msg.offset(), **message}]
if counter % self.MIN_COMMIT_COUNT == 0:
consumer.commit(asynchronous=False)

finally:
self.consumer.close()
consumer.close()


def ack(self, msg_ids: List[str]):
try:
self.consumer.commit(asynchronous=False)
except Exception as e:
logger.error(f"Cannot commit: {e}")


0 comments on commit 0e2ee03

Please sign in to comment.