diff --git a/cmd/agent/dist/conf.d/orchestrator_ecs.d/conf.yaml.default b/cmd/agent/dist/conf.d/orchestrator_ecs.d/conf.yaml.default new file mode 100644 index 0000000000000..d1c78d4d5df81 --- /dev/null +++ b/cmd/agent/dist/conf.d/orchestrator_ecs.d/conf.yaml.default @@ -0,0 +1,4 @@ +ad_identifiers: + - _ecs_orchestrator +instances: + - {} diff --git a/comp/core/autodiscovery/listeners/environment.go b/comp/core/autodiscovery/listeners/environment.go index a5b99da07f0ae..faf400c7ddd43 100644 --- a/comp/core/autodiscovery/listeners/environment.go +++ b/comp/core/autodiscovery/listeners/environment.go @@ -56,6 +56,7 @@ func (l *EnvironmentListener) createServices() { "cri": config.Cri, "containerd": config.Containerd, "kube_orchestrator": config.KubeOrchestratorExplorer, + "ecs_orchestrator": config.ECSOrchestratorExplorer, } for name, feature := range features { diff --git a/go.mod b/go.mod index ed544ed130789..7abfebe7bc327 100644 --- a/go.mod +++ b/go.mod @@ -604,7 +604,7 @@ require github.com/lorenzosaino/go-sysctl v0.3.1 require ( github.com/DATA-DOG/go-sqlmock v1.5.0 - github.com/DataDog/agent-payload/v5 v5.0.109 + github.com/DataDog/agent-payload/v5 v5.0.110 github.com/DataDog/datadog-agent/cmd/agent/common/path v0.52.0-rc.3 github.com/DataDog/datadog-agent/comp/core/config v0.52.0-rc.3 github.com/DataDog/datadog-agent/comp/core/flare/types v0.52.0-rc.3 diff --git a/go.sum b/go.sum index d3c62047ac40f..7ace3677c397e 100644 --- a/go.sum +++ b/go.sum @@ -264,8 +264,8 @@ github.com/CycloneDX/cyclonedx-go v0.8.0 h1:FyWVj6x6hoJrui5uRQdYZcSievw3Z32Z88uY github.com/CycloneDX/cyclonedx-go v0.8.0/go.mod h1:K2bA+324+Og0X84fA8HhN2X066K7Bxz4rpMQ4ZhjtSk= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= -github.com/DataDog/agent-payload/v5 v5.0.109 h1:K9Js8JJDMmK9TrPYC58zTSRv62GHHxwqKjUtxBUszJk= -github.com/DataDog/agent-payload/v5 v5.0.109/go.mod h1:COngtbYYCncpIPiE5D93QlXDH/3VAKk10jDNwGHcMRE= +github.com/DataDog/agent-payload/v5 v5.0.110 h1:LbUAqqQl7ogQ2Z+wBJbd9Oqfc3qMcUBfVOzYIGV1Ork= +github.com/DataDog/agent-payload/v5 v5.0.110/go.mod h1:COngtbYYCncpIPiE5D93QlXDH/3VAKk10jDNwGHcMRE= github.com/DataDog/appsec-internal-go v1.4.2 h1:rLOp0mSzJ7L7Nn3jAdWbgvs+1HK25H0DN4XYVDJu72s= github.com/DataDog/appsec-internal-go v1.4.2/go.mod h1:pEp8gjfNLtEOmz+iZqC8bXhu0h4k7NUsW/qiQb34k1U= github.com/DataDog/aptly v1.5.3 h1:oLsRvjuXSVM4ia0N83dU3KiQeiJ6BaszYbTZOkSfDlw= diff --git a/pkg/collector/corechecks/cluster/orchestrator/collectors/collector.go b/pkg/collector/corechecks/cluster/orchestrator/collectors/collector.go index c16a63d7d64d1..0fb3d235c24c9 100644 --- a/pkg/collector/corechecks/cluster/orchestrator/collectors/collector.go +++ b/pkg/collector/corechecks/cluster/orchestrator/collectors/collector.go @@ -66,6 +66,9 @@ type K8sCollectorRunConfig struct { // ECSCollectorRunConfig is the configuration used to initialize or run the ECS collector. type ECSCollectorRunConfig struct { WorkloadmetaStore workloadmeta.Component + AWSAccountID int + Region string + ClusterName string } // CollectorRunConfig is the configuration used to initialize or run the diff --git a/pkg/collector/corechecks/cluster/orchestrator/collectors/ecs/task.go b/pkg/collector/corechecks/cluster/orchestrator/collectors/ecs/task.go new file mode 100644 index 0000000000000..ffedc0f244a49 --- /dev/null +++ b/pkg/collector/corechecks/cluster/orchestrator/collectors/ecs/task.go @@ -0,0 +1,107 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build orchestrator + +// Package ecs defines a collector to collect ECS task +package ecs + +import ( + "fmt" + + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/collectors" + "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors" + "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/ecs" + transformers "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs" + "github.com/DataDog/datadog-agent/pkg/orchestrator" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +// TaskCollector is a collector for ECS tasks. +type TaskCollector struct { + metadata *collectors.CollectorMetadata + processor *processors.Processor +} + +// NewTaskCollector creates a new collector for the ECS Task resource. +func NewTaskCollector() *TaskCollector { + return &TaskCollector{ + metadata: &collectors.CollectorMetadata{ + IsStable: false, + IsMetadataProducer: true, + IsManifestProducer: false, + Name: "ecstasks", + NodeType: orchestrator.ECSTask, + }, + processor: processors.NewProcessor(new(ecs.TaskHandlers)), + } +} + +// Metadata is used to access information about the collector. +func (t *TaskCollector) Metadata() *collectors.CollectorMetadata { + return t.metadata +} + +// Init is used to initialize the collector. +// +//nolint:revive // TODO(CAPP) Fix revive linter +func (t *TaskCollector) Init(rcfg *collectors.CollectorRunConfig) {} + +// Run triggers the collection process. +func (t *TaskCollector) Run(rcfg *collectors.CollectorRunConfig) (*collectors.CollectorRunResult, error) { + list := rcfg.WorkloadmetaStore.ListECSTasks() + tasks := make([]transformers.TaskWithContainers, 0, len(list)) + for _, task := range list { + newTask := task + tasks = append(tasks, t.fetchContainers(rcfg, newTask)) + } + + ctx := &processors.ECSProcessorContext{ + BaseProcessorContext: processors.BaseProcessorContext{ + Cfg: rcfg.Config, + MsgGroupID: rcfg.MsgGroupRef.Inc(), + NodeType: t.metadata.NodeType, + ManifestProducer: t.metadata.IsManifestProducer, + ClusterID: rcfg.ClusterID, + }, + AWSAccountID: rcfg.AWSAccountID, + ClusterName: rcfg.ClusterName, + Region: rcfg.Region, + } + + processResult, processed := t.processor.Process(ctx, tasks) + + if processed == -1 { + return nil, fmt.Errorf("unable to process resources: a panic occurred") + } + + result := &collectors.CollectorRunResult{ + Result: processResult, + ResourcesListed: len(list), + ResourcesProcessed: processed, + } + + return result, nil +} + +// fetchContainers fetches the containers from workloadmeta store for a given task. +func (t *TaskCollector) fetchContainers(rcfg *collectors.CollectorRunConfig, task *workloadmeta.ECSTask) transformers.TaskWithContainers { + ecsTask := transformers.TaskWithContainers{ + Task: task, + Containers: make([]*workloadmeta.Container, 0, len(task.Containers)), + } + + for _, container := range task.Containers { + c, err := rcfg.WorkloadmetaStore.GetContainer(container.ID) + if err != nil { + log.Errorc(err.Error(), orchestrator.ExtraLogContext...) + continue + } + ecsTask.Containers = append(ecsTask.Containers, c) + } + + return ecsTask +} diff --git a/pkg/collector/corechecks/cluster/orchestrator/processors/ecs/task.go b/pkg/collector/corechecks/cluster/orchestrator/processors/ecs/task.go new file mode 100644 index 0000000000000..a28640d2bca62 --- /dev/null +++ b/pkg/collector/corechecks/cluster/orchestrator/processors/ecs/task.go @@ -0,0 +1,95 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build orchestrator + +// Package ecs defines handlers for processing ECS tasks +package ecs + +import ( + "k8s.io/apimachinery/pkg/types" + + model "github.com/DataDog/agent-payload/v5/process" + "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors" + "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/common" + transformers "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs" +) + +// TaskHandlers implements the Handlers interface for ECS Tasks. +type TaskHandlers struct { + common.BaseHandlers +} + +// BuildMessageBody is a handler called to build a message body out of a list of extracted resources. +func (t *TaskHandlers) BuildMessageBody(ctx processors.ProcessorContext, resourceModels []interface{}, groupSize int) model.MessageBody { + pctx := ctx.(*processors.ECSProcessorContext) + models := make([]*model.ECSTask, 0, len(resourceModels)) + + for _, m := range resourceModels { + models = append(models, m.(*model.ECSTask)) + } + + return &model.CollectorECSTask{ + AwsAccountID: int64(pctx.AWSAccountID), + ClusterName: pctx.ClusterName, + ClusterId: pctx.ClusterID, + Region: pctx.Region, + GroupId: pctx.MsgGroupID, + GroupSize: int32(groupSize), + Tasks: models, + } +} + +// ExtractResource is a handler called to extract the resource model out of a raw resource. +// +//nolint:revive // TODO(CAPP) Fix revive linter +func (t *TaskHandlers) ExtractResource(ctx processors.ProcessorContext, resource interface{}) (resourceModel interface{}) { + r := resource.(transformers.TaskWithContainers) + return transformers.ExtractECSTask(r) +} + +// ResourceList is a handler called to convert a list passed as a generic +// interface to a list of generic interfaces. +// +//nolint:revive // TODO(CAPP) Fix revive linter +func (t *TaskHandlers) ResourceList(ctx processors.ProcessorContext, list interface{}) (resources []interface{}) { + resourceList := list.([]transformers.TaskWithContainers) + + resources = make([]interface{}, 0, len(resourceList)) + + for _, resource := range resourceList { + resources = append(resources, resource) + } + + return resources +} + +// ResourceUID is a handler called to retrieve the resource UID. +// +//nolint:revive // TODO(CAPP) Fix revive linter +func (t *TaskHandlers) ResourceUID(ctx processors.ProcessorContext, resource interface{}) types.UID { + return types.UID(resource.(transformers.TaskWithContainers).Task.EntityID.ID) +} + +// ResourceVersion sets and returns custom resource version for an ECS task. +// +//nolint:revive // TODO(CAPP) Fix revive linter +func (t *TaskHandlers) ResourceVersion(ctx processors.ProcessorContext, resource, resourceModel interface{}) string { + return resourceModel.(*model.ECSTask).ResourceVersion +} + +// AfterMarshalling is a handler called after resource marshalling. +// +//nolint:revive // TODO(CAPP) Fix revive linter +func (h *TaskHandlers) AfterMarshalling(ctx processors.ProcessorContext, resource, resourceModel interface{}, yaml []byte) (skip bool) { + return +} + +// ScrubBeforeExtraction is a handler called to redact the raw resource before +// it is extracted as an internal resource model. +// +//nolint:revive // TODO(CAPP) Fix revive linter +func (h *TaskHandlers) ScrubBeforeExtraction(ctx processors.ProcessorContext, resource interface{}) { +} diff --git a/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs/task.go b/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs/task.go new file mode 100644 index 0000000000000..2f622d71a3e6d --- /dev/null +++ b/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs/task.go @@ -0,0 +1,239 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build orchestrator + +// Package ecs provides methods for converting ECS resources to protobuf model. +package ecs + +import ( + "fmt" + "hash/fnv" + "sort" + "strconv" + "time" + + jsoniter "github.com/json-iterator/go" + + model "github.com/DataDog/agent-payload/v5/process" + "github.com/DataDog/datadog-agent/comp/core/tagger" + "github.com/DataDog/datadog-agent/comp/core/tagger/collectors" + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/pkg/orchestrator" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +var json = jsoniter.ConfigCompatibleWithStandardLibrary + +// TaskWithContainers represents an ECS task with its containers fetched from the workloadmeta store +type TaskWithContainers struct { + Task *workloadmeta.ECSTask + Containers []*workloadmeta.Container +} + +// ExtractECSTask returns the protobuf model corresponding to an ECS Task resource. +func ExtractECSTask(task TaskWithContainers) *model.ECSTask { + if task.Task == nil { + return nil + } + taskModel := &model.ECSTask{ + Arn: task.Task.EntityID.ID, + LaunchType: string(task.Task.LaunchType), + DesiredStatus: task.Task.DesiredStatus, + KnownStatus: task.Task.KnownStatus, + Family: task.Task.Family, + Version: task.Task.Version, + AvailabilityZone: task.Task.AvailabilityZone, + Limits: task.Task.Limits, + EphemeralStorageMetrics: task.Task.EphemeralStorageMetrics, + ServiceName: task.Task.ServiceName, + VpcId: task.Task.VPCID, + PullStartedAt: extractTimestampPtr(task.Task.PullStartedAt), + PullStoppedAt: extractTimestampPtr(task.Task.PullStoppedAt), + ExecutionStoppedAt: extractTimestampPtr(task.Task.ExecutionStoppedAt), + Containers: extractECSContainer(task.Containers), + } + + tags, err := tagger.Tag(fmt.Sprintf("ecs_task://%s", task.Task.EntityID.ID), collectors.HighCardinality) + if err != nil { + log.Debugf("Could not retrieve tags for task: %s", err.Error()) + } + + taskModel.Tags = tags + taskModel.EcsTags = toTags(task.Task.Tags) + taskModel.ContainerInstanceTags = toTags(task.Task.ContainerInstanceTags) + + // Enforce order consistency on slices + sort.Strings(taskModel.Tags) + sort.Strings(taskModel.EcsTags) + sort.Strings(taskModel.ContainerInstanceTags) + + taskModel.ResourceVersion = BuildTaskResourceVersion(taskModel) + + return taskModel +} + +func extractECSContainer(containers []*workloadmeta.Container) []*model.ECSContainer { + ecsContainers := make([]*model.ECSContainer, 0, len(containers)) + for _, container := range containers { + if container == nil { + continue + } + ecsContainer := &model.ECSContainer{ + DockerID: container.EntityID.ID, + DockerName: container.EntityMeta.Name, + Image: container.Image.RawName, + ImageID: container.Image.ID, + CreatedAt: extractTimestamp(container.State.CreatedAt), + StartedAt: extractTimestamp(container.State.StartedAt), + FinishedAt: extractTimestamp(container.State.FinishedAt), + Labels: toTags(container.EntityMeta.Labels), + Ports: extractECSContainerPort(container), + ExitCode: extractExitCode(container.State.ExitCode), + } + + if container.ECSContainer != nil { + ecsContainer.ContainerArn = container.ContainerARN + ecsContainer.Name = container.ECSContainer.DisplayName + ecsContainer.Networks = extractECSContainerNetworks(container.ECSContainer) + ecsContainer.Volumes = extractECSContainerVolume(container.ECSContainer) + ecsContainer.Health = extractECSContainerHealth(container.ECSContainer) + ecsContainer.DesiredStatus = container.ECSContainer.DesiredStatus + ecsContainer.KnownStatus = container.ECSContainer.KnownStatus + ecsContainer.Type = container.ECSContainer.Type + ecsContainer.LogDriver = container.ECSContainer.LogDriver + ecsContainer.LogOptions = container.ECSContainer.LogOptions + ecsContainer.ContainerArn = container.ECSContainer.ContainerARN + ecsContainer.Snapshotter = container.ECSContainer.Snapshotter + } + + if container.Resources.CPULimit != nil || container.Resources.MemoryLimit != nil { + ecsContainer.Limits = make(map[string]float64) + if container.Resources.CPULimit != nil { + ecsContainer.Limits["CPU"] = *container.Resources.CPULimit + } + if container.Resources.MemoryLimit != nil { + ecsContainer.Limits["Memory"] = float64(*container.Resources.MemoryLimit) + } + } + + sort.Strings(ecsContainer.Labels) + + ecsContainers = append(ecsContainers, ecsContainer) + } + return ecsContainers +} + +func extractTimestampPtr(t *time.Time) int64 { + if t == nil || t.IsZero() { + return 0 + } + return t.Unix() +} + +func extractTimestamp(t time.Time) int64 { + if t.IsZero() { + return 0 + } + return t.Unix() +} + +func extractExitCode(exitCode *uint32) *model.ECSContainerExitCode { + if exitCode == nil { + return nil + } + return &model.ECSContainerExitCode{ + ExitCode: int32(*exitCode), + } +} + +func extractECSContainerPort(container *workloadmeta.Container) []*model.ECSContainerPort { + if len(container.Ports) == 0 { + return nil + } + + ports := make([]*model.ECSContainerPort, 0, len(container.Ports)) + for _, port := range container.Ports { + ports = append(ports, &model.ECSContainerPort{ + ContainerPort: int32(port.Port), + HostPort: int32(port.HostPort), + Protocol: port.Protocol, + }) + } + return ports +} + +func extractECSContainerNetworks(container *workloadmeta.ECSContainer) []*model.ECSContainerNetwork { + if len(container.Networks) == 0 { + return nil + } + + networks := make([]*model.ECSContainerNetwork, 0, len(container.Networks)) + for _, network := range container.Networks { + networks = append(networks, &model.ECSContainerNetwork{ + NetworkMode: network.NetworkMode, + Ipv4Addresses: network.IPv4Addresses, + Ipv6Addresses: network.IPv6Addresses, + }) + } + return networks +} + +func extractECSContainerVolume(container *workloadmeta.ECSContainer) []*model.ECSContainerVolume { + if len(container.Volumes) == 0 { + return nil + } + + volumes := make([]*model.ECSContainerVolume, 0, len(container.Volumes)) + for _, volume := range container.Volumes { + volumes = append(volumes, &model.ECSContainerVolume{ + DockerName: volume.Name, + Source: volume.Source, + Destination: volume.Destination, + }) + } + return volumes +} + +func extractECSContainerHealth(container *workloadmeta.ECSContainer) *model.ECSContainerHealth { + if container.Health == nil { + return nil + } + + health := &model.ECSContainerHealth{ + Status: container.Health.Status, + Output: container.Health.Output, + ExitCode: extractExitCode(container.Health.ExitCode), + Since: extractTimestampPtr(container.Health.Since), + } + + return health +} + +func toTags(tags map[string]string) []string { + var result []string + for k, v := range tags { + result = append(result, fmt.Sprintf("%s:%s", k, v)) + } + return result +} + +// BuildTaskResourceVersion computes a resource version for an ECS task. +func BuildTaskResourceVersion(model interface{}) string { + modelJSON, err := json.Marshal(model) + if err != nil { + log.Warnc(fmt.Sprintf("Fail to compute ECS task resource version: %s", err.Error()), orchestrator.ExtraLogContext...) + return "" + } + + h := fnv.New64a() + _, err = h.Write(modelJSON) + if err != nil { + log.Warnc(fmt.Sprintf("Fail to compute ECS task resource version: %s", err.Error()), orchestrator.ExtraLogContext...) + return "" + } + + return strconv.FormatUint(h.Sum64(), 10) +} diff --git a/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs/task_test.go b/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs/task_test.go new file mode 100644 index 0000000000000..6c03f06b216e7 --- /dev/null +++ b/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs/task_test.go @@ -0,0 +1,200 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build orchestrator + +package ecs + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + model "github.com/DataDog/agent-payload/v5/process" + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" +) + +func TestExtractECSTask(t *testing.T) { + now := time.Date(2024, 1, 1, 11, 1, 1, 1, time.UTC) + + actual := ExtractECSTask(TaskWithContainers{ + Task: &workloadmeta.ECSTask{ + EntityID: workloadmeta.EntityID{ + Kind: workloadmeta.KindECSTask, + ID: "arn:aws:ecs:us-east-1:123456789012:task/12345678-1234-1234-1234-123456789012", + }, + EntityMeta: workloadmeta.EntityMeta{ + Name: "12345678-1234-1234-1234-123456789012", + }, + ClusterName: "ecs-cluster", + AWSAccountID: 123456789012, + Region: "us-east-1", + LaunchType: workloadmeta.ECSLaunchTypeEC2, + Family: "redis", + Version: "1", + DesiredStatus: "RUNNING", + KnownStatus: "RUNNING", + VPCID: "vpc-12345678", + ServiceName: "redis", + PullStartedAt: &now, + Limits: map[string]float64{"CPU": 1, "Memory": 2048}, + Containers: []workloadmeta.OrchestratorContainer{ + { + ID: "938f6d263c464aa5985dc67ab7f38a7e-1714341083", + }, + { + ID: "938f6d263c464aa5985dc67ab7f38a7e-1714341084", + }, + }, + Tags: workloadmeta.MapTags{ + "ecs.cluster": "ecs-cluster", + "region": "us-east-1", + }, + ContainerInstanceTags: workloadmeta.MapTags{ + "instance": "instance-1", + "region": "us-east-1", + }, + }, + Containers: []*workloadmeta.Container{ + { + EntityID: workloadmeta.EntityID{ + Kind: workloadmeta.KindContainer, + ID: "938f6d263c464aa5985dc67ab7f38a7e-1714341083", + }, + EntityMeta: workloadmeta.EntityMeta{ + Name: "log_router", + Labels: map[string]string{ + "com.amazonaws.ecs.cluster": "ecs-cluster", + "com.amazonaws.ecs.container-name": "log_router", + }, + }, + ECSContainer: &workloadmeta.ECSContainer{ + DisplayName: "log_router_container", + Health: &workloadmeta.ContainerHealthStatus{ + Status: "HEALTHY", + Since: &now, + ExitCode: func(i uint32) *uint32 { + return &i + }(2), + }, + Type: "NORMAL", + }, + Image: workloadmeta.ContainerImage{ + RawName: "amazon/aws-for-fluent-bit:latest", + Name: "amazon/aws-for-fluent-bit", + }, + Ports: []workloadmeta.ContainerPort{ + { + Port: 80, + HostPort: 80, + }, + }, + Resources: workloadmeta.ContainerResources{ + CPULimit: func(f float64) *float64 { return &f }(1), + MemoryLimit: func(f uint64) *uint64 { return &f }(2048), + }, + }, + { + EntityID: workloadmeta.EntityID{ + Kind: workloadmeta.KindContainer, + ID: "938f6d263c464aa5985dc67ab7f38a7e-1714341084", + }, + EntityMeta: workloadmeta.EntityMeta{ + Name: "redis", + }, + Image: workloadmeta.ContainerImage{ + RawName: "redis/redis:latest", + Name: "redis/redis", + }, + ECSContainer: &workloadmeta.ECSContainer{ + DisplayName: "redis", + Type: "NORMAL", + }, + Ports: []workloadmeta.ContainerPort{ + { + Port: 90, + HostPort: 90, + }, + { + Port: 81, + HostPort: 8080, + }, + }, + }, + }, + }) + + expected := &model.ECSTask{ + Arn: "arn:aws:ecs:us-east-1:123456789012:task/12345678-1234-1234-1234-123456789012", + ResourceVersion: "14848715317751284039", + LaunchType: "ec2", + DesiredStatus: "RUNNING", + KnownStatus: "RUNNING", + Family: "redis", + Version: "1", + VpcId: "vpc-12345678", + ServiceName: "redis", + PullStartedAt: now.Unix(), + Limits: map[string]float64{"CPU": 1, "Memory": 2048}, + EcsTags: []string{ + "ecs.cluster:ecs-cluster", + "region:us-east-1", + }, + ContainerInstanceTags: []string{ + "instance:instance-1", + "region:us-east-1", + }, + Containers: []*model.ECSContainer{ + { + DockerID: "938f6d263c464aa5985dc67ab7f38a7e-1714341083", + DockerName: "log_router", + Name: "log_router_container", + Image: "amazon/aws-for-fluent-bit:latest", + Type: "NORMAL", + Ports: []*model.ECSContainerPort{ + { + ContainerPort: 80, + HostPort: 80, + }, + }, + Health: &model.ECSContainerHealth{ + Status: "HEALTHY", + Since: now.Unix(), + ExitCode: &model.ECSContainerExitCode{ + ExitCode: 2, + }, + }, + Labels: []string{ + "com.amazonaws.ecs.cluster:ecs-cluster", + "com.amazonaws.ecs.container-name:log_router", + }, + Limits: map[string]float64{ + "CPU": 1, + "Memory": 2048, + }, + }, + { + DockerID: "938f6d263c464aa5985dc67ab7f38a7e-1714341084", + DockerName: "redis", + Name: "redis", + Image: "redis/redis:latest", + Type: "NORMAL", + Ports: []*model.ECSContainerPort{ + { + ContainerPort: 90, + HostPort: 90, + }, + { + ContainerPort: 81, + HostPort: 8080, + }, + }, + }, + }, + } + + require.Equal(t, expected, actual) +} diff --git a/pkg/collector/corechecks/orchestrator/ecs/ecs.go b/pkg/collector/corechecks/orchestrator/ecs/ecs.go new file mode 100644 index 0000000000000..475a861b92776 --- /dev/null +++ b/pkg/collector/corechecks/orchestrator/ecs/ecs.go @@ -0,0 +1,230 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build orchestrator + +// Package ecs is used for the orchestrator ECS check +package ecs + +import ( + "crypto/md5" + "encoding/hex" + "fmt" + "math/rand" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + "go.uber.org/atomic" + + "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/pkg/aggregator/sender" + "github.com/DataDog/datadog-agent/pkg/collector/check" + core "github.com/DataDog/datadog-agent/pkg/collector/corechecks" + "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/collectors" + "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/collectors/ecs" + "github.com/DataDog/datadog-agent/pkg/orchestrator" + oconfig "github.com/DataDog/datadog-agent/pkg/orchestrator/config" + "github.com/DataDog/datadog-agent/pkg/util/log" + "github.com/DataDog/datadog-agent/pkg/util/optional" +) + +// CheckName is the name of the check +const CheckName = "orchestrator_ecs" + +// Check doesn't need additional fields +type Check struct { + core.CheckBase + sender sender.Sender + config *oconfig.OrchestratorConfig + collectors []collectors.Collector + groupID *atomic.Int32 + workloadmetaStore workloadmeta.Component + isECSCollectionEnabledFunc func() bool + awsAccountID int + clusterName string + region string + clusterID string +} + +// Factory creates a new check factory +func Factory() optional.Option[func() check.Check] { + return optional.NewOption(newCheck) +} + +func newCheck() check.Check { + return &Check{ + CheckBase: core.NewCheckBase(CheckName), + workloadmetaStore: workloadmeta.GetGlobalStore(), + config: oconfig.NewDefaultOrchestratorConfig(), + groupID: atomic.NewInt32(rand.Int31()), + isECSCollectionEnabledFunc: oconfig.IsOrchestratorECSExplorerEnabled, + } +} + +// Configure the Orchestrator ECS check +// nil check to allow for overrides +func (c *Check) Configure( + senderManager sender.SenderManager, + integrationConfigDigest uint64, + data integration.Data, + initConfig integration.Data, + source string, +) error { + c.BuildID(integrationConfigDigest, data, initConfig) + + err := c.CommonConfigure(senderManager, integrationConfigDigest, initConfig, data, source) + if err != nil { + return err + } + + err = c.config.Load() + if err != nil { + return err + } + + if c.isECSCollectionEnabledFunc == nil { + c.isECSCollectionEnabledFunc = oconfig.IsOrchestratorECSExplorerEnabled + } + + if !c.isECSCollectionEnabledFunc() { + log.Debug("Orchestrator ECS Collection is disabled") + return nil + } + + if c.sender == nil { + sender, err := c.GetSender() + if err != nil { + return err + } + c.sender = sender + } + return nil +} + +// Run executes the check +func (c *Check) Run() error { + if !c.shouldRun() { + return nil + } + + c.initCollectors() + + for _, collector := range c.collectors { + if collector.Metadata().IsSkipped { + c.Warnf("collector %s is skipped: %s", collector.Metadata().Name, collector.Metadata().SkippedReason) + continue + } + + runStartTime := time.Now() + runConfig := &collectors.CollectorRunConfig{ + ECSCollectorRunConfig: collectors.ECSCollectorRunConfig{ + WorkloadmetaStore: c.workloadmetaStore, + AWSAccountID: c.awsAccountID, + Region: c.region, + ClusterName: c.clusterName, + }, + Config: c.config, + MsgGroupRef: c.groupID, + ClusterID: c.clusterID, + } + result, err := collector.Run(runConfig) + if err != nil { + _ = c.Warnf("K8sCollector %s failed to run: %s", collector.Metadata().FullName(), err.Error()) + continue + } + runDuration := time.Since(runStartTime) + log.Debugf("ECSCollector %s run stats: listed=%d processed=%d messages=%d duration=%s", collector.Metadata().FullName(), result.ResourcesListed, result.ResourcesProcessed, len(result.Result.MetadataMessages), runDuration) + + c.sender.OrchestratorMetadata(result.Result.MetadataMessages, runConfig.ClusterID, int(collector.Metadata().NodeType)) + } + return nil +} + +func (c *Check) shouldRun() bool { + if c.isECSCollectionEnabledFunc == nil || !c.isECSCollectionEnabledFunc() { + log.Debug("Orchestrator ECS Collection is disabled") + return false + } + + c.initConfig() + + if c.region == "" || c.awsAccountID == 0 || c.clusterName == "" || c.clusterID == "" { + log.Warnf("Orchestrator ECS check is missing required information, region: %s, awsAccountID: %d, clusterName: %s, clusterID: %s", c.region, c.awsAccountID, c.clusterName, c.clusterID) + return false + } + return true +} + +func (c *Check) initConfig() { + if c.awsAccountID != 0 && c.region != "" && c.clusterName != "" && c.clusterID != "" { + return + } + + tasks := c.workloadmetaStore.ListECSTasks() + if len(tasks) == 0 { + return + } + + c.awsAccountID = tasks[0].AWSAccountID + c.clusterName = tasks[0].ClusterName + c.region = tasks[0].Region + + if tasks[0].Region == "" || tasks[0].AWSAccountID == 0 { + c.region, c.awsAccountID = getRegionAndAWSAccountID(tasks[0].EntityID.ID) + } + + c.clusterID = initClusterID(c.awsAccountID, c.region, tasks[0].ClusterName) +} + +func (c *Check) initCollectors() { + c.collectors = []collectors.Collector{ecs.NewTaskCollector()} +} + +// initClusterID generates a cluster ID from the AWS account ID, region and cluster name. +func initClusterID(awsAccountID int, region, clusterName string) string { + cluster := fmt.Sprintf("%d/%s/%s", awsAccountID, region, clusterName) + + hash := md5.New() + hash.Write([]byte(cluster)) + hashString := hex.EncodeToString(hash.Sum(nil)) + uuid, err := uuid.FromBytes([]byte(hashString[0:16])) + if err != nil { + log.Errorc(err.Error(), orchestrator.ExtraLogContext...) + return "" + } + return uuid.String() +} + +// ParseRegionAndAWSAccountID parses the region and AWS account ID from an ARN. +// https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html#arns-syntax +func getRegionAndAWSAccountID(arn string) (string, int) { + arnParts := strings.Split(arn, ":") + if len(arnParts) < 5 { + return "", 0 + } + if arnParts[0] != "arn" || strings.Index(arnParts[1], "aws") != 0 { + return "", 0 + } + region := arnParts[3] + if strings.Count(region, "-") < 2 { + region = "" + } + + id := arnParts[4] + // aws account id is 12 digits + // https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-identifiers.html + if len(id) != 12 { + return region, 0 + } + awsAccountID, err := strconv.Atoi(id) + if err != nil { + return region, 0 + } + + return region, awsAccountID +} diff --git a/pkg/collector/corechecks/orchestrator/ecs/ecs_test.go b/pkg/collector/corechecks/orchestrator/ecs/ecs_test.go new file mode 100644 index 0000000000000..2e8ddf447d4f7 --- /dev/null +++ b/pkg/collector/corechecks/orchestrator/ecs/ecs_test.go @@ -0,0 +1,351 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build orchestrator + +package ecs + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/DataDog/agent-payload/v5/process" + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/pkg/aggregator/mocksender" + "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs" + "github.com/DataDog/datadog-agent/pkg/orchestrator" + oconfig "github.com/DataDog/datadog-agent/pkg/orchestrator/config" + "github.com/DataDog/datadog-agent/pkg/serializer/types" +) + +func TestGetRegionAndAWSAccountID(t *testing.T) { + region, id := getRegionAndAWSAccountID("arn:aws:ecs:us-east-1:123427279990:container-instance/ecs-my-cluster/123412345abcdefgh34999999") + require.Equal(t, "us-east-1", region) + require.Equal(t, 123427279990, id) +} +func TestInitClusterID(t *testing.T) { + id1 := initClusterID(123456789012, "us-east-1", "ecs-cluster-1") + require.Equal(t, "34616234-6562-3536-3733-656534636532", id1) + + // same account, same region, different cluster name + id2 := initClusterID(123456789012, "us-east-1", "ecs-cluster-2") + require.Equal(t, "31643131-3131-3263-3331-383136383336", id2) + + // same account, different region, same cluster name + id3 := initClusterID(123456789012, "us-east-2", "ecs-cluster-1") + require.Equal(t, "64663464-6662-3232-3635-646166613230", id3) + + // different account, same region, same cluster name + id4 := initClusterID(123456789013, "us-east-1", "ecs-cluster-1") + require.Equal(t, "61623431-6137-6231-3136-366464643761", id4) +} + +type fakeWorkloadmetaStore struct { + workloadmeta.Component + EnableV4 bool + notifiedEvents []*workloadmeta.ECSTask +} + +func (store *fakeWorkloadmetaStore) AddECSTasks(task ...*workloadmeta.ECSTask) { + store.notifiedEvents = append(store.notifiedEvents, task...) +} + +func (store *fakeWorkloadmetaStore) ListECSTasks() (events []*workloadmeta.ECSTask) { + return store.notifiedEvents +} + +func (store *fakeWorkloadmetaStore) GetContainer(id string) (*workloadmeta.Container, error) { + if id == "938f6d263c464aa5985dc67ab7f38a7e-1714341083" { + return container1(store.EnableV4), nil + } + if id == "938f6d263c464aa5985dc67ab7f38a7e-1714341084" { + return container2(store.EnableV4), nil + } + return nil, fmt.Errorf("container not found") +} + +type fakeSender struct { + mocksender.MockSender + messages []process.MessageBody + clusterIDs []string + nodeTypes []int +} + +func (s *fakeSender) OrchestratorMetadata(msgs []types.ProcessMessageBody, clusterID string, nodeType int) { + s.messages = append(s.messages, msgs...) + s.clusterIDs = append(s.clusterIDs, clusterID) + s.nodeTypes = append(s.nodeTypes, nodeType) +} + +func (s *fakeSender) Flush() []process.MessageBody { + messages := s.messages + s.messages = s.messages[:0] + return messages +} + +func TestNotECS(t *testing.T) { + check, _, sender := prepareTest(false, "notECS") + err := check.Run() + require.NoError(t, err) + require.Len(t, sender.messages, 0) +} + +func TestECSV4Enabled(t *testing.T) { + testECS(true, t) +} + +// TestECSV4Disabled tests the ECS collector when the feature of using v4 endpoint is disabled in Workloadmeta +func TestECSV4Disabled(t *testing.T) { + testECS(false, t) +} + +func testECS(v4 bool, t *testing.T) { + check, store, sender := prepareTest(v4, "ecs") + + // add 2 tasks to fake workloadmetaStore + task1Id := "123" + task2Id := "124" + store.AddECSTasks(task(v4, task1Id)) + store.AddECSTasks(task(v4, task2Id)) + + err := check.Run() + require.NoError(t, err) + + // should receive one message + messages := sender.Flush() + require.Len(t, messages, 1) + + groupID := int32(1) + expectedTasks := expected(v4, groupID, task1Id, task2Id) + require.Equal(t, expectedTasks, messages[0]) + require.Equal(t, expectedTasks.ClusterId, sender.clusterIDs[0]) + require.Equal(t, orchestrator.ECSTask, sender.nodeTypes[0]) + + // add another task with different id to fake workloadmetaStore + task3Id := "125" + store.AddECSTasks(task(v4, task3Id)) + + err = check.Run() + require.NoError(t, err) + + messages = sender.Flush() + require.Len(t, messages, 1) + + groupID++ + require.Equal(t, expected(v4, groupID, task3Id), messages[0]) + require.Equal(t, sender.clusterIDs[0], sender.clusterIDs[1]) + require.Equal(t, sender.nodeTypes[0], sender.nodeTypes[1]) + + // 0 message should be received as tasks are skipped by cache + err = check.Run() + require.NoError(t, err) + messages = sender.Flush() + require.Len(t, messages, 0) +} + +// prepareTest returns a check, a fake workloadmeta store and a fake sender +func prepareTest(v4 bool, env string) (*Check, *fakeWorkloadmetaStore, *fakeSender) { + orchConfig := oconfig.NewDefaultOrchestratorConfig() + orchConfig.OrchestrationCollectionEnabled = true + orchConfig.OrchestrationECSCollectionEnabled = true + + store := &fakeWorkloadmetaStore{ + EnableV4: v4, + } + sender := &fakeSender{} + + c := &Check{ + sender: sender, + workloadmetaStore: store, + config: orchConfig, + groupID: atomic.NewInt32(0), + } + + c.isECSCollectionEnabledFunc = func() bool { return false } + if env == "ecs" { + c.isECSCollectionEnabledFunc = func() bool { return true } + } + + return c, store, sender +} + +func task(v4 bool, id string) *workloadmeta.ECSTask { + ecsTask := &workloadmeta.ECSTask{ + EntityID: workloadmeta.EntityID{ + Kind: workloadmeta.KindECSTask, + ID: fmt.Sprintf("arn:aws:ecs:us-east-1:123456789012:task/%s", id), + }, + EntityMeta: workloadmeta.EntityMeta{ + Name: fmt.Sprintf("12345678-1234-1234-1234-123456789%s", id), + }, + ClusterName: "ecs-cluster", + LaunchType: workloadmeta.ECSLaunchTypeEC2, + Family: "redis", + Version: "1", + Containers: []workloadmeta.OrchestratorContainer{ + { + ID: "938f6d263c464aa5985dc67ab7f38a7e-1714341083", + }, + { + ID: "938f6d263c464aa5985dc67ab7f38a7e-1714341084", + }, + }, + } + + if v4 { + ecsTask.AWSAccountID = 123456789012 + ecsTask.Region = "us-east-1" + ecsTask.DesiredStatus = "RUNNING" + ecsTask.KnownStatus = "RUNNING" + ecsTask.VPCID = "vpc-12345678" + ecsTask.ServiceName = "redis" + ecsTask.Limits = map[string]float64{"CPU": 1, "Memory": 2048} + ecsTask.Tags = workloadmeta.MapTags{ + "ecs.cluster": "ecs-cluster", + "region": "us-east-1", + } + ecsTask.ContainerInstanceTags = workloadmeta.MapTags{ + "instance": "instance-1", + "region": "us-east-1", + } + } + return ecsTask +} + +func container1(v4 bool) *workloadmeta.Container { + container := &workloadmeta.Container{ + EntityID: workloadmeta.EntityID{ + Kind: workloadmeta.KindContainer, + ID: "938f6d263c464aa5985dc67ab7f38a7e-1714341083", + }, + EntityMeta: workloadmeta.EntityMeta{ + Name: "log_router", + Labels: map[string]string{ + "com.amazonaws.ecs.cluster": "ecs-cluster", + "com.amazonaws.ecs.container-name": "log_router", + }, + }, + Image: workloadmeta.ContainerImage{ + RawName: "amazon/aws-for-fluent-bit:latest", + Name: "amazon/aws-for-fluent-bit", + }, + Ports: []workloadmeta.ContainerPort{ + { + Port: 80, + HostPort: 80, + }, + }, + } + if v4 { + container.ECSContainer = &workloadmeta.ECSContainer{ + DisplayName: "log_router_container", + Health: &workloadmeta.ContainerHealthStatus{ + Status: "HEALTHY", + ExitCode: func(i uint32) *uint32 { + return &i + }(2), + }, + Type: "NORMAL", + } + } + return container +} + +func container2(v4 bool) *workloadmeta.Container { + container := &workloadmeta.Container{ + EntityID: workloadmeta.EntityID{ + Kind: workloadmeta.KindContainer, + ID: "938f6d263c464aa5985dc67ab7f38a7e-1714341084", + }, + EntityMeta: workloadmeta.EntityMeta{ + Name: "redis", + }, + } + + if v4 { + container.ECSContainer = &workloadmeta.ECSContainer{ + DisplayName: "redis", + Type: "NORMAL", + } + } + return container +} + +func expected(v4 bool, groupID int32, ids ...string) *process.CollectorECSTask { + tasks := make([]*process.ECSTask, 0, len(ids)) + for _, id := range ids { + container1 := &process.ECSContainer{ + DockerID: "938f6d263c464aa5985dc67ab7f38a7e-1714341083", + DockerName: "log_router", + Image: "amazon/aws-for-fluent-bit:latest", + Ports: []*process.ECSContainerPort{ + { + ContainerPort: 80, + HostPort: 80, + }, + }, + Labels: []string{ + "com.amazonaws.ecs.cluster:ecs-cluster", + "com.amazonaws.ecs.container-name:log_router", + }, + } + container2 := &process.ECSContainer{ + DockerID: "938f6d263c464aa5985dc67ab7f38a7e-1714341084", + DockerName: "redis", + } + + newTask := &process.ECSTask{ + Arn: fmt.Sprintf("arn:aws:ecs:us-east-1:123456789012:task/%s", id), + LaunchType: "ec2", + Family: "redis", + Version: "1", + Containers: []*process.ECSContainer{container1, container2}, + } + + if v4 { + newTask.DesiredStatus = "RUNNING" + newTask.KnownStatus = "RUNNING" + newTask.VpcId = "vpc-12345678" + newTask.ServiceName = "redis" + newTask.Limits = map[string]float64{"CPU": 1, "Memory": 2048} + newTask.EcsTags = []string{ + "ecs.cluster:ecs-cluster", + "region:us-east-1", + } + newTask.ContainerInstanceTags = []string{ + "instance:instance-1", + "region:us-east-1", + } + + container1.Name = "log_router_container" + container1.Type = "NORMAL" + container1.Health = &process.ECSContainerHealth{ + Status: "HEALTHY", + ExitCode: &process.ECSContainerExitCode{ + ExitCode: 2, + }, + } + + container2.Name = "redis" + container2.Type = "NORMAL" + } + + newTask.ResourceVersion = ecs.BuildTaskResourceVersion(newTask) + tasks = append(tasks, newTask) + } + + return &process.CollectorECSTask{ + AwsAccountID: 123456789012, + ClusterName: "ecs-cluster", + ClusterId: "63306530-3932-3664-3664-376566306132", + Region: "us-east-1", + GroupId: groupID, + GroupSize: 1, + Tasks: tasks, + } +} diff --git a/pkg/collector/corechecks/orchestrator/ecs/stub.go b/pkg/collector/corechecks/orchestrator/ecs/stub.go new file mode 100644 index 0000000000000..0cffb4bf01e72 --- /dev/null +++ b/pkg/collector/corechecks/orchestrator/ecs/stub.go @@ -0,0 +1,24 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build !orchestrator + +// Package ecs is used for the orchestrator ECS check +package ecs + +import ( + "github.com/DataDog/datadog-agent/pkg/collector/check" + "github.com/DataDog/datadog-agent/pkg/util/optional" +) + +const ( + // CheckName is the name of the check + CheckName = "orchestrator_ecs" +) + +// Factory creates a new check factory +func Factory() optional.Option[func() check.Check] { + return optional.NewNoneOption[func() check.Check]() +} diff --git a/pkg/commonchecks/corechecks.go b/pkg/commonchecks/corechecks.go index 1f86c586a5554..252aca8095a64 100644 --- a/pkg/commonchecks/corechecks.go +++ b/pkg/commonchecks/corechecks.go @@ -30,6 +30,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/collector/corechecks/networkpath" nvidia "github.com/DataDog/datadog-agent/pkg/collector/corechecks/nvidia/jetson" oracle "github.com/DataDog/datadog-agent/pkg/collector/corechecks/oracle-dbm" + "github.com/DataDog/datadog-agent/pkg/collector/corechecks/orchestrator/ecs" "github.com/DataDog/datadog-agent/pkg/collector/corechecks/orchestrator/pod" "github.com/DataDog/datadog-agent/pkg/collector/corechecks/sbom" "github.com/DataDog/datadog-agent/pkg/collector/corechecks/snmp" @@ -71,6 +72,7 @@ func RegisterChecks(store workloadmeta.Component) { corecheckLoader.RegisterCheck(helm.CheckName, helm.Factory()) corecheckLoader.RegisterCheck(pod.CheckName, pod.Factory()) corecheckLoader.RegisterCheck(ebpf.CheckName, ebpf.Factory()) + corecheckLoader.RegisterCheck(ecs.CheckName, ecs.Factory()) corecheckLoader.RegisterCheck(oomkill.CheckName, oomkill.Factory()) corecheckLoader.RegisterCheck(tcpqueuelength.CheckName, tcpqueuelength.Factory()) corecheckLoader.RegisterCheck(apm.CheckName, apm.Factory()) diff --git a/pkg/config/aliases.go b/pkg/config/aliases.go index 91c4e4d69bc56..6b0bb1950cb5a 100644 --- a/pkg/config/aliases.go +++ b/pkg/config/aliases.go @@ -76,6 +76,7 @@ const ( Cri = env.Cri Containerd = env.Containerd KubeOrchestratorExplorer = env.KubeOrchestratorExplorer + ECSOrchestratorExplorer = env.ECSOrchestratorExplorer ) var ( diff --git a/pkg/config/env/environment_container_features.go b/pkg/config/env/environment_container_features.go index 929e78b9864f3..cec6b8dd33062 100644 --- a/pkg/config/env/environment_container_features.go +++ b/pkg/config/env/environment_container_features.go @@ -22,7 +22,9 @@ const ( // EKSFargate environment EKSFargate Feature = "eksfargate" // KubeOrchestratorExplorer can be enabled - KubeOrchestratorExplorer Feature = "orchestratorexplorer" + KubeOrchestratorExplorer Feature = "kube_orchestratorexplorer" + // ECSOrchestratorExplorer can be enabled + ECSOrchestratorExplorer Feature = "ecs_orchestratorexplorer" // CloudFoundry socket present CloudFoundry Feature = "cloudfoundry" // Podman containers storage path accessible diff --git a/pkg/config/env/environment_containers.go b/pkg/config/env/environment_containers.go index 2886af2295b2f..f5f186ff152dd 100644 --- a/pkg/config/env/environment_containers.go +++ b/pkg/config/env/environment_containers.go @@ -42,6 +42,7 @@ func init() { registerFeature(ECSFargate) registerFeature(EKSFargate) registerFeature(KubeOrchestratorExplorer) + registerFeature(ECSOrchestratorExplorer) registerFeature(CloudFoundry) registerFeature(Podman) } @@ -166,6 +167,10 @@ func isCriSupported() bool { func detectAWSEnvironments(features FeatureMap, cfg model.Reader) { if IsECSFargate() { features[ECSFargate] = struct{}{} + if cfg.GetBool("orchestrator_explorer.enabled") && + cfg.GetBool("orchestrator_explorer.ecs_collection.enabled") { + features[ECSOrchestratorExplorer] = struct{}{} + } return } @@ -177,6 +182,10 @@ func detectAWSEnvironments(features FeatureMap, cfg model.Reader) { if IsECS() { features[ECSEC2] = struct{}{} + if cfg.GetBool("orchestrator_explorer.enabled") && + cfg.GetBool("orchestrator_explorer.ecs_collection.enabled") { + features[ECSOrchestratorExplorer] = struct{}{} + } } } diff --git a/pkg/config/env/environment_detection_test.go b/pkg/config/env/environment_detection_test.go index b208b857bfa1b..22f074b6576f3 100644 --- a/pkg/config/env/environment_detection_test.go +++ b/pkg/config/env/environment_detection_test.go @@ -20,6 +20,7 @@ func TestExcludeFeatures(t *testing.T) { Cri: struct{}{}, Docker: struct{}{}, KubeOrchestratorExplorer: struct{}{}, + ECSOrchestratorExplorer: struct{}{}, Kubernetes: struct{}{}, ECSFargate: struct{}{}, EKSFargate: struct{}{}, @@ -37,6 +38,7 @@ func TestExcludeFeatures(t *testing.T) { CloudFoundry: struct{}{}, Containerd: struct{}{}, KubeOrchestratorExplorer: struct{}{}, + ECSOrchestratorExplorer: struct{}{}, Kubernetes: struct{}{}, }) } diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index 1b4596605a4dc..08b178b877b50 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -1150,6 +1150,7 @@ func InitConfig(config pkgconfigmodel.Config) { config.BindEnvAndSetDefault("orchestrator_explorer.manifest_collection.enabled", true) config.BindEnvAndSetDefault("orchestrator_explorer.manifest_collection.buffer_manifest", true) config.BindEnvAndSetDefault("orchestrator_explorer.manifest_collection.buffer_flush_interval", 20*time.Second) + config.BindEnvAndSetDefault("orchestrator_explorer.ecs_collection.enabled", false) // Container lifecycle configuration config.BindEnvAndSetDefault("container_lifecycle.enabled", true) diff --git a/pkg/orchestrator/aliases.go b/pkg/orchestrator/aliases.go index 8d608bc05ab8d..2b0f7115df7ff 100644 --- a/pkg/orchestrator/aliases.go +++ b/pkg/orchestrator/aliases.go @@ -79,6 +79,8 @@ const ( K8sHorizontalPodAutoscaler = pkgorchestratormodel.K8sHorizontalPodAutoscaler // K8sNetworkPolicy alias for pkgorchestratormodel.K8sNetworkPolicy K8sNetworkPolicy = pkgorchestratormodel.K8sNetworkPolicy + // ECSTask alias for pkgorchestratormodel.ECSTask + ECSTask = pkgorchestratormodel.ECSTask ) // SetCacheStats alias for pkgorchestratormodel.SetCacheStats diff --git a/pkg/orchestrator/config/config.go b/pkg/orchestrator/config/config.go index 89bf03c868466..a42193cd17a93 100644 --- a/pkg/orchestrator/config/config.go +++ b/pkg/orchestrator/config/config.go @@ -33,19 +33,20 @@ const ( // OrchestratorConfig is the global config for the Orchestrator related packages. This information // is sourced from config files and the environment variables. type OrchestratorConfig struct { - CollectorDiscoveryEnabled bool - OrchestrationCollectionEnabled bool - KubeClusterName string - IsScrubbingEnabled bool - Scrubber *redact.DataScrubber - OrchestratorEndpoints []apicfg.Endpoint - MaxPerMessage int - MaxWeightPerMessageBytes int - PodQueueBytes int // The total number of bytes that can be enqueued for delivery to the orchestrator endpoint - ExtraTags []string - IsManifestCollectionEnabled bool - BufferedManifestEnabled bool - ManifestBufferFlushInterval time.Duration + CollectorDiscoveryEnabled bool + OrchestrationCollectionEnabled bool + KubeClusterName string + IsScrubbingEnabled bool + Scrubber *redact.DataScrubber + OrchestratorEndpoints []apicfg.Endpoint + MaxPerMessage int + MaxWeightPerMessageBytes int + PodQueueBytes int // The total number of bytes that can be enqueued for delivery to the orchestrator endpoint + ExtraTags []string + IsManifestCollectionEnabled bool + BufferedManifestEnabled bool + ManifestBufferFlushInterval time.Duration + OrchestrationECSCollectionEnabled bool } // NewDefaultOrchestratorConfig returns an NewDefaultOrchestratorConfig using a configuration file. It can be nil @@ -118,6 +119,7 @@ func (oc *OrchestratorConfig) Load() error { oc.IsManifestCollectionEnabled = config.Datadog.GetBool(OrchestratorNSKey("manifest_collection.enabled")) oc.BufferedManifestEnabled = config.Datadog.GetBool(OrchestratorNSKey("manifest_collection.buffer_manifest")) oc.ManifestBufferFlushInterval = config.Datadog.GetDuration(OrchestratorNSKey("manifest_collection.buffer_flush_interval")) + oc.OrchestrationECSCollectionEnabled = config.Datadog.GetBool(OrchestratorNSKey("ecs_collection.enabled")) return nil } @@ -192,3 +194,20 @@ func IsOrchestratorEnabled() (bool, string) { } return enabled, clusterName } + +// IsOrchestratorECSExplorerEnabled checks if orchestrator ecs explorer features are enabled +func IsOrchestratorECSExplorerEnabled() bool { + if !config.Datadog.GetBool(OrchestratorNSKey("enabled")) { + return false + } + + if !config.Datadog.GetBool(OrchestratorNSKey("ecs_collection.enabled")) { + return false + } + + if config.IsECS() || config.IsECSFargate() { + return true + } + + return false +} diff --git a/pkg/orchestrator/model/types.go b/pkg/orchestrator/model/types.go index 997e01b4e82ac..97968166df9ec 100644 --- a/pkg/orchestrator/model/types.go +++ b/pkg/orchestrator/model/types.go @@ -79,6 +79,8 @@ const ( K8sHorizontalPodAutoscaler = 23 // K8sNetworkPolicy represents a Kubernetes NetworkPolicy K8sNetworkPolicy = 24 + // ECSTask represents an ECS Task + ECSTask = 150 ) // NodeTypes returns the current existing NodesTypes as a slice to iterate over. @@ -108,6 +110,7 @@ func NodeTypes() []NodeType { K8sVerticalPodAutoscaler, K8sHorizontalPodAutoscaler, K8sNetworkPolicy, + ECSTask, } } @@ -163,6 +166,8 @@ func (n NodeType) String() string { return "NetworkPolicy" case K8sUnsetType: return "UnsetType" + case ECSTask: + return "ECSTask" default: _ = log.Errorf("Trying to convert unknown NodeType iota: %d", n) return "Unknown" @@ -198,6 +203,8 @@ func (n NodeType) Orchestrator() string { K8sNetworkPolicy, K8sUnsetType: return "k8s" + case ECSTask: + return "ecs" default: log.Errorf("Unknown NodeType %v", n) return "" diff --git a/releasenotes/notes/add-new-check-orchestrator-ecs-fa5d1511b1a550c3.yaml b/releasenotes/notes/add-new-check-orchestrator-ecs-fa5d1511b1a550c3.yaml new file mode 100644 index 0000000000000..f1eef3ef048d6 --- /dev/null +++ b/releasenotes/notes/add-new-check-orchestrator-ecs-fa5d1511b1a550c3.yaml @@ -0,0 +1,12 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- + +features: + - | + Add new core check orchestrator_ecs to collect running ECS tasks diff --git a/tasks/agent.py b/tasks/agent.py index 93a35e192c4a9..ce897bc3ff3d4 100644 --- a/tasks/agent.py +++ b/tasks/agent.py @@ -76,6 +76,7 @@ "jetson", "telemetry", "orchestrator_pod", + "orchestrator_ecs", ] WINDOWS_CORECHECKS = [