Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Orchestrator] add new check to collect ecs tasks #22060

Merged
merged 5 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/agent/dist/conf.d/orchestrator_ecs.d/conf.yaml.default
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ad_identifiers:
- _ecs_orchestrator
instances:
- {}
1 change: 1 addition & 0 deletions comp/core/autodiscovery/listeners/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (l *EnvironmentListener) createServices() {
"cri": config.Cri,
"containerd": config.Containerd,
"kube_orchestrator": config.KubeOrchestratorExplorer,
"ecs_orchestrator": config.ECSOrchestratorExplorer,
}

for name, feature := range features {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ require github.com/lorenzosaino/go-sysctl v0.3.1

require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/DataDog/agent-payload/v5 v5.0.106
github.com/DataDog/agent-payload/v5 v5.0.108
github.com/DataDog/datadog-agent/cmd/agent/common/path v0.52.0-rc.3
github.com/DataDog/datadog-agent/comp/core/config v0.52.0-rc.3
github.com/DataDog/datadog-agent/comp/core/flare/types v0.52.0-rc.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

165 changes: 165 additions & 0 deletions pkg/collector/corechecks/cluster/orchestrator/collectors/ecs/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build orchestrator

// Package ecs defines a collector that collects ECS tasks.
package ecs

import (
"crypto/md5"
"encoding/hex"
"fmt"
"strconv"
"strings"

"github.com/google/uuid"

"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/collectors"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/ecs"
transformers "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs"
"github.com/DataDog/datadog-agent/pkg/orchestrator"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// TaskCollector is a collector for ECS tasks.
type TaskCollector struct {
	// metadata describes the collector; it is what Metadata returns.
	metadata *collectors.CollectorMetadata
	// processor turns the listed tasks into the run result during Run.
	processor *processors.Processor
}

// NewTaskCollector builds the collector for the ECS Task resource. The
// collector is registered under the name "ecstasks", is flagged as not stable,
// produces metadata but no manifests, and processes tasks through
// ecs.TaskHandlers.
func NewTaskCollector() *TaskCollector {
	meta := &collectors.CollectorMetadata{
		IsStable:           false,
		IsMetadataProducer: true,
		IsManifestProducer: false,
		Name:               "ecstasks",
		NodeType:           orchestrator.ECSTask,
	}

	collector := &TaskCollector{
		metadata:  meta,
		processor: processors.NewProcessor(new(ecs.TaskHandlers)),
	}
	return collector
}

// Metadata is used to access information about the collector. It returns the
// metadata pointer stored on the collector, not a copy.
func (t *TaskCollector) Metadata() *collectors.CollectorMetadata {
	return t.metadata
}

// Init is used to initialize the collector. It is intentionally a no-op: the
// run configuration is ignored here and only consumed by Run.
func (t *TaskCollector) Init(_ *collectors.CollectorRunConfig) {}
kangyili marked this conversation as resolved.
Show resolved Hide resolved

// Run triggers the collection process.
func (t *TaskCollector) Run(rcfg *collectors.CollectorRunConfig) (*collectors.CollectorRunResult, error) {
list := rcfg.WorkloadmetaStore.ListECSTasks()
tasks := make([]transformers.TaskWithContainers, 0, len(list))
for _, task := range list {
newTask := task
tasks = append(tasks, t.fetchContainers(rcfg, newTask))
}

ctx := &processors.ECSProcessorContext{
BaseProcessorContext: processors.BaseProcessorContext{
Cfg: rcfg.Config,
MsgGroupID: rcfg.MsgGroupRef.Inc(),
NodeType: t.metadata.NodeType,
ManifestProducer: t.metadata.IsManifestProducer,
},
}
if len(list) > 0 {
kangyili marked this conversation as resolved.
Show resolved Hide resolved
ctx.AWSAccountID = list[0].AWSAccountID
ctx.ClusterName = list[0].ClusterName
ctx.Region = list[0].Region

if list[0].Region == "" || list[0].AWSAccountID == 0 {
ctx.Region, ctx.AWSAccountID = getRegionAndAWSAccountID(list[0].EntityID.ID)
}

// If the cluster ID is not set, we generate it from the first task
if rcfg.ClusterID == "" {
rcfg.ClusterID = initClusterID(ctx.AWSAccountID, ctx.Region, ctx.ClusterName)
}
ctx.ClusterID = rcfg.ClusterID
}

processResult, processed := t.processor.Process(ctx, tasks)

if processed == -1 {
return nil, fmt.Errorf("unable to process resources: a panic occurred")
}

result := &collectors.CollectorRunResult{
Result: processResult,
ResourcesListed: len(list),
ResourcesProcessed: processed,
}

return result, nil
}

// fetchContainers fetches the containers from workloadmeta store for a given task.
func (t *TaskCollector) fetchContainers(rcfg *collectors.CollectorRunConfig, task *workloadmeta.ECSTask) transformers.TaskWithContainers {
ecsTask := transformers.TaskWithContainers{
Task: task,
kangyili marked this conversation as resolved.
Show resolved Hide resolved
Containers: make([]*workloadmeta.Container, 0, len(task.Containers)),
}

for _, container := range task.Containers {
c, err := rcfg.WorkloadmetaStore.GetContainer(container.ID)
if err != nil {
log.Errorc(err.Error(), orchestrator.ExtraLogContext...)
continue
}
ecsTask.Containers = append(ecsTask.Containers, c)
}

return ecsTask
}

// initClusterID generates a cluster ID from the AWS account ID, region and cluster name.
func initClusterID(awsAccountID int, region, clusterName string) string {
cluster := fmt.Sprintf("%d/%s/%s", awsAccountID, region, clusterName)

hash := md5.New()
hash.Write([]byte(cluster))
hashString := hex.EncodeToString(hash.Sum(nil))
uuid, err := uuid.FromBytes([]byte(hashString[0:16]))
if err != nil {
log.Errorc(err.Error(), orchestrator.ExtraLogContext...)
return ""
kangyili marked this conversation as resolved.
Show resolved Hide resolved
}
return uuid.String()
}

// getRegionAndAWSAccountID parses the region and AWS account ID from a task
// ARN of the form "arn:<partition>:<service>:<region>:<account-id>:...".
//
// It returns ("", 0) when the string has fewer than five colon-separated
// parts, does not start with "arn", or its partition is neither "aws" nor an
// "aws-" prefixed partition (for example "aws-cn" or "aws-us-gov").
// The region is returned as "" when it contains fewer than two dashes. The
// account ID is returned as 0 unless it is exactly 12 decimal digits.
func getRegionAndAWSAccountID(taskARN string) (string, int) {
	arnParts := strings.Split(taskARN, ":")
	if len(arnParts) < 5 {
		return "", 0
	}

	partition := arnParts[1]
	if arnParts[0] != "arn" || (partition != "aws" && !strings.HasPrefix(partition, "aws-")) {
		return "", 0
	}

	region := arnParts[3]
	if strings.Count(region, "-") < 2 {
		region = ""
	}

	id := arnParts[4]
	// aws account id is 12 digits
	// https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-identifiers.html
	if len(id) != 12 {
		return region, 0
	}
	// strconv.Atoi accepts a leading '+' or '-', so a signed 11-digit value
	// would otherwise pass the length check; require digits only.
	for i := 0; i < len(id); i++ {
		if id[i] < '0' || id[i] > '9' {
			return region, 0
		}
	}
	awsAccountID, err := strconv.Atoi(id)
	if err != nil {
		return region, 0
	}

	return region, awsAccountID
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build orchestrator

package ecs

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestInitClusterID pins the cluster IDs produced for a reference input and
// checks that changing the cluster name, the region or the account ID each
// yields a different, stable ID.
func TestInitClusterID(t *testing.T) {
	cases := []struct {
		accountID   int
		region      string
		clusterName string
		want        string
	}{
		// reference input
		{123456789012, "us-east-1", "ecs-cluster-1", "34616234-6562-3536-3733-656534636532"},
		// same account, same region, different cluster name
		{123456789012, "us-east-1", "ecs-cluster-2", "31643131-3131-3263-3331-383136383336"},
		// same account, different region, same cluster name
		{123456789012, "us-east-2", "ecs-cluster-1", "64663464-6662-3232-3635-646166613230"},
		// different account, same region, same cluster name
		{123456789013, "us-east-1", "ecs-cluster-1", "61623431-6137-6231-3136-366464643761"},
	}

	for _, tc := range cases {
		got := initClusterID(tc.accountID, tc.region, tc.clusterName)
		require.Equal(t, tc.want, got)
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build orchestrator

// Package ecs defines handlers for processing ECS tasks
package ecs

import (
"k8s.io/apimachinery/pkg/types"

model "github.com/DataDog/agent-payload/v5/process"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/common"
transformers "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/transformers/ecs"
)

// TaskHandlers implements the Handlers interface for ECS Tasks.
type TaskHandlers struct {
	// BaseHandlers is embedded so that its methods are promoted onto
	// TaskHandlers alongside the ones defined in this file.
	common.BaseHandlers
}

// BuildMessageBody is a handler called to build a message body out of a list
// of extracted resources. The cluster-level fields (account ID, cluster name
// and ID, region, group ID) come from the ECS processor context; every entry
// of resourceModels is expected to be a *model.ECSTask.
func (t *TaskHandlers) BuildMessageBody(ctx processors.ProcessorContext, resourceModels []interface{}, groupSize int) model.MessageBody {
	ecsCtx := ctx.(*processors.ECSProcessorContext)

	tasks := make([]*model.ECSTask, len(resourceModels))
	for i, rm := range resourceModels {
		tasks[i] = rm.(*model.ECSTask)
	}

	body := &model.CollectorECSTask{
		AwsAccountID: int64(ecsCtx.AWSAccountID),
		ClusterName:  ecsCtx.ClusterName,
		ClusterId:    ecsCtx.ClusterID,
		Region:       ecsCtx.Region,
		GroupId:      ecsCtx.MsgGroupID,
		GroupSize:    int32(groupSize),
		Tasks:        tasks,
	}
	return body
}

// ExtractResource is a handler called to extract the resource model out of a
// raw resource. The raw resource must be a transformers.TaskWithContainers; it
// is passed to transformers.ExtractECSTask and the result is returned as is.
//
//nolint:revive // TODO(CAPP) Fix revive linter
func (t *TaskHandlers) ExtractResource(ctx processors.ProcessorContext, resource interface{}) (resourceModel interface{}) {
	return transformers.ExtractECSTask(resource.(transformers.TaskWithContainers))
}

// ResourceList is a handler called to convert a list passed as a generic
// interface to a list of generic interfaces. The list must be a
// []transformers.TaskWithContainers; element order is preserved.
//
//nolint:revive // TODO(CAPP) Fix revive linter
func (t *TaskHandlers) ResourceList(ctx processors.ProcessorContext, list interface{}) (resources []interface{}) {
	tasks := list.([]transformers.TaskWithContainers)

	resources = make([]interface{}, len(tasks))
	for i := range tasks {
		resources[i] = tasks[i]
	}

	return resources
}

// ResourceUID is a handler called to retrieve the resource UID. For an ECS
// task the UID is the ID of the task's EntityID.
//
//nolint:revive // TODO(CAPP) Fix revive linter
func (t *TaskHandlers) ResourceUID(ctx processors.ProcessorContext, resource interface{}) types.UID {
	taskWithContainers := resource.(transformers.TaskWithContainers)
	entityID := taskWithContainers.Task.EntityID.ID
	return types.UID(entityID)
}

// ResourceVersion returns the custom resource version of an ECS task, read
// from the ResourceVersion field of the already extracted *model.ECSTask.
//
//nolint:revive // TODO(CAPP) Fix revive linter
func (t *TaskHandlers) ResourceVersion(ctx processors.ProcessorContext, resource, resourceModel interface{}) string {
	task := resourceModel.(*model.ECSTask)
	return task.ResourceVersion
}

// AfterMarshalling is a handler called after resource marshalling. It does
// nothing for ECS tasks and always returns false, i.e. it never asks for the
// resource to be skipped.
//
//nolint:revive // TODO(CAPP) Fix revive linter
func (t *TaskHandlers) AfterMarshalling(ctx processors.ProcessorContext, resource, resourceModel interface{}, yaml []byte) (skip bool) {
	return false
}

// ScrubBeforeExtraction is a handler called to redact the raw resource before
// it is extracted as an internal resource model. It is a no-op for ECS tasks:
// the resource is left untouched.
//
//nolint:revive // TODO(CAPP) Fix revive linter
func (t *TaskHandlers) ScrubBeforeExtraction(ctx processors.ProcessorContext, resource interface{}) {
}
Loading
Loading