[COST-5570] Capture prometheus metrics for RHEL ELS processing (#5344)
* [COST-5570] Capture prometheus metrics for RHEL ELS processing

* Add counters for interesting data points
* Increment values where data processing occurs.
chambridge authored Oct 25, 2024
1 parent c918cd9 commit 559ec7c
Showing 3 changed files with 102 additions and 0 deletions.
5 changes: 5 additions & 0 deletions koku/masu/prometheus_stats.py
@@ -181,3 +181,8 @@
 )
 
 SOURCES_HTTP_CLIENT_ERROR_COUNTER = Counter("sources_http_client_errors", "Number of sources http client errors")
+
+RHEL_ELS_VCPU_HOURS = Counter("rhel_els_vcpu_hours", "Number of RHEL ELS VCPU Hours", ["provider_type"])
+RHEL_ELS_SYSTEMS_PROCESSED = Counter(
+    "rhel_els_system_count", "Number of RHEL ELS systems processed", ["provider_type"]
+)
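The two counters above are standard prometheus_client Counters keyed by a provider_type label, so each provider gets its own time series. A minimal standalone sketch (not part of the commit) of how such labeled counters behave:

    from prometheus_client import Counter

    # Same shape as the counters defined in the diff above.
    VCPU_HOURS = Counter("rhel_els_vcpu_hours", "Number of RHEL ELS VCPU Hours", ["provider_type"])

    VCPU_HOURS.labels(provider_type="AWS").inc(4.0)  # add 4 vCPU-hours to the AWS series
    VCPU_HOURS.labels(provider_type="Azure").inc()   # inc() defaults to +1; counters only go up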
28 changes: 28 additions & 0 deletions koku/subs/subs_data_messenger.py
@@ -21,6 +21,8 @@
 from kafka_utils.utils import get_producer
 from kafka_utils.utils import SUBS_TOPIC
 from masu.prometheus_stats import KAFKA_CONNECTION_ERRORS_COUNTER
+from masu.prometheus_stats import RHEL_ELS_SYSTEMS_PROCESSED
+from masu.prometheus_stats import RHEL_ELS_VCPU_HOURS
 from masu.util.aws.common import get_s3_resource


@@ -111,6 +113,19 @@ def process_and_send_subs_message(self, upload_keys):
                 msg = bytes(json.dumps(subs_dict), "utf-8")
                 self.send_kafka_message(msg)
                 msg_count += 1
+                RHEL_ELS_SYSTEMS_PROCESSED.labels(provider_type=Provider.PROVIDER_AWS).inc()
+                try:
+                    RHEL_ELS_VCPU_HOURS.labels(provider_type=Provider.PROVIDER_AWS).inc(
+                        amount=(float(row["subs_vcpu"]) * int(row["subs_usage"]))
+                    )
+                except ValueError:
+                    LOG.error(
+                        log_json(
+                            self.tracing_id,
+                            msg=f"vCPU amount could not be cast to a float {row['subs_vcpu']}",
+                            context=self.context,
+                        )
+                    )
         LOG.info(
             log_json(
                 self.tracing_id,
@@ -250,6 +265,19 @@ def process_azure_row(self, row):
             start = end
             self.send_kafka_message(msg)
             msg_count += 1
+            RHEL_ELS_SYSTEMS_PROCESSED.labels(provider_type=Provider.PROVIDER_AZURE).inc()
+            try:
+                RHEL_ELS_VCPU_HOURS.labels(provider_type=Provider.PROVIDER_AZURE).inc(
+                    amount=(float(row["subs_vcpu"]) * usage)
+                )
+            except ValueError:
+                LOG.error(
+                    log_json(
+                        self.tracing_id,
+                        msg=f"vCPU amount could not be cast to a float {row['subs_vcpu']}",
+                        context=self.context,
+                    )
+                )
         return msg_count
 
     def determine_product_ids(self, rhel_version, addon, role):
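Both provider paths follow the same pattern: count the processed system unconditionally, then guard the vCPU-hours increment with try/except ValueError, because subs_vcpu (and, on the AWS path, subs_usage) come straight out of CSV rows and may not be numeric. A hedged sketch of that guarded-increment pattern as a hypothetical helper (not in the commit; it simplifies by casting both operands with float()):

    from prometheus_client import Counter

    def guarded_vcpu_hours_inc(counter: Counter, provider_type: str, vcpu: str, usage: str) -> bool:
        """Increment a labeled vCPU-hours counter; skip rows whose fields are not numeric."""
        try:
            counter.labels(provider_type=provider_type).inc(amount=float(vcpu) * float(usage))
            return True
        except ValueError:
            return False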
69 changes: 69 additions & 0 deletions koku/subs/test/test_subs_data_messenger.py
@@ -56,6 +56,36 @@ def test_process_and_send_subs_message(self, mock_msg_builder, mock_reader, mock
         mock_msg_builder.assert_called_once()
         mock_producer.assert_called_once()
 
+    @patch("subs.subs_data_messenger.os.remove")
+    @patch("subs.subs_data_messenger.get_producer")
+    @patch("subs.subs_data_messenger.csv.DictReader")
+    @patch("subs.subs_data_messenger.SUBSDataMessenger.build_aws_subs_dict")
+    def test_process_and_send_subs_message_exception(self, mock_msg_builder, mock_reader, mock_producer, mock_remove):
+        """Tests that the proper functions are called when running process_and_send_subs_message"""
+        upload_keys = ["fake_key"]
+        mock_msg_builder.return_value = defaultdict(str)
+        mock_reader.return_value = [
+            {
+                "subs_start_time": "2023-07-01T01:00:00Z",
+                "subs_end_time": "2023-07-01T02:00:00Z",
+                "subs_resource_id": "i-55555556",
+                "subs_account": "9999999999999",
+                "physical_cores": "1",
+                "subs_vcpu": "a",
+                "variant": "Server",
+                "subs_usage": "Production",
+                "subs_sla": "Premium",
+                "subs_role": "Red Hat Enterprise Linux Server",
+                "subs_rhel_version": "479",
+                "subs_addon_id": "204",
+                "subs_conversion": "true",
+            }
+        ]
+        mock_op = mock_open(read_data="x,y,z")
+        with patch("builtins.open", mock_op):
+            self.messenger.process_and_send_subs_message(upload_keys)
+        mock_msg_builder.assert_called_once()
+
     def test_build_base_subs_dict(self):
         """
         Test building the kafka message body
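The fixture above sets "subs_vcpu": "a" on purpose: the new float(row["subs_vcpu"]) cast raises ValueError for that value, so this test exercises the except branch (error logged, processing continues) rather than the happy path. For illustration (not from the commit):

    try:
        float("a")
    except ValueError as err:
        print(err)  # could not convert string to float: 'a'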
@@ -522,6 +552,45 @@ def test_process_and_send_subs_message_azure_with_id(self, mock_reader, mock_pro
         mock_azure_id.assert_called_once()
         self.assertEqual(mock_producer.call_count, 4)
 
+    @patch("subs.subs_data_messenger.SUBSDataMessenger.determine_azure_instance_and_tenant_id")
+    @patch("subs.subs_data_messenger.os.remove")
+    @patch("subs.subs_data_messenger.get_producer")
+    @patch("subs.subs_data_messenger.csv.DictReader")
+    def test_process_and_send_subs_message_azure_with_id_exception(
+        self, mock_reader, mock_producer, mock_remove, mock_azure_id
+    ):
+        """Tests that the proper functions are called when running process_and_send_subs_message with Azure provider."""
+        upload_keys = ["fake_key"]
+        self.azure_messenger.date_map = defaultdict(list)
+        mock_azure_id.return_value = ("string1", "string2")
+        mock_reader.return_value = [
+            {
+                "resourceid": "i-55555556",
+                "subs_start_time": "2023-07-01T01:00:00Z",
+                "subs_end_time": "2023-07-01T02:00:00Z",
+                "subs_resource_id": "i-55555556",
+                "subs_account": "9999999999999",
+                "physical_cores": "1",
+                "subs_vcpu": "a",
+                "variant": "Server",
+                "subs_usage": "Production",
+                "subs_sla": "Premium",
+                "subs_role": "Red Hat Enterprise Linux Server",
+                "subs_usage_quantity": "4",
+                "subs_rhel_version": "479",
+                "subs_addon_id": "204",
+                "subs_conversion": "true",
+                "subs_instance": "",
+                "source": self.azure_provider.uuid,
+                "resourcegroup": "my-fake-rg",
+                "subs_vmname": "my-fake-vm",
+            }
+        ]
+        mock_op = mock_open(read_data="x,y,z")
+        with patch("builtins.open", mock_op):
+            self.azure_messenger.process_and_send_subs_message(upload_keys)
+        mock_azure_id.assert_called_once()
+
     @patch("subs.subs_data_messenger.SUBSDataMessenger.determine_azure_instance_and_tenant_id")
     @patch("subs.subs_data_messenger.os.remove")
     @patch("subs.subs_data_messenger.get_producer")
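Neither new test asserts on the counter values themselves; both only verify that processing completes and the mocked collaborators are invoked. A hedged sketch (assumptions: the label value Provider.PROVIDER_AWS renders as "AWS", and prometheus_client exports Counter samples with a "_total" suffix) of how the counters could be checked through the default registry:

    from prometheus_client import REGISTRY

    def counter_value(name: str, provider_type: str) -> float:
        # get_sample_value returns None when the labeled series does not exist yet
        return REGISTRY.get_sample_value(name, {"provider_type": provider_type}) or 0.0

    before = counter_value("rhel_els_system_count_total", "AWS")
    # ... run process_and_send_subs_message(upload_keys) here ...
    after = counter_value("rhel_els_system_count_total", "AWS")
    assert after - before == 1.0  # exactly one system processed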
