Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeouts scaling, add queue size metrics #21

Merged
merged 8 commits into from
Nov 12, 2023
12 changes: 12 additions & 0 deletions internal/exploit/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
// Metrics bundles the Prometheus collectors exported by the exploit runner.
type Metrics struct {
	// FlagsSubmitted counts submitted flags, partitioned by target/exploit labels.
	FlagsSubmitted *prometheus.CounterVec
	// Teams tracks the number of teams scheduled for the current runner.
	Teams prometheus.Gauge
	// Queue tracks the current number of jobs waiting in each queue,
	// labeled by queue type.
	Queue *prometheus.GaugeVec
}

func NewMetrics(namespace string) *Metrics {
const subsystem = "exploit_runner"
targetLabels := []string{"target_id", "target_ip"}
exploitLabels := []string{"exploit_id", "exploit_version", "exploit_type"}
queueLabels := []string{"type"}
pomo-mondreganto marked this conversation as resolved.
Show resolved Hide resolved

return &Metrics{
FlagsSubmitted: promauto.NewCounterVec(
Expand All @@ -33,5 +35,15 @@ func NewMetrics(namespace string) *Metrics {
Name: "teams",
Help: "Number of teams scheduled for the current runner",
}),

Queue: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queue",
Help: "Number of exploits in the queue",
},
queueLabels,
),
}
}
19 changes: 10 additions & 9 deletions internal/exploit/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ func (r *FullResult) MetricLabels() prometheus.Labels {
}

// State is the runner-side snapshot of a single exploit's configuration
// and scheduling bookkeeping.
type State struct {
	ID       string // exploit identifier, from the server's ExploitId
	Version  int64  // exploit version, from the server state
	Dir      string // extraction directory; empty unless the exploit is an archive
	Path     string // path to the runnable entry point
	Disabled bool   // exploit disabled in server config
	Endless  bool   // endless exploits run continuously and are excluded from timeout scaling
	RunEvery time.Duration // scheduling period between runs
	LastRun  time.Time     // NOTE(review): set elsewhere; presumably last scheduled run — confirm
	// Timeout is the effective per-run timeout; rescaled by Storage.ScaleTimeouts.
	Timeout time.Duration
	// OriginalTimeout preserves the unscaled timeout from the server config so
	// that ScaleTimeouts always recomputes Timeout from a stable base instead
	// of compounding across calls.
	OriginalTimeout time.Duration
}

func (s *State) ExploitType() models.ExploitType {
Expand Down
1 change: 1 addition & 0 deletions internal/exploit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ func (r *Runner) onServerStateUpdate(ctx context.Context, state *epb.ServerState
}

if r.storage.UpdateExploits(ctx, state.Exploits) {
r.storage.ScaleTimeouts(r.maxJobs, len(r.teams))
pomo-mondreganto marked this conversation as resolved.
Show resolved Hide resolved
r.logger.Info("Exploits changed, scheduling loops restart")
r.restartLoops()
}
Expand Down
41 changes: 33 additions & 8 deletions internal/exploit/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,30 @@ func (s *Storage) UpdateExploits(ctx context.Context, exs []*epb.ExploitState) b
return true
}

func (s *Storage) ScaleTimeouts(workers, teams int) {
alpha := 0.0
for _, ex := range s.cache.Exploits() {
if ex.Endless {
continue
}
alpha += ex.OriginalTimeout.Seconds() / ex.RunEvery.Seconds()
pomo-mondreganto marked this conversation as resolved.
Show resolved Hide resolved
}
alpha = alpha * float64(teams) / float64(workers)
logrus.Infof("Scaling timeouts: alpha = %.2f", alpha)
for _, ex := range s.cache.Exploits() {
if ex.Endless {
continue
}
newTimeout := time.Duration(float64(ex.OriginalTimeout) / alpha)

// Round down to nearest second.
newTimeout -= newTimeout % time.Second

logrus.Infof("Scaling timeout for exploit %s: %s -> %s", ex.ID, ex.Timeout, newTimeout)
ex.Timeout = newTimeout
}
}

