Skip to content

Commit

Permalink
Refactor: move logic out of client for easier unit testing (DataDog#1…
Browse files Browse the repository at this point in the history
…9183)

* Move get_consumer_group_state out of client

* Set up mock client in helper function

* Hide consumer behind client methods

* Turn confluent kafka objects into python ones (for easier mocking)

* Same for consumer_offsets_for_times

* Move get_high_water_offsets out of client

* separate out client-related logic to list consumer groups

* Separate client-related logic for listing consumer group offsets

* Focus get_partitions_for_topic on interactions with kafka

* Move get_consumer_offsets out of kafka client
  • Loading branch information
iliakur authored Dec 5, 2024
1 parent 1af4fa9 commit e410940
Show file tree
Hide file tree
Showing 3 changed files with 308 additions and 315 deletions.
314 changes: 85 additions & 229 deletions kafka_consumer/datadog_checks/kafka_consumer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@
from confluent_kafka import Consumer, ConsumerGroupTopicPartitions, KafkaException, TopicPartition
from confluent_kafka.admin import AdminClient

from datadog_checks.kafka_consumer.constants import KAFKA_INTERNAL_TOPICS, OFFSET_INVALID


class KafkaClient:
def __init__(self, config, log) -> None:
self.config = config
self.log = log
self._kafka_client = None
self.topic_partition_cache = {}
self._consumer = None

@property
def kafka_client(self):
Expand All @@ -31,7 +29,7 @@ def kafka_client(self):

return self._kafka_client

def __create_consumer(self, consumer_group):
def open_consumer(self, consumer_group):
config = {
"bootstrap.servers": self.config._kafka_connect_str,
"group.id": consumer_group,
Expand All @@ -41,7 +39,12 @@ def __create_consumer(self, consumer_group):
}
config.update(self.__get_authentication_config())

return Consumer(config, logger=self.log)
self._consumer = Consumer(config, logger=self.log)
self.log.debug("Consumer instance %s created for group %s", self._consumer, consumer_group)

def close_consumer(self):
self.log.debug("Closing consumer instance %s", self._consumer)
self._consumer.close()

