diff --git a/src/mapper/pkg/clouduploader/cloud_upload.go b/src/mapper/pkg/clouduploader/cloud_upload.go index d87aa2eb..8c8a62fe 100644 --- a/src/mapper/pkg/clouduploader/cloud_upload.go +++ b/src/mapper/pkg/clouduploader/cloud_upload.go @@ -48,31 +48,24 @@ func (c *CloudUploader) uploadDiscoveredIntents(ctx context.Context) { return } - var intents []cloudclient.IntentInput - for service, serviceIntents := range c.intentsHolder.GetIntentsPerService(nil) { - for _, serviceIntent := range serviceIntents { - var intent cloudclient.IntentInput - intent.ClientName = lo.ToPtr(service.Name) - intent.Namespace = lo.ToPtr(service.Namespace) - intent.ServerName = lo.ToPtr(serviceIntent.Name) - intent.ServerNamespace = lo.ToPtr(serviceIntent.Namespace) - - intents = append(intents, intent) + var discoveredIntents []*cloudclient.DiscoveredIntentInput + for _, intent := range c.intentsHolder.GetIntents(nil) { + var discoveredIntent cloudclient.IntentInput + discoveredIntent.ClientName = lo.ToPtr(intent.Source.Name) + discoveredIntent.Namespace = lo.ToPtr(intent.Source.Namespace) + discoveredIntent.ServerName = lo.ToPtr(intent.Destination.Name) + discoveredIntent.ServerNamespace = lo.ToPtr(intent.Destination.Namespace) + + input := &cloudclient.DiscoveredIntentInput{ + DiscoveredAt: lo.ToPtr(intent.Timestamp), + Intent: &discoveredIntent, } - } - if len(intents) == 0 { - return + discoveredIntents = append(discoveredIntents, input) } - var discoveredIntents []*cloudclient.DiscoveredIntentInput - for _, intent := range intents { - input := cloudclient.DiscoveredIntentInput{ - Intent: lo.ToPtr(intent), - DiscoveredAt: lo.ToPtr(time.Now()), - } - - discoveredIntents = append(discoveredIntents, lo.ToPtr(input)) + if len(discoveredIntents) == 0 { + return } uploadSuccess := client.ReportDiscoveredIntents(discoveredIntents) diff --git a/src/mapper/pkg/clouduploader/cloud_uploader_test.go b/src/mapper/pkg/clouduploader/cloud_uploader_test.go index 0313525d..bea6c9bc 100644 --- a/src/mapper/pkg/clouduploader/cloud_uploader_test.go +++ b/src/mapper/pkg/clouduploader/cloud_uploader_test.go @@ -12,6 +12,11 @@ import ( "github.com/stretchr/testify/suite" "golang.org/x/oauth2" "testing" + "time" +) + +var ( + testTimestamp = time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) ) type CloudUploaderTestSuite struct { @@ -50,6 +55,7 @@ func (s *CloudUploaderTestSuite) addIntent(source string, srcNamespace string, d s.intentsHolder.AddIntent( model.OtterizeServiceIdentity{Name: source, Namespace: srcNamespace}, model.OtterizeServiceIdentity{Name: destination, Namespace: dstNamespace}, + testTimestamp, ) } diff --git a/src/mapper/pkg/graph/generated/generated.go b/src/mapper/pkg/graph/generated/generated.go index f9f177cb..4d5707eb 100644 --- a/src/mapper/pkg/graph/generated/generated.go +++ b/src/mapper/pkg/graph/generated/generated.go @@ -9,6 +9,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" "github.com/99designs/gqlgen/graphql" "github.com/99designs/gqlgen/graphql/introspection" @@ -223,9 +224,16 @@ func (ec *executionContext) introspectType(name string) (*introspection.Type, er } var sources = []*ast.Source{ - {Name: "../mappergraphql/schema.graphql", Input: `input CaptureResultForSrcIp { + {Name: "../mappergraphql/schema.graphql", Input: `scalar Time # Equivalent of Go's time.Time provided by gqlgen + +input CaptureResultForSrcIp { srcIp: String! - destinations: [String!]! + destinations: [Destination!]! +} + +input Destination { + destination: String! + lastSeen: Time! } input CaptureResults { @@ -234,7 +242,7 @@ input CaptureResults { input SocketScanResultForSrcIp { srcIp: String! - destIps: [String!]! + destIps: [Destination!]! } input SocketScanResults { @@ -1944,7 +1952,7 @@ func (ec *executionContext) unmarshalInputCaptureResultForSrcIp(ctx context.Cont var err error ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("destinations")) - it.Destinations, err = ec.unmarshalNString2ᚕstringᚄ(ctx, v) + it.Destinations, err = ec.unmarshalNDestination2ᚕgithubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐDestinationᚄ(ctx, v) if err != nil { return it, err } @@ -1977,6 +1985,37 @@ func (ec *executionContext) unmarshalInputCaptureResults(ctx context.Context, ob return it, nil } +func (ec *executionContext) unmarshalInputDestination(ctx context.Context, obj interface{}) (model.Destination, error) { + var it model.Destination + asMap := map[string]interface{}{} + for k, v := range obj.(map[string]interface{}) { + asMap[k] = v + } + + for k, v := range asMap { + switch k { + case "destination": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("destination")) + it.Destination, err = ec.unmarshalNString2string(ctx, v) + if err != nil { + return it, err + } + case "lastSeen": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("lastSeen")) + it.LastSeen, err = ec.unmarshalNTime2timeᚐTime(ctx, v) + if err != nil { + return it, err + } + } + } + + return it, nil +} + func (ec *executionContext) unmarshalInputSocketScanResultForSrcIp(ctx context.Context, obj interface{}) (model.SocketScanResultForSrcIP, error) { var it model.SocketScanResultForSrcIP asMap := map[string]interface{}{} @@ -1998,7 +2037,7 @@ func (ec *executionContext) unmarshalInputSocketScanResultForSrcIp(ctx context.C var err error ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("destIps")) - it.DestIps, err = ec.unmarshalNString2ᚕstringᚄ(ctx, v) + it.DestIps, err = ec.unmarshalNDestination2ᚕgithubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐDestinationᚄ(ctx, v) if err != nil { return it, err } @@ -2713,6 +2752,28 @@ func (ec *executionContext) unmarshalNCaptureResults2githubᚗcomᚋotterizeᚋn return res, graphql.ErrorOnPath(ctx, err) } +func (ec *executionContext) unmarshalNDestination2githubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐDestination(ctx context.Context, v interface{}) (model.Destination, error) { + res, err := ec.unmarshalInputDestination(ctx, v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) unmarshalNDestination2ᚕgithubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐDestinationᚄ(ctx context.Context, v interface{}) ([]model.Destination, error) { + var vSlice []interface{} + if v != nil { + vSlice = graphql.CoerceList(v) + } + var err error + res := make([]model.Destination, len(vSlice)) + for i := range vSlice { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithIndex(i)) + res[i], err = ec.unmarshalNDestination2githubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐDestination(ctx, vSlice[i]) + if err != nil { + return nil, err + } + } + return res, nil +} + func (ec *executionContext) marshalNOtterizeServiceIdentity2githubᚗcomᚋotterizeᚋnetworkᚑmapperᚋsrcᚋmapperᚋpkgᚋgraphᚋmodelᚐOtterizeServiceIdentity(ctx context.Context, sel ast.SelectionSet, v model.OtterizeServiceIdentity) graphql.Marshaler { return ec._OtterizeServiceIdentity(ctx, sel, &v) } @@ -2861,36 +2922,19 @@ func (ec *executionContext) marshalNString2string(ctx context.Context, sel ast.S return res } -func (ec *executionContext) unmarshalNString2ᚕstringᚄ(ctx context.Context, v interface{}) ([]string, error) { - var vSlice []interface{} - if v != nil { - vSlice = graphql.CoerceList(v) - } - var err error - res := make([]string, len(vSlice)) - for i := range vSlice { - ctx := graphql.WithPathContext(ctx, graphql.NewPathWithIndex(i)) - res[i], err = ec.unmarshalNString2string(ctx, vSlice[i]) - if err != nil { - return nil, err - } - } - return res, nil +func (ec *executionContext) unmarshalNTime2timeᚐTime(ctx context.Context, v interface{}) (time.Time, error) { + res, err := graphql.UnmarshalTime(v) + return res, graphql.ErrorOnPath(ctx, err) } -func (ec *executionContext) marshalNString2ᚕstringᚄ(ctx context.Context, sel ast.SelectionSet, v []string) graphql.Marshaler { - ret := make(graphql.Array, len(v)) - for i := range v { - ret[i] = ec.marshalNString2string(ctx, sel, v[i]) - } - - for _, e := range ret { - if e == graphql.Null { - return graphql.Null +func (ec *executionContext) marshalNTime2timeᚐTime(ctx context.Context, sel ast.SelectionSet, v time.Time) graphql.Marshaler { + res := graphql.MarshalTime(v) + if res == graphql.Null { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "must not be null") } } - - return ret + return res } func (ec *executionContext) marshalN__Directive2githubᚗcomᚋ99designsᚋgqlgenᚋgraphqlᚋintrospectionᚐDirective(ctx context.Context, sel ast.SelectionSet, v introspection.Directive) graphql.Marshaler { diff --git a/src/mapper/pkg/graph/model/models_gen.go b/src/mapper/pkg/graph/model/models_gen.go index 1939450d..1d58cbf4 100644 --- a/src/mapper/pkg/graph/model/models_gen.go +++ b/src/mapper/pkg/graph/model/models_gen.go @@ -2,15 +2,24 @@ package model +import ( + "time" +) + type CaptureResultForSrcIP struct { - SrcIP string `json:"srcIp"` - Destinations []string `json:"destinations"` + SrcIP string `json:"srcIp"` + Destinations []Destination `json:"destinations"` } type CaptureResults struct { Results []CaptureResultForSrcIP `json:"results"` } +type Destination struct { + Destination string `json:"destination"` + LastSeen time.Time `json:"lastSeen"` +} + type OtterizeServiceIdentity struct { Name string `json:"name"` Namespace string `json:"namespace"` @@ -22,8 +31,8 @@ type ServiceIntents struct { } type SocketScanResultForSrcIP struct { - SrcIP string `json:"srcIp"` - DestIps []string `json:"destIps"` + SrcIP string `json:"srcIp"` + DestIps []Destination `json:"destIps"` } type SocketScanResults struct { diff --git a/src/mapper/pkg/resolvers/helpers.go b/src/mapper/pkg/resolvers/helpers.go index 880596da..32d2a774 100644 --- a/src/mapper/pkg/resolvers/helpers.go +++ b/src/mapper/pkg/resolvers/helpers.go @@ -10,7 +10,6 @@ import ( "github.com/otterize/network-mapper/src/mapper/pkg/config" "github.com/otterize/network-mapper/src/mapper/pkg/graph/model" "github.com/otterize/network-mapper/src/shared/kubeutils" - "github.com/samber/lo" "github.com/spf13/viper" "io" corev1 "k8s.io/api/core/v1" @@ -19,29 +18,19 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - "sort" "sync" "time" ) -type intentsHolderStore map[model.OtterizeServiceIdentity]*goset.Set[model.OtterizeServiceIdentity] - -func (s intentsHolderStore) MarshalJSON() ([]byte, error) { - // OtterizeServiceIdentity cannot be serialized as map key in JSON, because it is represented as a map itself - // therefore, we serialize the store as a slice of [Key, Value] "tuples" - return json.Marshal(lo.ToPairs(s)) +type SourceDestPair struct { + Source model.OtterizeServiceIdentity + Destination model.OtterizeServiceIdentity } -func (s intentsHolderStore) UnmarshalJSON(b []byte) error { - var pairs []lo.Entry[model.OtterizeServiceIdentity, *goset.Set[model.OtterizeServiceIdentity]] - err := json.Unmarshal(b, &pairs) - if err != nil { - return err - } - for _, pair := range pairs { - s[pair.Key] = pair.Value - } - return nil +type DiscoveredIntent struct { + Source model.OtterizeServiceIdentity `json:"source"` + Destination model.OtterizeServiceIdentity `json:"destination"` + Timestamp time.Time `json:"timestamp"` } type IntentsHolderConfig struct { @@ -65,7 +54,7 @@ func IntentsHolderConfigFromViper() (IntentsHolderConfig, error) { } type IntentsHolder struct { - store intentsHolderStore + store map[SourceDestPair]time.Time lock sync.Mutex client client.Client config IntentsHolderConfig @@ -74,33 +63,41 @@ type IntentsHolder struct { func NewIntentsHolder(client client.Client, config IntentsHolderConfig) *IntentsHolder { return &IntentsHolder{ - store: make(intentsHolderStore), + store: newIntentsStore(nil), lock: sync.Mutex{}, client: client, config: config, } } +func newIntentsStore(intents []DiscoveredIntent) map[SourceDestPair]time.Time { + if intents == nil { + return make(map[SourceDestPair]time.Time) + } + + result := make(map[SourceDestPair]time.Time) + for _, intent := range intents { + result[SourceDestPair{Source: intent.Source, Destination: intent.Destination}] = intent.Timestamp + } + return result +} + func (i *IntentsHolder) Reset() { i.lock.Lock() defer i.lock.Unlock() - i.store = make(intentsHolderStore) + i.store = make(map[SourceDestPair]time.Time) } -func (i *IntentsHolder) AddIntent(srcService model.OtterizeServiceIdentity, dstService model.OtterizeServiceIdentity) { +func (i *IntentsHolder) AddIntent(srcService model.OtterizeServiceIdentity, dstService model.OtterizeServiceIdentity, newTimestamp time.Time) { i.lock.Lock() defer i.lock.Unlock() - intents, ok := i.store[srcService] - if !ok { - intents = goset.NewSet[model.OtterizeServiceIdentity]() - i.store[srcService] = intents - } - intent := model.OtterizeServiceIdentity{Name: dstService.Name, Namespace: dstService.Namespace} - if !intents.Contains(intent) { + pair := SourceDestPair{Source: srcService, Destination: dstService} + currentTimestamp, alreadyExists := i.store[pair] + if !alreadyExists || newTimestamp.After(currentTimestamp) { + i.store[pair] = newTimestamp i.lastIntentsUpdate = time.Now() - intents.Add(intent) } } @@ -110,28 +107,26 @@ func (i *IntentsHolder) LastIntentsUpdate() time.Time { return i.lastIntentsUpdate } -func (i *IntentsHolder) GetIntentsPerService(namespaces []string) map[model.OtterizeServiceIdentity][]model.OtterizeServiceIdentity { +func (i *IntentsHolder) GetIntents(namespaces []string) []DiscoveredIntent { i.lock.Lock() defer i.lock.Unlock() + + return i.getIntents(namespaces) +} + +func (i *IntentsHolder) getIntents(namespaces []string) []DiscoveredIntent { namespacesSet := goset.FromSlice(namespaces) - result := make(map[model.OtterizeServiceIdentity][]model.OtterizeServiceIdentity) - for service, intents := range i.store { - if !namespacesSet.IsEmpty() && !namespacesSet.Contains(service.Namespace) { + result := make([]DiscoveredIntent, 0) + for pair, timestamp := range i.store { + if !namespacesSet.IsEmpty() && !namespacesSet.Contains(pair.Source.Namespace) { continue } - intentsSlice := intents.Items() - // sorting the intents so results will be consistent - sort.Slice(intentsSlice, func(i, j int) bool { - // Primary sort by name - if intentsSlice[i].Name != intentsSlice[j].Name { - return intentsSlice[i].Name < intentsSlice[j].Name - } - // Secondary sort by namespace - return intentsSlice[i].Namespace < intentsSlice[j].Namespace + + result = append(result, DiscoveredIntent{ + Source: pair.Source, + Destination: pair.Destination, + Timestamp: timestamp, }) - if len(intentsSlice) != 0 { - result[service] = intentsSlice - } } return result } @@ -139,7 +134,8 @@ func (i *IntentsHolder) GetIntentsPerService(namespaces []string) map[model.Otte func (i *IntentsHolder) WriteStore(ctx context.Context) error { i.lock.Lock() defer i.lock.Unlock() - jsonBytes, err := json.Marshal(i.store) + intents := i.getIntents(nil) + jsonBytes, err := json.Marshal(intents) if err != nil { return err } @@ -180,5 +176,25 @@ func (i *IntentsHolder) LoadStore(ctx context.Context) error { if err != nil { return err } - return json.Unmarshal(decompressedJson, &i.store) + + var intents []DiscoveredIntent + err = json.Unmarshal(decompressedJson, &intents) + if err != nil { + return err + } + + i.store = newIntentsStore(intents) + return nil +} + +func groupDestinationsBySource(discoveredIntents []DiscoveredIntent) map[model.OtterizeServiceIdentity][]model.OtterizeServiceIdentity { + serviceMap := make(map[model.OtterizeServiceIdentity][]model.OtterizeServiceIdentity, 0) + for _, intents := range discoveredIntents { + if _, ok := serviceMap[intents.Source]; !ok { + serviceMap[intents.Source] = make([]model.OtterizeServiceIdentity, 0) + } + + serviceMap[intents.Source] = append(serviceMap[intents.Source], intents.Destination) + } + return serviceMap } diff --git a/src/mapper/pkg/resolvers/resolver_test.go b/src/mapper/pkg/resolvers/resolver_test.go index e84c5b33..15dd0e4a 100644 --- a/src/mapper/pkg/resolvers/resolver_test.go +++ b/src/mapper/pkg/resolvers/resolver_test.go @@ -45,14 +45,22 @@ func (s *ResolverTestSuite) TestReportCaptureResults() { _, err := test_gql_client.ReportCaptureResults(context.Background(), s.client, test_gql_client.CaptureResults{ Results: []test_gql_client.CaptureResultForSrcIp{ { - SrcIp: "1.1.1.1", - Destinations: []string{fmt.Sprintf("service2.%s.svc.cluster.local", s.TestNamespace)}, + SrcIp: "1.1.1.1", + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("service2.%s.svc.cluster.local", s.TestNamespace), + }, + }, }, { SrcIp: "1.1.1.3", - Destinations: []string{ - fmt.Sprintf("service1.%s.svc.cluster.local", s.TestNamespace), - fmt.Sprintf("service2.%s.svc.cluster.local", s.TestNamespace), + Destinations: []test_gql_client.Destination{ + { + Destination: fmt.Sprintf("service1.%s.svc.cluster.local", s.TestNamespace), + }, + { + Destination: fmt.Sprintf("service2.%s.svc.cluster.local", s.TestNamespace), + }, }, }, }, @@ -102,12 +110,23 @@ func (s *ResolverTestSuite) TestSocketScanResults() { _, err := test_gql_client.ReportSocketScanResults(context.Background(), s.client, test_gql_client.SocketScanResults{ Results: []test_gql_client.SocketScanResultForSrcIp{ { - SrcIp: "1.1.2.1", - DestIps: []string{"1.1.2.2"}, + SrcIp: "1.1.2.1", + DestIps: []test_gql_client.Destination{ + { + Destination: "1.1.2.2", + }, + }, }, { - SrcIp: "1.1.2.3", - DestIps: []string{"1.1.2.1", "1.1.2.2"}, + SrcIp: "1.1.2.3", + DestIps: []test_gql_client.Destination{ + { + Destination: "1.1.2.1", + }, + { + Destination: "1.1.2.2", + }, + }, }, }, }) @@ -159,12 +178,23 @@ func (s *ResolverTestSuite) TestLoadStore() { _, err = test_gql_client.ReportSocketScanResults(context.Background(), s.client, test_gql_client.SocketScanResults{ Results: []test_gql_client.SocketScanResultForSrcIp{ { - SrcIp: "1.1.3.1", - DestIps: []string{"1.1.3.2"}, + SrcIp: "1.1.3.1", + DestIps: []test_gql_client.Destination{ + { + Destination: "1.1.3.2", + }, + }, }, { - SrcIp: "1.1.3.3", - DestIps: []string{"1.1.3.1", "1.1.3.2"}, + SrcIp: "1.1.3.3", + DestIps: []test_gql_client.Destination{ + { + Destination: "1.1.3.2", + }, + { + Destination: "1.1.3.2", + }, + }, }, }, }) diff --git a/src/mapper/pkg/resolvers/schema.resolvers.go b/src/mapper/pkg/resolvers/schema.resolvers.go index fad5d077..3c0fa4b3 100644 --- a/src/mapper/pkg/resolvers/schema.resolvers.go +++ b/src/mapper/pkg/resolvers/schema.resolvers.go @@ -40,11 +40,12 @@ func (r *mutationResolver) ReportCaptureResults(ctx context.Context, results mod continue } for _, dest := range captureItem.Destinations { - if !strings.HasSuffix(dest, viper.GetString(config.ClusterDomainKey)) { + destAddress := dest.Destination + if !strings.HasSuffix(destAddress, viper.GetString(config.ClusterDomainKey)) { // not a k8s service, ignore continue } - ips, err := r.kubeFinder.ResolveServiceAddressToIps(ctx, dest) + ips, err := r.kubeFinder.ResolveServiceAddressToIps(ctx, destAddress) if err != nil { logrus.WithError(err).Warningf("Could not resolve service address %s", dest) continue @@ -70,6 +71,7 @@ func (r *mutationResolver) ReportCaptureResults(ctx context.Context, results mod r.intentsHolder.AddIntent( model.OtterizeServiceIdentity{Name: srcService, Namespace: srcPod.Namespace}, model.OtterizeServiceIdentity{Name: dstService, Namespace: destPod.Namespace}, + dest.LastSeen, ) } } @@ -97,7 +99,7 @@ func (r *mutationResolver) ReportSocketScanResults(ctx context.Context, results continue } for _, destIp := range socketScanItem.DestIps { - destPod, err := r.kubeFinder.ResolveIpToPod(ctx, destIp) + destPod, err := r.kubeFinder.ResolveIpToPod(ctx, destIp.Destination) if err != nil { if errors.Is(err, kubefinder.FoundMoreThanOnePodError) { logrus.WithError(err).Debugf("Ip %s belongs to more than one pod, ignoring", destIp) @@ -114,6 +116,7 @@ func (r *mutationResolver) ReportSocketScanResults(ctx context.Context, results r.intentsHolder.AddIntent( model.OtterizeServiceIdentity{Name: srcService, Namespace: srcPod.Namespace}, model.OtterizeServiceIdentity{Name: dstService, Namespace: destPod.Namespace}, + destIp.LastSeen, ) } } @@ -125,10 +128,24 @@ func (r *mutationResolver) ReportSocketScanResults(ctx context.Context, results } func (r *queryResolver) ServiceIntents(ctx context.Context, namespaces []string) ([]model.ServiceIntents, error) { + discoveredIntents := r.intentsHolder.GetIntents(namespaces) + serviceToDestinations := groupDestinationsBySource(discoveredIntents) + result := make([]model.ServiceIntents, 0) - for service, intents := range r.intentsHolder.GetIntentsPerService(namespaces) { - result = append(result, model.ServiceIntents{Client: lo.ToPtr(service), Intents: intents}) + for service, destinations := range serviceToDestinations { + sort.Slice(destinations, func(i, j int) bool { + if destinations[i].Name != destinations[j].Name { + return destinations[i].Name < destinations[j].Name + } + return destinations[i].Namespace < destinations[j].Namespace + }) + + result = append(result, model.ServiceIntents{ + Client: lo.ToPtr(service), + Intents: destinations, + }) } + // sorting by service name so results are more consistent sort.Slice(result, func(i, j int) bool { return result[i].Client.Name < result[j].Client.Name diff --git a/src/mapper/pkg/resolvers/test_gql_client/generated.go b/src/mapper/pkg/resolvers/test_gql_client/generated.go index cd4284a6..1fa3a36f 100644 --- a/src/mapper/pkg/resolvers/test_gql_client/generated.go +++ b/src/mapper/pkg/resolvers/test_gql_client/generated.go @@ -4,20 +4,21 @@ package test_gql_client import ( "context" + "time" "github.com/Khan/genqlient/graphql" ) type CaptureResultForSrcIp struct { - SrcIp string `json:"srcIp"` - Destinations []string `json:"destinations"` + SrcIp string `json:"srcIp"` + Destinations []Destination `json:"destinations"` } // GetSrcIp returns CaptureResultForSrcIp.SrcIp, and is useful for accessing the field via an interface. func (v *CaptureResultForSrcIp) GetSrcIp() string { return v.SrcIp } // GetDestinations returns CaptureResultForSrcIp.Destinations, and is useful for accessing the field via an interface. -func (v *CaptureResultForSrcIp) GetDestinations() []string { return v.Destinations } +func (v *CaptureResultForSrcIp) GetDestinations() []Destination { return v.Destinations } type CaptureResults struct { Results []CaptureResultForSrcIp `json:"results"` @@ -26,6 +27,17 @@ type CaptureResults struct { // GetResults returns CaptureResults.Results, and is useful for accessing the field via an interface. func (v *CaptureResults) GetResults() []CaptureResultForSrcIp { return v.Results } +type Destination struct { + Destination string `json:"destination"` + LastSeen time.Time `json:"lastSeen"` +} + +// GetDestination returns Destination.Destination, and is useful for accessing the field via an interface. +func (v *Destination) GetDestination() string { return v.Destination } + +// GetLastSeen returns Destination.LastSeen, and is useful for accessing the field via an interface. +func (v *Destination) GetLastSeen() time.Time { return v.LastSeen } + // ReportCaptureResultsResponse is returned by ReportCaptureResults on success. type ReportCaptureResultsResponse struct { ReportCaptureResults bool `json:"reportCaptureResults"` @@ -99,15 +111,15 @@ func (v *ServiceIntentsServiceIntentsIntentsOtterizeServiceIdentity) GetNamespac } type SocketScanResultForSrcIp struct { - SrcIp string `json:"srcIp"` - DestIps []string `json:"destIps"` + SrcIp string `json:"srcIp"` + DestIps []Destination `json:"destIps"` } // GetSrcIp returns SocketScanResultForSrcIp.SrcIp, and is useful for accessing the field via an interface. func (v *SocketScanResultForSrcIp) GetSrcIp() string { return v.SrcIp } // GetDestIps returns SocketScanResultForSrcIp.DestIps, and is useful for accessing the field via an interface. -func (v *SocketScanResultForSrcIp) GetDestIps() []string { return v.DestIps } +func (v *SocketScanResultForSrcIp) GetDestIps() []Destination { return v.DestIps } type SocketScanResults struct { Results []SocketScanResultForSrcIp `json:"results"` diff --git a/src/mapper/pkg/resolvers/test_gql_client/genqlient.yaml b/src/mapper/pkg/resolvers/test_gql_client/genqlient.yaml index a8d71379..b64a02ba 100644 --- a/src/mapper/pkg/resolvers/test_gql_client/genqlient.yaml +++ b/src/mapper/pkg/resolvers/test_gql_client/genqlient.yaml @@ -5,3 +5,7 @@ schema: operations: - genqlient.graphql generated: generated.go + +bindings: + Time: + type: time.Time diff --git a/src/mappergraphql/schema.graphql b/src/mappergraphql/schema.graphql index 3d65dfb5..a4f1b8df 100644 --- a/src/mappergraphql/schema.graphql +++ b/src/mappergraphql/schema.graphql @@ -1,6 +1,13 @@ +scalar Time # Equivalent of Go's time.Time provided by gqlgen + input CaptureResultForSrcIp { srcIp: String! - destinations: [String!]! + destinations: [Destination!]! +} + +input Destination { + destination: String! + lastSeen: Time! } input CaptureResults { @@ -9,7 +16,7 @@ input CaptureResults { input SocketScanResultForSrcIp { srcIp: String! - destIps: [String!]! + destIps: [Destination!]! } input SocketScanResults { diff --git a/src/sniffer/pkg/client/generated.go b/src/sniffer/pkg/client/generated.go index d0e76491..d6b0a429 100644 --- a/src/sniffer/pkg/client/generated.go +++ b/src/sniffer/pkg/client/generated.go @@ -4,20 +4,21 @@ package client import ( "context" + "time" "github.com/Khan/genqlient/graphql" ) type CaptureResultForSrcIp struct { - SrcIp string `json:"srcIp"` - Destinations []string `json:"destinations"` + SrcIp string `json:"srcIp"` + Destinations []Destination `json:"destinations"` } // GetSrcIp returns CaptureResultForSrcIp.SrcIp, and is useful for accessing the field via an interface. func (v *CaptureResultForSrcIp) GetSrcIp() string { return v.SrcIp } // GetDestinations returns CaptureResultForSrcIp.Destinations, and is useful for accessing the field via an interface. -func (v *CaptureResultForSrcIp) GetDestinations() []string { return v.Destinations } +func (v *CaptureResultForSrcIp) GetDestinations() []Destination { return v.Destinations } type CaptureResults struct { Results []CaptureResultForSrcIp `json:"results"` @@ -26,16 +27,27 @@ type CaptureResults struct { // GetResults returns CaptureResults.Results, and is useful for accessing the field via an interface. func (v *CaptureResults) GetResults() []CaptureResultForSrcIp { return v.Results } +type Destination struct { + Destination string `json:"destination"` + LastSeen time.Time `json:"lastSeen"` +} + +// GetDestination returns Destination.Destination, and is useful for accessing the field via an interface. +func (v *Destination) GetDestination() string { return v.Destination } + +// GetLastSeen returns Destination.LastSeen, and is useful for accessing the field via an interface. +func (v *Destination) GetLastSeen() time.Time { return v.LastSeen } + type SocketScanResultForSrcIp struct { - SrcIp string `json:"srcIp"` - DestIps []string `json:"destIps"` + SrcIp string `json:"srcIp"` + DestIps []Destination `json:"destIps"` } // GetSrcIp returns SocketScanResultForSrcIp.SrcIp, and is useful for accessing the field via an interface. func (v *SocketScanResultForSrcIp) GetSrcIp() string { return v.SrcIp } // GetDestIps returns SocketScanResultForSrcIp.DestIps, and is useful for accessing the field via an interface. -func (v *SocketScanResultForSrcIp) GetDestIps() []string { return v.DestIps } +func (v *SocketScanResultForSrcIp) GetDestIps() []Destination { return v.DestIps } type SocketScanResults struct { Results []SocketScanResultForSrcIp `json:"results"` diff --git a/src/sniffer/pkg/client/genqlient.yaml b/src/sniffer/pkg/client/genqlient.yaml index 38b7b943..4df29655 100644 --- a/src/sniffer/pkg/client/genqlient.yaml +++ b/src/sniffer/pkg/client/genqlient.yaml @@ -5,3 +5,6 @@ schema: operations: - genqlient.graphql generated: generated.go +bindings: + Time: + type: time.Time diff --git a/src/sniffer/pkg/sniffer/sniffer.go b/src/sniffer/pkg/sniffer/sniffer.go index c543fdd3..6d2d4769 100644 --- a/src/sniffer/pkg/sniffer/sniffer.go +++ b/src/sniffer/pkg/sniffer/sniffer.go @@ -2,7 +2,6 @@ package sniffer import ( "context" - "github.com/amit7itz/goset" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/pcap" @@ -14,8 +13,11 @@ import ( "time" ) +// capturesMap is a map of source IP to a map of destination DNS to the last time it was seen +type capturesMap map[string]map[string]time.Time + type Sniffer struct { - capturedRequests map[string]*goset.Set[string] + capturedRequests capturesMap socketScanner *socketscanner.SocketScanner lastReportTime time.Time mapperClient client.MapperClient @@ -23,7 +25,7 @@ type Sniffer struct { func NewSniffer(mapperClient client.MapperClient) *Sniffer { return &Sniffer{ - capturedRequests: make(map[string]*goset.Set[string]), + capturedRequests: make(capturesMap), socketScanner: socketscanner.NewSocketScanner(mapperClient), lastReportTime: time.Now(), mapperClient: mapperClient, @@ -31,6 +33,7 @@ func NewSniffer(mapperClient client.MapperClient) *Sniffer { } func (s *Sniffer) HandlePacket(packet gopacket.Packet) { + captureTime := detectCaptureTime(packet) ipLayer := packet.Layer(layers.LayerTypeIPv4) dnsLayer := packet.Layer(layers.LayerTypeDNS) if dnsLayer != nil && ipLayer != nil { @@ -42,18 +45,25 @@ func (s *Sniffer) HandlePacket(packet gopacket.Packet) { if answer.Type != layers.DNSTypeA && answer.Type != layers.DNSTypeAAAA { continue } - s.addCapturedRequest(ip.DstIP.String(), string(answer.Name)) + s.addCapturedRequest(ip.DstIP.String(), string(answer.Name), captureTime) } } } } -func (s *Sniffer) addCapturedRequest(srcIp string, destDns string) { +func detectCaptureTime(packet gopacket.Packet) time.Time { + captureTime := packet.Metadata().CaptureInfo.Timestamp + if captureTime.IsZero() { + return time.Now() + } + return captureTime +} + +func (s *Sniffer) addCapturedRequest(srcIp string, destDns string, seenAt time.Time) { if _, ok := s.capturedRequests[srcIp]; !ok { - s.capturedRequests[srcIp] = goset.NewSet[string](destDns) - } else { - s.capturedRequests[srcIp].Add(destDns) + s.capturedRequests[srcIp] = make(map[string]time.Time) } + s.capturedRequests[srcIp][destDns] = seenAt } func (s *Sniffer) ReportCaptureResults(ctx context.Context) error { @@ -63,10 +73,7 @@ func (s *Sniffer) ReportCaptureResults(ctx context.Context) error { return nil } s.PrintCapturedRequests() - results := make([]client.CaptureResultForSrcIp, 0, len(s.capturedRequests)) - for srcIp, destinations := range s.capturedRequests { - results = append(results, client.CaptureResultForSrcIp{SrcIp: srcIp, Destinations: destinations.Items()}) - } + results := getCaptureResults(s.capturedRequests) timeoutCtx, cancelFunc := context.WithTimeout(ctx, viper.GetDuration(config.CallsTimeoutKey)) defer cancelFunc() @@ -77,16 +84,28 @@ func (s *Sniffer) ReportCaptureResults(ctx context.Context) error { } // delete the reported captured requests - s.capturedRequests = make(map[string]*goset.Set[string]) + s.capturedRequests = make(capturesMap) return nil } +func getCaptureResults(capturedRequests capturesMap) []client.CaptureResultForSrcIp { + results := make([]client.CaptureResultForSrcIp, 0, len(capturedRequests)) + for srcIp, destDNSToTime := range capturedRequests { + destinations := make([]client.Destination, 0) + for destDNS, lastSeen := range destDNSToTime { + destinations = append(destinations, client.Destination{Destination: destDNS, LastSeen: lastSeen}) + } + results = append(results, client.CaptureResultForSrcIp{SrcIp: srcIp, Destinations: destinations}) + } + return results +} + func (s *Sniffer) PrintCapturedRequests() { - for ip, dests := range s.capturedRequests { + for ip, destinations := range s.capturedRequests { logrus.Debugf("%s:\n", ip) - dests.For(func(dest string) { - logrus.Debugf("\t%s\n", dest) - }) + for destDNS, lastSeen := range destinations { + logrus.Debugf("\t%s, %s\n", destDNS, lastSeen) + } } } diff --git a/src/sniffer/pkg/sniffer/sniffer_test.go b/src/sniffer/pkg/sniffer/sniffer_test.go index 02143578..c47afa2f 100644 --- a/src/sniffer/pkg/sniffer/sniffer_test.go +++ b/src/sniffer/pkg/sniffer/sniffer_test.go @@ -10,6 +10,7 @@ import ( mock_client "github.com/otterize/network-mapper/src/sniffer/pkg/client/mockclient" "github.com/stretchr/testify/suite" "testing" + "time" ) type SnifferTestSuite struct { @@ -30,13 +31,20 @@ func (s *SnifferTestSuite) TestHandlePacket() { s.Require().NoError(err) } packet := gopacket.NewPacket(rawDnsResponse, layers.LayerTypeEthernet, gopacket.Default) + timestamp := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + packet.Metadata().CaptureInfo.Timestamp = timestamp sniffer.HandlePacket(packet) s.mockMapperClient.EXPECT().ReportCaptureResults(gomock.Any(), client.CaptureResults{ Results: []client.CaptureResultForSrcIp{ { - SrcIp: "10.101.81.13", - Destinations: []string{"sts.us-east-1.amazonaws.com"}, + SrcIp: "10.101.81.13", + Destinations: []client.Destination{ + { + Destination: "sts.us-east-1.amazonaws.com", + LastSeen: timestamp, + }, + }, }, }, }) diff --git a/src/sniffer/pkg/socketscanner/socketscanner.go b/src/sniffer/pkg/socketscanner/socketscanner.go index 4171a6b0..a6d3f449 100644 --- a/src/sniffer/pkg/socketscanner/socketscanner.go +++ b/src/sniffer/pkg/socketscanner/socketscanner.go @@ -3,23 +3,25 @@ package socketscanner import ( "context" "fmt" - "github.com/amit7itz/goset" "github.com/otterize/go-procnet/procnet" "github.com/otterize/network-mapper/src/sniffer/pkg/client" "github.com/otterize/network-mapper/src/sniffer/pkg/config" "github.com/spf13/viper" "os" "strconv" + "time" ) +type scanResultMap map[string]map[string]time.Time + type SocketScanner struct { - scanResults map[string]*goset.Set[string] + scanResults scanResultMap mapperClient client.MapperClient } func NewSocketScanner(mapperClient client.MapperClient) *SocketScanner { return &SocketScanner{ - scanResults: make(map[string]*goset.Set[string]), + scanResults: make(scanResultMap), mapperClient: mapperClient, } } @@ -43,10 +45,9 @@ func (s *SocketScanner) scanTcpFile(path string) { } if _, ok := listenPorts[sock.LocalAddr.Port]; ok { if _, ok := s.scanResults[sock.RemoteAddr.IP.String()]; !ok { - s.scanResults[sock.RemoteAddr.IP.String()] = goset.NewSet(sock.LocalAddr.IP.String()) - } else { - s.scanResults[sock.RemoteAddr.IP.String()].Add(sock.LocalAddr.IP.String()) + s.scanResults[sock.RemoteAddr.IP.String()] = make(map[string]time.Time) } + s.scanResults[sock.RemoteAddr.IP.String()][sock.LocalAddr.IP.String()] = time.Now() } } } @@ -70,14 +71,26 @@ func (s *SocketScanner) ScanProcDir() error { } func (s *SocketScanner) ReportSocketScanResults(ctx context.Context) error { - results := client.SocketScanResults{} - for srcIp, destIps := range s.scanResults { - results.Results = append(results.Results, client.SocketScanResultForSrcIp{SrcIp: srcIp, DestIps: destIps.Items()}) - } + results := getModelResults(s.scanResults) err := s.mapperClient.ReportSocketScanResults(ctx, results) if err != nil { return err } - s.scanResults = make(map[string]*goset.Set[string]) + s.scanResults = make(scanResultMap) return nil } + +func getModelResults(scanResults scanResultMap) client.SocketScanResults { + results := client.SocketScanResults{} + for srcIp, destinationsMap := range scanResults { + destinations := make([]client.Destination, 0) + for destIP, lastSeen := range destinationsMap { + destinations = append(destinations, client.Destination{Destination: destIP, LastSeen: lastSeen}) + } + results.Results = append(results.Results, client.SocketScanResultForSrcIp{ + SrcIp: srcIp, + DestIps: destinations, + }) + } + return results +} diff --git a/src/sniffer/pkg/socketscanner/socketscanner_test.go b/src/sniffer/pkg/socketscanner/socketscanner_test.go index ff99370e..51f2a99e 100644 --- a/src/sniffer/pkg/socketscanner/socketscanner_test.go +++ b/src/sniffer/pkg/socketscanner/socketscanner_test.go @@ -24,28 +24,63 @@ func (s *SocketScannerTestSuite) SetupSuite() { s.mockMapperClient = mock_client.NewMockMapperClient(s.mockController) } -type matchOne[T any] struct { - validResults []T -} +type SocketScanResultForSrcIpMatcher []client.SocketScanResultForSrcIp + +func (m SocketScanResultForSrcIpMatcher) Matches(x interface{}) bool { + results, ok := x.(client.SocketScanResults) + if !ok { + return false + } + + actualValues := results.Results + if len(actualValues) != len(m) { + return false + } -func (m matchOne[T]) Matches(x interface{}) bool { - for _, option := range m.validResults { - if gomock.Eq(option).Matches(x) { - return true + // Match in any order + for _, expected := range m { + found := false + for _, actual := range actualValues { + if matchSocketScanResult(expected, actual) { + found = true + break + } + } + if !found { + return false } } - return false + return true } -func (m matchOne[T]) String() string { - return fmt.Sprintf("One of the following: %v", m.validResults) +func matchSocketScanResult(expected, actual client.SocketScanResultForSrcIp) bool { + if expected.SrcIp != actual.SrcIp { + return false + } + + if len(expected.DestIps) != len(actual.DestIps) { + return false + } + + for i := range expected.DestIps { + if expected.DestIps[i].Destination != actual.DestIps[i].Destination { + return false + } + } + + return true } -// MatchOne makes sure that object matches one of the validResults -func MatchOne[T any](validResults []T) gomock.Matcher { - return matchOne[T]{ - validResults: validResults, +func (m SocketScanResultForSrcIpMatcher) String() string { + var result string + for _, value := range m { + result += fmt.Sprintf("{Src: %v, Dest: %v}", value.SrcIp, value.DestIps) } + return result +} + +func GetMatcher(expected []client.SocketScanResultForSrcIp) SocketScanResultForSrcIpMatcher { + return expected } func (s *SocketScannerTestSuite) TestScanProcDir() { @@ -65,21 +100,24 @@ func (s *SocketScannerTestSuite) TestScanProcDir() { // all other sockets should be ignored (because parsing the server sides on all pods is enough) expectedResult := []client.SocketScanResultForSrcIp{ { - SrcIp: "192.168.35.14", - DestIps: []string{"192.168.38.211"}, + SrcIp: "192.168.35.14", + DestIps: []client.Destination{ + { + Destination: "192.168.38.211", + }, + }, }, { - SrcIp: "176.168.35.14", - DestIps: []string{"192.168.38.211"}, + SrcIp: "176.168.35.14", + DestIps: []client.Destination{ + { + Destination: "192.168.38.211", + }, + }, }, } - // order is random in the response, so we mark both orders as valid - validResults := []client.SocketScanResults{ - {Results: expectedResult}, - {Results: []client.SocketScanResultForSrcIp{expectedResult[1], expectedResult[0]}}, - } - s.mockMapperClient.EXPECT().ReportSocketScanResults(gomock.Any(), MatchOne(validResults)) + s.mockMapperClient.EXPECT().ReportSocketScanResults(gomock.Any(), GetMatcher(expectedResult)) err = sniffer.ReportSocketScanResults(context.Background()) s.Require().NoError(err) }