Skip to content

Commit

Permalink
fix: delete by expr failed at retry progress (#35241) (#35268)
Browse files Browse the repository at this point in the history
issue: #35240
pr: #35241
delete by expr shard the same err object between channels, so if one
channel's query failed, it will fail all channel, which will break
channel level retry policy, and make delete operation failed.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Aug 9, 2024
1 parent f7a3fd8 commit a4fa9ce
Showing 1 changed file with 12 additions and 16 deletions.
28 changes: 12 additions & 16 deletions internal/proxy/task_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ type deleteRunner struct {
ts uint64
lb LBPolicy
count atomic.Int64
err error

// task queue
queue *dmTaskQueue
Expand Down Expand Up @@ -434,7 +433,11 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
}

taskCh := make(chan *deleteTask, 256)
go dr.receiveQueryResult(ctx, client, taskCh)
var receiveErr error
go func() {
receiveErr = dr.receiveQueryResult(ctx, client, taskCh)
close(taskCh)
}()
var allQueryCnt int64
// wait all task finish
for task := range taskCh {
Expand All @@ -447,42 +450,35 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
}

// query or produce task failed
if dr.err != nil {
return dr.err
if receiveErr != nil {
return receiveErr
}
dr.allQueryCnt.Add(allQueryCnt)
return nil
}
}

func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) {
defer func() {
close(taskCh)
}()

func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) error {
for {
result, err := client.Recv()
if err != nil {
if err == io.EOF {
log.Debug("query stream for delete finished", zap.Int64("msgID", dr.msgID))
return
return nil
}
dr.err = err
return
return err
}

err = merr.Error(result.GetStatus())
if err != nil {
dr.err = err
log.Warn("query stream for delete get error status", zap.Int64("msgID", dr.msgID), zap.Error(err))
return
return err
}

task, err := dr.produce(ctx, result.GetIds())
if err != nil {
dr.err = err
log.Warn("produce delete task failed", zap.Error(err))
return
return err
}
task.allQueryCnt = result.GetAllRetrieveCount()

Expand Down

0 comments on commit a4fa9ce

Please sign in to comment.