diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 19673a21..02f1a598 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -10,7 +10,6 @@ on: push: branches: - main - - develop env: REGISTRY: 353146681200.dkr.ecr.us-east-1.amazonaws.com/otterize @@ -32,7 +31,6 @@ jobs: uses: actions/checkout@v2 with: submodules: recursive - token: ${{ secrets.OTTERIZEBOT_GITHUB_TOKEN }} # required for checking out submodules - name: Set up Docker Buildx id: buildx diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml deleted file mode 100644 index a9148dad..00000000 --- a/.github/workflows/golangci-lint.yml +++ /dev/null @@ -1,47 +0,0 @@ -name: golangci-lint -on: - push: - tags: - - v* - branches: - - main - pull_request: -permissions: - contents: read - # Optional: allow read access to pull request. Use with `only-new-issues` option. - # pull-requests: read -jobs: - golangci: - name: lint - runs-on: ubuntu-latest - steps: - - uses: actions/setup-go@v3 - with: - go-version: '>=1.19.1' - - uses: actions/checkout@v3 - - name: Install dependencies - run: sudo apt update && sudo apt install libpcap-dev # required for the linter to be able to lint github.com/google/gopacket - - name: golangci-lint - uses: golangci/golangci-lint-action@v3 - with: - # Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version - version: v1.50.1 - - # Optional: working directory, useful for monorepos - working-directory: src - - # Optional: golangci-lint command line arguments. - args: --timeout 5m - - # Optional: show only new issues if it's a pull request. The default value is `false`. - # only-new-issues: true - - # Optional: if set to true then the all caching functionality will be complete disabled, - # takes precedence over all other caching options. -# skip-cache: true - - # Optional: if set to true then the action don't cache or restore ~/go/pkg. -# skip-pkg-cache: true - - # Optional: if set to true then the action don't cache or restore ~/.cache/go-build. -# skip-build-cache: true \ No newline at end of file diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 9b200876..00000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "src/cloudgraphql"] - path = src/cloudgraphql - url = https://github.com/otterize/graphql diff --git a/src/cloudgraphql b/src/cloudgraphql deleted file mode 160000 index 4d044b48..00000000 --- a/src/cloudgraphql +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 4d044b48dac3079db5a4e431f6b49fbc8fd40546 diff --git a/src/go.mod b/src/go.mod index abe846fd..e8267d60 100644 --- a/src/go.mod +++ b/src/go.mod @@ -16,7 +16,6 @@ require ( github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.8.0 github.com/vektah/gqlparser/v2 v2.4.5 - golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 k8s.io/api v0.25.0 k8s.io/apimachinery v0.25.0 k8s.io/client-go v0.25.0 @@ -87,6 +86,7 @@ require ( golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect + golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/src/mappergraphql/schema.graphql b/src/graphql/schema.graphql similarity index 77% rename from src/mappergraphql/schema.graphql rename to src/graphql/schema.graphql index a4f1b8df..f025ad98 100644 --- a/src/mappergraphql/schema.graphql +++ b/src/graphql/schema.graphql @@ -1,13 +1,8 @@ -scalar Time # Equivalent of Go's time.Time provided by gqlgen + input CaptureResultForSrcIp { srcIp: String! - destinations: [Destination!]! -} - -input Destination { - destination: String! - lastSeen: Time! + destinations: [String!]! } input CaptureResults { @@ -16,7 +11,7 @@ input CaptureResults { input SocketScanResultForSrcIp { srcIp: String! - destIps: [Destination!]! + destIps: [String!]! } input SocketScanResults { diff --git a/src/mapper.Dockerfile b/src/mapper.Dockerfile index b17b7dae..b47c58b9 100644 --- a/src/mapper.Dockerfile +++ b/src/mapper.Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=linux/amd64 golang:1.19-alpine as buildenv +FROM --platform=linux/amd64 golang:1.18-alpine as buildenv RUN apk add --no-cache ca-certificates git protoc RUN apk add build-base libpcap-dev WORKDIR /src diff --git a/src/mapper/cmd/main.go b/src/mapper/cmd/main.go index 12020c40..782468eb 100644 --- a/src/mapper/cmd/main.go +++ b/src/mapper/cmd/main.go @@ -5,8 +5,6 @@ import ( "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" "github.com/otterize/intents-operator/src/shared/serviceidresolver" - "github.com/otterize/network-mapper/src/mapper/pkg/cloudclient" - "github.com/otterize/network-mapper/src/mapper/pkg/clouduploader" "github.com/otterize/network-mapper/src/mapper/pkg/config" "github.com/otterize/network-mapper/src/mapper/pkg/kubefinder" "github.com/otterize/network-mapper/src/mapper/pkg/resolvers" @@ -15,11 +13,9 @@ import ( "github.com/spf13/viper" "net/http" "os" - "os/signal" clientconfig "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/manager/signals" - "syscall" "time" ) @@ -83,22 +79,11 @@ func main() { logrus.Error(intentHolderCfg) os.Exit(1) } - - cloudConfig := clouduploader.ConfigFromViper() intentsHolder := resolvers.NewIntentsHolder(mgr.GetClient(), intentHolderCfg) - cloudClient := clouduploader.NewCloudUploader(intentsHolder, cloudConfig, cloudclient.NewClient) resolver := resolvers.NewResolver(kubeFinder, serviceidresolver.NewResolver(mgr.GetClient()), intentsHolder) _ = resolver.LoadStore(initCtx) // loads the store from the previous run resolver.Register(e) - if cloudConfig.IsCloudUploadEnabled() { - go func() { - cloudClientCtx, cloudClientCancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) - defer cloudClientCancel() - cloudClient.PeriodicIntentsUpload(cloudClientCtx) - }() - } - logrus.Info("Starting api server") err = e.Start("0.0.0.0:9090") if err != nil { diff --git a/src/mapper/gqlgen.yml b/src/mapper/gqlgen.yml index 38ed6627..fb309a25 100644 --- a/src/mapper/gqlgen.yml +++ b/src/mapper/gqlgen.yml @@ -1,6 +1,6 @@ # Where are all the schema files located? globs are supported eg src/**/*.graphqls schema: - - '../mappergraphql/*.graphql' + - '../graphql/*' # Where should the generated server code go? exec: diff --git a/src/mapper/pkg/cloudclient/cloud_client.go b/src/mapper/pkg/cloudclient/cloud_client.go deleted file mode 100644 index ac0e748b..00000000 --- a/src/mapper/pkg/cloudclient/cloud_client.go +++ /dev/null @@ -1,52 +0,0 @@ -package cloudclient - -import ( - "context" - "fmt" - "github.com/Khan/genqlient/graphql" - "github.com/sirupsen/logrus" - "golang.org/x/oauth2" -) - -type FactoryFunction func(ctx context.Context, apiAddress string, tokenSource oauth2.TokenSource) CloudClient - -type CloudClient interface { - ReportDiscoveredIntents(intents []*DiscoveredIntentInput) bool - ReportComponentStatus(component ComponentType) -} - -type CloudClientImpl struct { - ctx context.Context - client graphql.Client -} - -func NewClient(ctx context.Context, apiAddress string, tokenSource oauth2.TokenSource) CloudClient { - url := fmt.Sprintf("%s/graphql/v1", apiAddress) - client := graphql.NewClient(url, oauth2.NewClient(ctx, tokenSource)) - - return &CloudClientImpl{ - client: client, - ctx: ctx, - } -} - -func (c *CloudClientImpl) ReportDiscoveredIntents(intents []*DiscoveredIntentInput) bool { - logrus.Info("Uploading intents to cloud, count: ", len(intents)) - - _, err := ReportDiscoveredIntents(c.ctx, c.client, intents) - if err != nil { - logrus.Error("Failed to upload intents to cloud ", err) - return false - } - - return true -} - -func (c *CloudClientImpl) ReportComponentStatus(component ComponentType) { - logrus.Info("Uploading component to cloud") - - _, err := ReportComponentStatus(c.ctx, c.client, component) - if err != nil { - logrus.Error("Failed to upload component to cloud ", err) - } -} diff --git a/src/mapper/pkg/cloudclient/generate.go b/src/mapper/pkg/cloudclient/generate.go deleted file mode 100644 index 12a254de..00000000 --- a/src/mapper/pkg/cloudclient/generate.go +++ /dev/null @@ -1,4 +0,0 @@ -package cloudclient - -//go:generate go run github.com/Khan/genqlient ./genqlient.yaml -//go:generate go run github.com/golang/mock/mockgen@v1.6.0 -destination=./mocks/mocks.go -package=cloudclientmocks -source=./cloud_client.go CloudClient diff --git a/src/mapper/pkg/cloudclient/generated.go b/src/mapper/pkg/cloudclient/generated.go deleted file mode 100644 index be39541d..00000000 --- a/src/mapper/pkg/cloudclient/generated.go +++ /dev/null @@ -1,213 +0,0 @@ -// Code generated by github.com/Khan/genqlient, DO NOT EDIT. - -package cloudclient - -import ( - "context" - "time" - - "github.com/Khan/genqlient/graphql" -) - -type ComponentType string - -const ( - ComponentTypeIntentsOperator ComponentType = "INTENTS_OPERATOR" - ComponentTypeCredentialsOperator ComponentType = "CREDENTIALS_OPERATOR" - ComponentTypeNetworkMapper ComponentType = "NETWORK_MAPPER" -) - -type DiscoveredIntentInput struct { - DiscoveredAt *time.Time `json:"discoveredAt"` - Intent *IntentInput `json:"intent"` -} - -// GetDiscoveredAt returns DiscoveredIntentInput.DiscoveredAt, and is useful for accessing the field via an interface. -func (v *DiscoveredIntentInput) GetDiscoveredAt() *time.Time { return v.DiscoveredAt } - -// GetIntent returns DiscoveredIntentInput.Intent, and is useful for accessing the field via an interface. -func (v *DiscoveredIntentInput) GetIntent() *IntentInput { return v.Intent } - -type HTTPConfigInput struct { - Path *string `json:"path"` - Method *HTTPMethod `json:"method"` -} - -// GetPath returns HTTPConfigInput.Path, and is useful for accessing the field via an interface. -func (v *HTTPConfigInput) GetPath() *string { return v.Path } - -// GetMethod returns HTTPConfigInput.Method, and is useful for accessing the field via an interface. -func (v *HTTPConfigInput) GetMethod() *HTTPMethod { return v.Method } - -type HTTPMethod string - -const ( - HTTPMethodGet HTTPMethod = "GET" - HTTPMethodPost HTTPMethod = "POST" - HTTPMethodPut HTTPMethod = "PUT" - HTTPMethodDelete HTTPMethod = "DELETE" - HTTPMethodOptions HTTPMethod = "OPTIONS" - HTTPMethodTrace HTTPMethod = "TRACE" - HTTPMethodPatch HTTPMethod = "PATCH" - HTTPMethodConnect HTTPMethod = "CONNECT" -) - -type IntentInput struct { - Namespace *string `json:"namespace"` - ClientName *string `json:"clientName"` - ServerName *string `json:"serverName"` - ServerNamespace *string `json:"serverNamespace"` - Type *IntentType `json:"type"` - Topics []*KafkaConfigInput `json:"topics"` - Resources []*HTTPConfigInput `json:"resources"` -} - -// GetNamespace returns IntentInput.Namespace, and is useful for accessing the field via an interface. -func (v *IntentInput) GetNamespace() *string { return v.Namespace } - -// GetClientName returns IntentInput.ClientName, and is useful for accessing the field via an interface. -func (v *IntentInput) GetClientName() *string { return v.ClientName } - -// GetServerName returns IntentInput.ServerName, and is useful for accessing the field via an interface. -func (v *IntentInput) GetServerName() *string { return v.ServerName } - -// GetServerNamespace returns IntentInput.ServerNamespace, and is useful for accessing the field via an interface. -func (v *IntentInput) GetServerNamespace() *string { return v.ServerNamespace } - -// GetType returns IntentInput.Type, and is useful for accessing the field via an interface. -func (v *IntentInput) GetType() *IntentType { return v.Type } - -// GetTopics returns IntentInput.Topics, and is useful for accessing the field via an interface. -func (v *IntentInput) GetTopics() []*KafkaConfigInput { return v.Topics } - -// GetResources returns IntentInput.Resources, and is useful for accessing the field via an interface. -func (v *IntentInput) GetResources() []*HTTPConfigInput { return v.Resources } - -type IntentType string - -const ( - IntentTypeHttp IntentType = "HTTP" - IntentTypeKafka IntentType = "KAFKA" -) - -type KafkaConfigInput struct { - Name *string `json:"name"` - Operations []*KafkaOperation `json:"operations"` -} - -// GetName returns KafkaConfigInput.Name, and is useful for accessing the field via an interface. -func (v *KafkaConfigInput) GetName() *string { return v.Name } - -// GetOperations returns KafkaConfigInput.Operations, and is useful for accessing the field via an interface. -func (v *KafkaConfigInput) GetOperations() []*KafkaOperation { return v.Operations } - -type KafkaOperation string - -const ( - KafkaOperationConsume KafkaOperation = "CONSUME" - KafkaOperationProduce KafkaOperation = "PRODUCE" - KafkaOperationCreate KafkaOperation = "CREATE" - KafkaOperationAlter KafkaOperation = "ALTER" - KafkaOperationDelete KafkaOperation = "DELETE" - KafkaOperationDescribe KafkaOperation = "DESCRIBE" - KafkaOperationClusterAction KafkaOperation = "CLUSTER_ACTION" - KafkaOperationDescribeConfigs KafkaOperation = "DESCRIBE_CONFIGS" - KafkaOperationAlterConfigs KafkaOperation = "ALTER_CONFIGS" - KafkaOperationIdempotentWrite KafkaOperation = "IDEMPOTENT_WRITE" -) - -// ReportComponentStatusResponse is returned by ReportComponentStatus on success. -type ReportComponentStatusResponse struct { - ReportIntegrationComponentStatus bool `json:"reportIntegrationComponentStatus"` -} - -// GetReportIntegrationComponentStatus returns ReportComponentStatusResponse.ReportIntegrationComponentStatus, and is useful for accessing the field via an interface. -func (v *ReportComponentStatusResponse) GetReportIntegrationComponentStatus() bool { - return v.ReportIntegrationComponentStatus -} - -// ReportDiscoveredIntentsResponse is returned by ReportDiscoveredIntents on success. -type ReportDiscoveredIntentsResponse struct { - ReportDiscoveredIntents *bool `json:"reportDiscoveredIntents"` -} - -// GetReportDiscoveredIntents returns ReportDiscoveredIntentsResponse.ReportDiscoveredIntents, and is useful for accessing the field via an interface. -func (v *ReportDiscoveredIntentsResponse) GetReportDiscoveredIntents() *bool { - return v.ReportDiscoveredIntents -} - -// __ReportComponentStatusInput is used internally by genqlient -type __ReportComponentStatusInput struct { - Component ComponentType `json:"component"` -} - -// GetComponent returns __ReportComponentStatusInput.Component, and is useful for accessing the field via an interface. -func (v *__ReportComponentStatusInput) GetComponent() ComponentType { return v.Component } - -// __ReportDiscoveredIntentsInput is used internally by genqlient -type __ReportDiscoveredIntentsInput struct { - Intents []*DiscoveredIntentInput `json:"intents"` -} - -// GetIntents returns __ReportDiscoveredIntentsInput.Intents, and is useful for accessing the field via an interface. -func (v *__ReportDiscoveredIntentsInput) GetIntents() []*DiscoveredIntentInput { return v.Intents } - -func ReportComponentStatus( - ctx context.Context, - client graphql.Client, - component ComponentType, -) (*ReportComponentStatusResponse, error) { - req := &graphql.Request{ - OpName: "ReportComponentStatus", - Query: ` -mutation ReportComponentStatus ($component: ComponentType!) { - reportIntegrationComponentStatus(component: $component) -} -`, - Variables: &__ReportComponentStatusInput{ - Component: component, - }, - } - var err error - - var data ReportComponentStatusResponse - resp := &graphql.Response{Data: &data} - - err = client.MakeRequest( - ctx, - req, - resp, - ) - - return &data, err -} - -func ReportDiscoveredIntents( - ctx context.Context, - client graphql.Client, - intents []*DiscoveredIntentInput, -) (*ReportDiscoveredIntentsResponse, error) { - req := &graphql.Request{ - OpName: "ReportDiscoveredIntents", - Query: ` -mutation ReportDiscoveredIntents ($intents: [DiscoveredIntentInput!]!) { - reportDiscoveredIntents(intents: $intents) -} -`, - Variables: &__ReportDiscoveredIntentsInput{ - Intents: intents, - }, - } - var err error - - var data ReportDiscoveredIntentsResponse - resp := &graphql.Response{Data: &data} - - err = client.MakeRequest( - ctx, - req, - resp, - ) - - return &data, err -} diff --git a/src/mapper/pkg/cloudclient/genqlient.graphql b/src/mapper/pkg/cloudclient/genqlient.graphql deleted file mode 100644 index 906f491e..00000000 --- a/src/mapper/pkg/cloudclient/genqlient.graphql +++ /dev/null @@ -1,8 +0,0 @@ -# @genqlient(pointer: true) -mutation ReportDiscoveredIntents($intents: [DiscoveredIntentInput!]!) { - reportDiscoveredIntents(intents: $intents) -} - -mutation ReportComponentStatus($component: ComponentType!) { - reportIntegrationComponentStatus(component: $component) -} \ No newline at end of file diff --git a/src/mapper/pkg/cloudclient/genqlient.yaml b/src/mapper/pkg/cloudclient/genqlient.yaml deleted file mode 100644 index a7d57948..00000000 --- a/src/mapper/pkg/cloudclient/genqlient.yaml +++ /dev/null @@ -1,11 +0,0 @@ -# genqlient config; for full documentation see: -# https://github.com/Khan/genqlient/blob/main/docs/genqlient.yaml -schema: - - '../../../cloudgraphql/*.graphql' - - '../../../cloudgraphql/federation/*.graphql' -operations: - - genqlient.graphql -generated: ./generated.go -bindings: - Time: - type: time.Time diff --git a/src/mapper/pkg/cloudclient/mocks/dummy.go b/src/mapper/pkg/cloudclient/mocks/dummy.go deleted file mode 100644 index e8b14648..00000000 --- a/src/mapper/pkg/cloudclient/mocks/dummy.go +++ /dev/null @@ -1 +0,0 @@ -package cloudclientmocks diff --git a/src/mapper/pkg/cloudclient/mocks/mocks.go b/src/mapper/pkg/cloudclient/mocks/mocks.go deleted file mode 100644 index e336058d..00000000 --- a/src/mapper/pkg/cloudclient/mocks/mocks.go +++ /dev/null @@ -1,61 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: ./cloud_client.go - -// Package cloudclientmocks is a generated GoMock package. -package cloudclientmocks - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - cloudclient "github.com/otterize/network-mapper/src/mapper/pkg/cloudclient" -) - -// MockCloudClient is a mock of CloudClient interface. -type MockCloudClient struct { - ctrl *gomock.Controller - recorder *MockCloudClientMockRecorder -} - -// MockCloudClientMockRecorder is the mock recorder for MockCloudClient. -type MockCloudClientMockRecorder struct { - mock *MockCloudClient -} - -// NewMockCloudClient creates a new mock instance. -func NewMockCloudClient(ctrl *gomock.Controller) *MockCloudClient { - mock := &MockCloudClient{ctrl: ctrl} - mock.recorder = &MockCloudClientMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockCloudClient) EXPECT() *MockCloudClientMockRecorder { - return m.recorder -} - -// ReportComponentStatus mocks base method. -func (m *MockCloudClient) ReportComponentStatus(component cloudclient.ComponentType) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "ReportComponentStatus", component) -} - -// ReportComponentStatus indicates an expected call of ReportComponentStatus. -func (mr *MockCloudClientMockRecorder) ReportComponentStatus(component interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportComponentStatus", reflect.TypeOf((*MockCloudClient)(nil).ReportComponentStatus), component) -} - -// ReportDiscoveredIntents mocks base method. -func (m *MockCloudClient) ReportDiscoveredIntents(intents []*cloudclient.DiscoveredIntentInput) bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReportDiscoveredIntents", intents) - ret0, _ := ret[0].(bool) - return ret0 -} - -// ReportDiscoveredIntents indicates an expected call of ReportDiscoveredIntents. -func (mr *MockCloudClientMockRecorder) ReportDiscoveredIntents(intents interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportDiscoveredIntents", reflect.TypeOf((*MockCloudClient)(nil).ReportDiscoveredIntents), intents) -} diff --git a/src/mapper/pkg/clouduploader/cloud_config.go b/src/mapper/pkg/clouduploader/cloud_config.go deleted file mode 100644 index 0cb30dfa..00000000 --- a/src/mapper/pkg/clouduploader/cloud_config.go +++ /dev/null @@ -1,26 +0,0 @@ -package clouduploader - -import ( - "github.com/otterize/network-mapper/src/mapper/pkg/config" - "github.com/spf13/viper" -) - -type Config struct { - ClientId string - Secret string - apiAddress string - UploadInterval int -} - -func ConfigFromViper() Config { - return Config{ - Secret: viper.GetString(config.ClientSecretKey), - ClientId: viper.GetString(config.ClientIDKey), - apiAddress: viper.GetString(config.CloudApiAddrKey), - UploadInterval: viper.GetInt(config.UploadIntervalSecondsKey), - } -} - -func (c *Config) IsCloudUploadEnabled() bool { - return c.ClientId != "" && c.Secret != "" -} diff --git a/src/mapper/pkg/clouduploader/cloud_upload.go b/src/mapper/pkg/clouduploader/cloud_upload.go deleted file mode 100644 index 8c8a62fe..00000000 --- a/src/mapper/pkg/clouduploader/cloud_upload.go +++ /dev/null @@ -1,100 +0,0 @@ -package clouduploader - -import ( - "context" - "fmt" - "github.com/otterize/network-mapper/src/mapper/pkg/cloudclient" - "github.com/otterize/network-mapper/src/mapper/pkg/resolvers" - "github.com/samber/lo" - "github.com/sirupsen/logrus" - "golang.org/x/oauth2" - "golang.org/x/oauth2/clientcredentials" - "time" -) - -type CloudUploader struct { - intentsHolder *resolvers.IntentsHolder - config Config - tokenSrc oauth2.TokenSource - lastUploadTimestamp time.Time - cloudClientFactory cloudclient.FactoryFunction -} - -func NewCloudUploader(intentsHolder *resolvers.IntentsHolder, config Config, cloudClientFactory cloudclient.FactoryFunction) *CloudUploader { - cfg := clientcredentials.Config{ - ClientID: config.ClientId, - ClientSecret: config.Secret, - TokenURL: fmt.Sprintf("%s/auth/tokens/token", config.apiAddress), - AuthStyle: oauth2.AuthStyleInParams, - } - - tokenSrc := cfg.TokenSource(context.Background()) - - return &CloudUploader{ - intentsHolder: intentsHolder, - config: config, - tokenSrc: tokenSrc, - cloudClientFactory: cloudClientFactory, - } -} - -func (c *CloudUploader) uploadDiscoveredIntents(ctx context.Context) { - logrus.Info("Search for intents") - - client := c.cloudClientFactory(ctx, c.config.apiAddress, c.tokenSrc) - - lastUpdate := c.intentsHolder.LastIntentsUpdate() - if !c.lastUploadTimestamp.Before(lastUpdate) { - return - } - - var discoveredIntents []*cloudclient.DiscoveredIntentInput - for _, intent := range c.intentsHolder.GetIntents(nil) { - var discoveredIntent cloudclient.IntentInput - discoveredIntent.ClientName = lo.ToPtr(intent.Source.Name) - discoveredIntent.Namespace = lo.ToPtr(intent.Source.Namespace) - discoveredIntent.ServerName = lo.ToPtr(intent.Destination.Name) - discoveredIntent.ServerNamespace = lo.ToPtr(intent.Destination.Namespace) - - input := &cloudclient.DiscoveredIntentInput{ - DiscoveredAt: lo.ToPtr(intent.Timestamp), - Intent: &discoveredIntent, - } - - discoveredIntents = append(discoveredIntents, input) - } - - if len(discoveredIntents) == 0 { - return - } - - uploadSuccess := client.ReportDiscoveredIntents(discoveredIntents) - if uploadSuccess { - c.lastUploadTimestamp = lastUpdate - } -} - -func (c *CloudUploader) reportStatus(ctx context.Context) { - client := c.cloudClientFactory(ctx, c.config.apiAddress, c.tokenSrc) - - client.ReportComponentStatus(cloudclient.ComponentTypeNetworkMapper) -} - -func (c *CloudUploader) PeriodicIntentsUpload(ctx context.Context) { - cloudUploadTicker := time.NewTicker(time.Second * time.Duration(c.config.UploadInterval)) - - logrus.Info("Starting cloud ticker") - c.reportStatus(ctx) - - for { - select { - case <-cloudUploadTicker.C: - c.uploadDiscoveredIntents(ctx) - c.reportStatus(ctx) - - case <-ctx.Done(): - logrus.Info("Periodic upload exit") - return - } - } -} diff --git a/src/mapper/pkg/clouduploader/cloud_uploader_test.go b/src/mapper/pkg/clouduploader/cloud_uploader_test.go deleted file mode 100644 index bea6c9bc..00000000 --- a/src/mapper/pkg/clouduploader/cloud_uploader_test.go +++ /dev/null @@ -1,160 +0,0 @@ -package clouduploader - -import ( - "context" - "github.com/golang/mock/gomock" - "github.com/otterize/network-mapper/src/mapper/pkg/cloudclient" - cloudclientmocks "github.com/otterize/network-mapper/src/mapper/pkg/cloudclient/mocks" - "github.com/otterize/network-mapper/src/mapper/pkg/config" - "github.com/otterize/network-mapper/src/mapper/pkg/graph/model" - "github.com/otterize/network-mapper/src/mapper/pkg/resolvers" - "github.com/samber/lo" - "github.com/stretchr/testify/suite" - "golang.org/x/oauth2" - "testing" - "time" -) - -var ( - testTimestamp = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) -) - -type CloudUploaderTestSuite struct { - suite.Suite - testNamespace string - intentsHolder *resolvers.IntentsHolder - cloudUploader *CloudUploader - cloudConfig Config - clientMock *cloudclientmocks.MockCloudClient -} - -func (s *CloudUploaderTestSuite) SetupTest() { - s.testNamespace = "test-namespace" - s.intentsHolder = resolvers.NewIntentsHolder(nil, resolvers.IntentsHolderConfig{StoreConfigMap: config.StoreConfigMapDefault, Namespace: s.testNamespace}) - s.cloudConfig = Config{ - ClientId: "test-client-id", - } -} - -func (s *CloudUploaderTestSuite) BeforeTest(_, testName string) { - controller := gomock.NewController(s.T()) - factory := s.GetCloudClientFactoryMock(controller) - s.cloudUploader = NewCloudUploader(s.intentsHolder, s.cloudConfig, factory) -} - -func (s *CloudUploaderTestSuite) GetCloudClientFactoryMock(controller *gomock.Controller) cloudclient.FactoryFunction { - s.clientMock = cloudclientmocks.NewMockCloudClient(controller) - - factory := func(ctx context.Context, apiAddress string, tokenSource oauth2.TokenSource) cloudclient.CloudClient { - return s.clientMock - } - return factory -} - -func (s *CloudUploaderTestSuite) addIntent(source string, srcNamespace string, destination string, dstNamespace string) { - s.intentsHolder.AddIntent( - model.OtterizeServiceIdentity{Name: source, Namespace: srcNamespace}, - model.OtterizeServiceIdentity{Name: destination, Namespace: dstNamespace}, - testTimestamp, - ) -} - -func intentInput(clientName string, namespace string, serverName string, serverNamespace string) cloudclient.IntentInput { - nilIfEmpty := func(s string) *string { - if s == "" { - return nil - } - return lo.ToPtr(s) - } - - return cloudclient.IntentInput{ - ClientName: nilIfEmpty(clientName), - ServerName: nilIfEmpty(serverName), - Namespace: nilIfEmpty(namespace), - ServerNamespace: nilIfEmpty(serverNamespace), - } -} - -func (s *CloudUploaderTestSuite) TestUploadIntents() { - s.addIntent("client1", s.testNamespace, "server1", s.testNamespace) - s.addIntent("client1", s.testNamespace, "server2", "external-namespace") - - intents1 := []cloudclient.IntentInput{ - intentInput("client1", s.testNamespace, "server1", s.testNamespace), - intentInput("client1", s.testNamespace, "server2", "external-namespace"), - } - - s.clientMock.EXPECT().ReportDiscoveredIntents(GetMatcher(intents1)).Return(true).Times(1) - - s.cloudUploader.uploadDiscoveredIntents(context.Background()) - - s.addIntent("client2", s.testNamespace, "server1", s.testNamespace) - - intents2 := []cloudclient.IntentInput{ - intentInput("client2", s.testNamespace, "server1", s.testNamespace), - intentInput("client1", s.testNamespace, "server1", s.testNamespace), - intentInput("client1", s.testNamespace, "server2", "external-namespace"), - } - - s.clientMock.EXPECT().ReportDiscoveredIntents(GetMatcher(intents2)).Return(true).Times(1) - - s.cloudUploader.uploadDiscoveredIntents(context.Background()) -} - -func (s *CloudUploaderTestSuite) TestDontUploadWithoutIntents() { - s.clientMock.EXPECT().ReportDiscoveredIntents(gomock.Any()).Times(0) - - s.cloudUploader.uploadDiscoveredIntents(context.Background()) -} - -func (s *CloudUploaderTestSuite) TestUploadSameIntentOnce() { - s.addIntent("client", s.testNamespace, "server", s.testNamespace) - - intents := []cloudclient.IntentInput{ - intentInput("client", s.testNamespace, "server", s.testNamespace), - } - - s.clientMock.EXPECT().ReportDiscoveredIntents(GetMatcher(intents)).Return(true).Times(1) - - s.cloudUploader.uploadDiscoveredIntents(context.Background()) - s.addIntent("client", s.testNamespace, "server", s.testNamespace) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) -} - -func (s *CloudUploaderTestSuite) TestRetryOnFailed() { - s.addIntent("client", s.testNamespace, "server", s.testNamespace) - - intents := []cloudclient.IntentInput{ - intentInput("client", s.testNamespace, "server", s.testNamespace), - } - - s.clientMock.EXPECT().ReportDiscoveredIntents(GetMatcher(intents)).Return(false).Times(1) - - s.clientMock.EXPECT().ReportDiscoveredIntents(GetMatcher(intents)).Return(true).Times(1) - - s.cloudUploader.uploadDiscoveredIntents(context.Background()) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) -} - -func (s *CloudUploaderTestSuite) TestDontUploadWhenNothingNew() { - s.addIntent("client", s.testNamespace, "server", s.testNamespace) - - intents := []cloudclient.IntentInput{ - intentInput("client", s.testNamespace, "server", s.testNamespace), - } - - s.clientMock.EXPECT().ReportDiscoveredIntents(GetMatcher(intents)).Return(true).Times(1) - - s.cloudUploader.uploadDiscoveredIntents(context.Background()) - s.cloudUploader.uploadDiscoveredIntents(context.Background()) -} - -func (s *CloudUploaderTestSuite) TestReportMapperComonent() { - s.clientMock.EXPECT().ReportComponentStatus(cloudclient.ComponentTypeNetworkMapper).Times(1) - - s.cloudUploader.reportStatus(context.Background()) -} - -func TestRunSuite(t *testing.T) { - suite.Run(t, new(CloudUploaderTestSuite)) -} diff --git a/src/mapper/pkg/clouduploader/intents_input_matcher.go b/src/mapper/pkg/clouduploader/intents_input_matcher.go deleted file mode 100644 index e905a16d..00000000 --- a/src/mapper/pkg/clouduploader/intents_input_matcher.go +++ /dev/null @@ -1,102 +0,0 @@ -package clouduploader - -import ( - "fmt" - "github.com/otterize/network-mapper/src/mapper/pkg/cloudclient" -) - -// IntentsMatcher Implement gomock.Matcher interface for []cloudclient.IntentInput -type IntentsMatcher struct { - expected []cloudclient.IntentInput -} - -func NilCompare[T comparable](a *T, b *T) bool { - return (a == nil && b == nil) || (a != nil && b != nil && *a == *b) -} - -func compareIntentInput(a cloudclient.IntentInput, b cloudclient.IntentInput) bool { - return NilCompare(a.ClientName, b.ClientName) && - NilCompare(a.Namespace, b.Namespace) && - NilCompare(a.ServerName, b.ServerName) && - NilCompare(a.ServerNamespace, b.ServerNamespace) -} - -func (m IntentsMatcher) Matches(x interface{}) bool { - if x == nil { - return false - } - actualDiscoveredIntents, ok := x.([]*cloudclient.DiscoveredIntentInput) - if !ok { - return false - } - expectedIntents := m.expected - actualIntents := discoveredIntentsPtrToIntents(actualDiscoveredIntents) - - if len(actualIntents) != len(expectedIntents) { - return false - } - - for _, expected := range expectedIntents { - found := false - for _, actual := range actualIntents { - if compareIntentInput(actual, expected) { - found = true - break - } - } - if !found { - return false - } - } - return true -} - -func discoveredIntentsPtrToIntents(actualDiscoveredIntents []*cloudclient.DiscoveredIntentInput) []cloudclient.IntentInput { - actualIntents := make([]cloudclient.IntentInput, 0) - for _, intent := range actualDiscoveredIntents { - intentObject := *intent.Intent - actualIntents = append(actualIntents, intentObject) - } - return actualIntents -} - -func (m IntentsMatcher) String() string { - return prettyPrint(m) -} - -func prettyPrint(m IntentsMatcher) string { - expected := m.expected - var result string - itemFormat := "IntentInput{ClientName: %s, ServerName: %s, Namespace: %s, ServerNamespace: %s}," - for _, intent := range expected { - var clientName, namespace, serverName, serverNamespace string - if intent.ClientName != nil { - clientName = *intent.ClientName - } - if intent.Namespace != nil { - namespace = *intent.Namespace - } - if intent.ServerName != nil { - serverName = *intent.ServerName - } - if intent.ServerNamespace != nil { - serverNamespace = *intent.ServerNamespace - } - result += fmt.Sprintf(itemFormat, clientName, serverName, namespace, serverNamespace) - } - - return result -} - -func (m IntentsMatcher) Got(got interface{}) string { - actual, ok := got.([]*cloudclient.DiscoveredIntentInput) - if !ok { - return fmt.Sprintf("Not an []*cloudclient.DiscoveredIntentInput, Got: %v", got) - } - - return prettyPrint(IntentsMatcher{discoveredIntentsPtrToIntents(actual)}) -} - -func GetMatcher(expected []cloudclient.IntentInput) IntentsMatcher { - return IntentsMatcher{expected} -} diff --git a/src/mapper/pkg/config/config.go b/src/mapper/pkg/config/config.go index 0aedbead..57034e47 100644 --- a/src/mapper/pkg/config/config.go +++ b/src/mapper/pkg/config/config.go @@ -7,21 +7,15 @@ import ( ) const ( - EnvPrefix = "OTTERIZE" - ClusterDomainKey = "cluster-domain" - ClusterDomainDefault = kubeutils.DefaultClusterDomain - DebugKey = "debug" - DebugDefault = false - StoreConfigMapKey = "store-config-map" - StoreConfigMapDefault = "otterize-network-mapper-store" - NamespaceKey = "namespace" // what namespace the mapper is running at. needed for development - NamespaceDefault = "" - CloudApiAddrKey = "api-address" - CloudApiAddrDefault = "https://app.otterize.com/api" - ClientSecretKey = "client-secret" - ClientIDKey = "client-id" - UploadIntervalSecondsKey = "upload-interval-seconds" - UploadIntervalSecondsDefault = 60 + EnvPrefix = "OTTERIZE" + ClusterDomainKey = "cluster-domain" + ClusterDomainDefault = kubeutils.DefaultClusterDomain + DebugKey = "debug" + DebugDefault = false + StoreConfigMapKey = "store-config-map" + StoreConfigMapDefault = "otterize-network-mapper-store" + NamespaceKey = "namespace" // what namespace the mapper is running at. needed for development + NamespaceDefault = "" ) func init() { @@ -29,8 +23,6 @@ func init() { viper.SetDefault(ClusterDomainKey, ClusterDomainDefault) // If not set by the user, the main.go of mapper will try to find the cluster domain and set it itself. viper.SetDefault(StoreConfigMapKey, StoreConfigMapDefault) viper.SetDefault(NamespaceKey, NamespaceDefault) - viper.SetDefault(CloudApiAddrKey, CloudApiAddrDefault) - viper.SetDefault(UploadIntervalSecondsKey, UploadIntervalSecondsDefault) viper.SetEnvPrefix(EnvPrefix) viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) viper.AutomaticEnv() diff --git a/src/mapper/pkg/graph/generated/generated.go b/src/mapper/pkg/graph/generated/generated.go index 4d5707eb..b6ddb5bb 100644 --- a/src/mapper/pkg/graph/generated/generated.go +++ b/src/mapper/pkg/graph/generated/generated.go @@ -9,7 +9,6 @@ import ( "strconv" "sync" "sync/atomic" - "time" "github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql/introspection" @@ -224,16 +223,11 @@ func (ec *executionContext) introspectType(name string) (*introspection.Type, er } var sources = []*ast.Source{ - {Name: "../mappergraphql/schema.graphql", Input: `scalar Time # Equivalent of Go's time.Time provided by gqlgen + {Name: "../graphql/schema.graphql", Input: ` input CaptureResultForSrcIp { srcIp: String! - destinations: [Destination!]! -} - -input Destination { - destination: String! - lastSeen: Time! + destinations: [String!]! } input CaptureResults { @@ -242,7 +236,7 @@ input CaptureResults { input SocketScanResultForSrcIp { srcIp: String! - destIps: [Destination!]! + destIps: [String!]! } input SocketScanResults { @@ -1952,7 +1946,7 @@ func (ec *executionContext) unmarshalInputCaptureResultForSrcIp(ctx context.Cont var err error ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("destinations")) - it.Destinations, err = ec.unmarshalNDestination2ᚕgithubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐDestinationᚄ(ctx, v) + it.Destinations, err = ec.unmarshalNString2ᚕstringᚄ(ctx, v) if err != nil { return it, err } @@ -1985,37 +1979,6 @@ func (ec *executionContext) unmarshalInputCaptureResults(ctx context.Context, ob return it, nil } -func (ec *executionContext) unmarshalInputDestination(ctx context.Context, obj interface{}) (model.Destination, error) { - var it model.Destination - asMap := map[string]interface{}{} - for k, v := range obj.(map[string]interface{}) { - asMap[k] = v - } - - for k, v := range asMap { - switch k { - case "destination": - var err error - - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("destination")) - it.Destination, err = ec.unmarshalNString2string(ctx, v) - if err != nil { - return it, err - } - case "lastSeen": - var err error - - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("lastSeen")) - it.LastSeen, err = ec.unmarshalNTime2timeᚐTime(ctx, v) - if err != nil { - return it, err - } - } - } - - return it, nil -} - func (ec *executionContext) unmarshalInputSocketScanResultForSrcIp(ctx context.Context, obj interface{}) (model.SocketScanResultForSrcIP, error) { var it model.SocketScanResultForSrcIP asMap := map[string]interface{}{} @@ -2037,7 +2000,7 @@ func (ec *executionContext) unmarshalInputSocketScanResultForSrcIp(ctx context.C var err error ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("destIps")) - it.DestIps, err = ec.unmarshalNDestination2ᚕgithubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐDestinationᚄ(ctx, v) + it.DestIps, err = ec.unmarshalNString2ᚕstringᚄ(ctx, v) if err != nil { return it, err } @@ -2752,28 +2715,6 @@ func (ec *executionContext) unmarshalNCaptureResults2githubᚗcomᚋotterizeᚋn return res, graphql.ErrorOnPath(ctx, err) } -func (ec *executionContext) unmarshalNDestination2githubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐDestination(ctx context.Context, v interface{}) (model.Destination, error) { - res, err := ec.unmarshalInputDestination(ctx, v) - return res, graphql.ErrorOnPath(ctx, err) -} - -func (ec *executionContext) unmarshalNDestination2ᚕgithubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐDestinationᚄ(ctx context.Context, v interface{}) ([]model.Destination, error) { - var vSlice []interface{} - if v != nil { - vSlice = graphql.CoerceList(v) - } - var err error - res := make([]model.Destination, len(vSlice)) - for i := range vSlice { - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithIndex(i)) - res[i], err = ec.unmarshalNDestination2githubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐDestination(ctx, vSlice[i]) - if err != nil { - return nil, err - } - } - return res, nil -} - func (ec *executionContext) marshalNOtterizeServiceIdentity2githubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐOtterizeServiceIdentity(ctx context.Context, sel ast.SelectionSet, v model.OtterizeServiceIdentity) graphql.Marshaler { return ec._OtterizeServiceIdentity(ctx, sel, &v) } @@ -2922,19 +2863,36 @@ func (ec *executionContext) marshalNString2string(ctx context.Context, sel ast.S return res } -func (ec *executionContext) unmarshalNTime2timeᚐTime(ctx context.Context, v interface{}) (time.Time, error) { - res, err := graphql.UnmarshalTime(v) - return res, graphql.ErrorOnPath(ctx, err) +func (ec *executionContext) unmarshalNString2ᚕstringᚄ(ctx context.Context, v interface{}) ([]string, error) { + var vSlice []interface{} + if v != nil { + vSlice = graphql.CoerceList(v) + } + var err error + res := make([]string, len(vSlice)) + for i := range vSlice { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithIndex(i)) + res[i], err = ec.unmarshalNString2string(ctx, vSlice[i]) + if err != nil { + return nil, err + } + } + return res, nil } -func (ec *executionContext) marshalNTime2timeᚐTime(ctx context.Context, sel ast.SelectionSet, v time.Time) graphql.Marshaler { - res := graphql.MarshalTime(v) - if res == graphql.Null { - if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { - ec.Errorf(ctx, "must not be null") +func (ec *executionContext) marshalNString2ᚕstringᚄ(ctx context.Context, sel ast.SelectionSet, v []string) graphql.Marshaler { + ret := make(graphql.Array, len(v)) + for i := range v { + ret[i] = ec.marshalNString2string(ctx, sel, v[i]) + } + + for _, e := range ret { + if e == graphql.Null { + return graphql.Null } } - return res + + return ret } func (ec *executionContext) marshalN__Directive2githubᚗcomᚋ99designsᚋgqlgenᚋgraphqlᚋintrospectionᚐDirective(ctx context.Context, sel ast.SelectionSet, v introspection.Directive) graphql.Marshaler { diff --git a/src/mapper/pkg/graph/model/models_gen.go b/src/mapper/pkg/graph/model/models_gen.go index 1d58cbf4..1939450d 100644 --- a/src/mapper/pkg/graph/model/models_gen.go +++ b/src/mapper/pkg/graph/model/models_gen.go @@ -2,24 +2,15 @@ package model -import ( - "time" -) - type CaptureResultForSrcIP struct { - SrcIP string `json:"srcIp"` - Destinations []Destination `json:"destinations"` + SrcIP string `json:"srcIp"` + Destinations []string `json:"destinations"` } type CaptureResults struct { Results []CaptureResultForSrcIP `json:"results"` } -type Destination struct { - Destination string `json:"destination"` - LastSeen time.Time `json:"lastSeen"` -} - type OtterizeServiceIdentity struct { Name string `json:"name"` Namespace string `json:"namespace"` @@ -31,8 +22,8 @@ type ServiceIntents struct { } type SocketScanResultForSrcIP struct { - SrcIP string `json:"srcIp"` - DestIps []Destination `json:"destIps"` + SrcIP string `json:"srcIp"` + DestIps []string `json:"destIps"` } type SocketScanResults struct { diff --git a/src/mapper/pkg/resolvers/helpers.go b/src/mapper/pkg/resolvers/helpers.go index 32d2a774..4a7e03e0 100644 --- a/src/mapper/pkg/resolvers/helpers.go +++ b/src/mapper/pkg/resolvers/helpers.go @@ -10,6 +10,7 @@ import ( "github.com/otterize/network-mapper/src/mapper/pkg/config" "github.com/otterize/network-mapper/src/mapper/pkg/graph/model" "github.com/otterize/network-mapper/src/shared/kubeutils" + "github.com/samber/lo" "github.com/spf13/viper" "io" corev1 "k8s.io/api/core/v1" @@ -18,19 +19,28 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sort" "sync" - "time" ) -type SourceDestPair struct { - Source model.OtterizeServiceIdentity - Destination model.OtterizeServiceIdentity +type intentsHolderStore map[model.OtterizeServiceIdentity]*goset.Set[model.OtterizeServiceIdentity] + +func (s intentsHolderStore) MarshalJSON() ([]byte, error) { + // OtterizeServiceIdentity cannot be serialized as map key in JSON, because it is represented as a map itself + // therefore, we serialize the store as a slice of [Key, Value] "tuples" + return json.Marshal(lo.ToPairs(s)) } -type DiscoveredIntent struct { - Source model.OtterizeServiceIdentity `json:"source"` - Destination model.OtterizeServiceIdentity `json:"destination"` - Timestamp time.Time `json:"timestamp"` +func (s intentsHolderStore) UnmarshalJSON(b []byte) error { + var pairs []lo.Entry[model.OtterizeServiceIdentity, *goset.Set[model.OtterizeServiceIdentity]] + err := json.Unmarshal(b, &pairs) + if err != nil { + return err + } + for _, pair := range pairs { + s[pair.Key] = pair.Value + } + return nil } type IntentsHolderConfig struct { @@ -54,79 +64,66 @@ func IntentsHolderConfigFromViper() (IntentsHolderConfig, error) { } type IntentsHolder struct { - store map[SourceDestPair]time.Time - lock sync.Mutex - client client.Client - config IntentsHolderConfig - lastIntentsUpdate time.Time + store intentsHolderStore + lock sync.Mutex + client client.Client + config IntentsHolderConfig } func NewIntentsHolder(client client.Client, config IntentsHolderConfig) *IntentsHolder { return &IntentsHolder{ - store: newIntentsStore(nil), + store: make(intentsHolderStore), lock: sync.Mutex{}, client: client, config: config, } } -func newIntentsStore(intents []DiscoveredIntent) map[SourceDestPair]time.Time { - if intents == nil { - return make(map[SourceDestPair]time.Time) - } - - result := make(map[SourceDestPair]time.Time) - for _, intent := range intents { - result[SourceDestPair{Source: intent.Source, Destination: intent.Destination}] = intent.Timestamp - } - return result -} - func (i *IntentsHolder) Reset() { i.lock.Lock() defer i.lock.Unlock() - i.store = make(map[SourceDestPair]time.Time) + i.store = make(intentsHolderStore) } -func (i *IntentsHolder) AddIntent(srcService model.OtterizeServiceIdentity, dstService model.OtterizeServiceIdentity, newTimestamp time.Time) { +func (i *IntentsHolder) AddIntent(srcService model.OtterizeServiceIdentity, dstService model.OtterizeServiceIdentity) { i.lock.Lock() defer i.lock.Unlock() - - pair := SourceDestPair{Source: srcService, Destination: dstService} - currentTimestamp, alreadyExists := i.store[pair] - if !alreadyExists || newTimestamp.After(currentTimestamp) { - i.store[pair] = newTimestamp - i.lastIntentsUpdate = time.Now() + namespace := "" + if srcService.Namespace != dstService.Namespace { + // namespace is only needed if it's a connection between different namespaces. + namespace = dstService.Namespace } + intents, ok := i.store[srcService] + if !ok { + intents = goset.NewSet[model.OtterizeServiceIdentity]() + i.store[srcService] = intents + } + intents.Add(model.OtterizeServiceIdentity{Name: dstService.Name, Namespace: namespace}) } -func (i *IntentsHolder) LastIntentsUpdate() time.Time { - i.lock.Lock() - defer i.lock.Unlock() - return i.lastIntentsUpdate -} - -func (i *IntentsHolder) GetIntents(namespaces []string) []DiscoveredIntent { +func (i *IntentsHolder) GetIntentsPerService(namespaces []string) map[model.OtterizeServiceIdentity][]model.OtterizeServiceIdentity { i.lock.Lock() defer i.lock.Unlock() - - return i.getIntents(namespaces) -} - -func (i *IntentsHolder) getIntents(namespaces []string) []DiscoveredIntent { namespacesSet := goset.FromSlice(namespaces) - result := make([]DiscoveredIntent, 0) - for pair, timestamp := range i.store { - if !namespacesSet.IsEmpty() && !namespacesSet.Contains(pair.Source.Namespace) { + result := make(map[model.OtterizeServiceIdentity][]model.OtterizeServiceIdentity) + for service, intents := range i.store { + if !namespacesSet.IsEmpty() && !namespacesSet.Contains(service.Namespace) { continue } - - result = append(result, DiscoveredIntent{ - Source: pair.Source, - Destination: pair.Destination, - Timestamp: timestamp, + intentsSlice := intents.Items() + // sorting the intents so results will be consistent + sort.Slice(intentsSlice, func(i, j int) bool { + // Primary sort by name + if intentsSlice[i].Name != intentsSlice[j].Name { + return intentsSlice[i].Name < intentsSlice[j].Name + } + // Secondary sort by namespace + return intentsSlice[i].Namespace < intentsSlice[j].Namespace }) + if len(intentsSlice) != 0 { + result[service] = intentsSlice + } } return result } @@ -134,8 +131,7 @@ func (i *IntentsHolder) getIntents(namespaces []string) []DiscoveredIntent { func (i *IntentsHolder) WriteStore(ctx context.Context) error { i.lock.Lock() defer i.lock.Unlock() - intents := i.getIntents(nil) - jsonBytes, err := json.Marshal(intents) + jsonBytes, err := json.Marshal(i.store) if err != nil { return err } @@ -176,25 +172,5 @@ func (i *IntentsHolder) LoadStore(ctx context.Context) error { if err != nil { return err } - - var intents []DiscoveredIntent - err = json.Unmarshal(decompressedJson, &intents) - if err != nil { - return err - } - - i.store = newIntentsStore(intents) - return nil -} - -func groupDestinationsBySource(discoveredIntents []DiscoveredIntent) map[model.OtterizeServiceIdentity][]model.OtterizeServiceIdentity { - serviceMap := make(map[model.OtterizeServiceIdentity][]model.OtterizeServiceIdentity, 0) - for _, intents := range discoveredIntents { - if _, ok := serviceMap[intents.Source]; !ok { - serviceMap[intents.Source] = make([]model.OtterizeServiceIdentity, 0) - } - - serviceMap[intents.Source] = append(serviceMap[intents.Source], intents.Destination) - } - return serviceMap + return json.Unmarshal(decompressedJson, &i.store) } diff --git a/src/mapper/pkg/resolvers/resolver_test.go b/src/mapper/pkg/resolvers/resolver_test.go index 15dd0e4a..9b42eb35 100644 --- a/src/mapper/pkg/resolvers/resolver_test.go +++ b/src/mapper/pkg/resolvers/resolver_test.go @@ -45,22 +45,14 @@ func (s *ResolverTestSuite) TestReportCaptureResults() { _, err := test_gql_client.ReportCaptureResults(context.Background(), s.client, test_gql_client.CaptureResults{ Results: []test_gql_client.CaptureResultForSrcIp{ { - SrcIp: "1.1.1.1", - Destinations: []test_gql_client.Destination{ - { - Destination: fmt.Sprintf("service2.%s.svc.cluster.local", s.TestNamespace), - }, - }, + SrcIp: "1.1.1.1", + Destinations: []string{fmt.Sprintf("service2.%s.svc.cluster.local", s.TestNamespace)}, }, { SrcIp: "1.1.1.3", - Destinations: []test_gql_client.Destination{ - { - Destination: fmt.Sprintf("service1.%s.svc.cluster.local", s.TestNamespace), - }, - { - Destination: fmt.Sprintf("service2.%s.svc.cluster.local", s.TestNamespace), - }, + Destinations: []string{ + fmt.Sprintf("service1.%s.svc.cluster.local", s.TestNamespace), + fmt.Sprintf("service2.%s.svc.cluster.local", s.TestNamespace), }, }, }, @@ -68,7 +60,6 @@ func (s *ResolverTestSuite) TestReportCaptureResults() { s.Require().NoError(err) res, err := test_gql_client.ServiceIntents(context.Background(), s.client, nil) - s.Require().NoError(err) s.Require().ElementsMatch(res.ServiceIntents, []test_gql_client.ServiceIntentsServiceIntents{ { Client: test_gql_client.ServiceIntentsServiceIntentsClientOtterizeServiceIdentity{ @@ -76,10 +67,7 @@ func (s *ResolverTestSuite) TestReportCaptureResults() { Namespace: s.TestNamespace, }, Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ - { - Name: "service2", - Namespace: s.TestNamespace, - }, + {Name: "service2"}, }, }, { @@ -88,14 +76,8 @@ func (s *ResolverTestSuite) TestReportCaptureResults() { Namespace: s.TestNamespace, }, Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ - { - Name: "service1", - Namespace: s.TestNamespace, - }, - { - Name: "service2", - Namespace: s.TestNamespace, - }, + {Name: "service1"}, + {Name: "service2"}, }, }, }) @@ -110,23 +92,12 @@ func (s *ResolverTestSuite) TestSocketScanResults() { _, err := test_gql_client.ReportSocketScanResults(context.Background(), s.client, test_gql_client.SocketScanResults{ Results: []test_gql_client.SocketScanResultForSrcIp{ { - SrcIp: "1.1.2.1", - DestIps: []test_gql_client.Destination{ - { - Destination: "1.1.2.2", - }, - }, + SrcIp: "1.1.2.1", + DestIps: []string{"1.1.2.2"}, }, { - SrcIp: "1.1.2.3", - DestIps: []test_gql_client.Destination{ - { - Destination: "1.1.2.1", - }, - { - Destination: "1.1.2.2", - }, - }, + SrcIp: "1.1.2.3", + DestIps: []string{"1.1.2.1", "1.1.2.2"}, }, }, }) @@ -141,10 +112,7 @@ func (s *ResolverTestSuite) TestSocketScanResults() { Namespace: s.TestNamespace, }, Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ - { - Name: "service2", - Namespace: s.TestNamespace, - }, + {Name: "service2"}, }, }, { @@ -153,14 +121,8 @@ func (s *ResolverTestSuite) TestSocketScanResults() { Namespace: s.TestNamespace, }, Intents: []test_gql_client.ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity{ - { - Name: "service1", - Namespace: s.TestNamespace, - }, - { - Name: "service2", - Namespace: s.TestNamespace, - }, + {Name: "service1"}, + {Name: "service2"}, }, }, }) @@ -178,23 +140,12 @@ func (s *ResolverTestSuite) TestLoadStore() { _, err = test_gql_client.ReportSocketScanResults(context.Background(), s.client, test_gql_client.SocketScanResults{ Results: []test_gql_client.SocketScanResultForSrcIp{ { - SrcIp: "1.1.3.1", - DestIps: []test_gql_client.Destination{ - { - Destination: "1.1.3.2", - }, - }, + SrcIp: "1.1.3.1", + DestIps: []string{"1.1.3.2"}, }, { - SrcIp: "1.1.3.3", - DestIps: []test_gql_client.Destination{ - { - Destination: "1.1.3.2", - }, - { - Destination: "1.1.3.2", - }, - }, + SrcIp: "1.1.3.3", + DestIps: []string{"1.1.3.1", "1.1.3.2"}, }, }, }) diff --git a/src/mapper/pkg/resolvers/schema.resolvers.go b/src/mapper/pkg/resolvers/schema.resolvers.go index 3c0fa4b3..fad5d077 100644 --- a/src/mapper/pkg/resolvers/schema.resolvers.go +++ b/src/mapper/pkg/resolvers/schema.resolvers.go @@ -40,12 +40,11 @@ func (r *mutationResolver) ReportCaptureResults(ctx context.Context, results mod continue } for _, dest := range captureItem.Destinations { - destAddress := dest.Destination - if !strings.HasSuffix(destAddress, viper.GetString(config.ClusterDomainKey)) { + if !strings.HasSuffix(dest, viper.GetString(config.ClusterDomainKey)) { // not a k8s service, ignore continue } - ips, err := r.kubeFinder.ResolveServiceAddressToIps(ctx, destAddress) + ips, err := r.kubeFinder.ResolveServiceAddressToIps(ctx, dest) if err != nil { logrus.WithError(err).Warningf("Could not resolve service address %s", dest) continue @@ -71,7 +70,6 @@ func (r *mutationResolver) ReportCaptureResults(ctx context.Context, results mod r.intentsHolder.AddIntent( model.OtterizeServiceIdentity{Name: srcService, Namespace: srcPod.Namespace}, model.OtterizeServiceIdentity{Name: dstService, Namespace: destPod.Namespace}, - dest.LastSeen, ) } } @@ -99,7 +97,7 @@ func (r *mutationResolver) ReportSocketScanResults(ctx context.Context, results continue } for _, destIp := range socketScanItem.DestIps { - destPod, err := r.kubeFinder.ResolveIpToPod(ctx, destIp.Destination) + destPod, err := r.kubeFinder.ResolveIpToPod(ctx, destIp) if err != nil { if errors.Is(err, kubefinder.FoundMoreThanOnePodError) { logrus.WithError(err).Debugf("Ip %s belongs to more than one pod, ignoring", destIp) @@ -116,7 +114,6 @@ func (r *mutationResolver) ReportSocketScanResults(ctx context.Context, results r.intentsHolder.AddIntent( model.OtterizeServiceIdentity{Name: srcService, Namespace: srcPod.Namespace}, model.OtterizeServiceIdentity{Name: dstService, Namespace: destPod.Namespace}, - destIp.LastSeen, ) } } @@ -128,24 +125,10 @@ func (r *mutationResolver) ReportSocketScanResults(ctx context.Context, results } func (r *queryResolver) ServiceIntents(ctx context.Context, namespaces []string) ([]model.ServiceIntents, error) { - discoveredIntents := r.intentsHolder.GetIntents(namespaces) - serviceToDestinations := groupDestinationsBySource(discoveredIntents) - result := make([]model.ServiceIntents, 0) - for service, destinations := range serviceToDestinations { - sort.Slice(destinations, func(i, j int) bool { - if destinations[i].Name != destinations[j].Name { - return destinations[i].Name < destinations[j].Name - } - return destinations[i].Namespace < destinations[j].Namespace - }) - - result = append(result, model.ServiceIntents{ - Client: lo.ToPtr(service), - Intents: destinations, - }) + for service, intents := range r.intentsHolder.GetIntentsPerService(namespaces) { + result = append(result, model.ServiceIntents{Client: lo.ToPtr(service), Intents: intents}) } - // sorting by service name so results are more consistent sort.Slice(result, func(i, j int) bool { return result[i].Client.Name < result[j].Client.Name diff --git a/src/mapper/pkg/resolvers/test_gql_client/generated.go b/src/mapper/pkg/resolvers/test_gql_client/generated.go index 1fa3a36f..cd4284a6 100644 --- a/src/mapper/pkg/resolvers/test_gql_client/generated.go +++ b/src/mapper/pkg/resolvers/test_gql_client/generated.go @@ -4,21 +4,20 @@ package test_gql_client import ( "context" - "time" "github.com/Khan/genqlient/graphql" ) type CaptureResultForSrcIp struct { - SrcIp string `json:"srcIp"` - Destinations []Destination `json:"destinations"` + SrcIp string `json:"srcIp"` + Destinations []string `json:"destinations"` } // GetSrcIp returns CaptureResultForSrcIp.SrcIp, and is useful for accessing the field via an interface. func (v *CaptureResultForSrcIp) GetSrcIp() string { return v.SrcIp } // GetDestinations returns CaptureResultForSrcIp.Destinations, and is useful for accessing the field via an interface. -func (v *CaptureResultForSrcIp) GetDestinations() []Destination { return v.Destinations } +func (v *CaptureResultForSrcIp) GetDestinations() []string { return v.Destinations } type CaptureResults struct { Results []CaptureResultForSrcIp `json:"results"` @@ -27,17 +26,6 @@ type CaptureResults struct { // GetResults returns CaptureResults.Results, and is useful for accessing the field via an interface. func (v *CaptureResults) GetResults() []CaptureResultForSrcIp { return v.Results } -type Destination struct { - Destination string `json:"destination"` - LastSeen time.Time `json:"lastSeen"` -} - -// GetDestination returns Destination.Destination, and is useful for accessing the field via an interface. -func (v *Destination) GetDestination() string { return v.Destination } - -// GetLastSeen returns Destination.LastSeen, and is useful for accessing the field via an interface. -func (v *Destination) GetLastSeen() time.Time { return v.LastSeen } - // ReportCaptureResultsResponse is returned by ReportCaptureResults on success. type ReportCaptureResultsResponse struct { ReportCaptureResults bool `json:"reportCaptureResults"` @@ -111,15 +99,15 @@ func (v *ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity) GetNamespac } type SocketScanResultForSrcIp struct { - SrcIp string `json:"srcIp"` - DestIps []Destination `json:"destIps"` + SrcIp string `json:"srcIp"` + DestIps []string `json:"destIps"` } // GetSrcIp returns SocketScanResultForSrcIp.SrcIp, and is useful for accessing the field via an interface. func (v *SocketScanResultForSrcIp) GetSrcIp() string { return v.SrcIp } // GetDestIps returns SocketScanResultForSrcIp.DestIps, and is useful for accessing the field via an interface. -func (v *SocketScanResultForSrcIp) GetDestIps() []Destination { return v.DestIps } +func (v *SocketScanResultForSrcIp) GetDestIps() []string { return v.DestIps } type SocketScanResults struct { Results []SocketScanResultForSrcIp `json:"results"` diff --git a/src/mapper/pkg/resolvers/test_gql_client/genqlient.yaml b/src/mapper/pkg/resolvers/test_gql_client/genqlient.yaml index b64a02ba..711fedfb 100644 --- a/src/mapper/pkg/resolvers/test_gql_client/genqlient.yaml +++ b/src/mapper/pkg/resolvers/test_gql_client/genqlient.yaml @@ -1,11 +1,7 @@ # genqlient config; for full documentation see: # https://github.com/Khan/genqlient/blob/main/docs/genqlient.yaml schema: - - ../../../../mappergraphql/schema.graphql + - ../../../../graphql/schema.graphql operations: - genqlient.graphql generated: generated.go - -bindings: - Time: - type: time.Time diff --git a/src/mappergraphql/.graphqlconfig.yaml b/src/mappergraphql/.graphqlconfig.yaml deleted file mode 100644 index de94a0a0..00000000 --- a/src/mappergraphql/.graphqlconfig.yaml +++ /dev/null @@ -1,4 +0,0 @@ -schema: - - ./*.graphql - - diff --git a/src/shared/testbase/testsuitebase.go b/src/shared/testbase/testsuitebase.go index 56cbb755..730354b0 100644 --- a/src/shared/testbase/testsuitebase.go +++ b/src/shared/testbase/testsuitebase.go @@ -117,7 +117,7 @@ func (s *ControllerManagerTestSuiteBase) AddPod(name string, podIp string, label if podIp != "" { pod.Status.PodIP = podIp - pod.Status.PodIPs = []corev1.PodIP{{IP: podIp}} + pod.Status.PodIPs = []corev1.PodIP{{podIp}} pod, err = s.K8sDirectClient.CoreV1().Pods(s.TestNamespace).UpdateStatus(context.Background(), pod, metav1.UpdateOptions{}) s.Require().NoError(err) } diff --git a/src/sniffer.Dockerfile b/src/sniffer.Dockerfile index 683d48dd..93ded2b2 100644 --- a/src/sniffer.Dockerfile +++ b/src/sniffer.Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=linux/amd64 golang:1.19-alpine as buildenv +FROM --platform=linux/amd64 golang:1.18-alpine as buildenv RUN apk add --no-cache ca-certificates git protoc RUN apk add build-base libpcap-dev WORKDIR /src @@ -15,7 +15,7 @@ RUN go test ./sniffer/... && echo dep > /dep # We start from the base image again, only this time it's using the target arch instead of always amd64. This is done to make the build faster. # Unlike the mapper, it can't be amd64 throughout and use Go's cross-compilation, since the sniffer depends on libpcap (C library). -FROM golang:1.19-alpine as builder +FROM golang:1.18-alpine as builder COPY --from=test /dep /dep RUN apk add --no-cache ca-certificates git protoc RUN apk add build-base libpcap-dev diff --git a/src/sniffer/pkg/client/generated.go b/src/sniffer/pkg/client/generated.go index d6b0a429..d0e76491 100644 --- a/src/sniffer/pkg/client/generated.go +++ b/src/sniffer/pkg/client/generated.go @@ -4,21 +4,20 @@ package client import ( "context" - "time" "github.com/Khan/genqlient/graphql" ) type CaptureResultForSrcIp struct { - SrcIp string `json:"srcIp"` - Destinations []Destination `json:"destinations"` + SrcIp string `json:"srcIp"` + Destinations []string `json:"destinations"` } // GetSrcIp returns CaptureResultForSrcIp.SrcIp, and is useful for accessing the field via an interface. func (v *CaptureResultForSrcIp) GetSrcIp() string { return v.SrcIp } // GetDestinations returns CaptureResultForSrcIp.Destinations, and is useful for accessing the field via an interface. -func (v *CaptureResultForSrcIp) GetDestinations() []Destination { return v.Destinations } +func (v *CaptureResultForSrcIp) GetDestinations() []string { return v.Destinations } type CaptureResults struct { Results []CaptureResultForSrcIp `json:"results"` @@ -27,27 +26,16 @@ type CaptureResults struct { // GetResults returns CaptureResults.Results, and is useful for accessing the field via an interface. func (v *CaptureResults) GetResults() []CaptureResultForSrcIp { return v.Results } -type Destination struct { - Destination string `json:"destination"` - LastSeen time.Time `json:"lastSeen"` -} - -// GetDestination returns Destination.Destination, and is useful for accessing the field via an interface. -func (v *Destination) GetDestination() string { return v.Destination } - -// GetLastSeen returns Destination.LastSeen, and is useful for accessing the field via an interface. -func (v *Destination) GetLastSeen() time.Time { return v.LastSeen } - type SocketScanResultForSrcIp struct { - SrcIp string `json:"srcIp"` - DestIps []Destination `json:"destIps"` + SrcIp string `json:"srcIp"` + DestIps []string `json:"destIps"` } // GetSrcIp returns SocketScanResultForSrcIp.SrcIp, and is useful for accessing the field via an interface. func (v *SocketScanResultForSrcIp) GetSrcIp() string { return v.SrcIp } // GetDestIps returns SocketScanResultForSrcIp.DestIps, and is useful for accessing the field via an interface. -func (v *SocketScanResultForSrcIp) GetDestIps() []Destination { return v.DestIps } +func (v *SocketScanResultForSrcIp) GetDestIps() []string { return v.DestIps } type SocketScanResults struct { Results []SocketScanResultForSrcIp `json:"results"` diff --git a/src/sniffer/pkg/client/genqlient.yaml b/src/sniffer/pkg/client/genqlient.yaml index 4df29655..5672f7d0 100644 --- a/src/sniffer/pkg/client/genqlient.yaml +++ b/src/sniffer/pkg/client/genqlient.yaml @@ -1,10 +1,7 @@ # genqlient config; for full documentation see: # https://github.com/Khan/genqlient/blob/main/docs/genqlient.yaml schema: - - ../../../mappergraphql/schema.graphql + - ../../../graphql/schema.graphql operations: - genqlient.graphql generated: generated.go -bindings: - Time: - type: time.Time diff --git a/src/sniffer/pkg/sniffer/sniffer.go b/src/sniffer/pkg/sniffer/sniffer.go index 6d2d4769..c543fdd3 100644 --- a/src/sniffer/pkg/sniffer/sniffer.go +++ b/src/sniffer/pkg/sniffer/sniffer.go @@ -2,6 +2,7 @@ package sniffer import ( "context" + "github.com/amit7itz/goset" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" @@ -13,11 +14,8 @@ import ( "time" ) -// capturesMap is a map of source IP to a map of destination DNS to the last time it was seen -type capturesMap map[string]map[string]time.Time - type Sniffer struct { - capturedRequests capturesMap + capturedRequests map[string]*goset.Set[string] socketScanner *socketscanner.SocketScanner lastReportTime time.Time mapperClient client.MapperClient @@ -25,7 +23,7 @@ type Sniffer struct { func NewSniffer(mapperClient client.MapperClient) *Sniffer { return &Sniffer{ - capturedRequests: make(capturesMap), + capturedRequests: make(map[string]*goset.Set[string]), socketScanner: socketscanner.NewSocketScanner(mapperClient), lastReportTime: time.Now(), mapperClient: mapperClient, @@ -33,7 +31,6 @@ func NewSniffer(mapperClient client.MapperClient) *Sniffer { } func (s *Sniffer) HandlePacket(packet gopacket.Packet) { - captureTime := detectCaptureTime(packet) ipLayer := packet.Layer(layers.LayerTypeIPv4) dnsLayer := packet.Layer(layers.LayerTypeDNS) if dnsLayer != nil && ipLayer != nil { @@ -45,25 +42,18 @@ func (s *Sniffer) HandlePacket(packet gopacket.Packet) { if answer.Type != layers.DNSTypeA && answer.Type != layers.DNSTypeAAAA { continue } - s.addCapturedRequest(ip.DstIP.String(), string(answer.Name), captureTime) + s.addCapturedRequest(ip.DstIP.String(), string(answer.Name)) } } } } -func detectCaptureTime(packet gopacket.Packet) time.Time { - captureTime := packet.Metadata().CaptureInfo.Timestamp - if captureTime.IsZero() { - return time.Now() - } - return captureTime -} - -func (s *Sniffer) addCapturedRequest(srcIp string, destDns string, seenAt time.Time) { +func (s *Sniffer) addCapturedRequest(srcIp string, destDns string) { if _, ok := s.capturedRequests[srcIp]; !ok { - s.capturedRequests[srcIp] = make(map[string]time.Time) + s.capturedRequests[srcIp] = goset.NewSet[string](destDns) + } else { + s.capturedRequests[srcIp].Add(destDns) } - s.capturedRequests[srcIp][destDns] = seenAt } func (s *Sniffer) ReportCaptureResults(ctx context.Context) error { @@ -73,7 +63,10 @@ func (s *Sniffer) ReportCaptureResults(ctx context.Context) error { return nil } s.PrintCapturedRequests() - results := getCaptureResults(s.capturedRequests) + results := make([]client.CaptureResultForSrcIp, 0, len(s.capturedRequests)) + for srcIp, destinations := range s.capturedRequests { + results = append(results, client.CaptureResultForSrcIp{SrcIp: srcIp, Destinations: destinations.Items()}) + } timeoutCtx, cancelFunc := context.WithTimeout(ctx, viper.GetDuration(config.CallsTimeoutKey)) defer cancelFunc() @@ -84,28 +77,16 @@ func (s *Sniffer) ReportCaptureResults(ctx context.Context) error { } // delete the reported captured requests - s.capturedRequests = make(capturesMap) + s.capturedRequests = make(map[string]*goset.Set[string]) return nil } -func getCaptureResults(capturedRequests capturesMap) []client.CaptureResultForSrcIp { - results := make([]client.CaptureResultForSrcIp, 0, len(capturedRequests)) - for srcIp, destDNSToTime := range capturedRequests { - destinations := make([]client.Destination, 0) - for destDNS, lastSeen := range destDNSToTime { - destinations = append(destinations, client.Destination{Destination: destDNS, LastSeen: lastSeen}) - } - results = append(results, client.CaptureResultForSrcIp{SrcIp: srcIp, Destinations: destinations}) - } - return results -} - func (s *Sniffer) PrintCapturedRequests() { - for ip, destinations := range s.capturedRequests { + for ip, dests := range s.capturedRequests { logrus.Debugf("%s:\n", ip) - for destDNS, lastSeen := range destinations { - logrus.Debugf("\t%s, %s\n", destDNS, lastSeen) - } + dests.For(func(dest string) { + logrus.Debugf("\t%s\n", dest) + }) } } diff --git a/src/sniffer/pkg/sniffer/sniffer_test.go b/src/sniffer/pkg/sniffer/sniffer_test.go index c47afa2f..02143578 100644 --- a/src/sniffer/pkg/sniffer/sniffer_test.go +++ b/src/sniffer/pkg/sniffer/sniffer_test.go @@ -10,7 +10,6 @@ import ( mock_client "github.com/otterize/network-mapper/src/sniffer/pkg/client/mockclient" "github.com/stretchr/testify/suite" "testing" - "time" ) type SnifferTestSuite struct { @@ -31,20 +30,13 @@ func (s *SnifferTestSuite) TestHandlePacket() { s.Require().NoError(err) } packet := gopacket.NewPacket(rawDnsResponse, layers.LayerTypeEthernet, gopacket.Default) - timestamp := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) - packet.Metadata().CaptureInfo.Timestamp = timestamp sniffer.HandlePacket(packet) s.mockMapperClient.EXPECT().ReportCaptureResults(gomock.Any(), client.CaptureResults{ Results: []client.CaptureResultForSrcIp{ { - SrcIp: "10.101.81.13", - Destinations: []client.Destination{ - { - Destination: "sts.us-east-1.amazonaws.com", - LastSeen: timestamp, - }, - }, + SrcIp: "10.101.81.13", + Destinations: []string{"sts.us-east-1.amazonaws.com"}, }, }, }) diff --git a/src/sniffer/pkg/socketscanner/socketscanner.go b/src/sniffer/pkg/socketscanner/socketscanner.go index a6d3f449..001bd10b 100644 --- a/src/sniffer/pkg/socketscanner/socketscanner.go +++ b/src/sniffer/pkg/socketscanner/socketscanner.go @@ -3,25 +3,23 @@ package socketscanner import ( "context" "fmt" + "github.com/amit7itz/goset" "github.com/otterize/go-procnet/procnet" "github.com/otterize/network-mapper/src/sniffer/pkg/client" "github.com/otterize/network-mapper/src/sniffer/pkg/config" "github.com/spf13/viper" - "os" + "io/ioutil" "strconv" - "time" ) -type scanResultMap map[string]map[string]time.Time - type SocketScanner struct { - scanResults scanResultMap + scanResults map[string]*goset.Set[string] mapperClient client.MapperClient } func NewSocketScanner(mapperClient client.MapperClient) *SocketScanner { return &SocketScanner{ - scanResults: make(scanResultMap), + scanResults: make(map[string]*goset.Set[string]), mapperClient: mapperClient, } } @@ -45,16 +43,17 @@ func (s *SocketScanner) scanTcpFile(path string) { } if _, ok := listenPorts[sock.LocalAddr.Port]; ok { if _, ok := s.scanResults[sock.RemoteAddr.IP.String()]; !ok { - s.scanResults[sock.RemoteAddr.IP.String()] = make(map[string]time.Time) + s.scanResults[sock.RemoteAddr.IP.String()] = goset.NewSet(sock.LocalAddr.IP.String()) + } else { + s.scanResults[sock.RemoteAddr.IP.String()].Add(sock.LocalAddr.IP.String()) } - s.scanResults[sock.RemoteAddr.IP.String()][sock.LocalAddr.IP.String()] = time.Now() } } } func (s *SocketScanner) ScanProcDir() error { hostProcDir := viper.GetString(config.HostProcDirKey) - files, err := os.ReadDir(hostProcDir) + files, err := ioutil.ReadDir(hostProcDir) if err != nil { return err } @@ -71,26 +70,14 @@ func (s *SocketScanner) ScanProcDir() error { } func (s *SocketScanner) ReportSocketScanResults(ctx context.Context) error { - results := getModelResults(s.scanResults) + results := client.SocketScanResults{} + for srcIp, destIps := range s.scanResults { + results.Results = append(results.Results, client.SocketScanResultForSrcIp{SrcIp: srcIp, DestIps: destIps.Items()}) + } err := s.mapperClient.ReportSocketScanResults(ctx, results) if err != nil { return err } - s.scanResults = make(scanResultMap) + s.scanResults = make(map[string]*goset.Set[string]) return nil } - -func getModelResults(scanResults scanResultMap) client.SocketScanResults { - results := client.SocketScanResults{} - for srcIp, destinationsMap := range scanResults { - destinations := make([]client.Destination, 0) - for destIP, lastSeen := range destinationsMap { - destinations = append(destinations, client.Destination{Destination: destIP, LastSeen: lastSeen}) - } - results.Results = append(results.Results, client.SocketScanResultForSrcIp{ - SrcIp: srcIp, - DestIps: destinations, - }) - } - return results -} diff --git a/src/sniffer/pkg/socketscanner/socketscanner_test.go b/src/sniffer/pkg/socketscanner/socketscanner_test.go index 51f2a99e..ff99370e 100644 --- a/src/sniffer/pkg/socketscanner/socketscanner_test.go +++ b/src/sniffer/pkg/socketscanner/socketscanner_test.go @@ -24,63 +24,28 @@ func (s *SocketScannerTestSuite) SetupSuite() { s.mockMapperClient = mock_client.NewMockMapperClient(s.mockController) } -type SocketScanResultForSrcIpMatcher []client.SocketScanResultForSrcIp - -func (m SocketScanResultForSrcIpMatcher) Matches(x interface{}) bool { - results, ok := x.(client.SocketScanResults) - if !ok { - return false - } - - actualValues := results.Results - if len(actualValues) != len(m) { - return false - } - - // Match in any order - for _, expected := range m { - found := false - for _, actual := range actualValues { - if matchSocketScanResult(expected, actual) { - found = true - break - } - } - if !found { - return false - } - } - return true +type matchOne[T any] struct { + validResults []T } -func matchSocketScanResult(expected, actual client.SocketScanResultForSrcIp) bool { - if expected.SrcIp != actual.SrcIp { - return false - } - - if len(expected.DestIps) != len(actual.DestIps) { - return false - } - - for i := range expected.DestIps { - if expected.DestIps[i].Destination != actual.DestIps[i].Destination { - return false +func (m matchOne[T]) Matches(x interface{}) bool { + for _, option := range m.validResults { + if gomock.Eq(option).Matches(x) { + return true } } - - return true + return false } -func (m SocketScanResultForSrcIpMatcher) String() string { - var result string - for _, value := range m { - result += fmt.Sprintf("{Src: %v, Dest: %v}", value.SrcIp, value.DestIps) - } - return result +func (m matchOne[T]) String() string { + return fmt.Sprintf("One of the following: %v", m.validResults) } -func GetMatcher(expected []client.SocketScanResultForSrcIp) SocketScanResultForSrcIpMatcher { - return expected +// MatchOne makes sure that object matches one of the validResults +func MatchOne[T any](validResults []T) gomock.Matcher { + return matchOne[T]{ + validResults: validResults, + } } func (s *SocketScannerTestSuite) TestScanProcDir() { @@ -100,24 +65,21 @@ func (s *SocketScannerTestSuite) TestScanProcDir() { // all other sockets should be ignored (because parsing the server sides on all pods is enough) expectedResult := []client.SocketScanResultForSrcIp{ { - SrcIp: "192.168.35.14", - DestIps: []client.Destination{ - { - Destination: "192.168.38.211", - }, - }, + SrcIp: "192.168.35.14", + DestIps: []string{"192.168.38.211"}, }, { - SrcIp: "176.168.35.14", - DestIps: []client.Destination{ - { - Destination: "192.168.38.211", - }, - }, + SrcIp: "176.168.35.14", + DestIps: []string{"192.168.38.211"}, }, } + // order is random in the response, so we mark both orders as valid + validResults := []client.SocketScanResults{ + {Results: expectedResult}, + {Results: []client.SocketScanResultForSrcIp{expectedResult[1], expectedResult[0]}}, + } - s.mockMapperClient.EXPECT().ReportSocketScanResults(gomock.Any(), GetMatcher(expectedResult)) + s.mockMapperClient.EXPECT().ReportSocketScanResults(gomock.Any(), MatchOne(validResults)) err = sniffer.ReportSocketScanResults(context.Background()) s.Require().NoError(err) }