Skip to content

Commit

Permalink
feat: evalutor adds calculateNetworkTopologyScore
Browse files Browse the repository at this point in the history
Signed-off-by: huangmin <[email protected]>
  • Loading branch information
MinH-09 committed Nov 22, 2023
1 parent 21c19a9 commit 7b92f45
Show file tree
Hide file tree
Showing 12 changed files with 412 additions and 42 deletions.
2 changes: 1 addition & 1 deletion scheduler/announcer/announcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func WithTrainerClient(client trainerclient.V1) Option {
}

// Option is a functional option for configuring the announcer.
type Option func(s *announcer)
type Option func(a *announcer)

// New returns a new Announcer interface.
func New(cfg *config.Config, managerClient managerclient.V2, storage storage.Storage, options ...Option) (Announcer, error) {
Expand Down
34 changes: 23 additions & 11 deletions scheduler/networktopology/mocks/network_topology_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions scheduler/networktopology/network_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type NetworkTopology interface {
// Probes loads probes interface by source host id and destination host id.
Probes(string, string) Probes

// AverageRTTs loads RTTs of source host id and destination hosts id.
AverageRTTs(string, []string) ([]time.Duration, error)

// ProbedCount is the number of times the host has been probed.
ProbedCount(string) (uint64, error)

Expand Down Expand Up @@ -264,6 +267,32 @@ func (nt *networkTopology) Probes(srcHostID, destHostID string) Probes {
return NewProbes(nt.config.Probe, nt.rdb, srcHostID, destHostID)
}

// AverageRTTs loads RTTs of source host id and destination hosts id.
func (nt *networkTopology) AverageRTTs(srcHostID string, destHostIDs []string) ([]time.Duration, error) {
pipeline := nt.rdb.Pipeline()
var cmds []*redis.StringCmd
for _, id := range destHostIDs {
cmds = append(cmds, pipeline.HGet(context.Background(), pkgredis.MakeNetworkTopologyKeyInScheduler(srcHostID, id), "averageRTT"))
}

_, err := pipeline.Exec(context.Background())
if err != nil {
return nil, err
}

var averageRTTs []time.Duration
for i := range destHostIDs {
averageRTT, err := strconv.ParseInt(cmds[i].Val(), 10, 64)
if err != nil {
averageRTTs = append(averageRTTs, time.Duration(0))
} else {
averageRTTs = append(averageRTTs, time.Duration(averageRTT))
}
}

return averageRTTs, nil
}

// ProbedCount is the number of times the host has been probed.
func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
Expand Down
68 changes: 68 additions & 0 deletions scheduler/networktopology/network_topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,74 @@ func TestNetworkTopology_Probes(t *testing.T) {
}
}

func TestNetworkTopology_AverageRTTs(t *testing.T) {
tests := []struct {
name string
mock func(mockRDBClient redismock.ClientMock)
expect func(t *testing.T, networkTopology NetworkTopology, err error)
}{
{
name: "get average RTTs",
mock: func(mockRDBClient redismock.ClientMock) {
mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID), "averageRTT").SetVal(
strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10))
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)

averageRTTs, err := networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID})
assert.NoError(err)
assert.EqualValues(averageRTTs, mockAverageRTT)
},
},
{
name: "get average RTTs error",
mock: func(mockRDBClient redismock.ClientMock) {
mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID), "averageRTT").SetErr(errors.New("get average RTTs error"))
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)

_, err = networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID})
assert.EqualError(err, "get average RTTs error")
},
},
{
name: "parseInt error",
mock: func(mockRDBClient redismock.ClientMock) {
mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockHost.ID, mockSeedHost.ID), "averageRTT").SetVal(
"foo")
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)

averageRTTs, err := networkTopology.AverageRTTs(mockHost.ID, []string{mockSeedHost.ID})
assert.NoError(err)
assert.EqualValues(averageRTTs, []time.Duration{0})
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()

rdb, mockRDBClient := redismock.NewClientMock()
res := resource.NewMockResource(ctl)
storage := storagemocks.NewMockStorage(ctl)
tc.mock(mockRDBClient)

networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage)
tc.expect(t, networkTopology, err)
mockRDBClient.ClearExpect()
})
}
}

func TestNetworkTopology_ProbedCount(t *testing.T) {
tests := []struct {
name string
Expand Down
1 change: 1 addition & 0 deletions scheduler/networktopology/probes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ var (
},
}

mockAverageRTT = []time.Duration{30 * time.Millisecond}
mockProbesCreatedAt = time.Now()
mockProbedCount = 10
)
Expand Down
11 changes: 10 additions & 1 deletion scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/rpcserver"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/scheduling/evaluator"

Check failure on line 57 in scheduler/scheduler.go

View workflow job for this annotation

GitHub Actions / Lint

could not import d7y.io/dragonfly/v2/scheduler/scheduling/evaluator (-: # d7y.io/dragonfly/v2/scheduler/scheduling/evaluator
"d7y.io/dragonfly/v2/scheduler/storage"
)

Expand Down Expand Up @@ -257,16 +258,24 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}
}

// Initialize dial options of scheduling.
schedulingOptions := []scheduling.Option{}
// Initialize network topology service.
if cfg.NetworkTopology.Enable && pkgredis.IsEnabled(cfg.Database.Redis.Addrs) {
s.networkTopology, err = networktopology.NewNetworkTopology(cfg.NetworkTopology, rdb, resource, s.storage)
if err != nil {
return nil, err
}

// Initialize dial options of evaluator.
evaluatorOptions := []evaluator.EvaluatorOptions{}
evaluatorOptions = append(evaluatorOptions, evaluator.WithNetworkTopology(s.networkTopology))

schedulingOptions = append(schedulingOptions, scheduling.WithEvaluatorOptions(evaluatorOptions))
}

