From 3e5ccce4eb278cd38e7b3fdd81571abab23ce044 Mon Sep 17 00:00:00 2001
From: Paul Ezvan
Date: Thu, 28 Jan 2021 17:06:15 +0100
Subject: [PATCH] Split log entries so each entry contains a maximum of 100
 datapoints (#64)

* Add a test that no more than 100 datapoints are emitted per log entry.

* Limit the number of datapoints per log batch to 100.

* Allow passing arguments to pytest from tox.

* Limit datapoints to 100 per metric instead of 100 per batch.

* Add a test with a single metric that has more datapoints than the others.

* Add a test that serializes more than 100 metrics with more than 100 datapoints per metric.

* Add comments and rename a confusing variable.

* Add extra asserts.

Compute the full expected results so we can ensure the slicing logic doesn't cause any datapoints to be lost.
---
 aws_embedded_metrics/constants.py        |  1 +
 .../serializers/log_serializer.py        | 55 +++++++++----
 tests/serializer/test_log_serializer.py  | 81 +++++++++++++++++++
 tox.ini                                  |  4 +-
 4 files changed, 122 insertions(+), 19 deletions(-)

diff --git a/aws_embedded_metrics/constants.py b/aws_embedded_metrics/constants.py
index 756e9cd..6f99bfa 100644
--- a/aws_embedded_metrics/constants.py
+++ b/aws_embedded_metrics/constants.py
@@ -14,3 +14,4 @@
 DEFAULT_NAMESPACE = "aws-embedded-metrics"
 MAX_DIMENSIONS = 9
 MAX_METRICS_PER_EVENT = 100
+MAX_DATAPOINTS_PER_METRIC = 100
diff --git a/aws_embedded_metrics/serializers/log_serializer.py b/aws_embedded_metrics/serializers/log_serializer.py
index fcc496a..d5d7ec8 100644
--- a/aws_embedded_metrics/serializers/log_serializer.py
+++ b/aws_embedded_metrics/serializers/log_serializer.py
@@ -14,7 +14,9 @@
 from aws_embedded_metrics.config import get_config
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers import Serializer
-from aws_embedded_metrics.constants import MAX_DIMENSIONS, MAX_METRICS_PER_EVENT
+from aws_embedded_metrics.constants import (
+    MAX_DIMENSIONS, MAX_METRICS_PER_EVENT, MAX_DATAPOINTS_PER_METRIC
+)
 import json
 from typing import Any, Dict, List
 
@@ -50,29 +52,48 @@ def create_body() -> Dict[str, Any]:
             }
             return body
 
-        current_body: Dict[str, Any] = create_body()
+        current_body: Dict[str, Any] = {}
         event_batches: List[str] = []
         num_metrics_in_current_body = 0
 
-        for metric_name, metric in context.metrics.items():
+        # Track if any given metric has data remaining to be serialized
+        remaining_data = True
 
-            if len(metric.values) == 1:
-                current_body[metric_name] = metric.values[0]
-            else:
-                current_body[metric_name] = metric.values
+        # Track batch number to know where to slice metric data
+        i = 0
 
-            if not config.disable_metric_extraction:
-                current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
+        while remaining_data:
+            remaining_data = False
+            current_body = create_body()
 
-            num_metrics_in_current_body += 1
+            for metric_name, metric in context.metrics.items():
 
-            should_serialize: bool = num_metrics_in_current_body == MAX_METRICS_PER_EVENT
-            if should_serialize:
-                event_batches.append(json.dumps(current_body))
-                current_body = create_body()
-                num_metrics_in_current_body = 0
+                if len(metric.values) == 1:
+                    current_body[metric_name] = metric.values[0]
+                else:
+                    # Slice metric data as each batch cannot contain more than
+                    # MAX_DATAPOINTS_PER_METRIC entries for a given metric
+                    start_index = i * MAX_DATAPOINTS_PER_METRIC
+                    end_index = (i + 1) * MAX_DATAPOINTS_PER_METRIC
+                    current_body[metric_name] = metric.values[start_index:end_index]
+
+                    # Make sure to consume remaining values if we sliced before the end
+                    # of the metric value list
+                    if len(metric.values) > end_index:
+                        remaining_data = True
 
-        if not event_batches or num_metrics_in_current_body > 0:
-            event_batches.append(json.dumps(current_body))
+                if not config.disable_metric_extraction:
+                    current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
+                num_metrics_in_current_body += 1
+
+                if num_metrics_in_current_body == MAX_METRICS_PER_EVENT:
+                    event_batches.append(json.dumps(current_body))
+                    current_body = create_body()
+                    num_metrics_in_current_body = 0
+
+            # Advance to the next slice of datapoints
+            i += 1
+            if not event_batches or num_metrics_in_current_body > 0:
+                event_batches.append(json.dumps(current_body))
 
         return event_batches
diff --git a/tests/serializer/test_log_serializer.py b/tests/serializer/test_log_serializer.py
index 32795f7..314adb9 100644
--- a/tests/serializer/test_log_serializer.py
+++ b/tests/serializer/test_log_serializer.py
@@ -1,6 +1,7 @@
 from aws_embedded_metrics.config import get_config
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers.log_serializer import LogSerializer
+from collections import Counter
 from faker import Faker
 import json
 
@@ -130,6 +131,86 @@ def test_serialize_more_than_100_metrics():
         metric_index += 1
 
 
+def test_serialize_more_than_100_datapoints():
+    expected_batches = 3
+    datapoints = 295
+    metrics = 3
+
+    context = get_context()
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        for i in range(datapoints):
+            context.put_metric(expected_key, i)
+
+    # add one metric with more datapoints than the others
+    expected_extra_batches = 2
+    extra_datapoints = 433
+    for i in range(extra_datapoints):
+        context.put_metric(f"Metric-{metrics}", i)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == expected_batches + expected_extra_batches
+
+    for batch_index in range(expected_batches):
+        result_json = results[batch_index]
+        result_obj = json.loads(result_json)
+        for index in range(metrics):
+            metric_name = f"Metric-{index}"
+            expected_datapoint_count = datapoints % 100 if (batch_index == expected_batches - 1) else 100
+            assert len(result_obj[metric_name]) == expected_datapoint_count
+
+    # the extra metric spans every batch, including the extra ones
+    for batch_index in range(expected_batches + expected_extra_batches):
+        result_json = results[batch_index]
+        result_obj = json.loads(result_json)
+        metric_name = f"Metric-{metrics}"
+        expected_datapoint_count = extra_datapoints % 100 if (batch_index == expected_batches + expected_extra_batches - 1) else 100
+        assert len(result_obj[metric_name]) == expected_datapoint_count
+
+
+def test_serialize_with_more_than_100_metrics_and_datapoints():
+    expected_batches = 11
+    datapoints = 295
+    metrics = 295
+
+    expected_results = {}
+    metric_results = {}
+    context = get_context()
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        expected_results[expected_key] = []
+        metric_results[expected_key] = []
+
+        for i in range(datapoints):
+            context.put_metric(expected_key, i)
+            expected_results[expected_key].append(i)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == expected_batches
+
+    datapoints_count = Counter()
+    for batch in results:
+        result = json.loads(batch)
+        datapoints_count.update({
+            metric: len(result[metric])
+            for metric in result if metric != "_aws"
+        })
+        for metric in result:
+            if metric != "_aws":
+                metric_results[metric] += result[metric]
+
+    for count in datapoints_count.values():
+        assert count == datapoints
+    assert len(datapoints_count) == metrics
+    assert metric_results == expected_results
+
+
 def test_serialize_with_multiple_metrics():
     # arrange
     metrics = 2
diff --git a/tox.ini b/tox.ini
index 1a33dfa..fe51c57 100644
--- a/tox.ini
+++ b/tox.ini
@@ -9,7 +9,7 @@ deps =
     Faker
    aiohttp
    aresponses
-commands = pytest --ignore=tests/integ
+commands = pytest --ignore=tests/integ {posargs}
 
 [testenv:integ]
 deps =
@@ -38,4 +38,4 @@ commands =
 
 [flake8]
 max-line-length = 150
-exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv
\ No newline at end of file
+exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv
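
A note for reviewers on the batching strategy: the nested while/for logic can be hard to follow inside a diff, so the sketch below reproduces the same slicing behavior in isolation. It is a simplified illustration, not part of the patch: `batch_metrics` and its plain-dict input are hypothetical stand-ins for `LogSerializer.serialize`, which builds full EMF bodies via `create_body()` and serializes each batch with `json.dumps` (it also stores single-value metrics as scalars, which this sketch skips).

from typing import Any, Dict, List

MAX_METRICS_PER_EVENT = 100
MAX_DATAPOINTS_PER_METRIC = 100


def batch_metrics(metrics: Dict[str, List[float]]) -> List[Dict[str, Any]]:
    batches: List[Dict[str, Any]] = []
    current: Dict[str, Any] = {}
    num_metrics_in_current = 0
    remaining_data = True
    i = 0  # index of the 100-datapoint slice currently being emitted

    while remaining_data:
        remaining_data = False
        current = {}
        for name, values in metrics.items():
            # Each batch carries at most MAX_DATAPOINTS_PER_METRIC values per metric
            start = i * MAX_DATAPOINTS_PER_METRIC
            end = (i + 1) * MAX_DATAPOINTS_PER_METRIC
            current[name] = values[start:end]
            if len(values) > end:
                remaining_data = True  # this metric still has datapoints left over
            num_metrics_in_current += 1
            # ... and at most MAX_METRICS_PER_EVENT distinct metric keys
            if num_metrics_in_current == MAX_METRICS_PER_EVENT:
                batches.append(current)
                current = {}
                num_metrics_in_current = 0
        i += 1
        if not batches or num_metrics_in_current > 0:
            batches.append(current)
    return batches


# One metric with 250 datapoints is split across batches of 100, 100 and 50:
print([len(b["latency"]) for b in batch_metrics({"latency": list(range(250))})])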
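The tox.ini change is what lets the new serializer tests be run in isolation: everything after "--" on the tox command line is forwarded to pytest through {posargs}. For example, assuming the default testenv:

    tox -- -k datapoints

expands to pytest --ignore=tests/integ -k datapoints and runs only the tests whose names contain "datapoints".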