Support headers (#208)
* Support headers

Signed-off-by: clyang82 <[email protected]>

Test

Signed-off-by: clyang82 <[email protected]>

* Add tests

Signed-off-by: clyang82 <[email protected]>

* Address comments

Signed-off-by: clyang82 <[email protected]>

* Fix test errors

Signed-off-by: clyang82 <[email protected]>

* Change exception to info

Signed-off-by: clyang82 <[email protected]>

* Place headers under meta

Signed-off-by: clyang82 <[email protected]>

---------

Signed-off-by: clyang82 <[email protected]>
Co-authored-by: Alex <[email protected]>
clyang82 and Alex-Izquierdo authored Jun 4, 2024
1 parent 191d025 commit 1084732
Showing 4 changed files with 91 additions and 14 deletions.
52 changes: 39 additions & 13 deletions extensions/eda/plugins/event_source/kafka.py
@@ -109,24 +109,50 @@ async def main( # pylint: disable=R0914
    await kafka_consumer.start()

    try:
-        async for msg in kafka_consumer:
-            data = None
-            try:
-                value = msg.value.decode(encoding)
-                data = json.loads(value)
-            except json.decoder.JSONDecodeError:
-                data = value
-            except UnicodeError:
-                logger.exception("Unicode Error")
-
-            if data:
-                await queue.put({"body": data})
-            await asyncio.sleep(0)
+        await receive_msg(queue, kafka_consumer, encoding)
    finally:
        logger.info("Stopping kafka consumer")
        await kafka_consumer.stop()


+async def receive_msg(
+    queue: asyncio.Queue,
+    kafka_consumer: AIOKafkaConsumer,
+    encoding: str,
+) -> None:
+    """Receive messages from the Kafka topic and put them into the queue."""
+    logger = logging.getLogger()
+
+    async for msg in kafka_consumer:
+        event = {}
+
+        # Process headers
+        try:
+            headers = {header[0]: header[1].decode(encoding) for header in msg.headers}
+            event["meta"] = {}
+            event["meta"]["headers"] = headers
+        except UnicodeError:
+            logger.exception("Unicode error while decoding headers")
+
+        # Process message body
+        try:
+            value = msg.value.decode(encoding)
+            data = json.loads(value)
+        except json.decoder.JSONDecodeError:
+            logger.info("JSON decode error, storing raw value")
+            data = value
+        except UnicodeError:
+            logger.exception("Unicode error while decoding message body")
+            data = None
+
+        # Add data to the event and put it into the queue
+        if data:
+            event["body"] = data
+            await queue.put(event)
+
+        await asyncio.sleep(0)
+
+
if __name__ == "__main__":
"""MockQueue if running directly."""

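Note: for a message published with headers [("foo", b"bar")] and a JSON body, the new receive_msg() above would enqueue an event shaped like the following (illustrative values, not output captured from this change):

# Event assembled by receive_msg() for a message with headers
# [("foo", b"bar")] and body b'{"name": "example"}' (illustrative):
event = {
    "meta": {"headers": {"foo": "bar"}},
    "body": {"name": "example"},
}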
20 changes: 20 additions & 0 deletions tests/integration/event_source_kafka/test_kafka_rules_headers.yml
@@ -0,0 +1,20 @@
+- name: test kafka source plugin with the specified header
+  hosts: localhost
+  sources:
+    - ansible.eda.kafka:
+        topic: kafka-events-plaintext
+        host: localhost
+        port: 9092
+        offset: earliest
+        encoding: ascii
+  rules:
+    - name: match kafka event
+      condition: event.meta.headers.foo == "bar"
+      action:
+        debug:
+          msg: "Rule fired successfully with headers"
+
+    - name: stop
+      condition: event.body == "stop"
+      action:
+        shutdown:
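Note: to exercise this rulebook by hand, a producer along these lines should work — a sketch assuming the kafka-python package and the localhost:9092 plaintext broker configured above; the body and header values are illustrative:

from kafka import KafkaProducer  # kafka-python, assumed available

producer = KafkaProducer(bootstrap_servers="localhost:9092")
# Header values must be bytes; ("foo", b"bar") satisfies the first rule's condition.
producer.send(
    "kafka-events-plaintext",
    value=b'{"name": "example"}',
    headers=[("foo", b"bar")],
)
# A plain "stop" body matches the shutdown rule.
producer.send("kafka-events-plaintext", value=b"stop")
producer.flush()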
23 changes: 23 additions & 0 deletions tests/integration/event_source_kafka/test_kafka_source.py
@@ -49,6 +49,29 @@ def test_kafka_source_plaintext(kafka_certs, kafka_broker, kafka_producer):
assert "Rule fired successfully for PLAINTEXT consumers" in result.stdout.decode()


+def test_kafka_source_with_headers(kafka_certs, kafka_broker, kafka_producer):
+    ruleset = os.path.join(
+        TESTS_PATH, "event_source_kafka", "test_kafka_rules_headers.yml"
+    )
+
+    msgs = [
+        json.dumps({"name": "Produced for PLAINTEXT consumers"}).encode("ascii"),
+        "stop".encode("ascii"),
+    ]
+
+    headers = [
+        (key, value.encode("ascii"))
+        for key, value in json.loads('{"foo": "bar"}').items()
+    ]
+
+    for msg in msgs:
+        kafka_producer.send(topic="kafka-events-plaintext", value=msg, headers=headers)
+
+    result = CLIRunner(rules=ruleset).run()
+
+    assert "Rule fired successfully with headers" in result.stdout.decode()
+
+
def test_kafka_source_ssl(kafka_certs, kafka_broker, kafka_producer):
    ruleset = os.path.join(TESTS_PATH, "event_source_kafka", "test_kafka_rules_ssl.yml")

10 changes: 9 additions & 1 deletion tests/unit/event_source/test_kafka.py
@@ -1,4 +1,5 @@
import asyncio
+import json
from unittest.mock import MagicMock, patch

import pytest
@@ -28,6 +29,10 @@ async def __anext__(self):
        if self.count < 2:
            mock = MagicMock()
            mock.value = f'{{"i": {self.count}}}'.encode("utf-8")
+            mock.headers = [
+                (key, value.encode("utf-8"))
+                for key, value in json.loads('{"foo": "bar"}').items()
+            ]
            self.count += 1
            return mock
        else:
@@ -54,5 +59,8 @@ def test_receive_from_kafka_place_in_queue(myqueue):
            },
        )
    )
-    assert myqueue.queue[0] == {"body": {"i": 0}}
+    assert myqueue.queue[0] == {
+        "body": {"i": 0},
+        "meta": {"headers": {"foo": "bar"}},
+    }
    assert len(myqueue.queue) == 2
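Note: the mocked headers above pass through the same dict comprehension receive_msg() applies to real aiokafka headers; in isolation that step reduces to the following (standalone illustration):

# aiokafka delivers headers as (str, bytes) tuples; the plugin decodes the
# values with the configured encoding and nests them under event["meta"].
raw_headers = [("foo", b"bar")]
decoded = {key: value.decode("utf-8") for key, value in raw_headers}
assert decoded == {"foo": "bar"}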
