Skip to content

Commit

Permalink
Refine QueryNode errors (#27380)
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <[email protected]>
  • Loading branch information
yah01 authored Sep 26, 2023
1 parent fd979a7 commit 2d6a968
Showing 1 changed file with 15 additions and 42 deletions.
57 changes: 15 additions & 42 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,8 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
defer node.lifetime.Done()

// check target matches
if req.GetBase().GetTargetID() != paramtable.GetNodeID() {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), paramtable.GetNodeID()),
}
return status, nil
if err := merr.CheckTargetID(req.GetBase()); err != nil {
return merr.Status(err), nil
}

// check metric type
Expand Down Expand Up @@ -358,12 +354,8 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
defer node.lifetime.Done()

// check target matches
if req.GetBase().GetTargetID() != paramtable.GetNodeID() {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), paramtable.GetNodeID()),
}
return status, nil
if err := merr.CheckTargetID(req.GetBase()); err != nil {
return merr.Status(err), nil
}

node.unsubscribingChannels.Insert(req.GetChannelName())
Expand Down Expand Up @@ -551,12 +543,8 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release
defer node.lifetime.Done()

// check target matches
if req.GetBase().GetTargetID() != paramtable.GetNodeID() {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), paramtable.GetNodeID()),
}
return status, nil
if err := merr.CheckTargetID(req.GetBase()); err != nil {
return merr.Status(err), nil
}

if req.GetNeedTransfer() {
Expand Down Expand Up @@ -1249,12 +1237,10 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
defer node.lifetime.Done()

// check target matches
if req.GetBase().GetTargetID() != paramtable.GetNodeID() {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), paramtable.GetNodeID()),
}
return &querypb.GetDataDistributionResponse{Status: status}, nil
if err := merr.CheckTargetID(req.GetBase()); err != nil {
return &querypb.GetDataDistributionResponse{
Status: merr.Status(err),
}, nil
}

sealedSegments := node.manager.Segment.GetBy(segments.WithType(commonpb.SegmentState_Sealed))
Expand Down Expand Up @@ -1339,13 +1325,8 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
defer node.lifetime.Done()

// check target matches
if req.GetBase().GetTargetID() != paramtable.GetNodeID() {
log.Warn("failed to do match target id when sync ", zap.Int64("expect", req.GetBase().GetTargetID()), zap.Int64("actual", node.session.ServerID))
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), paramtable.GetNodeID()),
}
return status, nil
if err := merr.CheckTargetID(req.GetBase()); err != nil {
return merr.Status(err), nil
}

// get shard delegator
Expand Down Expand Up @@ -1445,12 +1426,8 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
defer node.lifetime.Done()

// check target matches
if req.GetBase().GetTargetID() != paramtable.GetNodeID() {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), paramtable.GetNodeID()),
}
return status, nil
if err := merr.CheckTargetID(req.GetBase()); err != nil {
return merr.Status(err), nil
}

log.Info("QueryNode received worker delete request")
Expand All @@ -1463,11 +1440,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
if len(segments) == 0 {
err := merr.WrapErrSegmentNotFound(req.GetSegmentId())
log.Warn("segment not found for delete")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SegmentNotFound,
Reason: fmt.Sprintf("segment %d not found", req.GetSegmentId()),
Code: merr.Code(err),
}, nil
return merr.Status(err), nil
}

pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys())
Expand Down

0 comments on commit 2d6a968

Please sign in to comment.