Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor reconciler svc #192

Merged
merged 8 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion diode-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ Edit the `.env` to match your environment:
* `NETBOX_TO_DIODE_API_KEY`: API key generated with the Diode NetBox plugin installation
* `INGESTER_TO_RECONCILER_API_KEY`: API key to authorize RPC calls between the Ingester and Reconciler services (at
least 40 characters, example generation with shell command: `openssl rand -base64 40 | head -c 40`)
* `MIGRATION_ENABLED`: Set to `false` to disable the migration, default is `true`
* `MIGRATION_ENABLED`: Set to `false` to disable the migration, default: `true`
* `RECONCILER_RATE_LIMITER_RPS`: Rate limit for the reconciler service for generating and applying change sets concurrently, default: `20`
* `RECONCILER_RATE_LIMITER_BURST`: Burst limit for the reconciler service for generating and applying change sets concurrently, default: `1`

### Running the Diode server

Expand Down
2 changes: 1 addition & 1 deletion diode-server/docker/docker-compose.netbox.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
netbox: &netbox
image: netboxcommunity/netbox:v4.0-2.9.1-diode
image: netboxcommunity/netbox:v4.1-3.0.1-diode
build:
context: .
dockerfile: netbox/Dockerfile-diode
Expand Down
2 changes: 2 additions & 0 deletions diode-server/docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ services:
- LOGGING_LEVEL=${LOGGING_LEVEL}
- SENTRY_DSN=${SENTRY_DSN}
- MIGRATION_ENABLED=${MIGRATION_ENABLED}
- RECONCILER_RATE_LIMITER_RPS=${RECONCILER_RATE_LIMITER_RPS}
- RECONCILER_RATE_LIMITER_BURST=${RECONCILER_RATE_LIMITER_BURST}
restart: always
ports: [ ]
depends_on:
Expand Down
2 changes: 1 addition & 1 deletion diode-server/docker/netbox/Dockerfile-diode
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM netboxcommunity/netbox:v4.0-2.9.1
FROM netboxcommunity/netbox:v4.1-3.0.1

