Skip to content

Commit

Permalink
Required only a subset of ts metadata to be unique
Browse files Browse the repository at this point in the history
By reducing the number of columns in constraint unique_main, we
allow PutObservations to effectively update/overwrite other columns
without creating a new time series.
  • Loading branch information
jo-asplin-met-no committed Feb 27, 2024
1 parent 635dec3 commit d9490a5
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 50 deletions.
5 changes: 5 additions & 0 deletions datastore/datastore/storagebackend/postgresql/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ var (

tsStringMdataGoNames []string // Go names for time series metadata of type string
tsStringMdataPBNames []string // protobuf names for time series metadata of type string
tsStringMdataPBNamesUnique []string // column names of constraint unique_main in table
// time_series

obspb2go map[string]string // association between observation specific protobuf name and
// (generated by protoc) Go name
Expand Down Expand Up @@ -53,6 +55,9 @@ var (
// - Protobuf names are field names of the TSMetadata message in datastore.proto.
// - Protobuf names are identical to corresponding database column names.
// - Protobuf names are snake case (aaa_bbb) whereas Go names are camel case (aaaBbb or AaaBbb).

// used by upsertTS
upsertTSInsertCmd, upsertTSSelectCmd string
)

// initCleanupInterval initializes cleanupInterval.
Expand Down
117 changes: 117 additions & 0 deletions datastore/datastore/storagebackend/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"datastore/common"
"fmt"
"regexp"
"strings"
"time"

Expand All @@ -21,6 +22,82 @@ func (sbe *PostgreSQL) Description() string {
return "PostgreSQL database"
}

// uniqueMainColsRe captures the contents of the first parenthesized group in a constraint
// definition as rendered by pg_get_constraintdef(), e.g. the "a, b" in
// "UNIQUE NULLS NOT DISTINCT (a, b)". Matching up to the first ')' (rather than greedily to
// the last one) keeps any trailing parenthesized clause out of the column list.
var uniqueMainColsRe = regexp.MustCompile(`\(([^)]*)\)`)

// parseUniqueMainCols extracts the trimmed, non-empty column names from constraint
// definition def.
//
// Returns (column names, nil) upon success, otherwise (nil, error).
func parseUniqueMainCols(def string) ([]string, error) {
	matches := uniqueMainColsRe.FindStringSubmatch(def)
	if len(matches) != 2 {
		return nil, fmt.Errorf(
			"'%s' didn't match regexp pattern '%s'", def, uniqueMainColsRe.String())
	}

	cols := strings.Split(matches[1], ",")
	for i, col := range cols {
		cols[i] = strings.TrimSpace(col)
		if cols[i] == "" {
			// an empty name would later produce invalid SQL in the upsert commands
			return nil, fmt.Errorf("empty column name in constraint definition '%s'", def)
		}
	}

	return cols, nil
}

// setTSUniqueMainCols extracts into tsStringMdataPBNamesUnique the columns comprising constraint
// unique_main in table time_series.
//
// The constraint definition is read from the database catalog via pg_get_constraintdef() and
// the column list is parsed out of the returned text. tsStringMdataPBNamesUnique is only
// assigned when both steps succeed.
//
// Returns nil upon success, otherwise error.
func (sbe *PostgreSQL) setTSUniqueMainCols() error {

	query := `
		SELECT pg_get_constraintdef(c.oid)
		FROM pg_constraint c
		JOIN pg_namespace n ON n.oid = c.connamespace
		WHERE conrelid::regclass::text = 'time_series'
		AND conname = 'unique_main'
		AND contype = 'u'
	`

	var def string
	if err := sbe.Db.QueryRow(query).Scan(&def); err != nil {
		// also covers the case where the constraint doesn't exist (no rows)
		return fmt.Errorf("row.Scan() failed: %w", err)
	}

	cols, err := parseUniqueMainCols(def)
	if err != nil {
		return err
	}

	tsStringMdataPBNamesUnique = cols

	return nil
}

// setUpsertTSInsertCmd sets upsertTSInsertCmd to be used by upsertTS.
//
// The command inserts a row with one placeholder per column returned by getTSMdataCols().
// If the new row conflicts with constraint unique_main, the columns returned by
// getTSMdataColsUniqueCompl() (i.e. those not part of the constraint) are overwritten with the
// new values.
//
// If the constraint covers every column, there is nothing to overwrite. In that case the
// command falls back to a no-op update of the first column ('... WHERE false'), the same form
// used for the geo_point upsert, since an empty SET list would be invalid SQL.
func setUpsertTSInsertCmd() {

	cols := getTSMdataCols()

	// one plain "$n" placeholder per column
	formats := make([]string, len(cols))
	for i := range formats {
		formats[i] = "$%d"
	}

	complCols := getTSMdataColsUniqueCompl()
	updateExpr := make([]string, 0, len(complCols))
	for _, col := range complCols {
		updateExpr = append(updateExpr, fmt.Sprintf("%s = EXCLUDED.%s", col, col))
	}

	updateClause := strings.Join(updateExpr, ",")
	if len(updateExpr) == 0 && len(cols) > 0 {
		// no column to overwrite: keep the statement valid without modifying the row
		updateClause = fmt.Sprintf("%s = EXCLUDED.%s WHERE false", cols[0], cols[0])
	}

	upsertTSInsertCmd = fmt.Sprintf(`
		INSERT INTO time_series (%s) VALUES (%s)
		ON CONFLICT ON CONSTRAINT unique_main DO UPDATE SET %s
		`,
		strings.Join(cols, ","),
		strings.Join(createPlaceholders(formats), ","),
		updateClause,
	)
}

// setUpsertTSSelectCmd sets upsertTSSelectCmd to be used by upsertTS.
//
// The command selects the id of the time_series row whose unique_main columns equal the
// positional parameters $1..$n, where parameter i corresponds to item i-1 of
// getTSMdataColsUnique().
//
// NOTE(review): the conditions use plain '=', which never matches NULL; presumably the unique
// columns are never NULL in practice - confirm.
func setUpsertTSSelectCmd() {

	uniqueCols := getTSMdataColsUnique()

	conds := make([]string, len(uniqueCols))
	for pos, name := range uniqueCols {
		conds[pos] = fmt.Sprintf("%s=$%d", name, pos+1)
	}

	upsertTSSelectCmd = "SELECT id FROM time_series WHERE " + strings.Join(conds, " AND ")
}

// openDB opens database identified by host/port/user/password/dbname.
// Returns (DB, nil) upon success, otherwise (..., error).
func openDB(host, port, user, password, dbname string) (*sql.DB, error) {
Expand Down Expand Up @@ -58,6 +135,14 @@ func NewPostgreSQL() (*PostgreSQL, error) {
return nil, fmt.Errorf("sbe.Db.Ping() failed: %v", err)
}

err = sbe.setTSUniqueMainCols()
if err != nil {
return nil, fmt.Errorf("sbe.setTSUniqueMainCols() failed: %v", err)
}

setUpsertTSInsertCmd()
setUpsertTSSelectCmd()

return sbe, nil
}

Expand All @@ -79,6 +164,38 @@ func getTSMdataCols() []string {
return cols
}

// getTSMdataColsUnique returns the column names making up constraint unique_main in table
// time_series, as previously stored in tsStringMdataPBNamesUnique.
//
// The returned slice is the stored slice itself (not a copy), so callers must not modify it.
func getTSMdataColsUnique() []string {
	uniqueCols := tsStringMdataPBNamesUnique
	return uniqueCols
}

// getTSMdataColsUniqueCompl returns the complement of the set of fields defined in constraint
// unique_main in table time_series, i.e. getTSMdataCols() - getTSMdataColsUnique().
//
// The result preserves the order of getTSMdataCols() and contains each column at most once,
// so that SQL generated from it is identical from one run to the next. The returned slice is
// never nil.
func getTSMdataColsUniqueCompl() []string {

	allCols := getTSMdataCols()

	// skip holds the columns that must not be emitted: initially those of the unique_main
	// constraint, then also every column already emitted
	skip := make(map[string]struct{}, len(allCols))
	for _, col := range getTSMdataColsUnique() {
		skip[col] = struct{}{}
	}

	result := make([]string, 0, len(allCols))
	for _, col := range allCols {
		if _, found := skip[col]; found {
			continue
		}
		skip[col] = struct{}{}
		result = append(result, col)
	}

	return result
}

// createPlaceholders returns the list of n placeholder strings for
// values in a parameterized query, e.g. $1, to_timestamp($2), ..., $n.
// Items in formats must be strings containing exactly one "$%d" pattern,
Expand Down
115 changes: 65 additions & 50 deletions datastore/datastore/storagebackend/postgresql/putobservations.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ import (
)

// getTSColVals gets the time series metadata column values from tsMdata.
// The column values are returned both as a map in the colVals2 out-parameter and as
// an array.
//
// Returns (column values, nil) upon success, otherwise (..., error).
func getTSColVals(tsMdata *datastore.TSMetadata) ([]interface{}, error) {
func getTSColVals(
tsMdata *datastore.TSMetadata, colVals2 map[string]interface{}) ([]interface{}, error) {
colVals := []interface{}{}

// --- BEGIN non-string metadata ---------------------------
Expand Down Expand Up @@ -48,7 +52,9 @@ func getTSColVals(tsMdata *datastore.TSMetadata) ([]interface{}, error) {
if linkVals, err := getLinkVals(key); err != nil {
return nil, fmt.Errorf("getLinkVals() failed: %v", err)
} else {
colVals = append(colVals, pq.StringArray(linkVals))
vals := pq.StringArray(linkVals)
colVals2[common.ToSnakeCase(key)] = vals
colVals = append(colVals, vals)
}
}

Expand All @@ -66,6 +72,7 @@ func getTSColVals(tsMdata *datastore.TSMetadata) ([]interface{}, error) {
return nil, fmt.Errorf(
"method.Call() failed for method %s; failed to return string", methodName)
}
colVals2[common.ToSnakeCase(field.Name)] = val
colVals = append(colVals, val)
}
}
Expand All @@ -75,14 +82,42 @@ func getTSColVals(tsMdata *datastore.TSMetadata) ([]interface{}, error) {
return colVals, nil
}

// getTimeSeriesID retrieves the ID of the row in table time_series that matches tsMdata,
// inserting a new row if necessary. The ID is first looked up in a cache in order to save
// unnecessary database access.
// getTSColValsUnique picks from colVals (keyed by column name) the values of the columns that
// make up constraint unique_main.
//
// The values are returned in the same order as the column names returned by
// getTSMdataColsUnique().
//
// Returns (column values, nil) upon success, otherwise (empty slice, error) if a column of the
// constraint has no entry in colVals.
func getTSColValsUnique(colVals map[string]interface{}) ([]interface{}, error) {

	uniqueCols := getTSMdataColsUnique()
	vals := make([]interface{}, 0, len(uniqueCols))

	for _, name := range uniqueCols {
		val, ok := colVals[name]
		if !ok {
			return []interface{}{}, fmt.Errorf("column '%s' not found in colVals: %v", name, colVals)
		}
		vals = append(vals, val)
	}

	return vals, nil
}

// upsertTS retrieves the ID of the row in table time_series that matches tsMdata wrt.
// the fields - U - defined by constraint unique_main, inserting a new row if necessary.
//
// If the row already existed, the function ensures that the row is updated with the tsMdata
// fields - UC - that are not in U (i.e. the complement of U).
//
// The ID is first looked up in a cache (where the key consists of all fields (U + UC)) in order to
// save unnecessary database access.
//
// Returns (ID, nil) upon success, otherwise (..., error).
func getTimeSeriesID(
func upsertTS(
db *sql.DB, tsMdata *datastore.TSMetadata, cache map[string]int64) (int64, error) {

colVals, err := getTSColVals(tsMdata)
colVals2 := map[string]interface{}{}
colVals, err := getTSColVals(tsMdata, colVals2) // column values for U+UC
if err != nil {
return -1, fmt.Errorf("getTSColVals() failed: %v", err)
}
Expand All @@ -97,52 +132,34 @@ func getTimeSeriesID(

// then access database ...

cols := getTSMdataCols()

formats := make([]string, len(colVals))
for i := 0; i < len(colVals); i++ {
formats[i] = "$%d"
}

// Get a Tx for making transaction requests.
// start transaction
tx, err := db.Begin()
if err != nil {
return -1, fmt.Errorf("db.Begin() failed: %v", err)
}
// Defer a rollback in case anything fails.
defer tx.Rollback()

// NOTE: the 'WHERE false' is a feature that ensures that another transaction cannot
// delete the row
insertCmd := fmt.Sprintf(`
INSERT INTO time_series (%s) VALUES (%s)
ON CONFLICT ON CONSTRAINT unique_main DO UPDATE SET %s = EXCLUDED.%s WHERE false
`,
strings.Join(cols, ","),
strings.Join(createPlaceholders(formats), ","),
cols[0],
cols[0],
)
fmt.Printf("insertCmd: %s; len(cols): %d; len(phs): %d\n",
insertCmd, len(cols), len(createPlaceholders(formats)))

_, err = tx.Exec(insertCmd, colVals...)
// STEP 1: upsert row

_, err = tx.Exec(upsertTSInsertCmd, colVals...)
if err != nil {
return -1, fmt.Errorf("tx.Exec() failed: %v", err)
}

whereExpr := []string{}
for i, col := range getTSMdataCols() {
whereExpr = append(whereExpr, fmt.Sprintf("%s=$%d", col, i+1))
// STEP 2: retrieve ID of upserted row

colValsUnique, err := getTSColValsUnique(colVals2) // column values for U
if err != nil {
return -1, fmt.Errorf("getTSColValsUnique() failed: %v", err)
}

selectCmd := fmt.Sprintf(`SELECT id FROM time_series WHERE %s`, strings.Join(whereExpr, " AND "))
err = tx.QueryRow(selectCmd, colVals...).Scan(&id)

err = tx.QueryRow(upsertTSSelectCmd, colValsUnique...).Scan(&id)
if err != nil {
return -1, fmt.Errorf("tx.QueryRow() failed: %v", err)
}

// Commit the transaction.
// commit transaction
if err = tx.Commit(); err != nil {
return -1, fmt.Errorf("tx.Commit() failed: %v", err)
}
Expand Down Expand Up @@ -203,20 +220,17 @@ func getGeoPointID(db *sql.DB, point *datastore.Point, cache map[string]int64) (

// NOTE: the 'WHERE false' is a feature that ensures that another transaction cannot
// delete the row
insertCmd := fmt.Sprintf(`
insertCmd := `
INSERT INTO geo_point (point) VALUES (ST_MakePoint($1, $2)::geography)
ON CONFLICT (point) DO UPDATE SET point = EXCLUDED.point WHERE false`,
)
ON CONFLICT (point) DO UPDATE SET point = EXCLUDED.point WHERE false
`

_, err = tx.Exec(insertCmd, point.GetLon(), point.GetLat())
if err != nil {
return -1, fmt.Errorf("tx.Exec() failed: %v", err)
}

selectCmd := fmt.Sprintf(`
SELECT id FROM geo_point WHERE point = ST_MakePoint($1, $2)::geography
`,
)
selectCmd := "SELECT id FROM geo_point WHERE point = ST_MakePoint($1, $2)::geography"

err = tx.QueryRow(selectCmd, point.GetLon(), point.GetLat()).Scan(&id)
if err != nil {
Expand Down Expand Up @@ -289,9 +303,10 @@ func createInsertVals(
}
}

// upsertObsForTS inserts new observations and/or updates existing ones.
// upsertObs inserts new observations and/or updates existing ones.
//
// Returns nil upon success, otherwise error.
func upsertObsForTS(
func upsertObs(
db *sql.DB, tsID int64, obsTimes *[]*timestamppb.Timestamp, gpIDs *[]int64,
omds *[]*datastore.ObsMetadata) error {

Expand Down Expand Up @@ -387,9 +402,9 @@ func (sbe *PostgreSQL) PutObservations(request *datastore.PutObsRequest) error {
obsTime.AsTime(), hiTime, loTime, common.GetValidTimeRangeSettings())
}

tsID, err := getTimeSeriesID(sbe.Db, obs.GetTsMdata(), tsIDCache)
tsID, err := upsertTS(sbe.Db, obs.GetTsMdata(), tsIDCache)
if err != nil {
return fmt.Errorf("getTimeSeriesID() failed: %v", err)
return fmt.Errorf("upsertTS() failed: %v", err)
}

gpID, err := getGeoPointID(sbe.Db, obs.GetObsMdata().GetGeoPoint(), gpIDCache)
Expand Down Expand Up @@ -421,9 +436,9 @@ func (sbe *PostgreSQL) PutObservations(request *datastore.PutObsRequest) error {

// insert/update observations for each time series
for tsID, tsInfo := range tsInfos {
if err := upsertObsForTS(
if err := upsertObs(
sbe.Db, tsID, tsInfo.obsTimes, tsInfo.gpIDs, tsInfo.omds); err != nil {
return fmt.Errorf("upsertObsForTS()) failed: %v", err)
return fmt.Errorf("upsertObs() failed: %v", err)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Recreates constraint unique_main on table time_series so that it spans all of the
-- columns listed below.
--
-- DROP ... IF EXISTS lets the statement run whether or not the constraint is already present.
-- NULLS NOT DISTINCT makes NULL values compare as equal for the purpose of this constraint.
ALTER TABLE time_series
DROP CONSTRAINT IF EXISTS unique_main,
ADD CONSTRAINT unique_main UNIQUE NULLS NOT DISTINCT (version, type, title, summary, keywords,
keywords_vocabulary, license, conventions, naming_authority, creator_type, creator_name,
creator_email, creator_url, institution, project, source, platform, platform_vocabulary,
standard_name, unit, instrument, instrument_vocabulary, level, period, function,
parameter_name, link_href, link_rel, link_type, link_hreflang, link_title);
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Recreates constraint unique_main on table time_series so that it spans only the seven
-- columns listed below. Rows that agree on these columns are then considered the same time
-- series, regardless of the values in the other columns.
--
-- DROP ... IF EXISTS lets the statement run whether or not the constraint is already present.
-- NULLS NOT DISTINCT makes NULL values compare as equal for the purpose of this constraint.
ALTER TABLE time_series
DROP CONSTRAINT IF EXISTS unique_main,
ADD CONSTRAINT unique_main UNIQUE NULLS NOT DISTINCT (naming_authority, platform,
standard_name, instrument, level, period, function);

1 comment on commit d9490a5

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API Unit Test Coverage Report
FileStmtsMissCoverMissing
\_\_init\_\_.py00100% 
datastore_pb2.py584621%24–69
datastore_pb2_grpc.py432347%37–52, 85–87, 92–94, 99–101, 106–108, 112–136, 174, 191, 208, 225
dependencies.py31487%23–24, 31, 38
grpc_getter.py8450%12–16
locustfile.py15150%1–31
main.py22386%27, 37, 47
metadata_endpoints.py19479%16, 33–66, 70
formatters
   \_\_init\_\_.py70100% 
   covjson.py46198%58
routers
   \_\_init\_\_.py00100% 
   edr.py711185%36–59, 109–110, 137–138
   records.py00100% 
TOTAL32011165% 

API Unit Test Coverage Summary

Tests Skipped Failures Errors Time
16 0 💤 0 ❌ 0 🔥 1.778s ⏱️

Please sign in to comment.