
Commit

#138 - kafka.read - autopep8
wheelly committed Feb 18, 2024
1 parent 9a74193 commit 26d7397
Showing 3 changed files with 16 additions and 8 deletions.
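The changes below are the output of an automated autopep8 pass over the kafka.read block and its tests. For reference, here is a minimal sketch of how such a reformat can be reproduced through autopep8's Python API; the commit does not record the exact invocation, so the max line length and target path below are assumptions.

    # Sketch only: reproducing an autopep8-style reformat via its Python API.
    # The max_line_length value and target path are assumptions; the commit
    # does not record the exact options that were used.
    import autopep8

    path = "core/src/datayoga_core/blocks/kafka/read/block.py"
    with open(path) as f:
        source = f.read()

    # fix_code returns the reformatted source as a string
    formatted = autopep8.fix_code(source, options={"max_line_length": 79})

    with open(path, "w") as f:
        f.write(formatted)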
11 changes: 7 additions & 4 deletions core/src/datayoga_core/blocks/kafka/read/block.py
@@ -26,12 +26,14 @@ class Block(DyProducer, metaclass=ABCMeta):
 
     def init(self, context: Optional[Context] = None):
         logger.debug(f"Initializing {self.get_block_name()}")
-        connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context)
+        connection_details = utils.get_connection_details(
+            self.properties["bootstrap_servers"], context)
         logger.debug(f"Connection details: {json.dumps(connection_details)}")
         self.bootstrap_servers = connection_details.get("bootstrap_servers")
         self.group = self.properties.get("group")
         self.topic = self.properties["topic"]
-        self.seek_to_beginning = self.properties.get("seek_to_beginning", False)
+        self.seek_to_beginning = self.properties.get(
+            "seek_to_beginning", False)
         self.snapshot = self.properties.get("snapshot", False)
 
     async def produce(self) -> AsyncGenerator[List[Message], None]:
@@ -62,7 +64,9 @@ def on_assign(c, ps):
                 counter = next(count())
                 if msg is None:
                     assert self.snapshot
-                    logger.warning(f"Snapshot defined quitting on topic {self.topic}"'')
+                    logger.warning(
+                        f"Snapshot defined quitting on topic {self.topic}"
+                        '')
                     break
                 if msg.error():
                     if msg.error().code() == KafkaError._PARTITION_EOF:
@@ -82,7 +86,6 @@ def on_assign(c, ps):
                 if counter % self.MIN_COMMIT_COUNT == 0:
                     consumer.commit(asynchronous=False)
 
-
         finally:
             try:
                 consumer.commit(asynchronous=False)
7 changes: 5 additions & 2 deletions integration-tests/common/kafka_utils.py
@@ -3,8 +3,11 @@
 
 
 def get_kafka_container() -> KafkaContainer:
-    return (KafkaContainer(image="confluentinc/cp-kafka:latest")
-            .with_bind_ports(KafkaContainer.KAFKA_PORT, KafkaContainer.KAFKA_PORT))
+    return (
+        KafkaContainer(
+            image="confluentinc/cp-kafka:latest") .with_bind_ports(
+            KafkaContainer.KAFKA_PORT,
+            KafkaContainer.KAFKA_PORT))
 
 
 def get_kafka_producer(bootstrap_servers: str) -> KafkaProducer:
6 changes: 4 additions & 2 deletions integration-tests/test_kafka_to_redis.py
@@ -13,7 +13,8 @@ def test_kafka_to_redis():
     kafka_container = kafka_utils.get_kafka_container()
     try:
         with kafka_container as kafka:
-            redis_container = redis_utils.get_redis_oss_container(redis_utils.REDIS_PORT)
+            redis_container = redis_utils.get_redis_oss_container(
+                redis_utils.REDIS_PORT)
             redis_container.start()
 
             bootstrap_servers = kafka.get_bootstrap_server()
@@ -23,7 +24,8 @@ def test_kafka_to_redis():
             producer.flush()
             run_job("tests.kafka_to_redis")
 
-            redis_client = redis_utils.get_redis_client("localhost", redis_utils.REDIS_PORT)
+            redis_client = redis_utils.get_redis_client(
+                "localhost", redis_utils.REDIS_PORT)
 
             assert len(redis_client.keys()) == 2
 
