Skip to content

Commit

Permalink
Merge pull request #41 from EURODEO/issue_40_getobservations_latest
Browse files Browse the repository at this point in the history
Issue 40 getobservations latest
  • Loading branch information
jo-asplin-met-no authored Feb 21, 2024
2 parents 53ac5d6 + 7943c60 commit bd732af
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 41 deletions.
12 changes: 12 additions & 0 deletions datastore/datastore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,18 @@ $ grpcurl -d '{"filter": {"standard_name": {"values": ["wind_speed", "air_temper
...
```

### Retrieve all wind speed observations for platform 78990
```text
$ grpcurl -d '{"filter": {"standard_name": {"values": ["wind_speed"]}, "platform": {"values": ["78990"]}}}' -plaintext -proto protobuf/datastore.proto 127.0.0.1:50050 datastore.Datastore.GetObservations
...
```

### Retrieve the most recent wind speed observation for platform 78990
```text
$ grpcurl -d '{"filter": {"standard_name": {"values": ["wind_speed"]}, "platform": {"values": ["78990"]}}, "temporal_mode": "latest"}' -plaintext -proto protobuf/datastore.proto 127.0.0.1:50050 datastore.Datastore.GetObservations
...
```

### List unique occurrences of time series metadata attribute 'standard_name'

```text
Expand Down
7 changes: 7 additions & 0 deletions datastore/datastore/common/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"datastore/datastore"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -167,3 +168,9 @@ func GetValidTimeRangeSettings() string {
func ToSnakeCase(s string) string {
return strings.ToLower(snakeCaseRE.ReplaceAllString(s, "${1}_${2}"))
}

type TemporalSpec struct {
IntervalMode bool // whether temporal mode is 'interval' (true) or 'latest'
// (false)
Interval *datastore.TimeInterval // interval in 'interval' mode
}
52 changes: 44 additions & 8 deletions datastore/datastore/dsimpl/getobservations.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,60 @@ package dsimpl
import (
"context"
"fmt"
"strings"

"datastore/common"
"datastore/datastore"
)

// getTemporalSpec derives and validates a temporal specification from request.
//
// Returns (spec, nil) upon success, otherwise (..., error).
func getTemporalSpec(request *datastore.GetObsRequest) (common.TemporalSpec, error) {

tspec := common.TemporalSpec{}

// get mode
tspec.IntervalMode = true // assume 'interval' mode by default
if tmode0 := request.GetTemporalMode(); tmode0 != "" {
tmode := strings.ToLower(strings.TrimSpace(tmode0))
switch {
case tmode == "latest":
tspec.IntervalMode = false
case tmode != "interval":
return common.TemporalSpec{}, fmt.Errorf(
"expected either 'interval' or 'latest' for temporal_mode; found '%s'", tmode)
}
}

if tspec.IntervalMode { // validate and initialize for 'interval' mode
ti := request.GetTemporalInterval()

// do general validation of interval
if ti != nil {
if ti.Start != nil && ti.End != nil {
if ti.End.AsTime().Before(ti.Start.AsTime()) {
return common.TemporalSpec{}, fmt.Errorf("end(%v) < start(%v)", ti.End, ti.Start)
}
}
}

tspec.Interval = ti // valid (but possibly nil to specify a fully open interval!)
}

return tspec, nil
}

func (svcInfo *ServiceInfo) GetObservations(
ctx context.Context, request *datastore.GetObsRequest) (
*datastore.GetObsResponse, error) {

// do general validation of any obs time interval
if ti := request.GetTemporalInterval(); ti != nil {
if ti.Start != nil && ti.End != nil {
if ti.End.AsTime().Before(ti.Start.AsTime()) {
return nil, fmt.Errorf("end(%v) < start(%v)", ti.End, ti.Start)
}
}
tspec, err := getTemporalSpec(request)
if err != nil {
return nil, fmt.Errorf("dsimpl.GetTemporalSpec() failed: %v", err)
}

response, err := svcInfo.Sbe.GetObservations(request)
response, err := svcInfo.Sbe.GetObservations(request, tspec)
if err != nil {
return nil, fmt.Errorf("svcInfo.Sbe.GetObservations() failed: %v", err)
}
Expand Down
85 changes: 54 additions & 31 deletions datastore/datastore/storagebackend/postgresql/getobservations.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/cridenour/go-postgis"
"github.com/lib/pq"
_ "github.com/lib/pq"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -145,23 +144,32 @@ func getTSMetadata(db *sql.DB, tsIDs []string, tsMdatas map[int64]*datastore.TSM
return nil
}

// getTimeFilter derives from ti the expression used in a WHERE clause for filtering on obs time.
// getTimeFilter derives from tspec the expression used in a WHERE clause for overall
// (i.e. not time series specific) filtering on obs time.
//
// Returns expression.
func getTimeFilter(ti *datastore.TimeInterval) string {
func getTimeFilter(tspec common.TemporalSpec) string {

timeExpr := "TRUE" // by default, don't filter on obs time at all

if ti != nil {
timeExprs := []string{}
if start := ti.GetStart(); start != nil {
timeExprs = append(timeExprs, fmt.Sprintf(
"obstime_instant >= to_timestamp(%f)", common.Tstamp2float64Secs(start)))
}
if end := ti.GetEnd(); end != nil {
timeExprs = append(timeExprs, fmt.Sprintf(
"obstime_instant < to_timestamp(%f)", common.Tstamp2float64Secs(end)))
}
if len(timeExprs) > 0 {
timeExpr = fmt.Sprintf("(%s)", strings.Join(timeExprs, " AND "))
if tspec.IntervalMode {
// the 'interval' filter is applied in the same way to all time series and can thus be
// part of the WHERE clause (this is contrast to the 'latest' filter which must be applied
// individually to each time series at a later stage)
ti := tspec.Interval
if ti != nil {
timeExprs := []string{}
if start := ti.GetStart(); start != nil {
timeExprs = append(timeExprs, fmt.Sprintf(
"obstime_instant >= to_timestamp(%f)", common.Tstamp2float64Secs(start)))
}
if end := ti.GetEnd(); end != nil {
timeExprs = append(timeExprs, fmt.Sprintf(
"obstime_instant < to_timestamp(%f)", common.Tstamp2float64Secs(end)))
}
if len(timeExprs) > 0 {
timeExpr = fmt.Sprintf("(%s)", strings.Join(timeExprs, " AND "))
}
}
}

Expand Down Expand Up @@ -303,32 +311,41 @@ func getStringMdataFilter(
return getMdataFilter(stringFilterInfos, phVals), nil
}

// createObsQueryVals creates from 'request' values used for querying observations.
// createObsQueryVals creates from request and tspec values used for querying observations.
//
// Values to be used for query placeholders are appended to phVals.
//
// Upon success the function returns four values:
// Upon success the function returns five values:
// - distinct spec, possibly just an empty string
// - time filter used in a 'WHERE ... AND ...' clause (possibly just 'TRUE')
// - geo filter ... ditto
// - string metadata ... ditto
// - nil,
// otherwise (..., ..., ..., error).
func createObsQueryVals(
request *datastore.GetObsRequest, phVals *[]interface{}) (string, string, string, error) {
request *datastore.GetObsRequest, tspec common.TemporalSpec, phVals *[]interface{}) (
string, string, string, string, error) {

distinctSpec := ""
if !tspec.IntervalMode {
// 'latest' mode, so ensure that we select only one observation per time series
// (which will be the most recent one thanks to '... ORDER BY ts_id, obstime_instant DESC')
distinctSpec = "DISTINCT ON (ts_id)"
}

timeFilter := getTimeFilter(request.GetTemporalInterval())
timeFilter := getTimeFilter(tspec)

geoFilter, err := getGeoFilter(request.GetSpatialArea(), phVals)
if err != nil {
return "", "", "", fmt.Errorf("getGeoFilter() failed: %v", err)
return "", "", "", "", fmt.Errorf("getGeoFilter() failed: %v", err)
}

stringMdataFilter, err := getStringMdataFilter(request, phVals)
if err != nil {
return "", "", "", fmt.Errorf("getStringMdataFilter() failed: %v", err)
return "", "", "", "", fmt.Errorf("getStringMdataFilter() failed: %v", err)
}

return timeFilter, geoFilter, stringMdataFilter, nil
return distinctSpec, timeFilter, geoFilter, stringMdataFilter, nil
}

// scanObsRow scans all columns from the current result row in rows and converts to an ObsMetadata
Expand Down Expand Up @@ -387,20 +404,23 @@ func scanObsRow(rows *sql.Rows) (*datastore.ObsMetadata, int64, error) {
return &obsMdata, tsID, nil
}

// getObs gets into obs all observations that match request.
// getObs gets into obs all observations that match request and tspec.
// Returns nil upon success, otherwise error.
func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Metadata2) (retErr error) {
func getObs(
db *sql.DB, request *datastore.GetObsRequest, tspec common.TemporalSpec,
obs *[]*datastore.Metadata2) (retErr error) {

// get values needed for query
phVals := []interface{}{} // placeholder values
timeFilter, geoFilter, stringMdataFilter, err := createObsQueryVals(request, &phVals)
distinctSpec, timeFilter, geoFilter, stringMdataFilter, err := createObsQueryVals(
request, tspec, &phVals)
if err != nil {
return fmt.Errorf("createQueryVals() failed: %v", err)
}

// define and execute query
query := fmt.Sprintf(`
SELECT
SELECT %s
ts_id,
obstime_instant,
pubtime,
Expand All @@ -411,8 +431,9 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta
JOIN time_series on time_series.id = observation.ts_id
JOIN geo_point ON observation.geo_point_id = geo_point.id
WHERE %s AND %s AND %s
ORDER BY ts_id, obstime_instant
`, strings.Join(obsStringMdataCols, ","), timeFilter, geoFilter, stringMdataFilter)
ORDER BY ts_id, obstime_instant DESC
`, distinctSpec, strings.Join(obsStringMdataCols, ","), timeFilter, geoFilter,
stringMdataFilter)

rows, err := db.Query(query, phVals...)
if err != nil {
Expand All @@ -429,7 +450,8 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta
return fmt.Errorf("scanObsRow() failed: %v", err)
}

obsMdatas[tsID] = append(obsMdatas[tsID], obsMdata)
// prepend obs to time series to get chronological order
obsMdatas[tsID] = append([]*datastore.ObsMetadata{obsMdata}, obsMdatas[tsID]...)
}

// get time series
Expand All @@ -454,14 +476,15 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta
}

// GetObservations ... (see documentation in StorageBackend interface)
func (sbe *PostgreSQL) GetObservations(request *datastore.GetObsRequest) (
func (sbe *PostgreSQL) GetObservations(
request *datastore.GetObsRequest, tspec common.TemporalSpec) (
*datastore.GetObsResponse, error) {

var err error

obs := []*datastore.Metadata2{}
if err = getObs(
sbe.Db, request, &obs); err != nil {
sbe.Db, request, tspec, &obs); err != nil {
return nil, fmt.Errorf("getObs() failed: %v", err)
}

Expand Down
4 changes: 3 additions & 1 deletion datastore/datastore/storagebackend/storagebackend.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storagebackend

import (
"datastore/common"
"datastore/datastore"
)

Expand All @@ -16,7 +17,8 @@ type StorageBackend interface {

// GetObservations retrieves observations from the storage.
// Returns nil upon success, otherwise error.
GetObservations(*datastore.GetObsRequest) (*datastore.GetObsResponse, error)
GetObservations(*datastore.GetObsRequest, common.TemporalSpec) (
*datastore.GetObsResponse, error)

// GetTSAttrGroups retrieves, for the non-default attributes in the input, the unique
// combinations of attribute values currently represented in the storage.
Expand Down
7 changes: 6 additions & 1 deletion protobuf/datastore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,12 @@ message Strings {
message GetObsRequest {
// --- BEGIN non-string metadata -------------------------

// temporal filter
// temporal mode
string temporal_mode = 4; // use 'interval' (the default) to retrieve observations in
// an interval (defined by temporal) or 'latest' to retrieve the most recent observation
// of otherwise matching time series

// temporal spec applicable when temporal_mode = 'interval'
TimeInterval temporal_interval = 1; // only observations in this time range may be returned

// spatial filter
Expand Down

1 comment on commit bd732af

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API Unit Test Coverage Report
FileStmtsMissCoverMissing
\_\_init\_\_.py00100% 
datastore_pb2.py584621%24–69
datastore_pb2_grpc.py432347%37–52, 85–87, 92–94, 99–101, 106–108, 112–136, 174, 191, 208, 225
dependencies.py31487%23–24, 31, 38
grpc_getter.py8450%12–16
locustfile.py15150%1–31
main.py22386%27, 37, 47
metadata_endpoints.py19479%16, 33–66, 70
formatters
   \_\_init\_\_.py70100% 
   covjson.py46198%58
routers
   \_\_init\_\_.py00100% 
   edr.py711185%36–59, 109–110, 137–138
   records.py00100% 
TOTAL32011165% 

API Unit Test Coverage Summary

Tests Skipped Failures Errors Time
16 0 💤 0 ❌ 0 🔥 1.694s ⏱️

Please sign in to comment.