Skip to content

Commit

Permalink
Completed db cleanup
Browse files Browse the repository at this point in the history
This change completes the periodic database cleanup by also deleting
unreferred rows in tables time_series and geo_point. The cleanup is
also now triggered from a single goroutine that runs independently
of calls to PutObservations().
  • Loading branch information
jo-asplin-met-no committed Sep 24, 2024
1 parent 27ac817 commit 0ed7437
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 30 deletions.
1 change: 1 addition & 0 deletions datastore/datastore/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require google.golang.org/grpc v1.64.0

require (
github.com/cridenour/go-postgis v1.0.0
github.com/golang/protobuf v1.5.4
google.golang.org/protobuf v1.34.1
)

Expand Down
2 changes: 0 additions & 2 deletions datastore/datastore/storagebackend/postgresql/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
// package variables (only accessible within postgresql package)
var (
cleanupInterval time.Duration // minimum time period between calls to cleanup()
lastCleanupTime time.Time // time of last call to cleanup()

putObsLimit int // max # of observations in a single call to PutObservations

Expand Down Expand Up @@ -101,7 +100,6 @@ func initPutObsLimit() {
func init() { // automatically called once on program startup (on first import of this package)

initCleanupInterval()
lastCleanupTime = time.Time{}

initPutObsLimit()

Expand Down
96 changes: 73 additions & 23 deletions datastore/datastore/storagebackend/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"datastore/common"
"datastore/datastore"
"fmt"
"log"
"regexp"
"strings"
"time"
Expand Down Expand Up @@ -165,6 +166,16 @@ func NewPostgreSQL() (*PostgreSQL, error) {
setUpsertTSInsertCmd()
setUpsertTSSelectCmd()

// cleanup the database at regular intervals
ticker := time.NewTicker(cleanupInterval)
go func() {
for range ticker.C {
if err = cleanup(sbe.Db); err != nil {
log.Printf("cleanup() failed: %v", err)
}
}
}()

return sbe, nil
}

Expand Down Expand Up @@ -331,46 +342,85 @@ func getStringMdataFilter(
// cleanup performs various cleanup tasks, like removing old observations from the database.
func cleanup(db *sql.DB) error {

log.Println("db cleanup started")
start := time.Now()

// start transaction
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("db.Begin() failed: %v", err)
}
defer tx.Rollback()

// --- BEGIN define removal functions ----------------------------------

rmObsOutsideValidRange := func() error {

loTime, hiTime := common.GetValidTimeRange()
cmd := fmt.Sprintf(`
DELETE FROM observation
WHERE (obstime_instant < to_timestamp(%d)) OR (obstime_instant > to_timestamp(%d))
`, loTime.Unix(), hiTime.Unix())

_, err = tx.Exec(cmd)
if err != nil {
return fmt.Errorf(
"tx.Exec() failed when removing observations outside valid range: %v", err)
}

return nil
}

rmUnrefRows := func(tableName, fkName string) error {

cmd := fmt.Sprintf(`
DELETE FROM %s
WHERE id in (
SELECT id FROM %s t WHERE NOT EXISTS (
SELECT FROM observation obs WHERE %s = t.id
)
)
`, tableName, tableName, fkName)

_, err = tx.Exec(cmd)
if err != nil {
return fmt.Errorf(
"tx.Exec() failed when removing unreferenced rows from %s: %v", tableName, err)
}

return nil
}

// --- END define removal functions ----------------------------------

// --- BEGIN apply removal functions ------------------------------------------

// remove observations outside valid range
loTime, hiTime := common.GetValidTimeRange()
cmd := fmt.Sprintf(`
DELETE FROM observation
WHERE (obstime_instant < to_timestamp(%d))
OR (obstime_instant > to_timestamp(%d))
`, loTime.Unix(), hiTime.Unix())
_, err = tx.Exec(cmd)
err = rmObsOutsideValidRange()
if err != nil {
return fmt.Errorf("rmObsOutsideValidRange() failed: %v", err)
}

// remove time series that are no longer referenced by any observation
err = rmUnrefRows("time_series", "ts_id")
if err != nil {
return fmt.Errorf("rmUnrefRows(time_series) failed: %v", err)
}

// remove geo points that are no longer referenced by any observation
err = rmUnrefRows("geo_point", "geo_point_id")
if err != nil {
return fmt.Errorf("tx.Exec() failed: %v", err)
return fmt.Errorf("rmUnrefRows(geo_point) failed: %v", err)
}

// DELETE FROM time_series WHERE <no FK refs from observation anymore> ... TODO!
// DELETE FROM geo_points WHERE <no FK refs from observation anymore> ... TODO!
// --- END apply removal functions ------------------------------------------

// commit transaction
if err = tx.Commit(); err != nil {
return fmt.Errorf("tx.Commit() failed: %v", err)
}

lastCleanupTime = time.Now()

return nil
}

// considerCleanup considers if cleanup() should be called.
func considerCleanup(db *sql.DB) error {
// call cleanup() if at least cleanupInterval has passed since the last time it was called
if time.Since(lastCleanupTime) > cleanupInterval {
if err := cleanup(db); err != nil {
return fmt.Errorf("cleanup() failed: %v", err)
}
}
log.Printf("db cleanup complete after %v", time.Since(start))

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"datastore/common"
"datastore/datastore"
"fmt"
"log"
"reflect"
"strings"

Expand Down Expand Up @@ -442,9 +441,5 @@ func (sbe *PostgreSQL) PutObservations(request *datastore.PutObsRequest) (codes.
}
}

if err := considerCleanup(sbe.Db); err != nil {
log.Printf("WARNING: considerCleanup() failed: %v", err)
}

return codes.OK, ""
}

1 comment on commit 0ed7437

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API Unit Test Coverage Report
FileStmtsMissCoverMissing
\_\_init\_\_.py00100% 
datastore_pb2.py614821%34–81
datastore_pb2_grpc.py542750%15–16, 19, 57–72, 105–107, 112–114, 119–121, 126–128, 132–157, 195, 222, 249, 276
export_metrics.py100100% 
grpc_getter.py201145%15–19, 23–26, 30–32, 36–38
locustfile.py15150%1–31
main.py43784%45, 50, 60, 70–71, 81–82
metadata_endpoints.py632954%45–54, 58, 85, 100–216, 220
response_classes.py50100% 
utilities.py1815669%20, 38, 45, 67–70, 78–89, 94–101, 121, 125, 127, 154, 157, 166, 184–185, 189, 205–210, 215, 219–220, 224–226, 232–240, 250–253, 259–260, 272–276, 299, 304, 317
custom_geo_json
   edr_feature_collection.py60100% 
formatters
   \_\_init\_\_.py110100% 
   covjson.py57198%88
   geojson.py17288%21, 46
openapi
   custom_dimension_examples.py40100% 
   edr_query_parameter_descriptions.py110100% 
   openapi_examples.py130100% 
routers
   \_\_init\_\_.py00100% 
   edr.py102496%350–351, 441–442
   feature.py471960%99–132, 148–153, 159–181
TOTAL72021970% 

API Unit Test Coverage Summary

Tests Skipped Failures Errors Time
30 0 💤 0 ❌ 0 🔥 1.926s ⏱️

Please sign in to comment.