Skip to content

Commit

Permalink
#138 - kafka.read - merge fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
wheelly committed Feb 18, 2024
1 parent ddb1ecb commit e5425ed
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 16 deletions.
5 changes: 2 additions & 3 deletions core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,9 @@ kafka = ["kafka-python", "confluent-kafka"]
test = [
"aiohttp",
"cassandra-driver",
"confluent-kafka",
"fastparquet",
"ibm_db_sa",
"kafka-python",
"kafka",
"mock",
"oracledb",
"psycopg2-binary",
Expand All @@ -87,7 +86,7 @@ test = [
"redis",
"requests-mock",
"SQLAlchemy",
"testcontainers",
"testcontainers"
]

[tool.poetry.urls]
Expand Down
8 changes: 3 additions & 5 deletions core/src/datayoga_core/blocks/kafka/read/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import AsyncGenerator, List, Optional

import orjson
from confluent_kafka import Consumer, KafkaError, TopicPartition
from confluent_kafka import Consumer, KafkaError
from datayoga_core import utils
from datayoga_core.context import Context
from datayoga_core.producer import Message
Expand All @@ -26,14 +26,12 @@ class Block(DyProducer, metaclass=ABCMeta):

def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")
connection_details = utils.get_connection_details(
self.properties["bootstrap_servers"], context)
connection_details = utils.get_connection_details(self.properties["bootstrap_servers"], context)
logger.debug(f"Connection details: {json.dumps(connection_details)}")
self.bootstrap_servers = connection_details.get("bootstrap_servers")
self.group = self.properties.get("group")
self.topic = self.properties["topic"]
self.seek_to_beginning = self.properties.get(
"seek_to_beginning", False)
self.seek_to_beginning = self.properties.get("seek_to_beginning", False)
self.snapshot = self.properties.get("snapshot", False)

async def produce(self) -> AsyncGenerator[List[Message], None]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"const": "kafka"
},
"bootstrap_servers": {
"description": "Kafka Hosts",
"description": "Kafka Hosts List comma separated",
"type": "string"
}
},
Expand All @@ -19,7 +19,7 @@
{
"kafka": {
"type": "kafka",
"bootstrap_servers": ["localhost:9092"]
"bootstrap_servers": ["localhost:9092,localhost:9093"]
}
}
]
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/resources/jobs/tests/kafka_to_redis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ input:
uses: kafka.read
with:
bootstrap_servers: kafka
topic: "integration-tests"
group: "integration-tests"
topic: integration-tests
group: integration-tests
snapshot: true
steps:
- uses: redis.write
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/resources/jobs/tests/kafka_to_stdout.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ input:
uses: kafka.read
with:
bootstrap_servers: kafka
topic: "integration-tests"
group: "integration-tests"
topic: integration-tests
group: integration-tests
snapshot: true
steps:
- uses: std.write
3 changes: 1 addition & 2 deletions integration-tests/test_kafka_to_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ def test_kafka_to_redis():
producer.flush()
run_job("tests.kafka_to_redis")

redis_client = redis_utils.get_redis_client(
"localhost", redis_utils.REDIS_PORT)
redis_client = redis_utils.get_redis_client("localhost", redis_utils.REDIS_PORT)

assert len(redis_client.keys()) == 2

Expand Down

0 comments on commit e5425ed

Please sign in to comment.