Skip to content

Commit

Permalink
fix conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jan 8, 2025
1 parent 12fdda4 commit bf82504
Show file tree
Hide file tree
Showing 33 changed files with 141 additions and 174 deletions.
32 changes: 12 additions & 20 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"os"
"os/signal"
"strings"
"syscall"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -52,8 +51,8 @@ import (
)

const (
keyspaceMode = "api"
tsoMode = "tso"
// Only serverless use this variable, we can use it to determine whether the PD is running in keyspace mode.
// The name is a little misleading, but it's kept for backward compatibility.
serviceModeEnv = "PD_SERVICE_MODE"
)

Expand Down Expand Up @@ -89,7 +88,7 @@ func NewServiceCommand() *cobra.Command {
// NewTSOServiceCommand returns the tso service command.
func NewTSOServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: tsoMode,
Use: "tso",
Short: "Run the TSO service",
Run: tso.CreateServerWrapper,
}
Expand Down Expand Up @@ -129,11 +128,12 @@ func NewSchedulingServiceCommand() *cobra.Command {
}

// NewPDServiceCommand returns the PD service command.
// We can use pd directly. This command is kept for backward compatibility.
func NewPDServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: apiMode,
Use: "api",
Short: "Run the PD service",
Run: createPDServiceWrapper,
Run: createServerWrapper,
}
addFlags(cmd)
return cmd
Expand All @@ -160,20 +160,12 @@ func addFlags(cmd *cobra.Command) {
cmd.Flags().BoolP("force-new-cluster", "", false, "force to create a new one-member cluster")
}

func createPDServiceWrapper(cmd *cobra.Command, args []string) {
start(cmd, args, cmd.CalledAs())
}

func createServerWrapper(cmd *cobra.Command, args []string) {
mode := os.Getenv(serviceModeEnv)
if len(mode) != 0 && strings.ToLower(mode) == keyspaceMode {
start(cmd, args, keyspaceMode)
} else {
start(cmd, args)
}
isKeyspaceEnabled := os.Getenv(serviceModeEnv) != ""
start(cmd, args, isKeyspaceEnabled)
}

