From 0ed7437b9931eb6459f3c047130956ad238c4afd Mon Sep 17 00:00:00 2001 From: Jo Asplin Date: Tue, 24 Sep 2024 10:20:11 +0200 Subject: [PATCH] Completed db cleanup This change completes the periodic database cleanup by also deleting unreferred rows in tables time_series and geo_point. The cleanup is also now triggered from a single goroutine that runs independently of calls to PutObservations(). --- datastore/datastore/go.mod | 1 + .../storagebackend/postgresql/init.go | 2 - .../storagebackend/postgresql/postgresql.go | 96 ++++++++++++++----- .../postgresql/putobservations.go | 5 - 4 files changed, 74 insertions(+), 30 deletions(-) diff --git a/datastore/datastore/go.mod b/datastore/datastore/go.mod index 2aff87f3..29850d0e 100644 --- a/datastore/datastore/go.mod +++ b/datastore/datastore/go.mod @@ -6,6 +6,7 @@ require google.golang.org/grpc v1.64.0 require ( github.com/cridenour/go-postgis v1.0.0 + github.com/golang/protobuf v1.5.4 google.golang.org/protobuf v1.34.1 ) diff --git a/datastore/datastore/storagebackend/postgresql/init.go b/datastore/datastore/storagebackend/postgresql/init.go index 9304f993..df0d82b1 100644 --- a/datastore/datastore/storagebackend/postgresql/init.go +++ b/datastore/datastore/storagebackend/postgresql/init.go @@ -15,7 +15,6 @@ import ( // package variables (only accessible within postgresql package) var ( cleanupInterval time.Duration // minimum time period between calls to cleanup() - lastCleanupTime time.Time // time of last call to cleanup() putObsLimit int // max # of observations in a single call to PutObservations @@ -101,7 +100,6 @@ func initPutObsLimit() { func init() { // automatically called once on program startup (on first import of this package) initCleanupInterval() - lastCleanupTime = time.Time{} initPutObsLimit() diff --git a/datastore/datastore/storagebackend/postgresql/postgresql.go b/datastore/datastore/storagebackend/postgresql/postgresql.go index cfcec989..4cc490c3 100644 --- a/datastore/datastore/storagebackend/postgresql/postgresql.go +++ b/datastore/datastore/storagebackend/postgresql/postgresql.go @@ -5,6 +5,7 @@ import ( "datastore/common" "datastore/datastore" "fmt" + "log" "regexp" "strings" "time" @@ -165,6 +166,16 @@ func NewPostgreSQL() (*PostgreSQL, error) { setUpsertTSInsertCmd() setUpsertTSSelectCmd() + // cleanup the database at regular intervals + ticker := time.NewTicker(cleanupInterval) + go func() { + for range ticker.C { + if err = cleanup(sbe.Db); err != nil { + log.Printf("cleanup() failed: %v", err) + } + } + }() + return sbe, nil } @@ -331,6 +342,9 @@ func getStringMdataFilter( // cleanup performs various cleanup tasks, like removing old observations from the database. func cleanup(db *sql.DB) error { + log.Println("db cleanup started") + start := time.Now() + // start transaction tx, err := db.Begin() if err != nil { @@ -338,39 +352,75 @@ func cleanup(db *sql.DB) error { } defer tx.Rollback() + // --- BEGIN define removal functions ---------------------------------- + + rmObsOutsideValidRange := func() error { + + loTime, hiTime := common.GetValidTimeRange() + cmd := fmt.Sprintf(` + DELETE FROM observation + WHERE (obstime_instant < to_timestamp(%d)) OR (obstime_instant > to_timestamp(%d)) + `, loTime.Unix(), hiTime.Unix()) + + _, err = tx.Exec(cmd) + if err != nil { + return fmt.Errorf( + "tx.Exec() failed when removing observations outside valid range: %v", err) + } + + return nil + } + + rmUnrefRows := func(tableName, fkName string) error { + + cmd := fmt.Sprintf(` + DELETE FROM %s + WHERE id in ( + SELECT id FROM %s t WHERE NOT EXISTS ( + SELECT FROM observation obs WHERE %s = t.id + ) + ) + `, tableName, tableName, fkName) + + _, err = tx.Exec(cmd) + if err != nil { + return fmt.Errorf( + "tx.Exec() failed when removing unreferenced rows from %s: %v", tableName, err) + } + + return nil + } + + // --- END define removal functions ---------------------------------- + + // --- BEGIN apply removal functions ------------------------------------------ + // remove observations outside valid range - loTime, hiTime := common.GetValidTimeRange() - cmd := fmt.Sprintf(` - DELETE FROM observation - WHERE (obstime_instant < to_timestamp(%d)) - OR (obstime_instant > to_timestamp(%d)) - `, loTime.Unix(), hiTime.Unix()) - _, err = tx.Exec(cmd) + err = rmObsOutsideValidRange() + if err != nil { + return fmt.Errorf("rmObsOutsideValidRange() failed: %v", err) + } + + // remove time series that are no longer referenced by any observation + err = rmUnrefRows("time_series", "ts_id") + if err != nil { + return fmt.Errorf("rmUnrefRows(time_series) failed: %v", err) + } + + // remove geo points that are no longer referenced by any observation + err = rmUnrefRows("geo_point", "geo_point_id") if err != nil { - return fmt.Errorf("tx.Exec() failed: %v", err) + return fmt.Errorf("rmUnrefRows(geo_point) failed: %v", err) } - // DELETE FROM time_series WHERE ... TODO! - // DELETE FROM geo_points WHERE ... TODO! + // --- END apply removal functions ------------------------------------------ // commit transaction if err = tx.Commit(); err != nil { return fmt.Errorf("tx.Commit() failed: %v", err) } - lastCleanupTime = time.Now() - - return nil -} - -// considerCleanup considers if cleanup() should be called. -func considerCleanup(db *sql.DB) error { - // call cleanup() if at least cleanupInterval has passed since the last time it was called - if time.Since(lastCleanupTime) > cleanupInterval { - if err := cleanup(db); err != nil { - return fmt.Errorf("cleanup() failed: %v", err) - } - } + log.Printf("db cleanup complete after %v", time.Since(start)) return nil } diff --git a/datastore/datastore/storagebackend/postgresql/putobservations.go b/datastore/datastore/storagebackend/postgresql/putobservations.go index 3dc0f05c..11b888bd 100644 --- a/datastore/datastore/storagebackend/postgresql/putobservations.go +++ b/datastore/datastore/storagebackend/postgresql/putobservations.go @@ -5,7 +5,6 @@ import ( "datastore/common" "datastore/datastore" "fmt" - "log" "reflect" "strings" @@ -442,9 +441,5 @@ func (sbe *PostgreSQL) PutObservations(request *datastore.PutObsRequest) (codes. } } - if err := considerCleanup(sbe.Db); err != nil { - log.Printf("WARNING: considerCleanup() failed: %v", err) - } - return codes.OK, "" }