Skip to content

Commit

Permalink
CBG-1095 Re-register import node after transient removal (2.8.0) (#4804)
Browse files Browse the repository at this point in the history
If an import node is temporarily removed from the set of cbgt node definitions (e.g. due to temporary dropped heartbeats), have nodes re-register themselves.
  • Loading branch information
adamcfraser authored Sep 25, 2020
1 parent 1dda0cb commit e2e7d42
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 17 deletions.
30 changes: 20 additions & 10 deletions base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func registerHeartbeatListener(heartbeater Heartbeater, cbgtContext *CbgtContext
}

// Register listener for import, uses cfg and manager to manage set of participating nodes
importHeartbeatListener, err := NewImportHeartbeatListener(cbgtContext.Cfg, cbgtContext.Manager.Version())
importHeartbeatListener, err := NewImportHeartbeatListener(cbgtContext.Cfg, cbgtContext.Manager)
if err != nil {
return nil, err
}
Expand All @@ -380,26 +380,26 @@ func registerHeartbeatListener(heartbeater Heartbeater, cbgtContext *CbgtContext
// ImportHeartbeatListener uses cbgt's cfg to manage node list
type importHeartbeatListener struct {
cfg cbgt.Cfg // cbgt cfg being used for import
mgrVersion string // cbgt manager version, required for cbgt.UnregisterNodes
mgr *cbgt.Manager // cbgt manager associated with this import node
terminator chan struct{} // close cfg subscription on close
nodeIDs []string // Set of nodes from the latest retrieval
lock sync.RWMutex // lock for nodeIDs access
}

func NewImportHeartbeatListener(cfg cbgt.Cfg, mgrVersion string) (*importHeartbeatListener, error) {
func NewImportHeartbeatListener(cfg cbgt.Cfg, mgr *cbgt.Manager) (*importHeartbeatListener, error) {

if cfg == nil {
return nil, errors.New("Cfg must not be nil for ImportHeartbeatListener")
}

listener := &importHeartbeatListener{
cfg: cfg,
mgrVersion: mgrVersion,
mgr: mgr,
terminator: make(chan struct{}),
}

// Initialize the node set
err := listener.reloadNodes()
_, err := listener.reloadNodes()
if err != nil {
return nil, err
}
Expand All @@ -421,7 +421,7 @@ func (l *importHeartbeatListener) Name() string {
func (l *importHeartbeatListener) StaleHeartbeatDetected(nodeUUID string) {

Infof(KeyCluster, "StaleHeartbeatDetected by import listener for node: %v", nodeUUID)
err := cbgt.UnregisterNodes(l.cfg, l.mgrVersion, []string{nodeUUID})
err := cbgt.UnregisterNodes(l.cfg, l.mgr.Version(), []string{nodeUUID})
if err != nil {
Warnf("Attempt to unregister %v from CBGT got error: %v", nodeUUID, err)
}
Expand All @@ -442,10 +442,17 @@ func (l *importHeartbeatListener) subscribeNodeChanges() error {
for {
select {
case <-cfgEvents:
err := l.reloadNodes()
localNodeRegistered, err := l.reloadNodes()
if err != nil {
Warnf("Error while reloading heartbeat node definitions: %v", err)
}
if !localNodeRegistered {
registerErr := l.mgr.Register(cbgt.NODE_DEFS_WANTED)
if registerErr != nil {
Warnf("Error attempting to re-register node, node will not participate in import until restarted or cbgt cfg is next updated: %v", registerErr)
}
}

case <-l.terminator:
return
}
Expand All @@ -454,25 +461,28 @@ func (l *importHeartbeatListener) subscribeNodeChanges() error {
return nil
}

func (l *importHeartbeatListener) reloadNodes() error {
func (l *importHeartbeatListener) reloadNodes() (localNodePresent bool, err error) {

nodeSet, _, err := cbgt.CfgGetNodeDefs(l.cfg, cbgt.NODE_DEFS_KNOWN)
if err != nil {
return err
return false, err
}

nodeUUIDs := make([]string, 0)
if nodeSet != nil {
for _, nodeDef := range nodeSet.NodeDefs {
nodeUUIDs = append(nodeUUIDs, nodeDef.UUID)
if nodeDef.UUID == l.mgr.UUID() {
localNodePresent = true
}
}
}

l.lock.Lock()
l.nodeIDs = nodeUUIDs
l.lock.Unlock()

return nil
return localNodePresent, nil
}

// GetNodes returns a copy of the in-memory node set
Expand Down
25 changes: 21 additions & 4 deletions base/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,33 @@ func TestCBGTManagerHeartbeater(t *testing.T) {
assert.NoError(t, node3.Start())

// Create three heartbeat listeners, associate one with each node
testManagerVersion := ""
listener1, err := NewImportHeartbeatListener(cfgCB, testManagerVersion)
testUUID := cbgt.NewUUID()
var eventHandlers cbgt.ManagerEventHandlers
options := make(map[string]string)
options[cbgt.FeedAllotmentOption] = cbgt.FeedAllotmentOnePerPIndex
options["managerLoadDataDir"] = "false"
testManager := cbgt.NewManagerEx(
cbgt.VERSION,
cbgt.NewCfgMem(),
testUUID,
nil,
"",
1,
"",
testUUID,
"",
"some-datasource",
eventHandlers,
options)
listener1, err := NewImportHeartbeatListener(cfgCB, testManager)
assert.NoError(t, err)
assert.NoError(t, node1.RegisterListener(listener1))

listener2, err := NewImportHeartbeatListener(cfgCB, testManagerVersion)
listener2, err := NewImportHeartbeatListener(cfgCB, testManager)
assert.NoError(t, err)
assert.NoError(t, node2.RegisterListener(listener2))

listener3, err := NewImportHeartbeatListener(cfgCB, testManagerVersion)
listener3, err := NewImportHeartbeatListener(cfgCB, testManager)
assert.NoError(t, err)
assert.NoError(t, node3.RegisterListener(listener3))

Expand Down
6 changes: 3 additions & 3 deletions db/sg_replicate_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -1396,15 +1396,15 @@ func (l *ReplicationHeartbeatListener) subscribeNodeSetChanges() error {
select {
case <-cfgEvents:
localNodeRegistered, err := l.reloadNodes()
if err != nil {
base.Warnf("Error while reloading heartbeat node definitions: %v", err)
}
if !localNodeRegistered {
registerErr := l.mgr.RegisterNode(l.mgr.localNodeUUID)
if registerErr != nil {
base.Warnf("Error attempting to re-register node, node will not participate in sg-replicate until restarted or replication cfg is next updated: %v", registerErr)
}
}
if err != nil {
base.Warnf("Error while reloading heartbeat node definitions: %v", err)
}
case <-l.terminator:
return
}
Expand Down

0 comments on commit e2e7d42

Please sign in to comment.