Skip to content

Commit

Permalink
Refactored metric config to conditionally update metric config (#103)
Browse files Browse the repository at this point in the history
* refactored metric config to update only when file changes have been detected
* pr comments: single context for program, rename isMetricCfgUpdated
* pr comments: check file changes with modtime only
  • Loading branch information
SoapHia authored Feb 18, 2021
1 parent 3b2034f commit 6b1efe7
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 21 deletions.
27 changes: 18 additions & 9 deletions app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,36 +148,41 @@ func main() {
SyncPeriod: *syncPeriod,
})

metrics, err := CreateMetrics(context.Background(), config)
ctx, cancel := context.WithCancel(context.Background())
metrics, err := CreateMetrics(ctx, config)
if err != nil {
log.Fatalf("failed initializing adaptor/collector dependencies: %v", err)
}
defer cleanup(metrics)

store, err := tasks.LoadStorageEngine(context.Background(), config)
store, err := tasks.LoadStorageEngine(ctx, config)
if err != nil {
log.Fatalf("failed to load storage engine: %v", err)
}
defer store.Close()

h := web.NewHandler(config, metrics, store)
metricCfg, err := tsbridge.NewMetricConfig(ctx, config, store)
if err != nil {
log.Fatalf("failed to perform initial load of metric config: %v", err)
}

h := web.NewHandler(config, metrics, metricCfg, store)
http.HandleFunc("/", h.Index)
http.HandleFunc("/sync", h.Sync)
http.HandleFunc("/cleanup", h.Cleanup)
http.HandleFunc("/health", h.Health)