func start(cmd *cobra.Command, args []string, services ...string) {
func start(cmd *cobra.Command, args []string, isKeyspaceEnabled bool) {
schedulers.Register()
cfg := config.NewConfig()
flagSet := cmd.Flags()
Expand Down Expand Up @@ -218,8 +210,8 @@ func start(cmd *cobra.Command, args []string, services ...string) {
// Flushing any buffered log entries
defer log.Sync()
memory.InitMemoryHook()
if len(services) != 0 {
versioninfo.Log(server.PDServiceMode)
if isKeyspaceEnabled {
versioninfo.Log(server.PDKeyspaceMode)
} else {
versioninfo.Log(server.PDMode)
}
Expand All @@ -245,7 +237,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, services, serviceBuilders...)
svr, err := server.CreateServer(ctx, cfg, isKeyspaceEnabled, serviceBuilders...)
if err != nil {
log.Fatal("create server failed", errs.ZapError(err))
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func mustNewCluster(re *require.Assertions, num int, opts ...func(cfg *config.Co
for _, opt := range opts {
opt(cfg)
}
s, err := server.CreateServer(ctx, cfg, nil, NewHandler)
s, err := server.CreateServer(ctx, cfg, false, NewHandler)
re.NoError(err)
err = s.Run()
re.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion server/api/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestGetVersion(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan *server.Server)
go func(cfg *config.Config) {
s, err := server.CreateServer(ctx, cfg, nil, NewHandler)
s, err := server.CreateServer(ctx, cfg, false, NewHandler)
re.NoError(err)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/memberNil", `return(true)`))
reqCh <- struct{}{}
Expand Down
10 changes: 0 additions & 10 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ func RegisterMicroService(r *gin.RouterGroup) {
// @Router /ms/members/{service} [get]
func GetMembers(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsPDServiceMode() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service")
return
}

if service := c.Param("service"); len(service) > 0 {
entries, err := discovery.GetMSMembers(service, svr.GetClient())
if err != nil {
Expand All @@ -65,11 +60,6 @@ func GetMembers(c *gin.Context) {
// @Router /ms/primary/{service} [get]
func GetPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsPDServiceMode() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service")
return
}

if service := c.Param("service"); len(service) > 0 {
addr, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service)
c.IndentedJSON(http.StatusOK, addr)
Expand Down
17 changes: 8 additions & 9 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type Server interface {
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
GetKeyspaceGroupManager() *keyspace.GroupManager
IsPDServiceMode() bool
IsKeyspaceEnabled() bool
GetSafePointV2Manager() *gc.SafePointV2Manager
}

Expand All @@ -156,12 +156,12 @@ type RaftCluster struct {
etcdClient *clientv3.Client
httpClient *http.Client

running bool
isPDServiceMode bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64
running bool
isKeyspaceEnabled bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64

// Keep the previous store limit settings when removing a store.
prevStoreLimit map[uint64]map[storelimit.Type]float64
Expand Down Expand Up @@ -325,7 +325,6 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
log.Warn("raft cluster has already been started")
return nil
}
c.isPDServiceMode = s.IsPDServiceMode()
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
Expand Down Expand Up @@ -376,7 +375,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
c.loadExternalTS()
c.loadMinResolvedTS()

if c.isPDServiceMode {
if c.isKeyspaceEnabled {
// bootstrap keyspace group manager after starting other parts successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
Expand Down
32 changes: 16 additions & 16 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ const (

// PDMode represents that server is in PD mode.
PDMode = "PD"
// PDServiceMode represents that server is in PD service mode which is in microservice architecture.
PDServiceMode = "PD Service"
// PDKeyspaceMode represents that server is in PD Keyspace mode.
PDKeyspaceMode = "PD Keyspace"

// maxRetryTimesGetServicePrimary is the max retry times for getting primary addr.
// Note: it need to be less than client.defaultPDTimeout
Expand Down Expand Up @@ -227,7 +227,7 @@ type Server struct {
auditBackends []audit.Backend

registry *registry.ServiceRegistry
isKeyspaceEnabled bool
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
schedulingPrimaryWatcher *etcdutil.LoopWatcher
Expand All @@ -240,14 +240,14 @@ type Server struct {
type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APIServiceGroup, error)

// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, cfg *config.Config, services []string, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
var isKeyspaceEnabled bool
if len(services) != 0 {
mode = PDServiceMode
func CreateServer(ctx context.Context, cfg *config.Config, isKeyspaceEnabled bool, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
var mode string
if isKeyspaceEnabled {
mode = PDKeyspaceMode
} else {
mode = PDMode
}
log.Info("PD config", zap.Reflect("config", cfg))
log.Info("PD config", zap.Bool("enable-keyspace", isKeyspaceEnabled), zap.Reflect("config", cfg))
serviceMiddlewareCfg := config.NewServiceMiddlewareConfig()

s := &Server{
Expand All @@ -259,7 +259,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
ctx: ctx,
startTimestamp: time.Now().Unix(),
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
isKeyspaceEnabled: isKeyspaceEnabled,
mode: mode,
tsoClientPool: struct {
syncutil.RWMutex
clients map[string]tsopb.TSO_TsoClient
Expand Down Expand Up @@ -478,7 +478,7 @@ func (s *Server) startServer(ctx context.Context) error {
Member: s.member.MemberValue(),
Step: keyspace.AllocStep,
})
if s.IsPDServiceMode() {
if s.IsKeyspaceEnabled() {
s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client)
}
s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager)
Expand Down Expand Up @@ -530,7 +530,7 @@ func (s *Server) Close() {
s.cgMonitor.StopMonitor()

s.stopServerLoop()
if s.IsPDServiceMode() {
if s.IsKeyspaceEnabled() {
s.keyspaceGroupManager.Close()
}

Expand Down Expand Up @@ -786,9 +786,9 @@ func (s *Server) stopRaftCluster() {
s.cluster.Stop()
}

// IsPDServiceMode return whether the server is in PD service mode.
func (s *Server) IsPDServiceMode() bool {
return s.mode == PDServiceMode
// IsKeyspaceEnabled returns whether the server is in PD Keyspace mode.
func (s *Server) IsKeyspaceEnabled() bool {
return s.mode == PDKeyspaceMode
}

// GetAddr returns the server urls for clients.
Expand Down Expand Up @@ -1722,13 +1722,13 @@ func (s *Server) campaignLeader() {
}
// EnableLeader to accept the remaining service, such as GetStore, GetRegion.
s.member.EnableLeader()
member.ServiceMemberGauge.WithLabelValues(PD).Set(1)
member.ServiceMemberGauge.WithLabelValues(s.mode).Set(1)
defer resetLeaderOnce.Do(func() {
// as soon as cancel the leadership keepalive, then other member have chance
// to be new leader.
cancel()
s.member.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(PD).Set(0)
member.ServiceMemberGauge.WithLabelValues(s.mode).Set(0)
})

CheckPDVersionWithClusterVersion(s.persistOptions)
Expand Down
29 changes: 14 additions & 15 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/goleak"

"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/assertutil"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand Down Expand Up @@ -67,7 +66,7 @@ func (suite *leaderServerTestSuite) SetupSuite() {

go func() {
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(suite.ctx, cfg, nil, mockHandler)
svr, err := CreateServer(suite.ctx, cfg, false, mockHandler)
re.NoError(err)
err = svr.Run()
re.NoError(err)
Expand Down Expand Up @@ -101,7 +100,7 @@ func newTestServersWithCfgs(
for _, cfg := range cfgs {
go func(cfg *config.Config) {
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(ctx, cfg, nil, mockHandler)
svr, err := CreateServer(ctx, cfg, false, mockHandler)
// prevent blocking if Asserts fails
failed := true
defer func() {
Expand Down Expand Up @@ -142,9 +141,9 @@ func (suite *leaderServerTestSuite) TestRegisterServerHandler() {
cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re))
ctx, cancel := context.WithCancel(context.Background())
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(ctx, cfg, nil, mockHandler)
svr, err := CreateServer(ctx, cfg, false, mockHandler)
re.NoError(err)
_, err = CreateServer(ctx, cfg, nil, mockHandler, mockHandler)
_, err = CreateServer(ctx, cfg, false, mockHandler, mockHandler)
// Repeat register.
re.Error(err)
defer func() {
Expand All @@ -169,9 +168,9 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderForwarded() {
mockHandler := CreateMockHandler(re, "127.0.0.2")
cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re))
ctx, cancel := context.WithCancel(context.Background())
svr, err := CreateServer(ctx, cfg, nil, mockHandler)
svr, err := CreateServer(ctx, cfg, false, mockHandler)
re.NoError(err)
_, err = CreateServer(ctx, cfg, nil, mockHandler, mockHandler)
_, err = CreateServer(ctx, cfg, false, mockHandler, mockHandler)
// Repeat register.
re.Error(err)
defer func() {
Expand Down Expand Up @@ -200,9 +199,9 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderXReal() {
mockHandler := CreateMockHandler(re, "127.0.0.2")
cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re))
ctx, cancel := context.WithCancel(context.Background())
svr, err := CreateServer(ctx, cfg, nil, mockHandler)
svr, err := CreateServer(ctx, cfg, false, mockHandler)
re.NoError(err)
_, err = CreateServer(ctx, cfg, nil, mockHandler, mockHandler)
_, err = CreateServer(ctx, cfg, false, mockHandler, mockHandler)
// Repeat register.
re.Error(err)
defer func() {
Expand Down Expand Up @@ -231,9 +230,9 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderBoth() {
mockHandler := CreateMockHandler(re, "127.0.0.2")
cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re))
ctx, cancel := context.WithCancel(context.Background())
svr, err := CreateServer(ctx, cfg, nil, mockHandler)
svr, err := CreateServer(ctx, cfg, false, mockHandler)
re.NoError(err)
_, err = CreateServer(ctx, cfg, nil, mockHandler, mockHandler)
_, err = CreateServer(ctx, cfg, false, mockHandler, mockHandler)
// Repeat register.
re.Error(err)
defer func() {
Expand All @@ -258,21 +257,21 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderBoth() {
re.Equal("Hello World\n", bodyString)
}

func TestAPIService(t *testing.T) {
func TestMode(t *testing.T) {
re := require.New(t)

cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re))
defer testutil.CleanServer(cfg.DataDir)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(ctx, cfg, []string{constant.PDServiceName}, mockHandler)
svr, err := CreateServer(ctx, cfg, true, mockHandler)
re.NoError(err)
defer svr.Close()
err = svr.Run()
re.NoError(err)
MustWaitLeader(re, []*Server{svr})
re.True(svr.IsPDServiceMode())
re.True(svr.IsKeyspaceEnabled())
}

func TestIsPathInDirectory(t *testing.T) {
Expand Down Expand Up @@ -318,7 +317,7 @@ func TestCheckClusterID(t *testing.T) {
// Start previous cluster, expect an error.
cfgA.InitialCluster = originInitial
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(ctx, cfgA, nil, mockHandler)
svr, err := CreateServer(ctx, cfgA, false, mockHandler)
re.NoError(err)

etcd, err := embed.StartEtcd(svr.etcdCfg)
Expand Down
2 changes: 1 addition & 1 deletion server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewTestServer(re *require.Assertions, c *assertutil.Checker) (*Server, test
ctx, cancel := context.WithCancel(context.Background())
cfg := NewTestSingleConfig(c)
mockHandler := CreateMockHandler(re, "127.0.0.1")
s, err := CreateServer(ctx, cfg, nil, mockHandler)
s, err := CreateServer(ctx, cfg, false, mockHandler)
if err != nil {
cancel()
return nil, nil, err
Expand Down
Loading

0 comments on commit bf82504

Please sign in to comment.