From 4dd718b5e32cb60e75d7bf72dba03ac26270f3ce Mon Sep 17 00:00:00 2001
From: Chris Mark
Date: Fri, 31 May 2024 13:09:57 +0300
Subject: [PATCH] [receiver/kubeletstats] Add `k8s.container.cpu.node.utilization` metric (#32295)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

**Description:**

At the moment, we calculate `k8s.container.cpu_limit_utilization` as [a ratio of the container's limits](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/kubeletstatsreceiver/documentation.md#k8scontainercpu_limit_utilization) at https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/867d6700c31446172e6998e602c55fbf7351831f/receiver/kubeletstatsreceiver/internal/kubelet/cpu.go#L30.

Similarly, we can calculate the CPU utilization as a ratio of the whole node's allocatable CPU by dividing by the total number of the node's cores. We can retrieve this information from the Node's `Status.Capacity`, for example:

```console
$ k get nodes kind-control-plane -ojsonpath='{.status.capacity}'
{"cpu":"8","ephemeral-storage":"485961008Ki","hugepages-1Gi":"0","hugepages-2Mi":"0","memory":"32564732Ki","pods":"110"}
```

## Performance concerns

In order to get the Node's capacity, we need a call to the k8s API to fetch the Node object. Something to consider here is the performance impact of this extra API call. We can always choose to have this metric disabled by default and clearly state in the docs that it comes with an extra API call to fetch the Pods' Node.

The good thing is that the `kubeletstats` receiver targets only one node, so I believe it's a safe assumption to fetch only the current node, since all the observed Pods belong to that single local node. Correct me if I'm missing anything here.

In addition, instead of performing the API call explicitly on every single `scrape`, we can use an informer and leverage its cache. I can change this patch in that direction if we agree on it. Would love to hear others' opinions on this.

## Todos

✅ 1) Apply this change behind a feature gate, as indicated at https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27885#issuecomment-2037784116
✅ 2) Use an Informer instead of direct API calls.

**Link to tracking Issue:**
ref: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27885

**Testing:**

I experimented with this approach and the results look correct. In order to verify it, I deployed a stress Pod on my machine to consume a target CPU of 4 cores:

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: cpu-stress
spec:
  containers:
  - name: cpu-stress
    image: polinux/stress
    command: ["stress"]
    args: ["-c", "4"]
```

The collected `container.cpu.utilization` for that Pod's container was then at `0.5`, as expected, given that my machine's node comes with 8 cores in total:

![cpu-stress](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/11754898/3abe4a0d-6c99-4b4e-a704-da5789dde01b)

A unit test is also included.
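For reviewers, below is a minimal, self-contained sketch of the arithmetic behind the new metric. The function name and example values are illustrative only; the actual implementation lives in `internal/kubelet/cpu.go` and the scraper's node informer handlers.

```go
package main

import "fmt"

// nodeUtilization is an illustrative helper (not part of the receiver's API):
// it divides the container's CPU usage, reported by the kubelet in nanocores,
// by the node's CPU capacity expressed in cores.
func nodeUtilization(usageNanoCores uint64, nodeCPUCores float64) float64 {
	if nodeCPUCores <= 0 {
		// Capacity not known yet (e.g. the node informer has not synced): skip the ratio.
		return 0
	}
	return float64(usageNanoCores) / 1e9 / nodeCPUCores
}

func main() {
	// A container using ~4 cores on an 8-core node yields ~0.5,
	// matching the stress-test result shown above.
	fmt.Println(nodeUtilization(4_000_000_000, 8)) // 0.5
}
```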
**Documentation:** Added: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/32295/files#diff-8ad3b506fb1132c961e8da99b677abd31f0108e3f9ed6999dd96ad3297b51e08 --------- Signed-off-by: ChrsMark --- .chloggen/k8s_container_metrics.yaml | 27 ++ receiver/kubeletstatsreceiver/README.md | 30 +++ receiver/kubeletstatsreceiver/config.go | 20 ++ receiver/kubeletstatsreceiver/config_test.go | 44 +++- .../kubeletstatsreceiver/documentation.md | 8 + receiver/kubeletstatsreceiver/factory.go | 2 +- .../internal/kubelet/accumulator.go | 6 +- .../internal/kubelet/accumulator_test.go | 8 +- .../internal/kubelet/cpu.go | 21 +- .../internal/kubelet/metadata.go | 9 +- .../internal/kubelet/metadata_test.go | 32 +-- .../internal/kubelet/metrics_test.go | 2 +- .../internal/kubelet/volume_test.go | 2 +- .../internal/metadata/generated_config.go | 4 + .../metadata/generated_config_test.go | 2 + .../internal/metadata/generated_metrics.go | 57 ++++ .../metadata/generated_metrics_test.go | 15 ++ .../internal/metadata/metrics.go | 2 + .../internal/metadata/testdata/config.yaml | 4 + receiver/kubeletstatsreceiver/metadata.yaml | 7 + .../mocked_objects_test.go | 18 ++ receiver/kubeletstatsreceiver/scraper.go | 90 ++++++- receiver/kubeletstatsreceiver/scraper_test.go | 80 ++++++ .../kubeletstatsreceiver/testdata/config.yaml | 6 + ...t_scraper_cpu_util_nodelimit_expected.yaml | 244 ++++++++++++++++++ 25 files changed, 702 insertions(+), 38 deletions(-) create mode 100644 .chloggen/k8s_container_metrics.yaml create mode 100644 receiver/kubeletstatsreceiver/testdata/scraper/test_scraper_cpu_util_nodelimit_expected.yaml diff --git a/.chloggen/k8s_container_metrics.yaml b/.chloggen/k8s_container_metrics.yaml new file mode 100644 index 000000000000..40a95a8b30c7 --- /dev/null +++ b/.chloggen/k8s_container_metrics.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kubeletstatsreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add k8s.container.cpu.node.utilization metric + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27885] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' change_logs: [] diff --git a/receiver/kubeletstatsreceiver/README.md b/receiver/kubeletstatsreceiver/README.md index 9607d4dd6653..844e5424a62e 100644 --- a/receiver/kubeletstatsreceiver/README.md +++ b/receiver/kubeletstatsreceiver/README.md @@ -218,6 +218,36 @@ receivers: - pod ``` +### Collect k8s.container.cpu.node.utilization as a ratio of the node's total capacity + +In order to calculate the `k8s.container.cpu.node.utilization` metric, information about the node's capacity +must be retrieved from the k8s API. For this, the `k8s_api_config` needs to be set. +In addition, the node name must be identified properly. The `K8S_NODE_NAME` env var can be set using the +downward API inside the collector pod spec as follows: + +```yaml +env: + - name: K8S_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName +``` +Then set the `node` value to `${env:K8S_NODE_NAME}` in the receiver's configuration: + +```yaml +receivers: + kubeletstats: + collection_interval: 10s + auth_type: 'serviceAccount' + endpoint: '${env:K8S_NODE_NAME}:10250' + node: '${env:K8S_NODE_NAME}' + k8s_api_config: + auth_type: serviceAccount + metrics: + k8s.container.cpu.node.utilization: + enabled: true +``` + ### Optional parameters The following parameters can also be specified: diff --git a/receiver/kubeletstatsreceiver/config.go b/receiver/kubeletstatsreceiver/config.go index 2133938cbb92..e8a66c8b6731 100644 --- a/receiver/kubeletstatsreceiver/config.go +++ b/receiver/kubeletstatsreceiver/config.go @@ -40,6 +40,19 @@ type Config struct { // Configuration of the Kubernetes API client. K8sAPIConfig *k8sconfig.APIConfig `mapstructure:"k8s_api_config"` + // NodeName is the node name to limit the discovery of nodes. + // For example, node name can be set using the downward API inside the collector + // pod spec as follows: + // + // env: + // - name: K8S_NODE_NAME + // valueFrom: + // fieldRef: + // fieldPath: spec.nodeName + // + // Then set this value to ${env:K8S_NODE_NAME} in the configuration. + NodeName string `mapstructure:"node"` + // MetricsBuilderConfig allows customizing scraped metrics/attributes representation. metadata.MetricsBuilderConfig `mapstructure:",squash"` } @@ -105,3 +118,10 @@ func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { return nil } + +func (cfg *Config) Validate() error { + if cfg.Metrics.K8sContainerCPUNodeUtilization.Enabled && cfg.NodeName == "" { + return errors.New("for k8s.container.cpu.node.utilization node setting is required.
Check the readme on how to set the required setting") + } + return nil +} diff --git a/receiver/kubeletstatsreceiver/config_test.go b/receiver/kubeletstatsreceiver/config_test.go index f588fb56e5bd..117bb07b5165 100644 --- a/receiver/kubeletstatsreceiver/config_test.go +++ b/receiver/kubeletstatsreceiver/config_test.go @@ -31,9 +31,9 @@ func TestLoadConfig(t *testing.T) { duration := 10 * time.Second tests := []struct { - id component.ID - expected component.Config - expectedErr error + id component.ID + expected component.Config + expectedValidationErr string }{ { id: component.NewIDWithName(metadata.Type, "default"), @@ -173,6 +173,34 @@ func TestLoadConfig(t *testing.T) { MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), }, }, + { + id: component.NewIDWithName(metadata.Type, "container_cpu_node_utilization"), + expected: &Config{ + ControllerConfig: scraperhelper.ControllerConfig{ + CollectionInterval: duration, + InitialDelay: time.Second, + }, + ClientConfig: kube.ClientConfig{ + APIConfig: k8sconfig.APIConfig{ + AuthType: "tls", + }, + }, + MetricGroupsToCollect: []kubelet.MetricGroup{ + kubelet.ContainerMetricGroup, + kubelet.PodMetricGroup, + kubelet.NodeMetricGroup, + }, + MetricsBuilderConfig: metadata.MetricsBuilderConfig{ + Metrics: metadata.MetricsConfig{ + K8sContainerCPUNodeUtilization: metadata.MetricConfig{ + Enabled: true, + }, + }, + ResourceAttributes: metadata.DefaultResourceAttributesConfig(), + }, + }, + expectedValidationErr: "for k8s.container.cpu.node.utilization node setting is required. Check the readme on how to set the required setting", + }, } for _, tt := range tests { @@ -184,8 +212,14 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NoError(t, component.UnmarshalConfig(sub, cfg)) - assert.NoError(t, component.ValidateConfig(cfg)) - assert.Equal(t, tt.expected, cfg) + err = component.ValidateConfig(cfg) + if tt.expectedValidationErr != "" { + assert.EqualError(t, err, tt.expectedValidationErr) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expected, cfg) + } + }) } } diff --git a/receiver/kubeletstatsreceiver/documentation.md b/receiver/kubeletstatsreceiver/documentation.md index 7f9687acc30b..684af59c5b09 100644 --- a/receiver/kubeletstatsreceiver/documentation.md +++ b/receiver/kubeletstatsreceiver/documentation.md @@ -402,6 +402,14 @@ The time since the container started | ---- | ----------- | ---------- | ----------------------- | --------- | | s | Sum | Int | Cumulative | true | +### k8s.container.cpu.node.utilization + +Container cpu utilization as a ratio of the node's capacity + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| 1 | Gauge | Double | + ### k8s.container.cpu_limit_utilization Container cpu utilization as a ratio of the container's limits diff --git a/receiver/kubeletstatsreceiver/factory.go b/receiver/kubeletstatsreceiver/factory.go index 11131f109f14..a39ebfcb36ae 100644 --- a/receiver/kubeletstatsreceiver/factory.go +++ b/receiver/kubeletstatsreceiver/factory.go @@ -68,7 +68,7 @@ func createMetricsReceiver( return nil, err } - scrp, err := newKubletScraper(rest, set, rOptions, cfg.MetricsBuilderConfig) + scrp, err := newKubletScraper(rest, set, rOptions, cfg.MetricsBuilderConfig, cfg.NodeName) if err != nil { return nil, err } diff --git a/receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go b/receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go index bf9cb0e6d48e..b226fb968709 100644 --- 
a/receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go +++ b/receiver/kubeletstatsreceiver/internal/kubelet/accumulator.go @@ -56,7 +56,7 @@ func (a *metricDataAccumulator) nodeStats(s stats.NodeStats) { currentTime := pcommon.NewTimestampFromTime(a.time) addUptimeMetric(a.mbs.NodeMetricsBuilder, metadata.NodeUptimeMetrics.Uptime, s.StartTime, currentTime) - addCPUMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeCPUMetrics, s.CPU, currentTime, resources{}) + addCPUMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeCPUMetrics, s.CPU, currentTime, resources{}, 0) addMemoryMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeMemoryMetrics, s.Memory, currentTime, resources{}) addFilesystemMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeFilesystemMetrics, s.Fs, currentTime) addNetworkMetrics(a.mbs.NodeMetricsBuilder, metadata.NodeNetworkMetrics, s.Network, currentTime) @@ -76,7 +76,7 @@ func (a *metricDataAccumulator) podStats(s stats.PodStats) { currentTime := pcommon.NewTimestampFromTime(a.time) addUptimeMetric(a.mbs.PodMetricsBuilder, metadata.PodUptimeMetrics.Uptime, s.StartTime, currentTime) - addCPUMetrics(a.mbs.PodMetricsBuilder, metadata.PodCPUMetrics, s.CPU, currentTime, a.metadata.podResources[s.PodRef.UID]) + addCPUMetrics(a.mbs.PodMetricsBuilder, metadata.PodCPUMetrics, s.CPU, currentTime, a.metadata.podResources[s.PodRef.UID], 0) addMemoryMetrics(a.mbs.PodMetricsBuilder, metadata.PodMemoryMetrics, s.Memory, currentTime, a.metadata.podResources[s.PodRef.UID]) addFilesystemMetrics(a.mbs.PodMetricsBuilder, metadata.PodFilesystemMetrics, s.EphemeralStorage, currentTime) addNetworkMetrics(a.mbs.PodMetricsBuilder, metadata.PodNetworkMetrics, s.Network, currentTime) @@ -110,7 +110,7 @@ func (a *metricDataAccumulator) containerStats(sPod stats.PodStats, s stats.Cont currentTime := pcommon.NewTimestampFromTime(a.time) resourceKey := sPod.PodRef.UID + s.Name addUptimeMetric(a.mbs.ContainerMetricsBuilder, metadata.ContainerUptimeMetrics.Uptime, s.StartTime, currentTime) - addCPUMetrics(a.mbs.ContainerMetricsBuilder, metadata.ContainerCPUMetrics, s.CPU, currentTime, a.metadata.containerResources[resourceKey]) + addCPUMetrics(a.mbs.ContainerMetricsBuilder, metadata.ContainerCPUMetrics, s.CPU, currentTime, a.metadata.containerResources[resourceKey], a.metadata.cpuNodeLimit) addMemoryMetrics(a.mbs.ContainerMetricsBuilder, metadata.ContainerMemoryMetrics, s.Memory, currentTime, a.metadata.containerResources[resourceKey]) addFilesystemMetrics(a.mbs.ContainerMetricsBuilder, metadata.ContainerFilesystemMetrics, s.Rootfs, currentTime) diff --git a/receiver/kubeletstatsreceiver/internal/kubelet/accumulator_test.go b/receiver/kubeletstatsreceiver/internal/kubelet/accumulator_test.go index 536f13161ab5..afcc7182cd56 100644 --- a/receiver/kubeletstatsreceiver/internal/kubelet/accumulator_test.go +++ b/receiver/kubeletstatsreceiver/internal/kubelet/accumulator_test.go @@ -53,7 +53,7 @@ func TestMetadataErrorCases(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), testScenario: func(acc metricDataAccumulator) { now := metav1.Now() podStats := stats.PodStats{ @@ -79,7 +79,7 @@ func TestMetadataErrorCases(t *testing.T) { metricGroupsToCollect: map[MetricGroup]bool{ VolumeMetricGroup: true, }, - metadata: NewMetadata([]MetadataLabel{MetadataLabelVolumeType}, nil, nil), + metadata: NewMetadata([]MetadataLabel{MetadataLabelVolumeType}, nil, NodeLimits{}, nil), testScenario: func(acc metricDataAccumulator) { podStats := stats.PodStats{ PodRef: stats.PodReference{ @@ -121,7 +121,7 @@ func 
TestMetadataErrorCases(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), testScenario: func(acc metricDataAccumulator) { podStats := stats.PodStats{ PodRef: stats.PodReference{ @@ -165,7 +165,7 @@ func TestMetadataErrorCases(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), detailedPVCLabelsSetterOverride: func(*metadata.ResourceBuilder, string, string, string) error { // Mock failure cases. return errors.New("") diff --git a/receiver/kubeletstatsreceiver/internal/kubelet/cpu.go b/receiver/kubeletstatsreceiver/internal/kubelet/cpu.go index 15507649147f..310b43ec9d68 100644 --- a/receiver/kubeletstatsreceiver/internal/kubelet/cpu.go +++ b/receiver/kubeletstatsreceiver/internal/kubelet/cpu.go @@ -10,15 +10,27 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata" ) -func addCPUMetrics(mb *metadata.MetricsBuilder, cpuMetrics metadata.CPUMetrics, s *stats.CPUStats, currentTime pcommon.Timestamp, r resources) { +func addCPUMetrics( + mb *metadata.MetricsBuilder, + cpuMetrics metadata.CPUMetrics, + s *stats.CPUStats, + currentTime pcommon.Timestamp, + r resources, + nodeCPULimit float64) { if s == nil { return } - addCPUUsageMetric(mb, cpuMetrics, s, currentTime, r) + addCPUUsageMetric(mb, cpuMetrics, s, currentTime, r, nodeCPULimit) addCPUTimeMetric(mb, cpuMetrics.Time, s, currentTime) } -func addCPUUsageMetric(mb *metadata.MetricsBuilder, cpuMetrics metadata.CPUMetrics, s *stats.CPUStats, currentTime pcommon.Timestamp, r resources) { +func addCPUUsageMetric( + mb *metadata.MetricsBuilder, + cpuMetrics metadata.CPUMetrics, + s *stats.CPUStats, + currentTime pcommon.Timestamp, + r resources, + nodeCPULimit float64) { if s.UsageNanoCores == nil { return } @@ -26,6 +38,9 @@ func addCPUUsageMetric(mb *metadata.MetricsBuilder, cpuMetrics metadata.CPUMetri cpuMetrics.Utilization(mb, currentTime, value) cpuMetrics.Usage(mb, currentTime, value) + if nodeCPULimit > 0 { + cpuMetrics.NodeUtilization(mb, currentTime, value/nodeCPULimit) + } if r.cpuLimit > 0 { cpuMetrics.LimitUtilization(mb, currentTime, value/r.cpuLimit) } diff --git a/receiver/kubeletstatsreceiver/internal/kubelet/metadata.go b/receiver/kubeletstatsreceiver/internal/kubelet/metadata.go index 01fca68a5941..392eb46dee70 100644 --- a/receiver/kubeletstatsreceiver/internal/kubelet/metadata.go +++ b/receiver/kubeletstatsreceiver/internal/kubelet/metadata.go @@ -52,6 +52,7 @@ type Metadata struct { DetailedPVCResourceSetter func(rb *metadata.ResourceBuilder, volCacheID, volumeClaim, namespace string) error podResources map[string]resources containerResources map[string]resources + cpuNodeLimit float64 } type resources struct { @@ -61,6 +62,11 @@ type resources struct { memoryLimit int64 } +type NodeLimits struct { + Name string + CPUNanoCoresLimit float64 +} + func getContainerResources(r *v1.ResourceRequirements) resources { if r == nil { return resources{} @@ -74,7 +80,7 @@ func getContainerResources(r *v1.ResourceRequirements) resources { } } -func NewMetadata(labels []MetadataLabel, podsMetadata *v1.PodList, +func NewMetadata(labels []MetadataLabel, podsMetadata *v1.PodList, nodeResourceLimits NodeLimits, detailedPVCResourceSetter func(rb *metadata.ResourceBuilder, volCacheID, volumeClaim, namespace string) error) Metadata { m := Metadata{ Labels: getLabelsMap(labels), @@ -82,6 +88,7 @@ func NewMetadata(labels []MetadataLabel, podsMetadata *v1.PodList, DetailedPVCResourceSetter: detailedPVCResourceSetter, podResources: make(map[string]resources), 
containerResources: make(map[string]resources), + cpuNodeLimit: nodeResourceLimits.CPUNanoCoresLimit, } if podsMetadata != nil { diff --git a/receiver/kubeletstatsreceiver/internal/kubelet/metadata_test.go b/receiver/kubeletstatsreceiver/internal/kubelet/metadata_test.go index be2e0329ca22..c5cb72b6f7df 100644 --- a/receiver/kubeletstatsreceiver/internal/kubelet/metadata_test.go +++ b/receiver/kubeletstatsreceiver/internal/kubelet/metadata_test.go @@ -70,7 +70,7 @@ func TestSetExtraLabels(t *testing.T) { }{ { name: "no_labels", - metadata: NewMetadata([]MetadataLabel{}, nil, nil), + metadata: NewMetadata([]MetadataLabel{}, nil, NodeLimits{}, nil), args: []string{"uid", "container.id", "container"}, want: map[string]any{}, }, @@ -98,7 +98,7 @@ func TestSetExtraLabels(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), args: []string{"uid-1234", "container.id", "container1"}, want: map[string]any{ string(MetadataLabelContainerID): "test-container", @@ -128,7 +128,7 @@ func TestSetExtraLabels(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), args: []string{"uid-1234", "container.id", "init-container1"}, want: map[string]any{ string(MetadataLabelContainerID): "test-init-container", @@ -136,7 +136,7 @@ func TestSetExtraLabels(t *testing.T) { }, { name: "set_container_id_no_metadata", - metadata: NewMetadata([]MetadataLabel{MetadataLabelContainerID}, nil, nil), + metadata: NewMetadata([]MetadataLabel{MetadataLabelContainerID}, nil, NodeLimits{}, nil), args: []string{"uid-1234", "container.id", "container1"}, wantError: "pods metadata were not fetched", }, @@ -158,7 +158,7 @@ func TestSetExtraLabels(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), args: []string{"uid-1234", "container.id", "container1"}, wantError: "pod \"uid-1234\" with container \"container1\" not found in the fetched metadata", }, @@ -180,13 +180,13 @@ func TestSetExtraLabels(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), args: []string{"uid-1234", "container.id", "container1"}, wantError: "pod \"uid-1234\" with container \"container1\" has an empty containerID", }, { name: "set_volume_type_no_metadata", - metadata: NewMetadata([]MetadataLabel{MetadataLabelVolumeType}, nil, nil), + metadata: NewMetadata([]MetadataLabel{MetadataLabelVolumeType}, nil, NodeLimits{}, nil), args: []string{"uid-1234", "k8s.volume.type", "volume0"}, wantError: "pods metadata were not fetched", }, @@ -208,7 +208,7 @@ func TestSetExtraLabels(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), args: []string{"uid-1234", "k8s.volume.type", "volume1"}, wantError: "pod \"uid-1234\" with volume \"volume1\" not found in the fetched metadata", }, @@ -376,7 +376,7 @@ func TestSetExtraLabelsForVolumeTypes(t *testing.T) { }, }, }, - }, func(*metadata.ResourceBuilder, string, string, string) error { + }, NodeLimits{}, func(*metadata.ResourceBuilder, string, string, string) error { return nil }) rb := metadata.NewResourceBuilder(metadata.DefaultResourceAttributesConfig()) @@ -407,7 +407,7 @@ func TestCpuAndMemoryGetters(t *testing.T) { }{ { name: "no metadata", - metadata: NewMetadata([]MetadataLabel{}, nil, nil), + metadata: NewMetadata([]MetadataLabel{}, nil, NodeLimits{}, nil), }, { name: "pod happy path", @@ -449,7 +449,7 @@ func TestCpuAndMemoryGetters(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), podUID: "uid-1234", containerName: "container-2", wantPodCPULimit: 2.1, @@ -501,7 +501,7 @@ func TestCpuAndMemoryGetters(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), podUID: 
"uid-12345", }, { @@ -544,7 +544,7 @@ func TestCpuAndMemoryGetters(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), podUID: "uid-1234", containerName: "container-3", wantPodCPULimit: 0.7, @@ -584,7 +584,7 @@ func TestCpuAndMemoryGetters(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), podUID: "uid-1234", containerName: "container-2", wantPodCPURequest: 2, @@ -624,7 +624,7 @@ func TestCpuAndMemoryGetters(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), podUID: "uid-1234", containerName: "container-2", wantPodCPULimit: 2, @@ -662,7 +662,7 @@ func TestCpuAndMemoryGetters(t *testing.T) { }, }, }, - }, nil), + }, NodeLimits{}, nil), podUID: "uid-1234", containerName: "container-1", wantContainerCPULimit: 1, diff --git a/receiver/kubeletstatsreceiver/internal/kubelet/metrics_test.go b/receiver/kubeletstatsreceiver/internal/kubelet/metrics_test.go index 22b9ee9b681a..7fdb9d640018 100644 --- a/receiver/kubeletstatsreceiver/internal/kubelet/metrics_test.go +++ b/receiver/kubeletstatsreceiver/internal/kubelet/metrics_test.go @@ -33,7 +33,7 @@ func TestMetricAccumulator(t *testing.T) { summary, _ := statsProvider.StatsSummary() metadataProvider := NewMetadataProvider(rc) podsMetadata, _ := metadataProvider.Pods() - k8sMetadata := NewMetadata([]MetadataLabel{MetadataLabelContainerID}, podsMetadata, nil) + k8sMetadata := NewMetadata([]MetadataLabel{MetadataLabelContainerID}, podsMetadata, NodeLimits{}, nil) mbs := &metadata.MetricsBuilders{ NodeMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()), PodMetricsBuilder: metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), receivertest.NewNopCreateSettings()), diff --git a/receiver/kubeletstatsreceiver/internal/kubelet/volume_test.go b/receiver/kubeletstatsreceiver/internal/kubelet/volume_test.go index 958617615bbe..d2d97b455fe0 100644 --- a/receiver/kubeletstatsreceiver/internal/kubelet/volume_test.go +++ b/receiver/kubeletstatsreceiver/internal/kubelet/volume_test.go @@ -177,7 +177,7 @@ func TestDetailedPVCLabels(t *testing.T) { }, }, }, - }, nil) + }, NodeLimits{}, nil) metadata.DetailedPVCResourceSetter = tt.detailedPVCLabelsSetterOverride res, err := getVolumeResourceOptions(rb, podStats, stats.VolumeStats{Name: tt.volumeName}, metadata) diff --git a/receiver/kubeletstatsreceiver/internal/metadata/generated_config.go b/receiver/kubeletstatsreceiver/internal/metadata/generated_config.go index b5244d545cfc..69d37d761839 100644 --- a/receiver/kubeletstatsreceiver/internal/metadata/generated_config.go +++ b/receiver/kubeletstatsreceiver/internal/metadata/generated_config.go @@ -41,6 +41,7 @@ type MetricsConfig struct { ContainerMemoryUsage MetricConfig `mapstructure:"container.memory.usage"` ContainerMemoryWorkingSet MetricConfig `mapstructure:"container.memory.working_set"` ContainerUptime MetricConfig `mapstructure:"container.uptime"` + K8sContainerCPUNodeUtilization MetricConfig `mapstructure:"k8s.container.cpu.node.utilization"` K8sContainerCPULimitUtilization MetricConfig `mapstructure:"k8s.container.cpu_limit_utilization"` K8sContainerCPURequestUtilization MetricConfig `mapstructure:"k8s.container.cpu_request_utilization"` K8sContainerMemoryLimitUtilization MetricConfig `mapstructure:"k8s.container.memory_limit_utilization"` @@ -127,6 +128,9 @@ func DefaultMetricsConfig() MetricsConfig { ContainerUptime: MetricConfig{ Enabled: false, }, + K8sContainerCPUNodeUtilization: MetricConfig{ + Enabled: false, + }, 
K8sContainerCPULimitUtilization: MetricConfig{ Enabled: false, }, diff --git a/receiver/kubeletstatsreceiver/internal/metadata/generated_config_test.go b/receiver/kubeletstatsreceiver/internal/metadata/generated_config_test.go index 8f64654e32f2..6fa188af811f 100644 --- a/receiver/kubeletstatsreceiver/internal/metadata/generated_config_test.go +++ b/receiver/kubeletstatsreceiver/internal/metadata/generated_config_test.go @@ -38,6 +38,7 @@ func TestMetricsBuilderConfig(t *testing.T) { ContainerMemoryUsage: MetricConfig{Enabled: true}, ContainerMemoryWorkingSet: MetricConfig{Enabled: true}, ContainerUptime: MetricConfig{Enabled: true}, + K8sContainerCPUNodeUtilization: MetricConfig{Enabled: true}, K8sContainerCPULimitUtilization: MetricConfig{Enabled: true}, K8sContainerCPURequestUtilization: MetricConfig{Enabled: true}, K8sContainerMemoryLimitUtilization: MetricConfig{Enabled: true}, @@ -118,6 +119,7 @@ func TestMetricsBuilderConfig(t *testing.T) { ContainerMemoryUsage: MetricConfig{Enabled: false}, ContainerMemoryWorkingSet: MetricConfig{Enabled: false}, ContainerUptime: MetricConfig{Enabled: false}, + K8sContainerCPUNodeUtilization: MetricConfig{Enabled: false}, K8sContainerCPULimitUtilization: MetricConfig{Enabled: false}, K8sContainerCPURequestUtilization: MetricConfig{Enabled: false}, K8sContainerMemoryLimitUtilization: MetricConfig{Enabled: false}, diff --git a/receiver/kubeletstatsreceiver/internal/metadata/generated_metrics.go b/receiver/kubeletstatsreceiver/internal/metadata/generated_metrics.go index b966d529769b..22fc836398dc 100644 --- a/receiver/kubeletstatsreceiver/internal/metadata/generated_metrics.go +++ b/receiver/kubeletstatsreceiver/internal/metadata/generated_metrics.go @@ -679,6 +679,55 @@ func newMetricContainerUptime(cfg MetricConfig) metricContainerUptime { return m } +type metricK8sContainerCPUNodeUtilization struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills k8s.container.cpu.node.utilization metric with initial data. +func (m *metricK8sContainerCPUNodeUtilization) init() { + m.data.SetName("k8s.container.cpu.node.utilization") + m.data.SetDescription("Container cpu utilization as a ratio of the node's capacity") + m.data.SetUnit("1") + m.data.SetEmptyGauge() +} + +func (m *metricK8sContainerCPUNodeUtilization) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetDoubleValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricK8sContainerCPUNodeUtilization) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricK8sContainerCPUNodeUtilization) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricK8sContainerCPUNodeUtilization(cfg MetricConfig) metricK8sContainerCPUNodeUtilization { + m := metricK8sContainerCPUNodeUtilization{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + type metricK8sContainerCPULimitUtilization struct { data pmetric.Metric // data buffer for generated metric. config MetricConfig // metric config provided by user. @@ -2837,6 +2886,7 @@ type MetricsBuilder struct { metricContainerMemoryUsage metricContainerMemoryUsage metricContainerMemoryWorkingSet metricContainerMemoryWorkingSet metricContainerUptime metricContainerUptime + metricK8sContainerCPUNodeUtilization metricK8sContainerCPUNodeUtilization metricK8sContainerCPULimitUtilization metricK8sContainerCPULimitUtilization metricK8sContainerCPURequestUtilization metricK8sContainerCPURequestUtilization metricK8sContainerMemoryLimitUtilization metricK8sContainerMemoryLimitUtilization @@ -2920,6 +2970,7 @@ func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.CreateSetting metricContainerMemoryUsage: newMetricContainerMemoryUsage(mbc.Metrics.ContainerMemoryUsage), metricContainerMemoryWorkingSet: newMetricContainerMemoryWorkingSet(mbc.Metrics.ContainerMemoryWorkingSet), metricContainerUptime: newMetricContainerUptime(mbc.Metrics.ContainerUptime), + metricK8sContainerCPUNodeUtilization: newMetricK8sContainerCPUNodeUtilization(mbc.Metrics.K8sContainerCPUNodeUtilization), metricK8sContainerCPULimitUtilization: newMetricK8sContainerCPULimitUtilization(mbc.Metrics.K8sContainerCPULimitUtilization), metricK8sContainerCPURequestUtilization: newMetricK8sContainerCPURequestUtilization(mbc.Metrics.K8sContainerCPURequestUtilization), metricK8sContainerMemoryLimitUtilization: newMetricK8sContainerMemoryLimitUtilization(mbc.Metrics.K8sContainerMemoryLimitUtilization), @@ -3130,6 +3181,7 @@ func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { mb.metricContainerMemoryUsage.emit(ils.Metrics()) mb.metricContainerMemoryWorkingSet.emit(ils.Metrics()) mb.metricContainerUptime.emit(ils.Metrics()) + mb.metricK8sContainerCPUNodeUtilization.emit(ils.Metrics()) mb.metricK8sContainerCPULimitUtilization.emit(ils.Metrics()) mb.metricK8sContainerCPURequestUtilization.emit(ils.Metrics()) mb.metricK8sContainerMemoryLimitUtilization.emit(ils.Metrics()) @@ -3269,6 +3321,11 @@ func (mb *MetricsBuilder) RecordContainerUptimeDataPoint(ts pcommon.Timestamp, v mb.metricContainerUptime.recordDataPoint(mb.startTime, ts, val) } +// RecordK8sContainerCPUNodeUtilizationDataPoint adds a data point to k8s.container.cpu.node.utilization metric. +func (mb *MetricsBuilder) RecordK8sContainerCPUNodeUtilizationDataPoint(ts pcommon.Timestamp, val float64) { + mb.metricK8sContainerCPUNodeUtilization.recordDataPoint(mb.startTime, ts, val) +} + // RecordK8sContainerCPULimitUtilizationDataPoint adds a data point to k8s.container.cpu_limit_utilization metric. 
func (mb *MetricsBuilder) RecordK8sContainerCPULimitUtilizationDataPoint(ts pcommon.Timestamp, val float64) { mb.metricK8sContainerCPULimitUtilization.recordDataPoint(mb.startTime, ts, val) diff --git a/receiver/kubeletstatsreceiver/internal/metadata/generated_metrics_test.go b/receiver/kubeletstatsreceiver/internal/metadata/generated_metrics_test.go index d186dbb52227..bf8677cef5a9 100644 --- a/receiver/kubeletstatsreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/kubeletstatsreceiver/internal/metadata/generated_metrics_test.go @@ -130,6 +130,9 @@ func TestMetricsBuilder(t *testing.T) { allMetricsCount++ mb.RecordContainerUptimeDataPoint(ts, 1) + allMetricsCount++ + mb.RecordK8sContainerCPUNodeUtilizationDataPoint(ts, 1) + allMetricsCount++ mb.RecordK8sContainerCPULimitUtilizationDataPoint(ts, 1) @@ -488,6 +491,18 @@ func TestMetricsBuilder(t *testing.T) { assert.Equal(t, ts, dp.Timestamp()) assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) assert.Equal(t, int64(1), dp.IntValue()) + case "k8s.container.cpu.node.utilization": + assert.False(t, validatedMetrics["k8s.container.cpu.node.utilization"], "Found a duplicate in the metrics slice: k8s.container.cpu.node.utilization") + validatedMetrics["k8s.container.cpu.node.utilization"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "Container cpu utilization as a ratio of the node's capacity", ms.At(i).Description()) + assert.Equal(t, "1", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.Equal(t, float64(1), dp.DoubleValue()) case "k8s.container.cpu_limit_utilization": assert.False(t, validatedMetrics["k8s.container.cpu_limit_utilization"], "Found a duplicate in the metrics slice: k8s.container.cpu_limit_utilization") validatedMetrics["k8s.container.cpu_limit_utilization"] = true diff --git a/receiver/kubeletstatsreceiver/internal/metadata/metrics.go b/receiver/kubeletstatsreceiver/internal/metadata/metrics.go index 38b80f9d9f2c..5ba982b918c0 100644 --- a/receiver/kubeletstatsreceiver/internal/metadata/metrics.go +++ b/receiver/kubeletstatsreceiver/internal/metadata/metrics.go @@ -22,6 +22,7 @@ type CPUMetrics struct { Time RecordDoubleDataPointFunc Usage RecordDoubleDataPointFunc Utilization RecordDoubleDataPointFunc + NodeUtilization RecordDoubleDataPointFunc LimitUtilization RecordDoubleDataPointFunc RequestUtilization RecordDoubleDataPointFunc } @@ -44,6 +45,7 @@ var ContainerCPUMetrics = CPUMetrics{ Time: (*MetricsBuilder).RecordContainerCPUTimeDataPoint, Usage: (*MetricsBuilder).RecordContainerCPUUsageDataPoint, Utilization: (*MetricsBuilder).RecordContainerCPUUtilizationDataPoint, + NodeUtilization: (*MetricsBuilder).RecordK8sContainerCPUNodeUtilizationDataPoint, LimitUtilization: (*MetricsBuilder).RecordK8sContainerCPULimitUtilizationDataPoint, RequestUtilization: (*MetricsBuilder).RecordK8sContainerCPURequestUtilizationDataPoint, } diff --git a/receiver/kubeletstatsreceiver/internal/metadata/testdata/config.yaml b/receiver/kubeletstatsreceiver/internal/metadata/testdata/config.yaml index 7ad001fc3f89..8758f2993976 100644 --- a/receiver/kubeletstatsreceiver/internal/metadata/testdata/config.yaml +++ b/receiver/kubeletstatsreceiver/internal/metadata/testdata/config.yaml @@ -27,6 +27,8 @@ all_set: enabled: true container.uptime: 
enabled: true + k8s.container.cpu.node.utilization: + enabled: true k8s.container.cpu_limit_utilization: enabled: true k8s.container.cpu_request_utilization: @@ -172,6 +174,8 @@ none_set: enabled: false container.uptime: enabled: false + k8s.container.cpu.node.utilization: + enabled: false k8s.container.cpu_limit_utilization: enabled: false k8s.container.cpu_request_utilization: diff --git a/receiver/kubeletstatsreceiver/metadata.yaml b/receiver/kubeletstatsreceiver/metadata.yaml index ef0bf3e2c700..7ddbf1475bda 100644 --- a/receiver/kubeletstatsreceiver/metadata.yaml +++ b/receiver/kubeletstatsreceiver/metadata.yaml @@ -379,6 +379,13 @@ metrics: gauge: value_type: int attributes: [] + k8s.container.cpu.node.utilization: + enabled: false + description: "Container cpu utilization as a ratio of the node's capacity" + unit: 1 + gauge: + value_type: double + attributes: [ ] k8s.container.cpu_limit_utilization: enabled: false description: "Container cpu utilization as a ratio of the container's limits" diff --git a/receiver/kubeletstatsreceiver/mocked_objects_test.go b/receiver/kubeletstatsreceiver/mocked_objects_test.go index e5c2bf5efe56..eff596298062 100644 --- a/receiver/kubeletstatsreceiver/mocked_objects_test.go +++ b/receiver/kubeletstatsreceiver/mocked_objects_test.go @@ -5,6 +5,7 @@ package kubeletstatsreceiver import ( v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -24,6 +25,23 @@ func getValidMockedObjects() []runtime.Object { } } +func getNodeWithCPUCapacity(nodeName string, cpuCap int) *v1.Node { + resourceList := make(v1.ResourceList) + q := resource.Quantity{} + q.Set(int64(cpuCap)) + resourceList["cpu"] = q + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + UID: "asdfg", + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Capacity: resourceList, + }, + } +} + var volumeClaim1 = getPVC("volume_claim_1", "kube-system", "storage-provisioner-token-qzlx6") var volumeClaim2 = getPVC("volume_claim_2", "kube-system", "kube-proxy") var volumeClaim3 = getPVC("volume_claim_3", "kube-system", "coredns-token-dzc5t") diff --git a/receiver/kubeletstatsreceiver/scraper.go b/receiver/kubeletstatsreceiver/scraper.go index c7f95d8703f0..2fd399f251a7 100644 --- a/receiver/kubeletstatsreceiver/scraper.go +++ b/receiver/kubeletstatsreceiver/scraper.go @@ -6,16 +6,21 @@ package kubeletstatsreceiver // import "github.com/open-telemetry/opentelemetry- import ( "context" "fmt" + "sync" "time" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/scraperhelper" "go.uber.org/zap" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/kubelet" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver/internal/metadata" ) @@ -37,6 +42,12 @@ type kubletScraper struct { cachedVolumeSource map[string]v1.PersistentVolumeSource mbs *metadata.MetricsBuilders needsResources bool + nodeInformer cache.SharedInformer + stopCh chan struct{} + m sync.RWMutex + + // A struct that keeps Node's resource capacities + nodeLimits 
*kubelet.NodeLimits } func newKubletScraper( @@ -44,6 +55,7 @@ func newKubletScraper( set receiver.CreateSettings, rOptions *scraperOptions, metricsConfig metadata.MetricsBuilderConfig, + nodeName string, ) (scraperhelper.Scraper, error) { ks := &kubletScraper{ statsProvider: kubelet.NewStatsProvider(restClient), @@ -67,8 +79,20 @@ func newKubletScraper( metricsConfig.Metrics.K8sPodMemoryRequestUtilization.Enabled || metricsConfig.Metrics.K8sContainerMemoryLimitUtilization.Enabled || metricsConfig.Metrics.K8sContainerMemoryRequestUtilization.Enabled, + stopCh: make(chan struct{}), + nodeLimits: &kubelet.NodeLimits{}, + } + + if metricsConfig.Metrics.K8sContainerCPUNodeUtilization.Enabled { + ks.nodeInformer = k8sconfig.NewNodeSharedInformer(rOptions.k8sAPIClient, nodeName, 5*time.Minute) } - return scraperhelper.NewScraper(metadata.Type.String(), ks.scrape) + + return scraperhelper.NewScraper( + metadata.Type.String(), + ks.scrape, + scraperhelper.WithStart(ks.start), + scraperhelper.WithShutdown(ks.shutdown), + ) } func (r *kubletScraper) scrape(context.Context) (pmetric.Metrics, error) { @@ -88,9 +112,14 @@ func (r *kubletScraper) scrape(context.Context) (pmetric.Metrics, error) { } } - metadata := kubelet.NewMetadata(r.extraMetadataLabels, podsMetadata, r.detailedPVCLabelsSetter()) + var node kubelet.NodeLimits + if r.nodeInformer != nil { + node = r.node() + } + + metaD := kubelet.NewMetadata(r.extraMetadataLabels, podsMetadata, node, r.detailedPVCLabelsSetter()) - mds := kubelet.MetricsData(r.logger, summary, metadata, r.metricGroupsToCollect, r.mbs) + mds := kubelet.MetricsData(r.logger, summary, metaD, r.metricGroupsToCollect, r.mbs) md := pmetric.NewMetrics() for i := range mds { mds[i].ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics()) @@ -128,3 +157,58 @@ func (r *kubletScraper) detailedPVCLabelsSetter() func(rb *metadata.ResourceBuil return nil } } + +func (r *kubletScraper) node() kubelet.NodeLimits { + r.m.RLock() + defer r.m.RUnlock() + return *r.nodeLimits +} + +func (r *kubletScraper) start(_ context.Context, _ component.Host) error { + if r.nodeInformer != nil { + _, err := r.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: r.handleNodeAdd, + UpdateFunc: r.handleNodeUpdate, + }) + if err != nil { + r.logger.Error("error adding event handler to node informer", zap.Error(err)) + } + go r.nodeInformer.Run(r.stopCh) + } + return nil +} + +func (r *kubletScraper) shutdown(_ context.Context) error { + r.logger.Debug("executing close") + if r.stopCh != nil { + close(r.stopCh) + } + return nil +} + +func (r *kubletScraper) handleNodeAdd(obj any) { + if node, ok := obj.(*v1.Node); ok { + r.addOrUpdateNode(node) + } else { + r.logger.Error("object received was not of type v1.Node", zap.Any("received", obj)) + } +} + +func (r *kubletScraper) handleNodeUpdate(_, newNode any) { + if node, ok := newNode.(*v1.Node); ok { + r.addOrUpdateNode(node) + } else { + r.logger.Error("object received was not of type v1.Node", zap.Any("received", newNode)) + } +} + +func (r *kubletScraper) addOrUpdateNode(node *v1.Node) { + r.m.Lock() + defer r.m.Unlock() + + if cpu, ok := node.Status.Capacity["cpu"]; ok { + if q, err := resource.ParseQuantity(cpu.String()); err == nil { + r.nodeLimits.CPUNanoCoresLimit = float64(q.MilliValue()) / 1000 + } + } +} diff --git a/receiver/kubeletstatsreceiver/scraper_test.go b/receiver/kubeletstatsreceiver/scraper_test.go index 13d286505300..0fb5e4daaefb 100644 --- a/receiver/kubeletstatsreceiver/scraper_test.go +++ 
b/receiver/kubeletstatsreceiver/scraper_test.go @@ -14,8 +14,11 @@ import ( "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + clienttesting "k8s.io/client-go/testing" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" @@ -55,6 +58,7 @@ func TestScraper(t *testing.T) { receivertest.NewNopCreateSettings(), options, metadata.DefaultMetricsBuilderConfig(), + "worker-42", ) require.NoError(t, err) @@ -76,6 +80,77 @@ func TestScraper(t *testing.T) { pmetrictest.IgnoreMetricsOrder())) } +func TestScraperWithNodeUtilization(t *testing.T) { + watcherStarted := make(chan struct{}) + // Create the fake client. + client := fake.NewSimpleClientset() + // A catch-all watch reactor that allows us to inject the watcherStarted channel. + client.PrependWatchReactor("*", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) { + gvr := action.GetResource() + ns := action.GetNamespace() + watch, err := client.Tracker().Watch(gvr, ns) + if err != nil { + return false, nil, err + } + close(watcherStarted) + return true, watch, nil + }) + + options := &scraperOptions{ + metricGroupsToCollect: map[kubelet.MetricGroup]bool{ + kubelet.ContainerMetricGroup: true, + }, + k8sAPIClient: client, + } + r, err := newKubletScraper( + &fakeRestClient{}, + receivertest.NewNopCreateSettings(), + options, + metadata.MetricsBuilderConfig{ + Metrics: metadata.MetricsConfig{ + K8sContainerCPUNodeUtilization: metadata.MetricConfig{ + Enabled: true, + }, + }, + ResourceAttributes: metadata.DefaultResourceAttributesConfig(), + }, + "worker-42", + ) + require.NoError(t, err) + + err = r.Start(context.Background(), nil) + require.NoError(t, err) + + // we wait until the watcher starts + <-watcherStarted + // Inject an event node into the fake client. 
+ node := getNodeWithCPUCapacity("worker-42", 8) + _, err = client.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) + if err != nil { + require.NoError(t, err) + } + + md, err := r.Scrape(context.Background()) + require.NoError(t, err) + require.Equal(t, numContainers, md.DataPointCount()) + expectedFile := filepath.Join("testdata", "scraper", "test_scraper_cpu_util_nodelimit_expected.yaml") + + // Uncomment to regenerate '*_expected.yaml' files + // golden.WriteMetrics(t, expectedFile, md) + + expectedMetrics, err := golden.ReadMetrics(expectedFile) + require.NoError(t, err) + require.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, md, + pmetrictest.IgnoreStartTimestamp(), + pmetrictest.IgnoreResourceMetricsOrder(), + pmetrictest.IgnoreMetricDataPointsOrder(), + pmetrictest.IgnoreTimestamp(), + pmetrictest.IgnoreMetricsOrder())) + + err = r.Shutdown(context.Background()) + require.NoError(t, err) +} + func TestScraperWithMetadata(t *testing.T) { tests := []struct { name string @@ -118,6 +193,7 @@ func TestScraperWithMetadata(t *testing.T) { receivertest.NewNopCreateSettings(), options, metadata.DefaultMetricsBuilderConfig(), + "worker-42", ) require.NoError(t, err) @@ -310,6 +386,7 @@ func TestScraperWithPercentMetrics(t *testing.T) { receivertest.NewNopCreateSettings(), options, metricsConfig, + "worker-42", ) require.NoError(t, err) @@ -388,6 +465,7 @@ func TestScraperWithMetricGroups(t *testing.T) { metricGroupsToCollect: test.metricGroups, }, metadata.DefaultMetricsBuilderConfig(), + "worker-42", ) require.NoError(t, err) @@ -552,6 +630,7 @@ func TestScraperWithPVCDetailedLabels(t *testing.T) { k8sAPIClient: test.k8sAPIClient, }, metadata.DefaultMetricsBuilderConfig(), + "worker-42", ) require.NoError(t, err) @@ -636,6 +715,7 @@ func TestClientErrors(t *testing.T) { settings, options, metadata.DefaultMetricsBuilderConfig(), + "", ) require.NoError(t, err) diff --git a/receiver/kubeletstatsreceiver/testdata/config.yaml b/receiver/kubeletstatsreceiver/testdata/config.yaml index 8b1923a163d5..fdfab83bf0e3 100644 --- a/receiver/kubeletstatsreceiver/testdata/config.yaml +++ b/receiver/kubeletstatsreceiver/testdata/config.yaml @@ -28,3 +28,9 @@ kubeletstats/metric_groups: collection_interval: 20s auth_type: "serviceAccount" metric_groups: [ pod, node, volume ] +kubeletstats/container_cpu_node_utilization: + collection_interval: 10s + metric_groups: [ container, pod, node ] + metrics: + k8s.container.cpu.node.utilization: + enabled: true diff --git a/receiver/kubeletstatsreceiver/testdata/scraper/test_scraper_cpu_util_nodelimit_expected.yaml b/receiver/kubeletstatsreceiver/testdata/scraper/test_scraper_cpu_util_nodelimit_expected.yaml new file mode 100644 index 000000000000..4364df51e4d4 --- /dev/null +++ b/receiver/kubeletstatsreceiver/testdata/scraper/test_scraper_cpu_util_nodelimit_expected.yaml @@ -0,0 +1,244 @@ +resourceMetrics: + - resource: + attributes: + - key: k8s.container.name + value: + stringValue: coredns + - key: k8s.namespace.name + value: + stringValue: kube-system + - key: k8s.pod.name + value: + stringValue: coredns-66bff467f8-58qvv + - key: k8s.pod.uid + value: + stringValue: eb632b33-62c6-4a80-9575-a97ab363ad7f + scopeMetrics: + - metrics: + - description: Container cpu utilization as a ratio of the node's capacity + gauge: + dataPoints: + - asDouble: 0.000569754 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + name: k8s.container.cpu.node.utilization + unit: "1" + scope: + name: otelcol/kubeletstatsreceiver + version: latest + - 
resource: + attributes: + - key: k8s.container.name + value: + stringValue: coredns + - key: k8s.namespace.name + value: + stringValue: kube-system + - key: k8s.pod.name + value: + stringValue: coredns-66bff467f8-szddj + - key: k8s.pod.uid + value: + stringValue: 0adffe8e-9849-4e05-b4cd-92d2d1e1f1c3 + scopeMetrics: + - metrics: + - description: Container cpu utilization as a ratio of the node's capacity + gauge: + dataPoints: + - asDouble: 0.00043856325 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + name: k8s.container.cpu.node.utilization + unit: "1" + scope: + name: otelcol/kubeletstatsreceiver + version: latest + - resource: + attributes: + - key: k8s.container.name + value: + stringValue: etcd + - key: k8s.namespace.name + value: + stringValue: kube-system + - key: k8s.pod.name + value: + stringValue: etcd-minikube + - key: k8s.pod.uid + value: + stringValue: 5a5fbd34cfb43ee7bee976798370c910 + scopeMetrics: + - metrics: + - description: Container cpu utilization as a ratio of the node's capacity + gauge: + dataPoints: + - asDouble: 0.002483161375 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + name: k8s.container.cpu.node.utilization + unit: "1" + scope: + name: otelcol/kubeletstatsreceiver + version: latest + - resource: + attributes: + - key: k8s.container.name + value: + stringValue: kube-apiserver + - key: k8s.namespace.name + value: + stringValue: kube-system + - key: k8s.pod.name + value: + stringValue: kube-apiserver-minikube + - key: k8s.pod.uid + value: + stringValue: 3bef16d65fa74d46458df57d8f6f59af + scopeMetrics: + - metrics: + - description: Container cpu utilization as a ratio of the node's capacity + gauge: + dataPoints: + - asDouble: 0.005768109875 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + name: k8s.container.cpu.node.utilization + unit: "1" + scope: + name: otelcol/kubeletstatsreceiver + version: latest + - resource: + attributes: + - key: k8s.container.name + value: + stringValue: kube-controller-manager + - key: k8s.namespace.name + value: + stringValue: kube-system + - key: k8s.pod.name + value: + stringValue: kube-controller-manager-minikube + - key: k8s.pod.uid + value: + stringValue: 3016593d20758bbfe68aba26604a8e3d + scopeMetrics: + - metrics: + - description: Container cpu utilization as a ratio of the node's capacity + gauge: + dataPoints: + - asDouble: 0.00228089325 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + name: k8s.container.cpu.node.utilization + unit: "1" + scope: + name: otelcol/kubeletstatsreceiver + version: latest + - resource: + attributes: + - key: k8s.container.name + value: + stringValue: kube-proxy + - key: k8s.namespace.name + value: + stringValue: kube-system + - key: k8s.pod.name + value: + stringValue: kube-proxy-v48tf + - key: k8s.pod.uid + value: + stringValue: 0a6d6b05-0e8d-4920-8a38-926a33164d45 + scopeMetrics: + - metrics: + - description: Container cpu utilization as a ratio of the node's capacity + gauge: + dataPoints: + - asDouble: 4.9618125e-05 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + name: k8s.container.cpu.node.utilization + unit: "1" + scope: + name: otelcol/kubeletstatsreceiver + version: latest + - resource: + attributes: + - key: k8s.container.name + value: + stringValue: kube-scheduler + - key: k8s.namespace.name + value: + stringValue: kube-system + - key: k8s.pod.name + value: + stringValue: kube-scheduler-minikube + - key: k8s.pod.uid + value: + stringValue: 5795d0c442cb997ff93c49feeb9f6386 + scopeMetrics: + - metrics: + - description: Container cpu 
utilization as a ratio of the node's capacity + gauge: + dataPoints: + - asDouble: 0.000429828125 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + name: k8s.container.cpu.node.utilization + unit: "1" + scope: + name: otelcol/kubeletstatsreceiver + version: latest + - resource: + attributes: + - key: k8s.container.name + value: + stringValue: server + - key: k8s.namespace.name + value: + stringValue: default + - key: k8s.pod.name + value: + stringValue: go-hello-world-5456b4b8cd-99vxc + - key: k8s.pod.uid + value: + stringValue: 42ad382b-ed0b-446d-9aab-3fdce8b4f9e2 + scopeMetrics: + - metrics: + - description: Container cpu utilization as a ratio of the node's capacity + gauge: + dataPoints: + - asDouble: 0 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + name: k8s.container.cpu.node.utilization + unit: "1" + scope: + name: otelcol/kubeletstatsreceiver + version: latest + - resource: + attributes: + - key: k8s.container.name + value: + stringValue: storage-provisioner + - key: k8s.namespace.name + value: + stringValue: kube-system + - key: k8s.pod.name + value: + stringValue: storage-provisioner + - key: k8s.pod.uid + value: + stringValue: 14bf95e0-9451-4192-b111-807b03163670 + scopeMetrics: + - metrics: + - description: Container cpu utilization as a ratio of the node's capacity + gauge: + dataPoints: + - asDouble: 4.086125e-05 + startTimeUnixNano: "1000000" + timeUnixNano: "2000000" + name: k8s.container.cpu.node.utilization + unit: "1" + scope: + name: otelcol/kubeletstatsreceiver + version: latest