diff --git a/base/dcp_sharded.go b/base/dcp_sharded.go index 6350b3366f..9ec0e941df 100644 --- a/base/dcp_sharded.go +++ b/base/dcp_sharded.go @@ -364,7 +364,7 @@ func registerHeartbeatListener(heartbeater Heartbeater, cbgtContext *CbgtContext } // Register listener for import, uses cfg and manager to manage set of participating nodes - importHeartbeatListener, err := NewImportHeartbeatListener(cbgtContext.Cfg, cbgtContext.Manager.Version()) + importHeartbeatListener, err := NewImportHeartbeatListener(cbgtContext.Cfg, cbgtContext.Manager) if err != nil { return nil, err } @@ -380,13 +380,13 @@ func registerHeartbeatListener(heartbeater Heartbeater, cbgtContext *CbgtContext // ImportHeartbeatListener uses cbgt's cfg to manage node list type importHeartbeatListener struct { cfg cbgt.Cfg // cbgt cfg being used for import - mgrVersion string // cbgt manager version, required for cbgt.UnregisterNodes + mgr *cbgt.Manager // cbgt manager associated with this import node terminator chan struct{} // close cfg subscription on close nodeIDs []string // Set of nodes from the latest retrieval lock sync.RWMutex // lock for nodeIDs access } -func NewImportHeartbeatListener(cfg cbgt.Cfg, mgrVersion string) (*importHeartbeatListener, error) { +func NewImportHeartbeatListener(cfg cbgt.Cfg, mgr *cbgt.Manager) (*importHeartbeatListener, error) { if cfg == nil { return nil, errors.New("Cfg must not be nil for ImportHeartbeatListener") @@ -394,12 +394,12 @@ func NewImportHeartbeatListener(cfg cbgt.Cfg, mgrVersion string) (*importHeartbe listener := &importHeartbeatListener{ cfg: cfg, - mgrVersion: mgrVersion, + mgr: mgr, terminator: make(chan struct{}), } // Initialize the node set - err := listener.reloadNodes() + _, err := listener.reloadNodes() if err != nil { return nil, err } @@ -421,7 +421,7 @@ func (l *importHeartbeatListener) Name() string { func (l *importHeartbeatListener) StaleHeartbeatDetected(nodeUUID string) { Infof(KeyCluster, "StaleHeartbeatDetected by import listener for node: %v", nodeUUID) - err := cbgt.UnregisterNodes(l.cfg, l.mgrVersion, []string{nodeUUID}) + err := cbgt.UnregisterNodes(l.cfg, l.mgr.Version(), []string{nodeUUID}) if err != nil { Warnf("Attempt to unregister %v from CBGT got error: %v", nodeUUID, err) } @@ -442,10 +442,17 @@ func (l *importHeartbeatListener) subscribeNodeChanges() error { for { select { case <-cfgEvents: - err := l.reloadNodes() + localNodeRegistered, err := l.reloadNodes() if err != nil { Warnf("Error while reloading heartbeat node definitions: %v", err) } + if !localNodeRegistered { + registerErr := l.mgr.Register(cbgt.NODE_DEFS_WANTED) + if registerErr != nil { + Warnf("Error attempting to re-register node, node will not participate in import until restarted or cbgt cfg is next updated: %v", registerErr) + } + } + case <-l.terminator: return } @@ -454,17 +461,20 @@ func (l *importHeartbeatListener) subscribeNodeChanges() error { return nil } -func (l *importHeartbeatListener) reloadNodes() error { +func (l *importHeartbeatListener) reloadNodes() (localNodePresent bool, err error) { nodeSet, _, err := cbgt.CfgGetNodeDefs(l.cfg, cbgt.NODE_DEFS_KNOWN) if err != nil { - return err + return false, err } nodeUUIDs := make([]string, 0) if nodeSet != nil { for _, nodeDef := range nodeSet.NodeDefs { nodeUUIDs = append(nodeUUIDs, nodeDef.UUID) + if nodeDef.UUID == l.mgr.UUID() { + localNodePresent = true + } } } @@ -472,7 +482,7 @@ func (l *importHeartbeatListener) reloadNodes() error { l.nodeIDs = nodeUUIDs l.lock.Unlock() - return nil + return localNodePresent, nil } // GetNodes returns a copy of the in-memory node set diff --git a/base/heartbeat_test.go b/base/heartbeat_test.go index 307f730e82..896c8cff60 100644 --- a/base/heartbeat_test.go +++ b/base/heartbeat_test.go @@ -315,16 +315,33 @@ func TestCBGTManagerHeartbeater(t *testing.T) { assert.NoError(t, node3.Start()) // Create three heartbeat listeners, associate one with each node - testManagerVersion := "" - listener1, err := NewImportHeartbeatListener(cfgCB, testManagerVersion) + testUUID := cbgt.NewUUID() + var eventHandlers cbgt.ManagerEventHandlers + options := make(map[string]string) + options[cbgt.FeedAllotmentOption] = cbgt.FeedAllotmentOnePerPIndex + options["managerLoadDataDir"] = "false" + testManager := cbgt.NewManagerEx( + cbgt.VERSION, + cbgt.NewCfgMem(), + testUUID, + nil, + "", + 1, + "", + testUUID, + "", + "some-datasource", + eventHandlers, + options) + listener1, err := NewImportHeartbeatListener(cfgCB, testManager) assert.NoError(t, err) assert.NoError(t, node1.RegisterListener(listener1)) - listener2, err := NewImportHeartbeatListener(cfgCB, testManagerVersion) + listener2, err := NewImportHeartbeatListener(cfgCB, testManager) assert.NoError(t, err) assert.NoError(t, node2.RegisterListener(listener2)) - listener3, err := NewImportHeartbeatListener(cfgCB, testManagerVersion) + listener3, err := NewImportHeartbeatListener(cfgCB, testManager) assert.NoError(t, err) assert.NoError(t, node3.RegisterListener(listener3)) diff --git a/db/sg_replicate_cfg.go b/db/sg_replicate_cfg.go index 8e86792aa4..785d25d78f 100644 --- a/db/sg_replicate_cfg.go +++ b/db/sg_replicate_cfg.go @@ -1396,15 +1396,15 @@ func (l *ReplicationHeartbeatListener) subscribeNodeSetChanges() error { select { case <-cfgEvents: localNodeRegistered, err := l.reloadNodes() + if err != nil { + base.Warnf("Error while reloading heartbeat node definitions: %v", err) + } if !localNodeRegistered { registerErr := l.mgr.RegisterNode(l.mgr.localNodeUUID) if registerErr != nil { base.Warnf("Error attempting to re-register node, node will not participate in sg-replicate until restarted or replication cfg is next updated: %v", registerErr) } } - if err != nil { - base.Warnf("Error while reloading heartbeat node definitions: %v", err) - } case <-l.terminator: return }