Skip to content

Commit

Permalink
enhance: add ts support for iterator
Browse files Browse the repository at this point in the history
Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han committed Sep 27, 2024
1 parent 1f271e3 commit f3e7f68
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ replace (
github.com/milvus-io/milvus/pkg => ./pkg
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect
github.com/milvus-io/milvus-proto/go-api/v2 => /home/hanchun/Documents/project/milvus-proto/go-api
)

exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b
14 changes: 13 additions & 1 deletion internal/proxy/task_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type queryParams struct {
limit int64
offset int64
reduceType reduce.IReduceType
isIterator bool
}

// translateToOutputFieldIDs translates output fields name to output fields id.
Expand Down Expand Up @@ -178,7 +179,7 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e
limitStr, err := funcutil.GetAttrByKeyFromRepeatedKV(LimitKey, queryParamsPair)
// if limit is not provided
if err != nil {
return &queryParams{limit: typeutil.Unlimited, reduceType: reduceType}, nil
return &queryParams{limit: typeutil.Unlimited, reduceType: reduceType, isIterator: isIterator}, nil
}
limit, err = strconv.ParseInt(limitStr, 0, 64)
if err != nil {
Expand All @@ -203,6 +204,7 @@ func parseQueryParams(queryParamsPair []*commonpb.KeyValuePair) (*queryParams, e
limit: limit,
offset: offset,
reduceType: reduceType,
isIterator: isIterator,
}, nil
}

Expand Down Expand Up @@ -461,6 +463,12 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
}
}
t.GuaranteeTimestamp = guaranteeTs
// need modify mvccTs and guaranteeTs for iterator specially
if t.queryParams.isIterator && t.request.GetGuaranteeTimestamp() > 0 {
t.MvccTimestamp = t.request.GetGuaranteeTimestamp()
t.GuaranteeTimestamp = t.request.GetGuaranteeTimestamp()
log.Info("hc===Set ts", zap.Uint64("t.MvccTimestamp", t.MvccTimestamp), zap.Uint64("t.GuaranteeTimestamp", t.GuaranteeTimestamp))
}

deadline, ok := t.TraceCtx().Deadline()
if ok {
Expand Down Expand Up @@ -542,6 +550,10 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
t.result.OutputFields = t.userOutputFields
metrics.ProxyReduceResultLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.QueryLabel).Observe(float64(tr.RecordSpan().Milliseconds()))

if t.queryParams.isIterator && t.request.GetGuaranteeTimestamp() == 0 {
// first page for iteration, need to set up sessionTs for iterator
t.result.SessionTs = t.BeginTs()
}
log.Debug("Query PostExecute done")
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions internal/querynodev2/delegator/delegator.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (sd *shardDelegator) QueryStream(ctx context.Context, req *querypb.QueryReq

// wait tsafe
waitTr := timerecord.NewTimeRecorder("wait tSafe")
tSafe, err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp)
tSafe, err := sd.waitTSafe(ctx, req.Req.GetGuaranteeTimestamp())
if err != nil {
log.Warn("delegator query failed to wait tsafe", zap.Error(err))
return err
Expand Down Expand Up @@ -473,7 +473,7 @@ func (sd *shardDelegator) Query(ctx context.Context, req *querypb.QueryRequest)

// wait tsafe
waitTr := timerecord.NewTimeRecorder("wait tSafe")
tSafe, err := sd.waitTSafe(ctx, req.Req.GuaranteeTimestamp)
tSafe, err := sd.waitTSafe(ctx, req.Req.GetGuaranteeTimestamp())
if err != nil {
log.Warn("delegator query failed to wait tsafe", zap.Error(err))
return nil, err
Expand Down

0 comments on commit f3e7f68

Please sign in to comment.