138 kafka.read block #355

Open · wants to merge 24 commits into base: main · showing changes from 22 commits
3,074 changes: 1,761 additions & 1,313 deletions core/poetry.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion core/pyproject.toml
@@ -50,6 +50,8 @@ pymssql = { version = "^2.2.7", optional = true }
PyMySQL = { version = "^1.0.2", optional = true }
redis = { version = "^4.3.5", optional = true }
SQLAlchemy = { version = "^2.0.4", optional = true }
kafka-python = { version = "^2.0.2", optional = true }
confluent-kafka = { version= "^2.3.0", optional = true }

[tool.poetry.extras]
azure = ["azure-eventhub", "azure-eventhub-checkpointstoreblob-aio"]
@@ -62,12 +64,15 @@ parquet = ["fastparquet"]
pg = ["psycopg2-binary", "SQLAlchemy"]
redis = ["redis"]
sqlserver = ["pymssql", "SQLAlchemy"]
kafka = ["kafka-python", "confluent-kafka"]

test = [
"aiohttp",
"cassandra-driver",
"confluent-kafka",
"fastparquet",
"ibm_db_sa",
"kafka-python",
"mock",
"oracledb",
"psycopg2-binary",
@@ -82,7 +87,7 @@ test = [
"redis",
"requests-mock",
"SQLAlchemy",
"testcontainers"
"testcontainers",
]

[tool.poetry.urls]
Empty file.
92 changes: 92 additions & 0 deletions core/src/datayoga_core/blocks/kafka/read/block.py
@@ -0,0 +1,92 @@
import json
import logging
from abc import ABCMeta
from itertools import count
from typing import AsyncGenerator, List, Optional

import orjson
from confluent_kafka import Consumer, KafkaError, TopicPartition
from datayoga_core import utils
from datayoga_core.context import Context
from datayoga_core.producer import Message
from datayoga_core.producer import Producer as DyProducer

logger = logging.getLogger("dy")


class Block(DyProducer, metaclass=ABCMeta):
    bootstrap_servers: str
    group: str
    topic: str
    seek_to_beginning: bool
    snapshot: bool
    INTERNAL_FIELD_PREFIX = "__$$"
    MSG_ID_FIELD = f"{INTERNAL_FIELD_PREFIX}msg_id"
    MIN_COMMIT_COUNT = 10

    def init(self, context: Optional[Context] = None):
        logger.debug(f"Initializing {self.get_block_name()}")
        connection_details = utils.get_connection_details(
            self.properties["bootstrap_servers"], context)
logger.debug(f"Connection details: {json.dumps(connection_details)}")
> **Review comment (Member):** I suggest moving this debug log into `get_connection_details` itself, so we have it printed there whenever it's called.
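A minimal sketch of that suggestion, assuming the helper keeps its current signature; `_lookup_connection` is a hypothetical stand-in for the existing lookup logic:

```python
def get_connection_details(connection_name: str, context: Optional[Context]) -> dict:
    connection_details = _lookup_connection(connection_name, context)  # existing logic, unchanged
    # moved here from kafka.read so every caller gets the debug line
    logger.debug(f"Connection details: {json.dumps(connection_details)}")
    return connection_details
```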

        self.bootstrap_servers = connection_details.get("bootstrap_servers")
        self.group = self.properties.get("group")
        self.topic = self.properties["topic"]
        self.seek_to_beginning = self.properties.get(
            "seek_to_beginning", False)
> **Review comment (Member):** This can be a one-liner (line length < 120).
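That is, the two-line assignment above collapses to:

```python
self.seek_to_beginning = self.properties.get("seek_to_beginning", False)
```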

        self.snapshot = self.properties.get("snapshot", False)

    async def produce(self) -> AsyncGenerator[List[Message], None]:
        consumer = Consumer(**{
            'bootstrap.servers': self.bootstrap_servers,
            'group.id': self.group,
            'enable.auto.commit': False,
            'auto.offset.reset': 'earliest',
> **Review comment (Member):** Redundant comma.

> **Review comment (Member, on lines +39 to +42):** Replace ' with "; that's what we normally use, for consistency's sake.
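Applying both comments, the consumer configuration would read as below (a sketch; `Consumer` also accepts the config dict directly, so the `**` unpacking can be dropped):

```python
consumer = Consumer({
    "bootstrap.servers": self.bootstrap_servers,
    "group.id": self.group,
    "enable.auto.commit": False,
    "auto.offset.reset": "earliest"
})
```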

        })
        logger.debug(f"Producing {self.get_block_name()}")
        # consumer.assign([TopicPartition(self.topic, 0)])
> **Review comment (Member):** Remove unused code.


        if self.seek_to_beginning:
            def on_assign(c, ps):
> **Review comment (Member):** Add types, and rename `c` and `ps` to something more meaningful (see the sketch after the next comment).

                for p in ps:
                    p.offset = -2
> **Review comment (Member):** What is -2? Consider using a const for it.
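A sketch addressing both callback comments; `OFFSET_BEGINNING` is the constant that confluent-kafka exports for -2:

```python
from typing import List

from confluent_kafka import Consumer, OFFSET_BEGINNING, TopicPartition


def on_assign(consumer: Consumer, partitions: List[TopicPartition]) -> None:
    # Rewind every newly assigned partition to the earliest available offset.
    for partition in partitions:
        partition.offset = OFFSET_BEGINNING
    consumer.assign(partitions)
```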

                c.assign(ps)

            consumer.subscribe([self.topic], on_assign)
            logger.debug(f"Seeking to beginning on topic {self.topic}")
        else:
            consumer.subscribe([self.topic])

        message_count = count(1)  # created once; `next(count())` inside the loop would always yield 0
        try:
            while True:
                # Poll for messages
                msg = consumer.poll(3.0 if self.snapshot else None)
> **Review comment (Member):** What is 3.0? Consider using a const instead.
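For example (the constant name is hypothetical):

```python
SNAPSHOT_POLL_TIMEOUT_SECONDS = 3.0  # could live next to MIN_COMMIT_COUNT on the class

msg = consumer.poll(SNAPSHOT_POLL_TIMEOUT_SECONDS if self.snapshot else None)
```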

                counter = next(message_count)
                if msg is None:
                    assert self.snapshot
                    logger.warning(f"Snapshot defined, quitting on topic {self.topic}")
> **Review comment (Member):** It's not actually a warning, as this is the expected behavior in snapshot mode; change it to info, please.

                    break
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        logger.info("Reached end of partition")
                    else:
                        # Handle other errors
                        logger.error("Error: {}".format(msg.error()))
> **Review comment (Member):** Use an f-string, to be consistent.


                else:
                    # Process the message
                    message = orjson.loads(msg.value())
                    yield [{self.MSG_ID_FIELD: msg.offset(), **message}]
                    # res = consumer.get_watermark_offsets(TopicPartition(self.topic, 0))
                    # logger.error(res)
                    # consumer.commit(offsets=res, asynchronous=False)
> **Review comment (Member, on lines +79 to +81):** Remove unused code.

                if counter % self.MIN_COMMIT_COUNT == 0:
                    consumer.commit(asynchronous=False)

        finally:
            try:
                consumer.commit(asynchronous=False)
            except Exception as e:
                logger.error(e)
> **Review comment (Member):** Add a message so it's clear what failed, for example: `logger.error(f"Error while committing: {e}")`

            consumer.close()
> **Review comment (Member):** Can be in another `finally` for better readability, IMO.
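Roughly the shape the reviewer has in mind, combined with the error-message suggestion above:

```python
finally:
    try:
        consumer.commit(asynchronous=False)
    except Exception as e:
        logger.error(f"Error while committing: {e}")
    finally:
        consumer.close()  # runs even if the final commit raises
```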

30 changes: 30 additions & 0 deletions core/src/datayoga_core/blocks/kafka/read/block.schema.json
@@ -0,0 +1,30 @@
{
"title": "kafka.read",
"description": "Read from the kafka consumer",
> **Review comment (Member):** Change the description to: "Read from Kafka topic".

"type": "object",
"properties": {
"bootstrap_servers": {
"description": "host name",
> **Review comment (Member):** Should be "Host name".
"type": "string"
},
"topic": {
"description": "Kafka topic",
"type": "string"
},
"group": {
"description": "Kafka group",
"type": "string"
},
"seek_to_beginning": {
"description": "Consumer seek to beginning",
"type": "boolean"
},
"snapshot": {
"type": "boolean",
"title": "Snapshot current entries and quit",
"description": "Snapshot current entries and quit",
"default": false
}
},
"required": ["bootstrap_servers", "topic"]
}
@@ -16,7 +16,8 @@
"mysql",
"postgresql",
"redis",
"sqlserver"
"sqlserver",
"kafka"
> **Review comment (Member):** Let's keep the list alphabetically sorted.
]
},
"if": {
@@ -0,0 +1,26 @@
{
  "title": "Kafka",
  "description": "Schema for configuring Kafka connection parameters",
  "type": "object",
  "properties": {
    "type": {
      "description": "Connection type",
      "type": "string",
      "const": "kafka"
    },
    "bootstrap_servers": {
      "description": "Kafka Hosts",
"type": "string"
}
},
"additionalProperties": false,
"required": ["type", "bootstrap_servers"],
"examples": [
{
"kafka": {
"type": "kafka",
"bootstrap_servers": ["localhost:9092"]
}
}
]
}
28 changes: 28 additions & 0 deletions docs/reference/blocks/kafka_read.md
@@ -0,0 +1,28 @@
---
parent: Blocks
grand_parent: Reference
---

# kafka\.read

Read from the kafka consumer


**Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**bootstrap\_servers**|`string`|host name<br/>|yes|
|**topic**|`string`|Kafka topic<br/>|yes|
|**group**|`string`|Kafka group<br/>|no|
|**seek\_to\_beginning**|`boolean`|Consumer seek to beginning<br/>|no|
|**snapshot**<br/>(Snapshot current entries and quit)|`boolean`|Snapshot current entries and quit<br/>Default: `false`<br/>|no|

**Example**

```yaml
snapshot: false

```
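The generated example is minimal; a fuller configuration (values are illustrative) might look like:

```yaml
bootstrap_servers: kafka
topic: integration-tests
group: integration-tests
seek_to_beginning: true
snapshot: false
```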


29 changes: 29 additions & 0 deletions docs/reference/connection_types/kafka.md
@@ -0,0 +1,29 @@
---
parent: Connection Types
grand_parent: Reference
---

# Kafka

Schema for configuring Kafka connection parameters


**Properties**

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**type**|`string`|Connection type<br/>Constant Value: `"kafka"`<br/>|yes|
|**bootstrap\_servers**|`string`|Kafka Hosts<br/>|yes|

**Additional Properties:** not allowed
**Example**

```yaml
kafka:
  type: kafka
  bootstrap_servers: localhost:9092

```


2 changes: 1 addition & 1 deletion docs/reference/connections.md
@@ -21,7 +21,7 @@ Connection catalog

|Name|Type|Description|Required|
|----|----|-----------|--------|
|**type**|`string`|Connection type<br/>Enum: `"cassandra"`, `"db2"`, `"http"`, `"mysql"`, `"postgresql"`, `"redis"`, `"sqlserver"`<br/>|yes|
|**type**|`string`|Connection type<br/>Enum: `"cassandra"`, `"db2"`, `"http"`, `"mysql"`, `"postgresql"`, `"redis"`, `"sqlserver"`, `"kafka"`<br/>|yes|
|**if**|||no|
|**then**|||no|

14 changes: 14 additions & 0 deletions integration-tests/common/kafka_utils.py
@@ -0,0 +1,14 @@
from kafka import KafkaProducer
from testcontainers.kafka import KafkaContainer


def get_kafka_container() -> KafkaContainer:
    return KafkaContainer(image="confluentinc/cp-kafka:latest").with_bind_ports(
        KafkaContainer.KAFKA_PORT, KafkaContainer.KAFKA_PORT)


def get_kafka_producer(bootstrap_servers: str) -> KafkaProducer:
    return KafkaProducer(bootstrap_servers=bootstrap_servers)
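A typical use of these helpers, mirroring the integration tests below (topic and payload are illustrative):

```python
with get_kafka_container() as kafka:
    producer = get_kafka_producer(kafka.get_bootstrap_server())
    producer.send("integration-tests", b'{"id":"1","name":"Boris"}')
    producer.flush()
```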
3 changes: 3 additions & 0 deletions integration-tests/resources/connections.yaml
@@ -44,3 +44,6 @@ cassandra:
  type: cassandra
  hosts: ["localhost"]
  port: 9042
kafka:
  type: kafka
  bootstrap_servers: "localhost:9093"
15 changes: 15 additions & 0 deletions integration-tests/resources/jobs/tests/kafka_to_redis.yaml
@@ -0,0 +1,15 @@
input:
  uses: kafka.read
  with:
    bootstrap_servers: kafka
    topic: "integration-tests"
    group: "integration-tests"
    snapshot: true
steps:
  - uses: redis.write
    with:
      connection: cache
      command: HSET
      key:
        expression: id
        language: jmespath
9 changes: 9 additions & 0 deletions integration-tests/resources/jobs/tests/kafka_to_stdout.yaml
@@ -0,0 +1,9 @@
input:
  uses: kafka.read
  with:
    bootstrap_servers: kafka
    topic: "integration-tests"
    group: "integration-tests"
    snapshot: true
steps:
  - uses: std.write
37 changes: 37 additions & 0 deletions integration-tests/test_kafka_to_redis.py
@@ -0,0 +1,37 @@
import json
import logging

from common import kafka_utils, redis_utils
from common.utils import run_job

logger = logging.getLogger("dy")
message_one = b'{"id":"1","name":"Boris"}'
message_two = b'{"id":"2","name":"Ivan"}'


def test_kafka_to_redis():
    kafka_container = kafka_utils.get_kafka_container()
    # created before the try so the finally clause can always reference it
    redis_container = redis_utils.get_redis_oss_container(redis_utils.REDIS_PORT)
    try:
        with kafka_container as kafka:
            redis_container.start()

            bootstrap_servers = kafka.get_bootstrap_server()
            producer = kafka_utils.get_kafka_producer(bootstrap_servers)
            producer.send("integration-tests", message_one)
            producer.send("integration-tests", message_two)
            producer.flush()
            run_job("tests.kafka_to_redis")

            redis_client = redis_utils.get_redis_client(
                "localhost", redis_utils.REDIS_PORT)

            assert len(redis_client.keys()) == 2

            boris = redis_client.hgetall("1")
            ivan = redis_client.hgetall("2")

            assert boris == json.loads(message_one.decode())
            assert ivan == json.loads(message_two.decode())
    finally:
        redis_container.stop()
43 changes: 43 additions & 0 deletions integration-tests/test_kafka_to_stdout.py
@@ -0,0 +1,43 @@
import logging
import os

from common import kafka_utils
from common.utils import run_job

logger = logging.getLogger("dy")
message_one = b'{"id":1,"name":"Boris"}'
message_two = b'{"id":2,"name":"Ivan"}'
message_three = b'{"id":3,"name":"Yossi"}'
message_four = b'{"id":4,"name":"Adi"}'


def test_kafka_to_stdout(tmpdir):
    kafka_container = kafka_utils.get_kafka_container()
    output_file = tmpdir.join("tests_kafka_to_stdout.txt")
    try:
        with kafka_container as kafka:
            bootstrap_servers = kafka.get_bootstrap_server()
            # bootstrap_servers = "host.docker.internal:9093"
            producer = kafka_utils.get_kafka_producer(bootstrap_servers)
            producer.send("integration-tests", message_one)
            producer.send("integration-tests", message_two)
            producer.flush()

            run_job("tests.kafka_to_stdout", None, output_file)
            result = output_file.readlines()
            assert len(result) == 2
            assert result[0].strip().encode() == message_one
            assert result[1].strip().encode() == message_two

            producer.send("integration-tests", message_three)
            producer.send("integration-tests", message_four)
            producer.flush()

            run_job("tests.kafka_to_stdout", None, output_file)
            result = output_file.readlines()
            assert len(result) == 2
            assert result[0].strip().encode() == message_three
            assert result[1].strip().encode() == message_four
    finally:
        os.remove(output_file)