Skip to content

Commit

Permalink
Implemented temporal_mode==latest (but still not latest_maxage)
Browse files Browse the repository at this point in the history
  • Loading branch information
jo-asplin-met-no committed Feb 15, 2024
1 parent d707e60 commit f9560f0
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 34 deletions.
6 changes: 6 additions & 0 deletions datastore/datastore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ $ grpcurl -d '{"filter": {"standard_name": {"values": ["wind_speed", "air_temper
...
```

### Retrieve the two most recent wind speed observations for platform 78990
```text
$ grpcurl -d '{"filter": {"standard_name": {"values": ["wind_speed"]}, "platform": {"values": ["78990"]}}, "temporal_mode": "latest", "latest_limit": "2"}' -plaintext -proto protobuf/datastore.proto 127.0.0.1:50050 datastore.Datastore.GetObservations
...
```

### List unique occurrences of time series metadata attribute 'standard_name'

```text
Expand Down
9 changes: 9 additions & 0 deletions datastore/datastore/common/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"datastore/datastore"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -167,3 +168,11 @@ func GetValidTimeRangeSettings() string {
func ToSnakeCase(s string) string {
return strings.ToLower(snakeCaseRE.ReplaceAllString(s, "${1}_${2}"))
}

type TemporalSpec struct {
IntervalMode bool // whether temporal mode is 'interval' (true) or 'latest' (false)
Interval *datastore.TimeInterval // interval in 'interval' mode
LatestLimit int // max number of observations retrievable in 'latest' mode
LatestMaxage time.Duration // interval, defined as [now - LatestMaxage, now], within which
// observations may be retrieved in 'latest' mode
}
87 changes: 79 additions & 8 deletions datastore/datastore/dsimpl/getobservations.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,95 @@ package dsimpl
import (
"context"
"fmt"
"strconv"
"strings"
"time"

"datastore/common"
"datastore/datastore"
)

// getTemporalSpec derives and validates a temporal specification from request.
//
// Returns (spec, nil) upon success, otherwise (..., error).
func getTemporalSpec(request *datastore.GetObsRequest) (common.TemporalSpec, error) {

tspec := common.TemporalSpec{}

// get mode
tspec.IntervalMode = true // assume 'interval' mode by default
if tmode0 := request.GetTemporalMode(); tmode0 != "" {
tmode := strings.ToLower(strings.TrimSpace(tmode0))
switch {
case tmode == "latest":
tspec.IntervalMode = false
case tmode != "interval":
return common.TemporalSpec{}, fmt.Errorf(
"expected either 'interval' or 'latest' for temporal_mode; found '%s'", tmode)
}
}

if tspec.IntervalMode { // validate and initialize for 'interval' mode

ti := request.GetTemporalInterval()

// do general validation of interval
if ti != nil {
if ti.Start != nil && ti.End != nil {
if ti.End.AsTime().Before(ti.Start.AsTime()) {
return common.TemporalSpec{}, fmt.Errorf("end(%v) < start(%v)", ti.End, ti.Start)
}
}
}

tspec.Interval = ti // valid (but possibly nil to specify a fully open interval!)

} else { // validate and initialize for 'latest' mode

// latest_limit ...
if limit0 := request.GetLatestLimit(); limit0 != "" {
limit, err := strconv.Atoi(limit0)
if err != nil {
return common.TemporalSpec{},
fmt.Errorf("failed to convert latest_limit to an int: %v", limit0)
}

if limit < 0 {
return common.TemporalSpec{},
fmt.Errorf("latest_limit cannot be negative: %d", limit)
}

tspec.LatestLimit = limit // valid

} else {
// by default return the single latest obs for a time series
tspec.LatestLimit = 1 // default
}

// latest_maxage ...
if maxage0 := request.GetLatestMaxage(); maxage0 != "" {
// ### TODO! (convert maxage0 (an ISO-8601 period) into a time.Duration and assign to
// tspec.LatestMaxage)
return common.TemporalSpec{}, fmt.Errorf("latest_maxage unimplemented")
} else {
// by default don't consider any observation as too old
tspec.LatestMaxage = time.Duration(-1)
}
}

return tspec, nil
}

func (svcInfo *ServiceInfo) GetObservations(
ctx context.Context, request *datastore.GetObsRequest) (
*datastore.GetObsResponse, error) {

// do general validation of any obs time interval
if ti := request.GetTemporalInterval(); ti != nil {
if ti.Start != nil && ti.End != nil {
if ti.End.AsTime().Before(ti.Start.AsTime()) {
return nil, fmt.Errorf("end(%v) < start(%v)", ti.End, ti.Start)
}
}
tspec, err := getTemporalSpec(request)
if err != nil {
return nil, fmt.Errorf("dsimpl.GetTemporalSpec() failed: %v", err)
}

response, err := svcInfo.Sbe.GetObservations(request)
response, err := svcInfo.Sbe.GetObservations(request, tspec)
if err != nil {
return nil, fmt.Errorf("svcInfo.Sbe.GetObservations() failed: %v", err)
}
Expand Down
84 changes: 60 additions & 24 deletions datastore/datastore/storagebackend/postgresql/getobservations.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/cridenour/go-postgis"
"github.com/lib/pq"
_ "github.com/lib/pq"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -145,23 +144,32 @@ func getTSMetadata(db *sql.DB, tsIDs []string, tsMdatas map[int64]*datastore.TSM
return nil
}

// getTimeFilter derives from ti the expression used in a WHERE clause for filtering on obs time.
// getTimeFilter derives from tspec the expression used in a WHERE clause for overall
// (i.e. not time series specific) filtering on obs time.
//
// Returns expression.
func getTimeFilter(ti *datastore.TimeInterval) string {
func getTimeFilter(tspec common.TemporalSpec) string {

timeExpr := "TRUE" // by default, don't filter on obs time at all

if ti != nil {
timeExprs := []string{}
if start := ti.GetStart(); start != nil {
timeExprs = append(timeExprs, fmt.Sprintf(
"obstime_instant >= to_timestamp(%f)", common.Tstamp2float64Secs(start)))
}
if end := ti.GetEnd(); end != nil {
timeExprs = append(timeExprs, fmt.Sprintf(
"obstime_instant < to_timestamp(%f)", common.Tstamp2float64Secs(end)))
}
if len(timeExprs) > 0 {
timeExpr = fmt.Sprintf("(%s)", strings.Join(timeExprs, " AND "))
if tspec.IntervalMode {
// the 'interval' filter is applied in the same way to all time series and can thus be
// part of the WHERE clause (this is contrast to the 'latest' filter which must be applied
// individually to each time series at a later stage)
ti := tspec.Interval
if ti != nil {
timeExprs := []string{}
if start := ti.GetStart(); start != nil {
timeExprs = append(timeExprs, fmt.Sprintf(
"obstime_instant >= to_timestamp(%f)", common.Tstamp2float64Secs(start)))
}
if end := ti.GetEnd(); end != nil {
timeExprs = append(timeExprs, fmt.Sprintf(
"obstime_instant < to_timestamp(%f)", common.Tstamp2float64Secs(end)))
}
if len(timeExprs) > 0 {
timeExpr = fmt.Sprintf("(%s)", strings.Join(timeExprs, " AND "))
}
}
}

Expand Down Expand Up @@ -303,7 +311,7 @@ func getStringMdataFilter(
return getMdataFilter(stringFilterInfos, phVals), nil
}

// createObsQueryVals creates from 'request' values used for querying observations.
// createObsQueryVals creates from request and tspec values used for querying observations.
//
// Values to be used for query placeholders are appended to phVals.
//
Expand All @@ -314,9 +322,10 @@ func getStringMdataFilter(
// - nil,
// otherwise (..., ..., ..., error).
func createObsQueryVals(
request *datastore.GetObsRequest, phVals *[]interface{}) (string, string, string, error) {
request *datastore.GetObsRequest, tspec common.TemporalSpec, phVals *[]interface{}) (
string, string, string, error) {

timeFilter := getTimeFilter(request.GetTemporalInterval())
timeFilter := getTimeFilter(tspec)

geoFilter, err := getGeoFilter(request.GetSpatialArea(), phVals)
if err != nil {
Expand Down Expand Up @@ -389,11 +398,13 @@ func scanObsRow(rows *sql.Rows) (*datastore.ObsMetadata, int64, error) {

// getObs gets into obs all observations that match request.
// Returns nil upon success, otherwise error.
func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Metadata2) (retErr error) {
func getObs(
db *sql.DB, request *datastore.GetObsRequest, tspec common.TemporalSpec,
obs *[]*datastore.Metadata2) (retErr error) {

// get values needed for query
phVals := []interface{}{} // placeholder values
timeFilter, geoFilter, stringMdataFilter, err := createObsQueryVals(request, &phVals)
timeFilter, geoFilter, stringMdataFilter, err := createObsQueryVals(request, tspec, &phVals)
if err != nil {
return fmt.Errorf("createQueryVals() failed: %v", err)
}
Expand All @@ -411,7 +422,7 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta
JOIN time_series on time_series.id = observation.ts_id
JOIN geo_point ON observation.geo_point_id = geo_point.id
WHERE %s AND %s AND %s
ORDER BY ts_id, obstime_instant
ORDER BY ts_id, obstime_instant DESC
`, strings.Join(obsStringMdataCols, ","), timeFilter, geoFilter, stringMdataFilter)

rows, err := db.Query(query, phVals...)
Expand All @@ -422,14 +433,38 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta

obsMdatas := make(map[int64][]*datastore.ObsMetadata) // observations per time series ID

// set oldest allowable obs time when in 'latest' mode
oldestTime := time.Time{}
if (!tspec.IntervalMode) {
loTime, hiTime := common.GetValidTimeRange()
if tspec.LatestMaxage < 0 {
oldestTime = loTime
} else {
oldestTime = hiTime.Add(-tspec.LatestMaxage)
}
}

// scan rows
for rows.Next() {
obsMdata, tsID, err := scanObsRow(rows)
if err != nil {
return fmt.Errorf("scanObsRow() failed: %v", err)
}

obsMdatas[tsID] = append(obsMdatas[tsID], obsMdata)
includeObs := true // by default include obs in this time series (in particular in
// 'interval' mode, since filtering for that has been done already)
if (!tspec.IntervalMode) { // 'latest' mode
switch {
case len(obsMdatas[tsID]) >= tspec.LatestLimit:
includeObs = false // reject, since not room for this obs
case obsMdata.GetObstimeInstant().AsTime().Before(oldestTime):
includeObs = false // reject, since obs too old
}
}

if includeObs { // prepend obs to time series to get chronological order
obsMdatas[tsID] = append([]*datastore.ObsMetadata{obsMdata}, obsMdatas[tsID]...)
}
}

// get time series
Expand All @@ -454,14 +489,15 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta
}

// GetObservations ... (see documentation in StorageBackend interface)
func (sbe *PostgreSQL) GetObservations(request *datastore.GetObsRequest) (
func (sbe *PostgreSQL) GetObservations(
request *datastore.GetObsRequest, tspec common.TemporalSpec) (
*datastore.GetObsResponse, error) {

var err error

obs := []*datastore.Metadata2{}
if err = getObs(
sbe.Db, request, &obs); err != nil {
sbe.Db, request, tspec, &obs); err != nil {
return nil, fmt.Errorf("getObs() failed: %v", err)
}

Expand Down
4 changes: 3 additions & 1 deletion datastore/datastore/storagebackend/storagebackend.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storagebackend

import (
"datastore/common"
"datastore/datastore"
)

Expand All @@ -16,7 +17,8 @@ type StorageBackend interface {

// GetObservations retrieves observations from the storage.
// Returns nil upon success, otherwise error.
GetObservations(*datastore.GetObsRequest) (*datastore.GetObsResponse, error)
GetObservations(*datastore.GetObsRequest, common.TemporalSpec) (
*datastore.GetObsResponse, error)

// GetTSAttrGroups retrieves, for the non-default attributes in the input, the unique
// combinations of attribute values currently represented in the storage.
Expand Down
2 changes: 1 addition & 1 deletion protobuf/datastore.proto
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ message GetObsRequest {
TimeInterval temporal_interval = 1; // only observations in this time range may be returned

// temporal spec applicable when temporal_mode = 'latest'
int32 latest_limit = 5; // only this many of the latest observations may be returned per time
string latest_limit = 5; // only this many of the latest observations may be returned per time
// series; default: 1
string latest_maxage = 6; // ISO 8601 period (like 'PT1H') that limits the time window (ending
// at the current time) from which the latest observations may be retrieved (use case: specify
Expand Down

1 comment on commit f9560f0

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API Unit Test Coverage Report
FileStmtsMissCoverMissing
\_\_init\_\_.py00100% 
datastore_pb2.py584621%24–69
datastore_pb2_grpc.py432347%37–52, 85–87, 92–94, 99–101, 106–108, 112–136, 174, 191, 208, 225
dependencies.py31487%23–24, 31, 38
grpc_getter.py8450%12–16
locustfile.py15150%1–31
main.py22386%27, 37, 47
metadata_endpoints.py19479%16, 33–66, 70
formatters
   \_\_init\_\_.py70100% 
   covjson.py46198%58
routers
   \_\_init\_\_.py00100% 
   edr.py711185%36–59, 109–110, 137–138
   records.py00100% 
TOTAL32011165% 

API Unit Test Coverage Summary

Tests Skipped Failures Errors Time
16 0 💤 0 ❌ 0 🔥 1.860s ⏱️

Please sign in to comment.