Skip to content

Commit

Permalink
fix: [2.4] Load l0 delta for growings when using RemoteLoad (#37772)
Browse files Browse the repository at this point in the history
Cherry-pick from master
pr: #37771
Related to #37574

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Nov 18, 2024
1 parent 46692d7 commit 876e06b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
11 changes: 1 addition & 10 deletions internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,16 +343,7 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm
}

for _, segment := range loaded {
log := log.With(
zap.Int64("segmentID", segment.ID()),
)
deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing))
if len(deletedPks) == 0 {
continue
}

log.Info("forwarding L0 delete records...", zap.Int("deletionCount", len(deletedPks)))
err = segment.Delete(ctx, deletedPks, deletedTss)
err = sd.addL0ForGrowing(ctx, segment)
if err != nil {
log.Warn("failed to forward L0 deletions to growing segment",
zap.Error(err),
Expand Down
29 changes: 29 additions & 0 deletions internal/querynodev2/delegator/delta_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,35 @@ func (sd *shardDelegator) forwardStreamingDeletion(ctx context.Context, deleteDa
}
}

func (sd *shardDelegator) addL0ForGrowing(ctx context.Context, segment segments.Segment) error {
switch policy := paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.GetValue(); policy {
case ForwardPolicyDefault, StreamingForwardPolicyBF:
return sd.addL0GrowingBF(ctx, segment)
case StreamingForwardPolicyDirect:
// forward streaming deletion without bf filtering
return sd.addL0ForGrowingLoad(ctx, segment)
default:
log.Fatal("unsupported streaming forward policy", zap.String("policy", policy))
}
return nil
}

func (sd *shardDelegator) addL0GrowingBF(ctx context.Context, segment segments.Segment) error {
deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing))
if len(deletedPks) == 0 {
return nil
}

log.Info("forwarding L0 delete records...", zap.Int64("segmentID", segment.ID()), zap.Int("deletionCount", len(deletedPks)))
return segment.Delete(ctx, deletedPks, deletedTss)
}

func (sd *shardDelegator) addL0ForGrowingLoad(ctx context.Context, segment segments.Segment) error {
deltalogs := sd.getLevel0Deltalogs(segment.Partition())
log.Info("forwarding L0 via loader...", zap.Int64("segmentID", segment.ID()), zap.Int("deltalogsNum", len(deltalogs)))
return sd.loader.LoadDeltaLogs(ctx, segment, deltalogs)
}

func (sd *shardDelegator) forwardL0ByBF(ctx context.Context,
info *querypb.SegmentLoadInfo,
candidate *pkoracle.BloomFilterSet,
Expand Down

0 comments on commit 876e06b

Please sign in to comment.