diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index b070eeb3efeec..c45eb3639848d 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -247,7 +247,6 @@ type deleteRunner struct { ts uint64 lb LBPolicy count atomic.Int64 - err error // task queue queue *dmTaskQueue @@ -430,7 +429,11 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe } taskCh := make(chan *deleteTask, 256) - go dr.receiveQueryResult(ctx, client, taskCh) + var receiveErr error + go func() { + receiveErr = dr.receiveQueryResult(ctx, client, taskCh) + close(taskCh) + }() // wait all task finish for task := range taskCh { err := task.WaitToFinish() @@ -441,50 +444,42 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe } // query or produce task failed - if dr.err != nil { - return dr.err + if receiveErr != nil { + return receiveErr } return nil } } -func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) { - defer func() { - close(taskCh) - }() - +func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) error { for { result, err := client.Recv() if err != nil { if err == io.EOF { log.Debug("query stream for delete finished", zap.Int64("msgID", dr.msgID)) - return + return nil } - dr.err = err - return + return err } err = merr.Error(result.GetStatus()) if err != nil { - dr.err = err log.Warn("query stream for delete get error status", zap.Int64("msgID", dr.msgID), zap.Error(err)) - return + return err } if dr.limiter != nil { err := dr.limiter.Alloc(ctx, []int64{dr.collectionID}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds())) if err != nil { - dr.err = err log.Warn("query stream for delete failed because rate limiter", zap.Int64("msgID", dr.msgID), zap.Error(err)) - return + return err } } task, err := dr.produce(ctx, result.GetIds()) if err != nil { - dr.err = err log.Warn("produce delete task failed", zap.Error(err)) - return + return err } taskCh <- task