Skip to content

Commit

Permalink
fix: fail to init fg clears flushTs so that slows flush (milvus-io#36740
Browse files Browse the repository at this point in the history
)

See also: milvus-io#36709

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Oct 11, 2024
1 parent 0751c50 commit 794e3ab
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 14 deletions.
24 changes: 11 additions & 13 deletions internal/flushcommon/pipeline/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,19 +239,6 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
serverID: serverID,
}

err := params.WriteBufferManager.Register(channelName, metacache,
writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)),
writebuffer.WithIDAllocator(params.Allocator))
if err != nil {
log.Warn("failed to register channel buffer", zap.Error(err))
return nil, err
}
defer func() {
if err != nil {
defer params.WriteBufferManager.RemoveChannel(channelName)
}
}()

ctx, cancel := context.WithCancel(params.Ctx)
ds := &DataSyncService{
ctx: ctx,
Expand Down Expand Up @@ -324,6 +311,17 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
}
ds.fg = fg

// Register channel after channel pipeline is ready.
// This'll reject any FlushChannel and FlushSegments calls to prevent inconsistency between DN and DC over flushTs
// if fail to init flowgraph nodes.
err = params.WriteBufferManager.Register(channelName, metacache,
writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)),
writebuffer.WithIDAllocator(params.Allocator))
if err != nil {
log.Warn("failed to register channel buffer", zap.String("channel", channelName), zap.Error(err))
return nil, err
}

return ds, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/flushcommon/writebuffer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushT
m.mut.RUnlock()

if !ok {
log.Ctx(ctx).Warn("write buffer not found when flush segments",
log.Ctx(ctx).Warn("write buffer not found when flush channel",
zap.String("channel", channel),
zap.Uint64("flushTs", flushTs))
return merr.WrapErrChannelNotFound(channel)
Expand Down

0 comments on commit 794e3ab

Please sign in to comment.