diff --git a/comp/core/workloadmeta/collectors/internal/ecs/ecs.go b/comp/core/workloadmeta/collectors/internal/ecs/ecs.go index b67f22ecb938d0..7e704ea336a812 100644 --- a/comp/core/workloadmeta/collectors/internal/ecs/ecs.go +++ b/comp/core/workloadmeta/collectors/internal/ecs/ecs.go @@ -10,9 +10,17 @@ package ecs import ( "context" + "fmt" + "hash/fnv" + "reflect" "strings" + "time" + + "github.com/patrickmn/go-cache" + "golang.org/x/time/rate" "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/util" "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/errors" ecsutil "github.com/DataDog/datadog-agent/pkg/util/ecs" @@ -20,6 +28,7 @@ import ( v1 "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v1" v3or4 "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v3or4" "github.com/DataDog/datadog-agent/pkg/util/log" + "github.com/DataDog/datadog-agent/pkg/util/retry" "go.uber.org/fx" ) @@ -30,16 +39,22 @@ const ( ) type collector struct { - id string - store workloadmeta.Component - catalog workloadmeta.AgentType - metaV1 v1.Client - metaV3or4 func(metaURI, metaVersion string) v3or4.Client - clusterName string - hasResourceTags bool - collectResourceTags bool - resourceTags map[string]resourceTags - seen map[workloadmeta.EntityID]struct{} + id string + store workloadmeta.Component + catalog workloadmeta.AgentType + metaV1 v1.Client + metaV3or4 func(metaURI, metaVersion string) v3or4.Client + clusterName string + hasResourceTags bool + collectResourceTags bool + resourceTags map[string]resourceTags + seen map[workloadmeta.EntityID]struct{} + v4TaskEnabled bool + v4TaskCache *cache.Cache + v4TaskRefreshInterval time.Duration + v4TaskQueue []string + v4TaskNumberLimitPerRun int + v4TaskRateLimiter *rate.Limiter } type resourceTags struct { @@ -49,12 +64,24 @@ type resourceTags struct { // NewCollector returns a new ecs collector provider and an error func 
NewCollector() (workloadmeta.CollectorProvider, error) { + v4TaskEnabled := util.Isv4TaskEnabled() + v4TaskRefreshInterval := config.Datadog.GetDuration("ecs_ec2_task_cache_ttl") + v4TaskNumberLimitPerRun := config.Datadog.GetInt("ecs_ec2_task_limit_per_run") + v4TaskRateRPS := config.Datadog.GetInt("ecs_ec2_task_rate") + v4TaskRateBurst := config.Datadog.GetInt("ecs_ec2_task_burst") + return workloadmeta.CollectorProvider{ Collector: &collector{ - id: collectorID, - resourceTags: make(map[string]resourceTags), - seen: make(map[workloadmeta.EntityID]struct{}), - catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent, + id: collectorID, + resourceTags: make(map[string]resourceTags), + seen: make(map[workloadmeta.EntityID]struct{}), + catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent, + v4TaskEnabled: v4TaskEnabled, + v4TaskCache: cache.New(v4TaskRefreshInterval, 30*time.Second), + v4TaskQueue: make([]string, 0, 2*v4TaskNumberLimitPerRun), + v4TaskRefreshInterval: v4TaskRefreshInterval, + v4TaskNumberLimitPerRun: v4TaskNumberLimitPerRun, + v4TaskRateLimiter: rate.NewLimiter(rate.Every(time.Second/time.Duration(v4TaskRateRPS)), v4TaskRateBurst), }, }, nil } @@ -105,8 +132,11 @@ func (c *collector) Pull(ctx context.Context) error { // immutable: the list of containers in the task changes as containers // don't get added until they actually start running, and killed // containers will get re-created. - c.store.Notify(c.parseTasks(ctx, tasks)) - + if c.v4TaskEnabled { + c.store.Notify(c.parseV4Tasks(ctx, tasks)) + } else { + c.store.Notify(c.parseTasks(ctx, tasks)) + } return nil } @@ -124,7 +154,7 @@ func (c *collector) parseTasks(ctx context.Context, tasks []v1.Task) []workloadm for _, task := range tasks { // We only want to collect tasks without a STOPPED status.
- if task.KnownStatus == "STOPPED" { + if task.KnownStatus == workloadmeta.ECSTaskKnownStatusStopped { continue } @@ -166,6 +196,43 @@ func (c *collector) parseTasks(ctx context.Context, tasks []v1.Task) []workloadm }) } + return c.setLastSeenEntitiesAndUnsetEvents(events, seen) +} + +// parseV4Tasks queries the v4 task endpoint for each task, parses them and stores them in the store. +func (c *collector) parseV4Tasks(ctx context.Context, tasks []v1.Task) []workloadmeta.CollectorEvent { + events := []workloadmeta.CollectorEvent{} + seen := make(map[workloadmeta.EntityID]struct{}) + // get task ARNs to fetch from the metadata v4 API + taskArns := c.getTaskArnsToFetch(tasks) + + for _, task := range tasks { + // We only want to collect tasks without a STOPPED status. + if task.KnownStatus == workloadmeta.ECSTaskKnownStatusStopped { + continue + } + + var v4Task v3or4.Task + if _, ok := taskArns[task.Arn]; ok { + v4Task = c.getV4TaskWithTags(ctx, task) + } else { + // if the task is not returned by getTaskArnsToFetch, it means it has been fetched during previous runs + // retrieve it from the cache + taskCached, found := c.v4TaskCache.Get(task.Arn) + if found { + v4Task = *taskCached.(*v3or4.Task) + } else { + v4Task = v1TaskToV4Task(task) + } + } + + events = append(events, util.ParseV4Task(v4Task, seen)...) 
+ } + + return c.setLastSeenEntitiesAndUnsetEvents(events, seen) +} + +func (c *collector) setLastSeenEntitiesAndUnsetEvents(events []workloadmeta.CollectorEvent, seen map[workloadmeta.EntityID]struct{}) []workloadmeta.CollectorEvent { for seenID := range c.seen { if _, ok := seen[seenID]; ok { continue @@ -194,7 +261,6 @@ func (c *collector) parseTasks(ctx context.Context, tasks []v1.Task) []workloadm } c.seen = seen - return events } @@ -233,6 +299,121 @@ func (c *collector) parseTaskContainers( return taskContainers, events } +// getV4TaskWithTags fetches task and tasks from the metadata v4 API +func (c *collector) getV4TaskWithTags(ctx context.Context, task v1.Task) v3or4.Task { + var metaURI string + for _, taskContainer := range task.Containers { + containerID := taskContainer.DockerID + container, err := c.store.GetContainer(containerID) + if err != nil { + log.Tracef("cannot find container %q found in task %s: %s", taskContainer, task.Arn, err) + continue + } + + uri, ok := container.EnvVars[v3or4.DefaultMetadataURIv4EnvVariable] + if ok && uri != "" { + metaURI = uri + break + } + } + + if metaURI == "" { + log.Errorf("failed to get client for metadata v4 API from task %s and the following containers: %v", task.Arn, task.Containers) + return v1TaskToV4Task(task) + } + + err := c.v4TaskRateLimiter.Wait(ctx) + if err != nil { + log.Warnf("failed to get task with tags from metadata v4 API: %s", err) + return v1TaskToV4Task(task) + } + + taskWithTags, err := getV4TaskWithRetry(ctx, c.metaV3or4(metaURI, "v4")) + if err != nil { + log.Warnf("failed to get task with tags from metadata v4 API: %s", err) + return v1TaskToV4Task(task) + } + + c.v4TaskCache.Set(task.Arn, taskWithTags, c.v4TaskRefreshInterval+jitter(task.Arn)) + + return *taskWithTags +} + +func getV4TaskWithRetry(ctx context.Context, metaV3orV4 v3or4.Client) (*v3or4.Task, error) { + var taskWithTagsRetry retry.Retrier + var taskWithTags *v3or4.Task + var err error + maxRetryCount := 3 + retryCount := 
0 + + _ = taskWithTagsRetry.SetupRetrier(&retry.Config{ + Name: "get-v4-task-with-tags", + AttemptMethod: func() error { + retryCount++ + taskWithTags, err = metaV3orV4.GetTaskWithTags(ctx) + return err + }, + Strategy: retry.Backoff, + InitialRetryDelay: 250 * time.Millisecond, + MaxRetryDelay: 1 * time.Second, + }) + + // retry 3 times with exponential backoff strategy: 250ms, 500ms, 1s + for { + err = taskWithTagsRetry.TriggerRetry() + if err == nil || (reflect.ValueOf(err).Kind() == reflect.Ptr && reflect.ValueOf(err).IsNil()) { + break + } + + if retry.IsErrPermaFail(err) { + return nil, err + } + + if retryCount >= maxRetryCount { + return nil, fmt.Errorf("failed to get task with tags from metadata v4 API after %d retries", maxRetryCount) + } + } + return taskWithTags, nil +} + +// getTaskArnsToFetch returns a list of task ARNs to fetch from the metadata v4 API +// It uses v4TaskCache to know whether a task has been fetched during previous runs +// The length of taskArns is limited by v4TaskNumberLimitPerRun to avoid long time running for a single pull +func (c *collector) getTaskArnsToFetch(tasks []v1.Task) map[string]struct{} { + taskArns := make(map[string]struct{}, c.v4TaskNumberLimitPerRun) + + for _, task := range tasks { + if task.KnownStatus == workloadmeta.ECSTaskKnownStatusStopped { + continue + } + c.v4TaskQueue = append(c.v4TaskQueue, task.Arn) + } + + index := 0 + for _, taskArn := range c.v4TaskQueue { + if len(taskArns) >= c.v4TaskNumberLimitPerRun { + break + } + + // Task is in the queue but not in current running task list + // It means the task has been stopped, skip it + if !hasTask(taskArn, tasks) { + index++ + continue + } + + // if task is not in the cache or expired, add it + if _, ok := c.v4TaskCache.Get(taskArn); !ok { + taskArns[taskArn] = struct{}{} + } + index++ + } + + c.v4TaskQueue = c.v4TaskQueue[index:] + + return taskArns +} + // getResourceTags fetches task and container instance tags from the ECS API, // and caches them for 
the lifetime of the task, to avoid hitting throttling // limits from tasks being updated on every pull. Tags won't change in the @@ -294,3 +475,44 @@ func (c *collector) getResourceTags(ctx context.Context, entity *workloadmeta.EC return rt } + +func v1TaskToV4Task(task v1.Task) v3or4.Task { + result := v3or4.Task{ + TaskARN: task.Arn, + DesiredStatus: task.DesiredStatus, + KnownStatus: task.KnownStatus, + Family: task.Family, + Version: task.Version, + Containers: make([]v3or4.Container, 0, len(task.Containers)), + } + + for _, container := range task.Containers { + result.Containers = append(result.Containers, v3or4.Container{ + Name: container.Name, + DockerName: container.DockerName, + DockerID: container.DockerID, + }) + } + return result +} + +var hash32 = fnv.New32a() + +func jitter(s string) time.Duration { + defer hash32.Reset() + _, err := hash32.Write([]byte(s)) + if err != nil { + return 0 + } + second := time.Duration(hash32.Sum32()%61) * time.Second + return second +} + +func hasTask(taskARN string, tasks []v1.Task) bool { + for _, task := range tasks { + if task.Arn == taskARN { + return true + } + } + return false +} diff --git a/comp/core/workloadmeta/collectors/internal/ecs/ecs_test.go b/comp/core/workloadmeta/collectors/internal/ecs/ecs_test.go index f3e85ba6a83baf..687b37541b19c4 100644 --- a/comp/core/workloadmeta/collectors/internal/ecs/ecs_test.go +++ b/comp/core/workloadmeta/collectors/internal/ecs/ecs_test.go @@ -12,25 +12,36 @@ package ecs import ( "context" "errors" + "fmt" "testing" + "time" + "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/testutil" v1 "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v1" "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v3or4" ) type fakeWorkloadmetaStore struct { workloadmeta.Component - 
notifiedEvents []workloadmeta.CollectorEvent + notifiedEvents []workloadmeta.CollectorEvent + getGetContainerHandler func(id string) (*workloadmeta.Container, error) } func (store *fakeWorkloadmetaStore) Notify(events []workloadmeta.CollectorEvent) { store.notifiedEvents = append(store.notifiedEvents, events...) } -func (store *fakeWorkloadmetaStore) GetContainer(_ string) (*workloadmeta.Container, error) { +func (store *fakeWorkloadmetaStore) GetContainer(id string) (*workloadmeta.Container, error) { + if store.getGetContainerHandler != nil { + return store.getGetContainerHandler(id) + } + return &workloadmeta.Container{ EnvVars: map[string]string{ v3or4.DefaultMetadataURIv4EnvVariable: "fake_uri", @@ -130,3 +141,168 @@ func TestPull(t *testing.T) { } } + +func TestPullWithV4TaskEnabled(t *testing.T) { + // Start a dummy Http server to simulate ECS metadata endpoints + // /v1/tasks: return the list of tasks containing datadog-agent task and nginx task + dummyECS, err := testutil.NewDummyECS( + testutil.FileHandlerOption("/v4/1234-1/taskWithTags", "./testdata/datadog-agent.json"), + testutil.FileHandlerOption("/v4/1234-2/taskWithTags", "./testdata/nginx.json"), + testutil.FileHandlerOption("/v1/tasks", "./testdata/tasks.json"), + ) + require.Nil(t, err) + ts := dummyECS.Start() + defer ts.Close() + + // Add container handler to return the v4 endpoints for different containers + store := &fakeWorkloadmetaStore{ + getGetContainerHandler: func(id string) (*workloadmeta.Container, error) { + // datadog-agent container ID, see ./testdata/datadog-agent.json + if id == "749d28eb7145ff3b6c52b71c59b381c70a884c1615e9f99516f027492679496e" { + return &workloadmeta.Container{ + EnvVars: map[string]string{ + v3or4.DefaultMetadataURIv4EnvVariable: fmt.Sprintf("%s/v4/1234-1", ts.URL), + }, + }, nil + } + // nginx container ID, see ./testdata/nginx.json + if id == "2ad9e753a0dfbba1c91e0e7bebaaf3a0918d3ef304b7549b1ced5f573bc05645" { + return &workloadmeta.Container{ + EnvVars: 
map[string]string{ + v3or4.DefaultMetadataURIv4EnvVariable: fmt.Sprintf("%s/v4/1234-2", ts.URL), + }, + }, nil + } + return &workloadmeta.Container{ + EnvVars: map[string]string{ + v3or4.DefaultMetadataURIv4EnvVariable: fmt.Sprintf("%s/v4/undefined", ts.URL), + }, + }, nil + }, + } + + // create an ECS collector with v4TaskEnabled enabled + collector := collector{ + store: store, + v4TaskEnabled: true, + metaV1: v1.NewClient(ts.URL), + metaV3or4: func(metaURI, metaVersion string) v3or4.Client { + return v3or4.NewClient(metaURI, metaVersion) + }, + v4TaskCache: cache.New(3*time.Minute, 30*time.Second), + v4TaskRefreshInterval: 3 * time.Minute, + v4TaskNumberLimitPerRun: 100, + v4TaskQueue: make([]string, 0, 5), + v4TaskRateLimiter: rate.NewLimiter(rate.Every(1*time.Second/35), 60), + } + + // Pull 3 times, only the first time should fetch the v4 tasks from the server + // The other 2 times should fetch the tasks from the cache + for i := 0; i < 3; i++ { + store.notifiedEvents = store.notifiedEvents[:0] + err = collector.Pull(context.Background()) + require.Nil(t, err) + // two ECS task events and two container events should be notified + require.Len(t, store.notifiedEvents, 4) + + count := 0 + for _, event := range store.notifiedEvents { + require.Equal(t, workloadmeta.EventTypeSet, event.Type) + require.Equal(t, workloadmeta.SourceNodeOrchestrator, event.Source) + switch entity := event.Entity.(type) { + case *workloadmeta.ECSTask: + require.Equal(t, 123457279990, entity.AWSAccountID) + require.Equal(t, "us-east-1", entity.Region) + require.Equal(t, "ecs-cluster", entity.ClusterName) + require.Equal(t, "RUNNING", entity.DesiredStatus) + require.Equal(t, workloadmeta.ECSLaunchTypeEC2, entity.LaunchType) + if entity.Family == "datadog-agent" { + require.Equal(t, "15", entity.Version) + require.Equal(t, "vpc-123", entity.VPCID) + count++ + } else if entity.Family == "nginx" { + require.Equal(t, "3", entity.Version) + require.Equal(t, "vpc-124", entity.VPCID) + count++ 
+ } else { + t.Errorf("unexpected entity family: %s", entity.Family) + } + case *workloadmeta.Container: + require.Equal(t, "RUNNING", entity.KnownStatus) + require.Equal(t, "HEALTHY", entity.Health.Status) + if entity.Image.Name == "datadog/datadog-agent" { + require.Equal(t, "7.50.0", entity.Image.Tag) + require.Equal(t, "Agent health: PASS", entity.Health.Output) + count++ + } else if entity.Image.Name == "ghcr.io/nginx/my-nginx" { + require.Equal(t, "ghcr.io", entity.Image.Registry) + require.Equal(t, "main", entity.Image.Tag) + require.Equal(t, "Nginx health: PASS", entity.Health.Output) + count++ + } else { + t.Errorf("unexpected image name: %s", entity.Image.Name) + } + default: + t.Errorf("unexpected entity type: %T", entity) + } + } + require.Equal(t, 4, count) + require.Equal(t, 0, len(collector.v4TaskQueue)) + require.Equal(t, 2, len(collector.v4TaskCache.Items())) + } +} + +func TestGetTaskArnsToFetch(t *testing.T) { + collector := collector{ + v4TaskCache: cache.New(3*time.Minute, 30*time.Second), + v4TaskRefreshInterval: 3 * time.Minute, + v4TaskNumberLimitPerRun: 10, + v4TaskQueue: make([]string, 0, 3), + } + + // generate tasks + taskCount := 30 + tasks := make([]v1.Task, 0, taskCount) + for i := 0; i < taskCount; i++ { + tasks = append(tasks, v1.Task{ + Arn: fmt.Sprintf("1234-%d", i), + }) + } + + // add totalTaskInCache tasks to the cache + taskInCache := 3 + for i := 0; i < taskInCache; i++ { + collector.v4TaskCache.SetDefault(fmt.Sprintf("1234-%d", i), struct{}{}) + } + + // add stopped tasks to the queue + stoppedTask := 2 + for i := 0; i < stoppedTask; i++ { + collector.v4TaskQueue = append(collector.v4TaskQueue, fmt.Sprintf("1234-stopped-%d", i)) + } + + // add running tasks to the queue + runningTask := 10 + for i := 0; i < runningTask; i++ { + collector.v4TaskQueue = append(collector.v4TaskQueue, fmt.Sprintf("1234-%d", i)) + } + + taskArns := collector.getTaskArnsToFetch(tasks) + + require.Equal(t, collector.v4TaskNumberLimitPerRun, 
len(taskArns)) + for i := 3; i < 13; i++ { + if _, ok := taskArns[fmt.Sprintf("1234-%d", i)]; !ok { + t.Errorf("task arn %d not found", i) + } + } + assert.Equal(t, 17, len(collector.v4TaskQueue)) + +} + +func TestJitter(t *testing.T) { + actual := jitter("test") + assert.Equal(t, 23*time.Second, actual) + + actual = jitter("test2") + assert.Equal(t, 7*time.Second, actual) +} diff --git a/comp/core/workloadmeta/collectors/internal/ecs/testdata/datadog-agent.json b/comp/core/workloadmeta/collectors/internal/ecs/testdata/datadog-agent.json new file mode 100644 index 00000000000000..da47a4e8d06c2c --- /dev/null +++ b/comp/core/workloadmeta/collectors/internal/ecs/testdata/datadog-agent.json @@ -0,0 +1,89 @@ +{ + "Cluster": "ecs-cluster", + "TaskARN": "arn:aws:ecs:us-east-1:123457279990:task/ecs-cluster/7d2dae60ad844c608fb2d44215a46f6f", + "Family": "datadog-agent", + "Revision": "15", + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "PullStartedAt": "2023-10-18T19:00:40.015762747Z", + "PullStoppedAt": "2023-10-18T19:00:40.229561055Z", + "AvailabilityZone": "us-east-1a", + "LaunchType": "EC2", + "Containers": [ + { + "DockerId": "749d28eb7145ff3b6c52b71c59b381c70a884c1615e9f99516f027492679496e", + "Name": "datadog-agent", + "DockerName": "datadog-agent-15-datadog-agent-b2bb99c4dda5b2b25000", + "Image": "datadog/datadog-agent:7.50.0", + "ImageID": "sha256:e4c58958181a5925816faa528ce959e487632f4cfc192f8132f71b32df2744b4", + "Ports": [ + { + "ContainerPort": 8125, + "Protocol": "udp", + "HostPort": 8125, + "HostIp": "0.0.0.0" + }, + { + "ContainerPort": 8125, + "Protocol": "udp", + "HostPort": 8125, + "HostIp": "::" + } + ], + "Labels": { + "baseimage.name": "ubuntu:", + "baseimage.os": "ubuntu ", + "com.amazonaws.ecs.cluster": "ecs-cluster", + "com.amazonaws.ecs.container-name": "datadog-agent", + "org.opencontainers.image.ref.name": "ubuntu", + "org.opencontainers.image.source": "https://github.com/DataDog/datadog-agent", + "org.opencontainers.image.version": 
"23.04" + }, + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "Limits": { + "CPU": 10, + "Memory": 512 + }, + "CreatedAt": "2023-10-18T19:00:40.244667009Z", + "StartedAt": "2023-10-18T19:00:41.083094165Z", + "Type": "NORMAL", + "Health": { + "status": "HEALTHY", + "statusSince": "2023-12-27T15:39:53.043973594Z", + "output": "Agent health: PASS" + }, + "Volumes": [ + { + "Source": "/var/run/docker.sock", + "Destination": "/var/run/docker.sock" + }, + { + "Source": "/sys/fs/cgroup", + "Destination": "/host/sys/fs/cgroup" + }, + { + "Source": "/var/lib/ecs/deps/execute-command/config/seelog.xml", + "Destination": "/ecs-execute-command/configuration/seelog.xml" + } + ], + "LogDriver": "awslogs", + "LogOptions": { + "awslogs-group": "datadog-agent", + "awslogs-region": "us-east-1", + "awslogs-stream": "log_router/datadog-agent" + }, + "ContainerARN": "arn:aws:ecs:us-east-1:601427279990:container/ecs-cluster/14d5ab87e06f/52ff1eab-fb19", + "Networks": [ + { + "NetworkMode": "bridge", + "IPv4Addresses": [ + "172.17.0.2" + ] + } + ] + } + ], + "VPCID": "vpc-123", + "ServiceName": "agent-ec2" +} diff --git a/comp/core/workloadmeta/collectors/internal/ecs/testdata/nginx.json b/comp/core/workloadmeta/collectors/internal/ecs/testdata/nginx.json new file mode 100644 index 00000000000000..cb290bc204a350 --- /dev/null +++ b/comp/core/workloadmeta/collectors/internal/ecs/testdata/nginx.json @@ -0,0 +1,89 @@ +{ + "Cluster": "ecs-cluster", + "TaskARN": "arn:aws:ecs:us-east-1:123457279990:task/ecs-cluster/e39c666b3b134c2ca2241c4c3c64e6ac", + "Family": "nginx", + "Revision": "3", + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "PullStartedAt": "2023-10-18T19:00:40.015762747Z", + "PullStoppedAt": "2023-10-18T19:00:40.229561055Z", + "AvailabilityZone": "us-east-1a", + "LaunchType": "EC2", + "Containers": [ + { + "DockerId": "2ad9e753a0dfbba1c91e0e7bebaaf3a0918d3ef304b7549b1ced5f573bc05645", + "Name": "my-nginx", + "DockerName": 
"nginx-my-nginx-aa9883f7f3d6c5f73e00", + "Image": "ghcr.io/nginx/my-nginx:main", + "ImageID": "sha256:5365aa84ac82631670d111e43553ae75285d0335dd2fcca143cf0e9d6f88cd185", + "Ports": [ + { + "ContainerPort": 8125, + "Protocol": "udp", + "HostPort": 8125, + "HostIp": "0.0.0.0" + }, + { + "ContainerPort": 8125, + "Protocol": "udp", + "HostPort": 8125, + "HostIp": "::" + } + ], + "Labels": { + "baseimage.name": "ubuntu:", + "baseimage.os": "ubuntu ", + "com.amazonaws.ecs.cluster": "ecs-cluster", + "com.amazonaws.ecs.container-name": "nginx", + "org.opencontainers.image.ref.name": "ubuntu", + "org.opencontainers.image.source": "https://github.com/DataDog/datadog-nginx", + "org.opencontainers.image.version": "23.04" + }, + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "Limits": { + "CPU": 10, + "Memory": 512 + }, + "CreatedAt": "2023-10-18T19:00:40.244667009Z", + "StartedAt": "2023-10-18T19:00:41.083094165Z", + "Type": "NORMAL", + "Health": { + "status": "HEALTHY", + "statusSince": "2023-12-27T15:39:53.043973594Z", + "output": "Nginx health: PASS" + }, + "Volumes": [ + { + "Source": "/var/run/docker.sock", + "Destination": "/var/run/docker.sock" + }, + { + "Source": "/sys/fs/cgroup", + "Destination": "/host/sys/fs/cgroup" + }, + { + "Source": "/var/lib/ecs/deps/execute-command/config/seelog.xml", + "Destination": "/ecs-execute-command/configuration/seelog.xml" + } + ], + "LogDriver": "awslogs", + "LogOptions": { + "awslogs-group": "nginx", + "awslogs-region": "us-east-1", + "awslogs-stream": "log_router/nginx" + }, + "ContainerARN": "arn:aws:ecs:us-east-1:601427279990:container/ecs-cluster/14d5ab87e06f/52ff1eab-fb20", + "Networks": [ + { + "NetworkMode": "bridge", + "IPv4Addresses": [ + "172.17.0.3" + ] + } + ] + } + ], + "VPCID": "vpc-124", + "ServiceName": "nginx-ec2" +} diff --git a/comp/core/workloadmeta/collectors/internal/ecs/testdata/tasks.json b/comp/core/workloadmeta/collectors/internal/ecs/testdata/tasks.json new file mode 100644 index 
00000000000000..16dc06754c3b57 --- /dev/null +++ b/comp/core/workloadmeta/collectors/internal/ecs/testdata/tasks.json @@ -0,0 +1,46 @@ +{ + "Tasks": [ + { + "Arn": "arn:aws:ecs:us-east-1:123457279990:task/ecs-cluster/e39c666b3b134c2ca2241c4c3c64e6ac", + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "Family": "nginx", + "Version": "3", + "Containers": [ + { + "DockerId": "2ad9e753a0dfbba1c91e0e7bebaaf3a0918d3ef304b7549b1ced5f573bc05645", + "DockerName": "nginx-3-stress-my-nginx-aa9883f7f3d6c5f73e00", + "Name": "my-nginx", + "Image": "ghcr.io/l9n41c/my-nginx:main", + "ImageID": "sha256:5365aa84ac82631670d111e43553ae75285d0335dd2fcca143cf0e9d6f88cd185", + "CreatedAt": "2023-11-13T19:33:52.310430928Z", + "StartedAt": "2023-11-13T19:33:52.939982634Z" + } + ] + }, + { + "Arn": "arn:aws:ecs:us-east-1:123457279990:task/ecs-cluster/7d2dae60ad844c608fb2d44215a46f6f", + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "Family": "datadog-agent", + "Version": "15", + "Containers": [ + { + "DockerId": "749d28eb7145ff3b6c52b71c59b381c70a884c1615e9f99516f027492679496e", + "DockerName": "datadog-agent-15-datadog-agent-b2bb99c4dda5b2b25000", + "Name": "datadog-agent", + "Image": "datadog/datadog-agent:7.50.0", + "ImageID": "sha256:e4c58958181a5925816faa528ce959e487632f4cfc192f8132f71b32df2744b4", + "CreatedAt": "2023-10-18T19:26:17.940550067Z", + "StartedAt": "2023-10-18T19:26:18.743130652Z", + "Volumes": [ + { + "Source": "/var/run/datadog", + "Destination": "/var/run/datadog" + } + ] + } + ] + } + ] +} diff --git a/comp/core/workloadmeta/collectors/util/ecs_util.go b/comp/core/workloadmeta/collectors/util/ecs_util.go new file mode 100644 index 00000000000000..d83f9cef350443 --- /dev/null +++ b/comp/core/workloadmeta/collectors/util/ecs_util.go @@ -0,0 +1,251 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. 
+// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build docker + +package util + +import ( + "strconv" + "strings" + "time" + + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/pkg/config" + "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v3or4" + "github.com/DataDog/datadog-agent/pkg/util/flavor" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +// Isv4TaskEnabled returns true if the v4 task metadata collection is enabled +func Isv4TaskEnabled() bool { + return config.Datadog.GetBool("ecs_metadata_use_v4") && (flavor.GetFlavor() == flavor.DefaultAgent) +} + +// ParseV4Task parses a metadata v4 task into a workloadmeta.ECSTask +func ParseV4Task(task v3or4.Task, seen map[workloadmeta.EntityID]struct{}) []workloadmeta.CollectorEvent { + events := []workloadmeta.CollectorEvent{} + entityID := workloadmeta.EntityID{ + Kind: workloadmeta.KindECSTask, + ID: task.TaskARN, + } + + seen[entityID] = struct{}{} + + arnParts := strings.Split(task.TaskARN, "/") + taskID := arnParts[len(arnParts)-1] + + taskContainers, containerEvents := ParseV4TaskContainers(task, seen) + region, awsAccountID := parseRegionAndAWSAccountID(task.TaskARN) + + entity := &workloadmeta.ECSTask{ + EntityID: entityID, + EntityMeta: workloadmeta.EntityMeta{ + Name: taskID, + }, + ClusterName: parseClusterName(task.ClusterName), + AWSAccountID: awsAccountID, + Region: region, + Family: task.Family, + Version: task.Version, + DesiredStatus: task.DesiredStatus, + KnownStatus: task.KnownStatus, + VPCID: task.VPCID, + ServiceName: task.ServiceName, + EphemeralStorageMetrics: task.EphemeralStorageMetrics, + Limits: task.Limits, + AvailabilityZone: task.AvailabilityZone, + Containers: taskContainers, + Tags: task.TaskTags, + ContainerInstanceTags: task.ContainerInstanceTags, + PullStartedAt: parseTime(taskID, "PullStartedAt", task.PullStartedAt), + PullStoppedAt: 
parseTime(taskID, "PullStoppedAt", task.PullStoppedAt), + ExecutionStoppedAt: parseTime(taskID, "ExecutionStoppedAt", task.ExecutionStoppedAt), + } + + source := workloadmeta.SourceNodeOrchestrator + entity.LaunchType = workloadmeta.ECSLaunchTypeEC2 + if strings.ToUpper(task.LaunchType) == "FARGATE" { + entity.LaunchType = workloadmeta.ECSLaunchTypeFargate + source = workloadmeta.SourceRuntime + } + + events = append(events, containerEvents...) + events = append(events, workloadmeta.CollectorEvent{ + Source: source, + Type: workloadmeta.EventTypeSet, + Entity: entity, + }) + + return events +} + +// ParseV4TaskContainers extracts containers from a metadata v4 task and parse them +func ParseV4TaskContainers( + task v3or4.Task, + seen map[workloadmeta.EntityID]struct{}, +) ([]workloadmeta.OrchestratorContainer, []workloadmeta.CollectorEvent) { + taskContainers := make([]workloadmeta.OrchestratorContainer, 0, len(task.Containers)) + events := make([]workloadmeta.CollectorEvent, 0, len(task.Containers)) + + for _, container := range task.Containers { + containerID := container.DockerID + taskContainers = append(taskContainers, workloadmeta.OrchestratorContainer{ + ID: containerID, + Name: container.Name, + }) + entityID := workloadmeta.EntityID{ + Kind: workloadmeta.KindContainer, + ID: containerID, + } + + seen[entityID] = struct{}{} + + image, err := workloadmeta.NewContainerImage(container.ImageID, container.Image) + if err != nil { + log.Debugf("cannot split image name %q: %s", container.Image, err) + } + + ips := make(map[string]string) + for _, net := range container.Networks { + if net.NetworkMode == "awsvpc" && len(net.IPv4Addresses) > 0 { + ips["awsvpc"] = net.IPv4Addresses[0] + } + } + + containerEvent := &workloadmeta.Container{ + EntityID: entityID, + EntityMeta: workloadmeta.EntityMeta{ + Name: container.DockerName, + Labels: container.Labels, + }, + State: workloadmeta.ContainerState{ + Running: container.KnownStatus == "RUNNING", + ExitCode: 
container.ExitCode, + }, + Owner: &workloadmeta.EntityID{ + Kind: workloadmeta.KindECSTask, + ID: task.TaskARN, + }, + ContainerName: container.Name, + Limits: container.Limits, + Image: image, + Type: container.Type, + KnownStatus: container.KnownStatus, + DesiredStatus: container.DesiredStatus, + LogOptions: container.LogOptions, + LogDriver: container.LogDriver, + ContainerARN: container.ContainerARN, + Snapshotter: container.Snapshotter, + NetworkIPs: ips, + Networks: make([]workloadmeta.ContainerNetwork, 0, len(container.Networks)), + Ports: make([]workloadmeta.ContainerPort, 0, len(container.Ports)), + Volumes: make([]workloadmeta.ContainerVolume, 0, len(container.Volumes)), + } + + if container.StartedAt != "" { + containerEvent.State.StartedAt = *parseTime(containerID, "StartedAt", container.StartedAt) + } + if container.CreatedAt != "" { + containerEvent.State.CreatedAt = *parseTime(containerID, "CreatedAt", container.CreatedAt) + } + + for _, network := range container.Networks { + containerEvent.Networks = append(containerEvent.Networks, workloadmeta.ContainerNetwork{ + NetworkMode: network.NetworkMode, + IPv4Addresses: network.IPv4Addresses, + IPv6Addresses: network.IPv6Addresses, + }) + } + + for _, port := range container.Ports { + containerEvent.Ports = append(containerEvent.Ports, workloadmeta.ContainerPort{ + Port: int(port.ContainerPort), + Protocol: port.Protocol, + HostPort: port.HostPort, + HostIP: port.HostIP, + }) + } + + for _, volume := range container.Volumes { + containerEvent.Volumes = append(containerEvent.Volumes, workloadmeta.ContainerVolume{ + DockerName: volume.DockerName, + Source: volume.Source, + Destination: volume.Destination, + }) + } + + if container.Health != nil { + containerEvent.Health = &workloadmeta.ContainerHealthStatus{ + Status: container.Health.Status, + Since: parseTime(containerID, "Health.Since", container.Health.Since), + ExitCode: container.Health.ExitCode, + Output: container.Health.Output, + } + } + + source 
:= workloadmeta.SourceNodeOrchestrator + containerEvent.Runtime = workloadmeta.ContainerRuntimeDocker + if task.LaunchType == "FARGATE" { + source = workloadmeta.SourceRuntime + containerEvent.Runtime = workloadmeta.ContainerRuntimeECSFargate + } + + events = append(events, workloadmeta.CollectorEvent{ + Source: source, + Type: workloadmeta.EventTypeSet, + Entity: containerEvent, + }) + } + + return taskContainers, events +} + +func parseTime(fieldOwner, fieldName, fieldValue string) *time.Time { + if fieldValue == "" { + return nil + } + result, err := time.Parse(time.RFC3339, fieldValue) + if err != nil { + log.Debugf("cannot parse %s %s for %s: %s", fieldName, fieldValue, fieldOwner, err) + } + return &result +} + +// parseRegionAndAWSAccountID parses the region and AWS account ID from a task ARN. +func parseRegionAndAWSAccountID(taskARN string) (string, int) { + arnParts := strings.Split(taskARN, ":") + if len(arnParts) < 5 { + return "", 0 + } + if arnParts[0] != "arn" || arnParts[1] != "aws" { + return "", 0 + } + region := arnParts[3] + if strings.Count(region, "-") < 2 { + region = "" + } + + id := arnParts[4] + // aws account id is 12 digits + // https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-identifiers.html + if len(id) != 12 { + return region, 0 + } + awsAccountID, err := strconv.Atoi(id) + if err != nil { + return region, 0 + } + + return region, awsAccountID +} + +func parseClusterName(cluster string) string { + parts := strings.Split(cluster, "/") + if len(parts) != 2 { + return cluster + } + return parts[1] +} diff --git a/comp/core/workloadmeta/collectors/util/ecs_util_test.go b/comp/core/workloadmeta/collectors/util/ecs_util_test.go new file mode 100644 index 00000000000000..07debfa7a477a9 --- /dev/null +++ b/comp/core/workloadmeta/collectors/util/ecs_util_test.go @@ -0,0 +1,28 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. 
+// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build docker + +package util + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseRegionAndAWSAccountID(t *testing.T) { + // test valid arn + arn := "arn:aws:ecs:us-east-1:123456789012:task/12345678-1234-1234-1234-123456789012" + region, awsAccountID := parseRegionAndAWSAccountID(arn) + require.Equal(t, "us-east-1", region) + require.Equal(t, 123456789012, awsAccountID) + + // test invalid arn + arn = "arn:aws:ecs:us-east-1:123:task/12345678-1234-1234-1234-123456789012" + region, awsAccountID = parseRegionAndAWSAccountID(arn) + require.Equal(t, "us-east-1", region) + require.Equal(t, 0, awsAccountID) +} diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index c28c0cb7829986..9ed96221b2d755 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -763,6 +763,11 @@ func InitConfig(config pkgconfigmodel.Config) { config.BindEnvAndSetDefault("ecs_collect_resource_tags_ec2", false) config.BindEnvAndSetDefault("ecs_resource_tags_replace_colon", false) config.BindEnvAndSetDefault("ecs_metadata_timeout", 500) // value in milliseconds + config.BindEnvAndSetDefault("ecs_metadata_use_v4", false) + config.BindEnvAndSetDefault("ecs_ec2_task_cache_ttl", 3*time.Minute) + config.BindEnvAndSetDefault("ecs_ec2_task_limit_per_run", 1000) + config.BindEnvAndSetDefault("ecs_ec2_task_rate", 35) + config.BindEnvAndSetDefault("ecs_ec2_task_burst", 60) // GCE config.BindEnvAndSetDefault("collect_gce_tags", true)