From 6b1efe7ac24008d5e900354d31b9abbb5c1bc3e6 Mon Sep 17 00:00:00 2001 From: SoapHia <37109549+SoapHia@users.noreply.github.com> Date: Fri, 19 Feb 2021 10:02:48 +1100 Subject: [PATCH] Refactored metric config to conditionally update metric config (#103) * refactored metric config to update only when file changes have been detected * pr comments: single context for program, rename isMetricCfgUpdated * pr comments: check file changes with modtime only --- app/main.go | 27 ++++++++++++++++++--------- go.mod | 1 + tasks/tasks.go | 30 +++++++++++++++++++++++++++--- tsbridge/metric_config.go | 12 +++++++++++- web/handlers.go | 13 +++++++------ web/handlers_test.go | 12 ++++++++++-- web/testdata/valid.yaml | 16 ++++++++++++++++ 7 files changed, 90 insertions(+), 21 deletions(-) create mode 100644 web/testdata/valid.yaml diff --git a/app/main.go b/app/main.go index 9c31572c..835ba9dc 100644 --- a/app/main.go +++ b/app/main.go @@ -148,19 +148,25 @@ func main() { SyncPeriod: *syncPeriod, }) - metrics, err := CreateMetrics(context.Background(), config) + ctx, cancel := context.WithCancel(context.Background()) + metrics, err := CreateMetrics(ctx, config) if err != nil { log.Fatalf("failed initializing adaptor/collector dependencies: %v", err) } defer cleanup(metrics) - store, err := tasks.LoadStorageEngine(context.Background(), config) + store, err := tasks.LoadStorageEngine(ctx, config) if err != nil { log.Fatalf("failed to load storage engine: %v", err) } defer store.Close() - h := web.NewHandler(config, metrics, store) + metricCfg, err := tsbridge.NewMetricConfig(ctx, config, store) + if err != nil { + log.Fatalf("failed to perform initial load of metric config: %v", err) + } + + h := web.NewHandler(config, metrics, metricCfg, store) http.HandleFunc("/", h.Index) http.HandleFunc("/sync", h.Sync) http.HandleFunc("/cleanup", h.Cleanup) @@ -168,7 +174,7 @@ func main() { // Run a cleanup on startup log.Debugf("Performing startup cleanup...") - if err := tasks.Cleanup(context.Background(), config, store); err != nil { + if err := tasks.Cleanup(ctx, config, store); err != nil { log.Fatalf("error running the Cleanup() routine: %v", err) } @@ -176,8 +182,7 @@ func main() { // TODO(temikus): refactor this to run exactly every SyncPeriod and skip sync if one is already active if !env.IsAppEngine() { log.Debug("Running outside of appengine, starting up a sync loop...") - ctx, cancel := context.WithCancel(context.Background()) - go syncLoop(ctx, cancel, config, metrics, store) + go syncLoop(ctx, cancel, config, metrics, metricCfg, store) } // Build a connection string, e.g. ":8080" @@ -189,16 +194,20 @@ func main() { } -func syncLoop(ctx context.Context, cancel context.CancelFunc, config *tsbridge.Config, metrics *tsbridge.Metrics, store storage.Manager) { +func syncLoop(ctx context.Context, cancel context.CancelFunc, config *tsbridge.Config, metrics *tsbridge.Metrics, metricCfg *tsbridge.MetricConfig, store storage.Manager) { defer cancel() + for { select { case <-time.After(config.Options.SyncPeriod): log.Debugf("Goroutines: %v", runtime.NumGoroutine()) ctx, cancel := context.WithTimeout(ctx, config.Options.UpdateTimeout) log.WithContext(ctx).Debugf("Running sync...") - if err := tasks.Sync(ctx, config, metrics, store); err != nil { - log.WithContext(ctx).Errorf("error running sync() routine: %v", err) + if err := tasks.SyncMetricConfig(ctx, config, store, metricCfg); err != nil { + log.WithContext(ctx).Errorf("error running SyncMetricConfig() within sync() routine: %v", err) + } + if err := tasks.SyncMetrics(ctx, config, metrics, metricCfg); err != nil { + log.WithContext(ctx).Errorf("error running SyncMetrics() routine: %v", err) } cancel() case <-ctx.Done(): diff --git a/go.mod b/go.mod index a7cc392c..39846261 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/mock v1.4.4 github.com/golang/protobuf v1.4.3 + github.com/google/martian v2.1.0+incompatible github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab github.com/influxdata/influxql v1.1.0 github.com/pkg/profile v1.5.0 diff --git a/tasks/tasks.go b/tasks/tasks.go index 586e91b4..9cb104fb 100644 --- a/tasks/tasks.go +++ b/tasks/tasks.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "os" "strings" "github.com/google/ts-bridge/boltdb" @@ -13,6 +14,7 @@ import ( "github.com/google/ts-bridge/stackdriver" "github.com/google/ts-bridge/storage" "github.com/google/ts-bridge/tsbridge" + log "github.com/sirupsen/logrus" ) var ( @@ -38,12 +40,34 @@ func LoadStorageEngine(ctx context.Context, config *tsbridge.Config) (storage.Ma } } -// Sync updates all configured metrics. -func Sync(ctx context.Context, config *tsbridge.Config, metrics *tsbridge.Metrics, store storage.Manager) error { - metricCfg, err := tsbridge.NewMetricConfig(ctx, config, store) +// isMetricCfgUpdated checks if the metric config file has been updated. +func isMetricCfgUpdated(ctx context.Context, filename string, metricCfgFs *os.FileInfo) (bool, error) { + fs, err := os.Stat(filename) + if err != nil { + return false, err + } + return (*metricCfgFs).ModTime() != fs.ModTime(), nil +} + +// SyncMetricConfig ensures that metric config is always synced with the metric config file. This should only be called when !env.IsAppEngine(). +func SyncMetricConfig(ctx context.Context, config *tsbridge.Config, store storage.Manager, metricCfg *tsbridge.MetricConfig) error { + update, err := isMetricCfgUpdated(ctx, config.Options.Filename, metricCfg.FileInfo) if err != nil { return err } + if update { + updatedMetricCfg, err := tsbridge.NewMetricConfig(ctx, config, store) + log.Debug("Metric Config file changes reloaded.") + if err != nil { + return err + } + *metricCfg = *updatedMetricCfg + } + return nil +} + +// SyncMetrics updates all configured metrics. +func SyncMetrics(ctx context.Context, config *tsbridge.Config, metrics *tsbridge.Metrics, metricCfg *tsbridge.MetricConfig) error { if errs := metrics.UpdateAll(ctx, metricCfg, config.Options.UpdateParallelism); errs != nil { msg := strings.Join(errs, "; ") return errors.New(msg) diff --git a/tsbridge/metric_config.go b/tsbridge/metric_config.go index 440948e8..96ac5e79 100644 --- a/tsbridge/metric_config.go +++ b/tsbridge/metric_config.go @@ -19,6 +19,9 @@ package tsbridge import ( "context" "fmt" + "io/ioutil" + "os" + "github.com/google/ts-bridge/datadog" "github.com/google/ts-bridge/env" "github.com/google/ts-bridge/influxdb" @@ -26,7 +29,6 @@ import ( log "github.com/sirupsen/logrus" validator "gopkg.in/validator.v2" yaml "gopkg.in/yaml.v2" - "io/ioutil" ) // MetricConfig is what the YAML configuration file gets deserialized to. @@ -38,6 +40,8 @@ type MetricConfig struct { // internal list of metrics that gets populated when configuration file is read. metrics []*Metric + + FileInfo *os.FileInfo } // DestinationConfig defines configuration for a Stackdriver project metrics are written to. @@ -86,6 +90,12 @@ func NewMetricConfig(ctx context.Context, config *Config, storage storage.Manage return nil, fmt.Errorf("configuration file validation error: %s", err) } + fileInfo, err := os.Stat(config.Options.Filename) + if err != nil { + return nil, err + } + c.FileInfo = &fileInfo + destinations := make(map[string]string) for _, d := range c.StackdriverDestinations { if _, ok := destinations[d.Name]; ok { diff --git a/web/handlers.go b/web/handlers.go index bc077851..3e2157bc 100644 --- a/web/handlers.go +++ b/web/handlers.go @@ -18,17 +18,18 @@ import ( ) type Handler struct { - config *tsbridge.Config - Metrics *tsbridge.Metrics - store storage.Manager + config *tsbridge.Config + Metrics *tsbridge.Metrics + metricCfg *tsbridge.MetricConfig + store storage.Manager } type HealthResponse struct { Status string `json:"status,omitempty"` } -func NewHandler(config *tsbridge.Config, metrics *tsbridge.Metrics, store storage.Manager) *Handler { - return &Handler{config: config, Metrics: metrics, store: store} +func NewHandler(config *tsbridge.Config, metrics *tsbridge.Metrics, metricCfg *tsbridge.MetricConfig, store storage.Manager) *Handler { + return &Handler{config: config, Metrics: metrics, store: store, metricCfg: metricCfg} } // Sync is an HTTP wrapper around sync() method that is designed to be triggered by App Engine Cron. @@ -43,7 +44,7 @@ func (h *Handler) Sync(w http.ResponseWriter, r *http.Request) { return } - if err := tasks.Sync(ctx, h.config, h.Metrics, h.store); err != nil { + if err := tasks.SyncMetrics(ctx, h.config, h.Metrics, h.metricCfg); err != nil { logAndReturnError(ctx, w, err) } } diff --git a/web/handlers_test.go b/web/handlers_test.go index 1bf8990b..02e03fc5 100644 --- a/web/handlers_test.go +++ b/web/handlers_test.go @@ -10,6 +10,7 @@ import ( "github.com/google/ts-bridge/datastore" "github.com/google/ts-bridge/tasks" "github.com/google/ts-bridge/tsbridge" + log "github.com/sirupsen/logrus" ) func TestMain(m *testing.M) { @@ -30,14 +31,21 @@ func TestHealthHandler(t *testing.T) { } { t.Run(storageEngineName, func(t *testing.T) { config := tsbridge.NewConfig(&tsbridge.ConfigOptions{ - StorageEngine: storageEngineName, DatastoreProject: "testapp", + Filename: "testdata/valid.yaml", + StorageEngine: storageEngineName, }) store, err := tasks.LoadStorageEngine(context.Background(), config) if err != nil { t.Fatalf("error while loading storage engine: %v", err) } - h := NewHandler(config, &tsbridge.Metrics{}, store) + + metricCfg, err := tsbridge.NewMetricConfig(context.Background(), config, store) + if err != nil { + log.Fatalf("failed to perform initial load of metric config: %v", err) + } + + h := NewHandler(config, &tsbridge.Metrics{}, metricCfg, store) req, err := http.NewRequest("GET", "/health", nil) if err != nil { diff --git a/web/testdata/valid.yaml b/web/testdata/valid.yaml new file mode 100644 index 00000000..11d224fd --- /dev/null +++ b/web/testdata/valid.yaml @@ -0,0 +1,16 @@ +datadog_metrics: + - name: metric1 + query: "query one" + api_key: xxx + application_key: xxx + destination: stackdriver +influxdb_metrics: + - name: metric2 + query: "query two" + database: db + endpoint: localhost:8888 + destination: stackdriver +stackdriver_destinations: + - name: stackdriver + - name: another_stackdriver + project_id: "another-projectname"