diff --git a/diode-server/README.md b/diode-server/README.md index 0f9dc42b..99c2a344 100644 --- a/diode-server/README.md +++ b/diode-server/README.md @@ -67,7 +67,9 @@ Edit the `.env` to match your environment: * `NETBOX_TO_DIODE_API_KEY`: API key generated with the Diode NetBox plugin installation * `INGESTER_TO_RECONCILER_API_KEY`: API key to authorize RPC calls between the Ingester and Reconciler services (at least 40 characters, example generation with shell command: `openssl rand -base64 40 | head -c 40`) -* `MIGRATION_ENABLED`: Set to `false` to disable the migration, default is `true` +* `MIGRATION_ENABLED`: Set to `false` to disable the migration, default: `true` +* `RECONCILER_RATE_LIMITER_RPS`: Rate limit for the reconciler service for generating and applying change sets concurrently, default: `20` +* `RECONCILER_RATE_LIMITER_BURST`: Burst limit for the reconciler service for generating and applying change sets concurrently, default: `1` ### Running the Diode server diff --git a/diode-server/docker/docker-compose.netbox.yaml b/diode-server/docker/docker-compose.netbox.yaml index 70b818bb..30ac693b 100644 --- a/diode-server/docker/docker-compose.netbox.yaml +++ b/diode-server/docker/docker-compose.netbox.yaml @@ -1,6 +1,6 @@ services: netbox: &netbox - image: netboxcommunity/netbox:v4.0-2.9.1-diode + image: netboxcommunity/netbox:v4.1-3.0.1-diode build: context: . dockerfile: netbox/Dockerfile-diode diff --git a/diode-server/docker/docker-compose.yaml b/diode-server/docker/docker-compose.yaml index 8abf878c..b73c2ba5 100644 --- a/diode-server/docker/docker-compose.yaml +++ b/diode-server/docker/docker-compose.yaml @@ -64,6 +64,8 @@ services: - LOGGING_LEVEL=${LOGGING_LEVEL} - SENTRY_DSN=${SENTRY_DSN} - MIGRATION_ENABLED=${MIGRATION_ENABLED} + - RECONCILER_RATE_LIMITER_RPS=${RECONCILER_RATE_LIMITER_RPS} + - RECONCILER_RATE_LIMITER_BURST=${RECONCILER_RATE_LIMITER_BURST} restart: always ports: [ ] depends_on: diff --git a/diode-server/docker/netbox/Dockerfile-diode b/diode-server/docker/netbox/Dockerfile-diode index 6ea3b621..f00470a3 100644 --- a/diode-server/docker/netbox/Dockerfile-diode +++ b/diode-server/docker/netbox/Dockerfile-diode @@ -1,4 +1,4 @@ -FROM netboxcommunity/netbox:v4.0-2.9.1 +FROM netboxcommunity/netbox:v4.1-3.0.1 COPY ./netbox/configuration/ /etc/netbox/config/ RUN chmod 755 /etc/netbox/config/* && \ diff --git a/diode-server/docker/sample.env b/diode-server/docker/sample.env index 410be79c..5a697a11 100644 --- a/diode-server/docker/sample.env +++ b/diode-server/docker/sample.env @@ -13,3 +13,5 @@ INGESTER_TO_RECONCILER_API_KEY=sXjJZe6BBzVuovrVyyH4Q3vbceqvDwh2kC3DRpML LOGGING_LEVEL=DEBUG SENTRY_DSN= MIGRATION_ENABLED=true +RECONCILER_RATE_LIMITER_RPS=20 +RECONCILER_RATE_LIMITER_BURST=1 diff --git a/diode-server/go.mod b/diode-server/go.mod index dc9d99e6..20e0e103 100644 --- a/diode-server/go.mod +++ b/diode-server/go.mod @@ -17,6 +17,7 @@ require ( github.com/redis/go-redis/v9 v9.5.1 github.com/segmentio/ksuid v1.0.4 github.com/stretchr/testify v1.9.0 + golang.org/x/time v0.3.0 google.golang.org/grpc v1.63.2 google.golang.org/protobuf v1.33.0 ) diff --git a/diode-server/go.sum b/diode-server/go.sum index bd0b02f0..aba4f74d 100644 --- a/diode-server/go.sum +++ b/diode-server/go.sum @@ -64,6 +64,8 @@ golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= diff --git a/diode-server/reconciler/applier/applier.go b/diode-server/reconciler/applier/applier.go new file mode 100644 index 00000000..70501441 --- /dev/null +++ b/diode-server/reconciler/applier/applier.go @@ -0,0 +1,38 @@ +package applier + +import ( + "context" + "log/slog" + + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" +) + +// ApplyChangeSet applies a change set to NetBox +func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.ChangeSet, nbClient netboxdiodeplugin.NetBoxAPI) error { + changes := make([]netboxdiodeplugin.Change, 0) + for _, change := range cs.ChangeSet { + changes = append(changes, netboxdiodeplugin.Change{ + ChangeID: change.ChangeID, + ChangeType: change.ChangeType, + ObjectType: change.ObjectType, + ObjectID: change.ObjectID, + ObjectVersion: change.ObjectVersion, + Data: change.Data, + }) + } + + req := netboxdiodeplugin.ChangeSetRequest{ + ChangeSetID: cs.ChangeSetID, + ChangeSet: changes, + } + + resp, err := nbClient.ApplyChangeSet(ctx, req) + if err != nil { + return err + } + + logger.Debug("apply change set response", "response", resp) + + return nil +} diff --git a/diode-server/reconciler/applier/applier_test.go b/diode-server/reconciler/applier/applier_test.go new file mode 100644 index 00000000..d70c5893 --- /dev/null +++ b/diode-server/reconciler/applier/applier_test.go @@ -0,0 +1,69 @@ +package applier_test + +import ( + "context" + "log/slog" + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/netboxlabs/diode/diode-server/netbox" + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + nbClientMock "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks" + "github.com/netboxlabs/diode/diode-server/reconciler/applier" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" +) + +func TestApplyChangeSet(t *testing.T) { + ctx := context.Background() + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) + + mockNetBoxAPI := new(nbClientMock.NetBoxAPI) + cs := changeset.ChangeSet{ + ChangeSetID: "00000000-0000-0000-0000-000000000000", + ChangeSet: []changeset.Change{ + { + ChangeID: "00000000-0000-0000-0000-000000000001", + ChangeType: "create", + ObjectType: "dcim.site", + Data: &netbox.DcimSite{ + Name: "Site A", + Slug: "site-a", + Status: (*netbox.DcimSiteStatus)(strPtr(string(netbox.DcimSiteStatusActive))), + }, + }, + }, + } + + req := netboxdiodeplugin.ChangeSetRequest{ + ChangeSetID: "00000000-0000-0000-0000-000000000000", + ChangeSet: []netboxdiodeplugin.Change{ + { + ChangeID: "00000000-0000-0000-0000-000000000001", + ChangeType: "create", + ObjectType: "dcim.site", + ObjectID: nil, + ObjectVersion: nil, + Data: &netbox.DcimSite{ + Name: "Site A", + Slug: "site-a", + Status: (*netbox.DcimSiteStatus)(strPtr(string(netbox.DcimSiteStatusActive))), + }, + }, + }, + } + + resp := &netboxdiodeplugin.ChangeSetResponse{ + ChangeSetID: "00000000-0000-0000-0000-000000000000", + Result: "success", + } + + mockNetBoxAPI.On("ApplyChangeSet", ctx, req).Return(resp, nil) + + err := applier.ApplyChangeSet(ctx, logger, cs, mockNetBoxAPI) + assert.NoError(t, err) + mockNetBoxAPI.AssertExpectations(t) +} + +func strPtr(s string) *string { return &s } diff --git a/diode-server/reconciler/changeset/changeset.go b/diode-server/reconciler/changeset/changeset.go index 0addff20..7a79170c 100644 --- a/diode-server/reconciler/changeset/changeset.go +++ b/diode-server/reconciler/changeset/changeset.go @@ -1,17 +1,5 @@ package changeset -import ( - "context" - "fmt" - - "github.com/google/uuid" - "github.com/mitchellh/mapstructure" - - "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" - "github.com/netboxlabs/diode/diode-server/netbox" - "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" -) - const ( // ChangeTypeCreate is the change type for a creation ChangeTypeCreate = "create" @@ -20,22 +8,6 @@ const ( ChangeTypeUpdate = "update" ) -// IngestEntity represents an ingest entity -type IngestEntity struct { - RequestID string `json:"request_id"` - DataType string `json:"data_type"` - Entity any `json:"entity"` - State int `json:"state"` -} - -// ObjectState represents a object state -type ObjectState struct { - ObjectID int `json:"object_id"` - ObjectType string `json:"object_type"` - ObjectChangeID int `json:"object_change_id"` - Object any `json:"object"` -} - // ChangeSet represents a change set type ChangeSet struct { ChangeSetID string `json:"change_set_id"` @@ -51,158 +23,3 @@ type Change struct { ObjectVersion *int `json:"object_version,omitempty"` Data any `json:"data"` } - -// Prepare prepares a change set -func Prepare(entity IngestEntity, netboxAPI netboxdiodeplugin.NetBoxAPI) (*ChangeSet, error) { - // extract ingested entity (actual) - actual, err := extractIngestEntityData(entity) - if err != nil { - return nil, err - } - - // get root object and all its nested objects (actual) - actualNestedObjects, err := actual.NestedObjects() - if err != nil { - return nil, err - } - - // map out root object and all its nested objects (actual) - actualNestedObjectsMap := make(map[string]netbox.ComparableData) - for _, obj := range actualNestedObjects { - actualNestedObjectsMap[fmt.Sprintf("%p", obj.Data())] = obj - } - - // retrieve root object all its nested objects from NetBox (intended) - intendedNestedObjectsMap := make(map[string]netbox.ComparableData) - for _, obj := range actualNestedObjects { - intended, err := retrieveObjectState(netboxAPI, obj) - if err != nil { - return nil, err - } - intendedNestedObjectsMap[fmt.Sprintf("%p", obj.Data())] = intended - } - - // map out retrieved root object and all its nested objects (current) - var current netbox.ComparableData - for _, obj := range actualNestedObjects { - if obj.DataType() == entity.DataType { - current = intendedNestedObjectsMap[fmt.Sprintf("%p", obj.Data())] - break - } - } - - objectsToReconcile, err := actual.Patch(current, intendedNestedObjectsMap) - if err != nil { - return nil, err - } - - // process objectsToReconcile and prepare changeset to return - changes := make([]Change, 0) - - for _, obj := range objectsToReconcile { - operation := ChangeTypeCreate - var objectID *int - - id := obj.ID() - if id > 0 { - objectID = &id - operation = ChangeTypeUpdate - } - - changes = append(changes, Change{ - ChangeID: uuid.NewString(), - ChangeType: operation, - ObjectType: obj.DataType(), - ObjectID: objectID, - ObjectVersion: nil, - Data: obj.Data(), - }) - } - - return &ChangeSet{ChangeSetID: uuid.NewString(), ChangeSet: changes}, nil -} - -func retrieveObjectState(netboxAPI netboxdiodeplugin.NetBoxAPI, change netbox.ComparableData) (netbox.ComparableData, error) { - params := netboxdiodeplugin.RetrieveObjectStateQueryParams{ - ObjectID: 0, - ObjectType: change.DataType(), - Params: change.ObjectStateQueryParams(), - } - resp, err := netboxAPI.RetrieveObjectState(context.Background(), params) - if err != nil { - return nil, err - } - - if resp.Object.IsValid() { - objectState := &ObjectState{ - ObjectID: resp.ObjectID, - ObjectType: change.DataType(), - ObjectChangeID: resp.ObjectChangeID, - Object: resp.Object, - } - - return extractNetBoxObjectStateData(*objectState) - } - - return nil, nil -} - -func extractIngestEntityData(ingestEntity IngestEntity) (netbox.ComparableData, error) { - if ingestEntity.Entity == nil { - return nil, fmt.Errorf("ingest entity is nil") - } - - dw, err := netbox.NewDataWrapper(ingestEntity.DataType) - if err != nil { - return nil, err - } - - protoEntity, ok := ingestEntity.Entity.(*diodepb.Entity) - if !ok { - return nil, fmt.Errorf("ingest entity is not a proto entity") - } - - if err = dw.FromProtoEntity(protoEntity); err != nil { - return nil, err - } - - if !dw.IsValid() { - return nil, fmt.Errorf("invalid ingest entity") - } - - return dw, nil -} - -func extractNetBoxObjectStateData(obj ObjectState) (netbox.ComparableData, error) { - if obj.Object == nil { - return nil, fmt.Errorf("object state is nil") - } - - dw, err := netbox.NewDataWrapper(obj.ObjectType) - if err != nil { - return nil, err - } - - decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ - Result: &dw, - MatchName: netbox.IpamIPAddressAssignedObjectMatchName, - DecodeHook: mapstructure.ComposeDecodeHookFunc( - netbox.IpamIPAddressAssignedObjectHookFunc(), - ), - }) - if err != nil { - return nil, err - } - - if err := decoder.Decode(obj.Object); err != nil { - return nil, fmt.Errorf("failed to decode object entity %w", err) - } - - if !dw.IsValid() { - return nil, fmt.Errorf("invalid object state") - } - - dw.Normalise() - - return dw, nil -} diff --git a/diode-server/reconciler/config.go b/diode-server/reconciler/config.go index 58366e32..a774c2b9 100644 --- a/diode-server/reconciler/config.go +++ b/diode-server/reconciler/config.go @@ -2,13 +2,16 @@ package reconciler // Config is the configuration for the reconciler service type Config struct { - GRPCPort int `envconfig:"GRPC_PORT" default:"8081"` - RedisHost string `envconfig:"REDIS_HOST" default:"127.0.0.1"` - RedisPort string `envconfig:"REDIS_PORT" default:"6379"` - RedisPassword string `envconfig:"REDIS_PASSWORD" required:"true"` - RedisDB int `envconfig:"REDIS_DB" default:"0"` - RedisStreamDB int `envconfig:"REDIS_STREAM_DB" default:"1"` - MigrationEnabled bool `envconfig:"MIGRATION_ENABLED" default:"true"` + GRPCPort int `envconfig:"GRPC_PORT" default:"8081"` + RedisHost string `envconfig:"REDIS_HOST" default:"127.0.0.1"` + RedisPort string `envconfig:"REDIS_PORT" default:"6379"` + RedisPassword string `envconfig:"REDIS_PASSWORD" required:"true"` + RedisDB int `envconfig:"REDIS_DB" default:"0"` + RedisStreamDB int `envconfig:"REDIS_STREAM_DB" default:"1"` + MigrationEnabled bool `envconfig:"MIGRATION_ENABLED" default:"true"` + AutoApplyChangesets bool `envconfig:"AUTO_APPLY_CHANGESETS" default:"true"` + ReconcilerRateLimiterRPS int `envconfig:"RECONCILER_RATE_LIMITER_RPS" default:"20"` + ReconcilerRateLimiterBurst int `envconfig:"RECONCILER_RATE_LIMITER_BURST" default:"1"` // API keys DiodeToNetBoxAPIKey string `envconfig:"DIODE_TO_NETBOX_API_KEY" required:"true"` diff --git a/diode-server/reconciler/differ/differ.go b/diode-server/reconciler/differ/differ.go new file mode 100644 index 00000000..7ccb9033 --- /dev/null +++ b/diode-server/reconciler/differ/differ.go @@ -0,0 +1,185 @@ +package differ + +import ( + "context" + "fmt" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" + "github.com/netboxlabs/diode/diode-server/netbox" + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" +) + +// IngestEntity represents an ingest entity +type IngestEntity struct { + RequestID string `json:"request_id"` + DataType string `json:"data_type"` + Entity any `json:"entity"` + State int `json:"state"` +} + +// ObjectState represents a object state +type ObjectState struct { + ObjectID int `json:"object_id"` + ObjectType string `json:"object_type"` + ObjectChangeID int `json:"object_change_id"` + Object any `json:"object"` +} + +// Diff compares ingested entity with the intended state in NetBox and returns a change set +func Diff(ctx context.Context, entity IngestEntity, netboxAPI netboxdiodeplugin.NetBoxAPI) (*changeset.ChangeSet, error) { + // extract ingested entity (actual) + actual, err := extractIngestEntityData(entity) + if err != nil { + return nil, err + } + + // get root object and all its nested objects (actual) + actualNestedObjects, err := actual.NestedObjects() + if err != nil { + return nil, err + } + + // map out root object and all its nested objects (actual) + actualNestedObjectsMap := make(map[string]netbox.ComparableData) + for _, obj := range actualNestedObjects { + actualNestedObjectsMap[fmt.Sprintf("%p", obj.Data())] = obj + } + + // retrieve root object all its nested objects from NetBox (intended) + intendedNestedObjectsMap := make(map[string]netbox.ComparableData) + for _, obj := range actualNestedObjects { + intended, err := retrieveObjectState(ctx, netboxAPI, obj) + if err != nil { + return nil, err + } + intendedNestedObjectsMap[fmt.Sprintf("%p", obj.Data())] = intended + } + + // map out retrieved root object and all its nested objects (current) + var current netbox.ComparableData + for _, obj := range actualNestedObjects { + if obj.DataType() == entity.DataType { + current = intendedNestedObjectsMap[fmt.Sprintf("%p", obj.Data())] + break + } + } + + objectsToReconcile, err := actual.Patch(current, intendedNestedObjectsMap) + if err != nil { + return nil, err + } + + // process objectsToReconcile and prepare change set to return + changes := make([]changeset.Change, 0) + + for _, obj := range objectsToReconcile { + operation := changeset.ChangeTypeCreate + var objectID *int + + id := obj.ID() + if id > 0 { + objectID = &id + operation = changeset.ChangeTypeUpdate + } + + changes = append(changes, changeset.Change{ + ChangeID: uuid.NewString(), + ChangeType: operation, + ObjectType: obj.DataType(), + ObjectID: objectID, + ObjectVersion: nil, + Data: obj.Data(), + }) + } + + return &changeset.ChangeSet{ChangeSetID: uuid.NewString(), ChangeSet: changes}, nil +} + +func retrieveObjectState(ctx context.Context, netboxAPI netboxdiodeplugin.NetBoxAPI, change netbox.ComparableData) (netbox.ComparableData, error) { + params := netboxdiodeplugin.RetrieveObjectStateQueryParams{ + ObjectID: 0, + ObjectType: change.DataType(), + Params: change.ObjectStateQueryParams(), + } + resp, err := netboxAPI.RetrieveObjectState(ctx, params) + if err != nil { + return nil, err + } + + if resp.Object.IsValid() { + objectState := &ObjectState{ + ObjectID: resp.ObjectID, + ObjectType: change.DataType(), + ObjectChangeID: resp.ObjectChangeID, + Object: resp.Object, + } + + return extractNetBoxObjectStateData(*objectState) + } + + return nil, nil +} + +func extractIngestEntityData(ingestEntity IngestEntity) (netbox.ComparableData, error) { + if ingestEntity.Entity == nil { + return nil, fmt.Errorf("ingest entity is nil") + } + + dw, err := netbox.NewDataWrapper(ingestEntity.DataType) + if err != nil { + return nil, err + } + + protoEntity, ok := ingestEntity.Entity.(*diodepb.Entity) + if !ok { + return nil, fmt.Errorf("ingest entity is not a proto entity") + } + + if err = dw.FromProtoEntity(protoEntity); err != nil { + return nil, err + } + + if !dw.IsValid() { + return nil, fmt.Errorf("invalid ingest entity") + } + + return dw, nil +} + +func extractNetBoxObjectStateData(obj ObjectState) (netbox.ComparableData, error) { + if obj.Object == nil { + return nil, fmt.Errorf("object state is nil") + } + + dw, err := netbox.NewDataWrapper(obj.ObjectType) + if err != nil { + return nil, err + } + + decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + Result: &dw, + MatchName: netbox.IpamIPAddressAssignedObjectMatchName, + DecodeHook: mapstructure.ComposeDecodeHookFunc( + netbox.IpamIPAddressAssignedObjectHookFunc(), + ), + }) + if err != nil { + return nil, err + } + + if err := decoder.Decode(obj.Object); err != nil { + return nil, fmt.Errorf("failed to decode object entity %w", err) + } + + if !dw.IsValid() { + return nil, fmt.Errorf("invalid object state") + } + + dw.Normalise() + + return dw, nil +} diff --git a/diode-server/reconciler/changeset/changeset_dcim_test.go b/diode-server/reconciler/differ/differ_dcim_test.go similarity index 98% rename from diode-server/reconciler/changeset/changeset_dcim_test.go rename to diode-server/reconciler/differ/differ_dcim_test.go index 731ca5f3..c61599dc 100644 --- a/diode-server/reconciler/changeset/changeset_dcim_test.go +++ b/diode-server/reconciler/differ/differ_dcim_test.go @@ -1,4 +1,4 @@ -package changeset_test +package differ_test import ( "context" @@ -12,6 +12,7 @@ import ( "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks" "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/differ" ) func TestDcimPrepare(t *testing.T) { @@ -24,14 +25,14 @@ func TestDcimPrepare(t *testing.T) { } tests := []struct { name string - ingestEntity changeset.IngestEntity + ingestEntity differ.IngestEntity retrieveObjectStates []mockRetrieveObjectState wantChangeSet changeset.ChangeSet wantErr bool }{ { name: "[P1] ingest dcim.site with name only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.site", Entity: &diodepb.Entity{ @@ -74,7 +75,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P1] ingest dcim.site with name only - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.site", Entity: &diodepb.Entity{ @@ -109,7 +110,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P1] ingest dcim.site with tags - existing object found - update with new tags", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.site", Entity: &diodepb.Entity{ @@ -227,7 +228,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P1] ingest empty dcim.site - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.site", Entity: &diodepb.Entity{ @@ -245,7 +246,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P2] ingest dcim.devicerole with name only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicerole", Entity: &diodepb.Entity{ @@ -288,7 +289,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P2] ingest dcim.devicerole with name only - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicerole", Entity: &diodepb.Entity{ @@ -323,7 +324,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P2] ingest dcim.devicerole with name and new description - existing object found - update", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicerole", Entity: &diodepb.Entity{ @@ -375,7 +376,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P2] ingest dcim.devicerole with same color - existing object found - nothing to update", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicerole", Entity: &diodepb.Entity{ @@ -413,7 +414,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P2] ingest empty dcim.devicerole - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicerole", Entity: &diodepb.Entity{ @@ -431,7 +432,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P3] ingest dcim.manufacturer with name only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.manufacturer", Entity: &diodepb.Entity{ @@ -473,7 +474,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P3] ingest dcim.manufacturer with name only - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.manufacturer", Entity: &diodepb.Entity{ @@ -507,7 +508,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P3] ingest empty dcim.manufacturer - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.manufacturer", Entity: &diodepb.Entity{ @@ -525,7 +526,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P4] ingest dcim.devicetype with model only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicetype", Entity: &diodepb.Entity{ @@ -591,7 +592,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P4] ingest dcim.devicetype with model only - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicetype", Entity: &diodepb.Entity{ @@ -644,7 +645,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P4] ingest empty dcim.devicetype - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicetype", Entity: &diodepb.Entity{ @@ -662,7 +663,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P5] ingest dcim.devicetype with manufacturer - existing object not found - create manufacturer and devicetype", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicetype", Entity: &diodepb.Entity{ @@ -735,7 +736,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P5] ingest dcim.devicetype with new manufacturer - existing object found - create manufacturer and update devicetype", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicetype", Entity: &diodepb.Entity{ @@ -966,7 +967,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P5.2] ingest dcim.devicetype with new manufacturer - existing object found - create manufacturer and update devicetype", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicetype", Entity: &diodepb.Entity{ @@ -1097,7 +1098,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P5.3] ingest dcim.devicetype with new manufacturer - existing object found - update devicetype with new existing manufacturer", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicetype", Entity: &diodepb.Entity{ @@ -1231,7 +1232,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P6] ingest dcim.device with name only - existing object not found - create device and all related objects (using placeholders)", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -1377,7 +1378,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P6] ingest dcim.device with name only - existing object and its related objects found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -1492,7 +1493,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P6] ingest dcim.device with empty site", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -1608,7 +1609,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P7] ingest dcim.device - existing object not found - create device and all related objects", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -1793,7 +1794,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P7] ingest dcim.device with device type having manufacturer defined - existing object not found - create device and all related objects", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -1981,7 +1982,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P6] ingest empty dcim.device - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -1999,7 +2000,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P7] ingest dcim.device - existing object found - create missing related objects and update device", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -2217,7 +2218,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P8] ingest dcim.device - existing object not found - create device and all related objects", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -2404,7 +2405,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P8] ingest dcim.device - existing object found - create missing related objects and update device", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -2606,7 +2607,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P8] ingest dcim.device - existing object found - create some missing related objects, use other existing one and update device", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -2793,7 +2794,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P8.1] ingest dcim.device with partial data - existing object found - create missing related objects and update device", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -2995,7 +2996,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P8.2] ingest dcim.device - existing object found - no changes to apply", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -3155,7 +3156,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P9] ingest dcim.site with name, status and description - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.site", Entity: &diodepb.Entity{ @@ -3201,7 +3202,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P9] ingest dcim.site with name, status and new description - existing object found - update", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.site", Entity: &diodepb.Entity{ @@ -3254,7 +3255,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P10] ingest dcim.manufacturer with name and description - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.manufacturer", Entity: &diodepb.Entity{ @@ -3298,7 +3299,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P10] ingest dcim.manufacturer with name and new description - existing object found - update", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.manufacturer", Entity: &diodepb.Entity{ @@ -3348,7 +3349,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P11] ingest dcim.devicerole with name and additional attributes - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicerole", Entity: &diodepb.Entity{ @@ -3394,7 +3395,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P11] ingest dcim.devicerole with name and new additional attributes - existing object found - update", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.devicerole", Entity: &diodepb.Entity{ @@ -3447,7 +3448,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P12] ingest empty dcim.platform - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.platform", Entity: &diodepb.Entity{ @@ -3465,7 +3466,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P13] ingest dcim.interface with name only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.interface", Entity: &diodepb.Entity{ @@ -3632,7 +3633,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P13] ingest dcim.interface with name and device - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.interface", Entity: &diodepb.Entity{ @@ -3810,7 +3811,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P13] ingest dcim.interface with name, device and new label - existing object found - update with new label", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.interface", Entity: &diodepb.Entity{ @@ -4006,7 +4007,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P13] ingest empty dcim.interface - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.interface", Entity: &diodepb.Entity{ @@ -4024,7 +4025,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P14] ingest dcim.device with device type and manufacturer - device type and manufacturer objects found - create device with existing device type and manufacturer", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.device", Entity: &diodepb.Entity{ @@ -4185,7 +4186,7 @@ func TestDcimPrepare(t *testing.T) { }, { name: "[P15] ingest dcim.interface with name, mtu, device with site - device exists for platform Arista - create interface with existing device and platform", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "dcim.interface", Entity: &diodepb.Entity{ @@ -4331,10 +4332,11 @@ func TestDcimPrepare(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() mockClient := mocks.NewNetBoxAPI(t) for _, m := range tt.retrieveObjectStates { - mockClient.EXPECT().RetrieveObjectState(context.Background(), netboxdiodeplugin.RetrieveObjectStateQueryParams{ + mockClient.EXPECT().RetrieveObjectState(ctx, netboxdiodeplugin.RetrieveObjectStateQueryParams{ ObjectType: m.objectType, ObjectID: m.objectID, Params: m.queryParams, @@ -4346,7 +4348,7 @@ func TestDcimPrepare(t *testing.T) { }, nil) } - cs, err := changeset.Prepare(tt.ingestEntity, mockClient) + cs, err := differ.Diff(ctx, tt.ingestEntity, mockClient) if tt.wantErr { require.Error(t, err) return diff --git a/diode-server/reconciler/changeset/changeset_ipam_test.go b/diode-server/reconciler/differ/differ_ipam_test.go similarity index 98% rename from diode-server/reconciler/changeset/changeset_ipam_test.go rename to diode-server/reconciler/differ/differ_ipam_test.go index 69c1c6a7..8f044232 100644 --- a/diode-server/reconciler/changeset/changeset_ipam_test.go +++ b/diode-server/reconciler/differ/differ_ipam_test.go @@ -1,4 +1,4 @@ -package changeset_test +package differ_test import ( "context" @@ -12,6 +12,7 @@ import ( "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks" "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/differ" ) func TestIpamPrepare(t *testing.T) { @@ -24,14 +25,14 @@ func TestIpamPrepare(t *testing.T) { } tests := []struct { name string - ingestEntity changeset.IngestEntity + ingestEntity differ.IngestEntity retrieveObjectStates []mockRetrieveObjectState wantChangeSet changeset.ChangeSet wantErr bool }{ { name: "[P1] ingest ipam.ipaddress with address and interface - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.ipaddress", Entity: &diodepb.Entity{ @@ -248,7 +249,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P1] ingest ipam.ipaddress with address and a new interface - existing IP address and interface not found - create an interface and IP address", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.ipaddress", Entity: &diodepb.Entity{ @@ -449,7 +450,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P1] ingest ipam.ipaddress with address and a new interface - IP address found assigned to a different interface - create the interface and the IP address", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.ipaddress", Entity: &diodepb.Entity{ @@ -698,7 +699,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P1] ingest ipam.ipaddress with assigned interface - existing IP address found assigned a different device - create IP address with a new assigned object (interface)", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.ipaddress", Entity: &diodepb.Entity{ @@ -973,7 +974,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P1] ingest ipam.ipaddress with address and interface - existing IP address found with same interface assigned - no update needed", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.ipaddress", Entity: &diodepb.Entity{ @@ -1228,7 +1229,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P1] ingest ipam.ipaddress with address only - existing IP address found without interface assigned - no update needed", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.ipaddress", Entity: &diodepb.Entity{ @@ -1262,7 +1263,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P1] ingest ipam.ipaddress with address and new description - existing IP address found - update IP address with new description", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.ipaddress", Entity: &diodepb.Entity{ @@ -1540,7 +1541,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P1] ingest empty ipam.ipaddress - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.ipaddress", Entity: &diodepb.Entity{ @@ -1558,7 +1559,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P2] ingest ipam.prefix with prefix only - existing object not found - create prefix and site (placeholder)", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.prefix", Entity: &diodepb.Entity{ @@ -1626,7 +1627,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P2] ingest ipam.prefix with prefix only - existing object and its related objects found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.prefix", Entity: &diodepb.Entity{ @@ -1683,7 +1684,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P2] ingest ipam.prefix with empty site", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.prefix", Entity: &diodepb.Entity{ @@ -1738,7 +1739,7 @@ func TestIpamPrepare(t *testing.T) { }, { name: "[P2] ingest ipam.prefix with prefix and a tag - existing object found - create tag and update prefix", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "ipam.prefix", Entity: &diodepb.Entity{ @@ -1841,10 +1842,11 @@ func TestIpamPrepare(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() mockClient := mocks.NewNetBoxAPI(t) for _, m := range tt.retrieveObjectStates { - mockClient.EXPECT().RetrieveObjectState(context.Background(), netboxdiodeplugin.RetrieveObjectStateQueryParams{ + mockClient.EXPECT().RetrieveObjectState(ctx, netboxdiodeplugin.RetrieveObjectStateQueryParams{ ObjectType: m.objectType, ObjectID: m.objectID, Params: m.queryParams, @@ -1856,7 +1858,7 @@ func TestIpamPrepare(t *testing.T) { }, nil) } - cs, err := changeset.Prepare(tt.ingestEntity, mockClient) + cs, err := differ.Diff(ctx, tt.ingestEntity, mockClient) if tt.wantErr { require.Error(t, err) return diff --git a/diode-server/reconciler/changeset/changeset_virt_test.go b/diode-server/reconciler/differ/differ_virt_test.go similarity index 97% rename from diode-server/reconciler/changeset/changeset_virt_test.go rename to diode-server/reconciler/differ/differ_virt_test.go index faea712f..f6bbee4f 100644 --- a/diode-server/reconciler/changeset/changeset_virt_test.go +++ b/diode-server/reconciler/differ/differ_virt_test.go @@ -1,4 +1,4 @@ -package changeset_test +package differ_test import ( "context" @@ -12,6 +12,7 @@ import ( "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks" "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/differ" ) func TestVirtualizationPrepare(t *testing.T) { @@ -24,14 +25,14 @@ func TestVirtualizationPrepare(t *testing.T) { } tests := []struct { name string - ingestEntity changeset.IngestEntity + ingestEntity differ.IngestEntity retrieveObjectStates []mockRetrieveObjectState wantChangeSet changeset.ChangeSet wantErr bool }{ { name: "[P1] ingest virtualization.clustergroup with name only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.clustergroup", Entity: &diodepb.Entity{ @@ -73,7 +74,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P1] ingest virtualization.clustergroup with name only - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.clustergroup", Entity: &diodepb.Entity{ @@ -107,7 +108,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P1] ingest empty virtualization.clustergroup - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.clustergroup", Entity: &diodepb.Entity{ @@ -125,7 +126,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P2] ingest virtualization.clustertype with name only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.clustertype", Entity: &diodepb.Entity{ @@ -167,7 +168,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P2] ingest virtualization.clustertype with name only - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.clustertype", Entity: &diodepb.Entity{ @@ -201,7 +202,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P2] ingest empty virtualization.clustertype - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.clustertype", Entity: &diodepb.Entity{ @@ -227,7 +228,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P3] ingest virtualization.cluster with name only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.cluster", Entity: &diodepb.Entity{ @@ -318,7 +319,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P3] ingest virtualization.cluster with name only - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.cluster", Entity: &diodepb.Entity{ @@ -391,7 +392,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P3] ingest empty virtualization.cluster - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.cluster", Entity: &diodepb.Entity{ @@ -409,7 +410,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P4] ingest virtualization.virtualmachine with name only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.virtualmachine", Entity: &diodepb.Entity{ @@ -485,7 +486,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P4] ingest virtualization.virtualmachine with name and cluster - existing objects not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.virtualmachine", Entity: &diodepb.Entity{ @@ -632,7 +633,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P4] ingest virtualization.virtualmachine with name and existing cluster - existing vm not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.virtualmachine", Entity: &diodepb.Entity{ @@ -769,7 +770,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P4] ingest virtualization.virtualmachine with name and cluster - existing vm found - create cluster", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.virtualmachine", Entity: &diodepb.Entity{ @@ -927,7 +928,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P4] ingest virtualization.virtualmachine with name only - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.virtualmachine", Entity: &diodepb.Entity{ @@ -1074,7 +1075,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P4] ingest empty virtualization.virtualmachine - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.virtualmachine", Entity: &diodepb.Entity{ @@ -1092,7 +1093,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P5] ingest virtualization.vminterface with name only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.vminterface", Entity: &diodepb.Entity{ @@ -1197,7 +1198,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P5] ingest virtualization.vminterface with name only - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.vminterface", Entity: &diodepb.Entity{ @@ -1289,7 +1290,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P5] ingest empty virtualization.vminterface - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.vminterface", Entity: &diodepb.Entity{ @@ -1307,7 +1308,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P6] ingest virtualization.virtualdisk with name only - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.virtualdisk", Entity: &diodepb.Entity{ @@ -1412,7 +1413,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P6] ingest virtualization.virtualdisk with name only and no existing site - existing object not found - create", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.virtualdisk", Entity: &diodepb.Entity{ @@ -1528,7 +1529,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P6] ingest virtualization.virtualdisk with name only - existing object found - do nothing", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.virtualdisk", Entity: &diodepb.Entity{ @@ -1607,7 +1608,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, { name: "[P6] ingest empty virtualization.virtualdisk - error", - ingestEntity: changeset.IngestEntity{ + ingestEntity: differ.IngestEntity{ RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", DataType: "virtualization.virtualdisk", Entity: &diodepb.Entity{ @@ -1627,10 +1628,11 @@ func TestVirtualizationPrepare(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() mockClient := mocks.NewNetBoxAPI(t) for _, m := range tt.retrieveObjectStates { - mockClient.EXPECT().RetrieveObjectState(context.Background(), netboxdiodeplugin.RetrieveObjectStateQueryParams{ + mockClient.EXPECT().RetrieveObjectState(ctx, netboxdiodeplugin.RetrieveObjectStateQueryParams{ ObjectType: m.objectType, ObjectID: m.objectID, Params: m.queryParams, @@ -1642,7 +1644,7 @@ func TestVirtualizationPrepare(t *testing.T) { }, nil) } - cs, err := changeset.Prepare(tt.ingestEntity, mockClient) + cs, err := differ.Diff(ctx, tt.ingestEntity, mockClient) if tt.wantErr { require.Error(t, err) return diff --git a/diode-server/reconciler/ingestion_processor.go b/diode-server/reconciler/ingestion_processor.go index 0f23e2ec..01e9b2b2 100644 --- a/diode-server/reconciler/ingestion_processor.go +++ b/diode-server/reconciler/ingestion_processor.go @@ -15,6 +15,7 @@ import ( "github.com/kelseyhightower/envconfig" "github.com/redis/go-redis/v9" "github.com/segmentio/ksuid" + "golang.org/x/time/rate" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" @@ -22,7 +23,9 @@ import ( "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" "github.com/netboxlabs/diode/diode-server/netbox" "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + "github.com/netboxlabs/diode/diode-server/reconciler/applier" "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/differ" "github.com/netboxlabs/diode/diode-server/sentry" ) @@ -54,7 +57,7 @@ type RedisClient interface { // IngestionProcessor processes ingested data type IngestionProcessor struct { - config Config + Config Config logger *slog.Logger hostname string redisClient RedisClient @@ -62,6 +65,14 @@ type IngestionProcessor struct { nbClient netboxdiodeplugin.NetBoxAPI } +// IngestionLogToProcess represents an ingestion log to process +type IngestionLogToProcess struct { + key string + ingestionLog *reconcilerpb.IngestionLog + changeSet *changeset.ChangeSet + errors []error +} + // NewIngestionProcessor creates a new ingestion processor func NewIngestionProcessor(ctx context.Context, logger *slog.Logger) (*IngestionProcessor, error) { var cfg Config @@ -98,7 +109,7 @@ func NewIngestionProcessor(ctx context.Context, logger *slog.Logger) (*Ingestion } component := &IngestionProcessor{ - config: cfg, + Config: cfg, logger: logger, hostname: hostname, redisClient: redisClient, @@ -118,7 +129,7 @@ func (p *IngestionProcessor) Name() string { func (p *IngestionProcessor) Start(ctx context.Context) error { p.logger.Info("starting component", "name", p.Name()) - if p.config.MigrationEnabled { + if p.Config.MigrationEnabled { if err := migrate(ctx, p.logger, p.redisClient); err != nil { return fmt.Errorf("failed to migrate: %v", err) } @@ -186,6 +197,206 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis. p.logger.Debug("handling ingest request", "request", ingestReq) + bufCapacity := 100 + + generateIngestionLogChan := make(chan IngestionLogToProcess, bufCapacity) + generateIngestionLogDoneChan := make(chan struct{}) + var applyChangeSetChan chan IngestionLogToProcess + var applyChangeSetDoneChan chan struct{} + + if p.Config.AutoApplyChangesets { + applyChangeSetChan = make(chan IngestionLogToProcess, bufCapacity) + applyChangeSetDoneChan = make(chan struct{}) + } + + p.GenerateChangeSet(ctx, generateIngestionLogChan, applyChangeSetChan, generateIngestionLogDoneChan) + + if p.Config.AutoApplyChangesets { + p.ApplyChangeSet(ctx, applyChangeSetChan, applyChangeSetDoneChan) + } + + createIngestionLogsErrs := p.CreateIngestionLogs(ctx, ingestReq, ingestionTs, generateIngestionLogChan) + if len(createIngestionLogsErrs) > 0 { + errs = append(errs, createIngestionLogsErrs...) + } + + p.redisStreamClient.XAck(ctx, redisStreamID, redisConsumerGroup, msg.ID) + + if len(errs) > 0 { + errsStr := make([]string, 0) + for _, err := range errs { + errsStr = append(errsStr, err.Error()) + } + p.logger.Warn("failed to handle ingest request", slog.String("request_id", ingestReq.Id), slog.Any("errors", errsStr)) + + contextMap := map[string]any{ + "redis_stream_msg_id": msg.ID, + "consumer": fmt.Sprintf("%s-%s", redisConsumerGroup, p.hostname), + "hostname": p.hostname, + } + sentry.CaptureError(fmt.Errorf("failed to handle ingest request: %v", errs), nil, "Ingestion request", contextMap) + } else { + p.redisStreamClient.XDel(ctx, redisStreamID, msg.ID) + } + + return nil +} + +// GenerateChangeSet generates a change set for an ingestion log +func (p *IngestionProcessor) GenerateChangeSet(ctx context.Context, generateChangeSetChan <-chan IngestionLogToProcess, applyChangeSetChan chan<- IngestionLogToProcess, doneChan chan<- struct{}) { + limiter := rate.NewLimiter(rate.Limit(p.Config.ReconcilerRateLimiterRPS), p.Config.ReconcilerRateLimiterBurst) + + go func() { + defer func() { + if applyChangeSetChan != nil { + close(applyChangeSetChan) + } + if doneChan != nil { + doneChan <- struct{}{} + } + }() + + for { + select { + case <-ctx.Done(): + p.logger.Debug("context cancelled", "error", ctx.Err()) + return + case ingestionLog, ok := <-generateChangeSetChan: + if !ok { + return + } + if err := limiter.Wait(ctx); err != nil { + p.logger.Debug("rate limiter wait", "error", err) + return + } + + p.logger.Debug("generating change set", "ingestionLogID", ingestionLog.ingestionLog.GetId()) + + ingestEntity := differ.IngestEntity{ + RequestID: ingestionLog.ingestionLog.GetId(), + DataType: ingestionLog.ingestionLog.GetDataType(), + Entity: ingestionLog.ingestionLog.GetEntity(), + State: int(ingestionLog.ingestionLog.GetState()), + } + + changeSet, err := differ.Diff(ctx, ingestEntity, p.nbClient) + if err != nil { + tags := map[string]string{ + "request_id": ingestEntity.RequestID, + } + contextMap := map[string]any{ + "request_id": ingestEntity.RequestID, + "data_type": ingestEntity.DataType, + } + sentry.CaptureError(err, tags, "Ingest Entity", contextMap) + ingestionLog.errors = append(ingestionLog.errors, fmt.Errorf("failed to prepare change set: %v", err)) + + ingestionLog.ingestionLog.State = reconcilerpb.State_FAILED + ingestionLog.ingestionLog.Error = extractIngestionError(err) + + if _, err = p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { + ingestionLog.errors = append(ingestionLog.errors, err) + } + return + } + + ingestionLog.changeSet = changeSet + + if len(changeSet.ChangeSet) > 0 { + csCompressed, err := compressChangeSet(changeSet) + if err != nil { + ingestionLog.ingestionLog.State = reconcilerpb.State_FAILED + ingestionLog.errors = append(ingestionLog.errors, err) + + if _, err = p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { + ingestionLog.errors = append(ingestionLog.errors, err) + } + return + } + + ingestionLog.ingestionLog.ChangeSet = &reconcilerpb.ChangeSet{ + Id: changeSet.ChangeSetID, + Data: csCompressed, + } + + if _, err = p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { + ingestionLog.errors = append(ingestionLog.errors, err) + } + + if applyChangeSetChan != nil { + applyChangeSetChan <- IngestionLogToProcess{ + key: ingestionLog.key, + ingestionLog: ingestionLog.ingestionLog, + changeSet: ingestionLog.changeSet, + } + } + } else { + ingestionLog.ingestionLog.State = reconcilerpb.State_NO_CHANGES + if _, err = p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { + ingestionLog.errors = append(ingestionLog.errors, err) + } + } + p.logger.Debug("change set generated", "ingestionLogID", ingestionLog.ingestionLog.GetId(), "changeSetID", ingestionLog.changeSet.ChangeSetID) + } + } + }() +} + +// ApplyChangeSet applies a change set for an ingestion log +func (p *IngestionProcessor) ApplyChangeSet(ctx context.Context, applyChan <-chan IngestionLogToProcess, doneChan chan<- struct{}) { + limiter := rate.NewLimiter(rate.Limit(p.Config.ReconcilerRateLimiterRPS), p.Config.ReconcilerRateLimiterBurst) + + go func() { + defer func() { + if doneChan != nil { + doneChan <- struct{}{} + } + }() + + for { + select { + case <-ctx.Done(): + p.logger.Debug("context cancelled", "error", ctx.Err()) + return + case ingestionLog, ok := <-applyChan: + if !ok { + return + } + if err := limiter.Wait(ctx); err != nil { + p.logger.Debug("rate limiter wait", "error", err) + return + } + + p.logger.Debug("applying change set", "ingestionLogID", ingestionLog.ingestionLog.GetId(), "changeSetID", ingestionLog.changeSet.ChangeSetID) + + if err := applier.ApplyChangeSet(ctx, p.logger, *ingestionLog.changeSet, p.nbClient); err != nil { + ingestionLog.errors = append(ingestionLog.errors, fmt.Errorf("failed to apply chang eset: %v", err)) + + ingestionLog.ingestionLog.State = reconcilerpb.State_FAILED + ingestionLog.ingestionLog.Error = extractIngestionError(err) + + if _, err = p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { + ingestionLog.errors = append(ingestionLog.errors, err) + } + return + } + + ingestionLog.ingestionLog.State = reconcilerpb.State_RECONCILED + if _, err := p.writeIngestionLog(ctx, ingestionLog.key, ingestionLog.ingestionLog); err != nil { + ingestionLog.errors = append(ingestionLog.errors, err) + } + p.logger.Debug("change set applied", "ingestionLogID", ingestionLog.ingestionLog.GetId(), "changeSetID", ingestionLog.changeSet.ChangeSetID) + } + } + }() +} + +// CreateIngestionLogs creates ingestion logs for an ingest request +func (p *IngestionProcessor) CreateIngestionLogs(ctx context.Context, ingestReq *diodepb.IngestRequest, ingestionTs int, generateIngestionLogChan chan<- IngestionLogToProcess) []error { + defer close(generateIngestionLogChan) + + errs := make([]error, 0) + for i, v := range ingestReq.GetEntities() { if v.GetEntity() == nil { errs = append(errs, fmt.Errorf("entity at index %d is nil", i)) @@ -221,75 +432,13 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis. continue } - ingestEntity := changeset.IngestEntity{ - RequestID: ingestReq.GetId(), - DataType: objectType, - Entity: v, - State: int(reconcilerpb.State_QUEUED), - } - - changeSet, err := p.reconcileEntity(ctx, ingestEntity) - if err != nil { - errs = append(errs, err) - - ingestionLog.State = reconcilerpb.State_FAILED - ingestionLog.Error = extractIngestionError(err) - - if changeSet != nil { - ingestionLog.ChangeSet = &reconcilerpb.ChangeSet{Id: changeSet.ChangeSetID} - csCompressed, err := compressChangeSet(changeSet) - if err != nil { - errs = append(errs, err) - } else { - ingestionLog.ChangeSet.Data = csCompressed - } - } - - if _, err = p.writeIngestionLog(ctx, key, ingestionLog); err != nil { - errs = append(errs, err) - } - continue - } - - if changeSet != nil { - ingestionLog.State = reconcilerpb.State_RECONCILED - ingestionLog.ChangeSet = &reconcilerpb.ChangeSet{Id: changeSet.ChangeSetID} - csCompressed, err := compressChangeSet(changeSet) - if err != nil { - errs = append(errs, err) - } else { - ingestionLog.ChangeSet.Data = csCompressed - } - } else { - ingestionLog.State = reconcilerpb.State_NO_CHANGES - } - - if _, err = p.writeIngestionLog(ctx, key, ingestionLog); err != nil { - errs = append(errs, fmt.Errorf("failed to write JSON: %v", err)) - continue + generateIngestionLogChan <- IngestionLogToProcess{ + key: key, + ingestionLog: ingestionLog, } } - p.redisStreamClient.XAck(ctx, redisStreamID, redisConsumerGroup, msg.ID) - - if len(errs) > 0 { - errsStr := make([]string, 0) - for _, err := range errs { - errsStr = append(errsStr, err.Error()) - } - p.logger.Warn("failed to handle ingest request", slog.String("request_id", ingestReq.Id), slog.Any("errors", errsStr)) - - contextMap := map[string]any{ - "redis_stream_msg_id": msg.ID, - "consumer": fmt.Sprintf("%s-%s", redisConsumerGroup, p.hostname), - "hostname": p.hostname, - } - sentry.CaptureError(fmt.Errorf("failed to handle ingest request: %v", errs), nil, "Ingestion request", contextMap) - } else { - p.redisStreamClient.XDel(ctx, redisStreamID, msg.ID) - } - - return nil + return errs } func extractIngestionError(err error) *reconcilerpb.IngestionError { @@ -309,51 +458,6 @@ func extractIngestionError(err error) *reconcilerpb.IngestionError { return ingestionErr } -func (p *IngestionProcessor) reconcileEntity(ctx context.Context, ingestEntity changeset.IngestEntity) (*changeset.ChangeSet, error) { - cs, err := changeset.Prepare(ingestEntity, p.nbClient) - if err != nil { - tags := map[string]string{ - "request_id": ingestEntity.RequestID, - } - contextMap := map[string]any{ - "request_id": ingestEntity.RequestID, - "data_type": ingestEntity.DataType, - } - sentry.CaptureError(err, tags, "Ingest Entity", contextMap) - return nil, fmt.Errorf("failed to prepare change set: %v", err) - } - - if len(cs.ChangeSet) == 0 { - p.logger.Debug("no changes to apply", "request_id", ingestEntity.RequestID) - return nil, nil - } - - changes := make([]netboxdiodeplugin.Change, 0) - for _, change := range cs.ChangeSet { - changes = append(changes, netboxdiodeplugin.Change{ - ChangeID: change.ChangeID, - ChangeType: change.ChangeType, - ObjectType: change.ObjectType, - ObjectID: change.ObjectID, - ObjectVersion: change.ObjectVersion, - Data: change.Data, - }) - } - - req := netboxdiodeplugin.ChangeSetRequest{ - ChangeSetID: cs.ChangeSetID, - ChangeSet: changes, - } - - resp, err := p.nbClient.ApplyChangeSet(ctx, req) - if err != nil { - return cs, err - } - - p.logger.Debug("apply change set response", "response", resp) - return cs, nil -} - func (p *IngestionProcessor) writeIngestionLog(ctx context.Context, key string, ingestionLog *reconcilerpb.IngestionLog) ([]byte, error) { ingestionLogJSON, err := protojson.Marshal(ingestionLog) if err != nil { @@ -378,16 +482,16 @@ func normalizeIngestionLog(l []byte) []byte { func compressChangeSet(cs *changeset.ChangeSet) ([]byte, error) { csJSON, err := json.Marshal(cs) if err != nil { - return nil, fmt.Errorf("failed to marshal changeset JSON: %v", err) + return nil, fmt.Errorf("failed to marshal change set JSON: %v", err) } var brotliBuf bytes.Buffer brotliWriter := brotli.NewWriter(&brotliBuf) if _, err = brotliWriter.Write(csJSON); err != nil { - return nil, fmt.Errorf("failed to compress changeset: %v", err) + return nil, fmt.Errorf("failed to compress change set: %v", err) } if err = brotliWriter.Close(); err != nil { - return nil, fmt.Errorf("failed to compress changeset: %v", err) + return nil, fmt.Errorf("failed to compress change set: %v", err) } return brotliBuf.Bytes(), nil diff --git a/diode-server/reconciler/ingestion_processor_internal_test.go b/diode-server/reconciler/ingestion_processor_internal_test.go index 82331c19..06a9cf2c 100644 --- a/diode-server/reconciler/ingestion_processor_internal_test.go +++ b/diode-server/reconciler/ingestion_processor_internal_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "log/slog" "os" @@ -13,6 +14,7 @@ import ( "github.com/andybalholm/brotli" "github.com/redis/go-redis/v9" + "github.com/segmentio/ksuid" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -81,6 +83,11 @@ func TestWriteIngestionLog(t *testing.T) { p := &IngestionProcessor{ redisClient: mockRedisClient, logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})), + Config: Config{ + AutoApplyChangesets: true, + ReconcilerRateLimiterRPS: 20, + ReconcilerRateLimiterBurst: 1, + }, } // Set up the mock expectation @@ -107,111 +114,6 @@ func TestWriteIngestionLog(t *testing.T) { } } -func TestReconcileEntity(t *testing.T) { - tests := []struct { - name string - retrieveObjectStateErr error - applyErr error - expectedError bool - expectedCS *changeset.ChangeSet - }{ - { - name: "successful reconciliation", - expectedError: false, - expectedCS: &changeset.ChangeSet{ - ChangeSetID: "5663a77e-9bad-4981-afe9-77d8a9f2b8b5", - ChangeSet: []changeset.Change{ - { - ChangeID: "5663a77e-9bad-4981-afe9-77d8a9f2b8b5", - ChangeType: changeset.ChangeTypeCreate, - ObjectType: "dcim.site", - ObjectID: nil, - ObjectVersion: nil, - Data: &netbox.DcimSite{ - Name: "Site A", - Slug: "site-a", - Status: (*netbox.DcimSiteStatus)(strPtr(string(netbox.DcimSiteStatusActive))), - }, - }, - }, - }, - }, - { - name: "prepare error", - retrieveObjectStateErr: errors.New("prepare error"), - expectedError: true, - }, - { - name: "apply error", - expectedCS: &changeset.ChangeSet{ - ChangeSetID: "cs123", - ChangeSet: []changeset.Change{}, - }, - applyErr: errors.New("apply error"), - expectedError: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - - // Mock nbClient - mockNbClient := new(mnp.NetBoxAPI) - logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) - // Create IngestionProcessor - p := &IngestionProcessor{ - nbClient: mockNbClient, - logger: logger, - } - - // Setup mock for RetrieveObjectState - if tt.retrieveObjectStateErr != nil { - mockNbClient.On("RetrieveObjectState", ctx, mock.Anything).Return(&netboxdiodeplugin.ObjectState{}, tt.retrieveObjectStateErr) - } else { - - mockNbClient.On("RetrieveObjectState", ctx, mock.Anything).Return(&netboxdiodeplugin.ObjectState{ObjectType: "dcim.site", - ObjectID: 0, - ObjectChangeID: 0, - Object: &netbox.DcimSiteDataWrapper{ - Site: nil, - }}, nil) - } - // Setup mock for ApplyChangeSet - if tt.expectedCS != nil { - mockNbClient.On("ApplyChangeSet", ctx, mock.Anything).Return(&netboxdiodeplugin.ChangeSetResponse{}, tt.applyErr) - } - - // Call reconcileEntity - ingestEntity := changeset.IngestEntity{ - RequestID: "cfa0f129-125c-440d-9e41-e87583cd7d89", - DataType: "dcim.site", - Entity: &diodepb.Entity{ - Entity: &diodepb.Entity_Site{ - Site: &diodepb.Site{ - Name: "Site A", - }, - }, - }, - } - - cs, err := p.reconcileEntity(ctx, ingestEntity) - - if tt.expectedError { - require.Error(t, err) - } else { - require.NoError(t, err) - require.Equal(t, tt.expectedCS.ChangeSet[0].ChangeType, cs.ChangeSet[0].ChangeType) - require.Equal(t, tt.expectedCS.ChangeSet[0].ObjectType, cs.ChangeSet[0].ObjectType) - require.Equal(t, tt.expectedCS.ChangeSet[0].Data, cs.ChangeSet[0].Data) - } - - // Assert expectations - mockNbClient.AssertExpectations(t) - }) - } -} - func TestHandleStreamMessage(t *testing.T) { tests := []struct { name string @@ -340,6 +242,11 @@ func TestHandleStreamMessage(t *testing.T) { redisClient: mockRedisClient, redisStreamClient: mockRedisStreamClient, logger: logger, + Config: Config{ + AutoApplyChangesets: true, + ReconcilerRateLimiterRPS: 20, + ReconcilerRateLimiterBurst: 1, + }, } request := redis.XMessage{} @@ -449,6 +356,11 @@ func TestConsumeIngestionStream(t *testing.T) { p := &IngestionProcessor{ redisStreamClient: mockRedisClient, logger: logger, + Config: Config{ + AutoApplyChangesets: true, + ReconcilerRateLimiterRPS: 20, + ReconcilerRateLimiterBurst: 1, + }, } err := p.consumeIngestionStream(ctx, "test-stream", "test-group", "test-consumer") @@ -526,3 +438,146 @@ func TestCompressChangeSet(t *testing.T) { require.Equal(t, csJSON, decodedOutput.Bytes()) require.Contains(t, decodedOutput.String(), "5663a77e-9bad-4981-afe9-77d8a9f2b8b5") } + +func TestIngestionProcessor_GenerateAndApplyChangeSet(t *testing.T) { + tests := []struct { + name string + ingestionLog *reconcilerpb.IngestionLog + mockRetrieveObjectStateResponse *netboxdiodeplugin.ObjectState + mockApplyChangeSetResponse *netboxdiodeplugin.ChangeSetResponse + autoApplyChangesets bool + expectedStatus reconcilerpb.State + expectedError bool + }{ + { + name: "generate and apply change set", + ingestionLog: &reconcilerpb.IngestionLog{ + Id: ksuid.New().String(), + RequestId: "cfa0f129-125c-440d-9e41-e87583cd7d89", + ProducerAppName: "test-app", + ProducerAppVersion: "0.1.0", + SdkName: "diode-sdk-go", + SdkVersion: "0.2.0", + DataType: "dcim.site", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "Site A", + }, + }, + }, + IngestionTs: time.Now().UnixNano(), + State: reconcilerpb.State_QUEUED, + }, + mockRetrieveObjectStateResponse: &netboxdiodeplugin.ObjectState{ + ObjectType: "dcim.site", + ObjectID: 0, + Object: &netbox.DcimSiteDataWrapper{ + Site: nil, + }, + }, + mockApplyChangeSetResponse: &netboxdiodeplugin.ChangeSetResponse{ + ChangeSetID: "00000000-0000-0000-0000-000000000000", + Result: "success", + }, + autoApplyChangesets: true, + expectedStatus: reconcilerpb.State_RECONCILED, + expectedError: false, + }, + { + name: "generate change set only", + ingestionLog: &reconcilerpb.IngestionLog{ + Id: ksuid.New().String(), + RequestId: "cfa0f129-125c-440d-9e41-e87583cd7d89", + ProducerAppName: "test-app", + ProducerAppVersion: "0.1.0", + SdkName: "diode-sdk-go", + SdkVersion: "0.2.0", + DataType: "dcim.site", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "Site A", + }, + }, + }, + IngestionTs: time.Now().UnixNano(), + State: reconcilerpb.State_QUEUED, + }, + mockRetrieveObjectStateResponse: &netboxdiodeplugin.ObjectState{ + ObjectType: "dcim.site", + ObjectID: 0, + Object: &netbox.DcimSiteDataWrapper{ + Site: nil, + }, + }, + autoApplyChangesets: false, + expectedStatus: reconcilerpb.State_QUEUED, + expectedError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + mockRedisClient := new(mr.RedisClient) + mockNbClient := new(mnp.NetBoxAPI) + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) + + p := &IngestionProcessor{ + redisClient: mockRedisClient, + nbClient: mockNbClient, + logger: logger, + Config: Config{ + AutoApplyChangesets: tt.autoApplyChangesets, + ReconcilerRateLimiterRPS: 20, + ReconcilerRateLimiterBurst: 1, + }, + } + + // Set up the mock expectation + cmd := redis.NewCmd(ctx) + if tt.expectedError { + cmd.SetErr(errors.New("error")) + } + redisKey := fmt.Sprintf("ingest-entity:%s-%d-%s", tt.ingestionLog.DataType, tt.ingestionLog.IngestionTs, tt.ingestionLog.Id) + mockRedisClient.On("Do", ctx, "JSON.SET", redisKey, "$", mock.Anything). + Return(cmd) + + mockNbClient.On("RetrieveObjectState", ctx, mock.Anything).Return(tt.mockRetrieveObjectStateResponse, nil) + if tt.autoApplyChangesets { + mockNbClient.On("ApplyChangeSet", ctx, mock.Anything).Return(tt.mockApplyChangeSetResponse, nil) + } + + bufCapacity := 1 + + generateChangeSetChannel := make(chan IngestionLogToProcess, bufCapacity) + var applyChangeSetChannel chan IngestionLogToProcess + if tt.autoApplyChangesets { + applyChangeSetChannel = make(chan IngestionLogToProcess, bufCapacity) + } + generateChangeSetDone := make(chan struct{}) + applyChangeSetDone := make(chan struct{}) + + p.GenerateChangeSet(ctx, generateChangeSetChannel, applyChangeSetChannel, generateChangeSetDone) + if tt.autoApplyChangesets { + p.ApplyChangeSet(ctx, applyChangeSetChannel, applyChangeSetDone) + } + + generateChangeSetChannel <- IngestionLogToProcess{ + key: redisKey, + ingestionLog: tt.ingestionLog, + } + close(generateChangeSetChannel) + + <-generateChangeSetDone + if tt.autoApplyChangesets { + <-applyChangeSetDone + } + + mockRedisClient.AssertExpectations(t) + require.NotNil(t, tt.ingestionLog.ChangeSet) + require.Equal(t, tt.expectedStatus, tt.ingestionLog.State) + }) + } +} diff --git a/diode-server/reconciler/migration_test.go b/diode-server/reconciler/migration_test.go index c501fb65..06f1c91a 100644 --- a/diode-server/reconciler/migration_test.go +++ b/diode-server/reconciler/migration_test.go @@ -41,6 +41,11 @@ func TestMigrate(t *testing.T) { processor := &IngestionProcessor{ redisClient: mockRedisClient, logger: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})), + Config: Config{ + AutoApplyChangesets: true, + ReconcilerRateLimiterRPS: 20, + ReconcilerRateLimiterBurst: 1, + }, } ctx := context.Background() diff --git a/diode-server/reconciler/reconciler_test.go b/diode-server/reconciler/reconciler_test.go index 70c9bd4d..72758fa8 100644 --- a/diode-server/reconciler/reconciler_test.go +++ b/diode-server/reconciler/reconciler_test.go @@ -37,6 +37,9 @@ func setupEnv(redisAddr string) { _ = os.Setenv("NETBOX_TO_DIODE_API_KEY", "netbox_to_diode_api_key") _ = os.Setenv("DIODE_API_KEY", "diode_api_key") _ = os.Setenv("INGESTER_TO_RECONCILER_API_KEY", "ingester_to_reconciler_api_key") + _ = os.Setenv("AUTO_APPLY_CHANGESETS", "true") + _ = os.Setenv("RECONCILER_RATE_LIMITER_RPS", "20") + _ = os.Setenv("RECONCILER_RATE_LIMITER_BURST", "1") } func teardownEnv() { @@ -52,4 +55,7 @@ func teardownEnv() { _ = os.Unsetenv("NETBOX_TO_DIODE_API_KEY") _ = os.Unsetenv("DIODE_API_KEY") _ = os.Unsetenv("INGESTER_TO_RECONCILER_API_KEY") + _ = os.Unsetenv("AUTO_APPLY_CHANGESETS") + _ = os.Unsetenv("RECONCILER_RATE_LIMITER_RPS") + _ = os.Unsetenv("RECONCILER_RATE_LIMITER_BURST") } diff --git a/tests/features/steps/utils.py b/tests/features/steps/utils.py index 3e015c5a..6e7e17fc 100644 --- a/tests/features/steps/utils.py +++ b/tests/features/steps/utils.py @@ -44,6 +44,8 @@ def send_get_request(endpoint, params=None): def send_delete_request(endpoint, id): """Send a request to the API with the given endpoint and headers. Return the response.""" + if endpoint.endswith("/"): + endpoint = endpoint[:-1] try: response = requests.delete(f"{api_root_path}/{endpoint}/{id}/", headers=headers) except Exception as e: @@ -99,7 +101,7 @@ def ingester(entities): """Ingest the site object using the Diode SDK""" api_key = str(configs["api_key"]) with DiodeClient( - target="grpc://localhost:8081", + target="grpc://localhost:8080/diode", app_name="my-test-app", app_version="0.0.1", api_key=api_key,