Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Completed db cleanup #192

Merged
merged 5 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datastore/datastore/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module datastore

go 1.20
go 1.21

require google.golang.org/grpc v1.64.0

Expand Down
2 changes: 0 additions & 2 deletions datastore/datastore/storagebackend/postgresql/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
// package variables (only accessible within postgresql package)
var (
cleanupInterval time.Duration // minimum time period between calls to cleanup()
lastCleanupTime time.Time // time of last call to cleanup()

putObsLimit int // max # of observations in a single call to PutObservations

Expand Down Expand Up @@ -101,7 +100,6 @@ func initPutObsLimit() {
func init() { // automatically called once on program startup (on first import of this package)

initCleanupInterval()
lastCleanupTime = time.Time{}

initPutObsLimit()

Expand Down
98 changes: 68 additions & 30 deletions datastore/datastore/storagebackend/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"datastore/common"
"datastore/datastore"
"fmt"
"log"
"regexp"
"strings"
"time"
Expand Down Expand Up @@ -165,6 +166,16 @@ func NewPostgreSQL() (*PostgreSQL, error) {
setUpsertTSInsertCmd()
setUpsertTSSelectCmd()

// Run cleanup() periodically in the background, once per cleanupInterval.
// The loop below has no exit condition, so the ticker is never stopped.
ticker := time.NewTicker(cleanupInterval)
go func() {
	for range ticker.C {
		// Use a goroutine-local error instead of assigning to the enclosing
		// function's err, which this closure would otherwise capture and share.
		if err := cleanup(sbe.Db); err != nil {
			log.Printf("cleanup() failed: %v", err)
		}
	}
}()

return sbe, nil
}

Expand Down Expand Up @@ -331,46 +342,73 @@ func getStringMdataFilter(
// cleanup performs various cleanup tasks, like removing old observations from the database.
func cleanup(db *sql.DB) error {

// start transaction
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("db.Begin() failed: %v", err)
log.Println("db cleanup started")
start := time.Now()

var err error

// --- BEGIN define removal functions ----------------------------------

// rmObsOutsideValidRange deletes all observations whose obstime_instant lies
// outside the time range returned by common.GetValidTimeRange().
rmObsOutsideValidRange := func() error {

	loTime, hiTime := common.GetValidTimeRange()
	// the bounds are embedded as integers (Unix seconds), not as free-form text
	cmd := fmt.Sprintf(`
		DELETE FROM observation
		WHERE (obstime_instant < to_timestamp(%d)) OR (obstime_instant > to_timestamp(%d))
	`, loTime.Unix(), hiTime.Unix())

	// the statement is executed directly on db (not inside a transaction), so the
	// error message names db.Exec(); the error is kept local to this closure
	if _, err := db.Exec(cmd); err != nil {
		return fmt.Errorf(
			"db.Exec() failed when removing observations outside valid range: %w", err)
	}

	return nil
}
defer tx.Rollback()

// rmUnrefRows deletes every row of table tableName whose id is not matched by
// column fkName of any row in the observation table.
// tableName and fkName are inserted verbatim into the SQL text, so they must be
// trusted identifiers and never derived from external input.
rmUnrefRows := func(tableName, fkName string) error {

	cmd := fmt.Sprintf(`
		DELETE FROM %s t
		WHERE NOT EXISTS (
			SELECT FROM observation WHERE %s = t.id
		)
	`, tableName, fkName)

	// the statement is executed directly on db (not inside a transaction), so the
	// error message names db.Exec(); the error is kept local to this closure
	if _, err := db.Exec(cmd); err != nil {
		return fmt.Errorf(
			"db.Exec() failed when removing unreferenced rows from %s: %w", tableName, err)
	}

	return nil
}

// --- END define removal functions ----------------------------------

// --- BEGIN apply removal functions ------------------------------------------

// remove observations outside valid range
loTime, hiTime := common.GetValidTimeRange()
cmd := fmt.Sprintf(`
DELETE FROM observation
WHERE (obstime_instant < to_timestamp(%d))
OR (obstime_instant > to_timestamp(%d))
`, loTime.Unix(), hiTime.Unix())
_, err = tx.Exec(cmd)
err = rmObsOutsideValidRange()
if err != nil {
return fmt.Errorf("tx.Exec() failed: %v", err)
return fmt.Errorf("rmObsOutsideValidRange() failed: %v", err)
}

// DELETE FROM time_series WHERE <no FK refs from observation anymore> ... TODO!
// DELETE FROM geo_points WHERE <no FK refs from observation anymore> ... TODO!

// commit transaction
if err = tx.Commit(); err != nil {
return fmt.Errorf("tx.Commit() failed: %v", err)
// remove time series that are no longer referenced by any observation
err = rmUnrefRows("time_series", "ts_id")
if err != nil {
return fmt.Errorf("rmUnrefRows(time_series) failed: %v", err)
}

lastCleanupTime = time.Now()
// remove geo points that are no longer referenced by any observation
err = rmUnrefRows("geo_point", "geo_point_id")
if err != nil {
return fmt.Errorf("rmUnrefRows(geo_point) failed: %v", err)
}

return nil
}
// --- END apply removal functions ------------------------------------------

// considerCleanup considers if cleanup() should be called.
func considerCleanup(db *sql.DB) error {
// call cleanup() if at least cleanupInterval has passed since the last time it was called
if time.Since(lastCleanupTime) > cleanupInterval {
if err := cleanup(db); err != nil {
return fmt.Errorf("cleanup() failed: %v", err)
}
}
log.Printf("db cleanup complete after %v", time.Since(start))

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"datastore/common"
"datastore/datastore"
"fmt"
"log"
"reflect"
"strings"

Expand Down Expand Up @@ -442,9 +441,5 @@ func (sbe *PostgreSQL) PutObservations(request *datastore.PutObsRequest) (codes.
}
}

if err := considerCleanup(sbe.Db); err != nil {
log.Printf("WARNING: considerCleanup() failed: %v", err)
}

return codes.OK, ""
}