[mongo] deprecate collStats command and use $collStats aggregation stage to collect collection metrics (DataDog#17961)

* use collStats aggregation pipeline to collect collection metrics

* add changelog

* sort metadata

* remove commented code

* update test results with new metrics
lu-zhengda authored and ravindrasojitra-crest committed Aug 5, 2024
1 parent f41a3c0 commit 1e8e209
Showing 17 changed files with 2,760 additions and 2,856 deletions.
12 changes: 12 additions & 0 deletions mongo/changelog.d/17961.added
@@ -0,0 +1,12 @@
+Added new collection latency and query execution stats metrics.
+- mongodb.collection.totalindexsize
+- mongodb.collection.collectionscans.nontailable
+- mongodb.collection.collectionscans.total
+- mongodb.collection.commands.latency
+- mongodb.collection.commands.opsps
+- mongodb.collection.reads.latency
+- mongodb.collection.reads.opsps
+- mongodb.collection.transactions.latency
+- mongodb.collection.transactions.opsps
+- mongodb.collection.writes.latency
+- mongodb.collection.writes.opsps
1 change: 1 addition & 0 deletions mongo/changelog.d/17961.fixed
@@ -0,0 +1 @@
+Replace deprecated collStats command with $collStats aggregation stage to collect collection metrics
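For context, a minimal pymongo sketch of the change described in this changelog entry; the connection string, database, and collection names are placeholders and not part of this commit:

from pymongo import MongoClient

# Sketch only; "mongodb://localhost:27017", "mydb" and "mycoll" are placeholders.
client = MongoClient("mongodb://localhost:27017")
db = client["mydb"]

# Deprecated approach: the per-collection collStats command.
legacy_stats = db.command("collstats", "mycoll")

# Replacement approach: the $collStats aggregation stage, which streams one
# document per collection (one per shard on sharded clusters) containing the
# requested stat sections.
cursor = db["mycoll"].aggregate(
    [{"$collStats": {"latencyStats": {}, "storageStats": {}, "queryExecStats": {}}}]
)
for doc in cursor:
    print(sorted(doc.keys()))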
17 changes: 17 additions & 0 deletions mongo/datadog_checks/mongo/api.py
@@ -96,6 +96,23 @@ def current_op(self, session=None):
         # The $currentOp stage returns a cursor over a stream of documents, each of which reports a single operation.
         return self["admin"].aggregate([{'$currentOp': {'allUsers': True}}], session=session)
 
+    def coll_stats(self, db_name, coll_name, session=None):
+        return self[db_name][coll_name].aggregate(
+            [
+                {
+                    "$collStats": {
+                        "latencyStats": {},
+                        "storageStats": {},
+                        "queryExecStats": {},
+                    }
+                },
+            ],
+            session=session,
+        )
+
+    def index_stats(self, db_name, coll_name, session=None):
+        return self[db_name][coll_name].aggregate([{"$indexStats": {}}], session=session)
+
     def _is_arbiter(self, options):
         cli = MongoClient(**options)
         is_master_payload = cli['admin'].command('isMaster')
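A rough usage sketch for the helpers added above; `api` stands for an instance of the integration's Mongo API wrapper (its class name is not shown in this excerpt), and the database/collection names are placeholders. Both helpers return aggregation cursors, and on a sharded cluster $collStats emits one document per shard, which is why the collector in the next file iterates over the result:

# Hypothetical usage of the new helpers; not part of this commit.
for coll_stats in api.coll_stats("mydb", "mycoll"):
    # One document per shard on sharded clusters, carrying the requested sections.
    print(coll_stats.get("shard"), sorted(coll_stats.keys()))

for index_stats in api.index_stats("mydb", "mycoll"):
    # One document per index on the collection.
    print(index_stats.get("name"))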
33 changes: 19 additions & 14 deletions mongo/datadog_checks/mongo/collectors/coll_stats.py
@@ -29,20 +29,25 @@ def _get_collections(self, api):
         return api.list_authorized_collections(self.db_name)
 
     def collect(self, api):
-        # Ensure that you're on the right db
-        db = api[self.db_name]
         coll_names = self._get_collections(api)
 
         for coll_name in coll_names:
-            # Grab the stats from the collection
-            payload = {'collection': db.command("collstats", coll_name)}
-            additional_tags = ["db:%s" % self.db_name, "collection:%s" % coll_name]
-            self._submit_payload(payload, additional_tags, COLLECTION_METRICS)
-
-            # Submit the indexSizes metrics manually
-            index_sizes = payload['collection'].get('indexSizes', {})
-            metric_name_alias = self._normalize("collection.indexSizes", AgentCheck.gauge)
-            for idx, val in iteritems(index_sizes):
-                # we tag the index
-                idx_tags = self.base_tags + additional_tags + ["index:%s" % idx]
-                self.gauge(metric_name_alias, val, tags=idx_tags)
+            for coll_stats in api.coll_stats(self.db_name, coll_name):
+                # Submit the metrics
+                storage_stats = coll_stats.get('storageStats', {})
+                latency_stats = coll_stats.get('latencyStats', {})
+                query_stats = coll_stats.get('queryExecStats', {})
+                payload = {'collection': {**storage_stats, **latency_stats, **query_stats}}
+                additional_tags = ["db:%s" % self.db_name, "collection:%s" % coll_name]
+                if coll_stats.get('shard'):
+                    # If the collection is sharded, add the shard tag
+                    additional_tags.append("shard:%s" % coll_stats['shard'])
+                self._submit_payload(payload, additional_tags, COLLECTION_METRICS)
+
+                # Submit the indexSizes metrics manually
+                index_sizes = storage_stats.get('indexSizes', {})
+                metric_name_alias = self._normalize("collection.indexSizes", AgentCheck.gauge)
+                for idx, val in iteritems(index_sizes):
+                    # we tag the index
+                    idx_tags = self.base_tags + additional_tags + ["index:%s" % idx]
+                    self.gauge(metric_name_alias, val, tags=idx_tags)
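To make the merge above concrete, here is an illustrative $collStats document (values are made up and fields are trimmed) and the flattened payload the collector derives from it:

# Illustrative only; real documents carry many more fields.
coll_stats_doc = {
    "ns": "mydb.mycoll",
    "shard": "shard-a",  # present only when the collection lives on a sharded cluster
    "storageStats": {"size": 2048, "count": 10, "totalIndexSize": 4096, "indexSizes": {"_id_": 4096}},
    "latencyStats": {"reads": {"latency": 1200, "ops": 3}},
    "queryExecStats": {"collectionScans": {"total": 1, "nonTailable": 1}},
}

payload = {
    "collection": {
        **coll_stats_doc.get("storageStats", {}),
        **coll_stats_doc.get("latencyStats", {}),
        **coll_stats_doc.get("queryExecStats", {}),
    }
}
# payload["collection"] now holds size/count/totalIndexSize next to reads.* and
# collectionScans.*, i.e. the keys declared in COLLECTION_METRICS in metrics.py,
# and is submitted with db/collection (and, when present, shard) tags.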
4 changes: 1 addition & 3 deletions mongo/datadog_checks/mongo/collectors/index_stats.py
@@ -23,12 +23,10 @@ def _get_collections(self, api):
         return api.list_authorized_collections(self.db_name)
 
     def collect(self, api):
-        db = api[self.db_name]
         coll_names = self._get_collections(api)
-
         for coll_name in coll_names:
             try:
-                for stats in db[coll_name].aggregate([{"$indexStats": {}}], cursor={}):
+                for stats in api.index_stats(self.db_name, coll_name):
                     idx_tags = self.base_tags + [
                         "name:{0}".format(stats.get('name', 'unknown')),
                         "collection:{0}".format(coll_name),
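For comparison, an illustrative $indexStats document (made-up values) and the tags the collector would derive from it, mirroring the lines shown above:

# Illustrative only; not part of this commit. base_tags from the check would be
# prepended, as in the diff above.
index_stats_doc = {"name": "_id_", "accesses": {"ops": 42}}

coll_name = "mycoll"
idx_tags = [
    "name:{0}".format(index_stats_doc.get("name", "unknown")),
    "collection:{0}".format(coll_name),
]
# -> ['name:_id_', 'collection:mycoll']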
14 changes: 14 additions & 0 deletions mongo/datadog_checks/mongo/metrics.py
@@ -313,6 +313,7 @@
 }
 
 COLLECTION_METRICS = {
+    # collection storage stats
     'collection.size': GAUGE,
     'collection.avgObjSize': GAUGE,
     'collection.count': GAUGE,
@@ -321,6 +322,19 @@
     'collection.maxSize': GAUGE,
     'collection.storageSize': GAUGE,
     'collection.nindexes': GAUGE,
+    'collection.totalIndexSize': GAUGE,
+    # collection latency stats
+    'collection.reads.latency': GAUGE,
+    'collection.reads.ops': RATE,
+    'collection.writes.ops': RATE,
+    'collection.writes.latency': GAUGE,
+    'collection.commands.latency': GAUGE,
+    'collection.commands.ops': RATE,
+    'collection.transactions.latency': GAUGE,
+    'collection.transactions.ops': RATE,
+    # collection query exec stats
+    'collection.collectionScans.total': GAUGE,
+    'collection.collectionScans.nonTailable': GAUGE,
 }
 
 """
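One reading note on the names above, inferred from this commit's changelog rather than from the diff itself: GAUGE entries map directly to mongodb.collection.* metric names, while RATE entries are reported per second and end up with a trailing "ps" (for example, collection.reads.ops surfaces as mongodb.collection.reads.opsps). A small illustrative mapping:

# Inferred from mongo/changelog.d/17961.added above; illustrative, not exhaustive.
EXPECTED_METRIC_NAMES = {
    "collection.reads.latency": "mongodb.collection.reads.latency",   # GAUGE
    "collection.reads.ops": "mongodb.collection.reads.opsps",         # RATE, per second
    "collection.collectionScans.total": "mongodb.collection.collectionscans.total",  # GAUGE
}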