Skip to content

Commit

Permalink
Add router data model chaos test. Fixes #2550
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Dec 5, 2024
1 parent 5644138 commit 83d73eb
Show file tree
Hide file tree
Showing 20 changed files with 2,313 additions and 341 deletions.
943 changes: 616 additions & 327 deletions common/pb/edge_ctrl_pb/edge_ctrl.pb.go

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions common/pb/edge_ctrl_pb/edge_ctrl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ enum ContentType {
DataStateType = 20500;
DataStateChangeSetType = 20501;
UpdateTokenType = 20502;
ValidateDataStateRequestType = 20503;
ValidateDataStateResponseType = 20504;
}

enum SessionType {
Expand Down Expand Up @@ -572,4 +574,22 @@ message ConnectEvents {

repeated IdentityConnectEvents events = 1;
bool fullState = 2;
}

// RouterDataModelValidateRequest is the payload sent with the
// ValidateDataStateRequestType content type. The controller fills it with its
// own current data state when asking a connected router to validate.
message RouterDataModelValidateRequest {
// state is the reference data state the receiver is asked to validate against.
DataState state = 1;
// NOTE(review): presumably asks the receiver to repair its model when
// differences are found; the visible sender never sets it — confirm.
bool fix = 2;
}

// RouterDataModelDiff describes a single difference found while comparing two
// router data models. The four fields mirror the arguments of a DiffSink call.
message RouterDataModelDiff {
// entityType names the collection the entity belongs to (e.g. "identity").
string entityType = 1;
// entityId is the key of the differing entity within its collection.
string entityId = 2;
// diffType is the kind of difference; presumably one of "added", "modified"
// or "removed" — confirm against the producer.
string diffType = 3;
// detail is a human readable description of the difference.
string detail = 4;
}

// RouterDataModelValidateResponse is the payload sent with the
// ValidateDataStateResponseType content type. The controller treats a response
// with no diffs as a successful validation.
message RouterDataModelValidateResponse {
// origEntityCounts holds per-collection entity counts for the original model.
// NOTE(review): which side counts as "original" is decided by the responder — confirm.
map<string, uint32> origEntityCounts = 1;
// copyEntityCounts holds per-collection entity counts for the copy.
map<string, uint32> copyEntityCounts = 2;
// diffs lists every difference found; empty when the models match.
repeated RouterDataModelDiff diffs = 3;
}
16 changes: 16 additions & 0 deletions common/pb/edge_ctrl_pb/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package edge_ctrl_pb

import (
"fmt"
)

func (x *DataState_Identity) GetServiceConfigsAsMap() map[string]map[string]string {
if x.ServiceConfigs == nil {
return nil
Expand All @@ -32,3 +36,15 @@ func (x *DataState_Identity) GetServiceConfigsAsMap() map[string]map[string]stri

return result
}

// GetContentType returns the channel content type under which a
// RouterDataModelValidateRequest is sent.
func (request *RouterDataModelValidateRequest) GetContentType() int32 {
	contentType := ContentType_ValidateDataStateRequestType
	return int32(contentType)
}

// GetContentType returns the channel content type under which a
// RouterDataModelValidateResponse is sent.
func (response *RouterDataModelValidateResponse) GetContentType() int32 {
	contentType := ContentType_ValidateDataStateResponseType
	return int32(contentType)
}

// ToDetail renders the diff as a single line of the form
// "<entityType> id: <entityId> <diffType>: <detail>".
func (diff *RouterDataModelDiff) ToDetail() string {
	const layout = "%s id: %s %s: %s"
	return fmt.Sprintf(layout, diff.EntityType, diff.EntityId, diff.DiffType, diff.Detail)
}
152 changes: 152 additions & 0 deletions common/router_data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package common

import (
"bytes"
"compress/gzip"
"crypto"
"crypto/x509"
"encoding/json"
"fmt"
"github.com/google/go-cmp/cmp"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
Expand Down Expand Up @@ -116,6 +118,21 @@ type RouterDataModel struct {
stopped atomic.Bool
}

// NewBareRouterDataModel returns a RouterDataModel with a forgetful event cache and
// empty entity collections. No listener buffer, subscription map or event channel is
// set up, so the result is expected to be used without buffers, listeners or
// subscriptions.
func NewBareRouterDataModel() *RouterDataModel {
	rdm := &RouterDataModel{}

	rdm.EventCache = NewForgetfulEventCache()
	rdm.ConfigTypes = cmap.New[*ConfigType]()
	rdm.Configs = cmap.New[*Config]()
	rdm.Identities = cmap.New[*Identity]()
	rdm.Services = cmap.New[*Service]()
	rdm.ServicePolicies = cmap.New[*ServicePolicy]()
	rdm.PostureChecks = cmap.New[*PostureCheck]()
	rdm.PublicKeys = cmap.New[*edge_ctrl_pb.DataState_PublicKey]()
	rdm.Revocations = cmap.New[*edge_ctrl_pb.DataState_Revocation]()

	return rdm
}

// NewSenderRouterDataModel creates a new RouterDataModel that will store events in a circular buffer of
// logSize. listenerBufferSize affects the buffer size of channels returned to listeners of the data model.
func NewSenderRouterDataModel(logSize uint64, listenerBufferSize uint) *RouterDataModel {
Expand Down Expand Up @@ -156,6 +173,30 @@ func NewReceiverRouterDataModel(listenerBufferSize uint, closeNotify <-chan stru
return result
}

// NewReceiverRouterDataModelFromExisting creates a new RouterDataModel that does not
// store events and reuses the entity collections of existing. The collections are
// assigned, not copied, so both models refer to the same underlying data.
// listenerBufferSize affects the buffer size of channels returned to listeners of the
// data model. A goroutine processing subscriber events is started before returning.
func NewReceiverRouterDataModelFromExisting(existing *RouterDataModel, listenerBufferSize uint, closeNotify <-chan struct{}) *RouterDataModel {
	rdm := &RouterDataModel{
		// state owned by the new model
		EventCache:         NewForgetfulEventCache(),
		listenerBufferSize: listenerBufferSize,
		subscriptions:      cmap.New[*IdentitySubscription](),
		events:             make(chan subscriberEvent),
		closeNotify:        closeNotify,
		stopNotify:         make(chan struct{}),

		// collections taken over from the existing model
		ConfigTypes:      existing.ConfigTypes,
		Configs:          existing.Configs,
		Identities:       existing.Identities,
		Services:         existing.Services,
		ServicePolicies:  existing.ServicePolicies,
		PostureChecks:    existing.PostureChecks,
		PublicKeys:       existing.PublicKeys,
		CachedPublicKeys: existing.CachedPublicKeys,
		Revocations:      existing.Revocations,
	}

	go rdm.processSubscriberEvents()

	return rdm
}

// NewReceiverRouterDataModelFromFile creates a new RouterDataModel that does not store events and is initialized from
// a file backup. listenerBufferSize affects the buffer size of channels returned to listeners of the data model.
func NewReceiverRouterDataModelFromFile(path string, listenerBufferSize uint, closeNotify <-chan struct{}) (*RouterDataModel, error) {
Expand Down Expand Up @@ -516,6 +557,14 @@ func (rdm *RouterDataModel) GetPublicKeys() map[string]crypto.PublicKey {
return rdm.CachedPublicKeys.Load()
}

// getPublicKeysAsCmap copies the currently cached public keys into a freshly
// allocated concurrent map keyed the same way as the cache.
func (rdm *RouterDataModel) getPublicKeysAsCmap() cmap.ConcurrentMap[string, crypto.PublicKey] {
	cached := rdm.CachedPublicKeys.Load()
	result := cmap.New[crypto.PublicKey]()
	for kid, publicKey := range cached {
		result.Set(kid, publicKey)
	}
	return result
}

func (rdm *RouterDataModel) recalculateCachedPublicKeys() {
publicKeys := map[string]crypto.PublicKey{}
rdm.PublicKeys.IterCb(func(kid string, pubKey *edge_ctrl_pb.DataState_PublicKey) {
Expand Down Expand Up @@ -924,3 +973,106 @@ func (rdm *RouterDataModel) loadIdentityConfig(configId string, log *logrus.Entr
ConfigType: configType,
}
}

// GetEntityCounts returns the number of entries currently held in each collection of
// the data model, keyed by collection name.
//
// The key names are returned to callers and are therefore kept exactly as they were,
// including the singular "configType".
func (rdm *RouterDataModel) GetEntityCounts() map[string]uint32 {
	return map[string]uint32{
		"configType":       uint32(rdm.ConfigTypes.Count()),
		"configs":          uint32(rdm.Configs.Count()),
		"identities":       uint32(rdm.Identities.Count()),
		"services":         uint32(rdm.Services.Count()),
		"service-policies": uint32(rdm.ServicePolicies.Count()),
		"posture-checks":   uint32(rdm.PostureChecks.Count()),
		"public-keys":      uint32(rdm.PublicKeys.Count()),
		"revocations":      uint32(rdm.Revocations.Count()),
		// The cache is loaded as a plain map, so len() yields the count directly;
		// copying every key into a concurrent map just to count it is unnecessary.
		"cached-public-keys": uint32(len(rdm.CachedPublicKeys.Load())),
	}
}

// DiffType names the kind of difference reported to a DiffSink.
type DiffType string

// The diff kinds passed to a DiffSink. They are declared as untyped string
// constants, so they can be used wherever a DiffType or a plain string is expected.
const (
// DiffTypeAdd marks an entity present in the copy but not in the original.
DiffTypeAdd = "added"
// DiffTypeMod marks an entity present in both whose contents differ.
DiffTypeMod = "modified"
// DiffTypeSub marks an entity present in the original but not in the copy.
DiffTypeSub = "removed"
)

// DiffSink receives one call per detected difference: the entity type, the entity id,
// the kind of difference and a human readable detail string.
type DiffSink func(entityType string, id string, diffType DiffType, detail string)

// Diff compares this data model, treated as the original, with o, treated as the
// copy, and reports every difference to sink. Each entity collection is compared in
// turn, finishing with the cached public keys. When o is nil a single "removed"
// entry for the whole model is reported and nothing else is compared.
func (rdm *RouterDataModel) Diff(o *RouterDataModel, sink DiffSink) {
	if o == nil {
		sink("router-data-model", "root", DiffTypeSub, "router data model not present")
		return
	}

	diffType("configType", rdm.ConfigTypes, o.ConfigTypes, sink)
	diffType("config", rdm.Configs, o.Configs, sink)
	diffType("identity", rdm.Identities, o.Identities, sink)
	diffType("service", rdm.Services, o.Services, sink)
	diffType("service-policy", rdm.ServicePolicies, o.ServicePolicies, sink)
	diffType("posture-check", rdm.PostureChecks, o.PostureChecks, sink)
	diffType("public-keys", rdm.PublicKeys, o.PublicKeys, sink)
	diffType("revocations", rdm.Revocations, o.Revocations, sink)

	// The cached keys live in a plain map, so both sides are copied into concurrent
	// maps to reuse the same comparison routine.
	origCachedKeys, copyCachedKeys := rdm.getPublicKeysAsCmap(), o.getPublicKeysAsCmap()
	diffType("cached-public-keys", origCachedKeys, copyCachedKeys, sink)
}

// diffType compares two keyed collections of one entity type and reports every
// difference to sink. m1 is treated as the original and m2 as the copy:
//   - a key in m1 that is absent from m2 is reported as DiffTypeSub ("entity missing")
//   - a key present in both whose values differ is reported as DiffTypeMod, with the
//     detail text produced by compareReporter
//   - a key in m2 that is absent from m1 is reported as DiffTypeAdd ("entity unexpected")
func diffType[T any](entityType string, m1 cmap.ConcurrentMap[string, T], m2 cmap.ConcurrentMap[string, T], sink DiffSink) {
	reporter := &compareReporter{
		f: func(key string, detail string) {
			sink(entityType, key, DiffTypeMod, detail)
		},
	}
	reporterOpt := cmp.Reporter(reporter)

	hasMissing := false
	m1.IterCb(func(key string, v T) {
		v2, exists := m2.Get(key)
		if !exists {
			sink(entityType, key, DiffTypeSub, "entity missing")
			hasMissing = true
			return
		}

		reporter.key = key
		// Only the reporter callbacks matter here. cmp.Equal drives them exactly like
		// cmp.Diff does, without also building a textual diff that would be thrown away.
		cmp.Equal(v, v2, reporterOpt)
	})

	// If every m1 key exists in m2 and the sizes match, the key sets are identical and
	// m2 cannot contain unexpected entries, so the second pass is skipped.
	if m1.Count() != m2.Count() || hasMissing {
		m2.IterCb(func(key string, _ T) {
			if _, exists := m1.Get(key); !exists {
				sink(entityType, key, DiffTypeAdd, "entity unexpected")
			}
		})
	}
}

// compareReporter is passed to cmp.Reporter and forwards every reported mismatch,
// together with the path at which it occurred, to the callback f.
type compareReporter struct {
// steps is the stack of path steps pushed via PushStep and not yet popped.
steps []cmp.PathStep
// key is the id of the entity currently being compared; diffType sets it before
// each comparison.
key string
// f receives the entity key and a description for each mismatch.
f func(key string, detail string)
}

// PushStep records step as the innermost element of the path currently being compared.
func (self *compareReporter) PushStep(step cmp.PathStep) {
	extended := append(self.steps, step)
	self.steps = extended
}

// Report is called for each comparison result. Equal results are ignored. For an
// unequal result the callback f receives the current entity key and a message made of
// the full path to the mismatch plus the two differing values.
func (self *compareReporter) Report(result cmp.Result) {
	if result.Equal() {
		return
	}

	var last cmp.PathStep
	path := &bytes.Buffer{}
	for _, step := range self.steps {
		path.WriteString(step.String())
		last = step
	}

	if last == nil {
		self.f(self.key, "programming error, empty path stack")
		return
	}

	vx, vy := last.Values()
	// reflect.Value.String() only yields the content for string kinds; for anything else
	// it returns a placeholder such as "<int Value>". Handing the reflect.Value to fmt
	// prints the concrete value it holds, and copes with invalid (absent) values too.
	detail := fmt.Sprintf("%s mismatch. orig: %+v, copy: %+v", path.String(), vx, vy)
	self.f(self.key, detail)
}

// PopStep drops the innermost path step, undoing the most recent PushStep.
func (self *compareReporter) PopStep() {
	last := len(self.steps) - 1
	self.steps = self.steps[:last]
}
6 changes: 6 additions & 0 deletions controller/env/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v3"
"github.com/openziti/storage/boltz"
"github.com/openziti/ziti/common"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/ziti/controller/event"
Expand All @@ -37,6 +38,7 @@ const (
ApiSessionUpdatedType = int32(edge_ctrl_pb.ContentType_ApiSessionUpdatedType)
RequestClientReSyncType = int32(edge_ctrl_pb.ContentType_RequestClientReSyncType)
DataStateType = int32(edge_ctrl_pb.ContentType_DataStateType)
ValidateDataStateType = int32(edge_ctrl_pb.ContentType_ValidateDataStateRequestType)
DataStateChangeSetType = int32(edge_ctrl_pb.ContentType_DataStateChangeSetType)

ServerHelloType = int32(edge_ctrl_pb.ContentType_ServerHelloType)
Expand Down Expand Up @@ -87,6 +89,10 @@ func (broker *Broker) ValidateRouterDataModel() []error {
return broker.routerSyncStrategy.Validate()
}

// GetRouterDataModel returns the router data model held by the broker's router sync
// strategy.
func (broker *Broker) GetRouterDataModel() *common.RouterDataModel {
	strategy := broker.routerSyncStrategy
	return strategy.GetRouterDataModel()
}

func (broker *Broker) AcceptClusterEvent(clusterEvent *event.ClusterEvent) {
if clusterEvent.EventType == event.ClusterLeadershipGained {
broker.ae.Managers.Controller.PeersConnected(clusterEvent.Peers, false)
Expand Down
2 changes: 2 additions & 0 deletions controller/env/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto"
"github.com/openziti/channel/v3"
"github.com/openziti/foundation/v2/versions"
"github.com/openziti/ziti/common"
"github.com/openziti/ziti/controller/db"
"github.com/openziti/ziti/controller/model"
"sync"
Expand Down Expand Up @@ -62,6 +63,7 @@ type RouterSyncStrategy interface {
RouterConnectionHandler
RouterSynchronizerEventHandler
Validate() []error
GetRouterDataModel() *common.RouterDataModel
}

// RouterConnectionHandler is responsible for handling router connect/disconnect for synchronizing state.
Expand Down
6 changes: 5 additions & 1 deletion controller/handler_mgmt/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ type BindHandler struct {
}

// NewBindHandler returns a BindHandler that holds the given application environment,
// network and xmgmt extensions.
//
// The previous early return built the handler without env and left the complete
// struct literal below it unreachable; only the complete literal is kept.
func NewBindHandler(env *env.AppEnv, network *network.Network, xmgmts *concurrenz.CopyOnWriteSlice[xmgmt.Xmgmt]) channel.BindHandler {
	return &BindHandler{
		env:     env,
		network: network,
		xmgmts:  xmgmts,
	}
}

func (bindHandler *BindHandler) BindChannel(binding channel.Binding) error {
Expand Down
47 changes: 38 additions & 9 deletions controller/handler_mgmt/validate_router_data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"github.com/openziti/channel/v3"
"github.com/openziti/channel/v3/protobufs"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
"github.com/openziti/ziti/common/pb/mgmt_pb"
"github.com/openziti/ziti/controller/env"
"github.com/openziti/ziti/controller/model"
"github.com/openziti/ziti/controller/network"
"google.golang.org/protobuf/proto"
"time"
Expand Down Expand Up @@ -128,21 +130,18 @@ func (handler *validateRouterDataModelHandler) ValidateRouterDataModel(includeCt
cb(details)
}()
}

var dataState *edge_ctrl_pb.DataState
for _, router := range result.Entities {
connectedRouter := handler.appEnv.GetHostController().GetNetwork().GetConnectedRouter(router.Id)
if connectedRouter != nil {
if dataState == nil {
dataState = handler.appEnv.Broker.GetRouterDataModel().GetDataState()
}
sem.Acquire()
go func() {
defer sem.Release()

details := &mgmt_pb.RouterDataModelDetails{
ComponentType: "router",
ComponentId: router.Id,
ComponentName: router.Name,
ValidateSuccess: false,
Errors: []string{"not yet implemented"},
}
cb(details)
handler.ValidateRouterDataModelOnRouter(connectedRouter, dataState, cb)
}()
} else {
details := &mgmt_pb.RouterDataModelDetails{
Expand All @@ -164,3 +163,33 @@ func (handler *validateRouterDataModelHandler) ValidateRouterDataModel(includeCt

return count, evalF, nil
}

// ValidateRouterDataModelOnRouter sends dataState to the given router in a
// RouterDataModelValidateRequest, waits up to one minute for the reply and reports the
// outcome through cb, which is invoked exactly once.
//
// If the request fails or the reply cannot be unmarshalled, the details carry a single
// error and ValidateSuccess stays false. Otherwise validation succeeds when the router
// reports no diffs; each reported diff becomes one entry in Errors.
func (handler *validateRouterDataModelHandler) ValidateRouterDataModelOnRouter(router *model.Router, dataState *edge_ctrl_pb.DataState, cb RouterDataModelValidationCallback) {
	details := &mgmt_pb.RouterDataModelDetails{
		ComponentType: "router",
		ComponentId:   router.Id,
		ComponentName: router.Name,
	}

	req := &edge_ctrl_pb.RouterDataModelValidateRequest{State: dataState}
	reply := &edge_ctrl_pb.RouterDataModelValidateResponse{}

	msg, err := protobufs.MarshalTyped(req).WithTimeout(time.Minute).SendForReply(router.Control)
	// Unmarshall takes the send error as well, so one check covers both failure modes.
	err = protobufs.TypedResponse(reply).Unmarshall(msg, err)
	if err != nil {
		details.Errors = []string{fmt.Sprintf("unable to validate router data (%s)", err.Error())}
		cb(details)
		return
	}

	details.ValidateSuccess = len(reply.Diffs) == 0
	for _, diff := range reply.Diffs {
		details.Errors = append(details.Errors, diff.ToDetail())
	}
	cb(details)
}
4 changes: 4 additions & 0 deletions controller/sync_strats/sync_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,10 @@ func (strategy *InstantStrategy) BuildPostureChecks(index uint64, tx *bbolt.Tx,
return nil
}

// GetRouterDataModel returns the router data model maintained by this strategy.
func (strategy *InstantStrategy) GetRouterDataModel() *common.RouterDataModel {
	rdm := strategy.RouterDataModel
	return rdm
}

// Validate runs ValidateAll against the strategy's own router data model and returns
// the errors it reports.
func (strategy *InstantStrategy) Validate() []error {
	rdm := strategy.RouterDataModel
	return strategy.ValidateAll(rdm)
}
Expand Down
Loading

0 comments on commit 83d73eb

Please sign in to comment.