Skip to content

Commit

Permalink
Fix typo when saving broker timestamp (DataDog#18307)
Browse files Browse the repository at this point in the history
* test

* test

* fix typo in the key

* add logging

* add persistent_cache_key

* changelog
  • Loading branch information
HadhemiDD authored Aug 12, 2024
1 parent 276d38c commit 5e0787d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
1 change: 1 addition & 0 deletions kafka_consumer/changelog.d/18307.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a typo when writing to persistent cache to calculate the estimated consumer lag.
13 changes: 7 additions & 6 deletions kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,16 @@ def check(self, _):
highwater_offsets = {}
broker_timestamps = defaultdict(dict)
cluster_id = ""
persistent_cache_key = "broker_timestamps_"
try:
if len(consumer_offsets) < self._context_limit:
# Fetch highwater offsets
# Expected format: ({(topic, partition): offset}, cluster_id)
highwater_offsets, cluster_id = self.client.get_highwater_offsets(consumer_offsets)
if self._data_streams_enabled:
broker_timestamps = self._load_broker_timestamps()
broker_timestamps = self._load_broker_timestamps(persistent_cache_key)
self._add_broker_timestamps(broker_timestamps, highwater_offsets)
self._save_broker_timestamps(broker_timestamps)
self._save_broker_timestamps(broker_timestamps, persistent_cache_key)
else:
self.warning("Context limit reached. Skipping highwater offset collection.")
except Exception:
Expand Down Expand Up @@ -94,11 +95,11 @@ def check(self, _):
if self.config._close_admin_client:
self.client.close_admin_client()

def _load_broker_timestamps(self):
def _load_broker_timestamps(self, persistent_cache_key):
"""Loads broker timestamps from persistent cache."""
broker_timestamps = defaultdict(dict)
try:
for topic_partition, content in json.loads(self.read_persistent_cache("broker_timestamps_")).items():
for topic_partition, content in json.loads(self.read_persistent_cache(persistent_cache_key)).items():
for offset, timestamp in content.items():
broker_timestamps[topic_partition][int(offset)] = timestamp
except Exception as e:
Expand All @@ -113,9 +114,9 @@ def _add_broker_timestamps(self, broker_timestamps, highwater_offsets):
if len(timestamps) > self._max_timestamps:
del timestamps[min(timestamps)]

def _save_broker_timestamps(self, broker_timestamps):
def _save_broker_timestamps(self, broker_timestamps, persistent_cache_key):
"""Saves broker timestamps to persistent cache."""
self.write_persistent_cache("broker_timestamps", json.dumps(broker_timestamps))
self.write_persistent_cache(persistent_cache_key, json.dumps(broker_timestamps))

def report_highwater_offsets(self, highwater_offsets, contexts_limit, cluster_id):
"""Report the broker highwater offsets."""
Expand Down

0 comments on commit 5e0787d

Please sign in to comment.