Skip to content

Commit

Permalink
[inventory][fix] Better detect and remove unused regions
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Oct 4, 2024
1 parent 6f8a26b commit a32c131
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 91 deletions.
11 changes: 6 additions & 5 deletions fixlib/fixlib/baseresources.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from fixlib.json import from_json as _from_json, to_json as _to_json
from fixlib.logger import log
from fixlib.types import Json
from fixlib.utils import make_valid_timestamp, utc_str
from fixlib.utils import make_valid_timestamp, utc_str, utc
from fixlib.basecategories import Category


Expand Down Expand Up @@ -369,7 +369,7 @@ def to_json(self) -> Json:
return _to_json(self, strip_nulls=True)

def log(self, msg: str, data: Optional[Any] = None, exception: Optional[Exception] = None) -> None:
now = datetime.utcnow().replace(tzinfo=timezone.utc)
now = utc()
log_entry = {
"timestamp": now,
"msg": str(msg),
Expand Down Expand Up @@ -418,23 +418,23 @@ def chksum(self) -> str:

@property
def age(self) -> Optional[timedelta]:
now = datetime.utcnow().replace(tzinfo=timezone.utc)
now = utc()
if self.ctime is not None:
return now - self.ctime
else:
return None

@property
def last_access(self) -> Optional[timedelta]:
now = datetime.utcnow().replace(tzinfo=timezone.utc)
now = utc()
if self.atime is not None:
return now - self.atime
else:
return None

@property
def last_update(self) -> Optional[timedelta]:
now = datetime.utcnow().replace(tzinfo=timezone.utc)
now = utc()
if self.mtime is not None:
return now - self.mtime
else:
Expand Down Expand Up @@ -894,6 +894,7 @@ class BaseRegion(PhantomBaseResource):
long_name: Optional[str] = None
latitude: Optional[float] = None
longitude: Optional[float] = None
region_in_use: Optional[bool] = field(default=None, metadata={"description": "Indicates if the region is in use."})

def _keys(self) -> Tuple[Any, ...]:
if self._graph is None:
Expand Down
9 changes: 8 additions & 1 deletion fixlib/fixlib/graph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from enum import Enum
from functools import lru_cache
from time import time
from typing import Dict, Iterator, List, Tuple, Optional, Union, Any, Type, TypeVar, Set
from typing import Dict, Iterator, List, Tuple, Optional, Union, Any, Type, TypeVar, Set, Iterable

import networkx
from attr import resolve_types
Expand Down Expand Up @@ -167,6 +167,13 @@ def add_node(self, node_for_adding: BaseResource, **attr: Any) -> None:
# which stores it as a weakref.
node_for_adding._graph = self

def remove_recursively(self, nodes: Iterable[Any]) -> None:
remove_nodes = set()
for node in nodes:
remove_nodes.add(node)
remove_nodes.update(self.successors(node))
self.remove_nodes_from(remove_nodes)

def has_edge(
self, src: BaseResource, dst: BaseResource, key: Optional[EdgeKey] = None, edge_type: Optional[EdgeType] = None
) -> bool:
Expand Down
8 changes: 4 additions & 4 deletions fixlib/fixlib/json_bender.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
from abc import ABC
from datetime import datetime
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, Any, Type, Union, Optional, Callable, List

Expand Down Expand Up @@ -418,7 +418,7 @@ def execute(self, source: Any) -> Any:
if isinstance(source, str):
return datetime.strptime(source, self._format)
elif isinstance(source, (int, float)):
return datetime.utcfromtimestamp(source)
return datetime.fromtimestamp(source, timezone.utc)
else:
return source

Expand All @@ -435,7 +435,7 @@ def __init__(self, out_format: str = "%Y-%m-%dT%H:%M:%SZ", **kwargs: Any):

def execute(self, source: Any) -> Any:
if isinstance(source, (int, float)):
return datetime.utcfromtimestamp(source).strftime(self._out_format)
return datetime.fromtimestamp(source, timezone.utc).strftime(self._out_format)
else:
return source

Expand Down Expand Up @@ -582,7 +582,7 @@ def execute(self, source: Any) -> Any:

class SecondsFromEpochToDatetime(Bender):
def execute(self, source: int) -> str:
return utc_str(datetime.utcfromtimestamp(source))
return utc_str(datetime.fromtimestamp(source, timezone.utc))


EmptyToNone = EmptyToNoneBender()
Expand Down
16 changes: 8 additions & 8 deletions plugins/aws/fix_plugin_aws/collector.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from collections import defaultdict
import logging
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
from typing import List, Type, Optional, ClassVar, Union, cast, Dict, Any
Expand Down Expand Up @@ -262,7 +262,7 @@ def get_last_run() -> Optional[datetime]:
# wait for all futures to finish
shared_queue.wait_for_submitted_work()
# remove unused nodes
self.remove_unused()
self.remove_unused(global_builder)
self.core_feedback.progress_done(self.account.dname, 1, 1, context=[self.cloud.id])
self.error_accumulator.report_all(global_builder.core_feedback)

Expand Down Expand Up @@ -334,10 +334,9 @@ def collect_and_set_metrics(

builder.submit_work("cloudwatch", collect_and_set_metrics, start, region, queries)

def remove_unused(self) -> None:
remove_nodes = []

def rm_nodes(cls, ignore_kinds: Optional[Type[Any]] = None, check_pred: bool = True) -> None: # type: ignore
def remove_unused(self, builder: GraphBuilder) -> None:
def rm_leaf_nodes(cls: Any, ignore_kinds: Optional[Type[Any]] = None, check_pred: bool = True) -> None:
remove_nodes = []
for node in self.graph.nodes:
if not isinstance(node, cls):
continue
Expand All @@ -356,9 +355,10 @@ def rm_nodes(cls, ignore_kinds: Optional[Type[Any]] = None, check_pred: bool = T
continue
removed.add(node)
self.graph.remove_node(node)
remove_nodes.clear()

rm_nodes(bedrock.AwsBedrockFoundationModel, check_pred=False)
rm_leaf_nodes(bedrock.AwsBedrockFoundationModel, check_pred=False)
# remove regions that are not in use
self.graph.remove_recursively(builder.nodes(AwsRegion, lambda r: r.region_in_use is False))

# TODO: move into separate AwsAccountSettings
def update_account(self) -> None:
Expand Down
48 changes: 35 additions & 13 deletions plugins/aws/fix_plugin_aws/resource/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,23 @@

import logging
import re
from urllib.parse import quote_plus as urlquote
from abc import ABC
from concurrent.futures import Future
from datetime import datetime, timezone, timedelta
from functools import lru_cache
from typing import Any, Callable, ClassVar, Dict, Iterator, List, Optional, Type, TypeVar, Tuple
from urllib.parse import quote_plus as urlquote

from math import ceil

from attr import evolve, field
from attr import evolve
from attrs import define
from boto3.exceptions import Boto3Error
from fixinventorydata.cloud import instances as cloud_instance_data, regions as cloud_region_data
from math import ceil

from fix_plugin_aws.aws_client import AwsClient
from fix_plugin_aws.configuration import AwsConfig
from fix_plugin_aws.resource.pricing import AwsPricingPrice
from fix_plugin_aws.utils import arn_partition
from fixlib.utils import utc
from fixlib.baseresources import (
BaseAccount,
BaseIamPrincipal,
Expand All @@ -40,7 +39,7 @@
from fixlib.proc import set_thread_name
from fixlib.threading import ExecutorQueue
from fixlib.types import Json
from fixinventorydata.cloud import instances as cloud_instance_data, regions as cloud_region_data
from fixlib.utils import utc

log = logging.getLogger("fix.plugins.aws")

Expand Down Expand Up @@ -362,7 +361,6 @@ class AwsRegion(BaseRegion, AwsResource):
]
}
}
region_in_use: Optional[bool] = field(default=None, metadata={"description": "Indicates if the region is in use."})

def __attrs_post_init__(self) -> None:
super().__attrs_post_init__()
Expand All @@ -374,12 +372,30 @@ def __attrs_post_init__(self) -> None:

def complete_graph(self, builder: GraphBuilder, source: Json) -> None:
count = 0
# A region with less than 10 real resources is considered not in use.
ignore_kinds = {
"aws_athena_work_group",
"aws_cloud_trail",
"aws_ec2_internet_gateway",
"aws_ec2_network_acl",
"aws_ec2_security_group",
"aws_ec2_subnet",
"aws_ec2_route_table",
}

def ignore_for_count(resource: BaseResource) -> bool:
if isinstance(resource, PhantomBaseResource):
return True
if resource.kind == "aws_vpc" and getattr(resource, "vpc_is_default", False):
return True
if resource.kind in ignore_kinds:
return True
return False

# A region with less than 3 real resources is considered not in use.
# AWS is creating a couple of resources in every region automatically.
# The number 10 is chosen by looking into different empty regions.
empty_region = 10
empty_region = 3
for succ in builder.graph.descendants(self):
if not isinstance(succ, PhantomBaseResource):
if not ignore_for_count(succ):
count += 1
if count > empty_region:
break
Expand Down Expand Up @@ -487,11 +503,17 @@ def node(self, clazz: Optional[Type[AwsResourceType]] = None, **node: Any) -> Op
return n # type: ignore
return None

def nodes(self, clazz: Optional[Type[AwsResourceType]] = None, **node: Any) -> Iterator[AwsResourceType]:
def nodes(
self, clazz: Optional[Type[AwsResourceType]] = None, filter: Optional[Callable[[Any], bool]] = None, **node: Any
) -> Iterator[AwsResourceType]:
with self.graph_nodes_access.read_access:
for n in self.graph:
is_clazz = isinstance(n, clazz) if clazz else True
if is_clazz and all(getattr(n, k, None) == v for k, v in node.items()):
if (
is_clazz
and (filter(n) if filter else True)
and all(getattr(n, k, None) == v for k, v in node.items())
):
yield n

def add_node(
Expand Down
6 changes: 3 additions & 3 deletions plugins/aws/fix_plugin_aws/resource/sqs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, timezone
from typing import ClassVar, Dict, List, Optional, Type, Any


Expand Down Expand Up @@ -51,8 +51,8 @@ class AwsSqsQueue(AwsResource, BaseQueue):
mapping: ClassVar[Dict[str, Bender]] = {
"id": S("QueueName"),
"name": S("QueueName"),
"ctime": S("CreatedTimestamp") >> AsInt() >> F(lambda x: utc_str(datetime.utcfromtimestamp(x))),
"mtime": S("LastModifiedTimestamp") >> AsInt() >> F(lambda x: utc_str(datetime.utcfromtimestamp(x))),
"ctime": S("CreatedTimestamp") >> AsInt() >> F(lambda x: utc_str(datetime.fromtimestamp(x, timezone.utc))),
"mtime": S("LastModifiedTimestamp") >> AsInt() >> F(lambda x: utc_str(datetime.fromtimestamp(x, timezone.utc))),
"arn": S("QueueArn"),
"sqs_queue_url": S("QueueUrl"),
"sqs_approximate_number_of_messages": S("ApproximateNumberOfMessages") >> AsInt(),
Expand Down
56 changes: 30 additions & 26 deletions plugins/azure/fix_plugin_azure/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@
resources as compute_resources,
)
from fix_plugin_azure.resource.containerservice import resources as aks_resources
from fix_plugin_azure.resource.cosmosdb import (
AzureCosmosDBLocation,
resources as cosmosdb_resources,
)
from fix_plugin_azure.resource.keyvault import resources as keyvault_resources
from fix_plugin_azure.resource.machinelearning import (
AzureMachineLearningUsage,
AzureMachineLearningVirtualMachineSize,
resources as ml_resources,
)
from fix_plugin_azure.resource.microsoft_graph import (
MicrosoftGraphOrganization,
resources as graph_resources,
Expand All @@ -43,17 +52,8 @@
)
from fix_plugin_azure.resource.security import resources as security_resources
from fix_plugin_azure.resource.sql_server import resources as sql_resources
from fix_plugin_azure.resource.cosmosdb import (
AzureCosmosDBLocation,
resources as cosmosdb_resources,
)
from fix_plugin_azure.resource.storage import AzureStorageAccountUsage, AzureStorageSku, resources as storage_resources
from fix_plugin_azure.resource.web import resources as web_resources
from fix_plugin_azure.resource.machinelearning import (
AzureMachineLearningUsage,
AzureMachineLearningVirtualMachineSize,
resources as ml_resources,
)
from fixlib.baseresources import Cloud, GraphRoot, BaseAccount, BaseRegion
from fixlib.core.actions import CoreFeedback, ErrorAccumulator
from fixlib.graph import Graph
Expand Down Expand Up @@ -100,6 +100,7 @@ def __init__(
core_feedback: CoreFeedback,
task_data: Optional[Json] = None,
max_resources_per_account: Optional[int] = None,
filter_unused_resources: bool = True,
):
self.config = config
self.cloud = cloud
Expand All @@ -108,6 +109,7 @@ def __init__(
self.core_feedback = core_feedback
self.graph = Graph(root=account, max_nodes=max_resources_per_account)
self.task_data = task_data
self.filter_unused_resources = filter_unused_resources

def collect(self) -> None:
with ThreadPoolExecutor(
Expand Down Expand Up @@ -165,13 +167,14 @@ def get_last_run() -> Optional[datetime]:
queue.wait_for_submitted_work()

# post-process nodes
self.remove_unused()
if self.filter_unused_resources:
self.remove_unused(builder)
for node, data in list(self.graph.nodes(data=True)):
if isinstance(node, MicrosoftResource):
node.after_collect(builder, data.get("source", {}))

# delete unnecessary nodes after all work is completed
self.after_collect()
self.after_collect(builder)
# report all accumulated errors
error_accumulator.report_all(self.core_feedback)
self.core_feedback.progress_done(self.account.id, 1, 1, context=[self.cloud.id])
Expand Down Expand Up @@ -205,10 +208,10 @@ def collect_with(self, builder: GraphBuilder, locations: Dict[str, BaseRegion])
def locations(self, builder: GraphBuilder) -> Dict[str, BaseRegion]:
pass

def remove_unused(self) -> None:
def remove_unused(self, builder: GraphBuilder) -> None:
pass

def after_collect(self) -> None:
def after_collect(self, builder: GraphBuilder) -> None:
pass


Expand All @@ -234,10 +237,10 @@ def collect_with(self, builder: GraphBuilder, locations: Dict[str, BaseRegion])
self.collect_resource_list(location.safe_name, builder.with_location(location), regional_resources)
processed_locations.add(location.safe_name)

def remove_unused(self) -> None:
def remove_unused(self, builder: GraphBuilder) -> None:
remove_nodes = []

def rm_nodes(cls, ignore_kinds: Optional[Type[Any]] = None, check_pred: bool = True) -> None: # type: ignore
def rm_leaf_nodes(cls: Any, ignore_kinds: Optional[Type[Any]] = None, check_pred: bool = True) -> None:
for node in self.graph.nodes:
if not isinstance(node, cls):
continue
Expand All @@ -263,19 +266,20 @@ def remove_usage_zero_value() -> None:
remove_nodes.append(node)
self._delete_nodes(remove_nodes)

rm_nodes(AzureComputeVirtualMachineSize, AzureLocation)
rm_nodes(AzureNetworkExpressRoutePortsLocation, AzureSubscription)
rm_nodes(AzureNetworkVirtualApplianceSku, AzureSubscription)
rm_nodes(AzureComputeDiskType, AzureSubscription)
rm_nodes(AzureMachineLearningVirtualMachineSize, AzureLocation)
rm_nodes(AzureStorageSku, AzureLocation)
rm_nodes(AzureMysqlServerType, AzureSubscription)
rm_nodes(AzurePostgresqlServerType, AzureSubscription)
rm_nodes(AzureCosmosDBLocation, AzureLocation, check_pred=False)
rm_nodes(AzureLocation, check_pred=False)
rm_leaf_nodes(AzureComputeVirtualMachineSize, AzureLocation)
rm_leaf_nodes(AzureNetworkExpressRoutePortsLocation, AzureSubscription)
rm_leaf_nodes(AzureNetworkVirtualApplianceSku, AzureSubscription)
rm_leaf_nodes(AzureComputeDiskType, AzureSubscription)
rm_leaf_nodes(AzureMachineLearningVirtualMachineSize, AzureLocation)
rm_leaf_nodes(AzureStorageSku, AzureLocation)
rm_leaf_nodes(AzureMysqlServerType, AzureSubscription)
rm_leaf_nodes(AzurePostgresqlServerType, AzureSubscription)
rm_leaf_nodes(AzureCosmosDBLocation, AzureLocation, check_pred=False)
rm_leaf_nodes(AzureLocation, check_pred=False)
remove_usage_zero_value()
self.graph.remove_recursively(builder.nodes(AzureLocation, lambda r: r.region_in_use is False))

def after_collect(self) -> None:
def after_collect(self, builder: GraphBuilder) -> None:
# Filter unnecessary nodes such as AzureComputeDiskTypePricing
nodes_to_remove = []
node_types = (AzureComputeDiskTypePricing,)
Expand Down
Loading

0 comments on commit a32c131

Please sign in to comment.