diff --git a/.chloggen/add-pod-resources-store.yaml b/.chloggen/add-pod-resources-store.yaml deleted file mode 100644 index 67c5d7777528..000000000000 --- a/.chloggen/add-pod-resources-store.yaml +++ /dev/null @@ -1,20 +0,0 @@ -# Use this changelog template to create an entry for release notes. -# If your change doesn't affect end users, such as a test fix or a tooling change, -# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. - -# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: enhancement - -# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) -component: awscontainerinsightreceiver - -# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: added a new podresourcestore which provides mapping from resource to container and vice-versa - -# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. -issues: [167] - -# (Optional) One or more lines of additional information to render under the primary note. -# These lines will be padded with 2 spaces and then inserted directly into the document. -# Use pipe (|) for multiline entries. -subtext: this change provides a new store in awscontainerinsightreceiver, which when started provides mapping from resources to container and vice versa using kubelet podresourcesapi. \ No newline at end of file diff --git a/receiver/awscontainerinsightreceiver/go.mod b/receiver/awscontainerinsightreceiver/go.mod index 76345567b4b3..f5f164ca91aa 100644 --- a/receiver/awscontainerinsightreceiver/go.mod +++ b/receiver/awscontainerinsightreceiver/go.mod @@ -23,12 +23,10 @@ require ( go.opentelemetry.io/otel/trace v1.23.1 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.26.0 - google.golang.org/grpc v1.61.0 k8s.io/api v0.29.2 k8s.io/apimachinery v0.29.2 k8s.io/client-go v0.29.2 k8s.io/klog v1.0.0 - k8s.io/kubelet v0.27.3 ) require ( @@ -142,6 +140,7 @@ require ( golang.org/x/time v0.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect + google.golang.org/grpc v1.61.0 // indirect google.golang.org/protobuf v1.32.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/receiver/awscontainerinsightreceiver/go.sum b/receiver/awscontainerinsightreceiver/go.sum index 72eed1243421..25aff89d014f 100644 --- a/receiver/awscontainerinsightreceiver/go.sum +++ b/receiver/awscontainerinsightreceiver/go.sum @@ -741,8 +741,6 @@ k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= -k8s.io/kubelet v0.27.3 h1:5WhTV1iiBu9q/rr+gvy65LQ+K/e7dmgcaYjys5ipLqY= -k8s.io/kubelet v0.27.3/go.mod h1:Mz42qgZZgWgPmOJEYaR5evmh+EoSwFzEvPBozA2y9mg= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= diff --git a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/podresourcesclient.go b/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/podresourcesclient.go deleted file mode 100644 index d9ec7c769522..000000000000 --- a/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil/podresourcesclient.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package kubeletutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" - -import ( - "context" - "fmt" - "net" - "os" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" -) - -const ( - socketPath = "/var/lib/kubelet/pod-resources/kubelet.sock" - connectionTimeout = 10 * time.Second -) - -type PodResourcesClient struct { - delegateClient podresourcesapi.PodResourcesListerClient - conn *grpc.ClientConn -} - -func NewPodResourcesClient() (*PodResourcesClient, error) { - podResourcesClient := &PodResourcesClient{} - - conn, err := podResourcesClient.connectToServer(socketPath) - podResourcesClient.conn = conn - if err != nil { - return nil, fmt.Errorf("failed to connect to server: %w", err) - } - - podResourcesClient.delegateClient = podresourcesapi.NewPodResourcesListerClient(conn) - - return podResourcesClient, nil -} - -func (p *PodResourcesClient) connectToServer(socket string) (*grpc.ClientConn, error) { - _, err := os.Stat(socket) - if os.IsNotExist(err) { - return nil, fmt.Errorf("socket path does not exist: %s", socket) - } else if err != nil { - return nil, fmt.Errorf("failed to check socket path: %w", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) - defer cancel() - - conn, err := grpc.DialContext(ctx, - socket, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { - d := net.Dialer{} - return d.DialContext(ctx, "unix", addr) - }), - ) - - if err != nil { - return nil, fmt.Errorf("failure connecting to '%s': %w", socket, err) - } - - return conn, nil -} - -func (p *PodResourcesClient) ListPods() (*podresourcesapi.ListPodResourcesResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) - defer cancel() - - resp, err := p.delegateClient.List(ctx, &podresourcesapi.ListPodResourcesRequest{}) - if err != nil { - return nil, fmt.Errorf("failure getting pod resources: %w", err) - } - - return resp, nil -} - -func (p *PodResourcesClient) Shutdown() { - err := p.conn.Close() - if err != nil { - return - } -} diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore.go b/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore.go deleted file mode 100644 index d2266ffd1cbb..000000000000 --- a/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore.go +++ /dev/null @@ -1,166 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package stores // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" - -import ( - "context" - "fmt" - "sync" - "time" - - "go.uber.org/zap" - v1 "k8s.io/kubelet/pkg/apis/podresources/v1" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil" -) - -const ( - taskTimeout = 10 * time.Second -) - -var ( - instance *PodResourcesStore - once sync.Once -) - -type ContainerInfo struct { - PodName string - ContainerName string - Namespace string -} - -type ResourceInfo struct { - ResourceName string - DeviceID string -} - -type PodResourcesClientInterface interface { - ListPods() (*v1.ListPodResourcesResponse, error) - Shutdown() -} - -type PodResourcesStore struct { - containerInfoToResourcesMap map[ContainerInfo][]ResourceInfo - resourceToPodContainerMap map[ResourceInfo]ContainerInfo - resourceNameSet map[string]struct{} - lastRefreshed time.Time - ctx context.Context - cancel context.CancelFunc - logger *zap.Logger - podResourcesClient PodResourcesClientInterface -} - -func NewPodResourcesStore(logger *zap.Logger) *PodResourcesStore { - once.Do(func() { - podResourcesClient, _ := kubeletutil.NewPodResourcesClient() - ctx, cancel := context.WithCancel(context.Background()) - instance = &PodResourcesStore{ - containerInfoToResourcesMap: make(map[ContainerInfo][]ResourceInfo), - resourceToPodContainerMap: make(map[ResourceInfo]ContainerInfo), - resourceNameSet: make(map[string]struct{}), - lastRefreshed: time.Now(), - ctx: ctx, - cancel: cancel, - logger: logger, - podResourcesClient: podResourcesClient, - } - - go func() { - refreshTicker := time.NewTicker(time.Second) - for { - select { - case <-refreshTicker.C: - instance.refreshTick() - case <-instance.ctx.Done(): - refreshTicker.Stop() - return - } - } - }() - }) - return instance -} - -func (p *PodResourcesStore) refreshTick() { - now := time.Now() - if now.Sub(p.lastRefreshed) >= taskTimeout { - p.refresh() - p.lastRefreshed = now - } -} - -func (p *PodResourcesStore) refresh() { - doRefresh := func() { - p.updateMaps() - } - - refreshWithTimeout(p.ctx, doRefresh, taskTimeout) -} - -func (p *PodResourcesStore) updateMaps() { - p.containerInfoToResourcesMap = make(map[ContainerInfo][]ResourceInfo) - p.resourceToPodContainerMap = make(map[ResourceInfo]ContainerInfo) - - if len(p.resourceNameSet) == 0 { - p.logger.Warn("No resource names allowlisted thus skipping updating of maps.") - return - } - - devicePods, err := p.podResourcesClient.ListPods() - - if err != nil { - p.logger.Error(fmt.Sprintf("Error getting pod resources: %v", err)) - return - } - - for _, pod := range devicePods.GetPodResources() { - for _, container := range pod.GetContainers() { - for _, device := range container.GetDevices() { - - containerInfo := ContainerInfo{ - PodName: pod.GetName(), - Namespace: pod.GetNamespace(), - ContainerName: container.GetName(), - } - - for _, deviceID := range device.GetDeviceIds() { - resourceInfo := ResourceInfo{ - ResourceName: device.GetResourceName(), - DeviceID: deviceID, - } - _, found := p.resourceNameSet[resourceInfo.ResourceName] - if found { - p.containerInfoToResourcesMap[containerInfo] = append(p.containerInfoToResourcesMap[containerInfo], resourceInfo) - p.resourceToPodContainerMap[resourceInfo] = containerInfo - } - } - } - } - } -} - -func (p *PodResourcesStore) GetContainerInfo(deviceID string, resourceName string) *ContainerInfo { - key := ResourceInfo{DeviceID: deviceID, ResourceName: resourceName} - if containerInfo, ok := p.resourceToPodContainerMap[key]; ok { - return &containerInfo - } - return nil -} - -func (p *PodResourcesStore) GetResourcesInfo(podName string, containerName string, namespace string) *[]ResourceInfo { - key := ContainerInfo{PodName: podName, ContainerName: containerName, Namespace: namespace} - if resourceInfo, ok := p.containerInfoToResourcesMap[key]; ok { - return &resourceInfo - } - return nil -} - -func (p *PodResourcesStore) AddResourceName(resourceName string) { - p.resourceNameSet[resourceName] = struct{}{} -} - -func (p *PodResourcesStore) Shutdown() { - p.cancel() - p.podResourcesClient.Shutdown() -} diff --git a/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore_test.go b/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore_test.go deleted file mode 100644 index 02fbac32f44f..000000000000 --- a/receiver/awscontainerinsightreceiver/internal/stores/podresourcesstore_test.go +++ /dev/null @@ -1,267 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package stores // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores" - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" -) - -const ( - defaultResourceName = "Resource-1" - defaultPodName = "Pod-1" - defaultNamespace = "Namespace-1" - defaultContainerName = "Container-1" - defaultDeviceID1 = "Device-1" - defaultDeviceID2 = "Device-2" - defaultDeviceID3 = "Device-3" - defaultDeviceID4 = "Device-4" - defaultResourceNameSkipped = "Resource-Skipped" - defaultContainerNameNoDevice = "Container-NoDevice" - defaultNamespaceNoDevice = "Namespace-NoDevice" - defaultPodNameNoDevice = "Pod-NoDevice" -) - -var ( - expectedContainerInfoToResourcesMap = map[ContainerInfo][]ResourceInfo{ - { - PodName: defaultPodName, - ContainerName: defaultContainerName, - Namespace: defaultNamespace, - }: { - { - ResourceName: defaultResourceName, - DeviceID: defaultDeviceID1, - }, - { - ResourceName: defaultResourceName, - DeviceID: defaultDeviceID2, - }, - }, - } - - expectedResourceToPodContainerMap = map[ResourceInfo]ContainerInfo{ - { - ResourceName: defaultResourceName, - DeviceID: defaultDeviceID1, - }: { - PodName: defaultPodName, - ContainerName: defaultContainerName, - Namespace: defaultNamespace, - }, - { - ResourceName: defaultResourceName, - DeviceID: defaultDeviceID2, - }: { - PodName: defaultPodName, - ContainerName: defaultContainerName, - Namespace: defaultNamespace, - }, - } - - expectedContainerInfo = ContainerInfo{ - PodName: defaultPodName, - ContainerName: defaultContainerName, - Namespace: defaultNamespace, - } - - expectedResourceInfo = []ResourceInfo{ - { - ResourceName: defaultResourceName, - DeviceID: defaultDeviceID1, - }, - { - ResourceName: defaultResourceName, - DeviceID: defaultDeviceID2, - }, - } - - listPodResourcesResponse = &podresourcesv1.ListPodResourcesResponse{ - PodResources: []*podresourcesv1.PodResources{ - { - Name: defaultPodName, - Namespace: defaultNamespace, - Containers: []*podresourcesv1.ContainerResources{ - { - Name: defaultContainerName, - Devices: []*podresourcesv1.ContainerDevices{ - { - ResourceName: defaultResourceName, - DeviceIds: []string{defaultDeviceID1, defaultDeviceID2}, - }, - { - ResourceName: defaultResourceNameSkipped, - DeviceIds: []string{defaultDeviceID3, defaultDeviceID4}, - }, - }, - }, - }, - }, - { - Name: defaultPodNameNoDevice, - Namespace: defaultNamespaceNoDevice, - Containers: []*podresourcesv1.ContainerResources{ - { - Name: defaultContainerNameNoDevice, - Devices: []*podresourcesv1.ContainerDevices{}, - }, - }, - }, - }, - } - - listPodResourcesResponseWithEmptyPodResources = &podresourcesv1.ListPodResourcesResponse{ - PodResources: []*podresourcesv1.PodResources{}, - } - - listPodResourcesResponseWithEmptyResponse = &podresourcesv1.ListPodResourcesResponse{} - - resourceNameSet = map[string]struct{}{ - defaultResourceName: {}, - } -) - -type MockPodResourcesClient struct { - response *podresourcesv1.ListPodResourcesResponse - err error - shutdownCalled bool -} - -func (m *MockPodResourcesClient) ListPods() (*podresourcesv1.ListPodResourcesResponse, error) { - return m.response, m.err -} - -func (m *MockPodResourcesClient) Shutdown() { - m.shutdownCalled = true -} - -func TestNewPodResourcesStore(t *testing.T) { - logger := zap.NewNop() - store := NewPodResourcesStore(logger) - assert.NotNil(t, store, "PodResourcesStore should not be nil") - assert.NotNil(t, store.ctx, "Context should not be nil") - assert.NotNil(t, store.cancel, "Cancel function should not be nil") -} - -func TestRefreshTick(t *testing.T) { - store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponse, nil) - - store.lastRefreshed = time.Now().Add(-time.Hour) - - store.refreshTick() - - assert.True(t, store.lastRefreshed.After(time.Now().Add(-time.Hour)), "lastRefreshed should have been updated") -} - -func TestShutdown(t *testing.T) { - store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponse, nil) - - mockClient := &MockPodResourcesClient{listPodResourcesResponse, nil, false} - store.podResourcesClient = mockClient - - store.Shutdown() - - assert.True(t, mockClient.shutdownCalled, "Shutdown method of the client should have been called") -} - -func TestUpdateMaps(t *testing.T) { - store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponse, nil) - store.updateMaps() - - assert.NotNil(t, store.containerInfoToResourcesMap) - assert.NotNil(t, store.resourceToPodContainerMap) - assert.Equal(t, len(expectedContainerInfoToResourcesMap), len(store.containerInfoToResourcesMap)) - assert.Equal(t, len(expectedResourceToPodContainerMap), len(store.resourceToPodContainerMap)) - assert.Equal(t, expectedContainerInfoToResourcesMap, store.containerInfoToResourcesMap) - assert.Equal(t, expectedResourceToPodContainerMap, store.resourceToPodContainerMap) -} - -func TestGets(t *testing.T) { - store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponse, nil) - store.updateMaps() - - assertMapsContainData(t, store) -} - -func TestGetsWhenThereAreNoPods(t *testing.T) { - store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponseWithEmptyPodResources, nil) - store.updateMaps() - - assertMapsDontContainData(t, store) -} - -func TestGetsWhenPodResourcesResponseIsEmpty(t *testing.T) { - store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponseWithEmptyResponse, nil) - store.updateMaps() - - assertMapsDontContainData(t, store) -} - -func TestGetsWhenPodResourcesThrowsError(t *testing.T) { - store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponseWithEmptyResponse, fmt.Errorf("mocked behavior")) - store.updateMaps() - - assertMapsDontContainData(t, store) -} - -func TestAddResourceName(t *testing.T) { - store := constructPodResourcesStore(make(map[ContainerInfo][]ResourceInfo), make(map[ResourceInfo]ContainerInfo), listPodResourcesResponse, nil) - - store.resourceNameSet = make(map[string]struct{}) - store.updateMaps() - assertMapsDontContainData(t, store) - - // After adding resource to map - store.AddResourceName(defaultResourceName) - store.updateMaps() - assertMapsContainData(t, store) -} - -func constructPodResourcesStore(containerToDevices map[ContainerInfo][]ResourceInfo, deviceToContainer map[ResourceInfo]ContainerInfo, podResourcesResponse *podresourcesv1.ListPodResourcesResponse, podResourcesError error) *PodResourcesStore { - logger, _ := zap.NewDevelopment() - return &PodResourcesStore{ - containerInfoToResourcesMap: containerToDevices, - resourceToPodContainerMap: deviceToContainer, - resourceNameSet: resourceNameSet, - lastRefreshed: time.Now(), - ctx: context.Background(), - cancel: func() {}, - logger: logger, - podResourcesClient: &MockPodResourcesClient{podResourcesResponse, podResourcesError, false}, - } -} - -func assertMapsContainData(t *testing.T, store *PodResourcesStore) { - assert.Equal(t, len(expectedContainerInfoToResourcesMap), len(store.containerInfoToResourcesMap)) - assert.Equal(t, len(expectedResourceToPodContainerMap), len(store.resourceToPodContainerMap)) - - assert.Equal(t, expectedContainerInfo, *store.GetContainerInfo(defaultDeviceID1, defaultResourceName)) - assert.Equal(t, expectedResourceInfo, *store.GetResourcesInfo(defaultPodName, defaultContainerName, defaultNamespace)) - - actualResourceInfo := store.GetResourcesInfo(defaultPodNameNoDevice, defaultContainerNameNoDevice, defaultNamespaceNoDevice) - if actualResourceInfo != nil { - t.Errorf("Expected GetResourcesInfo to return nil for an unexpected key, but got %v", actualResourceInfo) - } -} - -func assertMapsDontContainData(t *testing.T, store *PodResourcesStore) { - assert.Equal(t, 0, len(store.containerInfoToResourcesMap)) - assert.Equal(t, 0, len(store.resourceToPodContainerMap)) - - actualContainerInfo := store.GetContainerInfo(defaultDeviceID1, defaultResourceName) - if actualContainerInfo != nil { - t.Errorf("Expected GetContainerInfo to return nil for an unexpected key, but got %v", actualContainerInfo) - } - - actualResourceInfo := store.GetResourcesInfo(defaultPodName, defaultContainerName, defaultNamespace) - if actualResourceInfo != nil { - t.Errorf("Expected GetResourcesInfo to return nil for an unexpected key, but got %v", actualResourceInfo) - } -}