diff --git a/plugins/aws/fix_plugin_aws/resource/base.py b/plugins/aws/fix_plugin_aws/resource/base.py index 11f899d3f2..7a487b24c7 100644 --- a/plugins/aws/fix_plugin_aws/resource/base.py +++ b/plugins/aws/fix_plugin_aws/resource/base.py @@ -66,17 +66,6 @@ def get_client(config: Config, resource: BaseResource) -> AwsClient: } -@define(slots=True, frozen=True) -class AssessmentKey: - provider: str - region: str - resource_type: str - - -# Type alias for the inner dictionary that maps resource ID to a list of findings -ResourceFindings = Dict[str, List[Finding]] - - def parse_json( json: Json, clazz: Type[T], builder: GraphBuilder, mapping: Optional[Dict[str, Bender]] = None ) -> Optional[T]: diff --git a/plugins/aws/fix_plugin_aws/resource/ec2.py b/plugins/aws/fix_plugin_aws/resource/ec2.py index 4309253a39..881b234308 100644 --- a/plugins/aws/fix_plugin_aws/resource/ec2.py +++ b/plugins/aws/fix_plugin_aws/resource/ec2.py @@ -18,7 +18,6 @@ operations_to_iops, normalizer_factory, ) -from fix_plugin_aws.resource.guardduty import AwsGuardDutyFinding from fix_plugin_aws.resource.iam import AwsIamInstanceProfile from fix_plugin_aws.resource.kms import AwsKmsKey from fix_plugin_aws.resource.s3 import AwsS3Bucket @@ -706,7 +705,6 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: clazz=AwsKmsKey, id=AwsKmsKey.normalise_id(self.volume_kms_key_id), ) - AwsGuardDutyFinding.set_findings(builder, self, "arn") def delete_resource(self, client: AwsClient, graph: Graph) -> bool: client.call( @@ -1524,8 +1522,6 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: if iam_profile := self.instance_iam_instance_profile: builder.add_edge(self, reverse=True, clazz=AwsIamInstanceProfile, arn=iam_profile.arn) - AwsGuardDutyFinding.set_findings(builder, self, "id") - def delete_resource(self, client: AwsClient, graph: Graph) -> bool: if self.instance_status == InstanceStatus.TERMINATED: self.log("Instance is already terminated") diff --git a/plugins/aws/fix_plugin_aws/resource/ecr.py b/plugins/aws/fix_plugin_aws/resource/ecr.py index 21c6811fd2..592f1f26a7 100644 --- a/plugins/aws/fix_plugin_aws/resource/ecr.py +++ b/plugins/aws/fix_plugin_aws/resource/ecr.py @@ -34,7 +34,6 @@ class AwsEcrRepository(AwsResource, HasResourcePolicy): _kind_service: ClassVar[Optional[str]] = service_name _metadata: ClassVar[Dict[str, Any]] = {"icon": "repository", "group": "compute"} _aws_metadata: ClassVar[Dict[str, Any]] = {"provider_link_tpl": "https://{region_id}.console.aws.amazon.com/ecr/repositories/{name}?region={region}", "arn_tpl": "arn:{partition}:ecr:{region}:{account}:repository/{name}"} # fmt: skip - _reference_kinds: ClassVar[ModelReference] = {} api_spec: ClassVar[AwsApiSpec] = AwsApiSpec("ecr", "describe-repositories", "repositories") public_spec: ClassVar[AwsApiSpec] = AwsApiSpec("ecr-public", "describe-repositories", "repositories") mapping: ClassVar[Dict[str, Bender]] = { diff --git a/plugins/aws/fix_plugin_aws/resource/ecs.py b/plugins/aws/fix_plugin_aws/resource/ecs.py index 9d52e0c5e1..7570e20cb3 100644 --- a/plugins/aws/fix_plugin_aws/resource/ecs.py +++ b/plugins/aws/fix_plugin_aws/resource/ecs.py @@ -6,7 +6,6 @@ from attrs import define, field from fix_plugin_aws.aws_client import AwsClient from fix_plugin_aws.resource.autoscaling import AwsAutoScalingGroup -from fix_plugin_aws.resource.guardduty import AwsGuardDutyFinding from fix_plugin_aws.utils import TagsValue, ToDict from fix_plugin_aws.resource.base import AwsResource, GraphBuilder, AwsApiSpec @@ -2111,7 +2110,6 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: builder.dependant_node(self, clazz=AwsKmsKey, id=AwsKmsKey.normalise_id(exc.kms_key_id)) if exc.log_configuration and exc.log_configuration.s3_bucket_name: builder.add_edge(self, clazz=AwsS3Bucket, name=exc.log_configuration.s3_bucket_name) - AwsGuardDutyFinding.set_findings(builder, self, "arn") def delete_resource(self, client: AwsClient, graph: Graph) -> bool: client.call(aws_service=self.api_spec.service, action="delete-cluster", result_name=None, cluster=self.arn) diff --git a/plugins/aws/fix_plugin_aws/resource/eks.py b/plugins/aws/fix_plugin_aws/resource/eks.py index a5ea970e9f..9ad54520b6 100644 --- a/plugins/aws/fix_plugin_aws/resource/eks.py +++ b/plugins/aws/fix_plugin_aws/resource/eks.py @@ -6,7 +6,6 @@ from fix_plugin_aws.resource.autoscaling import AwsAutoScalingGroup from fix_plugin_aws.resource.base import AwsResource, GraphBuilder, AwsApiSpec -from fix_plugin_aws.resource.guardduty import AwsGuardDutyFinding from fix_plugin_aws.resource.iam import AwsIamRole from fix_plugin_aws.aws_client import AwsClient from fixlib.baseresources import ModelReference, BaseManagedKubernetesClusterProvider @@ -478,7 +477,6 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: builder.dependant_node( self, reverse=True, delete_same_as_default=True, clazz=AwsIamRole, arn=self.cluster_role_arn ) - AwsGuardDutyFinding.set_findings(builder, self, "arn") def delete_resource(self, client: AwsClient, graph: Graph) -> bool: client.call(aws_service=self.api_spec.service, action="delete-cluster", result_name=None, name=self.name) diff --git a/plugins/aws/fix_plugin_aws/resource/guardduty.py b/plugins/aws/fix_plugin_aws/resource/guardduty.py index bd4162439f..4e333f0cba 100644 --- a/plugins/aws/fix_plugin_aws/resource/guardduty.py +++ b/plugins/aws/fix_plugin_aws/resource/guardduty.py @@ -1,20 +1,28 @@ from concurrent.futures import as_completed from datetime import datetime, timezone +from functools import partial from typing import ClassVar, Dict, List, Optional, Tuple, Type, Any import logging from attrs import define, field from boto3.exceptions import Boto3Error -from fix_plugin_aws.resource.base import AssessmentKey, AwsResource, GraphBuilder +from fix_plugin_aws.resource.base import AwsResource, GraphBuilder +from fix_plugin_aws.resource.ec2 import AwsEc2Instance, AwsEc2Volume +from fix_plugin_aws.resource.ecs import AwsEcsCluster +from fix_plugin_aws.resource.eks import AwsEksCluster +from fix_plugin_aws.resource.lambda_ import AwsLambdaFunction +from fix_plugin_aws.resource.rds import AwsRdsCluster, AwsRdsInstance +from fix_plugin_aws.resource.s3 import AwsS3Bucket -from fixlib.baseresources import Assessment, Finding, PhantomBaseResource, Severity +from fixlib.baseresources import Finding, PhantomBaseResource, Severity from fixlib.json_bender import F, S, AsInt, Bend, Bender, ForallBend from fixlib.types import Json from fixlib.utils import chunks, utc_str log = logging.getLogger("fix.plugins.aws") service_name = "guardduty" +amazon_guardduty = "amazon_guard_duty" @define(eq=False, slots=False) @@ -567,664 +575,10 @@ class AwsGuardDutyResource: lambda_details: Optional[AwsGuardDutyLambdaDetails] = field(default=None, metadata={"description": "Contains information about the Lambda function that was involved in a finding."}) # fmt: skip -@define(eq=False, slots=False) -class AwsGuardDutyCountry: - kind: ClassVar[str] = "aws_guard_duty_country" - mapping: ClassVar[Dict[str, Bender]] = {"country_code": S("CountryCode"), "country_name": S("CountryName")} - country_code: Optional[str] = field(default=None, metadata={"description": "The country code of the remote IP address."}) # fmt: skip - country_name: Optional[str] = field(default=None, metadata={"description": "The country name of the remote IP address."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyGeoLocation: - kind: ClassVar[str] = "aws_guard_duty_geo_location" - mapping: ClassVar[Dict[str, Bender]] = {"lat": S("Lat"), "lon": S("Lon")} - lat: Optional[float] = field(default=None, metadata={"description": "The latitude information of the remote IP address."}) # fmt: skip - lon: Optional[float] = field(default=None, metadata={"description": "The longitude information of the remote IP address."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyOrganization: - kind: ClassVar[str] = "aws_guard_duty_organization" - mapping: ClassVar[Dict[str, Bender]] = {"asn": S("Asn"), "asn_org": S("AsnOrg"), "isp": S("Isp"), "org": S("Org")} - asn: Optional[str] = field(default=None, metadata={"description": "The Autonomous System Number (ASN) of the internet provider of the remote IP address."}) # fmt: skip - asn_org: Optional[str] = field(default=None, metadata={"description": "The organization that registered this ASN."}) # fmt: skip - isp: Optional[str] = field(default=None, metadata={"description": "The ISP information for the internet provider."}) # fmt: skip - org: Optional[str] = field(default=None, metadata={"description": "The name of the internet provider."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyRemoteIpDetails: - kind: ClassVar[str] = "aws_guard_duty_remote_ip_details" - mapping: ClassVar[Dict[str, Bender]] = { - "city": S("City", "CityName"), - "country": S("Country") >> Bend(AwsGuardDutyCountry.mapping), - "geo_location": S("GeoLocation") >> Bend(AwsGuardDutyGeoLocation.mapping), - "ip_address_v4": S("IpAddressV4"), - "ip_address_v6": S("IpAddressV6"), - "organization": S("Organization") >> Bend(AwsGuardDutyOrganization.mapping), - } - city: Optional[str] = field(default=None, metadata={"description": "The city information of the remote IP address."}) # fmt: skip - country: Optional[AwsGuardDutyCountry] = field(default=None, metadata={"description": "The country code of the remote IP address."}) # fmt: skip - geo_location: Optional[AwsGuardDutyGeoLocation] = field(default=None, metadata={"description": "The location information of the remote IP address."}) # fmt: skip - ip_address_v4: Optional[str] = field(default=None, metadata={"description": "The IPv4 remote address of the connection."}) # fmt: skip - ip_address_v6: Optional[str] = field(default=None, metadata={"description": "The IPv6 remote address of the connection."}) # fmt: skip - organization: Optional[AwsGuardDutyOrganization] = field(default=None, metadata={"description": "The ISP organization information of the remote IP address."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyRemoteAccountDetails: - kind: ClassVar[str] = "aws_guard_duty_remote_account_details" - mapping: ClassVar[Dict[str, Bender]] = {"account_id": S("AccountId"), "affiliated": S("Affiliated")} - account_id: Optional[str] = field(default=None, metadata={"description": "The Amazon Web Services account ID of the remote API caller."}) # fmt: skip - affiliated: Optional[bool] = field(default=None, metadata={"description": "Details on whether the Amazon Web Services account of the remote API caller is related to your GuardDuty environment. If this value is True the API caller is affiliated to your account in some way. If it is False the API caller is from outside your environment."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyAwsApiCallAction: - kind: ClassVar[str] = "aws_guard_duty_aws_api_call_action" - mapping: ClassVar[Dict[str, Bender]] = { - "api": S("Api"), - "caller_type": S("CallerType"), - "domain_details": S("DomainDetails", "Domain"), - "error_code": S("ErrorCode"), - "user_agent": S("UserAgent"), - "remote_ip_details": S("RemoteIpDetails") >> Bend(AwsGuardDutyRemoteIpDetails.mapping), - "service_name": S("ServiceName"), - "remote_account_details": S("RemoteAccountDetails") >> Bend(AwsGuardDutyRemoteAccountDetails.mapping), - "affected_resources": S("AffectedResources"), - } - api: Optional[str] = field(default=None, metadata={"description": "The Amazon Web Services API name."}) # fmt: skip - caller_type: Optional[str] = field(default=None, metadata={"description": "The Amazon Web Services API caller type."}) # fmt: skip - domain_details: Optional[str] = field(default=None, metadata={"description": "The domain information for the Amazon Web Services API call."}) # fmt: skip - error_code: Optional[str] = field(default=None, metadata={"description": "The error code of the failed Amazon Web Services API action."}) # fmt: skip - user_agent: Optional[str] = field(default=None, metadata={"description": "The agent through which the API request was made."}) # fmt: skip - remote_ip_details: Optional[AwsGuardDutyRemoteIpDetails] = field(default=None, metadata={"description": "The remote IP information of the connection that initiated the Amazon Web Services API call."}) # fmt: skip - service_name: Optional[str] = field(default=None, metadata={"description": "The Amazon Web Services service name whose API was invoked."}) # fmt: skip - remote_account_details: Optional[AwsGuardDutyRemoteAccountDetails] = field(default=None, metadata={"description": "The details of the Amazon Web Services account that made the API call. This field appears if the call was made from outside your account."}) # fmt: skip - affected_resources: Optional[Dict[str, str]] = field(default=None, metadata={"description": "The details of the Amazon Web Services account that made the API call. This field identifies the resources that were affected by this API call."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyDnsRequestAction: - kind: ClassVar[str] = "aws_guard_duty_dns_request_action" - mapping: ClassVar[Dict[str, Bender]] = { - "domain": S("Domain"), - "protocol": S("Protocol"), - "blocked": S("Blocked"), - "domain_with_suffix": S("DomainWithSuffix"), - } - domain: Optional[str] = field(default=None, metadata={"description": "The domain information for the DNS query."}) # fmt: skip - protocol: Optional[str] = field(default=None, metadata={"description": "The network connection protocol observed in the activity that prompted GuardDuty to generate the finding."}) # fmt: skip - blocked: Optional[bool] = field(default=None, metadata={"description": "Indicates whether the targeted port is blocked."}) # fmt: skip - domain_with_suffix: Optional[str] = field(default=None, metadata={"description": "The second and top level domain involved in the activity that potentially prompted GuardDuty to generate this finding. For a list of top-level and second-level domains, see public suffix list."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyLocalPortDetails: - kind: ClassVar[str] = "aws_guard_duty_local_port_details" - mapping: ClassVar[Dict[str, Bender]] = {"port": S("Port"), "port_name": S("PortName")} - port: Optional[int] = field(default=None, metadata={"description": "The port number of the local connection."}) # fmt: skip - port_name: Optional[str] = field(default=None, metadata={"description": "The port name of the local connection."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyLocalIpDetails: - kind: ClassVar[str] = "aws_guard_duty_local_ip_details" - mapping: ClassVar[Dict[str, Bender]] = {"ip_address_v4": S("IpAddressV4"), "ip_address_v6": S("IpAddressV6")} - ip_address_v4: Optional[str] = field(default=None, metadata={"description": "The IPv4 local address of the connection."}) # fmt: skip - ip_address_v6: Optional[str] = field(default=None, metadata={"description": "The IPv6 local address of the connection."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyRemotePortDetails: - kind: ClassVar[str] = "aws_guard_duty_remote_port_details" - mapping: ClassVar[Dict[str, Bender]] = {"port": S("Port"), "port_name": S("PortName")} - port: Optional[int] = field(default=None, metadata={"description": "The port number of the remote connection."}) # fmt: skip - port_name: Optional[str] = field(default=None, metadata={"description": "The port name of the remote connection."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyNetworkConnectionAction: - kind: ClassVar[str] = "aws_guard_duty_network_connection_action" - mapping: ClassVar[Dict[str, Bender]] = { - "blocked": S("Blocked"), - "connection_direction": S("ConnectionDirection"), - "local_port_details": S("LocalPortDetails") >> Bend(AwsGuardDutyLocalPortDetails.mapping), - "protocol": S("Protocol"), - "local_ip_details": S("LocalIpDetails") >> Bend(AwsGuardDutyLocalIpDetails.mapping), - "remote_ip_details": S("RemoteIpDetails") >> Bend(AwsGuardDutyRemoteIpDetails.mapping), - "remote_port_details": S("RemotePortDetails") >> Bend(AwsGuardDutyRemotePortDetails.mapping), - } - blocked: Optional[bool] = field(default=None, metadata={"description": "Indicates whether EC2 blocked the network connection to your instance."}) # fmt: skip - connection_direction: Optional[str] = field(default=None, metadata={"description": "The network connection direction."}) # fmt: skip - local_port_details: Optional[AwsGuardDutyLocalPortDetails] = field(default=None, metadata={"description": "The local port information of the connection."}) # fmt: skip - protocol: Optional[str] = field(default=None, metadata={"description": "The network connection protocol."}) # fmt: skip - local_ip_details: Optional[AwsGuardDutyLocalIpDetails] = field(default=None, metadata={"description": "The local IP information of the connection."}) # fmt: skip - remote_ip_details: Optional[AwsGuardDutyRemoteIpDetails] = field(default=None, metadata={"description": "The remote IP information of the connection."}) # fmt: skip - remote_port_details: Optional[AwsGuardDutyRemotePortDetails] = field(default=None, metadata={"description": "The remote port information of the connection."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyPortProbeDetail: - kind: ClassVar[str] = "aws_guard_duty_port_probe_detail" - mapping: ClassVar[Dict[str, Bender]] = { - "local_port_details": S("LocalPortDetails") >> Bend(AwsGuardDutyLocalPortDetails.mapping), - "local_ip_details": S("LocalIpDetails") >> Bend(AwsGuardDutyLocalIpDetails.mapping), - "remote_ip_details": S("RemoteIpDetails") >> Bend(AwsGuardDutyRemoteIpDetails.mapping), - } - local_port_details: Optional[AwsGuardDutyLocalPortDetails] = field(default=None, metadata={"description": "The local port information of the connection."}) # fmt: skip - local_ip_details: Optional[AwsGuardDutyLocalIpDetails] = field(default=None, metadata={"description": "The local IP information of the connection."}) # fmt: skip - remote_ip_details: Optional[AwsGuardDutyRemoteIpDetails] = field(default=None, metadata={"description": "The remote IP information of the connection."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyPortProbeAction: - kind: ClassVar[str] = "aws_guard_duty_port_probe_action" - mapping: ClassVar[Dict[str, Bender]] = { - "blocked": S("Blocked"), - "port_probe_details": S("PortProbeDetails", default=[]) >> ForallBend(AwsGuardDutyPortProbeDetail.mapping), - } - blocked: Optional[bool] = field(default=None, metadata={"description": "Indicates whether EC2 blocked the port probe to the instance, such as with an ACL."}) # fmt: skip - port_probe_details: Optional[List[AwsGuardDutyPortProbeDetail]] = field(factory=list, metadata={"description": "A list of objects related to port probe details."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyKubernetesApiCallAction: - kind: ClassVar[str] = "aws_guard_duty_kubernetes_api_call_action" - mapping: ClassVar[Dict[str, Bender]] = { - "request_uri": S("RequestUri"), - "verb": S("Verb"), - "source_ips": S("SourceIps", default=[]), - "user_agent": S("UserAgent"), - "remote_ip_details": S("RemoteIpDetails") >> Bend(AwsGuardDutyRemoteIpDetails.mapping), - "status_code": S("StatusCode"), - "parameters": S("Parameters"), - "resource": S("Resource"), - "subresource": S("Subresource"), - "namespace": S("Namespace"), - "resource_name": S("ResourceName"), - } - request_uri: Optional[str] = field(default=None, metadata={"description": "The Kubernetes API request URI."}) # fmt: skip - verb: Optional[str] = field(default=None, metadata={"description": "The Kubernetes API request HTTP verb."}) # fmt: skip - source_ips: Optional[List[str]] = field(factory=list, metadata={"description": "The IP of the Kubernetes API caller and the IPs of any proxies or load balancers between the caller and the API endpoint."}) # fmt: skip - user_agent: Optional[str] = field(default=None, metadata={"description": "The user agent of the caller of the Kubernetes API."}) # fmt: skip - remote_ip_details: Optional[AwsGuardDutyRemoteIpDetails] = field(default=None, metadata={"description": "Contains information about the remote IP address of the connection."}) # fmt: skip - status_code: Optional[int] = field(default=None, metadata={"description": "The resulting HTTP response code of the Kubernetes API call action."}) # fmt: skip - parameters: Optional[str] = field(default=None, metadata={"description": "Parameters related to the Kubernetes API call action."}) # fmt: skip - resource: Optional[str] = field(default=None, metadata={"description": "The resource component in the Kubernetes API call action."}) # fmt: skip - subresource: Optional[str] = field(default=None, metadata={"description": "The name of the sub-resource in the Kubernetes API call action."}) # fmt: skip - namespace: Optional[str] = field(default=None, metadata={"description": "The name of the namespace where the Kubernetes API call action takes place."}) # fmt: skip - resource_name: Optional[str] = field(default=None, metadata={"description": "The name of the resource in the Kubernetes API call action."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyLoginAttribute: - kind: ClassVar[str] = "aws_guard_duty_login_attribute" - mapping: ClassVar[Dict[str, Bender]] = { - "user": S("User"), - "application": S("Application"), - "failed_login_attempts": S("FailedLoginAttempts"), - "successful_login_attempts": S("SuccessfulLoginAttempts"), - } - user: Optional[str] = field(default=None, metadata={"description": "Indicates the user name which attempted to log in."}) # fmt: skip - application: Optional[str] = field(default=None, metadata={"description": "Indicates the application name used to attempt log in."}) # fmt: skip - failed_login_attempts: Optional[int] = field(default=None, metadata={"description": "Represents the sum of failed (unsuccessful) login attempts made to establish a connection to the database instance."}) # fmt: skip - successful_login_attempts: Optional[int] = field(default=None, metadata={"description": "Represents the sum of successful connections (a correct combination of login attributes) made to the database instance by the actor."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyRdsLoginAttemptAction: - kind: ClassVar[str] = "aws_guard_duty_rds_login_attempt_action" - mapping: ClassVar[Dict[str, Bender]] = { - "remote_ip_details": S("RemoteIpDetails") >> Bend(AwsGuardDutyRemoteIpDetails.mapping), - "login_attributes": S("LoginAttributes", default=[]) >> ForallBend(AwsGuardDutyLoginAttribute.mapping), - } - remote_ip_details: Optional[AwsGuardDutyRemoteIpDetails] = field(default=None, metadata={"description": "Contains information about the remote IP address of the connection."}) # fmt: skip - login_attributes: Optional[List[AwsGuardDutyLoginAttribute]] = field(factory=list, metadata={"description": "Indicates the login attributes used in the login attempt."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyKubernetesPermissionCheckedDetails: - kind: ClassVar[str] = "aws_guard_duty_kubernetes_permission_checked_details" - mapping: ClassVar[Dict[str, Bender]] = { - "verb": S("Verb"), - "resource": S("Resource"), - "namespace": S("Namespace"), - "allowed": S("Allowed"), - } - verb: Optional[str] = field(default=None, metadata={"description": "The verb component of the Kubernetes API call. For example, when you check whether or not you have the permission to call the CreatePod API, the verb component will be Create."}) # fmt: skip - resource: Optional[str] = field(default=None, metadata={"description": "The Kubernetes resource with which your Kubernetes API call will interact."}) # fmt: skip - namespace: Optional[str] = field(default=None, metadata={"description": "The namespace where the Kubernetes API action will take place."}) # fmt: skip - allowed: Optional[bool] = field(default=None, metadata={"description": "Information whether the user has the permission to call the Kubernetes API."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyKubernetesRoleBindingDetails: - kind: ClassVar[str] = "aws_guard_duty_kubernetes_role_binding_details" - mapping: ClassVar[Dict[str, Bender]] = { - "role_kind": S("Kind"), - "name": S("Name"), - "uid": S("Uid"), - "role_ref_name": S("RoleRefName"), - "role_ref_kind": S("RoleRefKind"), - } - role_kind: Optional[str] = field(default=None, metadata={"description": "The kind of the role. For role binding, this value will be RoleBinding."}) # fmt: skip - name: Optional[str] = field(default=None, metadata={"description": "The name of the RoleBinding."}) # fmt: skip - uid: Optional[str] = field(default=None, metadata={"description": "The unique identifier of the role binding."}) # fmt: skip - role_ref_name: Optional[str] = field(default=None, metadata={"description": "The name of the role being referenced. This must match the name of the Role or ClusterRole that you want to bind to."}) # fmt: skip - role_ref_kind: Optional[str] = field(default=None, metadata={"description": "The type of the role being referenced. This could be either Role or ClusterRole."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyKubernetesRoleDetails: - kind: ClassVar[str] = "aws_guard_duty_kubernetes_role_details" - mapping: ClassVar[Dict[str, Bender]] = {"role_kind": S("Kind"), "name": S("Name"), "uid": S("Uid")} - role_kind: Optional[str] = field(default=None, metadata={"description": "The kind of role. For this API, the value of kind will be Role."}) # fmt: skip - name: Optional[str] = field(default=None, metadata={"description": "The name of the Kubernetes role."}) # fmt: skip - uid: Optional[str] = field(default=None, metadata={"description": "The unique identifier of the Kubernetes role name."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyAction: - kind: ClassVar[str] = "aws_guard_duty_action" - mapping: ClassVar[Dict[str, Bender]] = { - "action_type": S("ActionType"), - "aws_api_call_action": S("AwsApiCallAction") >> Bend(AwsGuardDutyAwsApiCallAction.mapping), - "dns_request_action": S("DnsRequestAction") >> Bend(AwsGuardDutyDnsRequestAction.mapping), - "network_connection_action": S("NetworkConnectionAction") >> Bend(AwsGuardDutyNetworkConnectionAction.mapping), - "port_probe_action": S("PortProbeAction") >> Bend(AwsGuardDutyPortProbeAction.mapping), - "kubernetes_api_call_action": S("KubernetesApiCallAction") >> Bend(AwsGuardDutyKubernetesApiCallAction.mapping), - "rds_login_attempt_action": S("RdsLoginAttemptAction") >> Bend(AwsGuardDutyRdsLoginAttemptAction.mapping), - "kubernetes_permission_checked_details": S("KubernetesPermissionCheckedDetails") - >> Bend(AwsGuardDutyKubernetesPermissionCheckedDetails.mapping), - "kubernetes_role_binding_details": S("KubernetesRoleBindingDetails") - >> Bend(AwsGuardDutyKubernetesRoleBindingDetails.mapping), - "kubernetes_role_details": S("KubernetesRoleDetails") >> Bend(AwsGuardDutyKubernetesRoleDetails.mapping), - } - action_type: Optional[str] = field(default=None, metadata={"description": "The GuardDuty finding activity type."}) # fmt: skip - aws_api_call_action: Optional[AwsGuardDutyAwsApiCallAction] = field(default=None, metadata={"description": "Information about the AWS_API_CALL action described in this finding."}) # fmt: skip - dns_request_action: Optional[AwsGuardDutyDnsRequestAction] = field(default=None, metadata={"description": "Information about the DNS_REQUEST action described in this finding."}) # fmt: skip - network_connection_action: Optional[AwsGuardDutyNetworkConnectionAction] = field(default=None, metadata={"description": "Information about the NETWORK_CONNECTION action described in this finding."}) # fmt: skip - port_probe_action: Optional[AwsGuardDutyPortProbeAction] = field(default=None, metadata={"description": "Information about the PORT_PROBE action described in this finding."}) # fmt: skip - kubernetes_api_call_action: Optional[AwsGuardDutyKubernetesApiCallAction] = field(default=None, metadata={"description": "Information about the Kubernetes API call action described in this finding."}) # fmt: skip - rds_login_attempt_action: Optional[AwsGuardDutyRdsLoginAttemptAction] = field(default=None, metadata={"description": "Information about RDS_LOGIN_ATTEMPT action described in this finding."}) # fmt: skip - kubernetes_permission_checked_details: Optional[AwsGuardDutyKubernetesPermissionCheckedDetails] = field(default=None, metadata={"description": "Information whether the user has the permission to use a specific Kubernetes API."}) # fmt: skip - kubernetes_role_binding_details: Optional[AwsGuardDutyKubernetesRoleBindingDetails] = field(default=None, metadata={"description": "Information about the role binding that grants the permission defined in a Kubernetes role."}) # fmt: skip - kubernetes_role_details: Optional[AwsGuardDutyKubernetesRoleDetails] = field(default=None, metadata={"description": "Information about the Kubernetes role name and role type."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyThreatIntelligenceDetail: - kind: ClassVar[str] = "aws_guard_duty_threat_intelligence_detail" - mapping: ClassVar[Dict[str, Bender]] = { - "threat_list_name": S("ThreatListName"), - "threat_names": S("ThreatNames", default=[]), - "threat_file_sha256": S("ThreatFileSha256"), - } - threat_list_name: Optional[str] = field(default=None, metadata={"description": "The name of the threat intelligence list that triggered the finding."}) # fmt: skip - threat_names: Optional[List[str]] = field(factory=list, metadata={"description": "A list of names of the threats in the threat intelligence list that triggered the finding."}) # fmt: skip - threat_file_sha256: Optional[str] = field(default=None, metadata={"description": "SHA256 of the file that generated the finding."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyEvidence: - kind: ClassVar[str] = "aws_guard_duty_evidence" - mapping: ClassVar[Dict[str, Bender]] = { - "threat_intelligence_details": S("ThreatIntelligenceDetails", default=[]) - >> ForallBend(AwsGuardDutyThreatIntelligenceDetail.mapping) - } - threat_intelligence_details: Optional[List[AwsGuardDutyThreatIntelligenceDetail]] = field(factory=list, metadata={"description": "A list of threat intelligence details related to the evidence."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyServiceAdditionalInfo: - kind: ClassVar[str] = "aws_guard_duty_service_additional_info" - mapping: ClassVar[Dict[str, Bender]] = {"value": S("Value"), "type": S("Type")} - value: Optional[str] = field(default=None, metadata={"description": "This field specifies the value of the additional information."}) # fmt: skip - type: Optional[str] = field(default=None, metadata={"description": "Describes the type of the additional information."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyScannedItemCount: - kind: ClassVar[str] = "aws_guard_duty_scanned_item_count" - mapping: ClassVar[Dict[str, Bender]] = {"total_gb": S("TotalGb"), "files": S("Files"), "volumes": S("Volumes")} - total_gb: Optional[int] = field(default=None, metadata={"description": "Total GB of files scanned for malware."}) # fmt: skip - files: Optional[int] = field(default=None, metadata={"description": "Number of files scanned."}) # fmt: skip - volumes: Optional[int] = field(default=None, metadata={"description": "Total number of scanned volumes."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyHighestSeverityThreatDetails: - kind: ClassVar[str] = "aws_guard_duty_highest_severity_threat_details" - mapping: ClassVar[Dict[str, Bender]] = { - "severity": S("Severity"), - "threat_name": S("ThreatName"), - "count": S("Count"), - } - severity: Optional[str] = field(default=None, metadata={"description": "Severity level of the highest severity threat detected."}) # fmt: skip - threat_name: Optional[str] = field(default=None, metadata={"description": "Threat name of the highest severity threat detected as part of the malware scan."}) # fmt: skip - count: Optional[int] = field(default=None, metadata={"description": "Total number of infected files with the highest severity threat detected."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyScanFilePath: - kind: ClassVar[str] = "aws_guard_duty_scan_file_path" - mapping: ClassVar[Dict[str, Bender]] = { - "file_path": S("FilePath"), - "volume_arn": S("VolumeArn"), - "hash": S("Hash"), - "file_name": S("FileName"), - } - file_path: Optional[str] = field(default=None, metadata={"description": "The file path of the infected file."}) # fmt: skip - volume_arn: Optional[str] = field(default=None, metadata={"description": "EBS volume ARN details of the infected file."}) # fmt: skip - hash: Optional[str] = field(default=None, metadata={"description": "The hash value of the infected file."}) # fmt: skip - file_name: Optional[str] = field(default=None, metadata={"description": "File name of the infected file."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyScanThreatName: - kind: ClassVar[str] = "aws_guard_duty_scan_threat_name" - mapping: ClassVar[Dict[str, Bender]] = { - "name": S("Name"), - "severity": S("Severity"), - "item_count": S("ItemCount"), - "file_paths": S("FilePaths", default=[]) >> ForallBend(AwsGuardDutyScanFilePath.mapping), - } - name: Optional[str] = field(default=None, metadata={"description": "The name of the identified threat."}) # fmt: skip - severity: Optional[str] = field(default=None, metadata={"description": "Severity of threat identified as part of the malware scan."}) # fmt: skip - item_count: Optional[int] = field(default=None, metadata={"description": "Total number of files infected with given threat."}) # fmt: skip - file_paths: Optional[List[AwsGuardDutyScanFilePath]] = field(factory=list, metadata={"description": "List of infected files in EBS volume with details."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyThreatDetectedByName: - kind: ClassVar[str] = "aws_guard_duty_threat_detected_by_name" - mapping: ClassVar[Dict[str, Bender]] = { - "item_count": S("ItemCount"), - "unique_threat_name_count": S("UniqueThreatNameCount"), - "shortened": S("Shortened"), - "threat_names": S("ThreatNames", default=[]) >> ForallBend(AwsGuardDutyScanThreatName.mapping), - } - item_count: Optional[int] = field(default=None, metadata={"description": "Total number of infected files identified."}) # fmt: skip - unique_threat_name_count: Optional[int] = field(default=None, metadata={"description": "Total number of unique threats by name identified, as part of the malware scan."}) # fmt: skip - shortened: Optional[bool] = field(default=None, metadata={"description": "Flag to determine if the finding contains every single infected file-path and/or every threat."}) # fmt: skip - threat_names: Optional[List[AwsGuardDutyScanThreatName]] = field(factory=list, metadata={"description": "List of identified threats with details, organized by threat name."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyScanDetections: - kind: ClassVar[str] = "aws_guard_duty_scan_detections" - mapping: ClassVar[Dict[str, Bender]] = { - "scanned_item_count": S("ScannedItemCount") >> Bend(AwsGuardDutyScannedItemCount.mapping), - "threats_detected_item_count": S("ThreatsDetectedItemCount", "Files"), - "highest_severity_threat_details": S("HighestSeverityThreatDetails") - >> Bend(AwsGuardDutyHighestSeverityThreatDetails.mapping), - "threat_detected_by_name": S("ThreatDetectedByName") >> Bend(AwsGuardDutyThreatDetectedByName.mapping), - } - scanned_item_count: Optional[AwsGuardDutyScannedItemCount] = field(default=None, metadata={"description": "Total number of scanned files."}) # fmt: skip - threats_detected_item_count: Optional[int] = field(default=None, metadata={"description": "Total number of infected files."}) # fmt: skip - highest_severity_threat_details: Optional[AwsGuardDutyHighestSeverityThreatDetails] = field(default=None, metadata={"description": "Details of the highest severity threat detected during malware scan and number of infected files."}) # fmt: skip - threat_detected_by_name: Optional[AwsGuardDutyThreatDetectedByName] = field(default=None, metadata={"description": "Contains details about identified threats organized by threat name."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyEbsVolumeScanDetails: - kind: ClassVar[str] = "aws_guard_duty_ebs_volume_scan_details" - mapping: ClassVar[Dict[str, Bender]] = { - "scan_id": S("ScanId"), - "scan_started_at": S("ScanStartedAt"), - "scan_completed_at": S("ScanCompletedAt"), - "trigger_finding_id": S("TriggerFindingId"), - "sources": S("Sources", default=[]), - "scan_detections": S("ScanDetections") >> Bend(AwsGuardDutyScanDetections.mapping), - "scan_type": S("ScanType"), - } - scan_id: Optional[str] = field(default=None, metadata={"description": "Unique Id of the malware scan that generated the finding."}) # fmt: skip - scan_started_at: Optional[datetime] = field(default=None, metadata={"description": "Returns the start date and time of the malware scan."}) # fmt: skip - scan_completed_at: Optional[datetime] = field(default=None, metadata={"description": "Returns the completion date and time of the malware scan."}) # fmt: skip - trigger_finding_id: Optional[str] = field(default=None, metadata={"description": "GuardDuty finding ID that triggered a malware scan."}) # fmt: skip - sources: Optional[List[str]] = field(factory=list, metadata={"description": "Contains list of threat intelligence sources used to detect threats."}) # fmt: skip - scan_detections: Optional[AwsGuardDutyScanDetections] = field(default=None, metadata={"description": "Contains a complete view providing malware scan result details."}) # fmt: skip - scan_type: Optional[str] = field(default=None, metadata={"description": "Specifies the scan type that invoked the malware scan."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyLineageObject: - kind: ClassVar[str] = "aws_guard_duty_lineage_object" - mapping: ClassVar[Dict[str, Bender]] = { - "start_time": S("StartTime"), - "namespace_pid": S("NamespacePid"), - "user_id": S("UserId"), - "name": S("Name"), - "pid": S("Pid"), - "uuid": S("Uuid"), - "executable_path": S("ExecutablePath"), - "euid": S("Euid"), - "parent_uuid": S("ParentUuid"), - } - start_time: Optional[datetime] = field(default=None, metadata={"description": "The time when the process started. This is in UTC format."}) # fmt: skip - namespace_pid: Optional[int] = field(default=None, metadata={"description": "The process ID of the child process."}) # fmt: skip - user_id: Optional[int] = field(default=None, metadata={"description": "The user ID of the user that executed the process."}) # fmt: skip - name: Optional[str] = field(default=None, metadata={"description": "The name of the process."}) # fmt: skip - pid: Optional[int] = field(default=None, metadata={"description": "The ID of the process."}) # fmt: skip - uuid: Optional[str] = field(default=None, metadata={"description": "The unique ID assigned to the process by GuardDuty."}) # fmt: skip - executable_path: Optional[str] = field(default=None, metadata={"description": "The absolute path of the process executable file."}) # fmt: skip - euid: Optional[int] = field(default=None, metadata={"description": "The effective user ID that was used to execute the process."}) # fmt: skip - parent_uuid: Optional[str] = field(default=None, metadata={"description": "The unique ID of the parent process. This ID is assigned to the parent process by GuardDuty."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyProcessDetails: - kind: ClassVar[str] = "aws_guard_duty_process_details" - mapping: ClassVar[Dict[str, Bender]] = { - "name": S("Name"), - "executable_path": S("ExecutablePath"), - "executable_sha256": S("ExecutableSha256"), - "namespace_pid": S("NamespacePid"), - "pwd": S("Pwd"), - "pid": S("Pid"), - "start_time": S("StartTime"), - "uuid": S("Uuid"), - "parent_uuid": S("ParentUuid"), - "user": S("User"), - "user_id": S("UserId"), - "euid": S("Euid"), - "lineage": S("Lineage", default=[]) >> ForallBend(AwsGuardDutyLineageObject.mapping), - } - name: Optional[str] = field(default=None, metadata={"description": "The name of the process."}) # fmt: skip - executable_path: Optional[str] = field(default=None, metadata={"description": "The absolute path of the process executable file."}) # fmt: skip - executable_sha256: Optional[str] = field(default=None, metadata={"description": "The SHA256 hash of the process executable."}) # fmt: skip - namespace_pid: Optional[int] = field(default=None, metadata={"description": "The ID of the child process."}) # fmt: skip - pwd: Optional[str] = field(default=None, metadata={"description": "The present working directory of the process."}) # fmt: skip - pid: Optional[int] = field(default=None, metadata={"description": "The ID of the process."}) # fmt: skip - start_time: Optional[datetime] = field(default=None, metadata={"description": "The time when the process started. This is in UTC format."}) # fmt: skip - uuid: Optional[str] = field(default=None, metadata={"description": "The unique ID assigned to the process by GuardDuty."}) # fmt: skip - parent_uuid: Optional[str] = field(default=None, metadata={"description": "The unique ID of the parent process. This ID is assigned to the parent process by GuardDuty."}) # fmt: skip - user: Optional[str] = field(default=None, metadata={"description": "The user that executed the process."}) # fmt: skip - user_id: Optional[int] = field(default=None, metadata={"description": "The unique ID of the user that executed the process."}) # fmt: skip - euid: Optional[int] = field(default=None, metadata={"description": "The effective user ID of the user that executed the process."}) # fmt: skip - lineage: Optional[List[AwsGuardDutyLineageObject]] = field(factory=list, metadata={"description": "Information about the process's lineage."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyRuntimeContext: - kind: ClassVar[str] = "aws_guard_duty_runtime_context" - mapping: ClassVar[Dict[str, Bender]] = { - "modifying_process": S("ModifyingProcess") >> Bend(AwsGuardDutyProcessDetails.mapping), - "modified_at": S("ModifiedAt"), - "script_path": S("ScriptPath"), - "library_path": S("LibraryPath"), - "ld_preload_value": S("LdPreloadValue"), - "socket_path": S("SocketPath"), - "runc_binary_path": S("RuncBinaryPath"), - "release_agent_path": S("ReleaseAgentPath"), - "mount_source": S("MountSource"), - "mount_target": S("MountTarget"), - "file_system_type": S("FileSystemType"), - "flags": S("Flags", default=[]), - "module_name": S("ModuleName"), - "module_file_path": S("ModuleFilePath"), - "module_sha256": S("ModuleSha256"), - "shell_history_file_path": S("ShellHistoryFilePath"), - "target_process": S("TargetProcess") >> Bend(AwsGuardDutyProcessDetails.mapping), - "address_family": S("AddressFamily"), - "iana_protocol_number": S("IanaProtocolNumber"), - "memory_regions": S("MemoryRegions", default=[]), - "tool_name": S("ToolName"), - "tool_category": S("ToolCategory"), - "service_name": S("ServiceName"), - "command_line_example": S("CommandLineExample"), - "threat_file_path": S("ThreatFilePath"), - } - modifying_process: Optional[AwsGuardDutyProcessDetails] = field(default=None, metadata={"description": "Information about the process that modified the current process. This is available for multiple finding types."}) # fmt: skip - modified_at: Optional[datetime] = field(default=None, metadata={"description": "The timestamp at which the process modified the current process. The timestamp is in UTC date string format."}) # fmt: skip - script_path: Optional[str] = field(default=None, metadata={"description": "The path to the script that was executed."}) # fmt: skip - library_path: Optional[str] = field(default=None, metadata={"description": "The path to the new library that was loaded."}) # fmt: skip - ld_preload_value: Optional[str] = field(default=None, metadata={"description": "The value of the LD_PRELOAD environment variable."}) # fmt: skip - socket_path: Optional[str] = field(default=None, metadata={"description": "The path to the docket socket that was accessed."}) # fmt: skip - runc_binary_path: Optional[str] = field(default=None, metadata={"description": "The path to the leveraged runc implementation."}) # fmt: skip - release_agent_path: Optional[str] = field(default=None, metadata={"description": "The path in the container that modified the release agent file."}) # fmt: skip - mount_source: Optional[str] = field(default=None, metadata={"description": "The path on the host that is mounted by the container."}) # fmt: skip - mount_target: Optional[str] = field(default=None, metadata={"description": "The path in the container that is mapped to the host directory."}) # fmt: skip - file_system_type: Optional[str] = field(default=None, metadata={"description": "Represents the type of mounted fileSystem."}) # fmt: skip - flags: Optional[List[str]] = field(factory=list, metadata={"description": "Represents options that control the behavior of a runtime operation or action. For example, a filesystem mount operation may contain a read-only flag."}) # fmt: skip - module_name: Optional[str] = field(default=None, metadata={"description": "The name of the module loaded into the kernel."}) # fmt: skip - module_file_path: Optional[str] = field(default=None, metadata={"description": "The path to the module loaded into the kernel."}) # fmt: skip - module_sha256: Optional[str] = field(default=None, metadata={"description": "The SHA256 hash of the module."}) # fmt: skip - shell_history_file_path: Optional[str] = field(default=None, metadata={"description": "The path to the modified shell history file."}) # fmt: skip - target_process: Optional[AwsGuardDutyProcessDetails] = field(default=None, metadata={"description": "Information about the process that had its memory overwritten by the current process."}) # fmt: skip - address_family: Optional[str] = field(default=None, metadata={"description": "Represents the communication protocol associated with the address. For example, the address family AF_INET is used for IP version of 4 protocol."}) # fmt: skip - iana_protocol_number: Optional[int] = field(default=None, metadata={"description": "Specifies a particular protocol within the address family. Usually there is a single protocol in address families. For example, the address family AF_INET only has the IP protocol."}) # fmt: skip - memory_regions: Optional[List[str]] = field(factory=list, metadata={"description": "Specifies the Region of a process's address space such as stack and heap."}) # fmt: skip - tool_name: Optional[str] = field(default=None, metadata={"description": "Name of the potentially suspicious tool."}) # fmt: skip - tool_category: Optional[str] = field(default=None, metadata={"description": "Category that the tool belongs to. Some of the examples are Backdoor Tool, Pentest Tool, Network Scanner, and Network Sniffer."}) # fmt: skip - service_name: Optional[str] = field(default=None, metadata={"description": "Name of the security service that has been potentially disabled."}) # fmt: skip - command_line_example: Optional[str] = field(default=None, metadata={"description": "Example of the command line involved in the suspicious activity."}) # fmt: skip - threat_file_path: Optional[str] = field(default=None, metadata={"description": "The suspicious file path for which the threat intelligence details were found."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyRuntimeDetails: - kind: ClassVar[str] = "aws_guard_duty_runtime_details" - mapping: ClassVar[Dict[str, Bender]] = { - "process": S("Process") >> Bend(AwsGuardDutyProcessDetails.mapping), - "context": S("Context") >> Bend(AwsGuardDutyRuntimeContext.mapping), - } - process: Optional[AwsGuardDutyProcessDetails] = field(default=None, metadata={"description": "Information about the observed process."}) # fmt: skip - context: Optional[AwsGuardDutyRuntimeContext] = field(default=None, metadata={"description": "Additional information about the suspicious activity."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyAnomalyUnusual: - kind: ClassVar[str] = "aws_guard_duty_anomaly_unusual" - mapping: ClassVar[Dict[str, Bender]] = {"behavior": S("Behavior")} - behavior: Optional[Dict[str, Any]] = field(default=None, metadata={"description": "The behavior of the anomalous activity that caused GuardDuty to generate the finding."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyAnomaly: - kind: ClassVar[str] = "aws_guard_duty_anomaly" - mapping: ClassVar[Dict[str, Bender]] = { - "profiles": S("Profiles"), - "unusual": S("Unusual") >> Bend(AwsGuardDutyAnomalyUnusual.mapping), - } - profiles: Optional[Dict[str, Any]] = field(default=None, metadata={"description": "Information about the types of profiles."}) # fmt: skip - unusual: Optional[AwsGuardDutyAnomalyUnusual] = field(default=None, metadata={"description": "Information about the behavior of the anomalies."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyDetection: - kind: ClassVar[str] = "aws_guard_duty_detection" - mapping: ClassVar[Dict[str, Bender]] = {"anomaly": S("Anomaly") >> Bend(AwsGuardDutyAnomaly.mapping)} - anomaly: Optional[AwsGuardDutyAnomaly] = field(default=None, metadata={"description": "The details about the anomalous activity that caused GuardDuty to generate the finding."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyItemPath: - kind: ClassVar[str] = "aws_guard_duty_item_path" - mapping: ClassVar[Dict[str, Bender]] = {"nested_item_path": S("NestedItemPath"), "hash": S("Hash")} - nested_item_path: Optional[str] = field(default=None, metadata={"description": "The nested item path where the infected file was found."}) # fmt: skip - hash: Optional[str] = field(default=None, metadata={"description": "The hash value of the infected resource."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyThreat: - kind: ClassVar[str] = "aws_guard_duty_threat" - mapping: ClassVar[Dict[str, Bender]] = { - "name": S("Name"), - "source": S("Source"), - "item_paths": S("ItemPaths", default=[]) >> ForallBend(AwsGuardDutyItemPath.mapping), - } - name: Optional[str] = field(default=None, metadata={"description": "Name of the detected threat that caused GuardDuty to generate this finding."}) # fmt: skip - source: Optional[str] = field(default=None, metadata={"description": "Source of the threat that generated this finding."}) # fmt: skip - item_paths: Optional[List[AwsGuardDutyItemPath]] = field(factory=list, metadata={"description": "Information about the nested item path and hash of the protected resource."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyMalwareScanDetails: - kind: ClassVar[str] = "aws_guard_duty_malware_scan_details" - mapping: ClassVar[Dict[str, Bender]] = { - "threats": S("Threats", default=[]) >> ForallBend(AwsGuardDutyThreat.mapping) - } - threats: Optional[List[AwsGuardDutyThreat]] = field(factory=list, metadata={"description": "Information about the detected threats associated with the generated GuardDuty finding."}) # fmt: skip - - -@define(eq=False, slots=False) -class AwsGuardDutyService: - kind: ClassVar[str] = "aws_guard_duty_service" - mapping: ClassVar[Dict[str, Bender]] = { - "action": S("Action") >> Bend(AwsGuardDutyAction.mapping), - "evidence": S("Evidence") >> Bend(AwsGuardDutyEvidence.mapping), - "archived": S("Archived"), - "count": S("Count"), - "detector_id": S("DetectorId"), - "event_first_seen": S("EventFirstSeen"), - "event_last_seen": S("EventLastSeen"), - "resource_role": S("ResourceRole"), - "service_name": S("ServiceName"), - "user_feedback": S("UserFeedback"), - "additional_info": S("AdditionalInfo") >> Bend(AwsGuardDutyServiceAdditionalInfo.mapping), - "feature_name": S("FeatureName"), - "ebs_volume_scan_details": S("EbsVolumeScanDetails") >> Bend(AwsGuardDutyEbsVolumeScanDetails.mapping), - "runtime_details": S("RuntimeDetails") >> Bend(AwsGuardDutyRuntimeDetails.mapping), - "detection": S("Detection") >> Bend(AwsGuardDutyDetection.mapping), - "malware_scan_details": S("MalwareScanDetails") >> Bend(AwsGuardDutyMalwareScanDetails.mapping), - } - action: Optional[AwsGuardDutyAction] = field(default=None, metadata={"description": "Information about the activity that is described in a finding."}) # fmt: skip - evidence: Optional[AwsGuardDutyEvidence] = field(default=None, metadata={"description": "An evidence object associated with the service."}) # fmt: skip - archived: Optional[bool] = field(default=None, metadata={"description": "Indicates whether this finding is archived."}) # fmt: skip - count: Optional[int] = field(default=None, metadata={"description": "The total count of the occurrences of this finding type."}) # fmt: skip - detector_id: Optional[str] = field(default=None, metadata={"description": "The detector ID for the GuardDuty service."}) # fmt: skip - event_first_seen: Optional[str] = field(default=None, metadata={"description": "The first-seen timestamp of the activity that prompted GuardDuty to generate this finding."}) # fmt: skip - event_last_seen: Optional[str] = field(default=None, metadata={"description": "The last-seen timestamp of the activity that prompted GuardDuty to generate this finding."}) # fmt: skip - resource_role: Optional[str] = field(default=None, metadata={"description": "The resource role information for this finding."}) # fmt: skip - service_name: Optional[str] = field(default=None, metadata={"description": "The name of the Amazon Web Services service (GuardDuty) that generated a finding."}) # fmt: skip - user_feedback: Optional[str] = field(default=None, metadata={"description": "Feedback that was submitted about the finding."}) # fmt: skip - additional_info: Optional[AwsGuardDutyServiceAdditionalInfo] = field(default=None, metadata={"description": "Contains additional information about the generated finding."}) # fmt: skip - feature_name: Optional[str] = field(default=None, metadata={"description": "The name of the feature that generated a finding."}) # fmt: skip - ebs_volume_scan_details: Optional[AwsGuardDutyEbsVolumeScanDetails] = field(default=None, metadata={"description": "Returns details from the malware scan that created a finding."}) # fmt: skip - runtime_details: Optional[AwsGuardDutyRuntimeDetails] = field(default=None, metadata={"description": "Information about the process and any required context values for a specific finding"}) # fmt: skip - detection: Optional[AwsGuardDutyDetection] = field(default=None, metadata={"description": "Contains information about the detected unusual behavior."}) # fmt: skip - malware_scan_details: Optional[AwsGuardDutyMalwareScanDetails] = field(default=None, metadata={"description": "Returns details from the malware scan that generated a GuardDuty finding."}) # fmt: skip - - @define(eq=False, slots=False) class AwsGuardDutyFinding(AwsResource, PhantomBaseResource): kind: ClassVar[str] = "aws_guard_duty_finding" - _kind_display: ClassVar[str] = "AWS GuardDuty Finding" - _kind_description: ClassVar[str] = ( - "AWS GuardDuty Finding represents a potential security issue identified by Amazon GuardDuty. " - "GuardDuty uses machine learning, anomaly detection, and integrated threat intelligence to detect and " - "alert on suspicious activity in your AWS environment. Findings highlight possible attacks or vulnerabilities " - "that may require further investigation." - ) - _kind_service: ClassVar[Optional[str]] = service_name - _metadata: ClassVar[Dict[str, Any]] = {"icon": "log", "group": "management"} - _docs_url: ClassVar[str] = "https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_findings.html" - _aws_metadata: ClassVar[Dict[str, Any]] = { - "provider_link_tpl": "https://{region_id}.console.aws.amazon.com/guardduty/home?region={region_id}#/findings?fId={id}¯os=current", - } + _model_export: ClassVar[bool] = False # do not export this class, since there will be no instances of it # api spec defined in `collect_resources` mapping: ClassVar[Dict[str, Bender]] = { "id": S("Id"), @@ -1239,10 +593,11 @@ class AwsGuardDutyFinding(AwsResource, PhantomBaseResource): "finding_region": S("Region"), "finding_resource": S("Resource") >> Bend(AwsGuardDutyResource.mapping), "schema_version": S("SchemaVersion"), - "finding_service": S("Service") >> Bend(AwsGuardDutyService.mapping), "finding_severity": S("Severity"), "title": S("Title"), "type": S("Type"), + # available but not used property: + # "finding_service": S("Service"), } account_id: Optional[str] = field(default=None, metadata={"description": "The ID of the account in which the finding was generated."}) # fmt: skip confidence: Optional[float] = field(default=None, metadata={"description": "The confidence score for the finding."}) # fmt: skip @@ -1251,7 +606,6 @@ class AwsGuardDutyFinding(AwsResource, PhantomBaseResource): finding_region: Optional[str] = field(default=None, metadata={"description": "The Region where the finding was generated."}) # fmt: skip finding_resource: Optional[AwsGuardDutyResource] = field(default=None, metadata={"description": "Contains information about the Amazon Web Services resource associated with the activity that prompted GuardDuty to generate a finding."}) # fmt: skip schema_version: Optional[str] = field(default=None, metadata={"description": "The version of the schema used for the finding."}) # fmt: skip - finding_service: Optional[AwsGuardDutyService] = field(default=None, metadata={"description": "Contains additional information about the generated finding."}) # fmt: skip finding_severity: Optional[float] = field(default=None, metadata={"description": "The severity of the finding."}) # fmt: skip title: Optional[str] = field(default=None, metadata={"description": "The title of the finding."}) # fmt: skip type: Optional[str] = field(default=None, metadata={"description": "The type of finding."}) # fmt: skip @@ -1260,30 +614,6 @@ class AwsGuardDutyFinding(AwsResource, PhantomBaseResource): def service_name(cls) -> str: return service_name - @staticmethod - def set_findings(builder: GraphBuilder, resource_to_set: AwsResource, to_check: str = "id") -> None: - """ - Set the assessment findings for the resource based on its ID or ARN. - """ - id_or_arn_or_name = "" - - if to_check == "arn": - if not resource_to_set.arn: - return - id_or_arn_or_name = resource_to_set.arn - elif to_check == "id": - id_or_arn_or_name = resource_to_set.id - elif to_check == "name": - id_or_arn_or_name = resource_to_set.safe_name - else: - return - provider_findings = builder._assessment_findings.get( - AssessmentKey("guard_duty", resource_to_set.region().id, resource_to_set.__class__.__name__), {} - ).get(id_or_arn_or_name, []) - if provider_findings: - # Set the findings in the resource's _assessments dictionary - resource_to_set._assessments.append(Assessment("guard_duty", provider_findings)) - def parse_finding(self, source: Json) -> Finding: def get_severity() -> Severity: if not self.finding_severity: @@ -1300,10 +630,7 @@ def get_severity() -> Severity: return Severity.critical finding_title = self.safe_name - if not self.finding_severity: - finding_severity = Severity.medium - else: - finding_severity = get_severity() + finding_severity = get_severity() description = self.description updated_at = self.mtime details = source.get("Service", {}) @@ -1314,54 +641,53 @@ def collect_resources(cls: Type[AwsResource], builder: GraphBuilder) -> None: def check_type_and_adjust_id( finding_resource: AwsGuardDutyResource, - ) -> List[Tuple[str, str]]: - # To avoid circular imports, defined here - from fix_plugin_aws.resource.ec2 import AwsEc2Instance, AwsEc2Volume - from fix_plugin_aws.resource.ecs import AwsEcsCluster - from fix_plugin_aws.resource.eks import AwsEksCluster - from fix_plugin_aws.resource.lambda_ import AwsLambdaFunction - from fix_plugin_aws.resource.rds import AwsRdsCluster, AwsRdsInstance - from fix_plugin_aws.resource.s3 import AwsS3Bucket - - finding_resources = [] + ) -> List[Tuple[Type[Any], Dict[str, Any]]]: + + finding_resources: List[Tuple[Type[AwsResource], Dict[str, Any]]] = [] if finding_resource.s3_bucket_details: for s3_bucket_detail in finding_resource.s3_bucket_details: if s3_bucket_detail.name: - finding_resources.append((AwsS3Bucket.__name__, s3_bucket_detail.name)) + finding_resources.append((AwsS3Bucket, {"name": s3_bucket_detail.name})) if finding_resource.instance_details and finding_resource.instance_details.instance_id: - finding_resources.append((AwsEc2Instance.__name__, finding_resource.instance_details.instance_id)) + finding_resources.append((AwsEc2Instance, {"id": finding_resource.instance_details.instance_id})) if finding_resource.eks_cluster_details and finding_resource.eks_cluster_details.arn: - finding_resources.append((AwsEksCluster.__name__, finding_resource.eks_cluster_details.arn)) + finding_resources.append((AwsEksCluster, {"arn": finding_resource.eks_cluster_details.arn})) if finding_resource.ebs_volume_details: for vol_detail in finding_resource.ebs_volume_details.scanned_volume_details or []: if vol_detail.volume_arn: - finding_resources.append((AwsEc2Volume.__name__, vol_detail.volume_arn)) + finding_resources.append((AwsEc2Volume, {"arn": vol_detail.volume_arn})) for vol_detail in finding_resource.ebs_volume_details.skipped_volume_details or []: if vol_detail.volume_arn: - finding_resources.append((AwsEc2Volume.__name__, vol_detail.volume_arn)) + finding_resources.append((AwsEc2Volume, {"arn": vol_detail.volume_arn})) if finding_resource.ecs_cluster_details and finding_resource.ecs_cluster_details.arn: - finding_resources.append((AwsEcsCluster.__name__, finding_resource.ecs_cluster_details.arn)) + finding_resources.append((AwsEcsCluster, {"arn": finding_resource.ecs_cluster_details.arn})) if finding_resource.rds_db_instance_details: if finding_resource.rds_db_instance_details.db_instance_identifier: finding_resources.append( - (AwsRdsInstance.__name__, finding_resource.rds_db_instance_details.db_instance_identifier) + (AwsRdsInstance, {"id": finding_resource.rds_db_instance_details.db_instance_identifier}) ) if finding_resource.rds_db_instance_details.db_cluster_identifier: finding_resources.append( - (AwsRdsCluster.__name__, finding_resource.rds_db_instance_details.db_cluster_identifier) + (AwsRdsCluster, {"id": finding_resource.rds_db_instance_details.db_cluster_identifier}) ) if finding_resource.lambda_details and finding_resource.lambda_details.function_name: - finding_resources.append((AwsLambdaFunction.__name__, finding_resource.lambda_details.function_name)) + finding_resources.append((AwsLambdaFunction, {"name": finding_resource.lambda_details.function_name})) return finding_resources + def add_finding( + provider: str, finding: Finding, clazz: Optional[Type[AwsResource]] = None, **node: Any + ) -> None: + if resource := builder.node(clazz=clazz, **node): + resource.add_finding(provider, finding) + try: detector_ids = builder.client.list(service_name, "list-detectors", "DetectorIds") finding_id_futures = { @@ -1401,14 +727,15 @@ def check_type_and_adjust_id( if instance := AwsGuardDutyFinding.from_api(finding, builder): if fr := instance.finding_resource: found_info = check_type_and_adjust_id(fr) - for class_name, id_or_arn_or_name in found_info: - adjusted_finding = instance.parse_finding(finding) - builder.add_finding( - "guard_duty", - class_name, - instance.finding_region or "global", - id_or_arn_or_name, - adjusted_finding, + for clazz, res_filter in found_info: + builder.after_collect_actions.append( + partial( + add_finding, + amazon_guardduty, + instance.parse_finding(finding), + clazz, + **res_filter, + ) ) except Boto3Error as e: msg = f"Error while collecting {cls.__name__} in region {builder.region.name}: {e}" diff --git a/plugins/aws/fix_plugin_aws/resource/lambda_.py b/plugins/aws/fix_plugin_aws/resource/lambda_.py index c435615942..599bef09ce 100644 --- a/plugins/aws/fix_plugin_aws/resource/lambda_.py +++ b/plugins/aws/fix_plugin_aws/resource/lambda_.py @@ -10,7 +10,6 @@ from fix_plugin_aws.resource.base import AwsResource, GraphBuilder, AwsApiSpec, parse_json from fix_plugin_aws.resource.cloudwatch import AwsCloudwatchQuery, normalizer_factory from fix_plugin_aws.resource.ec2 import AwsEc2Subnet, AwsEc2SecurityGroup, AwsEc2Vpc -from fix_plugin_aws.resource.guardduty import AwsGuardDutyFinding from fix_plugin_aws.resource.kms import AwsKmsKey from fixlib.baseresources import ( BaseServerlessFunction, @@ -406,7 +405,6 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: clazz=AwsKmsKey, arn=self.function_kms_key_arn, ) - AwsGuardDutyFinding.set_findings(builder, self, "name") def update_resource_tag(self, client: AwsClient, key: str, value: str) -> bool: client.call( diff --git a/plugins/aws/fix_plugin_aws/resource/rds.py b/plugins/aws/fix_plugin_aws/resource/rds.py index f6d644a02b..c1427d463a 100644 --- a/plugins/aws/fix_plugin_aws/resource/rds.py +++ b/plugins/aws/fix_plugin_aws/resource/rds.py @@ -7,7 +7,6 @@ from fix_plugin_aws.resource.base import AwsApiSpec, AwsResource, GraphBuilder from fix_plugin_aws.resource.cloudwatch import AwsCloudwatchQuery, AwsCloudwatchMetricData, normalizer_factory from fix_plugin_aws.resource.ec2 import AwsEc2SecurityGroup, AwsEc2Subnet, AwsEc2Vpc -from fix_plugin_aws.resource.guardduty import AwsGuardDutyFinding from fix_plugin_aws.resource.kinesis import AwsKinesisStream from fix_plugin_aws.resource.kms import AwsKmsKey from fix_plugin_aws.utils import ToDict, TagsValue @@ -599,8 +598,6 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: for key_reference in keys: builder.dependant_node(from_node=self, clazz=AwsKmsKey, id=AwsKmsKey.normalise_id(key_reference)) - AwsGuardDutyFinding.set_findings(builder, self, "id") - def delete_resource(self, client: AwsClient, graph: Graph) -> bool: client.call( aws_service=self.api_spec.service, @@ -1042,7 +1039,6 @@ def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: ) if kinesis := self.rds_activity_stream_kinesis_stream_name: builder.add_edge(self, clazz=AwsKinesisStream, name=kinesis) - AwsGuardDutyFinding.set_findings(builder, self, "id") def delete_resource(self, client: AwsClient, graph: Graph) -> bool: client.call( diff --git a/plugins/aws/fix_plugin_aws/resource/s3.py b/plugins/aws/fix_plugin_aws/resource/s3.py index db54ccec16..bb52c3b8f9 100644 --- a/plugins/aws/fix_plugin_aws/resource/s3.py +++ b/plugins/aws/fix_plugin_aws/resource/s3.py @@ -11,7 +11,6 @@ from fix_plugin_aws.aws_client import AwsClient from fix_plugin_aws.resource.base import AwsResource, AwsApiSpec, GraphBuilder, parse_json from fix_plugin_aws.resource.cloudwatch import AwsCloudwatchQuery, normalizer_factory -from fix_plugin_aws.resource.guardduty import AwsGuardDutyFinding from fix_plugin_aws.utils import tags_as_dict from fixlib.baseresources import ( BaseBucket, @@ -180,7 +179,6 @@ class AwsS3Bucket(AwsResource, BaseBucket, HasResourcePolicy): _kind_description: ClassVar[str] = "AWS S3 Bucket is a cloud storage service provided by Amazon Web Services. It stores and retrieves data objects, such as files, documents, and images. S3 Buckets organize data into containers, offering features like access control, versioning, and lifecycle management. Users can interact with S3 Buckets through APIs, SDKs, or the AWS Management Console." # fmt: skip _docs_url: ClassVar[str] = "https://docs.aws.amazon.com/AmazonS3/latest/userguide/creating-bucket.html" _kind_service: ClassVar[Optional[str]] = service_name - _reference_kinds: ClassVar[ModelReference] = {} api_spec: ClassVar[AwsApiSpec] = AwsApiSpec( service_name, "list-buckets", "Buckets", override_iam_permission="s3:ListAllMyBuckets" ) @@ -216,9 +214,6 @@ def called_collect_apis(cls) -> List[AwsApiSpec]: AwsApiSpec(service_name, "get-bucket-lifecycle-configuration"), ] - def connect_in_graph(self, builder: GraphBuilder, source: Json) -> None: - AwsGuardDutyFinding.set_findings(builder, self, "name") - @classmethod def collect(cls: Type[AwsResource], json: List[Json], builder: GraphBuilder) -> None: def add_tags(bucket: AwsS3Bucket) -> None: diff --git a/plugins/aws/test/collector_test.py b/plugins/aws/test/collector_test.py index 6254df5249..9de124602e 100644 --- a/plugins/aws/test/collector_test.py +++ b/plugins/aws/test/collector_test.py @@ -11,7 +11,6 @@ called_collect_apis, called_mutator_apis, ) -from fix_plugin_aws.resource.guardduty import AwsGuardDutyFinding from fix_plugin_aws.resource.base import AwsResource, AwsApiSpec, GraphBuilder, AwsRegion from fix_plugin_aws.resource.ec2 import AwsEc2Instance from fixlib.baseresources import BaseResource