Skip to content

Commit

Permalink
added test cleanup routine (fix #3)
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Mar 1, 2024
1 parent 225383f commit f9a09b7
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 8 deletions.
10 changes: 7 additions & 3 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ var rootCmd = &cobra.Command{
logr.Fatal(err)
}

if maxConcurrentTests > 0 {
config.Coordinator.MaxConcurrentTests = maxConcurrentTests
}

if logFormat == "json" {
logr.SetFormatter(&logrus.JSONFormatter{})
logr.Info("Log format set to json")
Expand All @@ -37,7 +41,7 @@ var rootCmd = &cobra.Command{
logr.SetLevel(logrus.DebugLevel)
}

coord := coordinator.NewCoordinator(config, logr, metricsPort, maxConcurrentTests)
coord := coordinator.NewCoordinator(config, logr, metricsPort)

if err := coord.Run(cmd.Context()); err != nil {
logr.Fatal(err)
Expand All @@ -51,7 +55,7 @@ var (
logFormat string
verbose bool
metricsPort int
maxConcurrentTests int
maxConcurrentTests uint64
version bool
)

Expand All @@ -70,7 +74,7 @@ func init() {
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file")
rootCmd.PersistentFlags().StringVar(&logFormat, "log-format", "text", "log format (default is text). Valid values are 'text', 'json'")
rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output")
rootCmd.Flags().IntVar(&maxConcurrentTests, "maxConcurrentTests", 1, "Number of tests to run concurrently")
rootCmd.Flags().Uint64Var(&maxConcurrentTests, "maxConcurrentTests", 0, "Number of tests to run concurrently")
rootCmd.Flags().IntVarP(&metricsPort, "metrics-port", "", 9090, "Port to serve Prometheus metrics on")
rootCmd.Flags().BoolVarP(&version, "version", "", false, "Print version information")
}
14 changes: 14 additions & 0 deletions pkg/coordinator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"

"github.com/ethpandaops/assertoor/pkg/coordinator/clients"
"github.com/ethpandaops/assertoor/pkg/coordinator/human-duration"
"github.com/ethpandaops/assertoor/pkg/coordinator/names"
"github.com/ethpandaops/assertoor/pkg/coordinator/types"
web_types "github.com/ethpandaops/assertoor/pkg/coordinator/web/types"
Expand All @@ -23,13 +24,25 @@ type Config struct {
// Global variables
GlobalVars map[string]interface{} `yaml:"globalVars" json:"globalVars"`

// Coordinator config
Coordinator *CoordinatorConfig `yaml:"coordinator" json:"coordinator"`

// List of Test configurations.
Tests []*types.TestConfig `yaml:"tests" json:"tests"`

// List of yaml files with test configurations
ExternalTests []*types.ExternalTestConfig `yaml:"externalTests" json:"externalTests"`
}

//nolint:revive // ignore
type CoordinatorConfig struct {
// Maximum number of tests executed concurrently
MaxConcurrentTests uint64 `yaml:"maxConcurrentTests" json:"maxConcurrentTests"`

// Test history cleanup delay
TestRetentionTime human.Duration `yaml:"testRetentionTime" json:"testRetentionTime"`
}

// DefaultConfig represents a sane-default configuration.
func DefaultConfig() *Config {
return &Config{
Expand All @@ -41,6 +54,7 @@ func DefaultConfig() *Config {
},
},
GlobalVars: make(map[string]interface{}),
Coordinator: &CoordinatorConfig{},
Tests: []*types.TestConfig{},
ExternalTests: []*types.ExternalTestConfig{},
}
Expand Down
68 changes: 63 additions & 5 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type Coordinator struct {
testHistory []types.Test
testRegistryMutex sync.RWMutex
testNotificationChan chan bool
maxConcurrentTests int
}

type testDescriptorEntry struct {
Expand All @@ -56,7 +55,7 @@ type testDescriptorEntry struct {
index uint64
}

func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort, maxConcurrentTests int) *Coordinator {
func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort int) *Coordinator {
return &Coordinator{
log: logger.NewLogger(&logger.ScopeOptions{
Parent: log,
Expand All @@ -70,15 +69,14 @@ func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort, maxConc
testQueue: []types.Test{},
testHistory: []types.Test{},
testNotificationChan: make(chan bool, 1),
maxConcurrentTests: maxConcurrentTests,
}
}

// Run executes the coordinator until completion.
func (c *Coordinator) Run(ctx context.Context) error {
defer func() {
if err := recover(); err != nil {
logrus.WithError(err.(error)).Errorf("uncaught panic in coordinator.Run: %v, stack: %v", err, string(debug.Stack()))
c.log.GetLogger().WithError(err.(error)).Errorf("uncaught panic in coordinator.Run: %v, stack: %v", err, string(debug.Stack()))
}
}()

Expand Down Expand Up @@ -137,6 +135,9 @@ func (c *Coordinator) Run(ctx context.Context) error {
// start test scheduler
go c.runTestScheduler(ctx)

// start test cleanup routine
go c.runTestCleanup(ctx)

// run tests
c.runTestExecutionLoop(ctx)

Expand Down Expand Up @@ -345,7 +346,12 @@ func (c *Coordinator) createTestRun(descriptor types.TestDescriptor, configOverr
}

func (c *Coordinator) runTestExecutionLoop(ctx context.Context) {
semaphore := make(chan bool, c.maxConcurrentTests)
concurrencyLimit := c.Config.Coordinator.MaxConcurrentTests
if concurrencyLimit < 1 {
concurrencyLimit = 1
}

semaphore := make(chan bool, concurrencyLimit)

for {
var nextTest types.Test
Expand Down Expand Up @@ -393,6 +399,12 @@ func (c *Coordinator) runTest(ctx context.Context, testRef types.Test) {
}

func (c *Coordinator) runTestScheduler(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
c.log.GetLogger().WithError(err.(error)).Panicf("uncaught panic in coordinator.runTestScheduler: %v, stack: %v", err, string(debug.Stack()))
}
}()

// startup scheduler
for _, testDescr := range c.getStartupTests() {
_, err := c.ScheduleTest(testDescr, nil, false)
Expand Down Expand Up @@ -480,3 +492,49 @@ func (c *Coordinator) getCronTests(cronTime time.Time) []types.TestDescriptor {

return descriptors
}

func (c *Coordinator) runTestCleanup(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
c.log.GetLogger().WithError(err.(error)).Panicf("uncaught panic in coordinator.runTestCleanup: %v, stack: %v", err, string(debug.Stack()))
}
}()

retentionTime := c.Config.Coordinator.TestRetentionTime.Duration
if retentionTime <= 0 {
retentionTime = 14 * 24 * time.Hour
}

cleanupInterval := 1 * time.Hour
if retentionTime <= 4*time.Hour {
cleanupInterval = 10 * time.Minute
}

for {
select {
case <-ctx.Done():
return
case <-time.After(cleanupInterval):
}

c.cleanupTestHistory(retentionTime)
}
}

func (c *Coordinator) cleanupTestHistory(retentionTime time.Duration) {
c.testRegistryMutex.Lock()
defer c.testRegistryMutex.Unlock()

cleanedHistory := []types.Test{}

for _, test := range c.testHistory {
if test.Status() != types.TestStatusPending && test.StartTime().Add(retentionTime).Compare(time.Now()) == -1 {
test.Logger().Infof("cleanup test")
continue
}

cleanedHistory = append(cleanedHistory, test)
}

c.testHistory = cleanedHistory
}

0 comments on commit f9a09b7

Please sign in to comment.