COPY ./netbox/configuration/ /etc/netbox/config/
RUN chmod 755 /etc/netbox/config/* && \
Expand Down
2 changes: 2 additions & 0 deletions diode-server/docker/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ INGESTER_TO_RECONCILER_API_KEY=sXjJZe6BBzVuovrVyyH4Q3vbceqvDwh2kC3DRpML
LOGGING_LEVEL=DEBUG
SENTRY_DSN=
MIGRATION_ENABLED=true
RECONCILER_RATE_LIMITER_RPS=20
RECONCILER_RATE_LIMITER_BURST=1
1 change: 1 addition & 0 deletions diode-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/redis/go-redis/v9 v9.5.1
github.com/segmentio/ksuid v1.0.4
github.com/stretchr/testify v1.9.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
)
Expand Down
2 changes: 2 additions & 0 deletions diode-server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
Expand Down
38 changes: 38 additions & 0 deletions diode-server/reconciler/applier/applier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package applier

import (
"context"
"log/slog"

"github.com/netboxlabs/diode/diode-server/netboxdiodeplugin"
"github.com/netboxlabs/diode/diode-server/reconciler/changeset"
)

// ApplyChangeSet applies a change set to NetBox
func ApplyChangeSet(ctx context.Context, logger *slog.Logger, cs changeset.ChangeSet, nbClient netboxdiodeplugin.NetBoxAPI) error {
changes := make([]netboxdiodeplugin.Change, 0)
for _, change := range cs.ChangeSet {
changes = append(changes, netboxdiodeplugin.Change{
ChangeID: change.ChangeID,
ChangeType: change.ChangeType,
ObjectType: change.ObjectType,
ObjectID: change.ObjectID,
ObjectVersion: change.ObjectVersion,
Data: change.Data,
})
}

req := netboxdiodeplugin.ChangeSetRequest{
ChangeSetID: cs.ChangeSetID,
ChangeSet: changes,
}

resp, err := nbClient.ApplyChangeSet(ctx, req)
if err != nil {
return err
}

logger.Debug("apply change set response", "response", resp)

return nil
}
69 changes: 69 additions & 0 deletions diode-server/reconciler/applier/applier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package applier_test

import (
"context"
"log/slog"
"os"
"testing"

"github.com/stretchr/testify/assert"

"github.com/netboxlabs/diode/diode-server/netbox"
"github.com/netboxlabs/diode/diode-server/netboxdiodeplugin"
nbClientMock "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks"
"github.com/netboxlabs/diode/diode-server/reconciler/applier"
"github.com/netboxlabs/diode/diode-server/reconciler/changeset"
)

func TestApplyChangeSet(t *testing.T) {
ctx := context.Background()
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false}))

mockNetBoxAPI := new(nbClientMock.NetBoxAPI)
cs := changeset.ChangeSet{
ChangeSetID: "00000000-0000-0000-0000-000000000000",
ChangeSet: []changeset.Change{
{
ChangeID: "00000000-0000-0000-0000-000000000001",
ChangeType: "create",
ObjectType: "dcim.site",
Data: &netbox.DcimSite{
Name: "Site A",
Slug: "site-a",
Status: (*netbox.DcimSiteStatus)(strPtr(string(netbox.DcimSiteStatusActive))),
},
},
},
}

req := netboxdiodeplugin.ChangeSetRequest{
ChangeSetID: "00000000-0000-0000-0000-000000000000",
ChangeSet: []netboxdiodeplugin.Change{
{
ChangeID: "00000000-0000-0000-0000-000000000001",
ChangeType: "create",
ObjectType: "dcim.site",
ObjectID: nil,
ObjectVersion: nil,
Data: &netbox.DcimSite{
Name: "Site A",
Slug: "site-a",
Status: (*netbox.DcimSiteStatus)(strPtr(string(netbox.DcimSiteStatusActive))),
},
},
},
}

resp := &netboxdiodeplugin.ChangeSetResponse{
ChangeSetID: "00000000-0000-0000-0000-000000000000",
Result: "success",
}

mockNetBoxAPI.On("ApplyChangeSet", ctx, req).Return(resp, nil)

err := applier.ApplyChangeSet(ctx, logger, cs, mockNetBoxAPI)
assert.NoError(t, err)
mockNetBoxAPI.AssertExpectations(t)
}

func strPtr(s string) *string { return &s }
183 changes: 0 additions & 183 deletions diode-server/reconciler/changeset/changeset.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,5 @@
package changeset

import (
"context"
"fmt"

"github.com/google/uuid"
"github.com/mitchellh/mapstructure"

"github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb"
"github.com/netboxlabs/diode/diode-server/netbox"
"github.com/netboxlabs/diode/diode-server/netboxdiodeplugin"
)

const (
// ChangeTypeCreate is the change type for a creation
ChangeTypeCreate = "create"
Expand All @@ -20,22 +8,6 @@ const (
ChangeTypeUpdate = "update"
)

// IngestEntity represents an ingest entity
type IngestEntity struct {
RequestID string `json:"request_id"`
DataType string `json:"data_type"`
Entity any `json:"entity"`
State int `json:"state"`
}

// ObjectState represents a object state
type ObjectState struct {
ObjectID int `json:"object_id"`
ObjectType string `json:"object_type"`
ObjectChangeID int `json:"object_change_id"`
Object any `json:"object"`
}

// ChangeSet represents a change set
type ChangeSet struct {
ChangeSetID string `json:"change_set_id"`
Expand All @@ -51,158 +23,3 @@ type Change struct {
ObjectVersion *int `json:"object_version,omitempty"`
Data any `json:"data"`
}

// Prepare prepares a change set
func Prepare(entity IngestEntity, netboxAPI netboxdiodeplugin.NetBoxAPI) (*ChangeSet, error) {
// extract ingested entity (actual)
actual, err := extractIngestEntityData(entity)
if err != nil {
return nil, err
}

// get root object and all its nested objects (actual)
actualNestedObjects, err := actual.NestedObjects()
if err != nil {
return nil, err
}

// map out root object and all its nested objects (actual)
actualNestedObjectsMap := make(map[string]netbox.ComparableData)
for _, obj := range actualNestedObjects {
actualNestedObjectsMap[fmt.Sprintf("%p", obj.Data())] = obj
}

// retrieve root object all its nested objects from NetBox (intended)
intendedNestedObjectsMap := make(map[string]netbox.ComparableData)
for _, obj := range actualNestedObjects {
intended, err := retrieveObjectState(netboxAPI, obj)
if err != nil {
return nil, err
}
intendedNestedObjectsMap[fmt.Sprintf("%p", obj.Data())] = intended
}

// map out retrieved root object and all its nested objects (current)
var current netbox.ComparableData
for _, obj := range actualNestedObjects {
if obj.DataType() == entity.DataType {
current = intendedNestedObjectsMap[fmt.Sprintf("%p", obj.Data())]
break
}
}

objectsToReconcile, err := actual.Patch(current, intendedNestedObjectsMap)
if err != nil {
return nil, err
}

// process objectsToReconcile and prepare changeset to return
changes := make([]Change, 0)

for _, obj := range objectsToReconcile {
operation := ChangeTypeCreate
var objectID *int

id := obj.ID()
if id > 0 {
objectID = &id
operation = ChangeTypeUpdate
}

changes = append(changes, Change{
ChangeID: uuid.NewString(),
ChangeType: operation,
ObjectType: obj.DataType(),
ObjectID: objectID,
ObjectVersion: nil,
Data: obj.Data(),
})
}

return &ChangeSet{ChangeSetID: uuid.NewString(), ChangeSet: changes}, nil
}

func retrieveObjectState(netboxAPI netboxdiodeplugin.NetBoxAPI, change netbox.ComparableData) (netbox.ComparableData, error) {
params := netboxdiodeplugin.RetrieveObjectStateQueryParams{
ObjectID: 0,
ObjectType: change.DataType(),
Params: change.ObjectStateQueryParams(),
}
resp, err := netboxAPI.RetrieveObjectState(context.Background(), params)
if err != nil {
return nil, err
}

if resp.Object.IsValid() {
objectState := &ObjectState{
ObjectID: resp.ObjectID,
ObjectType: change.DataType(),
ObjectChangeID: resp.ObjectChangeID,
Object: resp.Object,
}

return extractNetBoxObjectStateData(*objectState)
}

return nil, nil
}

func extractIngestEntityData(ingestEntity IngestEntity) (netbox.ComparableData, error) {
if ingestEntity.Entity == nil {
return nil, fmt.Errorf("ingest entity is nil")
}

dw, err := netbox.NewDataWrapper(ingestEntity.DataType)
if err != nil {
return nil, err
}

protoEntity, ok := ingestEntity.Entity.(*diodepb.Entity)
if !ok {
return nil, fmt.Errorf("ingest entity is not a proto entity")
}

if err = dw.FromProtoEntity(protoEntity); err != nil {
return nil, err
}

if !dw.IsValid() {
return nil, fmt.Errorf("invalid ingest entity")
}

return dw, nil
}

func extractNetBoxObjectStateData(obj ObjectState) (netbox.ComparableData, error) {
if obj.Object == nil {
return nil, fmt.Errorf("object state is nil")
}

dw, err := netbox.NewDataWrapper(obj.ObjectType)
if err != nil {
return nil, err
}

decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Result: &dw,
MatchName: netbox.IpamIPAddressAssignedObjectMatchName,
DecodeHook: mapstructure.ComposeDecodeHookFunc(
netbox.IpamIPAddressAssignedObjectHookFunc(),
),
})
if err != nil {
return nil, err
}

if err := decoder.Decode(obj.Object); err != nil {
return nil, fmt.Errorf("failed to decode object entity %w", err)
}

if !dw.IsValid() {
return nil, fmt.Errorf("invalid object state")
}

dw.Normalise()

return dw, nil
}
17 changes: 10 additions & 7 deletions diode-server/reconciler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package reconciler

// Config is the configuration for the reconciler service
type Config struct {
GRPCPort int `envconfig:"GRPC_PORT" default:"8081"`
RedisHost string `envconfig:"REDIS_HOST" default:"127.0.0.1"`
RedisPort string `envconfig:"REDIS_PORT" default:"6379"`
RedisPassword string `envconfig:"REDIS_PASSWORD" required:"true"`
RedisDB int `envconfig:"REDIS_DB" default:"0"`
RedisStreamDB int `envconfig:"REDIS_STREAM_DB" default:"1"`
MigrationEnabled bool `envconfig:"MIGRATION_ENABLED" default:"true"`
GRPCPort int `envconfig:"GRPC_PORT" default:"8081"`
RedisHost string `envconfig:"REDIS_HOST" default:"127.0.0.1"`
RedisPort string `envconfig:"REDIS_PORT" default:"6379"`
RedisPassword string `envconfig:"REDIS_PASSWORD" required:"true"`
RedisDB int `envconfig:"REDIS_DB" default:"0"`
RedisStreamDB int `envconfig:"REDIS_STREAM_DB" default:"1"`
MigrationEnabled bool `envconfig:"MIGRATION_ENABLED" default:"true"`
AutoApplyChangesets bool `envconfig:"AUTO_APPLY_CHANGESETS" default:"true"`
ReconcilerRateLimiterRPS int `envconfig:"RECONCILER_RATE_LIMITER_RPS" default:"20"`
ReconcilerRateLimiterBurst int `envconfig:"RECONCILER_RATE_LIMITER_BURST" default:"1"`

// API keys
DiodeToNetBoxAPIKey string `envconfig:"DIODE_TO_NETBOX_API_KEY" required:"true"`
Expand Down
Loading
Loading