Skip to content

Commit

Permalink
Merge pull request #192 from EURODEO/issue_167_complete_cleanup_new
Browse files Browse the repository at this point in the history
Completed db cleanup
  • Loading branch information
jo-asplin-met-no authored Sep 27, 2024
2 parents 27ac817 + 0f79a08 commit 5aabf54
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 38 deletions.
2 changes: 1 addition & 1 deletion datastore/datastore/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module datastore

go 1.20
go 1.21

require google.golang.org/grpc v1.64.0

Expand Down
2 changes: 0 additions & 2 deletions datastore/datastore/storagebackend/postgresql/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
// package variables (only accessible within postgresql package)
var (
cleanupInterval time.Duration // minimum time period between calls to cleanup()
lastCleanupTime time.Time // time of last call to cleanup()

putObsLimit int // max # of observations in a single call to PutObservations

Expand Down Expand Up @@ -101,7 +100,6 @@ func initPutObsLimit() {
func init() { // automatically called once on program startup (on first import of this package)

initCleanupInterval()
lastCleanupTime = time.Time{}

initPutObsLimit()

Expand Down
98 changes: 68 additions & 30 deletions datastore/datastore/storagebackend/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"datastore/common"
"datastore/datastore"
"fmt"
"log"
"regexp"
"strings"
"time"
Expand Down Expand Up @@ -165,6 +166,16 @@ func NewPostgreSQL() (*PostgreSQL, error) {
setUpsertTSInsertCmd()
setUpsertTSSelectCmd()

// cleanup the database at regular intervals
ticker := time.NewTicker(cleanupInterval)
go func() {
for range ticker.C {
if err = cleanup(sbe.Db); err != nil {
log.Printf("cleanup() failed: %v", err)
}
}
}()

return sbe, nil
}

Expand Down Expand Up @@ -331,46 +342,73 @@ func getStringMdataFilter(
// cleanup performs various cleanup tasks, like removing old observations from the database.
func cleanup(db *sql.DB) error {

// start transaction
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("db.Begin() failed: %v", err)
log.Println("db cleanup started")
start := time.Now()

var err error

// --- BEGIN define removal functions ----------------------------------

rmObsOutsideValidRange := func() error {

loTime, hiTime := common.GetValidTimeRange()
cmd := fmt.Sprintf(`
DELETE FROM observation
WHERE (obstime_instant < to_timestamp(%d)) OR (obstime_instant > to_timestamp(%d))
`, loTime.Unix(), hiTime.Unix())

_, err = db.Exec(cmd)
if err != nil {
return fmt.Errorf(
"tx.Exec() failed when removing observations outside valid range: %v", err)
}

return nil
}
defer tx.Rollback()

rmUnrefRows := func(tableName, fkName string) error {

cmd := fmt.Sprintf(`
DELETE FROM %s t
WHERE NOT EXISTS (
SELECT FROM observation WHERE %s = t.id
)
`, tableName, fkName)

_, err = db.Exec(cmd)
if err != nil {
return fmt.Errorf(
"tx.Exec() failed when removing unreferenced rows from %s: %v", tableName, err)
}

return nil
}

// --- END define removal functions ----------------------------------

// --- BEGIN apply removal functions ------------------------------------------

// remove observations outside valid range
loTime, hiTime := common.GetValidTimeRange()
cmd := fmt.Sprintf(`
DELETE FROM observation
WHERE (obstime_instant < to_timestamp(%d))
OR (obstime_instant > to_timestamp(%d))
`, loTime.Unix(), hiTime.Unix())
_, err = tx.Exec(cmd)
err = rmObsOutsideValidRange()
if err != nil {
return fmt.Errorf("tx.Exec() failed: %v", err)
return fmt.Errorf("rmObsOutsideValidRange() failed: %v", err)
}

// DELETE FROM time_series WHERE <no FK refs from observation anymore> ... TODO!
// DELETE FROM geo_points WHERE <no FK refs from observation anymore> ... TODO!

// commit transaction
if err = tx.Commit(); err != nil {
return fmt.Errorf("tx.Commit() failed: %v", err)
// remove time series that are no longer referenced by any observation
err = rmUnrefRows("time_series", "ts_id")
if err != nil {
return fmt.Errorf("rmUnrefRows(time_series) failed: %v", err)
}

lastCleanupTime = time.Now()
// remove geo points that are no longer referenced by any observation
err = rmUnrefRows("geo_point", "geo_point_id")
if err != nil {
return fmt.Errorf("rmUnrefRows(geo_point) failed: %v", err)
}

return nil
}
// --- END apply removal functions ------------------------------------------

// considerCleanup considers if cleanup() should be called.
func considerCleanup(db *sql.DB) error {
// call cleanup() if at least cleanupInterval has passed since the last time it was called
if time.Since(lastCleanupTime) > cleanupInterval {
if err := cleanup(db); err != nil {
return fmt.Errorf("cleanup() failed: %v", err)
}
}
log.Printf("db cleanup complete after %v", time.Since(start))

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"datastore/common"
"datastore/datastore"
"fmt"
"log"
"reflect"
"strings"

Expand Down Expand Up @@ -442,9 +441,5 @@ func (sbe *PostgreSQL) PutObservations(request *datastore.PutObsRequest) (codes.
}
}

if err := considerCleanup(sbe.Db); err != nil {
log.Printf("WARNING: considerCleanup() failed: %v", err)
}

return codes.OK, ""
}

1 comment on commit 5aabf54

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API Unit Test Coverage Report
FileStmtsMissCoverMissing
\_\_init\_\_.py00100% 
datastore_pb2.py614821%34–81
datastore_pb2_grpc.py542750%15–16, 19, 57–72, 105–107, 112–114, 119–121, 126–128, 132–157, 195, 222, 249, 276
export_metrics.py100100% 
grpc_getter.py201145%15–19, 23–26, 30–32, 36–38
locustfile.py15150%1–31
main.py43784%45, 50, 60, 70–71, 81–82
metadata_endpoints.py632954%45–54, 58, 85, 100–216, 220
response_classes.py50100% 
utilities.py1815669%20, 38, 45, 67–70, 78–89, 94–101, 121, 125, 127, 154, 157, 166, 184–185, 189, 205–210, 215, 219–220, 224–226, 232–240, 250–253, 259–260, 272–276, 299, 304, 317
custom_geo_json
   edr_feature_collection.py60100% 
formatters
   \_\_init\_\_.py110100% 
   covjson.py57198%88
   geojson.py17288%21, 46
openapi
   custom_dimension_examples.py40100% 
   edr_query_parameter_descriptions.py110100% 
   openapi_examples.py130100% 
routers
   \_\_init\_\_.py00100% 
   edr.py102496%350–351, 441–442
   feature.py471960%99–132, 148–153, 159–181
TOTAL72021970% 

API Unit Test Coverage Summary

Tests Skipped Failures Errors Time
30 0 💤 0 ❌ 0 🔥 1.913s ⏱️

Please sign in to comment.