Commit c626700: [gcp][feat] Add metrics collection (#2283)

1101-1 and aquamatthias authored Dec 4, 2024
Co-authored-by: Matthias Veit <[email protected]>
1 parent 786318a commit c626700

Showing 20 changed files with 857 additions and 36 deletions.
fixlib/fixlib/baseresources.py (7 changes: 5 additions & 2 deletions)
@@ -116,8 +116,7 @@ def get(self) -> Dict[str, Any]:
         return changes


-# todo: replace to StrEnum once resoto is on 3.11
-class MetricName(str, Enum):
+class MetricName(StrEnum):
     def __str__(self) -> str:
         return self.value

@@ -150,6 +149,8 @@ def __str__(self) -> str:

     # load balancers
     RequestCount = "request"  # _count will be added to the end because of the unit
+    RequestBytesCount = "request_bytes"  # _count will be added to the end because of the unit
+    ResponseBytesCount = "response_bytes"  # _count will be added to the end because of the unit
     ActiveConnectionCount = "active_connection"  # _count will be added to the end because of the unit
     ALBActiveConnectionCount = "alb_active_connection"  # _count will be added to the end because of the unit
     ConnectionAttemptCount = "connection_attempt"  # _count will be added to the end because of the unit
@@ -195,6 +196,8 @@ def __str__(self) -> str:
     DiskQueueDepth = "disk_queue_depth"
     NetworkReceiveThroughput = "network_receive_throughput"
     NetworkTransmitThroughput = "network_transmit_throughput"
+    NetworkBytesSent = "network_bytes_sent"
+    NetworkBytesReceived = "network_bytes_received"

     # serverless
     Invocations = "invocations"
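
Note: the str/Enum to StrEnum swap above resolves the old todo comment now that Python 3.11 is the baseline. A minimal standalone sketch of the difference, using one illustrative member rather than the full fixlib enum:

from enum import Enum, StrEnum


class OldMetricName(str, Enum):
    # pre-3.11 pattern: needs an explicit __str__, otherwise
    # str(member) yields "OldMetricName.RequestCount"
    RequestCount = "request"

    def __str__(self) -> str:
        return self.value


class NewMetricName(StrEnum):
    # 3.11+ pattern: str(member) is the value out of the box
    RequestCount = "request"


assert str(OldMetricName.RequestCount) == str(NewMetricName.RequestCount) == "request"
# the "_count" unit suffix mentioned in the member comments is appended elsewhere:
assert f"{NewMetricName.RequestCount}_count" == "request_count"
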
plugins/aws/fix_plugin_aws/collector.py (5 changes: 3 additions & 2 deletions)
@@ -242,11 +242,11 @@ def get_last_run() -> Optional[datetime]:
             try:
                 log.info(f"[Aws:{self.account.id}] Collect usage metrics.")
                 self.collect_usage_metrics(global_builder)
+                shared_queue.wait_for_submitted_work()
             except Exception as e:
                 log.warning(
                     f"Failed to collect usage metrics on account {self.account.id} in region {global_builder.region.id}: {e}"
                 )
-            shared_queue.wait_for_submitted_work()

             # call all registered after collect hooks
             for after_collect in global_builder.after_collect_actions:
@@ -334,8 +334,9 @@ def collect_usage_metrics(self, builder: GraphBuilder) -> None:
                 continue
             # region can be overridden in the query: s3 is global, but need to be queried per region
             if region := cast(AwsRegion, resource.region()):
-                lookup_map[resource.id] = resource
                 resource_queries: List[cloudwatch.AwsCloudwatchQuery] = resource.collect_usage_metrics(builder)
+                if resource_queries:
+                    lookup_map[resource.id] = resource
                 for query in resource_queries:
                     query_region = query.region or region
                     start = query.start_delta or builder.metrics_delta
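
Note: two behavior fixes hide in this diff. Draining the shared queue now happens inside the try block, so exceptions raised by queued metric work are downgraded to a warning instead of aborting the account collect, and a resource only enters lookup_map when it actually produced queries. A standalone sketch of the first fix, with stand-in names rather than the fixlib classes:

import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List

log = logging.getLogger("sketch")


def collect_usage_metrics(executor: ThreadPoolExecutor) -> List[Future]:
    # submitted work can fail long after submission, e.g. a throttled API call
    return [executor.submit(lambda: 1 / 0)]


with ThreadPoolExecutor() as executor:
    try:
        futures = collect_usage_metrics(executor)
        # draining results re-raises worker exceptions, so the drain must
        # sit inside the same try block as the submission
        for future in futures:
            future.result()
    except Exception as e:
        log.warning(f"Failed to collect usage metrics: {e}")
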
plugins/azure/fix_plugin_azure/resource/metrics.py (6 changes: 4 additions & 2 deletions)
@@ -1,3 +1,4 @@
+from copy import deepcopy
 from datetime import datetime, timedelta
 from concurrent.futures import as_completed
 import logging
@@ -271,12 +272,13 @@ def _query_for_single(
         interval: str,
     ) -> "Tuple[Optional[AzureMetricData], Optional[str]]":
         try:
+            local_api_spec = deepcopy(api_spec)
             # Set the path for the API call based on the instance ID of the query
-            api_spec.path = f"{query.instance_id}/providers/Microsoft.Insights/metrics"
+            local_api_spec.path = f"{query.instance_id}/providers/Microsoft.Insights/metrics"
             # Retrieve metric data from the API
             aggregation = ",".join(query.aggregation)
             part = builder.client.list(
-                api_spec,
+                local_api_spec,
                 metricnames=query.metric_name,
                 metricNamespace=query.metric_namespace,
                 timespan=timespan,
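
Note: the deepcopy fixes a thread-safety bug. _query_for_single runs concurrently for many queries, and every call previously rewrote path on the one shared api_spec object, so parallel queries could read each other's paths. A standalone sketch of the hazard and the fix (ApiSpec here is a stand-in for the Azure plugin's spec class):

from copy import deepcopy
from dataclasses import dataclass


@dataclass
class ApiSpec:
    path: str


SHARED_SPEC = ApiSpec(path="/providers/Microsoft.Insights/metrics")


def query_for_single(instance_id: str) -> str:
    # before the fix: SHARED_SPEC.path = ... mutated the object that every
    # concurrent caller reads, so requests could go out with the wrong path
    local_api_spec = deepcopy(SHARED_SPEC)
    local_api_spec.path = f"{instance_id}/providers/Microsoft.Insights/metrics"
    return local_api_spec.path


print(query_for_single("/subscriptions/s1/vm1"))
assert SHARED_SPEC.path == "/providers/Microsoft.Insights/metrics"  # template untouched
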
plugins/gcp/fix_plugin_gcp/__init__.py (7 changes: 5 additions & 2 deletions)
@@ -10,6 +10,7 @@
 from fixlib.core.actions import CoreFeedback
 from fixlib.graph import Graph, MaxNodesExceeded
 from fixlib.logger import log, setup_logger
+from fixlib.types import Json
 from .collector import GcpProjectCollector
 from .config import GcpConfig
 from .resources.base import GcpProject
@@ -77,10 +78,11 @@ def collect(self) -> None:
                     project_id,
                     feedback,
                     cloud,
+                    self.task_data or {},
                     max_resources_per_account=self.max_resources_per_account,
                     **collect_args,
                 )
-                for project_id in credentials.keys()
+                for project_id in credentials
             ]
             for future in futures.as_completed(wait_for):
                 project_graph = future.result()
@@ -98,6 +100,7 @@ def collect_project(
     project_id: str,
     core_feedback: CoreFeedback,
     cloud: Cloud,
+    task_data: Json,
     args: Optional[Namespace] = None,
     running_config: Optional[RunningConfig] = None,
     credentials: Optional[Dict[str, Any]] = None,
@@ -130,7 +133,7 @@ def collect_project(

     try:
         core_feedback.progress_done(project_id, 0, 1)
-        gpc = GcpProjectCollector(Config.gcp, cloud, project, core_feedback, max_resources_per_account)
+        gpc = GcpProjectCollector(Config.gcp, cloud, project, core_feedback, task_data, max_resources_per_account)
         try:
             gpc.collect()
         except MaxNodesExceeded as ex:
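
Note: this plumbing change hands the plugin-level task_data (defaulting to {} when absent) to every per-project collector, and drops the redundant .keys() when iterating the credentials dict. A standalone sketch of the fan-out (collect_project here is a stand-in, not the plugin function):

from concurrent import futures
from typing import Any, Dict


def collect_project(project_id: str, task_data: Dict[str, Any]) -> str:
    return f"{project_id}: step={task_data.get('step', 'n/a')}"


task_data: Dict[str, Any] = {"step": "collect"}
credentials = {"project-a": object(), "project-b": object()}

with futures.ThreadPoolExecutor() as executor:
    wait_for = [
        executor.submit(collect_project, project_id, task_data or {})
        for project_id in credentials  # iterating a dict yields its keys
    ]
    for future in futures.as_completed(wait_for):
        print(future.result())
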
plugins/gcp/fix_plugin_gcp/collector.py (40 changes: 39 additions & 1 deletion)
@@ -1,6 +1,7 @@
 import logging
 from concurrent.futures import ThreadPoolExecutor
-from typing import Type, List, Any, Optional
+from datetime import datetime, timezone
+from typing import Type, List, Any, Optional, cast

 from fix_plugin_gcp.config import GcpConfig
 from fix_plugin_gcp.gcp_client import GcpApiSpec
@@ -14,12 +15,15 @@
     firestore,
     filestore,
     cloudfunctions,
+    monitoring,
 )
 from fix_plugin_gcp.resources.base import GcpResource, GcpProject, ExecutorQueue, GraphBuilder, GcpRegion, GcpZone
 from fix_plugin_gcp.utils import Credentials
 from fixlib.baseresources import Cloud
 from fixlib.core.actions import CoreFeedback, ErrorAccumulator
 from fixlib.graph import Graph
+from fixlib.json import value_in_path
+from fixlib.types import Json

 log = logging.getLogger("fix.plugins.gcp")
 all_resources: List[Type[GcpResource]] = (
@@ -58,6 +62,7 @@ def __init__(
         cloud: Cloud,
         project: GcpProject,
         core_feedback: CoreFeedback,
+        task_data: Json,
         max_resources_per_account: Optional[int] = None,
     ) -> None:
         self.config = config
@@ -67,6 +72,7 @@ def __init__(
         self.error_accumulator = ErrorAccumulator()
         self.graph = Graph(root=self.project, max_nodes=max_resources_per_account)
         self.credentials = Credentials.get(self.project.id)
+        self.task_data = task_data

     def collect(self) -> None:
         with ThreadPoolExecutor(
@@ -77,7 +83,20 @@ def collect(self) -> None:
             # It should only be used in scenarios, where it is safe to do so.
             # This executor is shared between all regions.
             shared_queue = ExecutorQueue(executor, self.project.safe_name)
+
+            def get_last_run() -> Optional[datetime]:
+                td = self.task_data
+                if not td:
+                    return None
+                timestamp = value_in_path(td, ["timing", td.get("step", ""), "started_at"])
+
+                if timestamp is None:
+                    return None
+
+                return datetime.fromtimestamp(timestamp, timezone.utc)
+
             project_global_region = GcpRegion.fallback_global_region(self.project)
+            last_run = get_last_run()
             global_builder = GraphBuilder(
                 self.graph,
                 self.cloud,
@@ -87,6 +106,8 @@ def collect(self) -> None:
                 self.core_feedback,
                 self.error_accumulator,
                 project_global_region,
+                config=self.config,
+                last_run_started_at=last_run,
             )
             global_builder.add_node(project_global_region, {})

@@ -113,6 +134,13 @@ def collect(self) -> None:

             self.error_accumulator.report_all(global_builder.core_feedback)

+            if global_builder.config.collect_usage_metrics:
+                try:
+                    log.info(f"[GCP:{self.project.id}] Collect usage metrics.")
+                    self.collect_usage_metrics(global_builder)
+                    global_builder.executor.wait_for_submitted_work()
+                except Exception as e:
+                    log.warning(f"Failed to collect usage metrics in project {self.project.id}: {e}")
             log.info(f"[GCP:{self.project.id}] Connect resources and create edges.")
             # connect nodes
             for node, data in list(self.graph.nodes(data=True)):
@@ -128,9 +156,19 @@ def collect(self) -> None:
                 if isinstance(node, GcpResource):
                     node.post_process_instance(global_builder, data.get("source", {}))

+            global_builder.executor.wait_for_submitted_work()
+
             self.core_feedback.progress_done(self.project.id, 1, 1, context=[self.cloud.id])
             log.info(f"[GCP:{self.project.id}] Collecting resources done.")
+
+    def collect_usage_metrics(self, builder: GraphBuilder) -> None:
+        for resource in builder.graph.nodes:
+            if isinstance(resource, GcpResource) and (mq := resource.collect_usage_metrics(builder)):
+                start_at = builder.created_at - builder.metrics_delta
+                region = cast(GcpRegion, resource.region())
+                rb = builder.for_region(region)
+                monitoring.GcpMonitoringMetricData.query_for(rb, resource, mq, start_at, builder.created_at)

     def remove_unconnected_nodes(self, builder: GraphBuilder) -> None:
         def rm_leaf_nodes(clazz: Any, ignore_kinds: Optional[Type[Any]] = None) -> None:
             remove_nodes = set()
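
Note: the new get_last_run reads the current step's started_at Unix timestamp out of task_data and converts it to an aware UTC datetime, which the GraphBuilder receives as last_run_started_at; collect_usage_metrics then queries GCP Monitoring over the window from created_at - metrics_delta to created_at. A standalone sketch of the timestamp lookup, with a simplified stand-in for fixlib.json.value_in_path and a task_data shape inferred from the diff:

from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


def value_in_path(js: Dict[str, Any], path: List[str]) -> Optional[Any]:
    # simplified stand-in for fixlib.json.value_in_path
    current: Any = js
    for key in path:
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def get_last_run(task_data: Dict[str, Any]) -> Optional[datetime]:
    if not task_data:
        return None
    timestamp = value_in_path(task_data, ["timing", task_data.get("step", ""), "started_at"])
    if timestamp is None:
        return None
    return datetime.fromtimestamp(timestamp, timezone.utc)


task_data = {"step": "collect", "timing": {"collect": {"started_at": 1733316000}}}
print(get_last_run(task_data))  # 2024-12-04 12:40:00+00:00
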
plugins/gcp/fix_plugin_gcp/config.py (9 changes: 6 additions & 3 deletions)
@@ -1,6 +1,5 @@
-from fixlib.proc import num_default_threads
 from attrs import define, field
-from typing import List, ClassVar
+from typing import List, ClassVar, Optional


 @define
@@ -17,7 +16,7 @@ class GcpConfig:
         metadata={"description": "GCP services to exclude (default: none)"},
     )
     project_pool_size: int = field(
-        factory=num_default_threads,
+        default=64,
         metadata={"description": "GCP project thread/process pool size"},
     )
     fork_process: bool = field(
@@ -31,6 +30,10 @@ class GcpConfig:
             "If false, the error is logged and the resource is skipped."
         },
     )
+    collect_usage_metrics: Optional[bool] = field(
+        default=True,
+        metadata={"description": "Collect resource usage metrics via GCP Monitoring, enabled by default"},
+    )

     def should_collect(self, name: str) -> bool:
         if self.collect:
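
Note: because the flag defaults to True, existing GCP setups begin collecting usage metrics on upgrade. A sketch of opting out, assuming the attrs defaults shown in this file are enough to construct the config (real deployments load it through the fix config machinery):

from fix_plugin_gcp.config import GcpConfig

# disable the GCP Monitoring queries for a scan; other fields keep their defaults
config = GcpConfig(collect_usage_metrics=False)
assert config.collect_usage_metrics is False
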
plugins/gcp/fix_plugin_gcp/resources/aiplatform.py (6 changes: 3 additions & 3 deletions)
@@ -50,8 +50,6 @@ class AIPlatformRegionFilter:
     def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
         # Default behavior: in case the class has an ApiSpec, call the api and call collect.
         if issubclass(cls, GcpResource):
-            region_name = "global" if not builder.region else builder.region.safe_name
-            log.info(f"[GCP:{builder.project.id}:{region_name}] Collecting {cls.kind}")
             if spec := cls.api_spec:
                 expected_errors = GcpExpectedErrorCodes | (spec.expected_errors or set()) | {"HttpError:none:none"}
                 with GcpErrorHandler(
@@ -66,7 +64,9 @@ def collect_resources(cls, builder: GraphBuilder, **kwargs: Any) -> List[GcpResource]:
                     if builder.region:
                         items = builder.client.list(spec, **kwargs)
                         collected_resources = cls.collect(items, builder)
-                        log.info(f"[GCP:{builder.project.id}] finished collecting: {cls.kind}")
+                        log.info(
+                            f"[GCP:{builder.project.id}:{builder.region.safe_name}] finished collecting: {cls.kind}"
+                        )
                         return collected_resources
             return []
