Skip to content

Commit

Permalink
#138 - kafka.read test finally works
Browse files Browse the repository at this point in the history
  • Loading branch information
wheelly committed Feb 8, 2024
1 parent da42af4 commit 831a7e6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 33 deletions.
42 changes: 22 additions & 20 deletions core/src/datayoga_core/blocks/kafka/read/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
from datayoga_core.context import Context
from typing import AsyncGenerator, List, Optional
from datayoga_core.producer import Producer as DyProducer, Message
from confluent_kafka import Consumer, KafkaException, KafkaError, TopicPartition, OFFSET_BEGINNING
from confluent_kafka import Consumer, KafkaError

from itertools import count
from datayoga_core import utils
import orjson

logger = logging.getLogger("dy")

Expand All @@ -34,35 +35,32 @@ def init(self, context: Optional[Context] = None):
self.topic = connection_details.get("topic", "integration-tests")
self.seek_to_beginning = self.properties.get("seek_to_beginning", False)
self.snapshot = self.properties.get("snapshot", False)

async def produce(self) -> AsyncGenerator[List[Message], None]:
logger.debug(f"Producing {self.get_block_name()}")

consumer = Consumer({
self.consumer = Consumer({
'bootstrap.servers': self.bootstrap_servers,
'group.id': self.group,
'enable.auto.commit': 'false'
})

async def produce(self) -> AsyncGenerator[List[Message], None]:
logger.debug(f"Producing {self.get_block_name()}")

if self.seek_to_beginning:
def on_assign(c, ps):
for p in ps:
p.offset = -2
c.assign(ps)
consumer.subscribe([self.topic], on_assign)
self.consumer.subscribe([self.topic], on_assign)
else:
consumer.subscribe([self.topic])
self.consumer.subscribe([self.topic])

try:
while True:
# Poll for messages
msg = consumer.poll(1.0)
counter = next(count())

msg = self.consumer.poll(3.0 if self.snapshot else None)
if msg is None:
if self.snapshot:
break
continue
assert self.snapshot
logger.warning(f"Snapshot defined quitting on topic {self.topic}"'')
break
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
Expand All @@ -73,12 +71,16 @@ def on_assign(c, ps):

else:
# Process the message
yield [{self.MSG_ID_FIELD: msg.value()}]
# Commit the message offset
consumer.commit(msg)
if counter % self.MIN_COMMIT_COUNT == 0:
consumer.commit(asynchronous=False)
message = orjson.loads(msg.value())
yield [{self.MSG_ID_FIELD: msg.offset(), **message}]

finally:
consumer.close()
self.consumer.close()

def ack(self, msg_ids: List[str]):
    """Acknowledge processed messages by committing the consumer's current offsets.

    Args:
        msg_ids: Ids of the processed messages. Unused here: Kafka commits are
            offset-based, so a single commit covers everything polled so far.

    The commit is synchronous (``asynchronous=False``) so failures surface
    immediately; errors are logged instead of raised, making ack best-effort.
    """
    try:
        self.consumer.commit(asynchronous=False)
    except Exception as e:
        # Best-effort: a failed commit only means messages may be redelivered.
        logger.error(f"Cannot commit: {e}")


30 changes: 17 additions & 13 deletions integration-tests/test_kafka_to_stdout.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
import logging
import os

from common import kafka_utils
from common.utils import run_job

logger = logging.getLogger("dy")
message_one = b'{"id":1,"name":"Boris"}'
message_two = b'{"id":2,"name":"Ivan"}'

def test_kafka_to_stdout(tmpdir):
    """End-to-end check that messages produced to Kafka reach stdout in order.

    Spins up a Kafka testcontainer, produces two JSON messages to the
    ``integration-tests`` topic, runs the ``tests.kafka_to_stdout`` job with
    its output captured to a temp file, and asserts both messages come back
    byte-for-byte in publish order.

    Args:
        tmpdir: pytest fixture providing a per-test temporary directory.
    """
    kafka_container = kafka_utils.get_kafka_container()
    # Pre-declare so the finally-block can tell whether the job ever ran;
    # an unconditional os.remove(output_file) would raise NameError when the
    # container or producer fails first, masking the real error.
    output_file = None
    try:
        with kafka_container as kafka:
            bootstrap_servers = kafka.get_bootstrap_server()
            producer = kafka_utils.get_kafka_producer(bootstrap_servers)
            producer.produce("integration-tests", message_one)
            producer.produce("integration-tests", message_two)
            producer.flush()
            output_file = tmpdir.join("tests_kafka_to_stdout.txt")
            run_job("tests.kafka_to_stdout", None, output_file)
            result = output_file.readlines()
            assert result[0].strip().encode() == message_one
            assert result[1].strip().encode() == message_two
    finally:
        # Clean up only what was actually created; tmpdir is auto-collected
        # by pytest, but removing the file keeps reruns deterministic.
        if output_file is not None and os.path.exists(output_file):
            os.remove(output_file)



0 comments on commit 831a7e6

Please sign in to comment.