diff --git a/pkg/structs/process.go b/pkg/structs/process.go index 431f5dc470..a0cd70620d 100644 --- a/pkg/structs/process.go +++ b/pkg/structs/process.go @@ -8,18 +8,19 @@ import ( type Process struct { Id string `json:"id"` - App string `json:"app"` - Command string `json:"command"` - Cpu float64 `json:"cpu"` - Host string `json:"host"` - Image string `json:"image"` - Instance string `json:"instance"` - Memory float64 `json:"memory"` - Name string `json:"name"` - Ports []string `json:"ports"` - Release string `json:"release"` - Started time.Time `json:"started"` - Status string `json:"status"` + App string `json:"app"` + Command string `json:"command"` + Cpu float64 `json:"cpu"` + Host string `json:"host"` + Image string `json:"image"` + Instance string `json:"instance"` + Memory float64 `json:"memory"` + Name string `json:"name"` + Ports []string `json:"ports"` + Release string `json:"release"` + Started time.Time `json:"started"` + Status string `json:"status"` + TaskDefinition string `json:"task_definition"` } type Processes []Process diff --git a/provider/aws/aws_test.go b/provider/aws/aws_test.go index c402fc0127..f39f14b266 100644 --- a/provider/aws/aws_test.go +++ b/provider/aws/aws_test.go @@ -5,10 +5,13 @@ import ( "net/http/httptest" "os" + "github.com/aws/aws-sdk-go/service/cloudwatch" "github.com/convox/logger" + mockaws "github.com/convox/rack/pkg/mock/aws" "github.com/convox/rack/pkg/structs" "github.com/convox/rack/pkg/test/awsutil" "github.com/convox/rack/provider/aws" + "github.com/stretchr/testify/mock" ) func init() { @@ -34,6 +37,11 @@ func StubAwsProvider(cycles ...awsutil.Cycle) *AwsStub { os.Setenv("AWS_ACCESS_KEY_ID", "test-access") os.Setenv("AWS_SECRET_ACCESS_KEY", "test-secret") + cw := &mockaws.CloudWatchAPI{} + output := &cloudwatch.GetMetricDataOutput{MetricDataResults: []*cloudwatch.MetricDataResult{}} + + cw.On("GetMetricData", mock.Anything).Return(output, nil) + p := &aws.Provider{ Region: "us-test-1", Endpoint: s.URL, @@ -46,6 +54,7 @@ func StubAwsProvider(cycles ...awsutil.Cycle) *AwsStub { Rack: "convox", SettingsBucket: "convox-settings", SkipCache: true, + CloudWatch: cw, } return &AwsStub{p, s} diff --git a/provider/aws/helpers.go b/provider/aws/helpers.go index c4fc60476c..6f61d9324f 100644 --- a/provider/aws/helpers.go +++ b/provider/aws/helpers.go @@ -1509,3 +1509,7 @@ func (cr *CronJob) LongName() string { } return prefix + suffix } + +func serviceMetricsKey(metricType, serviceName string) string { + return fmt.Sprintf("service:%s:utilization:%s", metricType, strings.ReplaceAll(serviceName, "-", "_")) +} diff --git a/provider/aws/metrics.go b/provider/aws/metrics.go index 1c21470ed5..cc477fc8a7 100644 --- a/provider/aws/metrics.go +++ b/provider/aws/metrics.go @@ -307,6 +307,34 @@ func (p *Provider) systemMetricQueries() []metricDataQuerier { return mdqs } +func (p *Provider) servicesMetricQueries(names []string) []metricDataQuerier { + mdqs := []metricDataQuerier{} + + for _, name := range names { + mdqs = append(mdqs, metricStatistics{ + Name: serviceMetricsKey("cpu", name), + Namespace: "AWS/ECS", + Metric: "CPUUtilization", + Dimensions: map[string]string{ + "ClusterName": p.Cluster, + "ServiceName": name, + }, + Statistics: []string{"Average"}, + }, metricStatistics{ + Name: serviceMetricsKey("mem", name), + Namespace: "AWS/ECS", + Metric: "MemoryUtilization", + Dimensions: map[string]string{ + "ClusterName": p.Cluster, + "ServiceName": name, + }, + Statistics: []string{"Average"}, + }) + } + + return mdqs +} + type metricDataQuerier interface { MetricDataQueries(period int64, suffix string) []*cloudwatch.MetricDataQuery } diff --git a/provider/aws/processes.go b/provider/aws/processes.go index 7ced9740c7..257a31986c 100644 --- a/provider/aws/processes.go +++ b/provider/aws/processes.go @@ -171,8 +171,53 @@ func (p *Provider) ProcessList(app string, opts structs.ProcessListOptions) (str ps = pss } + taskDefMap := map[string]bool{} for i := range ps { ps[i].App = app + taskDefMap[ps[i].TaskDefinition] = true + } + + services, err := p.clusterServices() + if err != nil { + return nil, err + } + + serviceNames := []string{} + taskToServiceMap := map[string]string{} + for _, s := range services { + if s.ServiceName != nil && s.TaskDefinition != nil && taskDefMap[*s.TaskDefinition] { + serviceNames = append(serviceNames, *s.ServiceName) + taskToServiceMap[*s.TaskDefinition] = *s.ServiceName + } + } + + mdqs := p.servicesMetricQueries(serviceNames) + + ms, err := p.cloudwatchMetrics(mdqs, structs.MetricsOptions{ + Start: aws.Time(time.Now().Add(-5 * time.Minute)), + }) + if err != nil { + return nil, err + } + + mMap := map[string]structs.Metric{} + for _, m := range ms { + mMap[m.Name] = m + } + + for i := range ps { + if serviceName, has := taskToServiceMap[ps[i].TaskDefinition]; has { + if m, has := mMap[serviceMetricsKey("mem", serviceName)]; has && len(m.Values) > 0 { + // normally there should be one point but if multiple points are fetched, pick the latest one + // points are sorted in TimestampAscending order + ps[i].Memory = m.Values[len(m.Values)-1].Average + } + if m, has := mMap[serviceMetricsKey("cpu", serviceName)]; has && len(m.Values) > 0 { + // normally there should be one point but if multiple points are fetched, pick the latest one + // points are sorted in TimestampAscending order + ps[i].Cpu = m.Values[len(m.Values)-1].Average + } + } } return ps, nil @@ -683,13 +728,14 @@ func (p *Provider) fetchProcess(task *ecs.Task, psch chan structs.Process, errch } ps := structs.Process{ - Id: arnToPid(*task.TaskArn), - Name: *container.Name, - App: coalesces(labels["convox.app"], env["APP"]), - Release: coalesces(labels["convox.release"], env["RELEASE"]), - Image: *cd.Image, - Ports: ports, - Status: taskStatus(*task.LastStatus), + Id: arnToPid(*task.TaskArn), + Name: *container.Name, + App: coalesces(labels["convox.app"], env["APP"]), + Release: coalesces(labels["convox.release"], env["RELEASE"]), + Image: *cd.Image, + Ports: ports, + Status: taskStatus(*task.LastStatus), + TaskDefinition: *task.TaskDefinitionArn, } if task.ContainerInstanceArn != nil { diff --git a/provider/aws/processes_test.go b/provider/aws/processes_test.go index cbef1ae4fd..3818fa66f2 100644 --- a/provider/aws/processes_test.go +++ b/provider/aws/processes_test.go @@ -34,6 +34,8 @@ func TestProcessExec(t *testing.T) { cycleProcessDescribeTaskDefinition1, cycleProcessDescribeContainerInstances, cycleProcessDescribeRackInstances, + cycleECSListServices, + cycleECSDescribeServices, cycleProcessListTasksRunning, cycleProcessListTasksStopped, cycleProcessDescribeTasks, @@ -88,6 +90,8 @@ func TestProcessList(t *testing.T) { cycleProcessDescribeTaskDefinition2, cycleProcessDescribeContainerInstances, cycleProcessDescribeRackInstances, + cycleECSListServices, + cycleECSDescribeServices, ) defer provider.Close() @@ -95,32 +99,34 @@ func TestProcessList(t *testing.T) { ps := structs.Processes{ structs.Process{ - Id: "5850760f0845", - App: "myapp", - Name: "web", - Release: "R1234", - Command: "", - Host: "10.0.1.244", - Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", - Instance: "i-5bc45dc2", - Ports: []string{}, - Cpu: 0, - Memory: 0, - Status: "running", + Id: "5850760f0845", + App: "myapp", + Name: "web", + Release: "R1234", + Command: "", + Host: "10.0.1.244", + Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", + Instance: "i-5bc45dc2", + Ports: []string{}, + Cpu: 0, + Memory: 0, + Status: "running", + TaskDefinition: "arn:aws:ecs:us-east-1:778743527532:task-definition/convox-myapp-web:34", }, structs.Process{ - Id: "5850760f0846", - App: "myapp", - Name: "web", - Release: "R1234", - Command: "ls -la 'name space'", - Host: "10.0.1.244", - Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", - Instance: "i-5bc45dc2", - Ports: []string{}, - Cpu: 0, - Memory: 0, - Status: "pending", + Id: "5850760f0846", + App: "myapp", + Name: "web", + Release: "R1234", + Command: "ls -la 'name space'", + Host: "10.0.1.244", + Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", + Instance: "i-5bc45dc2", + Ports: []string{}, + Cpu: 0, + Memory: 0, + Status: "pending", + TaskDefinition: "arn:aws:ecs:us-east-1:778743527532:task-definition/convox-myapp-web:34", }, } @@ -137,6 +143,8 @@ func TestProcessListEmpty(t *testing.T) { cycleProcessListTasksByService2Empty, cycleProcessListTasksByStartedEmpty, cycleProcessDescribeRackInstances, + cycleECSListServices, + cycleECSDescribeServices, ) defer provider.Close() @@ -164,6 +172,8 @@ func TestProcessListWithBuildCluster(t *testing.T) { cycleProcessDescribeTaskDefinition2, cycleProcessDescribeContainerInstancesBuild, cycleProcessDescribeRackInstances, + cycleECSListServices, + cycleECSDescribeServices, ) defer provider.Close() @@ -173,46 +183,49 @@ func TestProcessListWithBuildCluster(t *testing.T) { ps := structs.Processes{ structs.Process{ - Id: "5850760f0845", - App: "myapp", - Name: "web", - Release: "R1234", - Command: "", - Host: "10.0.1.244", - Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", - Instance: "i-5bc45dc2", - Ports: []string{}, - Cpu: 0, - Memory: 0, - Status: "running", + Id: "5850760f0845", + App: "myapp", + Name: "web", + Release: "R1234", + Command: "", + Host: "10.0.1.244", + Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", + Instance: "i-5bc45dc2", + Ports: []string{}, + Cpu: 0, + Memory: 0, + Status: "running", + TaskDefinition: "arn:aws:ecs:us-east-1:778743527532:task-definition/convox-myapp-web:34", }, structs.Process{ - Id: "5850760f0846", - App: "myapp", - Name: "web", - Release: "R1234", - Command: "ls -la 'name space'", - Host: "10.0.1.244", - Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", - Instance: "i-5bc45dc2", - Ports: []string{}, - Cpu: 0, - Memory: 0, - Status: "running", + Id: "5850760f0846", + App: "myapp", + Name: "web", + Release: "R1234", + Command: "ls -la 'name space'", + Host: "10.0.1.244", + Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", + Instance: "i-5bc45dc2", + Ports: []string{}, + Cpu: 0, + Memory: 0, + Status: "running", + TaskDefinition: "arn:aws:ecs:us-east-1:778743527532:task-definition/convox-myapp-web:34", }, structs.Process{ - Id: "5850760f0848", - App: "myapp", - Name: "web", - Release: "R1234", - Command: "", - Host: "10.0.1.244", - Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", - Instance: "i-5bc45dc2", - Ports: []string{}, - Cpu: 0, - Memory: 0, - Status: "running", + Id: "5850760f0848", + App: "myapp", + Name: "web", + Release: "R1234", + Command: "", + Host: "10.0.1.244", + Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", + Instance: "i-5bc45dc2", + Ports: []string{}, + Cpu: 0, + Memory: 0, + Status: "running", + TaskDefinition: "arn:aws:ecs:us-east-1:778743527532:task-definition/convox-myapp-web:34", }, } @@ -256,7 +269,7 @@ func TestProcessRunDetached(t *testing.T) { Release: options.String("RVFETUHHKKD"), }) - pse := &structs.Process{Id: "5850760f0845", App: "", Command: "ls -la 'name space'", Cpu: 0, Host: "10.0.1.244", Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", Instance: "i-5bc45dc2", Memory: 0, Name: "web", Ports: []string{}, Release: "R1234", Status: "running"} + pse := &structs.Process{Id: "5850760f0845", App: "", Command: "ls -la 'name space'", Cpu: 0, Host: "10.0.1.244", Image: "778743527532.dkr.ecr.us-east-1.amazonaws.com/convox-myapp-nkdecwppkq:web.BMPBJLITPZT", Instance: "i-5bc45dc2", Memory: 0, Name: "web", Ports: []string{}, Release: "R1234", Status: "running", TaskDefinition: "arn:aws:ecs:us-east-1:778743527532:task-definition/convox-myapp-web:34"} assert.NoError(t, err) assert.Equal(t, pse, psa) diff --git a/provider/aws/system.go b/provider/aws/system.go index c02cc28c48..37bc72ec42 100644 --- a/provider/aws/system.go +++ b/provider/aws/system.go @@ -366,10 +366,47 @@ func (p *Provider) SystemProcesses(opts structs.SystemProcessesOptions) (structs return nil, err } + services, err := p.clusterServices() + if err != nil { + return nil, err + } + + serviceNames := []string{} + taskToServiceMap := map[string]string{} + for _, s := range services { + if s.ServiceName != nil && s.TaskDefinition != nil { + serviceNames = append(serviceNames, *s.ServiceName) + taskToServiceMap[*s.TaskDefinition] = *s.ServiceName + } + } + + mdqs := p.servicesMetricQueries(serviceNames) + + ms, err := p.cloudwatchMetrics(mdqs, structs.MetricsOptions{ + Start: aws.Time(time.Now().Add(-5 * time.Minute)), + }) + if err != nil { + return nil, err + } + + mMap := map[string]structs.Metric{} + for _, m := range ms { + mMap[m.Name] = m + } + for i := range ps { if ps[i].App == "" { ps[i].App = p.Rack } + + if serviceName, has := taskToServiceMap[ps[i].TaskDefinition]; has { + if m, has := mMap[serviceMetricsKey("mem", serviceName)]; has && len(m.Values) > 0 { + ps[i].Memory = m.Values[len(m.Values)-1].Average + } + if m, has := mMap[serviceMetricsKey("cpu", serviceName)]; has && len(m.Values) > 0 { + ps[i].Cpu = m.Values[len(m.Values)-1].Average + } + } } return ps, nil diff --git a/provider/aws/system_test.go b/provider/aws/system_test.go index c45e409ba7..f0251631c6 100644 --- a/provider/aws/system_test.go +++ b/provider/aws/system_test.go @@ -252,6 +252,8 @@ func TestSystemProcessesList(t *testing.T) { cycleSystemDescribeTaskDefinition, cycleSystemDescribeContainerInstances, cycleSystemDescribeRackInstances, + cycleECSListServices, + cycleECSDescribeServices, cycleSystemDescribeTaskDefinition2, cycleSystemDescribeContainerInstances, ) @@ -269,6 +271,8 @@ func TestSystemProcessesListAll(t *testing.T) { cycleSystemDescribeTaskDefinition, cycleSystemDescribeContainerInstances, cycleSystemDescribeRackInstances, + cycleECSListServices, + cycleECSDescribeServices, cycleSystemDescribeTaskDefinition2, cycleSystemDescribeContainerInstances, )