update ECS collector to use v4 endpoint
kangyili committed Feb 28, 2024
1 parent ebcade3 commit e7d3c14
Showing 8 changed files with 926 additions and 20 deletions.
258 changes: 240 additions & 18 deletions comp/core/workloadmeta/collectors/internal/ecs/ecs.go
@@ -10,16 +10,25 @@ package ecs

import (
"context"
"fmt"
"hash/fnv"
"reflect"
"strings"
"time"

"github.com/patrickmn/go-cache"
"golang.org/x/time/rate"

"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/util"
"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/errors"
ecsutil "github.com/DataDog/datadog-agent/pkg/util/ecs"
ecsmeta "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata"
v1 "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v1"
v3or4 "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v3or4"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/retry"

"go.uber.org/fx"
)
@@ -30,16 +39,22 @@ const (
)

type collector struct {
id string
store workloadmeta.Component
catalog workloadmeta.AgentType
metaV1 v1.Client
metaV3or4 func(metaURI, metaVersion string) v3or4.Client
clusterName string
hasResourceTags bool
collectResourceTags bool
resourceTags map[string]resourceTags
seen map[workloadmeta.EntityID]struct{}
id string
store workloadmeta.Component
catalog workloadmeta.AgentType
metaV1 v1.Client
metaV3or4 func(metaURI, metaVersion string) v3or4.Client
clusterName string
hasResourceTags bool
collectResourceTags bool
resourceTags map[string]resourceTags
seen map[workloadmeta.EntityID]struct{}
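// fields used to fetch and cache task metadata from the ECS metadata v4 endpoint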
v4TaskEnabled bool
v4TaskCache *cache.Cache
v4TaskRefreshInterval time.Duration
v4TaskQueue []string
v4TaskNumberLimitPerRun int
v4TaskRateLimiter *rate.Limiter
}

type resourceTags struct {
@@ -49,12 +64,24 @@ type resourceTags struct {

// NewCollector returns a new ecs collector provider and an error
func NewCollector() (workloadmeta.CollectorProvider, error) {
v4TaskEnabled := util.Isv4TaskEnabled()
v4TaskRefreshInterval := config.Datadog.GetDuration("ecs_ec2_task_cache_ttl")
v4TaskNumberLimitPerRun := config.Datadog.GetInt("ecs_ec2_task_limit_per_run")
v4TaskRateRPS := config.Datadog.GetInt("ecs_ec2_task_rate")
v4TaskRateBurst := config.Datadog.GetInt("ecs_ec2_task_burst")

return workloadmeta.CollectorProvider{
Collector: &collector{
id: collectorID,
resourceTags: make(map[string]resourceTags),
seen: make(map[workloadmeta.EntityID]struct{}),
catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent,
id: collectorID,
resourceTags: make(map[string]resourceTags),
seen: make(map[workloadmeta.EntityID]struct{}),
catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent,
v4TaskEnabled: v4TaskEnabled,
v4TaskCache: cache.New(v4TaskRefreshInterval, 30*time.Second),
v4TaskQueue: make([]string, 0, 2*v4TaskNumberLimitPerRun),
v4TaskRefreshInterval: v4TaskRefreshInterval,
v4TaskNumberLimitPerRun: v4TaskNumberLimitPerRun,
v4TaskRateLimiter: rate.NewLimiter(rate.Every(time.Second/time.Duration(v4TaskRateRPS)), v4TaskRateBurst),
},
}, nil
}
@@ -105,8 +132,11 @@ func (c *collector) Pull(ctx context.Context) error {
// immutable: the list of containers in the task changes as containers
// don't get added until they actually start running, and killed
// containers will get re-created.
c.store.Notify(c.parseTasks(ctx, tasks))

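// the v4 endpoint returns richer task metadata, including resource tags, so prefer it when enabled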
if c.v4TaskEnabled {
c.store.Notify(c.parseV4Tasks(ctx, tasks))
} else {
c.store.Notify(c.parseTasks(ctx, tasks))
}
return nil
}

@@ -124,7 +154,7 @@ func (c *collector) parseTasks(ctx context.Context, tasks []v1.Task) []workloadm

for _, task := range tasks {
// We only want to collect tasks without a STOPPED status.
if task.KnownStatus == "STOPPED" {
if task.KnownStatus == workloadmeta.ECSTaskKnownStatusStopped {
continue
}

@@ -166,6 +196,43 @@ func (c *collector) parseTasks(ctx context.Context, tasks []v1.Task) []workloadm
})
}

return c.setLastSeenEntitiesAndUnsetEvents(events, seen)
}

// parseV4Tasks queries the v4 task endpoint for each task, parses the results and returns the corresponding collector events.
func (c *collector) parseV4Tasks(ctx context.Context, tasks []v1.Task) []workloadmeta.CollectorEvent {
events := []workloadmeta.CollectorEvent{}
seen := make(map[workloadmeta.EntityID]struct{})
// get task ARNs to fetch from the metadata v4 API
taskArns := c.getTaskArnsToFetch(tasks)

for _, task := range tasks {
// We only want to collect tasks without a STOPPED status.
if task.KnownStatus == workloadmeta.ECSTaskKnownStatusStopped {
continue
}

var v4Task v3or4.Task
if _, ok := taskArns[task.Arn]; ok {
v4Task = c.getV4TaskWithTags(ctx, task)
} else {
// if the task is not returned by getTaskArnsToFetch, it means it has been fetched during previous runs
// retrieve it from the cache
taskCached, found := c.v4TaskCache.Get(task.Arn)
if found {
v4Task = *taskCached.(*v3or4.Task)
} else {
v4Task = v1TaskToV4Task(task)
}
}

events = append(events, util.ParseV4Task(v4Task, seen)...)
}

return c.setLastSeenEntitiesAndUnsetEvents(events, seen)
}

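// setLastSeenEntitiesAndUnsetEvents appends unset events for entities that were seen in previous runs but are missing from the current one, then records the current set as seen.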
func (c *collector) setLastSeenEntitiesAndUnsetEvents(events []workloadmeta.CollectorEvent, seen map[workloadmeta.EntityID]struct{}) []workloadmeta.CollectorEvent {
for seenID := range c.seen {
if _, ok := seen[seenID]; ok {
continue
Expand Down Expand Up @@ -194,7 +261,6 @@ func (c *collector) parseTasks(ctx context.Context, tasks []v1.Task) []workloadm
}

c.seen = seen

return events
}

@@ -233,6 +299,121 @@ func (c *collector) parseTaskContainers(
return taskContainers, events
}

// getV4TaskWithTags fetches the task, along with its resource tags, from the metadata v4 API
func (c *collector) getV4TaskWithTags(ctx context.Context, task v1.Task) v3or4.Task {
var metaURI string
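// find the v4 metadata URI from the environment of one of the task's containers already in the store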
for _, taskContainer := range task.Containers {
containerID := taskContainer.DockerID
container, err := c.store.GetContainer(containerID)
if err != nil {
log.Tracef("cannot find container %q from task %s in the store: %s", containerID, task.Arn, err)
continue
}

uri, ok := container.EnvVars[v3or4.DefaultMetadataURIv4EnvVariable]
if ok && uri != "" {
metaURI = uri
break
}
}

if metaURI == "" {
log.Errorf("failed to get client for metadata v4 API from task %s and the following containers: %v", task.Arn, task.Containers)
return v1TaskToV4Task(task)
}

err := c.v4TaskRateLimiter.Wait(ctx)
if err != nil {
log.Warnf("failed to get task with tags from metadata v4 API: %s", err)
return v1TaskToV4Task(task)
}

taskWithTags, err := getV4TaskWithRetry(ctx, c.metaV3or4(metaURI, "v4"))
if err != nil {
log.Warnf("failed to get task with tags from metadata v4 API: %s", err)
return v1TaskToV4Task(task)
}

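// cache the fetched task; the per-ARN jitter presumably staggers expirations so cached tasks are not all refreshed in the same run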
c.v4TaskCache.Set(task.Arn, taskWithTags, c.v4TaskRefreshInterval+jitter(task.Arn))

return *taskWithTags
}

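// getV4TaskWithRetry fetches the task with its tags from the v4 client, retrying up to three times with exponential backoff (250ms, 500ms, 1s).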
func getV4TaskWithRetry(ctx context.Context, metaV3orV4 v3or4.Client) (*v3or4.Task, error) {
var taskWithTagsRetry retry.Retrier
var taskWithTags *v3or4.Task
var err error
maxRetryCount := 3
retryCount := 0

_ = taskWithTagsRetry.SetupRetrier(&retry.Config{
Name: "get-v4-task-with-tags",
AttemptMethod: func() error {
retryCount++
taskWithTags, err = metaV3orV4.GetTaskWithTags(ctx)
return err
},
Strategy: retry.Backoff,
InitialRetryDelay: 250 * time.Millisecond,
MaxRetryDelay: 1 * time.Second,
})

// retry 3 times with exponential backoff strategy: 250ms, 500ms, 1s
for {
err = taskWithTagsRetry.TriggerRetry()
if err == nil || (reflect.ValueOf(err).Kind() == reflect.Ptr && reflect.ValueOf(err).IsNil()) {
break
}

if retry.IsErrPermaFail(err) {
return nil, err
}

if retryCount >= maxRetryCount {
return nil, fmt.Errorf("failed to get task with tags from metadata v4 API after %d retries", maxRetryCount)
}
}
return taskWithTags, nil
}

// getTaskArnsToFetch returns the set of task ARNs to fetch from the metadata v4 API.
// It uses v4TaskCache to know whether a task has already been fetched during previous runs.
// The number of ARNs returned is capped at v4TaskNumberLimitPerRun so that a single pull does not run for too long.
func (c *collector) getTaskArnsToFetch(tasks []v1.Task) map[string]struct{} {
taskArns := make(map[string]struct{}, c.v4TaskNumberLimitPerRun)

for _, task := range tasks {
if task.KnownStatus == workloadmeta.ECSTaskKnownStatusStopped {
continue
}
c.v4TaskQueue = append(c.v4TaskQueue, task.Arn)
}

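// index counts how many queued ARNs have been processed so they can be dropped from the front of the queue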
index := 0
for _, taskArn := range c.v4TaskQueue {
if len(taskArns) >= c.v4TaskNumberLimitPerRun {
break
}

// The task is in the queue but not in the current list of running tasks,
// which means it has been stopped: skip it
if !hasTask(taskArn, tasks) {
index++
continue
}

// if task is not in the cache or expired, add it
if _, ok := c.v4TaskCache.Get(taskArn); !ok {
taskArns[taskArn] = struct{}{}
}
index++
}

c.v4TaskQueue = c.v4TaskQueue[index:]

return taskArns
}

// getResourceTags fetches task and container instance tags from the ECS API,
// and caches them for the lifetime of the task, to avoid hitting throttling
// limits from tasks being updated on every pull. Tags won't change in the
Expand Down Expand Up @@ -294,3 +475,44 @@ func (c *collector) getResourceTags(ctx context.Context, entity *workloadmeta.EC

return rt
}

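// v1TaskToV4Task builds a v3or4.Task from a v1 task, carrying over only the fields available from the v1 metadata endpoint.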
func v1TaskToV4Task(task v1.Task) v3or4.Task {
result := v3or4.Task{
TaskARN: task.Arn,
DesiredStatus: task.DesiredStatus,
KnownStatus: task.KnownStatus,
Family: task.Family,
Version: task.Version,
Containers: make([]v3or4.Container, 0, len(task.Containers)),
}

for _, container := range task.Containers {
result.Containers = append(result.Containers, v3or4.Container{
Name: container.Name,
DockerName: container.DockerName,
DockerID: container.DockerID,
})
}
return result
}

var hash32 = fnv.New32a()

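// jitter derives a deterministic duration between 0 and 60 seconds from the given string (the task ARN).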
func jitter(s string) time.Duration {
defer hash32.Reset()
_, err := hash32.Write([]byte(s))
if err != nil {
return 0
}
second := time.Duration(hash32.Sum32()%61) * time.Second
return second
}

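// hasTask reports whether taskARN appears in the given list of v1 tasks.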
func hasTask(taskARN string, tasks []v1.Task) bool {
for _, task := range tasks {
if task.Arn == taskARN {
return true
}
}
return false
}