diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index a5a5323369ade..9fd97a6490400 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -343,16 +343,7 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm } for _, segment := range loaded { - log := log.With( - zap.Int64("segmentID", segment.ID()), - ) - deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing)) - if len(deletedPks) == 0 { - continue - } - - log.Info("forwarding L0 delete records...", zap.Int("deletionCount", len(deletedPks))) - err = segment.Delete(ctx, deletedPks, deletedTss) + err = sd.addL0ForGrowing(ctx, segment) if err != nil { log.Warn("failed to forward L0 deletions to growing segment", zap.Error(err), diff --git a/internal/querynodev2/delegator/delta_forward.go b/internal/querynodev2/delegator/delta_forward.go index e2ba9234906d2..1e18db75731ca 100644 --- a/internal/querynodev2/delegator/delta_forward.go +++ b/internal/querynodev2/delegator/delta_forward.go @@ -85,6 +85,35 @@ func (sd *shardDelegator) forwardStreamingDeletion(ctx context.Context, deleteDa } } +func (sd *shardDelegator) addL0ForGrowing(ctx context.Context, segment segments.Segment) error { + switch policy := paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.GetValue(); policy { + case ForwardPolicyDefault, StreamingForwardPolicyBF: + return sd.addL0GrowingBF(ctx, segment) + case StreamingForwardPolicyDirect: + // forward streaming deletion without bf filtering + return sd.addL0ForGrowingLoad(ctx, segment) + default: + log.Fatal("unsupported streaming forward policy", zap.String("policy", policy)) + } + return nil +} + +func (sd *shardDelegator) addL0GrowingBF(ctx context.Context, segment segments.Segment) error { + deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing)) + if len(deletedPks) == 0 { + return nil + } + + log.Info("forwarding L0 delete records...", zap.Int64("segmentID", segment.ID()), zap.Int("deletionCount", len(deletedPks))) + return segment.Delete(ctx, deletedPks, deletedTss) +} + +func (sd *shardDelegator) addL0ForGrowingLoad(ctx context.Context, segment segments.Segment) error { + deltalogs := sd.getLevel0Deltalogs(segment.Partition()) + log.Info("forwarding L0 via loader...", zap.Int64("segmentID", segment.ID()), zap.Int("deltalogsNum", len(deltalogs))) + return sd.loader.LoadDeltaLogs(ctx, segment, deltalogs) +} + func (sd *shardDelegator) forwardL0ByBF(ctx context.Context, info *querypb.SegmentLoadInfo, candidate *pkoracle.BloomFilterSet,