Skip to content

Commit

Permalink
feat: resource add rdb for cache task (#3467)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Aug 27, 2024
1 parent e3fd3b9 commit b2babf8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
11 changes: 11 additions & 0 deletions scheduler/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package resource

import (
"github.com/redis/go-redis/v9"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -63,6 +64,9 @@ type resource struct {
// Scheduler config.
config *config.Config

// Redis universal client interface.
rdb redis.UniversalClient

// TransportCredentials stores the Authenticator required to setup a client connection.
transportCredentials credentials.TransportCredentials
}
Expand All @@ -78,6 +82,13 @@ func WithTransportCredentials(creds credentials.TransportCredentials) Option {
}
}

// WithRedisClient returns a Option which configures the redis client.
func WithRedisClient(rdb redis.UniversalClient) Option {
return func(r *resource) {
r.rdb = rdb
}
}

// New returns Resource interface.
func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, options ...Option) (Resource, error) {
resource := &resource{config: cfg}
Expand Down
41 changes: 27 additions & 14 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}
s.announcer = announcer

// Initialize GC.
s.gc = gc.New(gc.WithLogger(logger.GCLogger))

// Initialize certify client.
var (
certifyClient *certify.Certify
Expand Down Expand Up @@ -188,21 +191,16 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}

// Initialize dynconfig client.
dynconfig, err := config.NewDynconfig(s.managerClient, filepath.Join(d.CacheDir(), dynconfig.CacheDirName), cfg, config.WithTransportCredentials(clientTransportCredentials))
if err != nil {
return nil, err
dynconfigOptions := []config.DynconfigOption{}
if clientTransportCredentials != nil {
dynconfigOptions = append(dynconfigOptions, config.WithTransportCredentials(clientTransportCredentials))
}
s.dynconfig = dynconfig

// Initialize GC.
s.gc = gc.New(gc.WithLogger(logger.GCLogger))

// Initialize resource.
resource, err := resource.New(cfg, s.gc, dynconfig, resource.WithTransportCredentials(clientTransportCredentials))
dynconfig, err := config.NewDynconfig(s.managerClient, filepath.Join(d.CacheDir(), dynconfig.CacheDirName), cfg, dynconfigOptions...)
if err != nil {
return nil, err
}
s.resource = resource
s.dynconfig = dynconfig

// Initialize redis client.
var rdb redis.UniversalClient
Expand All @@ -219,18 +217,33 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}
}

// Initialize resource.
resourceOptions := []resource.Option{}
if clientTransportCredentials != nil {
resourceOptions = append(resourceOptions, resource.WithTransportCredentials(clientTransportCredentials))
}

if rdb != nil {
resourceOptions = append(resourceOptions, resource.WithRedisClient(rdb))
}

resource, err := resource.New(cfg, s.gc, dynconfig, resourceOptions...)
if err != nil {
return nil, err
}
s.resource = resource

// Initialize job service.
if cfg.Job.Enable && pkgredis.IsEnabled(cfg.Database.Redis.Addrs) {
if cfg.Job.Enable && rdb != nil {
s.job, err = job.New(cfg, resource)
if err != nil {
return nil, err
}
}

// Initialize options of evaluator.
// Initialize options of network topology options.
evaluatorNetworkTopologyOptions := []evaluator.NetworkTopologyOption{}
// Initialize network topology service.
if cfg.Scheduler.Algorithm == evaluator.NetworkTopologyAlgorithm {
if cfg.Scheduler.Algorithm == evaluator.NetworkTopologyAlgorithm && rdb != nil {
cache := cache.New(cfg.Scheduler.NetworkTopology.Cache.TTL, cfg.Scheduler.NetworkTopology.Cache.Interval)
s.networkTopology, err = networktopology.NewNetworkTopology(cfg.Scheduler.NetworkTopology, rdb, cache, resource, s.storage)
if err != nil {
Expand Down

0 comments on commit b2babf8

Please sign in to comment.