Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delete by expr failed at retry progress (#35241) #35421

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 13 additions & 18 deletions internal/proxy/task_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,6 @@
ts uint64
lb LBPolicy
count atomic.Int64
err error

// task queue
queue *dmTaskQueue
Expand Down Expand Up @@ -430,7 +429,11 @@
}

taskCh := make(chan *deleteTask, 256)
go dr.receiveQueryResult(ctx, client, taskCh)
var receiveErr error
go func() {
receiveErr = dr.receiveQueryResult(ctx, client, taskCh)
close(taskCh)
}()
// wait all task finish
for task := range taskCh {
err := task.WaitToFinish()
Expand All @@ -441,50 +444,42 @@
}

// query or produce task failed
if dr.err != nil {
return dr.err
if receiveErr != nil {
return receiveErr
}
return nil
}
}

func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) {
defer func() {
close(taskCh)
}()

func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) error {
for {
result, err := client.Recv()
if err != nil {
if err == io.EOF {
log.Debug("query stream for delete finished", zap.Int64("msgID", dr.msgID))
return
return nil
}
dr.err = err
return
return err

Check warning on line 462 in internal/proxy/task_delete.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_delete.go#L462

Added line #L462 was not covered by tests
}

err = merr.Error(result.GetStatus())
if err != nil {
dr.err = err
log.Warn("query stream for delete get error status", zap.Int64("msgID", dr.msgID), zap.Error(err))
return
return err
}

if dr.limiter != nil {
err := dr.limiter.Alloc(ctx, []int64{dr.collectionID}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds()))
if err != nil {
dr.err = err
log.Warn("query stream for delete failed because rate limiter", zap.Int64("msgID", dr.msgID), zap.Error(err))
return
return err
}
}

task, err := dr.produce(ctx, result.GetIds())
if err != nil {
dr.err = err
log.Warn("produce delete task failed", zap.Error(err))
return
return err

Check warning on line 482 in internal/proxy/task_delete.go

View check run for this annotation

Codecov / codecov/patch

internal/proxy/task_delete.go#L482

Added line #L482 was not covered by tests
}

taskCh <- task
Expand Down
Loading