// Initialize scheduling.
scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir())
scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir(), schedulingOptions...)

// Initialize server options of scheduler grpc server.
schedulerServerOptions := []grpc.ServerOption{}
Expand Down
6 changes: 3 additions & 3 deletions scheduler/scheduling/evaluator/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ type Evaluator interface {
IsBadNode(peer *resource.Peer) bool
}

func New(algorithm string, pluginDir string) Evaluator {
func New(algorithm string, pluginDir string, options ...EvaluatorOptions) Evaluator {

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: EvaluatorOptions) (typecheck)

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: EvaluatorOptions) (typecheck)

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests (disable-seed-peer)

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests (disable-seed-peer)

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests_with_seed-peer

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests_with_seed-peer

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests (split-running-tasks)

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests (split-running-tasks)

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests (ipv6)

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests (ipv6)

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests_with_dfdaemon

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests_with_dfdaemon

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests_with_manager

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / e2e_tests_with_manager

undefined: EvaluatorOptions

Check failure on line 42 in scheduler/scheduling/evaluator/evaluator.go

View workflow job for this annotation

GitHub Actions / Test

undefined: EvaluatorOptions
switch algorithm {
case PluginAlgorithm:
if plugin, err := LoadPlugin(pluginDir); err == nil {
return plugin
}
// TODO Implement MLAlgorithm.
case MLAlgorithm, DefaultAlgorithm:
return NewEvaluatorBase()
return NewEvaluatorBase(options...)
}

return NewEvaluatorBase()
return NewEvaluatorBase(options...)
}
76 changes: 67 additions & 9 deletions scheduler/scheduling/evaluator/evaluator_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"math/big"
"sort"
"strings"
"time"

"github.com/montanaflynn/stats"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/math"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/networktopology"
"d7y.io/dragonfly/v2/scheduler/resource"
)

Expand All @@ -47,6 +49,9 @@ const (

// Location affinity weight.
locationAffinityWeight = 0.15

// Network topology weight.
networkTopologyWeight = 0.05
)

const (
Expand All @@ -70,20 +75,51 @@ const (
maxElementLen = 5
)

type evaluatorBase struct{}
type evaluatorBase struct {
networktopology networktopology.NetworkTopology
}

type Option func(eb *evaluatorBase)

// WithNetworkTopology sets the networkTopology.
func WithNetworkTopology(networktopology networktopology.NetworkTopology) Option {
return func(eb *evaluatorBase) {
eb.networktopology = networktopology
}
}

func NewEvaluatorBase(options ...Option) Evaluator {
eb := &evaluatorBase{}

func NewEvaluatorBase() Evaluator {
return &evaluatorBase{}
for _, opt := range options {
opt(eb)
}
return eb
}

// EvaluateParents sort parents by evaluating multiple feature scores.
func (eb *evaluatorBase) EvaluateParents(parents []*resource.Peer, child *resource.Peer, totalPieceCount int32) []*resource.Peer {
sort.Slice(
parents,
func(i, j int) bool {
return evaluate(parents[i], child, totalPieceCount) > evaluate(parents[j], child, totalPieceCount)
},
)
if eb.networktopology == nil {
sort.Slice(
parents,
func(i, j int) bool {
return evaluate(parents[i], child, totalPieceCount) > evaluate(parents[j], child, totalPieceCount)
},
)
} else {
var parentIDs []string
for _, parent := range parents {
parentIDs = append(parentIDs, parent.ID)
}
scoces := eb.calculateNetworkTopologyScore(child.ID, parentIDs)

sort.Slice(
parents,
func(i, j int) bool {
return (evaluate(parents[i], child, totalPieceCount) + networkTopologyWeight*scoces[i]) > (evaluate(parents[j], child, totalPieceCount) + networkTopologyWeight*scoces[j])
},
)
}

return parents
}
Expand Down Expand Up @@ -208,6 +244,28 @@ func calculateMultiElementAffinityScore(dst, src string) float64 {
return float64(score) / float64(maxElementLen)
}

// calculateNetworkTopologyScore 0.0~1.0 larger and better.
func (eb *evaluatorBase) calculateNetworkTopologyScore(src string, dst []string) []float64 {
AverageRTTs, err := eb.networktopology.AverageRTTs(src, dst)
if err != nil {
return []float64{}
}

var MaxRTT time.Duration
for _, RTT := range AverageRTTs {
if MaxRTT < RTT {
MaxRTT = RTT
}
}

var scoces []float64
for _, RTT := range AverageRTTs {
scoces = append(scoces, float64((MaxRTT-RTT))/float64(MaxRTT))
}

return scoces
}

func (eb *evaluatorBase) IsBadNode(peer *resource.Peer) bool {
if peer.FSM.Is(resource.PeerStateFailed) || peer.FSM.Is(resource.PeerStateLeave) || peer.FSM.Is(resource.PeerStatePending) ||
peer.FSM.Is(resource.PeerStateReceivedTiny) || peer.FSM.Is(resource.PeerStateReceivedSmall) ||
Expand Down
Loading

0 comments on commit 7b92f45

Please sign in to comment.