From 7931d75687a5a55187e461eb7b1e9e6105d84621 Mon Sep 17 00:00:00 2001 From: Roman Nikitin Date: Thu, 9 Nov 2023 09:48:07 +0300 Subject: [PATCH 1/8] Add timeouts scaling, add queue size metrics --- internal/exploit/metrics.go | 12 +++++++ internal/exploit/models.go | 19 +++++----- internal/exploit/runner.go | 1 + internal/exploit/storage.go | 37 ++++++++++++++----- internal/exploit/storage_test.go | 53 ++++++++++++++++++++++++++++ internal/exploit/submit_loop.go | 1 + internal/exploit/submit_loop_test.go | 4 +++ internal/queue/endless.go | 4 +++ internal/queue/queue.go | 1 + internal/queue/simple.go | 4 +++ 10 files changed, 119 insertions(+), 17 deletions(-) diff --git a/internal/exploit/metrics.go b/internal/exploit/metrics.go index 26cc8de..6bf2bf2 100644 --- a/internal/exploit/metrics.go +++ b/internal/exploit/metrics.go @@ -9,12 +9,14 @@ import ( type Metrics struct { FlagsSubmitted *prometheus.CounterVec Teams prometheus.Gauge + Queue *prometheus.GaugeVec } func NewMetrics(namespace string) *Metrics { const subsystem = "exploit_runner" targetLabels := []string{"target_id", "target_ip"} exploitLabels := []string{"exploit_id", "exploit_version", "exploit_type"} + queueLabels := []string{"type"} return &Metrics{ FlagsSubmitted: promauto.NewCounterVec( @@ -33,5 +35,15 @@ func NewMetrics(namespace string) *Metrics { Name: "teams", Help: "Number of teams scheduled for the current runner", }), + + Queue: promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue", + Help: "Number of exploits in the queue", + }, + queueLabels, + ), } } diff --git a/internal/exploit/models.go b/internal/exploit/models.go index 3edf3fd..26737c4 100644 --- a/internal/exploit/models.go +++ b/internal/exploit/models.go @@ -42,15 +42,16 @@ func (r *FullResult) MetricLabels() prometheus.Labels { } type State struct { - ID string - Version int64 - Dir string - Path string - Disabled bool - Endless bool - RunEvery time.Duration - LastRun time.Time - Timeout time.Duration + ID string + Version int64 + Dir string + Path string + Disabled bool + Endless bool + RunEvery time.Duration + LastRun time.Time + Timeout time.Duration + OriginalTimeout time.Duration } func (s *State) ExploitType() models.ExploitType { diff --git a/internal/exploit/runner.go b/internal/exploit/runner.go index 7a2cec1..d2a2203 100644 --- a/internal/exploit/runner.go +++ b/internal/exploit/runner.go @@ -371,6 +371,7 @@ func (r *Runner) onServerStateUpdate(ctx context.Context, state *epb.ServerState } if r.storage.UpdateExploits(ctx, state.Exploits) { + r.storage.ScaleTimeouts(r.maxJobs, len(r.teams)) r.logger.Info("Exploits changed, scheduling loops restart") r.restartLoops() } diff --git a/internal/exploit/storage.go b/internal/exploit/storage.go index 8206d9e..e157cab 100644 --- a/internal/exploit/storage.go +++ b/internal/exploit/storage.go @@ -61,6 +61,26 @@ func (s *Storage) UpdateExploits(ctx context.Context, exs []*epb.ExploitState) b return true } +func (s *Storage) ScaleTimeouts(workers, teams int) { + alpha := 0.0 + for _, ex := range s.cache.Exploits() { + if ex.Endless { + continue + } + alpha += ex.OriginalTimeout.Seconds() / ex.RunEvery.Seconds() + } + alpha = alpha * float64(teams) / float64(workers) + logrus.Infof("Scaling timeouts: alpha = %.2f", alpha) + for _, ex := range s.cache.Exploits() { + if ex.Endless { + continue + } + newTimeout := time.Duration(float64(ex.OriginalTimeout) / alpha) + logrus.Infof("Scaling timeout for exploit %s: %s -> %s", ex.ID, ex.Timeout, newTimeout) + 
ex.Timeout = newTimeout + } +} + func (s *Storage) updateExploit(ctx context.Context, exploitID string) (*State, error) { // Download the current exploit state. resp, err := s.client.Exploit(ctx, exploitID) @@ -115,14 +135,15 @@ func (s *Storage) updateExploit(ctx context.Context, exploitID string) (*State, } res := &State{ - ID: state.ExploitId, - Version: state.Version, - Dir: "", - Path: entryPath, - Disabled: state.Config.Disabled, - Endless: state.Config.Endless, - RunEvery: state.Config.RunEvery.AsDuration(), - Timeout: state.Config.Timeout.AsDuration(), + ID: state.ExploitId, + Version: state.Version, + Dir: "", + Path: entryPath, + Disabled: state.Config.Disabled, + Endless: state.Config.Endless, + RunEvery: state.Config.RunEvery.AsDuration(), + Timeout: state.Config.Timeout.AsDuration(), + OriginalTimeout: state.Config.Timeout.AsDuration(), } if state.Config.IsArchive { res.Dir = oPath diff --git a/internal/exploit/storage_test.go b/internal/exploit/storage_test.go index 20aa70c..739e62c 100644 --- a/internal/exploit/storage_test.go +++ b/internal/exploit/storage_test.go @@ -7,6 +7,7 @@ import ( "path" "strings" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -168,3 +169,55 @@ func Test_prepareEntry(t *testing.T) { // Check that file is executable. require.NotZero(t, fi.Mode()&0111) } + +func TestStorage_Scale(t *testing.T) { + st, cleanup := mockStorage() + defer func() { + require.NoError(t, cleanup()) + }() + + // This exploit's timeout should be halved, as teams = 2 * workers. + st.cache.Update([]*State{ + { + ID: "1", + Version: 1, + RunEvery: time.Minute, + Timeout: time.Minute, + OriginalTimeout: time.Minute, + }, + }) + st.ScaleTimeouts(10, 20) + res, ok := st.Exploit("1") + require.True(t, ok) + require.EqualValues(t, 1, res.Version) + require.EqualValues(t, time.Minute, res.RunEvery) + require.EqualValues(t, 30*time.Second, res.Timeout) + + // Now it should be doubled, as workers = 2 * teams. + st.ScaleTimeouts(20, 10) + res, ok = st.Exploit("1") + require.True(t, ok) + require.EqualValues(t, time.Minute, res.RunEvery) + require.EqualValues(t, 2*time.Minute, res.Timeout) + + // Add another exploit, expect scale to work proportionally to original timeouts. 
+ st.cache.Update([]*State{ + { + ID: "2", + Version: 1, + RunEvery: time.Minute, + Timeout: time.Minute, + OriginalTimeout: time.Minute, + }, + }) + st.ScaleTimeouts(20, 10) + res, ok = st.Exploit("1") + require.True(t, ok) + require.EqualValues(t, time.Minute, res.RunEvery) + require.EqualValues(t, time.Minute, res.Timeout) + + res, ok = st.Exploit("2") + require.True(t, ok) + require.EqualValues(t, time.Minute, res.RunEvery) + require.EqualValues(t, time.Minute, res.Timeout) +} diff --git a/internal/exploit/submit_loop.go b/internal/exploit/submit_loop.go index 6f0d86a..852a0a2 100644 --- a/internal/exploit/submit_loop.go +++ b/internal/exploit/submit_loop.go @@ -126,6 +126,7 @@ func (l *submitLoop) Start(ctx context.Context) { } case <-t.C: flush() + l.metrics.Queue.WithLabelValues(string(l.q.Type())).Set(float64(l.q.Size())) case <-ctx.Done(): return } diff --git a/internal/exploit/submit_loop_test.go b/internal/exploit/submit_loop_test.go index 1554654..b45b7b6 100644 --- a/internal/exploit/submit_loop_test.go +++ b/internal/exploit/submit_loop_test.go @@ -330,6 +330,10 @@ func (m *mockQueue) Type() queue.Type { return "mock" } +func (m *mockQueue) Size() int { + return len(m.in) +} + func (m *mockQueue) Start(ctx context.Context) { <-ctx.Done() } diff --git a/internal/queue/endless.go b/internal/queue/endless.go index e0bff0e..73aea72 100644 --- a/internal/queue/endless.go +++ b/internal/queue/endless.go @@ -50,6 +50,10 @@ func (q *endlessQueue) Type() Type { return TypeEndless } +func (q *endlessQueue) Size() int { + return len(q.c) +} + // Start is synchronous. // Cancel the start's context to stop the queue. func (q *endlessQueue) Start(ctx context.Context) { diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 59b84c2..7ff30ba 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -31,6 +31,7 @@ type Queue interface { Add(*Job) error Results() <-chan *Output Type() Type + Size() int fmt.Stringer } diff --git a/internal/queue/simple.go b/internal/queue/simple.go index 5c16dc1..28bf2a8 100644 --- a/internal/queue/simple.go +++ b/internal/queue/simple.go @@ -49,6 +49,10 @@ func (q *simpleQueue) Type() Type { return TypeSimple } +func (q *simpleQueue) Size() int { + return len(q.c) +} + // Start is synchronous. // Cancel the start's context to stop the queue. func (q *simpleQueue) Start(ctx context.Context) { From 6d348992532c36f9b8b7d2bfca85d6ecd9d6d972 Mon Sep 17 00:00:00 2001 From: Roman Nikitin Date: Thu, 9 Nov 2023 10:27:14 +0300 Subject: [PATCH 2/8] Round scaled timeout --- internal/exploit/storage.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/exploit/storage.go b/internal/exploit/storage.go index e157cab..6f8f55c 100644 --- a/internal/exploit/storage.go +++ b/internal/exploit/storage.go @@ -76,6 +76,10 @@ func (s *Storage) ScaleTimeouts(workers, teams int) { continue } newTimeout := time.Duration(float64(ex.OriginalTimeout) / alpha) + + // Round down to nearest second. 
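// For instance (illustrative numbers), a computed timeout of 2.7s is truncated to 2s here;
// the modulo drops the sub-second remainder rather than rounding to the nearest second.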
+ newTimeout -= newTimeout % time.Second + logrus.Infof("Scaling timeout for exploit %s: %s -> %s", ex.ID, ex.Timeout, newTimeout) ex.Timeout = newTimeout } From e838548425a96413236292ac526518df67626e29 Mon Sep 17 00:00:00 2001 From: Roman Nikitin Date: Sun, 12 Nov 2023 13:30:15 +0300 Subject: [PATCH 3/8] Inline metric labels --- internal/exploit/metrics.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/internal/exploit/metrics.go b/internal/exploit/metrics.go index 6bf2bf2..f5179a0 100644 --- a/internal/exploit/metrics.go +++ b/internal/exploit/metrics.go @@ -3,7 +3,6 @@ package exploit import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/samber/lo" ) type Metrics struct { @@ -14,9 +13,6 @@ type Metrics struct { func NewMetrics(namespace string) *Metrics { const subsystem = "exploit_runner" - targetLabels := []string{"target_id", "target_ip"} - exploitLabels := []string{"exploit_id", "exploit_version", "exploit_type"} - queueLabels := []string{"type"} return &Metrics{ FlagsSubmitted: promauto.NewCounterVec( @@ -26,7 +22,13 @@ func NewMetrics(namespace string) *Metrics { Name: "flags_submitted_total", Help: "Number of exploits finished", }, - lo.Union(targetLabels, exploitLabels), + []string{ + "target_id", + "target_ip", + "exploit_id", + "exploit_version", + "exploit_type", + }, ), Teams: promauto.NewGauge(prometheus.GaugeOpts{ @@ -43,7 +45,7 @@ func NewMetrics(namespace string) *Metrics { Name: "queue", Help: "Number of exploits in the queue", }, - queueLabels, + []string{"type"}, ), } } From a6f681655e8004542766e8ae53f85ceb23933a6c Mon Sep 17 00:00:00 2001 From: Roman Nikitin Date: Sun, 12 Nov 2023 13:35:52 +0300 Subject: [PATCH 4/8] Add timeout scaling option, rename exploit timeout --- cmd/client/cli/run.go | 5 +++++ cmd/client/cmd/run.go | 1 + internal/exploit/models.go | 20 ++++++++++---------- internal/exploit/runner.go | 32 ++++++++++++++++++++------------ internal/exploit/storage.go | 26 +++++++++++++------------- internal/exploit/storage_test.go | 28 ++++++++++++++-------------- 6 files changed, 63 insertions(+), 49 deletions(-) diff --git a/cmd/client/cli/run.go b/cmd/client/cli/run.go index a96788b..99aa750 100644 --- a/cmd/client/cli/run.go +++ b/cmd/client/cli/run.go @@ -42,6 +42,10 @@ func NewRun(cmd *cobra.Command, _ []string, cfg *client.Config) NeoCLI { jobs := parseJobsFlag(cmd, "jobs") endlessJobs := parseJobsFlag(cmd, "endless-jobs") + disableTimeoutScaling, err := cmd.Flags().GetBool("disable-timeout-scaling") + if err != nil { + logrus.Fatalf("Could not get disable-timeout-scaling flag: %v", err) + } neocli.Weight = jobs cli.sender = joblogger.NewRemoteSender(neocli) @@ -49,6 +53,7 @@ func NewRun(cmd *cobra.Command, _ []string, cfg *client.Config) NeoCLI { cli.ClientID(), jobs, endlessJobs, + disableTimeoutScaling, cfg, neocli, cli.sender, diff --git a/cmd/client/cmd/run.go b/cmd/client/cmd/run.go index 5452142..d674e27 100644 --- a/cmd/client/cmd/run.go +++ b/cmd/client/cmd/run.go @@ -29,4 +29,5 @@ func init() { rootCmd.AddCommand(runCmd) runCmd.Flags().IntP("jobs", "j", runtime.NumCPU()*cli.JobsPerCPU, "workers to run") runCmd.Flags().IntP("endless-jobs", "e", 0, "workers to run for endless mode. 
Default is 0 for no endless mode") + runCmd.Flags().Bool("disable-timeout-scaling", false, "disable scaling recurrent exploit timeouts to fit the workers count") } diff --git a/internal/exploit/models.go b/internal/exploit/models.go index 26737c4..d74df74 100644 --- a/internal/exploit/models.go +++ b/internal/exploit/models.go @@ -42,16 +42,16 @@ func (r *FullResult) MetricLabels() prometheus.Labels { } type State struct { - ID string - Version int64 - Dir string - Path string - Disabled bool - Endless bool - RunEvery time.Duration - LastRun time.Time - Timeout time.Duration - OriginalTimeout time.Duration + ID string + Version int64 + Dir string + Path string + Disabled bool + Endless bool + RunEvery time.Duration + LastRun time.Time + ScaledTimeout time.Duration + Timeout time.Duration } func (s *State) ExploitType() models.ExploitType { diff --git a/internal/exploit/runner.go b/internal/exploit/runner.go index d2a2203..318a790 100644 --- a/internal/exploit/runner.go +++ b/internal/exploit/runner.go @@ -31,19 +31,23 @@ var ( func NewRunner( clientID string, maxJobs, maxEndlessJobs int, + disableTimeoutScaling bool, clientConfig *client.Config, c *client.Client, logSender joblogger.Sender, ) *Runner { return &Runner{ - storage: NewStorage(NewCache(), clientConfig.ExploitDir, c), - cfg: &config.ExploitsConfig{}, - client: c, - maxJobs: maxJobs, - maxEndlessJobs: maxEndlessJobs, - singleRuns: make(chan *epb.SingleRunSubscribeResponse), - restarts: make(chan struct{}, 1), - logSender: logSender, + storage: NewStorage(NewCache(), clientConfig.ExploitDir, c), + cfg: &config.ExploitsConfig{}, + client: c, + + maxJobs: maxJobs, + maxEndlessJobs: maxEndlessJobs, + disableTimeoutScaling: disableTimeoutScaling, + + singleRuns: make(chan *epb.SingleRunSubscribeResponse), + restarts: make(chan struct{}, 1), + logSender: logSender, metricsPusher: push. New(clientConfig.MetricsHost, "neo_runner"). Grouping("client_id", clientID). @@ -63,8 +67,9 @@ type Runner struct { metricsPusher *push.Pusher metrics *Metrics - maxJobs int - maxEndlessJobs int + maxJobs int + maxEndlessJobs int + disableTimeoutScaling bool simpleLoop *submitLoop endlessLoop *submitLoop @@ -371,7 +376,10 @@ func (r *Runner) onServerStateUpdate(ctx context.Context, state *epb.ServerState } if r.storage.UpdateExploits(ctx, state.Exploits) { - r.storage.ScaleTimeouts(r.maxJobs, len(r.teams)) + if !r.disableTimeoutScaling { + r.storage.ScaleTimeouts(r.maxJobs, len(r.teams)) + } + r.logger.Info("Exploits changed, scheduling loops restart") r.restartLoops() } @@ -401,7 +409,7 @@ func CreateExploitJobs( ex.Path, ex.Dir, environ, - ex.Timeout, + ex.ScaledTimeout, joblogger.New(ex.ID, ex.Version, ip, sender), )) } diff --git a/internal/exploit/storage.go b/internal/exploit/storage.go index 6f8f55c..aa029f9 100644 --- a/internal/exploit/storage.go +++ b/internal/exploit/storage.go @@ -67,7 +67,7 @@ func (s *Storage) ScaleTimeouts(workers, teams int) { if ex.Endless { continue } - alpha += ex.OriginalTimeout.Seconds() / ex.RunEvery.Seconds() + alpha += ex.Timeout.Seconds() / ex.RunEvery.Seconds() } alpha = alpha * float64(teams) / float64(workers) logrus.Infof("Scaling timeouts: alpha = %.2f", alpha) @@ -75,13 +75,13 @@ func (s *Storage) ScaleTimeouts(workers, teams int) { if ex.Endless { continue } - newTimeout := time.Duration(float64(ex.OriginalTimeout) / alpha) + newTimeout := time.Duration(float64(ex.Timeout) / alpha) // Round down to nearest second. 
newTimeout -= newTimeout % time.Second - logrus.Infof("Scaling timeout for exploit %s: %s -> %s", ex.ID, ex.Timeout, newTimeout) - ex.Timeout = newTimeout + logrus.Infof("Scaling timeout for exploit %s: %s -> %s", ex.ID, ex.ScaledTimeout, newTimeout) + ex.ScaledTimeout = newTimeout } } @@ -139,15 +139,15 @@ func (s *Storage) updateExploit(ctx context.Context, exploitID string) (*State, } res := &State{ - ID: state.ExploitId, - Version: state.Version, - Dir: "", - Path: entryPath, - Disabled: state.Config.Disabled, - Endless: state.Config.Endless, - RunEvery: state.Config.RunEvery.AsDuration(), - Timeout: state.Config.Timeout.AsDuration(), - OriginalTimeout: state.Config.Timeout.AsDuration(), + ID: state.ExploitId, + Version: state.Version, + Dir: "", + Path: entryPath, + Disabled: state.Config.Disabled, + Endless: state.Config.Endless, + RunEvery: state.Config.RunEvery.AsDuration(), + ScaledTimeout: state.Config.Timeout.AsDuration(), + Timeout: state.Config.Timeout.AsDuration(), } if state.Config.IsArchive { res.Dir = oPath diff --git a/internal/exploit/storage_test.go b/internal/exploit/storage_test.go index 739e62c..fc5ef44 100644 --- a/internal/exploit/storage_test.go +++ b/internal/exploit/storage_test.go @@ -179,11 +179,11 @@ func TestStorage_Scale(t *testing.T) { // This exploit's timeout should be halved, as teams = 2 * workers. st.cache.Update([]*State{ { - ID: "1", - Version: 1, - RunEvery: time.Minute, - Timeout: time.Minute, - OriginalTimeout: time.Minute, + ID: "1", + Version: 1, + RunEvery: time.Minute, + ScaledTimeout: time.Minute, + Timeout: time.Minute, }, }) st.ScaleTimeouts(10, 20) @@ -191,33 +191,33 @@ func TestStorage_Scale(t *testing.T) { require.True(t, ok) require.EqualValues(t, 1, res.Version) require.EqualValues(t, time.Minute, res.RunEvery) - require.EqualValues(t, 30*time.Second, res.Timeout) + require.EqualValues(t, 30*time.Second, res.ScaledTimeout) // Now it should be doubled, as workers = 2 * teams. st.ScaleTimeouts(20, 10) res, ok = st.Exploit("1") require.True(t, ok) require.EqualValues(t, time.Minute, res.RunEvery) - require.EqualValues(t, 2*time.Minute, res.Timeout) + require.EqualValues(t, 2*time.Minute, res.ScaledTimeout) // Add another exploit, expect scale to work proportionally to original timeouts. 
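// (Worked numbers for the expectations below, not part of the patch: two exploits at 1m timeout / 1m period
// give a per-team alpha of 2; with 20 workers and 10 teams alpha = 2 * 10 / 20 = 1, so both scaled timeouts stay at 1m.)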
st.cache.Update([]*State{ { - ID: "2", - Version: 1, - RunEvery: time.Minute, - Timeout: time.Minute, - OriginalTimeout: time.Minute, + ID: "2", + Version: 1, + RunEvery: time.Minute, + ScaledTimeout: time.Minute, + Timeout: time.Minute, }, }) st.ScaleTimeouts(20, 10) res, ok = st.Exploit("1") require.True(t, ok) require.EqualValues(t, time.Minute, res.RunEvery) - require.EqualValues(t, time.Minute, res.Timeout) + require.EqualValues(t, time.Minute, res.ScaledTimeout) res, ok = st.Exploit("2") require.True(t, ok) require.EqualValues(t, time.Minute, res.RunEvery) - require.EqualValues(t, time.Minute, res.Timeout) + require.EqualValues(t, time.Minute, res.ScaledTimeout) } From d5d081505580e67bcf90f799aa82cc5a8e2e7bc9 Mon Sep 17 00:00:00 2001 From: Roman Nikitin Date: Sun, 12 Nov 2023 13:42:47 +0300 Subject: [PATCH 5/8] Add scaling algorithm description --- internal/exploit/storage.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/internal/exploit/storage.go b/internal/exploit/storage.go index aa029f9..bd477e3 100644 --- a/internal/exploit/storage.go +++ b/internal/exploit/storage.go @@ -62,7 +62,21 @@ func (s *Storage) UpdateExploits(ctx context.Context, exs []*epb.ExploitState) b } func (s *Storage) ScaleTimeouts(workers, teams int) { + // Alpha is a worker usage coefficient. + // For example, an exploit with timeout 10s and run every 20s + // Uses half of the worker's time for each team, so if teams = 4, + // exploit will use 2 full workers. + // Alpha in the case above will be 10/20 = 0.5 after the loop, + // if workers = 2 its final value will be 0.5 * 4 / 2 = 1, + // which means full worker utilization. + // If it's smaller, we could increase the timeouts, if larger -- + // decrease them proportionally to their original values. + // NB 1: endless exploits are not scaled. + // NB 2: timeouts are rounded down to nearest second. + // NB 3: not all exploits will use up all worker's time, + // so it's more of an upper bound estimation. 
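// Continuing the example above (illustrative numbers): with alpha = 1 the 10s timeout is left as is;
// with 8 workers instead of 2, alpha = 0.5 * 4 / 8 = 0.25 and the timeout is stretched to 10s / 0.25 = 40s.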
alpha := 0.0 + for _, ex := range s.cache.Exploits() { if ex.Endless { continue From 56c73bb4a5ad0db7f05a30894361bb91548954f8 Mon Sep 17 00:00:00 2001 From: Roman Nikitin Date: Sun, 12 Nov 2023 13:48:46 +0300 Subject: [PATCH 6/8] Add timeout scale target flag --- cmd/client/cli/run.go | 11 +++++++---- cmd/client/cmd/run.go | 7 ++++++- internal/exploit/runner.go | 18 +++++++++--------- internal/exploit/storage.go | 9 +++++---- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/cmd/client/cli/run.go b/cmd/client/cli/run.go index 99aa750..cf25b38 100644 --- a/cmd/client/cli/run.go +++ b/cmd/client/cli/run.go @@ -26,7 +26,7 @@ func parseJobsFlag(cmd *cobra.Command, name string) int { logrus.Fatalf("Could not get jobs number: %v", err) } if jobs < 0 { - logrus.Fatal("run: job count should be non-negavtive") + logrus.Fatal("run: job count should be non-negative") } return jobs } @@ -42,9 +42,12 @@ func NewRun(cmd *cobra.Command, _ []string, cfg *client.Config) NeoCLI { jobs := parseJobsFlag(cmd, "jobs") endlessJobs := parseJobsFlag(cmd, "endless-jobs") - disableTimeoutScaling, err := cmd.Flags().GetBool("disable-timeout-scaling") + timeoutScaleTarget, err := cmd.Flags().GetFloat64("timeout-autoscale-target") if err != nil { - logrus.Fatalf("Could not get disable-timeout-scaling flag: %v", err) + logrus.Fatalf("Could not get timeout-autoscale-target flag: %v", err) + } + if timeoutScaleTarget < 0 { + logrus.Fatalf("timeout-autoscale-target should be non-negative") } neocli.Weight = jobs @@ -53,7 +56,7 @@ func NewRun(cmd *cobra.Command, _ []string, cfg *client.Config) NeoCLI { cli.ClientID(), jobs, endlessJobs, - disableTimeoutScaling, + timeoutScaleTarget, cfg, neocli, cli.sender, diff --git a/cmd/client/cmd/run.go b/cmd/client/cmd/run.go index d674e27..204788a 100644 --- a/cmd/client/cmd/run.go +++ b/cmd/client/cmd/run.go @@ -29,5 +29,10 @@ func init() { rootCmd.AddCommand(runCmd) runCmd.Flags().IntP("jobs", "j", runtime.NumCPU()*cli.JobsPerCPU, "workers to run") runCmd.Flags().IntP("endless-jobs", "e", 0, "workers to run for endless mode. 
Default is 0 for no endless mode") - runCmd.Flags().Bool("disable-timeout-scaling", false, "disable scaling recurrent exploit timeouts to fit the workers count") + runCmd.Flags().Float64( + "timeout-autoscale-target", + 1.0, + "target upper bound for recurrent exploit worker utilization by scaling timeouts."+ + " Setting this to 0 disables scaling", + ) } diff --git a/internal/exploit/runner.go b/internal/exploit/runner.go index 318a790..0493931 100644 --- a/internal/exploit/runner.go +++ b/internal/exploit/runner.go @@ -31,7 +31,7 @@ var ( func NewRunner( clientID string, maxJobs, maxEndlessJobs int, - disableTimeoutScaling bool, + timeoutScaleTarget float64, clientConfig *client.Config, c *client.Client, logSender joblogger.Sender, @@ -41,9 +41,9 @@ func NewRunner( cfg: &config.ExploitsConfig{}, client: c, - maxJobs: maxJobs, - maxEndlessJobs: maxEndlessJobs, - disableTimeoutScaling: disableTimeoutScaling, + maxJobs: maxJobs, + maxEndlessJobs: maxEndlessJobs, + timeoutScaleTarget: timeoutScaleTarget, singleRuns: make(chan *epb.SingleRunSubscribeResponse), restarts: make(chan struct{}, 1), @@ -67,9 +67,9 @@ type Runner struct { metricsPusher *push.Pusher metrics *Metrics - maxJobs int - maxEndlessJobs int - disableTimeoutScaling bool + maxJobs int + maxEndlessJobs int + timeoutScaleTarget float64 simpleLoop *submitLoop endlessLoop *submitLoop @@ -376,8 +376,8 @@ func (r *Runner) onServerStateUpdate(ctx context.Context, state *epb.ServerState } if r.storage.UpdateExploits(ctx, state.Exploits) { - if !r.disableTimeoutScaling { - r.storage.ScaleTimeouts(r.maxJobs, len(r.teams)) + if r.timeoutScaleTarget > 0 { + r.storage.ScaleTimeouts(r.maxJobs, len(r.teams), r.timeoutScaleTarget) } r.logger.Info("Exploits changed, scheduling loops restart") diff --git a/internal/exploit/storage.go b/internal/exploit/storage.go index bd477e3..c76112a 100644 --- a/internal/exploit/storage.go +++ b/internal/exploit/storage.go @@ -61,7 +61,7 @@ func (s *Storage) UpdateExploits(ctx context.Context, exs []*epb.ExploitState) b return true } -func (s *Storage) ScaleTimeouts(workers, teams int) { +func (s *Storage) ScaleTimeouts(workers, teams int, target float64) { // Alpha is a worker usage coefficient. // For example, an exploit with timeout 10s and run every 20s // Uses half of the worker's time for each team, so if teams = 4, @@ -74,7 +74,8 @@ func (s *Storage) ScaleTimeouts(workers, teams int) { // NB 1: endless exploits are not scaled. // NB 2: timeouts are rounded down to nearest second. // NB 3: not all exploits will use up all worker's time, - // so it's more of an upper bound estimation. + // so it's more of an upper bound estimation, so target can be provided + // for it to be more relaxed. alpha := 0.0 for _, ex := range s.cache.Exploits() { @@ -84,12 +85,12 @@ func (s *Storage) ScaleTimeouts(workers, teams int) { alpha += ex.Timeout.Seconds() / ex.RunEvery.Seconds() } alpha = alpha * float64(teams) / float64(workers) - logrus.Infof("Scaling timeouts: alpha = %.2f", alpha) + logrus.Infof("Scaling timeouts: alpha = %.2f, target = %.2f", alpha, target) for _, ex := range s.cache.Exploits() { if ex.Endless { continue } - newTimeout := time.Duration(float64(ex.Timeout) / alpha) + newTimeout := time.Duration(float64(ex.Timeout) * target / alpha) // Round down to nearest second. 
newTimeout -= newTimeout % time.Second From 2bebb2538deb938e50906f8da45e7f2722e1170b Mon Sep 17 00:00:00 2001 From: Roman Nikitin Date: Sun, 12 Nov 2023 13:56:41 +0300 Subject: [PATCH 7/8] More reasonable default for autoscale target, tests --- cmd/client/cmd/run.go | 2 +- internal/exploit/storage_test.go | 20 +++++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/cmd/client/cmd/run.go b/cmd/client/cmd/run.go index 204788a..c7980ef 100644 --- a/cmd/client/cmd/run.go +++ b/cmd/client/cmd/run.go @@ -31,7 +31,7 @@ func init() { runCmd.Flags().IntP("endless-jobs", "e", 0, "workers to run for endless mode. Default is 0 for no endless mode") runCmd.Flags().Float64( "timeout-autoscale-target", - 1.0, + 1.5, "target upper bound for recurrent exploit worker utilization by scaling timeouts."+ " Setting this to 0 disables scaling", ) diff --git a/internal/exploit/storage_test.go b/internal/exploit/storage_test.go index fc5ef44..5fb8c86 100644 --- a/internal/exploit/storage_test.go +++ b/internal/exploit/storage_test.go @@ -186,7 +186,8 @@ func TestStorage_Scale(t *testing.T) { Timeout: time.Minute, }, }) - st.ScaleTimeouts(10, 20) + st.ScaleTimeouts(10, 20, 1) + res, ok := st.Exploit("1") require.True(t, ok) require.EqualValues(t, 1, res.Version) @@ -194,7 +195,8 @@ func TestStorage_Scale(t *testing.T) { require.EqualValues(t, 30*time.Second, res.ScaledTimeout) // Now it should be doubled, as workers = 2 * teams. - st.ScaleTimeouts(20, 10) + st.ScaleTimeouts(20, 10, 1) + res, ok = st.Exploit("1") require.True(t, ok) require.EqualValues(t, time.Minute, res.RunEvery) @@ -210,7 +212,8 @@ func TestStorage_Scale(t *testing.T) { Timeout: time.Minute, }, }) - st.ScaleTimeouts(20, 10) + st.ScaleTimeouts(20, 10, 1) + res, ok = st.Exploit("1") require.True(t, ok) require.EqualValues(t, time.Minute, res.RunEvery) @@ -220,4 +223,15 @@ func TestStorage_Scale(t *testing.T) { require.True(t, ok) require.EqualValues(t, time.Minute, res.RunEvery) require.EqualValues(t, time.Minute, res.ScaledTimeout) + + // Scale with target = 2, expect exploit timeouts to scale up. + st.ScaleTimeouts(20, 10, 2) + + res, ok = st.Exploit("1") + require.True(t, ok) + require.EqualValues(t, 2*time.Minute, res.ScaledTimeout) + + res, ok = st.Exploit("2") + require.True(t, ok) + require.EqualValues(t, 2*time.Minute, res.ScaledTimeout) } From dc1b77fc151ec63d80a282fdd247e5d6d8508e96 Mon Sep 17 00:00:00 2001 From: Roman Nikitin Date: Sun, 12 Nov 2023 19:11:14 +0300 Subject: [PATCH 8/8] Add target parameter description --- internal/exploit/storage.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/exploit/storage.go b/internal/exploit/storage.go index c76112a..b2397cd 100644 --- a/internal/exploit/storage.go +++ b/internal/exploit/storage.go @@ -71,11 +71,12 @@ func (s *Storage) ScaleTimeouts(workers, teams int, target float64) { // which means full worker utilization. // If it's smaller, we could increase the timeouts, if larger -- // decrease them proportionally to their original values. + // Target allows to specify the desired Alpha value, + // as in most cases exploits finish before timeout, + // and "safe" case with target = 1 leads to + // suboptimal worker utilization. // NB 1: endless exploits are not scaled. // NB 2: timeouts are rounded down to nearest second. - // NB 3: not all exploits will use up all worker's time, - // so it's more of an upper bound estimation, so target can be provided - // for it to be more relaxed. 
alpha := 0.0 for _, ex := range s.cache.Exploits() {
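
For reference, a minimal standalone sketch of the scaling math the series converges on after patch 6 (the exploitSpec type, the scaleTimeouts helper, and the numbers below are illustrative, not taken from the codebase): alpha estimates per-worker utilization, and every non-endless timeout is multiplied by target/alpha and truncated to whole seconds.

package main

import (
	"fmt"
	"time"
)

// exploitSpec is an illustrative stand-in for the State struct in the patches.
type exploitSpec struct {
	ID       string
	Endless  bool
	RunEvery time.Duration
	Timeout  time.Duration // original, unscaled timeout
}

// scaleTimeouts mirrors the behaviour of Storage.ScaleTimeouts after patch 6:
// alpha is the estimated per-worker utilization, and each non-endless timeout
// is multiplied by target/alpha and rounded down to a whole second.
// Assumes at least one non-endless exploit, as the patches do.
func scaleTimeouts(exploits []exploitSpec, workers, teams int, target float64) map[string]time.Duration {
	alpha := 0.0
	for _, ex := range exploits {
		if ex.Endless {
			continue
		}
		alpha += ex.Timeout.Seconds() / ex.RunEvery.Seconds()
	}
	alpha = alpha * float64(teams) / float64(workers)

	scaled := make(map[string]time.Duration, len(exploits))
	for _, ex := range exploits {
		if ex.Endless {
			scaled[ex.ID] = ex.Timeout // endless exploits are left untouched
			continue
		}
		nt := time.Duration(float64(ex.Timeout) * target / alpha)
		nt -= nt % time.Second // round down to the nearest second
		scaled[ex.ID] = nt
	}
	return scaled
}

func main() {
	exploits := []exploitSpec{
		{ID: "1", RunEvery: time.Minute, Timeout: time.Minute},
		{ID: "2", RunEvery: time.Minute, Timeout: 30 * time.Second},
	}
	// Per-team alpha = 60/60 + 30/60 = 1.5; with 20 teams and 10 workers,
	// alpha = 1.5 * 20 / 10 = 3, so a target of 1.5 halves both timeouts.
	fmt.Println(scaleTimeouts(exploits, 10, 20, 1.5)) // map[1:30s 2:15s]
}

A target above 1 (the series later defaults to 1.5) deliberately over-books workers, on the assumption stated in patch 8 that most exploits finish well before their timeout.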