From f9a09b7c8e236dc1550a9cd290e4d367bc1c2efa Mon Sep 17 00:00:00 2001 From: pk910 Date: Fri, 1 Mar 2024 13:01:57 +0100 Subject: [PATCH] added test cleanup routine (fix #3) --- cmd/root.go | 10 +++-- pkg/coordinator/config.go | 14 +++++++ pkg/coordinator/coordinator.go | 68 +++++++++++++++++++++++++++++++--- 3 files changed, 84 insertions(+), 8 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 1a2d8cd..bb0b5fb 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -26,6 +26,10 @@ var rootCmd = &cobra.Command{ logr.Fatal(err) } + if maxConcurrentTests > 0 { + config.Coordinator.MaxConcurrentTests = maxConcurrentTests + } + if logFormat == "json" { logr.SetFormatter(&logrus.JSONFormatter{}) logr.Info("Log format set to json") @@ -37,7 +41,7 @@ var rootCmd = &cobra.Command{ logr.SetLevel(logrus.DebugLevel) } - coord := coordinator.NewCoordinator(config, logr, metricsPort, maxConcurrentTests) + coord := coordinator.NewCoordinator(config, logr, metricsPort) if err := coord.Run(cmd.Context()); err != nil { logr.Fatal(err) @@ -51,7 +55,7 @@ var ( logFormat string verbose bool metricsPort int - maxConcurrentTests int + maxConcurrentTests uint64 version bool ) @@ -70,7 +74,7 @@ func init() { rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file") rootCmd.PersistentFlags().StringVar(&logFormat, "log-format", "text", "log format (default is text). Valid values are 'text', 'json'") rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output") - rootCmd.Flags().IntVar(&maxConcurrentTests, "maxConcurrentTests", 1, "Number of tests to run concurrently") + rootCmd.Flags().Uint64Var(&maxConcurrentTests, "maxConcurrentTests", 0, "Number of tests to run concurrently") rootCmd.Flags().IntVarP(&metricsPort, "metrics-port", "", 9090, "Port to serve Prometheus metrics on") rootCmd.Flags().BoolVarP(&version, "version", "", false, "Print version information") } diff --git a/pkg/coordinator/config.go b/pkg/coordinator/config.go index 9d45823..d8e4724 100644 --- a/pkg/coordinator/config.go +++ b/pkg/coordinator/config.go @@ -4,6 +4,7 @@ import ( "os" "github.com/ethpandaops/assertoor/pkg/coordinator/clients" + "github.com/ethpandaops/assertoor/pkg/coordinator/human-duration" "github.com/ethpandaops/assertoor/pkg/coordinator/names" "github.com/ethpandaops/assertoor/pkg/coordinator/types" web_types "github.com/ethpandaops/assertoor/pkg/coordinator/web/types" @@ -23,6 +24,9 @@ type Config struct { // Global variables GlobalVars map[string]interface{} `yaml:"globalVars" json:"globalVars"` + // Coordinator config + Coordinator *CoordinatorConfig `yaml:"coordinator" json:"coordinator"` + // List of Test configurations. Tests []*types.TestConfig `yaml:"tests" json:"tests"` @@ -30,6 +34,15 @@ type Config struct { ExternalTests []*types.ExternalTestConfig `yaml:"externalTests" json:"externalTests"` } +//nolint:revive // ignore +type CoordinatorConfig struct { + // Maximum number of tests executed concurrently + MaxConcurrentTests uint64 `yaml:"maxConcurrentTests" json:"maxConcurrentTests"` + + // Test history cleanup delay + TestRetentionTime human.Duration `yaml:"testRetentionTime" json:"testRetentionTime"` +} + // DefaultConfig represents a sane-default configuration. func DefaultConfig() *Config { return &Config{ @@ -41,6 +54,7 @@ func DefaultConfig() *Config { }, }, GlobalVars: make(map[string]interface{}), + Coordinator: &CoordinatorConfig{}, Tests: []*types.TestConfig{}, ExternalTests: []*types.ExternalTestConfig{}, } diff --git a/pkg/coordinator/coordinator.go b/pkg/coordinator/coordinator.go index 933b634..3b62f35 100644 --- a/pkg/coordinator/coordinator.go +++ b/pkg/coordinator/coordinator.go @@ -47,7 +47,6 @@ type Coordinator struct { testHistory []types.Test testRegistryMutex sync.RWMutex testNotificationChan chan bool - maxConcurrentTests int } type testDescriptorEntry struct { @@ -56,7 +55,7 @@ type testDescriptorEntry struct { index uint64 } -func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort, maxConcurrentTests int) *Coordinator { +func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort int) *Coordinator { return &Coordinator{ log: logger.NewLogger(&logger.ScopeOptions{ Parent: log, @@ -70,7 +69,6 @@ func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort, maxConc testQueue: []types.Test{}, testHistory: []types.Test{}, testNotificationChan: make(chan bool, 1), - maxConcurrentTests: maxConcurrentTests, } } @@ -78,7 +76,7 @@ func NewCoordinator(config *Config, log logrus.FieldLogger, metricsPort, maxConc func (c *Coordinator) Run(ctx context.Context) error { defer func() { if err := recover(); err != nil { - logrus.WithError(err.(error)).Errorf("uncaught panic in coordinator.Run: %v, stack: %v", err, string(debug.Stack())) + c.log.GetLogger().WithError(err.(error)).Errorf("uncaught panic in coordinator.Run: %v, stack: %v", err, string(debug.Stack())) } }() @@ -137,6 +135,9 @@ func (c *Coordinator) Run(ctx context.Context) error { // start test scheduler go c.runTestScheduler(ctx) + // start test cleanup routine + go c.runTestCleanup(ctx) + // run tests c.runTestExecutionLoop(ctx) @@ -345,7 +346,12 @@ func (c *Coordinator) createTestRun(descriptor types.TestDescriptor, configOverr } func (c *Coordinator) runTestExecutionLoop(ctx context.Context) { - semaphore := make(chan bool, c.maxConcurrentTests) + concurrencyLimit := c.Config.Coordinator.MaxConcurrentTests + if concurrencyLimit < 1 { + concurrencyLimit = 1 + } + + semaphore := make(chan bool, concurrencyLimit) for { var nextTest types.Test @@ -393,6 +399,12 @@ func (c *Coordinator) runTest(ctx context.Context, testRef types.Test) { } func (c *Coordinator) runTestScheduler(ctx context.Context) { + defer func() { + if err := recover(); err != nil { + c.log.GetLogger().WithError(err.(error)).Panicf("uncaught panic in coordinator.runTestScheduler: %v, stack: %v", err, string(debug.Stack())) + } + }() + // startup scheduler for _, testDescr := range c.getStartupTests() { _, err := c.ScheduleTest(testDescr, nil, false) @@ -480,3 +492,49 @@ func (c *Coordinator) getCronTests(cronTime time.Time) []types.TestDescriptor { return descriptors } + +func (c *Coordinator) runTestCleanup(ctx context.Context) { + defer func() { + if err := recover(); err != nil { + c.log.GetLogger().WithError(err.(error)).Panicf("uncaught panic in coordinator.runTestCleanup: %v, stack: %v", err, string(debug.Stack())) + } + }() + + retentionTime := c.Config.Coordinator.TestRetentionTime.Duration + if retentionTime <= 0 { + retentionTime = 14 * 24 * time.Hour + } + + cleanupInterval := 1 * time.Hour + if retentionTime <= 4*time.Hour { + cleanupInterval = 10 * time.Minute + } + + for { + select { + case <-ctx.Done(): + return + case <-time.After(cleanupInterval): + } + + c.cleanupTestHistory(retentionTime) + } +} + +func (c *Coordinator) cleanupTestHistory(retentionTime time.Duration) { + c.testRegistryMutex.Lock() + defer c.testRegistryMutex.Unlock() + + cleanedHistory := []types.Test{} + + for _, test := range c.testHistory { + if test.Status() != types.TestStatusPending && test.StartTime().Add(retentionTime).Compare(time.Now()) == -1 { + test.Logger().Infof("cleanup test") + continue + } + + cleanedHistory = append(cleanedHistory, test) + } + + c.testHistory = cleanedHistory +}