def __get_authentication_config(self):
config = {
Expand Down Expand Up @@ -79,252 +82,105 @@ def __get_authentication_config(self):

return config

def get_highwater_offsets(self, consumer_offsets):
self.log.debug('Getting highwater offsets')

cluster_id = ""
highwater_offsets = {}
topics_with_consumer_offset = set()
topic_partition_with_consumer_offset = set()

if not self.config._monitor_all_broker_highwatermarks:
for _, topic, partition in consumer_offsets:
topics_with_consumer_offset.add(topic)
topic_partition_with_consumer_offset.add((topic, partition))

topic_partition_checked = set()

for consumer_group, _topic, _partition in consumer_offsets:
self.log.debug('CONSUMER GROUP: %s', consumer_group)
if (_topic, _partition) in topic_partition_checked:
self.log.debug('Highwater offset already collected for topic %s with partition %s', _topic, _partition)
continue

topic_partitions_for_highwater_offsets = set()

consumer = self.__create_consumer(consumer_group)
self.log.debug("Consumer instance %s created for group %s", consumer, consumer_group)
cluster_metadata = consumer.list_topics(timeout=self.config._request_timeout)
try:
cluster_id = cluster_metadata.cluster_id
except AttributeError:
self.log.error("Failed to get cluster metadata for consumer group %s", consumer_group)
topics = cluster_metadata.topics

for topic in topics:
if topic in KAFKA_INTERNAL_TOPICS:
self.log.debug("Skipping internal topic %s", topic)
continue
if not self.config._monitor_all_broker_highwatermarks and topic not in topics_with_consumer_offset:
self.log.debug("Skipping non-relevant topic %s", topic)
continue

for partition in topics[topic].partitions:
if (
self.config._monitor_all_broker_highwatermarks
or (topic, partition) in topic_partition_with_consumer_offset
):
# Setting offset to -1 will return the latest highwater offset while calling offsets_for_times
# Reference: https://github.com/fede1024/rust-rdkafka/issues/460
topic_partitions_for_highwater_offsets.add(
TopicPartition(topic=topic, partition=partition, offset=-1)
)
self.log.debug('TOPIC: %s', topic)
self.log.debug('PARTITION: %s', partition)
else:
self.log.debug("Skipping non-relevant partition %s of topic %s", partition, topic)

if len(topic_partitions_for_highwater_offsets) > 0:
self.log.debug(
'Querying %s highwater offsets for consumer group %s',
len(topic_partitions_for_highwater_offsets),
consumer_group,
)
for topic_partition_with_highwater_offset in consumer.offsets_for_times(
partitions=list(topic_partitions_for_highwater_offsets),
timeout=self.config._request_timeout,
):
self.log.debug('Topic partition with highwater offset: %s', topic_partition_with_highwater_offset)
topic = topic_partition_with_highwater_offset.topic
partition = topic_partition_with_highwater_offset.partition
offset = topic_partition_with_highwater_offset.offset
highwater_offsets[(topic, partition)] = offset
self.log.debug("Adding %s %s to checked set to facilitate early exit", topic, partition)
topic_partition_checked.add((topic, partition))
else:
self.log.debug('No new highwater offsets to query for consumer group %s', consumer_group)

self.log.debug("Closing consumer instance %s", consumer)
consumer.close()

self.log.debug('Got %s highwater offsets', len(highwater_offsets))
return highwater_offsets, cluster_id
def consumer_get_cluster_id_and_list_topics(self, consumer_group):
cluster_metadata = self._consumer.list_topics(timeout=self.config._request_timeout)
try:
# TODO: remove this try-except, the attribute is always present.
cluster_id = cluster_metadata.cluster_id
except AttributeError:
self.log.error("Failed to get cluster metadata for consumer group %s", consumer_group)
return "", []
return (cluster_id, [(name, list(metadata.partitions)) for name, metadata in cluster_metadata.topics.items()])

def consumer_offsets_for_times(self, partitions):
topicpartitions_for_querying = [
# Setting offset to -1 will return the latest highwater offset while calling offsets_for_times
# Reference: https://github.com/fede1024/rust-rdkafka/issues/460
TopicPartition(topic=topic, partition=partition, offset=-1)
for topic, partition in partitions
]
return [
(tp.topic, tp.partition, tp.offset)
for tp in self._consumer.offsets_for_times(
partitions=topicpartitions_for_querying, timeout=self.config._request_timeout
)
]

def get_partitions_for_topic(self, topic):
if partitions := self.topic_partition_cache.get(topic):
return partitions

try:
cluster_metadata = self.kafka_client.list_topics(topic, timeout=self.config._request_timeout)
except KafkaException as e:
self.log.error("Received exception when getting partitions for topic %s: %s", topic, e)
return None
else:
topic_metadata = cluster_metadata.topics[topic]
partitions = list(topic_metadata.partitions.keys())
self.topic_partition_cache[topic] = partitions
return partitions
return []
topic_metadata = cluster_metadata.topics[topic]
return list(topic_metadata.partitions)

def request_metadata_update(self):
# https://github.com/confluentinc/confluent-kafka-python/issues/594
self.kafka_client.list_topics(None, timeout=self.config._request_timeout)

def get_consumer_offsets(self):
# {(consumer_group, topic, partition): offset}
self.log.debug('Getting consumer offsets')
consumer_offsets = {}

consumer_groups = self._get_consumer_groups()
self.log.debug('Identified %s consumer groups', len(consumer_groups))
def list_consumer_groups(self):
groups = []
try:
groups_res = self.kafka_client.list_consumer_groups().result()
for valid_group in groups_res.valid:
self.log.debug("Discovered consumer group: %s", valid_group.group_id)
groups.append(valid_group.group_id)
except Exception as e:
self.log.error("Failed to collect consumer groups: %s", e)
return groups

def list_consumer_group_offsets(self, groups):
"""
For every group and (optionally) its topics and partitions retrieve consumer offsets.
futures = self._get_consumer_offset_futures(consumer_groups)
self.log.debug('%s futures to be waited on', len(futures))
As input expects a list of tuples: (consumer_group_id, topic_partitions).
topic_partitions are either None to indicate we want all topics and partitions OR a list of (topic, partition).
for future in as_completed(futures):
Returns a list of tuples with members:
1. group id
2. list of tuples: (topic, partition, offset)
"""
futures = []
for consumer_group, topic_partitions in groups:
topic_partitions = (
topic_partitions if topic_partitions is None else [TopicPartition(t, p) for t, p in topic_partitions]
)
futures.append(
self.kafka_client.list_consumer_group_offsets(
[ConsumerGroupTopicPartitions(group_id=consumer_group, topic_partitions=topic_partitions)]
)[consumer_group]
)
offsets = []
for completed in as_completed(futures):
try:
response_offset_info = future.result()
response_offset_info = completed.result()
except KafkaException as e:
self.log.debug("Failed to read consumer offsets for future %s: %s", future, e)
else:
consumer_group = response_offset_info.group_id
topic_partitions = response_offset_info.topic_partitions

self.log.debug('RESULT CONSUMER GROUP: %s', consumer_group)
self.log.debug('RESULT TOPIC PARTITIONS: %s', topic_partitions)

for topic_partition in topic_partitions:
topic = topic_partition.topic
partition = topic_partition.partition
offset = topic_partition.offset

self.log.debug('RESULTS TOPIC: %s', topic)
self.log.debug('RESULTS PARTITION: %s', partition)
self.log.debug('RESULTS OFFSET: %s', offset)

if topic_partition.error:
self.log.debug(
"Encountered error: %s. Occurred with topic: %s; partition: [%s]",
topic_partition.error.str(),
topic_partition.topic,
str(topic_partition.partition),
)
continue

if offset == OFFSET_INVALID:
continue

if self.config._monitor_unlisted_consumer_groups or not self.config._consumer_groups_compiled_regex:
consumer_offsets[(consumer_group, topic, partition)] = offset
else:
to_match = f"{consumer_group},{topic},{partition}"
if self.config._consumer_groups_compiled_regex.match(to_match):
consumer_offsets[(consumer_group, topic, partition)] = offset

self.log.debug('Got %s consumer offsets', len(consumer_offsets))
return consumer_offsets

def _get_consumer_groups(self):
# Get all consumer groups to monitor
consumer_groups = []
if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex:
consumer_groups_future = self.kafka_client.list_consumer_groups()
try:
list_consumer_groups_result = consumer_groups_future.result()
for valid_consumer_group in list_consumer_groups_result.valid:
self.log.debug("Discovered consumer group: %s", valid_consumer_group.group_id)

consumer_groups.extend(
valid_consumer_group.group_id
for valid_consumer_group in list_consumer_groups_result.valid
if valid_consumer_group.group_id != ""
)
except Exception as e:
self.log.error("Failed to collect consumer groups: %s", e)
return consumer_groups
else:
return self.config._consumer_groups

def get_consumer_group_state(self, consumer_group):
consumer_group_state = ""
# Get the consumer group state if present
consumer_groups_future = self._describe_consumer_groups(consumer_group)
consumer_groups_result = consumer_groups_future[consumer_group].result()
self.log.debug(
"Consumer group: %s in state %s",
consumer_groups_result.group_id,
consumer_groups_result.state,
)
consumer_group_result_state = str(consumer_groups_result.state)
consumer_group_state = consumer_group_result_state.split('.')[1]

return consumer_group_state

def _list_consumer_group_offsets(self, cg_tp):
"""
:returns: A dict of futures for each group, keyed by the group id.
The future result() method returns :class:`ConsumerGroupTopicPartitions`.
:rtype: dict[str, future]
"""
return self.kafka_client.list_consumer_group_offsets([cg_tp])
self.log.debug("Failed to read consumer offsets for future %s: %s", completed, e)
continue
tpo = []
for tp in response_offset_info.topic_partitions:
if tp.error:
self.log.debug(
"Encountered error: %s. Occurred with topic: %s; partition: [%s]",
tp.error.str(),
tp.topic,
str(tp.partition),
)
continue
tpo.append((tp.topic, tp.partition, tp.offset))
offsets.append((response_offset_info.group_id, tpo))
return offsets

def _describe_consumer_groups(self, consumer_group):
def describe_consumer_groups(self, consumer_group):
"""
:returns: A dict of futures for each group, keyed by the group_id.
The future result() method returns :class:`ConsumerGroupDescription`.
:rtype: dict[str, future]
"""
return self.kafka_client.describe_consumer_groups([consumer_group])
desc = self.kafka_client.describe_consumer_groups([consumer_group])[consumer_group].result()
return (desc.group_id, desc.state.value)

def close_admin_client(self):
self._kafka_client = None

def _get_consumer_offset_futures(self, consumer_groups):
futures = []

# If either monitoring all consumer groups or regex, return all consumer group offsets (can filter later)
if self.config._monitor_unlisted_consumer_groups or self.config._consumer_groups_compiled_regex:
for consumer_group in consumer_groups:
futures.append(
self._list_consumer_group_offsets(ConsumerGroupTopicPartitions(consumer_group))[consumer_group]
)
return futures

for consumer_group in consumer_groups:
# If topics are specified
topics = consumer_groups.get(consumer_group)
if not topics:
futures.append(
self._list_consumer_group_offsets(ConsumerGroupTopicPartitions(consumer_group))[consumer_group]
)
continue

for topic in topics:
# If partitions are defined
if partitions := topics[topic]:
topic_partitions = [TopicPartition(topic, partition) for partition in partitions]
# If partitions are not defined
else:
# get all the partitions for this topic
partitions = self.get_partitions_for_topic(topic)

topic_partitions = [TopicPartition(topic, partition) for partition in partitions]

futures.append(
self._list_consumer_group_offsets(ConsumerGroupTopicPartitions(consumer_group, topic_partitions))[
consumer_group
]
)

return futures
Loading

0 comments on commit e410940

Please sign in to comment.