diff --git a/CHANGELOG.md b/CHANGELOG.md index 0836533a7..62077fe6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,8 @@ * add a `number_of_successful_writes` metric to the s3 connector, which counts how many events were successfully written to s3 * make the s3 connector work with the new `_write_backlog` method introduced by the `confluent_kafka` commit bugfix in v9.0.0 * add option to Opensearch Output Connector to use parallel bulk implementation (default is True) - +* add `input_connector_metadata` preprocessor that allows input connectors to add a `_metadata` field to events. +* make confluent kafka output store offsets only for successfully delivered events if configured for that. ### Improvements diff --git a/logprep/abc/input.py b/logprep/abc/input.py index 706c6ec5d..15b068416 100644 --- a/logprep/abc/input.py +++ b/logprep/abc/input.py @@ -112,6 +112,7 @@ class Config(Connector.Config): "log_arrival_time_target_field": Optional[str], "log_arrival_timedelta": Optional[TimeDeltaConfig], "enrich_by_env_variables": Optional[dict], + "input_connector_metadata": Optional[bool], }, ), ], @@ -172,6 +173,8 @@ class Config(Connector.Config): - `enrich_by_env_variables` - If required it is possible to automatically enrich incoming events by environment variables. To activate this preprocessor the fields value has to be a mapping from the target field name (key) to the environment variable name (value). + - `input_connector_metadata` - If set to True, metadata will be added by the input connector + if the connector implements `_add_input_connector_metadata_to_event`. """ _version_information: dict = field( @@ -186,6 +189,11 @@ class Config(Connector.Config): output_connector: Optional["Output"] __slots__ = ["pipeline_index", "output_connector"] + @property + def _add_input_connector_metadata(self): + """Check and return if input connector metadata should be added or not.""" + return bool(self._config.preprocessing.get("input_connector_metadata")) + @property def _add_hmac(self): """Check and return if a hmac should be added or not.""" @@ -283,6 +291,8 @@ def get_next(self, timeout: float) -> Tuple[Optional[dict], Optional[str]]: self.metrics.number_of_processed_events += 1 if not isinstance(event, dict): raise CriticalInputError(self, "not a dict", event) + if self._add_input_connector_metadata: + event, non_critical_error_msg = self._add_input_connector_metadata_to_event(event) if self._add_hmac: event, non_critical_error_msg = self._add_hmac_to(event, raw_event) if self._add_version_info: @@ -295,8 +305,30 @@ def get_next(self, timeout: float) -> Tuple[Optional[dict], Optional[str]]: self._add_env_enrichment_to_event(event) return event, non_critical_error_msg - def batch_finished_callback(self): - """Can be called by output connectors after processing a batch of one or more records.""" + def batch_finished_callback(self, metadata: Optional[dict] = None): + """Can be called by output connectors after processing a batch of one or more records. + + Parameters + ---------- + metadata: dict + Metadata that can be passed by outputs. + """ + + def _add_input_connector_metadata_to_event(self, event: dict) -> Tuple[dict, Optional[str]]: + """Add input connector metadata to the event. + + Does nothing unless implemented by an input connector. + + Parameters + ---------- + event_dict: dict + The event to which the metadata should be added to + + Returns + ------- + event_dict: dict + The original event extended with metadata from the input connector. 
+ """ def _add_env_enrichment_to_event(self, event: dict): """Add the env enrichment information to the event""" diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 9565c6a8f..442bfab44 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -43,6 +43,7 @@ Consumer, KafkaException, TopicPartition, + Message, ) from logprep.abc.connector import Connector @@ -209,6 +210,15 @@ class Config(Input.Config): topic: str = field(validator=validators.instance_of(str)) """The topic from which new log messages will be fetched.""" + use_metadata_for_offsets: bool = field( + validator=validators.instance_of(bool), default=False + ) + """Use metadata to set offsets if this is set to True (default is False). + + This must be set appropriately depending on the output connector for the offsets to be set + correctly. + """ + kafka_config: Optional[dict] = field( validator=[ validators.instance_of(dict), @@ -234,12 +244,14 @@ class Config(Input.Config): """ _last_valid_records: dict + _last_valid_record: Optional[Message] - __slots__ = ["_last_valid_records"] + __slots__ = ["_last_valid_records", "_last_valid_record"] def __init__(self, name: str, configuration: "Connector.Config", logger: Logger) -> None: super().__init__(name, configuration, logger) self._last_valid_records = {} + self._last_valid_record = None @cached_property def _consumer(self) -> Consumer: @@ -356,7 +368,7 @@ def describe(self) -> str: base_description = super().describe() return f"{base_description} - Kafka Input: {self._config.kafka_config['bootstrap.servers']}" - def _get_raw_event(self, timeout: float) -> bytearray: + def _get_raw_event(self, timeout: float) -> Optional[bytearray]: """Get next raw Message from Kafka. Parameters @@ -386,6 +398,7 @@ def _get_raw_event(self, timeout: float) -> bytearray: self, "A confluent-kafka record contains an error code", kafka_error ) self._last_valid_records[message.partition()] = message + self._last_valid_record = message labels = {"description": f"topic: {self._config.topic} - partition: {message.partition()}"} self.metrics.current_offsets.add_with_labels(message.offset() + 1, labels) return message.value() @@ -421,6 +434,31 @@ def _get_event(self, timeout: float) -> Union[Tuple[None, None], Tuple[dict, dic ) from error return event_dict, raw_event + def _add_input_connector_metadata_to_event(self, event: dict) -> Tuple[dict, Optional[str]]: + """Add last_partition and last_offset to _metadata. + + Pop previous last_partition and last_offset to ensure no incorrect values are set. + Try for AttributeError, since _metadata could already exist, but not be a dict. + """ + metadata = event.get("_metadata", {}) + try: + metadata.pop("last_partition", None) + metadata.pop("last_offset", None) + except AttributeError: + pass + + if metadata: + non_critical_error_msg = ( + "Couldn't add metadata to the input event as the field '_metadata' already exist." 
+ ) + return event, non_critical_error_msg + + event["_metadata"] = { + "last_partition": self._last_valid_record.partition(), + "last_offset": self._last_valid_record.offset(), + } + return event, None + @property def _enable_auto_offset_store(self) -> bool: return self._config.kafka_config.get("enable.auto.offset.store") == "true" @@ -429,24 +467,49 @@ def _enable_auto_offset_store(self) -> bool: def _enable_auto_commit(self) -> bool: return self._config.kafka_config.get("enable.auto.commit") == "true" - def batch_finished_callback(self) -> None: + def _get_delivered_partition_offset(self, metadata: dict) -> TopicPartition: + try: + last_partition = metadata["last_partition"] + last_offset = metadata["last_offset"] + except KeyError as error: + raise FatalInputError( + self, + "Missing fields in metadata for setting offsets: " + "'last_partition' and 'last_offset' required", + ) from error + return TopicPartition( + self._config.topic, + partition=last_partition, + offset=last_offset if isinstance(last_offset, int) else last_offset[0], + ) + + def batch_finished_callback(self, metadata: Optional[dict] = None) -> None: """Store offsets for each kafka partition in `self._last_valid_records` - and if configured commit them. Should be called by output connectors if - they are finished processing a batch of records. + or instead use `metadata` to obtain offsets. If configured commit them. + Should be called by output connectors if they are finished processing a batch of records. """ + metadata = {} if metadata is None else metadata if self._enable_auto_offset_store: return - self._handle_offsets(self._consumer.store_offsets) + self._handle_offsets(self._consumer.store_offsets, metadata) if not self._enable_auto_commit: - self._handle_offsets(self._consumer.commit) + self._handle_offsets(self._consumer.commit, metadata) self._last_valid_records.clear() - def _handle_offsets(self, offset_handler: Callable) -> None: - for message in self._last_valid_records.values(): + def _handle_offsets(self, offset_handler: Callable, metadata: Optional[dict]) -> None: + if self._config.use_metadata_for_offsets: + delivered_offset = self._get_delivered_partition_offset(metadata) try: - offset_handler(message=message) + offset_handler(offsets=[delivered_offset]) except KafkaException as error: - raise InputWarning(self, f"{error}, {message}") from error + raise InputWarning(self, f"{error}, {delivered_offset}") from error + else: + records = self._last_valid_records.values() + for record in records: + try: + offset_handler(message=record) + except KafkaException as error: + raise InputWarning(self, f"{error}, {record}") from error def _assign_callback(self, consumer, topic_partitions): for topic_partition in topic_partitions: @@ -472,7 +535,8 @@ def _revoke_callback(self, consumer, topic_partitions): f"partition {topic_partition.partition}" ) self.output_connector._write_backlog() - self.batch_finished_callback() + if not self._config.use_metadata_for_offsets: + self.batch_finished_callback() def _lost_callback(self, consumer, topic_partitions): for topic_partition in topic_partitions: diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 1b950ab01..39fee502d 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -24,18 +24,19 @@ request.required.acks: -1 queue.buffering.max.ms: 0.5 """ - import json +from collections import defaultdict from datetime import datetime from functools import 
cached_property, partial +from logging import Logger from socket import getfqdn -from typing import Optional +from typing import Optional, DefaultDict from attrs import define, field, validators from confluent_kafka import KafkaException, Producer from logprep.abc.output import CriticalOutputError, FatalOutputError, Output -from logprep.metrics.metrics import GaugeMetric, Metric +from logprep.metrics.metrics import GaugeMetric, Metric, CounterMetric from logprep.util.validators import keys_in_validator DEFAULTS = { @@ -138,15 +139,33 @@ class Metrics(Output.Metrics): ) """Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers""" + number_of_successfully_delivered_events: CounterMetric = field( + factory=lambda: CounterMetric( + description="Number of events that were successfully delivered to Kafka", + name="number_of_successfully_delivered_events", + ) + ) + """Number of events that were successfully delivered to Kafka""" @define(kw_only=True, slots=False) class Config(Output.Config): """Confluent Kafka Output Config""" topic: str = field(validator=validators.instance_of(str)) + """The topic to which processed messages will be sent.""" error_topic: str - flush_timeout: float + """The topic to which error messages will be sent.""" + producer_flush_timeout: float + """Timeout for sending all messages from the producer queue to kafka.""" send_timeout: int = field(validator=validators.instance_of(int), default=0) + """Timeout for sending messages to kafka. Values above 0 make it blocking.""" + flush_timeout: Optional[int] = field(validator=validators.instance_of(int), default=60) + """(Optional) Timeout after :code:`_sent_offset_backlog` is flushed if + :code:`sent_offset_backlog_size` is not reached.""" + fire_and_forget: bool = field(validator=validators.instance_of(bool), default=False) + """If True, offsets will be set after sending messages instead of waiting for delivery.""" + sent_offset_backlog_size: int = field(validator=validators.instance_of(int), default=1) + """ (Optional) count of delivered messages before batch_finished_callback is called.""" kafka_config: Optional[dict] = field( validator=[ validators.instance_of(dict), @@ -171,6 +190,23 @@ class Config(Output.Config): """ + __slots__ = [ + "_sent_offset_backlog", + "_delivered_offset_backlog", + ] + + _sent_offset_backlog: DefaultDict[str, list] + _delivered_offset_backlog: DefaultDict[str, list] + + def __init__(self, name: str, configuration: "ConfluentKafkaOutput.Config", logger: Logger): + super().__init__(name, configuration, logger) + self._sent_offset_backlog = defaultdict(list) + self._delivered_offset_backlog = defaultdict(list) + + @property + def _sent_offset_backlog_size(self): + return sum(map(len, self._sent_offset_backlog.values())) + @cached_property def _producer(self): injected_config = { @@ -233,25 +269,21 @@ def describe(self) -> str: return f"{base_description} - Kafka Output: {self._config.kafka_config.get('bootstrap.servers')}" def store(self, document: dict) -> Optional[bool]: - """Store a document in the producer topic. + """Store a document in the configured producer topic. Parameters ---------- document : dict Document to store. 
- Returns - ------- - Returns True to inform the pipeline to call the batch_finished_callback method in the - configured input """ - self.store_custom(document, self._config.topic) - if self.input_connector: - self.input_connector.batch_finished_callback() + self.metrics.number_of_processed_events += 1 + self._send_to_kafka(document, self._config.topic) + self._add_offset_to_sent_backlog(document) @Metric.measure_time() def store_custom(self, document: dict, target: str) -> None: - """Write document to Kafka into target topic. + """Store a document in the target topic. Parameters ---------- @@ -259,6 +291,25 @@ def store_custom(self, document: dict, target: str) -> None: Document to be stored in target topic. target : str Topic to store document in. + + """ + self._send_to_kafka(document, target, set_offsets=False) + + def _send_to_kafka(self, document, target, set_offsets=True): + """Send document to target Kafka topic. + + Documents are sent asynchronously via "produce". + A callback method is used to set the offset for each message once it has been + successfully delivered. + The callback specified for "produce" is called for each document that has been delivered to + Kafka whenever the "poll" method is called (directly or by "flush"). + + Parameters + ---------- + document : dict + Document to be sent to target topic. + target : str + Topic to send document to. Raises ------ CriticalOutputError @@ -266,23 +317,40 @@ def store_custom(self, document: dict, target: str) -> None: """ try: - self._producer.produce(target, value=self._encoder.encode(document)) + set_offsets &= not self._config.fire_and_forget + callback = self.delivered_callback(document) if set_offsets else None + self._producer.produce( + target, + value=self._encoder.encode(document), + on_delivery=callback, + ) self._producer.poll(self._config.send_timeout) - self.metrics.number_of_processed_events += 1 except BufferError: # block program until buffer is empty - self._producer.flush(timeout=self._config.flush_timeout) + self._producer.flush(timeout=self._config.producer_flush_timeout) except BaseException as error: raise CriticalOutputError( self, f"Error storing output document -> {error}", document ) from error + if self._config.fire_and_forget: + return + + if self._sent_offset_backlog_size >= self._config.sent_offset_backlog_size: + self._write_backlog() + @Metric.measure_time() def store_failed( self, error_message: str, document_received: dict, document_processed: dict ) -> None: """Write errors into error topic for documents that failed processing. + Documents are sent asynchronously via "produce". + A callback method is used to set the offset for each message once it has been + successfully delivered. + The callback specified for "produce" is called for each document that has been delivered to + Kafka whenever the "poll" method is called (directly or by "flush"). 
+ Parameters ---------- error_message : str @@ -301,17 +369,116 @@ def store_failed( "timestamp": str(datetime.now()), } try: + callback = ( + self.delivered_callback(document_processed) + if not self._config.fire_and_forget + else None + ) self._producer.produce( self._config.error_topic, value=json.dumps(value, separators=(",", ":")).encode("utf-8"), + on_delivery=callback ) self._producer.poll(self._config.send_timeout) except BufferError: # block program until buffer is empty - self._producer.flush(timeout=self._config.flush_timeout) + self._producer.flush(timeout=self._config.producer_flush_timeout) + self._add_offset_to_sent_backlog(document_received) + + def _add_offset_to_sent_backlog(self, document): + if not self._config.fire_and_forget: + metadata = document.get("_metadata", {}) + partition = metadata.get("last_partition", None) + offset = metadata.get("last_offset", None) + if not (partition is None and offset is None): + self._sent_offset_backlog[partition].append(offset) + + @staticmethod + def _get_last_committable_offsets( + sent_offset_backlog: DefaultDict[str, list], + delivered_offset_backlog: DefaultDict[str, list], + ) -> dict: + last_committable = {} + for partition, offsets in delivered_offset_backlog.items(): + if not offsets: + continue + + if len(offsets) == 1: + last_committable[partition] = offsets[0] + continue + + offsets.sort() + prev_offset = offsets[0] + for offset in offsets[1:]: + if offset > prev_offset + 1: + unexpected_gap = False + for missing_offset in range(prev_offset + 1, offset): + if missing_offset in sent_offset_backlog.get(partition, []): + last_committable[partition] = prev_offset + unexpected_gap = True + if unexpected_gap: + break + last_committable[partition] = offset + prev_offset = offset + return last_committable + + def delivered_callback(self, document): + """Callback that can called when a single message has been successfully delivered. + + The callback is called asynchronously, therefore the current message is stored within a + closure. + This message is required to later set the offset. + + Returns + ------- + store_offsets_on_success + This is the callback method that the Kafka producer calls. It has access to "message". + + """ + try: + partition_offset = { + "last_partition": document["_metadata"]["last_partition"], + "last_offset": document["_metadata"]["last_offset"], + } + except (TypeError, KeyError): + return lambda *args: None + + def store_offsets_on_success(error, _): + """Set offset via message stored in closure if no error occurred. + + "sent_offset_backlog_size" can be configured to prevent setting the callback for + every single message that was successfully delivered. + Setting this higher than 1 might be useful if auto-commit is disabled. 
+ + """ + if error: + raise FatalOutputError(output=self, message=error) + self.metrics.number_of_successfully_delivered_events += 1 + last_partition = partition_offset["last_partition"] + last_offset = partition_offset["last_offset"] + self._delivered_offset_backlog[last_partition].append(last_offset) + + return store_offsets_on_success + + @Metric.measure_time() + def _write_backlog(self): + self._producer.flush(self._config.producer_flush_timeout) + if self._config.fire_and_forget: + return + + last_commitable_offsets = self._get_last_committable_offsets( + self._sent_offset_backlog, self._delivered_offset_backlog + ) + for partition, offset in last_commitable_offsets.items(): + committable_offset = {"last_partition": partition, "last_offset": offset} + self.input_connector.batch_finished_callback(metadata=committable_offset) + self._delivered_offset_backlog.clear() + self._sent_offset_backlog.clear() def setup(self): super().setup() + flush_timeout = self._config.flush_timeout + self._schedule_task(task=self._write_backlog, seconds=flush_timeout) try: _ = self._producer except (KafkaException, ValueError) as error: @@ -320,4 +487,4 @@ def setup(self): def shut_down(self) -> None: """ensures that all messages are flushed""" if self._producer is not None: - self._producer.flush(self._config.flush_timeout) + self._write_backlog() diff --git a/tests/unit/connector/base.py b/tests/unit/connector/base.py index 9e5c59d6e..4a39ddb7d 100644 --- a/tests/unit/connector/base.py +++ b/tests/unit/connector/base.py @@ -537,6 +537,12 @@ def test_get_next_has_time_measurement(self): # asserts entering context manager in metrics.metrics.Metric.measure_time mock_metric.assert_has_calls([mock.call.tracker.labels().time().__enter__()]) + def test_add_input_connector_metadata_returns_true_if_input_connector_metadata_set(self): + connector_config = deepcopy(self.CONFIG) + connector_config.update({"preprocessing": {"input_connector_metadata": True}}) + connector = Factory.create({"test connector": connector_config}, logger=self.logger) + assert connector._add_input_connector_metadata is True + class BaseOutputTestCase(BaseConnectorTestCase): def test_is_output_instance(self): diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index f777c7bbf..10b9c869e 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -8,7 +8,7 @@ from unittest import mock import pytest -from confluent_kafka import OFFSET_BEGINNING, KafkaException +from confluent_kafka import OFFSET_BEGINNING, KafkaException, TopicPartition from logprep.abc.input import ( CriticalInputError, @@ -103,13 +103,17 @@ def test_shut_down_calls_consumer_close(self, _): ], ) @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_batch_finished_callback_calls_offsets_handler_for_setting(self, _, settings, handlers): + def test_batch_finished_callback_calls_offsets_handler_for_setting_without_metadata( + self, _, settings, handlers + ): input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = False kafka_input = Factory.create({"test": input_config}, logger=self.logger) kafka_input._config.kafka_config.update(settings) kafka_consumer = kafka_input._consumer message = "test message" kafka_input._last_valid_records = {0: message} + kafka_input.output_connector = mock.MagicMock() kafka_input.batch_finished_callback() if handlers is None: assert kafka_consumer.commit.call_count == 0 @@ 
-141,6 +145,7 @@ def raise_generator(return_sequence): getattr(kafka_consumer, handler).side_effect = raise_generator(return_sequence) kafka_input._last_valid_records = {0: "message"} + kafka_input.output_connector = mock.MagicMock() with pytest.raises(InputWarning): kafka_input.batch_finished_callback() @@ -152,6 +157,7 @@ def test_get_next_raises_critical_input_error_if_not_a_dict(self, _): self.object._consumer.poll = mock.MagicMock(return_value=mock_record) mock_record.value = mock.MagicMock() mock_record.value.return_value = '[{"element":"in list"}]'.encode("utf8") + self.object.output_connector = mock.MagicMock() with pytest.raises(CriticalInputError, match=r"not a dict"): self.object.get_next(1) @@ -163,9 +169,48 @@ def test_get_next_raises_critical_input_error_if_unvalid_json(self, _): self.object._consumer.poll = mock.MagicMock(return_value=mock_record) mock_record.value = mock.MagicMock() mock_record.value.return_value = "I'm not valid json".encode("utf8") + self.object.output_connector = mock.MagicMock() with pytest.raises(CriticalInputError, match=r"not a valid json"): self.object.get_next(1) + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_get_next_adds_metadata_if_configured(self, _): + input_config = deepcopy(self.CONFIG) + input_config["preprocessing"] = {"input_connector_metadata": True} + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + kafka_input.setup() + mock_record = mock.MagicMock() + mock_record.error = mock.MagicMock() + mock_record.error.return_value = None + kafka_input._consumer.poll = mock.MagicMock(return_value=mock_record) + mock_record.value = mock.MagicMock() + mock_record.value.return_value = '{"foo":"bar"}'.encode("utf8") + event, warning = kafka_input.get_next(1) + assert warning is None + assert event.get("_metadata", {}).get("last_partition") + assert event.get("_metadata", {}).get("last_offset") + del event["_metadata"] + assert event == {"foo": "bar"} + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_get_next_returns_warning_if_metadata_configured_but_field_exists(self, _): + input_config = deepcopy(self.CONFIG) + input_config["preprocessing"] = {"input_connector_metadata": True} + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + kafka_input.setup() + mock_record = mock.MagicMock() + mock_record.error = mock.MagicMock() + mock_record.error.return_value = None + kafka_input._consumer.poll = mock.MagicMock(return_value=mock_record) + mock_record.value = mock.MagicMock() + mock_record.value.return_value = '{"_metadata":"foo"}'.encode("utf8") + event, warning = kafka_input.get_next(1) + assert ( + warning + == "Couldn't add metadata to the input event as the field '_metadata' already exist." 
+ ) + assert event == {"_metadata": "foo"} + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_get_event_returns_event_and_raw_event(self, _): mock_record = mock.MagicMock() @@ -174,6 +219,7 @@ def test_get_event_returns_event_and_raw_event(self, _): self.object._consumer.poll = mock.MagicMock(return_value=mock_record) mock_record.value = mock.MagicMock() mock_record.value.return_value = '{"element":"in list"}'.encode("utf8") + self.object.output_connector = mock.MagicMock() event, raw_event = self.object._get_event(0.001) assert event == {"element": "in list"} assert raw_event == '{"element":"in list"}'.encode("utf8") @@ -187,6 +233,7 @@ def test_get_raw_event_is_callable(self, _): # pylint: disable=arguments-differ self.object._consumer.poll = mock.MagicMock(return_value=mock_record) mock_record.value = mock.MagicMock() mock_record.value.return_value = '{"element":"in list"}'.encode("utf8") + self.object.output_connector = mock.MagicMock() result = self.object._get_raw_event(0.001) assert result @@ -340,12 +387,106 @@ def test_revoke_callback_logs_warning_and_counts(self, mock_consumer): assert self.object.metrics.number_of_warnings == 1 @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_revoke_callback_writes_output_backlog_and_calls_batch_finished_callback( + def test_revoke_callback_writes_output_backlog_and_does_not_call_batch_finished_callback_if_metadata( self, mock_consumer ): - self.object.output_connector = mock.MagicMock() - self.object.batch_finished_callback = mock.MagicMock() + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = True + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + kafka_input.batch_finished_callback = mock.MagicMock() + mock_partitions = [mock.MagicMock()] + kafka_input._revoke_callback(mock_consumer, mock_partitions) + kafka_input.output_connector._write_backlog.assert_called() + assert not kafka_input.batch_finished_callback.called + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_revoke_callback_writes_output_backlog_and_calls_batch_finished_callback_if_not_metadata( + self, mock_consumer + ): + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = False + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + kafka_input.batch_finished_callback = mock.MagicMock() + mock_partitions = [mock.MagicMock()] + kafka_input.output_connector._sent_offset_backlog = {} + kafka_input.output_connector._delivered_offset_backlog = {} + kafka_input._revoke_callback(mock_consumer, mock_partitions) + kafka_input.output_connector._write_backlog.assert_called() + kafka_input.batch_finished_callback.assert_called() + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_revoke_callback_writes_output_backlog_and_does_not_call_batch_finished_callback_metadata( + self, mock_consumer + ): + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = True + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + kafka_input.batch_finished_callback = mock.MagicMock() mock_partitions = [mock.MagicMock()] - self.object._revoke_callback(mock_consumer, mock_partitions) - self.object.output_connector._write_backlog.assert_called() - self.object.batch_finished_callback.assert_called() + 
kafka_input.output_connector._sent_offset_backlog = {0: [0]} + kafka_input.output_connector._delivered_offset_backlog = {0: [0]} + kafka_input._revoke_callback(mock_consumer, mock_partitions) + kafka_input.output_connector._write_backlog.assert_called() + kafka_input.batch_finished_callback.assert_not_called() + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_handle_offsets_uses_delivered_offsets_if_use_metadata(self, _): + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = True + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + metadata = {"last_partition": 0, "last_offset": 0} + kafka_input._handle_offsets(kafka_input._consumer.store_offsets, metadata) + offsets = [TopicPartition(kafka_input._config.topic, partition=0, offset=0)] + kafka_input._consumer.store_offsets.assert_called_with(offsets=offsets) + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_handle_offsets_raises_Exception_if_use_metadata_but_no_metadata(self, _): + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = True + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + metadata = {} + with pytest.raises(FatalInputError, match="'last_partition' and 'last_offset' required"): + kafka_input._handle_offsets(kafka_input._consumer.store_offsets, metadata) + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_handle_offsets_uses__last_valid_records_if_not_use_metadata(self, _): + input_config = deepcopy(self.CONFIG) + input_config["use_metadata_for_offsets"] = False + kafka_input = Factory.create({"test": input_config}, logger=self.logger) + + kafka_input.output_connector = mock.MagicMock() + kafka_input._last_valid_records = {0: "MESSAGE_OBJECT"} + kafka_input._handle_offsets(kafka_input._consumer.store_offsets, {}) + kafka_input._consumer.store_offsets.assert_called_with(message="MESSAGE_OBJECT") + + @pytest.mark.parametrize( + "metadata", + [{}, {"last_offset": 0}, {"last_partition": 0}], + ) + def test_get_delivered_partition_offset_with_missing_metadata_field_raises_exception( + self, metadata + ): + with pytest.raises( + FatalInputError, + match="Missing fields in metadata for setting offsets: " + "'last_partition' and 'last_offset' required", + ): + self.object._get_delivered_partition_offset(metadata) + + def test_get_delivered_partition_offset_with_metadata_returns_topic_partition(self): + topic_partition = self.object._get_delivered_partition_offset( + {"last_partition": 0, "last_offset": 1} + ) + assert isinstance(topic_partition, TopicPartition) + assert topic_partition.partition == 0 + assert topic_partition.offset == 1 diff --git a/tests/unit/connector/test_confluent_kafka_output.py b/tests/unit/connector/test_confluent_kafka_output.py index 5af0f0c3f..52afe6d84 100644 --- a/tests/unit/connector/test_confluent_kafka_output.py +++ b/tests/unit/connector/test_confluent_kafka_output.py @@ -27,7 +27,7 @@ class TestConfluentKafkaOutput(BaseOutputTestCase, CommonConfluentKafkaTestCase) "type": "confluentkafka_output", "topic": "test_input_raw", "error_topic": "test_error_topic", - "flush_timeout": 0.1, + "producer_flush_timeout": 30, "kafka_config": { "bootstrap.servers": "testserver:9092", }, @@ -50,6 +50,7 @@ class TestConfluentKafkaOutput(BaseOutputTestCase, CommonConfluentKafkaTestCase) "logprep_number_of_failed_events", 
"logprep_number_of_warnings", "logprep_number_of_errors", + "logprep_number_of_successfully_delivered_events", ] @mock.patch("logprep.connector.confluent_kafka.output.Producer", return_value="The Producer") @@ -59,10 +60,13 @@ def test_producer_property_instanciates_kafka_producer(self, _): @mock.patch("logprep.connector.confluent_kafka.output.Producer") def test_store_sends_event_to_expected_topic(self, _): + self.object.delivered_callback = mock.MagicMock() kafka_producer = self.object._producer event = {"field": "content"} event_raw = json.dumps(event, separators=(",", ":")).encode("utf-8") - expected_call = mock.call(self.CONFIG.get("topic"), value=event_raw) + expected_call = mock.call( + self.CONFIG.get("topic"), value=event_raw, on_delivery=self.object.delivered_callback() + ) self.object.store(event) kafka_producer.produce.assert_called() assert expected_call in kafka_producer.produce.mock_calls @@ -72,7 +76,7 @@ def test_store_custom_sends_event_to_expected_topic(self, _): kafka_producer = self.object._producer event = {"field": "content"} event_raw = json.dumps(event, separators=(",", ":")).encode("utf-8") - expected_call = mock.call(self.CONFIG.get("topic"), value=event_raw) + expected_call = mock.call(self.CONFIG.get("topic"), value=event_raw, on_delivery=None) self.object.store_custom(event, self.CONFIG.get("topic")) kafka_producer.produce.assert_called() assert expected_call in kafka_producer.produce.mock_calls @@ -142,10 +146,142 @@ def test_store_counts_processed_events(self, _): # pylint: disable=arguments-di assert self.object.metrics.number_of_processed_events == 1 @mock.patch("logprep.connector.confluent_kafka.output.Producer") - def test_store_calls_batch_finished_callback(self, _): # pylint: disable=arguments-differ + def test_delivered_callback_adds_offset(self, _): # pylint: disable=arguments-differ self.object.input_connector = mock.MagicMock() - self.object.store({"message": "my event message"}) - self.object.input_connector.batch_finished_callback.assert_called() + self.object.input_connector._last_delivered_record = mock.MagicMock() + metadata = {"_metadata": {"last_partition": 0, "last_offset": 0}} + callback = self.object.delivered_callback(metadata) + callback(None, "msg") + assert self.object._delivered_offset_backlog == {0: [0]} + + @pytest.mark.parametrize( + "sent, delivered, callback", + [ + ({"_metadata": {"last_partition": 0}}, {"_metadata": {"last_partition": 0}}, True), + ({"_metadata": {"last_partition": 0}}, None, False), + (None, {"_metadata": {"last_partition": 0}}, False), + (None, None, False), + ], + ) + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_send_to_kafka_calls_callback_if_no_fire_and_forget( + self, _, sent, delivered, callback + ): # pylint: disable=arguments-differ + output_config = deepcopy(self.CONFIG) + output_config["fire_and_forget"] = False + kafka_output = Factory.create({"test": output_config}, logger=self.logger) + + kafka_output.input_connector = mock.MagicMock() + doc = {"_metadata": {"last_partition": 0, "last_offset": 0}} + kafka_output._sent_offset_backlog = {0: [sent]} if sent else {} + kafka_output._delivered_offset_backlog = {0: [delivered]} if delivered else {} + kafka_output._send_to_kafka(doc, "foo") + if callback: + kafka_output.input_connector.batch_finished_callback.assert_called() + else: + kafka_output.input_connector.batch_finished_callback.assert_not_called() + + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def 
test_send_to_kafka_does_not_call_callback_if_fire_and_forget(self, _): + # pylint: disable=arguments-differ + output_config = deepcopy(self.CONFIG) + output_config["fire_and_forget"] = True + kafka_output = Factory.create({"test": output_config}, logger=self.logger) + + kafka_output.input_connector = mock.MagicMock() + kafka_output._send_to_kafka({"foo": "bar"}, "baz_topic") + kafka_output.input_connector.batch_finished_callback.assert_not_called() + + @pytest.mark.parametrize("fire_and_forget", [True, False]) + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_write_backlog_does_not_call_callback_if_fire_and_forget(self, _, fire_and_forget): + # pylint: disable=arguments-differ + output_config = deepcopy(self.CONFIG) + output_config["fire_and_forget"] = fire_and_forget + kafka_output = Factory.create({"test": output_config}, logger=self.logger) + kafka_output._sent_offset_backlog = {0: [0]} + kafka_output._delivered_offset_backlog = kafka_output._sent_offset_backlog + kafka_output.input_connector = mock.MagicMock() + kafka_output._write_backlog() + if fire_and_forget: + kafka_output.input_connector.batch_finished_callback.assert_not_called() + else: + kafka_output.input_connector.batch_finished_callback.assert_called() + + @pytest.mark.parametrize( + "sent_backlog, delivered_backlog, expected_offsets", + [ + ({}, {}, {}), + ({0: [0]}, {}, {}), + ({}, {0: [0]}, {0: 0}), + ({}, {0: []}, {}), + ({0: [0]}, {0: [0]}, {0: 0}), + ({0: [0, 1, 2]}, {0: [0, 1, 2]}, {0: 2}), + ({0: [0, 2]}, {0: [0, 1, 2]}, {0: 2}), + ({0: [0, 1, 2]}, {0: [0, 2]}, {0: 0}), + ({0: [0, 2]}, {0: [0, 2]}, {0: 2}), + ({0: [0], 1: [0]}, {0: [0], 1: [0]}, {0: 0, 1: 0}), + ({0: [0, 1, 2, 3], 1: [0, 1, 2, 3]}, {0: [0, 2, 3], 1: [0, 1, 3]}, {0: 0, 1: 1}), + ], + ) + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_get_last_committable_offsets( + self, _, sent_backlog, delivered_backlog, expected_offsets + ): + last_committable_offsets = self.object._get_last_committable_offsets( + sent_backlog, delivered_backlog + ) + assert last_committable_offsets == expected_offsets + + @pytest.mark.parametrize( + "doc, sent_backlog, fire_and_forget", + [ + ({"_metadata": {"last_partition": 0, "last_offset": 0}}, {0: [0]}, False), + ({"foo": "bar"}, {}, False), + ({"_metadata": {"last_partition": 0, "last_offset": 0}}, {}, True), + ({"foo": "bar"}, {}, True), + ], + ) + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_do_not_add_offset_to_sent_backlog_if_not_fire_and_forget( + self, _, doc, sent_backlog, fire_and_forget + ): + # pylint: disable=arguments-differ + output_config = deepcopy(self.CONFIG) + output_config["fire_and_forget"] = fire_and_forget + kafka_output = Factory.create({"test": output_config}, logger=self.logger) + + kafka_output._add_offset_to_sent_backlog(doc) + assert kafka_output._sent_offset_backlog == sent_backlog + + @pytest.mark.parametrize( + "metadata", + [ + {"_metadata": {"last_partition": 0}}, + {"_metadata": {"last_offset": 0}}, + {"_metadata": {}}, + {}, + ], + ) + @mock.patch("logprep.connector.confluent_kafka.output.Producer") + def test_delivered_callback_without_metadata_doesnt_call_batch_finished_callback( + self, _, metadata + ): # pylint: disable=arguments-differ + self.object.input_connector = mock.MagicMock() + callback = self.object.delivered_callback(metadata) + callback(None, "msg") + assert not self.object.input_connector.batch_finished_callback.called + + @mock.patch("logprep.connector.confluent_kafka.output.Producer") 
+ def test_delivered_callback_calls_with_error_doesnt_call_batch_finished_callback( + self, _ + ): # pylint: disable=arguments-differ + self.object.input_connector = mock.MagicMock() + metadata = {"_metadata": {"last_partition": 0, "last_offset": 0}} + callback = self.object.delivered_callback(metadata) + with pytest.raises(FatalOutputError, match=r"some_error"): + callback(BaseException("some_error"), "msg") + self.object.input_connector.batch_finished_callback.assert_not_called() def test_setup_raises_fatal_output_error_on_invalid_config(self): config = {"myconfig": "the config", "bootstrap.servers": "testserver:9092"} @@ -159,3 +295,6 @@ def test_raises_value_error_if_mandatory_parameters_not_set(self): expected_error_message = r"keys are missing: {'bootstrap.servers'}" with pytest.raises(InvalidConfigurationError, match=expected_error_message): Factory.create({"test": config}, logger=self.logger) + + def test_store_calls_batch_finished_callback(self): + """Skipped from superclass"""
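
Reviewer note: the gap detection in `_get_last_committable_offsets` is the subtlest piece of this change set. The sketch below restates that logic as a standalone function so it can be read (and run) outside the connector; the algorithm is copied from the diff above, and the demo values at the bottom are taken from one case of the parametrized test `test_get_last_committable_offsets`. The `__main__` demo is illustrative only and not part of the change set.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List


def get_last_committable_offsets(
    sent_offset_backlog: DefaultDict[int, List[int]],
    delivered_offset_backlog: DefaultDict[int, List[int]],
) -> Dict[int, int]:
    """Standalone restatement of ConfluentKafkaOutput._get_last_committable_offsets.

    For each partition, walk the sorted delivered offsets and stop at the first
    gap that corresponds to an offset that was sent but not (yet) delivered.
    Gaps for offsets that were never sent for that partition do not block
    committing newer offsets.
    """
    last_committable: Dict[int, int] = {}
    for partition, offsets in delivered_offset_backlog.items():
        if not offsets:
            continue
        if len(offsets) == 1:
            last_committable[partition] = offsets[0]
            continue
        offsets.sort()
        prev_offset = offsets[0]
        for offset in offsets[1:]:
            if offset > prev_offset + 1:
                # check whether the gap hides a sent-but-undelivered message
                unexpected_gap = False
                for missing_offset in range(prev_offset + 1, offset):
                    if missing_offset in sent_offset_backlog.get(partition, []):
                        # stop before the gap: committing past it could lose the message
                        last_committable[partition] = prev_offset
                        unexpected_gap = True
                if unexpected_gap:
                    break
            last_committable[partition] = offset
            prev_offset = offset
    return last_committable


if __name__ == "__main__":
    sent = defaultdict(list, {0: [0, 1, 2, 3], 1: [0, 1, 2, 3]})
    delivered = defaultdict(list, {0: [0, 2, 3], 1: [0, 1, 3]})
    # partition 0: offset 1 was sent but not delivered -> only offset 0 is committable
    # partition 1: offset 2 was sent but not delivered -> offsets up to 1 are committable
    assert get_last_committable_offsets(sent, delivered) == {0: 0, 1: 1}
```

The design choice mirrored here is that a missing delivered offset only holds back the commit position when the same offset is still present in the sent backlog; offsets that were never sent for a partition do not block committing past the gap. As the new unit tests show, the feature is wired up end to end by setting `preprocessing: {input_connector_metadata: True}` and `use_metadata_for_offsets: True` on the Kafka input and leaving `fire_and_forget` at its default `False` on the Kafka output, so that `_write_backlog` passes the committable `last_partition`/`last_offset` metadata to `batch_finished_callback`.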