Split log entries so each entry contains a maximum of 100 datapoints (#64)

* Add test on not emitting more than 100 datapoints per log entry.

* Limit the count of datapoints per log batch to 100.

* Allow passing arguments to pytest from tox.

* Limit datapoints to 100 per metric instead of 100 per batch.

* Add test with a single metric with more datapoints than others.

* Add test to serialize data with more than 100 metrics and more than 100
datapoints per metric.

* Add comments and rename confusing variable name.

* Add extra asserts.

Compute the full expected results so we ensure the slicing logic doesn't
cause any datapoint to be lost.
paulez authored Jan 28, 2021
1 parent f645c6b commit 3e5ccce
Showing 4 changed files with 122 additions and 19 deletions.
1 change: 1 addition & 0 deletions aws_embedded_metrics/constants.py
@@ -14,3 +14,4 @@
 DEFAULT_NAMESPACE = "aws-embedded-metrics"
 MAX_DIMENSIONS = 9
 MAX_METRICS_PER_EVENT = 100
+MAX_DATAPOINTS_PER_METRIC = 100
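A quick sketch of the slicing arithmetic these two constants drive (illustrative only, not library code; `values` stands in for one metric's recorded datapoints):

```python
MAX_DATAPOINTS_PER_METRIC = 100

values = list(range(295))  # e.g. 295 datapoints recorded for one metric

# The serializer walks the list one 100-value window at a time, so a metric
# with n datapoints is spread over ceil(n / 100) log entries.
slices = [
    values[i * MAX_DATAPOINTS_PER_METRIC:(i + 1) * MAX_DATAPOINTS_PER_METRIC]
    for i in range((len(values) + MAX_DATAPOINTS_PER_METRIC - 1) // MAX_DATAPOINTS_PER_METRIC)
]
print([len(s) for s in slices])  # [100, 100, 95] -- no datapoint lost
```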
55 changes: 38 additions & 17 deletions aws_embedded_metrics/serializers/log_serializer.py
@@ -14,7 +14,9 @@
 from aws_embedded_metrics.config import get_config
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers import Serializer
-from aws_embedded_metrics.constants import MAX_DIMENSIONS, MAX_METRICS_PER_EVENT
+from aws_embedded_metrics.constants import (
+    MAX_DIMENSIONS, MAX_METRICS_PER_EVENT, MAX_DATAPOINTS_PER_METRIC
+)
 import json
 from typing import Any, Dict, List

@@ -50,29 +52,48 @@ def create_body() -> Dict[str, Any]:
                 }
             return body
 
-        current_body: Dict[str, Any] = create_body()
+        current_body: Dict[str, Any] = {}
         event_batches: List[str] = []
         num_metrics_in_current_body = 0
 
-        for metric_name, metric in context.metrics.items():
+        # Track if any given metric has data remaining to be serialized
+        remaining_data = True
 
-            if len(metric.values) == 1:
-                current_body[metric_name] = metric.values[0]
-            else:
-                current_body[metric_name] = metric.values
+        # Track batch number to know where to slice metric data
+        i = 0
 
-            if not config.disable_metric_extraction:
-                current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
+        while remaining_data:
+            remaining_data = False
+            current_body = create_body()
 
-            num_metrics_in_current_body += 1
+            for metric_name, metric in context.metrics.items():
 
-            should_serialize: bool = num_metrics_in_current_body == MAX_METRICS_PER_EVENT
-            if should_serialize:
-                event_batches.append(json.dumps(current_body))
-                current_body = create_body()
-                num_metrics_in_current_body = 0
+                if len(metric.values) == 1:
+                    current_body[metric_name] = metric.values[0]
+                else:
+                    # Slice metric data as each batch cannot contain more than
+                    # MAX_DATAPOINTS_PER_METRIC entries for a given metric
+                    start_index = i * MAX_DATAPOINTS_PER_METRIC
+                    end_index = (i + 1) * MAX_DATAPOINTS_PER_METRIC
+                    current_body[metric_name] = metric.values[start_index:end_index]
+
+                    # Make sure to consume remaining values if we sliced before the end
+                    # of the metric value list
+                    if len(metric.values) > end_index:
+                        remaining_data = True
 
-        if not event_batches or num_metrics_in_current_body > 0:
-            event_batches.append(json.dumps(current_body))
+                if not config.disable_metric_extraction:
+                    current_body["_aws"]["CloudWatchMetrics"][0]["Metrics"].append({"Name": metric_name, "Unit": metric.unit})
+                num_metrics_in_current_body += 1
+
+                if (num_metrics_in_current_body == MAX_METRICS_PER_EVENT):
+                    event_batches.append(json.dumps(current_body))
+                    current_body = create_body()
+                    num_metrics_in_current_body = 0
+
+            # iter over missing datapoints
+            i += 1
+            if not event_batches or num_metrics_in_current_body > 0:
+                event_batches.append(json.dumps(current_body))
 
         return event_batches
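For a sense of the resulting behaviour, here is a minimal usage sketch. It assumes a `MetricsContext` can be constructed directly with its defaults (the test suite goes through a `get_context()` helper instead), and `"Latency"` is an illustrative metric name:

```python
import json

from aws_embedded_metrics.logger.metrics_context import MetricsContext
from aws_embedded_metrics.serializers.log_serializer import LogSerializer

# One metric with 295 datapoints: the serializer now slices it into
# batches of at most 100 values each instead of emitting all 295 at once.
context = MetricsContext()
for value in range(295):
    context.put_metric("Latency", value)

batches = LogSerializer().serialize(context)
print(len(batches))  # 3
print([len(json.loads(batch)["Latency"]) for batch in batches])  # [100, 100, 95]
```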
81 changes: 81 additions & 0 deletions tests/serializer/test_log_serializer.py
@@ -1,6 +1,7 @@
 from aws_embedded_metrics.config import get_config
 from aws_embedded_metrics.logger.metrics_context import MetricsContext
 from aws_embedded_metrics.serializers.log_serializer import LogSerializer
+from collections import Counter
 from faker import Faker
 import json

@@ -130,6 +131,86 @@ def test_serialize_more_than_100_metrics():
         metric_index += 1
 
 
+def test_serialize_more_than_100_datapoints():
+    expected_batches = 3
+    datapoints = 295
+    metrics = 3
+
+    context = get_context()
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        for i in range(datapoints):
+            context.put_metric(expected_key, i)
+
+    # add one metric with more datapoints
+    expected_extra_batches = 2
+    extra_datapoints = 433
+    for i in range(extra_datapoints):
+        context.put_metric(f"Metric-{metrics}", i)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == expected_batches + expected_extra_batches
+
+    for batch_index in range(expected_batches):
+        result_json = results[batch_index]
+        result_obj = json.loads(result_json)
+        for index in range(metrics):
+            metric_name = f"Metric-{index}"
+            expected_datapoint_count = datapoints % 100 if (batch_index == expected_batches - 1) else 100
+            assert len(result_obj[metric_name]) == expected_datapoint_count
+
+    # extra metric with more datapoints
+    for batch_index in range(expected_batches):
+        result_json = results[batch_index]
+        result_obj = json.loads(result_json)
+        metric_name = f"Metric-{metrics}"
+        expected_datapoint_count = extra_datapoints % 100 if (batch_index == expected_batches + expected_extra_batches - 1) else 100
+        assert len(result_obj[metric_name]) == expected_datapoint_count
+
+
+def test_serialize_with_more_than_100_metrics_and_datapoints():
+    expected_batches = 11
+    datapoints = 295
+    metrics = 295
+
+    expected_results = {}
+    metric_results = {}
+    context = get_context()
+    for index in range(metrics):
+        expected_key = f"Metric-{index}"
+        expected_results[expected_key] = []
+        metric_results[expected_key] = []
+
+        for i in range(datapoints):
+            context.put_metric(expected_key, i)
+            expected_results[expected_key].append(i)
+
+    # act
+    results = serializer.serialize(context)
+
+    # assert
+    assert len(results) == expected_batches
+
+    datapoints_count = Counter()
+    for batch in results:
+        result = json.loads(batch)
+        datapoints_count.update({
+            metric: len(result[metric])
+            for metric in result if metric != "_aws"
+        })
+        for metric in result:
+            if metric != "_aws":
+                metric_results[metric] += result[metric]
+
+    for count in datapoints_count.values():
+        assert count == datapoints
+    assert len(datapoints_count) == metrics
+    assert metric_results == expected_results
+
+
 def test_serialize_with_multiple_metrics():
     # arrange
     metrics = 2
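The expected batch count of 11 in test_serialize_with_more_than_100_metrics_and_datapoints falls out of the serializer's carry-over behaviour: 295 datapoints per metric means three 100-value slices, and each slice re-walks all 295 metrics. The first slice flushes batches of 100, 100, and 95 metrics; because the 95-metric remainder count carries into the next pass, the second slice flushes after 5, 100, 100, and 90 metrics, and the third after 10, 100, 100, and 85 — 3 + 4 + 4 = 11 batches in total.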
4 changes: 2 additions & 2 deletions tox.ini
@@ -9,7 +9,7 @@ deps =
     Faker
     aiohttp
     aresponses
-commands = pytest --ignore=tests/integ
+commands = pytest --ignore=tests/integ {posargs}
 
 [testenv:integ]
 deps =
@@ -38,4 +38,4 @@ commands =
 
 [flake8]
 max-line-length = 150
-exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv
\ No newline at end of file
+exclude = .git,__pycache__,docs/source/conf.py,old,build,dist,**/venv
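With `{posargs}` in place, anything after `--` on the tox command line is forwarded to pytest, e.g. `tox -- -k datapoints` to run only the new batching tests.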
