Skip to content

Commit

Permalink
server: advance ServerStart check (#8951) (#8969)
Browse files Browse the repository at this point in the history
close #8950

Signed-off-by: okJiang <[email protected]>

Co-authored-by: okJiang <[email protected]>
  • Loading branch information
ti-chi-bot and okJiang authored Jan 6, 2025
1 parent c1ee9cf commit 28001fd
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ func (s *GrpcServer) GetMinTS(
// GetMinTSFromTSOService queries all tso servers and gets the minimum timestamp across
// all keyspace groups.
func (s *GrpcServer) GetMinTSFromTSOService(dcLocation string) (*pdpb.Timestamp, error) {
if s.IsClosed() {
return nil, ErrNotStarted
}
addrs := s.keyspaceGroupManager.GetTSOServiceAddrs()
if len(addrs) == 0 {
return &pdpb.Timestamp{}, errs.ErrGetMinTS.FastGenByArgs("no tso servers/pods discovered")
Expand Down Expand Up @@ -547,6 +550,11 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
return errors.WithStack(err)
}

// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return ErrNotStarted
}

forwardedHost := grpcutil.GetForwardedHost(stream.Context())
if !s.isLocalRequest(forwardedHost) {
clientConn, err := s.getDelegateClient(s.ctx, forwardedHost)
Expand All @@ -566,10 +574,6 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
}

start := time.Now()
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
return status.Errorf(codes.Unknown, "server not started")
}
if clusterID := s.ClusterID(); request.GetHeader().GetClusterId() != clusterID {
return status.Errorf(codes.FailedPrecondition,
"mismatch cluster id, need %d but got %d", clusterID, request.GetHeader().GetClusterId())
Expand Down Expand Up @@ -715,6 +719,9 @@ func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, request *pdpb.IsS
}, nil
}
}
if s.IsClosed() {
return nil, ErrNotStarted
}
// recovering mark is stored in etcd directly, there's no need to forward.
marked, err := s.Server.IsSnapshotRecovering(ctx)
if err != nil {
Expand Down

0 comments on commit 28001fd

Please sign in to comment.