diff --git a/plugins/gcp/fix_plugin_gcp/collector.py b/plugins/gcp/fix_plugin_gcp/collector.py index fbf407dea..128052e45 100644 --- a/plugins/gcp/fix_plugin_gcp/collector.py +++ b/plugins/gcp/fix_plugin_gcp/collector.py @@ -1,7 +1,7 @@ import logging -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone -from typing import Type, List, Any, Optional, cast +from typing import Tuple, Type, List, Any, Optional, cast, Dict from fix_plugin_gcp.config import GcpConfig from fix_plugin_gcp.gcp_client import GcpApiSpec @@ -160,16 +160,31 @@ def get_last_run() -> Optional[datetime]: log.info(f"[GCP:{self.project.id}] Collecting resources done.") def collect_usage_metrics(self, builder: GraphBuilder) -> None: + metric_data_futures = [] + mq_lookup = {} + resources_map = {} + result: Dict[monitoring.GcpMonitoringQuery, monitoring.GcpMonitoringMetricData] = {} for resource in builder.graph.nodes: if isinstance(resource, GcpResource) and (mq := resource.collect_usage_metrics(builder)): + mq_lookup.update({q.metric_id: q for q in mq}) start_at = builder.created_at - builder.metrics_delta region = cast(GcpRegion, resource.region()) - try: - rb = builder.for_region(region) - result = monitoring.GcpMonitoringMetricData.query_for(rb, mq, start_at, builder.created_at) - monitoring.update_resource_metrics(resource, result) - except Exception as e: - log.warning(f"Error occurred in region {region}: {e}") + rb = builder.for_region(region) + resources_map[f"{resource.kind}/{resource.id}/{resource.region().id}"] = resource + metric_data_futures.extend( + monitoring.GcpMonitoringMetricData.query_for(rb, mq, start_at, builder.created_at) + ) + + for metric_data in as_completed(metric_data_futures): + try: + metric_query_result: List[Tuple[str, monitoring.GcpMonitoringMetricData]] = metric_data.result() + for metric_id, metric in metric_query_result: + if metric is not None and metric_id is not None: + result[mq_lookup[metric_id]] = metric + except Exception as e: + log.warning(f"An error occurred while processing a metric query: {e}") + + monitoring.update_resource_metrics(resources_map, result) def remove_unconnected_nodes(self, builder: GraphBuilder) -> None: def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None: diff --git a/plugins/gcp/fix_plugin_gcp/resources/base.py b/plugins/gcp/fix_plugin_gcp/resources/base.py index 8dc3067a8..c1f3b53cf 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/base.py +++ b/plugins/gcp/fix_plugin_gcp/resources/base.py @@ -349,6 +349,7 @@ class GcpMonitoringQuery: metric_name: MetricName # final name of the metric query_name: str # name of the metric (e.g., GCP metric type) period: timedelta # period of the metric + ref_id: str # unique id of the resource metric_id: str # unique metric identifier stat: str # aggregation type, supports ALIGN_MEAN, ALIGN_MAX, ALIGN_MIN project_id: str # GCP project name @@ -375,6 +376,7 @@ def create( metric_name=metric_name, query_name=query_name, period=period, + ref_id=ref_id, metric_id=metric_id, stat=stat, normalization=normalization, diff --git a/plugins/gcp/fix_plugin_gcp/resources/monitoring.py b/plugins/gcp/fix_plugin_gcp/resources/monitoring.py index c3d1b32cb..28eb55fd0 100644 --- a/plugins/gcp/fix_plugin_gcp/resources/monitoring.py +++ b/plugins/gcp/fix_plugin_gcp/resources/monitoring.py @@ -1,5 +1,5 @@ import logging -from concurrent.futures import as_completed +from concurrent.futures import Future from copy import deepcopy from datetime import datetime from functools import cached_property @@ -8,8 +8,8 @@ from attr import define, field from fix_plugin_gcp.gcp_client import GcpApiSpec -from fix_plugin_gcp.resources.base import GraphBuilder, GcpResource, GcpMonitoringQuery, MetricNormalization -from fixlib.baseresources import MetricUnit, StatName +from fix_plugin_gcp.resources.base import GraphBuilder, GcpMonitoringQuery, MetricNormalization +from fixlib.baseresources import MetricUnit, StatName, BaseResource from fixlib.durations import duration_str from fixlib.json import from_json from fixlib.json_bender import S, Bender, ForallBend, bend, K @@ -18,6 +18,7 @@ service_name = "monitoring" log = logging.getLogger("fix.plugins.gcp") T = TypeVar("T") +V = TypeVar("V", bound=BaseResource) STAT_MAP: Dict[str, StatName] = {"ALIGN_MIN": StatName.min, "ALIGN_MEAN": StatName.avg, "ALIGN_MAX": StatName.max} @@ -58,15 +59,15 @@ def query_for( queries: List[GcpMonitoringQuery], start_time: datetime, end_time: datetime, - ) -> "Dict[GcpMonitoringQuery, GcpMonitoringMetricData]": + ) -> List[Future[List[Tuple[str, "GcpMonitoringMetricData"]]]]: if builder.region: log.info( f"[{builder.region.safe_name}|{start_time}|{duration_str(end_time - start_time)}] Query for {len(queries)} metrics." ) else: log.info(f"[global|{start_time}|{duration_str(end_time - start_time)}] Query for {len(queries)} metrics.") - lookup = {q.metric_id: q for q in queries} - result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData] = {} + # lookup = {q.metric_id: q for q in queries} + # result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData] = {} futures = [] api_spec = GcpApiSpec( @@ -76,7 +77,6 @@ def query_for( action="list", request_parameter={ "name": "projects/{project}", - "aggregation_groupByFields": "", "interval_endTime": utc_str(end_time), "interval_startTime": utc_str(start_time), "view": "FULL", @@ -99,16 +99,16 @@ def query_for( ) futures.append(future) # Retrieve results from submitted queries and populate the result dictionary - for future in as_completed(futures): - try: - metric_query_result: List[Tuple[str, GcpMonitoringMetricData]] = future.result() - for metric_id, metric in metric_query_result: - if metric is not None and metric_id is not None: - result[lookup[metric_id]] = metric - except Exception as e: - log.warning(f"An error occurred while processing a metric query: {e}") - raise e - return result + # for future in as_completed(futures): + # try: + # metric_query_result: List[Tuple[str, GcpMonitoringMetricData]] = future.result() + # for metric_id, metric in metric_query_result: + # if metric is not None and metric_id is not None: + # result[lookup[metric_id]] = metric + # except Exception as e: + # log.warning(f"An error occurred while processing a metric query: {e}") + # raise e + return futures @staticmethod def _query_for_chunk( @@ -146,10 +146,13 @@ def _query_for_chunk( def update_resource_metrics( - resource: GcpResource, + resources_map: Dict[str, V], monitoring_metric_result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData], ) -> None: for query, metric in monitoring_metric_result.items(): + resource = resources_map.get(query.ref_id) + if resource is None: + continue if len(metric.metric_values) == 0: continue normalizer = query.normalization diff --git a/plugins/gcp/test/test_compute.py b/plugins/gcp/test/test_compute.py index 02129e5f0..7a8fbb027 100644 --- a/plugins/gcp/test/test_compute.py +++ b/plugins/gcp/test/test_compute.py @@ -1,4 +1,4 @@ -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import timedelta, datetime import json import os @@ -183,34 +183,46 @@ def test_gcp_instance_usage_metrics(random_builder: GraphBuilder) -> None: gcp_instance.instance_status = InstanceStatus.RUNNING random_builder.region = GcpRegion(id="us-east1", name="us-east1") - queries = gcp_instance.collect_usage_metrics(random_builder) + random_builder.created_at = datetime(2020, 5, 30, 15, 45, 30) + random_builder.metrics_delta = timedelta(hours=1) - # simulates the `collect_usage_metrics` method found in `GcpAccountCollector`. - def collect_and_set_metrics(start_at: datetime, _: GcpRegion, queries: List[GcpMonitoringQuery]) -> None: - with ThreadPoolExecutor(max_workers=1) as executor: - queue = ExecutorQueue(executor, tasks_per_key=lambda _: 1, name="test") - g_builder = GraphBuilder( - random_builder.graph, - random_builder.cloud, - random_builder.project, - AnonymousCredentials(), # type: ignore - queue, - random_builder.core_feedback, - random_builder.error_accumulator, - GcpRegion(id="global", name="global"), - random_builder.config, - last_run_started_at=random_builder.last_run_started_at, - ) - result = GcpMonitoringMetricData.query_for(g_builder, queries, start_at, start_at + timedelta(hours=2)) - update_resource_metrics(gcp_instance, result) - - start = datetime(2020, 5, 30, 15, 45, 30) - - collect_and_set_metrics(start, GcpRegion(id="us-east-1", name="us-east-1"), queries) - - assert gcp_instance._resource_usage["cpu_utilization_percent"]["avg"] > 0.0 - assert gcp_instance._resource_usage["network_in_count"]["avg"] > 0.0 - assert gcp_instance._resource_usage["disk_read_count"]["avg"] > 0.0 + queries = gcp_instance.collect_usage_metrics(random_builder) + mq_lookup = {q.metric_id: q for q in queries} + resources_map = {f"{gcp_instance.kind}/{gcp_instance.id}/{gcp_instance.region().id}": gcp_instance} + with ThreadPoolExecutor(max_workers=1) as executor: + queue = ExecutorQueue(executor, tasks_per_key=lambda _: 10, name="test") + g_builder = GraphBuilder( + random_builder.graph, + random_builder.cloud, + random_builder.project, + AnonymousCredentials(), # type: ignore + queue, + random_builder.core_feedback, + random_builder.error_accumulator, + GcpRegion(id="global", name="global"), + random_builder.config, + last_run_started_at=random_builder.last_run_started_at, + ) + metric_data_futures = GcpMonitoringMetricData.query_for( + g_builder, queries, random_builder.created_at, random_builder.created_at + random_builder.metrics_delta + ) + + result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData] = {} + + for metric_data in as_completed(metric_data_futures): + try: + metric_query_result: List[Tuple[str, GcpMonitoringMetricData]] = metric_data.result() + for metric_id, metric in metric_query_result: + if metric is not None and metric_id is not None: + result[mq_lookup[metric_id]] = metric + except Exception as e: + log.warning(f"An error occurred while processing a metric query: {e}") + + update_resource_metrics(resources_map, result) + + assert gcp_instance._resource_usage["cpu_utilization_percent"]["avg"] > 0.0 + assert gcp_instance._resource_usage["network_in_count"]["avg"] > 0.0 + assert gcp_instance._resource_usage["disk_read_count"]["avg"] > 0.0 def test_machine_type_ondemand_cost(random_builder: GraphBuilder) -> None: diff --git a/plugins/gcp/test/test_monitoring.py b/plugins/gcp/test/test_monitoring.py index 3b7e3eb3a..2e5858396 100644 --- a/plugins/gcp/test/test_monitoring.py +++ b/plugins/gcp/test/test_monitoring.py @@ -1,4 +1,6 @@ +from concurrent.futures import as_completed from datetime import timedelta, datetime, timezone +from typing import List, Tuple, Dict from fix_plugin_gcp.resources.base import GraphBuilder, GcpMonitoringQuery from fix_plugin_gcp.resources.monitoring import GcpMonitoringMetricData, normalizer_factory @@ -28,6 +30,13 @@ def test_metric(random_builder: GraphBuilder) -> None: project_id=random_builder.project.id, metric_filters={"metric.labels.instance_name": "random_instance", "resource.labels.zone": "global"}, ) - result = GcpMonitoringMetricData.query_for(random_builder, [read, write], earlier, now) + queries = GcpMonitoringMetricData.query_for(random_builder, [read, write], earlier, now) + mq_lookup = {q.metric_id: q for q in [read, write]} + result: Dict[GcpMonitoringQuery, GcpMonitoringMetricData] = {} + for metric_data in as_completed(queries): + metric_query_result: List[Tuple[str, GcpMonitoringMetricData]] = metric_data.result() + for metric_id, metric in metric_query_result: + if metric is not None and metric_id is not None: + result[mq_lookup[metric_id]] = metric assert all(value > 0 for value in result[read].metric_values or []) assert all(value > 0 for value in result[write].metric_values or [])