diff --git a/core/src/datayoga_core/blocks/kafka/read/block.py b/core/src/datayoga_core/blocks/kafka/read/block.py index 8711d0b5..01504367 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.py +++ b/core/src/datayoga_core/blocks/kafka/read/block.py @@ -31,8 +31,8 @@ def init(self, context: Optional[Context] = None): self.port = int(connection_details.get("port", 9092)) logger.debug(f"Connection details: {json.dumps(connection_details)}") self.bootstrap_servers = connection_details.get("bootstrap_servers") - self.group = connection_details.get("group") - self.topic = connection_details.get("topic", "integration-tests") + self.group = self.properties.get("group") + self.topic = self.properties["topic"] self.seek_to_beginning = self.properties.get("seek_to_beginning", False) self.snapshot = self.properties.get("snapshot", False) self.consumer = Consumer({ diff --git a/core/src/datayoga_core/blocks/kafka/read/block.schema.json b/core/src/datayoga_core/blocks/kafka/read/block.schema.json index d8364259..b9a2a59c 100644 --- a/core/src/datayoga_core/blocks/kafka/read/block.schema.json +++ b/core/src/datayoga_core/blocks/kafka/read/block.schema.json @@ -7,6 +7,14 @@ "description": "host name", "type": "string" }, + "topic": { + "description": "Kafka topic", + "type": "string" + }, + "group": { + "description": "Kafka group", + "type": "string" + }, "seek_to_beginning": { "description": "Consumer seek to beginning", "type": "boolean" @@ -19,6 +27,7 @@ } }, "required": [ - "bootstrap_servers" + "bootstrap_servers", + "topic" ] } diff --git a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json index 7bb5f08e..f56c5122 100644 --- a/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json +++ b/core/src/datayoga_core/resources/schemas/connections/kafka.schema.json @@ -11,24 +11,15 @@ "bootstrap_servers": { "description": "Kafka Hosts", "type": "string" - }, - "topic": { - "description": "Kafka Topics", - "type": "string" - }, - "group": { - "description": "Kafka Group", - "type": "string" } }, "additionalProperties": false, - "required": ["type", "bootstrap_servers", "topic"], + "required": ["type", "bootstrap_servers"], "examples": [ { "kafka": { "type": "kafka", - "bootstrap_servers": ["localhost"], - "topic": "test" + "bootstrap_servers": ["localhost:9092"] } } ] diff --git a/integration-tests/resources/connections.yaml b/integration-tests/resources/connections.yaml index 1b469a27..e35e48e0 100644 --- a/integration-tests/resources/connections.yaml +++ b/integration-tests/resources/connections.yaml @@ -47,5 +47,3 @@ cassandra: kafka: type: kafka bootstrap_servers: "localhost:9093" - group: "integration-tests" - topic: "integration-tests" diff --git a/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml b/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml index 36f6feb5..b49a688d 100644 --- a/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml +++ b/integration-tests/resources/jobs/tests/kafka_to_stdout.yaml @@ -2,6 +2,8 @@ input: uses: kafka.read with: bootstrap_servers: kafka + topic: "integration-tests" + group: "integration-tests" snapshot: true seek_to_beginning: true steps: