Commit 3ec4e41
Add fixes, more tests and add scheduled flush to kafka output
ppcad committed Jun 21, 2024
1 parent 9ef29c4 commit 3ec4e41
Showing 5 changed files with 263 additions and 17 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -11,8 +11,8 @@
* add a `number_of_successful_writes` metric to the s3 connector, which counts how many events were successfully written to s3
* make the s3 connector work with the new `_write_backlog` method introduced by the `confluent_kafka` commit bugfix in v9.0.0
* add option to Opensearch Output Connector to use parallel bulk implementation (default is True)
* make confluent kafka output store offsets only for successfully delivered events
* add `input_connector_metadata` preprocessor that allows input connectors to add a `_metadata` field to events.
* make confluent kafka output store offsets only for successfully delivered events if configured to do so

### Improvements

2 changes: 1 addition & 1 deletion logprep/connector/confluent_kafka/input.py
@@ -480,7 +480,7 @@ def _get_delivered_partition_offset(self, metadata: dict) -> TopicPartition:
return TopicPartition(
self._config.topic,
partition=last_partition,
offset=last_offset,
offset=last_offset if isinstance(last_offset, int) else last_offset[0],
)

def batch_finished_callback(self, metadata: Optional[dict] = None) -> None:
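The change above makes `_get_delivered_partition_offset` tolerate `last_offset` arriving either as a single integer or as a list of collected offsets. A minimal sketch of that normalization, using an illustrative helper and example metadata that are not part of the commit:

```python
from confluent_kafka import TopicPartition

def to_topic_partition(topic: str, metadata: dict) -> TopicPartition:
    """Illustrative helper: build a TopicPartition from a `_metadata`-style dict."""
    last_offset = metadata["last_offset"]
    # `last_offset` may be a plain int or a list of offsets; in the list case
    # the first entry is used, mirroring the expression in the diff above.
    offset = last_offset if isinstance(last_offset, int) else last_offset[0]
    return TopicPartition(topic, partition=metadata["last_partition"], offset=offset)

print(to_topic_partition("events", {"last_partition": 0, "last_offset": 5}).offset)       # 5
print(to_topic_partition("events", {"last_partition": 0, "last_offset": [5, 6]}).offset)  # 5
```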
110 changes: 101 additions & 9 deletions logprep/connector/confluent_kafka/output.py
@@ -25,10 +25,12 @@
queue.buffering.max.ms: 0.5
"""
import json
from collections import defaultdict
from datetime import datetime
from functools import cached_property, partial
from logging import Logger
from socket import getfqdn
from typing import Optional
from typing import Optional, DefaultDict

from attrs import define, field, validators
from confluent_kafka import KafkaException, Producer
@@ -153,10 +155,17 @@ class Config(Output.Config):
"""The topic to which processed messages will be sent."""
error_topic: str
"""The topic to which error messages will be sent."""
flush_timeout: float
producer_flush_timeout: float
"""Timeout for sending all messages from the producer queue to kafka."""
send_timeout: int = field(validator=validators.instance_of(int), default=0)
"""Timeout for sending messages to kafka. Values above 0 make it blocking."""
flush_timeout: Optional[int] = field(validator=validators.instance_of(int), default=60)
"""(Optional) Timeout after :code:`_sent_offset_backlog` is flushed if
:code:`sent_offset_backlog_size` is not reached."""
fire_and_forget: bool = field(validator=validators.instance_of(bool), default=False)
"""If True, offsets will be set after sending messages instead of waiting for delivery."""
sent_offset_backlog_size: int = field(validator=validators.instance_of(int), default=1)
""" (Optional) count of delivered messages before batch_finished_callback is called."""
kafka_config: Optional[dict] = field(
validator=[
validators.instance_of(dict),
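Taken together, the renamed `producer_flush_timeout` and the new `flush_timeout`, `fire_and_forget`, and `sent_offset_backlog_size` options might be set roughly as below; the connector type name and all values are illustrative assumptions, not taken from the commit.

```python
# Hypothetical output connector configuration (keys as documented in the diff;
# the type name and all values are assumptions for illustration only).
output_config = {
    "kafka_output": {
        "type": "confluentkafka_output",   # assumed registered connector type
        "topic": "processed",
        "error_topic": "errors",
        "producer_flush_timeout": 30,      # drain the librdkafka producer queue
        "flush_timeout": 60,               # scheduled flush of the offset backlog
        "send_timeout": 0,
        "fire_and_forget": False,          # True: set offsets without waiting for delivery
        "sent_offset_backlog_size": 100,   # flush once this many offsets are backlogged
        "kafka_config": {"bootstrap.servers": "localhost:9092"},
    }
}
```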
@@ -181,6 +190,23 @@ class Config(Output.Config):
"""

__slots__ = [
"_sent_offset_backlog",
"_delivered_offset_backlog",
]

_sent_offset_backlog: DefaultDict[str, list]
_delivered_offset_backlog: DefaultDict[str, list]

def __init__(self, name: str, configuration: "ConfluentKafkaOutput.Config", logger: Logger):
super().__init__(name, configuration, logger)
self._sent_offset_backlog = defaultdict(list)
self._delivered_offset_backlog = defaultdict(list)

@property
def _sent_offset_backlog_size(self):
return sum(map(len, self._sent_offset_backlog.values()))
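Both backlogs map a partition to the list of offsets recorded for it, so `_sent_offset_backlog_size` is simply the total number of backlogged offsets across all partitions. A standalone illustration, not the connector's code:

```python
from collections import defaultdict

sent_offset_backlog = defaultdict(list)
sent_offset_backlog[0].extend([10, 11])  # two offsets recorded for partition 0
sent_offset_backlog[1].append(7)         # one offset recorded for partition 1

print(sum(map(len, sent_offset_backlog.values())))  # 3
```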

@cached_property
def _producer(self):
injected_config = {
@@ -253,6 +279,7 @@ def store(self, document: dict) -> Optional[bool]:
"""
self.metrics.number_of_processed_events += 1
self._send_to_kafka(document, self._config.topic)
self._add_offset_to_sent_backlog(document)

@Metric.measure_time()
def store_custom(self, document: dict, target: str) -> None:
@@ -290,6 +317,7 @@ def _send_to_kafka(self, document, target, set_offsets=True):
"""
try:
set_offsets &= not self._config.fire_and_forget
callback = self.delivered_callback(document) if set_offsets else None
self._producer.produce(
target,
@@ -299,12 +327,18 @@
self._producer.poll(self._config.send_timeout)
except BufferError:
# block program until buffer is empty
self._producer.flush(timeout=self._config.flush_timeout)
self._producer.flush(timeout=self._config.producer_flush_timeout)
except BaseException as error:
raise CriticalOutputError(
self, f"Error storing output document -> {error}", document
) from error

if self._config.fire_and_forget:
return

if self._sent_offset_backlog_size >= self._config.sent_offset_backlog_size:
self._write_backlog()
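`_send_to_kafka` now triggers `_write_backlog` as soon as the number of sent-but-uncommitted offsets reaches `sent_offset_backlog_size`, unless `fire_and_forget` is enabled. A reduced sketch of that decision with stand-in names, not the connector's API:

```python
def should_flush(sent_backlog_size: int, threshold: int, fire_and_forget: bool) -> bool:
    # With fire-and-forget there is no offset tracking, so never flush here.
    if fire_and_forget:
        return False
    return sent_backlog_size >= threshold

print(should_flush(sent_backlog_size=99, threshold=100, fire_and_forget=False))   # False
print(should_flush(sent_backlog_size=100, threshold=100, fire_and_forget=False))  # True
```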

@Metric.measure_time()
def store_failed(
self, error_message: str, document_received: dict, document_processed: dict
@@ -335,15 +369,58 @@ def store_failed(
"timestamp": str(datetime.now()),
}
try:
callback = (
self.delivered_callback(document_processed)
if not self._config.fire_and_forget
else None
)
self._producer.produce(
self._config.error_topic,
value=json.dumps(value, separators=(",", ":")).encode("utf-8"),
on_delivery=self.delivered_callback(document_processed),
on_delivery=callback
)
self._producer.poll(self._config.send_timeout)
except BufferError:
# block program until buffer is empty
self._producer.flush(timeout=self._config.flush_timeout)
self._producer.flush(timeout=self._config.producer_flush_timeout)
self._add_offset_to_sent_backlog(document_received)

def _add_offset_to_sent_backlog(self, document):
if not self._config.fire_and_forget:
metadata = document.get("_metadata", {})
partition = metadata.get("last_partition", None)
offset = metadata.get("last_offset", None)
if not (partition is None and offset is None):
self._sent_offset_backlog[partition].append(offset)

@staticmethod
def _get_last_committable_offsets(
sent_offset_backlog: DefaultDict[str, list],
delivered_offset_backlog: DefaultDict[str, list],
) -> dict:
last_committable = {}
for partition, offsets in delivered_offset_backlog.items():
if not offsets:
continue

if len(offsets) == 1:
last_committable[partition] = offsets[0]
continue

offsets.sort()
prev_offset = offsets[0]
for offset in offsets[1:]:
if offset > prev_offset + 1:
unexpected_gap = False
for missing_offset in range(prev_offset + 1, offset):
if missing_offset in sent_offset_backlog.get(partition, []):
last_committable[partition] = prev_offset
unexpected_gap = True
if unexpected_gap:
break
last_committable[partition] = offset
prev_offset = offset
return last_committable
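A worked example of the gap handling above (illustration only, not part of the commit): an offset that was sent but not yet delivered blocks committing past it, whereas a gap that was never sent can be skipped. The import path matches this diff; the expected result follows from the logic as shown.

```python
from logprep.connector.confluent_kafka.output import ConfluentKafkaOutput

# Partition 0: offset 4 was sent but not delivered yet -> committable stops at 3.
# Partition 1: offsets 3 and 4 were never sent -> the gap is harmless, commit 5.
sent = {0: [1, 2, 3, 4, 5], 1: [1, 2, 5]}
delivered = {0: [1, 2, 3, 5], 1: [1, 2, 5]}

print(ConfluentKafkaOutput._get_last_committable_offsets(sent, delivered))
# expected: {0: 3, 1: 5}
```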

def delivered_callback(self, document):
"""Callback that can called when a single message has been successfully delivered.
@@ -369,24 +446,39 @@ def delivered_callback(self, document):
def store_offsets_on_success(error, _):
"""Set offset via message stored in closure if no error occurred.
"delivered_callback_frequency" can be configured to prevent setting the callback for
"sent_offset_backlog_size" can be configured to prevent setting the callback for
every single message that was successfully delivered.
Setting this higher than 1 might be useful if auto-commit is disabled.
"""
if error:
raise FatalOutputError(output=self, message=error)
self.metrics.number_of_successfully_delivered_events += 1
self.input_connector.batch_finished_callback(metadata=partition_offset)
last_partition = partition_offset["last_partition"]
last_offset = partition_offset["last_offset"]
self._delivered_offset_backlog[last_partition].append(last_offset)

return store_offsets_on_success

@Metric.measure_time()
def _write_backlog(self):
self._producer.flush(self._config.flush_timeout)
self._producer.flush(self._config.producer_flush_timeout)
if self._config.fire_and_forget:
return

last_commitable_offsets = self._get_last_committable_offsets(
self._sent_offset_backlog, self._delivered_offset_backlog
)
for partition, offset in last_commitable_offsets.items():
committable_offset = {"last_partition": partition, "last_offset": offset}
self.input_connector.batch_finished_callback(metadata=committable_offset)
self._delivered_offset_backlog.clear()
self._sent_offset_backlog.clear()

def setup(self):
super().setup()
flush_timeout = self._config.flush_timeout
self._schedule_task(task=self._write_backlog, seconds=flush_timeout)
try:
_ = self._producer
except (KafkaException, ValueError) as error:
Expand All @@ -395,4 +487,4 @@ def setup(self):
def shut_down(self) -> None:
"""ensures that all messages are flushed"""
if self._producer is not None:
self._producer.flush(self._config.flush_timeout)
self._write_backlog()
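`setup()` now registers `_write_backlog` as a recurring task, so the offset backlog is also flushed every `flush_timeout` seconds even if `sent_offset_backlog_size` is never reached, and `shut_down()` reuses the same path. The `_schedule_task` mechanism itself is defined outside this diff; the following is only a generic sketch of the periodic-flush idea, not logprep's implementation:

```python
import threading

def schedule_periodically(task, seconds: float) -> threading.Timer:
    """Run `task` every `seconds` seconds on a daemon timer thread (sketch only)."""
    def _run():
        task()
        schedule_periodically(task, seconds)  # re-arm after each run
    timer = threading.Timer(seconds, _run)
    timer.daemon = True
    timer.start()
    return timer

schedule_periodically(lambda: print("flushing offset backlog"), seconds=60.0)
```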
58 changes: 57 additions & 1 deletion tests/unit/connector/test_confluent_kafka_input.py
@@ -103,8 +103,11 @@ def test_shut_down_calls_consumer_close(self, _):
],
)
@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_batch_finished_callback_calls_offsets_handler_for_setting(self, _, settings, handlers):
def test_batch_finished_callback_calls_offsets_handler_for_setting_without_metadata(
self, _, settings, handlers
):
input_config = deepcopy(self.CONFIG)
input_config["use_metadata_for_offsets"] = False
kafka_input = Factory.create({"test": input_config}, logger=self.logger)
kafka_input._config.kafka_config.update(settings)
kafka_consumer = kafka_input._consumer
@@ -409,10 +412,63 @@ def test_revoke_callback_writes_output_backlog_and_calls_batch_finished_callback
kafka_input.output_connector = mock.MagicMock()
kafka_input.batch_finished_callback = mock.MagicMock()
mock_partitions = [mock.MagicMock()]
kafka_input.output_connector._sent_offset_backlog = {}
kafka_input.output_connector._delivered_offset_backlog = {}
kafka_input._revoke_callback(mock_consumer, mock_partitions)
kafka_input.output_connector._write_backlog.assert_called()
kafka_input.batch_finished_callback.assert_called()

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_revoke_callback_writes_output_backlog_and_does_not_call_batch_finished_callback_metadata(
self, mock_consumer
):
input_config = deepcopy(self.CONFIG)
input_config["use_metadata_for_offsets"] = True
kafka_input = Factory.create({"test": input_config}, logger=self.logger)

kafka_input.output_connector = mock.MagicMock()
kafka_input.batch_finished_callback = mock.MagicMock()
mock_partitions = [mock.MagicMock()]
kafka_input.output_connector._sent_offset_backlog = {0: [0]}
kafka_input.output_connector._delivered_offset_backlog = {0: [0]}
kafka_input._revoke_callback(mock_consumer, mock_partitions)
kafka_input.output_connector._write_backlog.assert_called()
kafka_input.batch_finished_callback.assert_not_called()

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_handle_offsets_uses_delivered_offsets_if_use_metadata(self, _):
input_config = deepcopy(self.CONFIG)
input_config["use_metadata_for_offsets"] = True
kafka_input = Factory.create({"test": input_config}, logger=self.logger)

kafka_input.output_connector = mock.MagicMock()
metadata = {"last_partition": 0, "last_offset": 0}
kafka_input._handle_offsets(kafka_input._consumer.store_offsets, metadata)
offsets = [TopicPartition(kafka_input._config.topic, partition=0, offset=0)]
kafka_input._consumer.store_offsets.assert_called_with(offsets=offsets)

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_handle_offsets_raises_Exception_if_use_metadata_but_no_metadata(self, _):
input_config = deepcopy(self.CONFIG)
input_config["use_metadata_for_offsets"] = True
kafka_input = Factory.create({"test": input_config}, logger=self.logger)

kafka_input.output_connector = mock.MagicMock()
metadata = {}
with pytest.raises(FatalInputError, match="'last_partition' and 'last_offset' required"):
kafka_input._handle_offsets(kafka_input._consumer.store_offsets, metadata)

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_handle_offsets_uses__last_valid_records_if_not_use_metadata(self, _):
input_config = deepcopy(self.CONFIG)
input_config["use_metadata_for_offsets"] = False
kafka_input = Factory.create({"test": input_config}, logger=self.logger)

kafka_input.output_connector = mock.MagicMock()
kafka_input._last_valid_records = {0: "MESSAGE_OBJECT"}
kafka_input._handle_offsets(kafka_input._consumer.store_offsets, {})
kafka_input._consumer.store_offsets.assert_called_with(message="MESSAGE_OBJECT")

@pytest.mark.parametrize(
"metadata",
[{}, {"last_offset": 0}, {"last_partition": 0}],
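The rendered diff does not include a direct test for `_get_last_committable_offsets`. A hypothetical test, assuming the conventions of the test modules above and that `ConfluentKafkaOutput` is imported there, might look like this:

```python
def test_get_last_committable_offsets_stops_before_undelivered_offset(self):
    # Offset 4 on partition 0 was sent but never confirmed as delivered,
    # so only offsets up to 3 should be committable.
    sent = {0: [1, 2, 3, 4, 5]}
    delivered = {0: [1, 2, 3, 5]}
    committable = ConfluentKafkaOutput._get_last_committable_offsets(sent, delivered)
    assert committable == {0: 3}
```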