Skip to content

Commit

Permalink
update code structure
Browse files Browse the repository at this point in the history
  • Loading branch information
kangyili committed Mar 18, 2024
1 parent e7cfc51 commit e9f189e
Show file tree
Hide file tree
Showing 66 changed files with 683 additions and 525 deletions.
20 changes: 11 additions & 9 deletions pkg/collector/corechecks/cluster/orchestrator/collector_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (
// to easily run them all.
type CollectorBundle struct {
check *OrchestratorCheck
collectors []collectors.Collector
collectors []collectors.K8sCollector
discoverCollectors bool
extraSyncTimeout time.Duration
inventory *inventory.CollectorInventory
Expand All @@ -68,11 +68,13 @@ func NewCollectorBundle(chk *OrchestratorCheck) *CollectorBundle {
check: chk,
inventory: inventory.NewCollectorInventory(),
runCfg: &collectors.CollectorRunConfig{
APIClient: chk.apiClient,
ClusterID: chk.clusterID,
Config: chk.orchestratorConfig,
MsgGroupRef: chk.groupID,
OrchestratorInformerFactory: chk.orchestratorInformerFactory,
K8sCollectorRunConfig: collectors.K8sCollectorRunConfig{
APIClient: chk.apiClient,
OrchestratorInformerFactory: chk.orchestratorInformerFactory,
},
ClusterID: chk.clusterID,
Config: chk.orchestratorConfig,
MsgGroupRef: chk.groupID,
},
stopCh: chk.stopCh,
manifestBuffer: NewManifestBuffer(chk),
Expand Down Expand Up @@ -135,7 +137,7 @@ func (cb *CollectorBundle) skipImportingDefaultCollectors() bool {
// - <apigroup_and_version>/<collector_name> (e.g. "batch/v1/cronjobs")
func (cb *CollectorBundle) addCollectorFromConfig(collectorName string, isCRD bool) {
var (
collector collectors.Collector
collector collectors.K8sCollector
err error
)

Expand Down Expand Up @@ -229,11 +231,11 @@ func (cb *CollectorBundle) importCollectorsFromDiscovery() bool {

collectors, err := discovery.NewAPIServerDiscoveryProvider().Discover(cb.inventory)
if err != nil {
_ = cb.check.Warnf("Collector discovery failed: %s", err)
_ = cb.check.Warnf("Kubernetes collector discovery failed: %s", err)
return false
}
if len(collectors) == 0 {
_ = cb.check.Warn("Collector discovery returned no collector")
_ = cb.check.Warn("Kubernetes collector discovery returned no collector")
return false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver && orchestrator
//go:build orchestrator

//nolint:revive // TODO(CAPP) Fix revive linter
package collectors
Expand All @@ -12,12 +12,8 @@ import (
"fmt"

"go.uber.org/atomic"
"k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
vpai "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/informers/externalversions"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"

"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors"
"github.com/DataDog/datadog-agent/pkg/orchestrator/config"
pkgorchestratormodel "github.com/DataDog/datadog-agent/pkg/orchestrator/model"
Expand All @@ -27,9 +23,6 @@ import (
// Collector is an interface that represents the collection process for a
// resource type.
type Collector interface {
// Informer returns the shared informer for that resource.
Informer() cache.SharedInformer

// Init is where the collector initialization happens. It is used to create
// informers and listers.
Init(*CollectorRunConfig)
Expand All @@ -42,30 +35,6 @@ type Collector interface {
Run(*CollectorRunConfig) (*CollectorRunResult, error)
}

// CollectorVersions represents the list of collector implementations that are
// supported, each one being tied to a specific kubernetes group and version.
type CollectorVersions struct {
Collectors []Collector
}

// NewCollectorVersions is used to build the collector version list.
func NewCollectorVersions(versions ...Collector) CollectorVersions {
return CollectorVersions{
versions,
}
}

// CollectorForVersion retrieves the collector implementing a given version. If
// no collector is known for that version, returns (nil, false).
func (cv *CollectorVersions) CollectorForVersion(version string) (Collector, bool) {
for _, collector := range cv.Collectors {
if collector.Metadata().Version == version {
return collector, true
}
}
return nil, false
}

// CollectorMetadata contains information about a collector.
type CollectorMetadata struct {
IsDefaultVersion bool
Expand All @@ -88,23 +57,25 @@ func (cm CollectorMetadata) FullName() string {
return cm.Name
}

// OrchestratorInformerFactory contains all informer factories used by the orchestration check
type OrchestratorInformerFactory struct {
InformerFactory informers.SharedInformerFactory
UnassignedPodInformerFactory informers.SharedInformerFactory
DynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory
CRDInformerFactory externalversions.SharedInformerFactory
VPAInformerFactory vpai.SharedInformerFactory
// K8sCollectorRunConfig is the configuration used to initialize or run the kubernetes collector.
type K8sCollectorRunConfig struct {
APIClient *apiserver.APIClient
OrchestratorInformerFactory *OrchestratorInformerFactory
}

// ECSCollectorRunConfig is the configuration used to initialize or run the ECS collector.
type ECSCollectorRunConfig struct {
WorkloadmetaStore workloadmeta.Component
}

// CollectorRunConfig is the configuration used to initialize or run the
// collector.
type CollectorRunConfig struct {
APIClient *apiserver.APIClient
ClusterID string
Config *config.OrchestratorConfig
MsgGroupRef *atomic.Int32
OrchestratorInformerFactory *OrchestratorInformerFactory
K8sCollectorRunConfig
ECSCollectorRunConfig
ClusterID string
Config *config.OrchestratorConfig
MsgGroupRef *atomic.Int32
}

// CollectorRunResult contains information about what the collector has done.
Expand All @@ -116,14 +87,3 @@ type CollectorRunResult struct {
ResourcesListed int
ResourcesProcessed int
}

func NewProcessorContext(rcfg *CollectorRunConfig, metadata *CollectorMetadata) *processors.ProcessorContext {
return &processors.ProcessorContext{
APIClient: rcfg.APIClient,
ApiGroupVersionTag: fmt.Sprintf("kube_api_version:%s", metadata.Version),
Cfg: rcfg.Config,
ClusterID: rcfg.ClusterID,
MsgGroupID: rcfg.MsgGroupRef.Inc(),
NodeType: metadata.NodeType,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewCollectorInventory() *CollectorInventory {

// CollectorForDefaultVersion retrieves a collector given its name. It returns an error if the
// name is not known.
func (ci *CollectorInventory) CollectorForDefaultVersion(collectorName string) (collectors.Collector, error) {
func (ci *CollectorInventory) CollectorForDefaultVersion(collectorName string) (collectors.K8sCollector, error) {
for _, cv := range ci.collectors {
for _, c := range cv.Collectors {
if c.Metadata().Name == collectorName && c.Metadata().IsDefaultVersion {
Expand All @@ -67,7 +67,7 @@ func (ci *CollectorInventory) CollectorForDefaultVersion(collectorName string) (

// CollectorForVersion gets a collector given its name and version. It returns
// an error if the collector name or version is not known.
func (ci *CollectorInventory) CollectorForVersion(collectorName, collectorVersion string) (collectors.Collector, error) {
func (ci *CollectorInventory) CollectorForVersion(collectorName, collectorVersion string) (collectors.K8sCollector, error) {
for _, cv := range ci.collectors {
for _, c := range cv.Collectors {
if c.Metadata().Name == collectorName && c.Metadata().Version == collectorVersion {
Expand All @@ -79,8 +79,8 @@ func (ci *CollectorInventory) CollectorForVersion(collectorName, collectorVersio
}

// StableCollectors get a list of all stable collectors in the inventory.
func (ci *CollectorInventory) StableCollectors() []collectors.Collector {
var stableCollectors []collectors.Collector
func (ci *CollectorInventory) StableCollectors() []collectors.K8sCollector {
var stableCollectors []collectors.K8sCollector
for _, cv := range ci.collectors {
for _, c := range cv.Collectors {
if c.Metadata().IsStable && c.Metadata().IsDefaultVersion {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
package k8s

import (
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/collectors"
k8sProcessors "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/k8s"
"github.com/DataDog/datadog-agent/pkg/orchestrator"

"k8s.io/apimachinery/pkg/labels"
corev1Informers "k8s.io/client-go/informers/core/v1"
corev1Listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/collectors"
k8sProcessors "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/k8s"
"github.com/DataDog/datadog-agent/pkg/orchestrator"
)

// NewClusterCollectorVersions builds the group of collector versions.
Expand Down Expand Up @@ -74,7 +74,7 @@ func (c *ClusterCollector) Run(rcfg *collectors.CollectorRunConfig) (*collectors
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed, err := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *ClusterRoleCollector) Run(rcfg *collectors.CollectorRunConfig) (*collec
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *ClusterRoleBindingCollector) Run(rcfg *collectors.CollectorRunConfig) (
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,7 @@ func (c *CRCollector) Run(rcfg *collectors.CollectorRunConfig) (*collectors.Coll
return nil, collectors.NewListingError(fmt.Errorf("crd collector %s/%s has reached to the limit %d, skipping it", c.metadata.Version, c.metadata.Name, defaultMaximumCRDQuota))
}

ctx := &processors.ProcessorContext{
APIClient: rcfg.APIClient,
Cfg: rcfg.Config,
ClusterID: rcfg.ClusterID,
MsgGroupID: rcfg.MsgGroupRef.Inc(),
NodeType: c.metadata.NodeType,
}
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,7 @@ func (c *CRDCollector) Run(rcfg *collectors.CollectorRunConfig) (*collectors.Col
return nil, collectors.NewListingError(err)
}

ctx := &processors.ProcessorContext{
APIClient: rcfg.APIClient,
Cfg: rcfg.Config,
ClusterID: rcfg.ClusterID,
MsgGroupID: rcfg.MsgGroupRef.Inc(),
NodeType: c.metadata.NodeType,
}
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (c *CronJobV1Collector) Run(rcfg *collectors.CollectorRunConfig) (*collecto
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (c *CronJobV1Beta1Collector) Run(rcfg *collectors.CollectorRunConfig) (*col
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *DaemonSetCollector) Run(rcfg *collectors.CollectorRunConfig) (*collecto
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *DeploymentCollector) Run(rcfg *collectors.CollectorRunConfig) (*collect
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *HorizontalPodAutoscalerCollector) Run(rcfg *collectors.CollectorRunConf
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *IngressCollector) Run(rcfg *collectors.CollectorRunConfig) (*collectors
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *JobCollector) Run(rcfg *collectors.CollectorRunConfig) (*collectors.Col
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *NamespaceCollector) Run(rcfg *collectors.CollectorRunConfig) (*collecto
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
package k8s

import (
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/collectors"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors"
k8sProcessors "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/k8s"
"github.com/DataDog/datadog-agent/pkg/orchestrator"

"k8s.io/apimachinery/pkg/labels"
networkingv1Informers "k8s.io/client-go/informers/networking/v1"
networkingv1Listers "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"

"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/collectors"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors"
k8sProcessors "github.com/DataDog/datadog-agent/pkg/collector/corechecks/cluster/orchestrator/processors/k8s"
"github.com/DataDog/datadog-agent/pkg/orchestrator"
)

// NewNetworkPolicyCollectorVersions builds the group of collector versions.
Expand Down Expand Up @@ -75,7 +75,7 @@ func (c *NetworkPolicyCollector) Run(rcfg *collectors.CollectorRunConfig) (*coll
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *NodeCollector) Run(rcfg *collectors.CollectorRunConfig) (*collectors.Co
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *PersistentVolumeCollector) Run(rcfg *collectors.CollectorRunConfig) (*c
return nil, collectors.NewListingError(err)
}

ctx := collectors.NewProcessorContext(rcfg, c.metadata)
ctx := collectors.NewK8sProcessorContext(rcfg, c.metadata)

processResult, processed := c.processor.Process(ctx, list)

Expand Down
Loading

0 comments on commit e9f189e

Please sign in to comment.