diff --git a/internal/flushcommon/pipeline/data_sync_service.go b/internal/flushcommon/pipeline/data_sync_service.go index d46ef69f97e57..8512a088a178a 100644 --- a/internal/flushcommon/pipeline/data_sync_service.go +++ b/internal/flushcommon/pipeline/data_sync_service.go @@ -239,19 +239,6 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams, serverID: serverID, } - err := params.WriteBufferManager.Register(channelName, metacache, - writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)), - writebuffer.WithIDAllocator(params.Allocator)) - if err != nil { - log.Warn("failed to register channel buffer", zap.Error(err)) - return nil, err - } - defer func() { - if err != nil { - defer params.WriteBufferManager.RemoveChannel(channelName) - } - }() - ctx, cancel := context.WithCancel(params.Ctx) ds := &DataSyncService{ ctx: ctx, @@ -324,6 +311,17 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams, } ds.fg = fg + // Register channel after channel pipeline is ready. + // This'll reject any FlushChannel and FlushSegments calls to prevent inconsistency between DN and DC over flushTs + // if fail to init flowgraph nodes. + err = params.WriteBufferManager.Register(channelName, metacache, + writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)), + writebuffer.WithIDAllocator(params.Allocator)) + if err != nil { + log.Warn("failed to register channel buffer", zap.String("channel", channelName), zap.Error(err)) + return nil, err + } + return ds, nil } diff --git a/internal/flushcommon/writebuffer/manager.go b/internal/flushcommon/writebuffer/manager.go index cd46c68c8b0f3..f2d325a148dd3 100644 --- a/internal/flushcommon/writebuffer/manager.go +++ b/internal/flushcommon/writebuffer/manager.go @@ -178,7 +178,7 @@ func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushT m.mut.RUnlock() if !ok { - log.Ctx(ctx).Warn("write buffer not found when flush segments", + log.Ctx(ctx).Warn("write buffer not found when flush channel", zap.String("channel", channel), zap.Uint64("flushTs", flushTs)) return merr.WrapErrChannelNotFound(channel)