func (s *Storage) updateExploit(ctx context.Context, exploitID string) (*State, error) {
// Download the current exploit state.
resp, err := s.client.Exploit(ctx, exploitID)
Expand Down Expand Up @@ -115,14 +139,15 @@ func (s *Storage) updateExploit(ctx context.Context, exploitID string) (*State,
}

res := &State{
ID: state.ExploitId,
Version: state.Version,
Dir: "",
Path: entryPath,
Disabled: state.Config.Disabled,
Endless: state.Config.Endless,
RunEvery: state.Config.RunEvery.AsDuration(),
Timeout: state.Config.Timeout.AsDuration(),
ID: state.ExploitId,
Version: state.Version,
Dir: "",
Path: entryPath,
Disabled: state.Config.Disabled,
Endless: state.Config.Endless,
RunEvery: state.Config.RunEvery.AsDuration(),
Timeout: state.Config.Timeout.AsDuration(),
OriginalTimeout: state.Config.Timeout.AsDuration(),
}
if state.Config.IsArchive {
res.Dir = oPath
Expand Down
53 changes: 53 additions & 0 deletions internal/exploit/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -168,3 +169,55 @@ func Test_prepareEntry(t *testing.T) {
// Check that file is executable.
require.NotZero(t, fi.Mode()&0111)
}

// TestStorage_Scale verifies that ScaleTimeouts rescales timeouts in
// proportion to the teams/workers ratio, always starting from the original
// (unscaled) timeout rather than compounding previous scalings.
func TestStorage_Scale(t *testing.T) {
	st, cleanup := mockStorage()
	defer func() {
		require.NoError(t, cleanup())
	}()

	const firstID = "1"
	st.cache.Update([]*State{
		{
			ID:              firstID,
			Version:         1,
			RunEvery:        time.Minute,
			Timeout:         time.Minute,
			OriginalTimeout: time.Minute,
		},
	})

	// Twice as many teams as workers: the timeout must be halved.
	st.ScaleTimeouts(10, 20)
	got, ok := st.Exploit(firstID)
	require.True(t, ok)
	require.EqualValues(t, 1, got.Version)
	require.EqualValues(t, time.Minute, got.RunEvery)
	require.EqualValues(t, 30*time.Second, got.Timeout)

	// Twice as many workers as teams: the timeout must be doubled.
	st.ScaleTimeouts(20, 10)
	got, ok = st.Exploit(firstID)
	require.True(t, ok)
	require.EqualValues(t, time.Minute, got.RunEvery)
	require.EqualValues(t, 2*time.Minute, got.Timeout)

	// A second identical exploit doubles the total load, so with
	// workers = 2 * teams both timeouts settle at their original values.
	st.cache.Update([]*State{
		{
			ID:              "2",
			Version:         1,
			RunEvery:        time.Minute,
			Timeout:         time.Minute,
			OriginalTimeout: time.Minute,
		},
	})
	st.ScaleTimeouts(20, 10)
	for _, id := range []string{firstID, "2"} {
		got, ok = st.Exploit(id)
		require.True(t, ok)
		require.EqualValues(t, time.Minute, got.RunEvery)
		require.EqualValues(t, time.Minute, got.Timeout)
	}
}
1 change: 1 addition & 0 deletions internal/exploit/submit_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (l *submitLoop) Start(ctx context.Context) {
}
case <-t.C:
flush()
l.metrics.Queue.WithLabelValues(string(l.q.Type())).Set(float64(l.q.Size()))
case <-ctx.Done():
return
}
Expand Down
4 changes: 4 additions & 0 deletions internal/exploit/submit_loop_test.go
Original file line number Diff line number Diff line change
// Type reports the queue type; the mock always identifies as "mock".
func (m *mockQueue) Type() queue.Type {
	return "mock"
}

// Size reports the number of jobs currently buffered in the mock's input channel.
func (m *mockQueue) Size() int {
	return len(m.in)
}

// Start blocks until the context is canceled; the mock performs no work.
func (m *mockQueue) Start(ctx context.Context) {
	<-ctx.Done()
}
Expand Down
4 changes: 4 additions & 0 deletions internal/queue/endless.go
Original file line number Diff line number Diff line change
// Type identifies this queue as the endless variant.
func (q *endlessQueue) Type() Type {
	return TypeEndless
}

// Size reports the number of jobs currently waiting in the queue's channel.
func (q *endlessQueue) Size() int {
	return len(q.c)
}

// Start is synchronous.
// Cancel the start's context to stop the queue.
func (q *endlessQueue) Start(ctx context.Context) {
Expand Down
1 change: 1 addition & 0 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
type Queue interface {
	// Add enqueues a job for execution.
	Add(*Job) error
	// Results streams the outputs of executed jobs.
	Results() <-chan *Output
	// Type identifies the queue implementation (e.g. simple or endless).
	Type() Type
	// Size reports the number of jobs currently waiting in the queue.
	Size() int

	fmt.Stringer
}
4 changes: 4 additions & 0 deletions internal/queue/simple.go
Original file line number Diff line number Diff line change
// Type identifies this queue as the simple variant.
func (q *simpleQueue) Type() Type {
	return TypeSimple
}

// Size reports the number of jobs currently waiting in the queue's channel.
func (q *simpleQueue) Size() int {
	return len(q.c)
}

// Start is synchronous.
// Cancel the start's context to stop the queue.
func (q *simpleQueue) Start(ctx context.Context) {
Expand Down
Loading