// Run a cleanup on startup
log.Debugf("Performing startup cleanup...")
if err := tasks.Cleanup(context.Background(), config, store); err != nil {
if err := tasks.Cleanup(ctx, config, store); err != nil {
log.Fatalf("error running the Cleanup() routine: %v", err)
}

// Run a sync loop for standalone use
// TODO(temikus): refactor this to run exactly every SyncPeriod and skip sync if one is already active
if !env.IsAppEngine() {
log.Debug("Running outside of appengine, starting up a sync loop...")
ctx, cancel := context.WithCancel(context.Background())
go syncLoop(ctx, cancel, config, metrics, store)
go syncLoop(ctx, cancel, config, metrics, metricCfg, store)
}

// Build a connection string, e.g. ":8080"
Expand All @@ -189,16 +194,20 @@ func main() {

}

func syncLoop(ctx context.Context, cancel context.CancelFunc, config *tsbridge.Config, metrics *tsbridge.Metrics, store storage.Manager) {
func syncLoop(ctx context.Context, cancel context.CancelFunc, config *tsbridge.Config, metrics *tsbridge.Metrics, metricCfg *tsbridge.MetricConfig, store storage.Manager) {
defer cancel()

for {
select {
case <-time.After(config.Options.SyncPeriod):
log.Debugf("Goroutines: %v", runtime.NumGoroutine())
ctx, cancel := context.WithTimeout(ctx, config.Options.UpdateTimeout)
log.WithContext(ctx).Debugf("Running sync...")
if err := tasks.Sync(ctx, config, metrics, store); err != nil {
log.WithContext(ctx).Errorf("error running sync() routine: %v", err)
if err := tasks.SyncMetricConfig(ctx, config, store, metricCfg); err != nil {
log.WithContext(ctx).Errorf("error running SyncMetricConfig() within sync() routine: %v", err)
}
if err := tasks.SyncMetrics(ctx, config, metrics, metricCfg); err != nil {
log.WithContext(ctx).Errorf("error running SyncMetrics() routine: %v", err)
}
cancel()
case <-ctx.Done():
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.3
github.com/google/martian v2.1.0+incompatible
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
github.com/influxdata/influxql v1.1.0
github.com/pkg/profile v1.5.0
Expand Down
30 changes: 27 additions & 3 deletions tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"

"github.com/google/ts-bridge/boltdb"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/google/ts-bridge/stackdriver"
"github.com/google/ts-bridge/storage"
"github.com/google/ts-bridge/tsbridge"
log "github.com/sirupsen/logrus"
)

var (
Expand All @@ -38,12 +40,34 @@ func LoadStorageEngine(ctx context.Context, config *tsbridge.Config) (storage.Ma
}
}

// Sync updates all configured metrics.
func Sync(ctx context.Context, config *tsbridge.Config, metrics *tsbridge.Metrics, store storage.Manager) error {
metricCfg, err := tsbridge.NewMetricConfig(ctx, config, store)
// isMetricCfgUpdated checks if the metric config file has been updated.
func isMetricCfgUpdated(ctx context.Context, filename string, metricCfgFs *os.FileInfo) (bool, error) {
fs, err := os.Stat(filename)
if err != nil {
return false, err
}
return (*metricCfgFs).ModTime() != fs.ModTime(), nil
}

// SyncMetricConfig ensures that metric config is always synced with the metric config file. This should only be called when !env.IsAppEngine().
func SyncMetricConfig(ctx context.Context, config *tsbridge.Config, store storage.Manager, metricCfg *tsbridge.MetricConfig) error {
update, err := isMetricCfgUpdated(ctx, config.Options.Filename, metricCfg.FileInfo)
if err != nil {
return err
}
if update {
updatedMetricCfg, err := tsbridge.NewMetricConfig(ctx, config, store)
log.Debug("Metric Config file changes reloaded.")
if err != nil {
return err
}
*metricCfg = *updatedMetricCfg
}
return nil
}

// SyncMetrics updates all configured metrics.
func SyncMetrics(ctx context.Context, config *tsbridge.Config, metrics *tsbridge.Metrics, metricCfg *tsbridge.MetricConfig) error {
if errs := metrics.UpdateAll(ctx, metricCfg, config.Options.UpdateParallelism); errs != nil {
msg := strings.Join(errs, "; ")
return errors.New(msg)
Expand Down
12 changes: 11 additions & 1 deletion tsbridge/metric_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ package tsbridge
import (
"context"
"fmt"
"io/ioutil"
"os"

"github.com/google/ts-bridge/datadog"
"github.com/google/ts-bridge/env"
"github.com/google/ts-bridge/influxdb"
"github.com/google/ts-bridge/storage"
log "github.com/sirupsen/logrus"
validator "gopkg.in/validator.v2"
yaml "gopkg.in/yaml.v2"
"io/ioutil"
)

// MetricConfig is what the YAML configuration file gets deserialized to.
Expand All @@ -38,6 +40,8 @@ type MetricConfig struct {

// internal list of metrics that gets populated when configuration file is read.
metrics []*Metric

FileInfo *os.FileInfo
}

// DestinationConfig defines configuration for a Stackdriver project metrics are written to.
Expand Down Expand Up @@ -86,6 +90,12 @@ func NewMetricConfig(ctx context.Context, config *Config, storage storage.Manage
return nil, fmt.Errorf("configuration file validation error: %s", err)
}

fileInfo, err := os.Stat(config.Options.Filename)
if err != nil {
return nil, err
}
c.FileInfo = &fileInfo

destinations := make(map[string]string)
for _, d := range c.StackdriverDestinations {
if _, ok := destinations[d.Name]; ok {
Expand Down
13 changes: 7 additions & 6 deletions web/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@ import (
)

type Handler struct {
config *tsbridge.Config
Metrics *tsbridge.Metrics
store storage.Manager
config *tsbridge.Config
Metrics *tsbridge.Metrics
metricCfg *tsbridge.MetricConfig
store storage.Manager
}

type HealthResponse struct {
Status string `json:"status,omitempty"`
}

func NewHandler(config *tsbridge.Config, metrics *tsbridge.Metrics, store storage.Manager) *Handler {
return &Handler{config: config, Metrics: metrics, store: store}
func NewHandler(config *tsbridge.Config, metrics *tsbridge.Metrics, metricCfg *tsbridge.MetricConfig, store storage.Manager) *Handler {
return &Handler{config: config, Metrics: metrics, store: store, metricCfg: metricCfg}
}

// Sync is an HTTP wrapper around sync() method that is designed to be triggered by App Engine Cron.
Expand All @@ -43,7 +44,7 @@ func (h *Handler) Sync(w http.ResponseWriter, r *http.Request) {
return
}

if err := tasks.Sync(ctx, h.config, h.Metrics, h.store); err != nil {
if err := tasks.SyncMetrics(ctx, h.config, h.Metrics, h.metricCfg); err != nil {
logAndReturnError(ctx, w, err)
}
}
Expand Down
12 changes: 10 additions & 2 deletions web/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/google/ts-bridge/datastore"
"github.com/google/ts-bridge/tasks"
"github.com/google/ts-bridge/tsbridge"
log "github.com/sirupsen/logrus"
)

func TestMain(m *testing.M) {
Expand All @@ -30,14 +31,21 @@ func TestHealthHandler(t *testing.T) {
} {
t.Run(storageEngineName, func(t *testing.T) {
config := tsbridge.NewConfig(&tsbridge.ConfigOptions{
StorageEngine: storageEngineName,
DatastoreProject: "testapp",
Filename: "testdata/valid.yaml",
StorageEngine: storageEngineName,
})
store, err := tasks.LoadStorageEngine(context.Background(), config)
if err != nil {
t.Fatalf("error while loading storage engine: %v", err)
}
h := NewHandler(config, &tsbridge.Metrics{}, store)

metricCfg, err := tsbridge.NewMetricConfig(context.Background(), config, store)
if err != nil {
log.Fatalf("failed to perform initial load of metric config: %v", err)
}

h := NewHandler(config, &tsbridge.Metrics{}, metricCfg, store)

req, err := http.NewRequest("GET", "/health", nil)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions web/testdata/valid.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
datadog_metrics:
- name: metric1
query: "query one"
api_key: xxx
application_key: xxx
destination: stackdriver
influxdb_metrics:
- name: metric2
query: "query two"
database: db
endpoint: localhost:8888
destination: stackdriver
stackdriver_destinations:
- name: stackdriver
- name: another_stackdriver
project_id: "another-projectname"

0 comments on commit 6b1efe7

Please sign in to comment.