diff --git a/datastore/datastore/README.md b/datastore/datastore/README.md index 85dedfd7..e8758583 100644 --- a/datastore/datastore/README.md +++ b/datastore/datastore/README.md @@ -241,6 +241,12 @@ $ grpcurl -d '{"filter": {"standard_name": {"values": ["wind_speed", "air_temper ... ``` +### Retrieve the two most recent wind speed observations for platform 78990 +```text +$ grpcurl -d '{"filter": {"standard_name": {"values": ["wind_speed"]}, "platform": {"values": ["78990"]}}, "temporal_mode": "latest", "latest_limit": "2"}' -plaintext -proto protobuf/datastore.proto 127.0.0.1:50050 datastore.Datastore.GetObservations +... +``` + ### List unique occurrences of time series metadata attribute 'standard_name' ```text diff --git a/datastore/datastore/common/common.go b/datastore/datastore/common/common.go index feb8bc0b..dcfd697e 100644 --- a/datastore/datastore/common/common.go +++ b/datastore/datastore/common/common.go @@ -1,6 +1,7 @@ package common import ( + "datastore/datastore" "fmt" "log" "os" @@ -167,3 +168,11 @@ func GetValidTimeRangeSettings() string { func ToSnakeCase(s string) string { return strings.ToLower(snakeCaseRE.ReplaceAllString(s, "${1}_${2}")) } + +type TemporalSpec struct { + IntervalMode bool // whether temporal mode is 'interval' (true) or 'latest' (false) + Interval *datastore.TimeInterval // interval in 'interval' mode + LatestLimit int // max number of observations retrievable in 'latest' mode + LatestMaxage time.Duration // interval, defined as [now - LatestMaxage, now], within which + // observations may be retrieved in 'latest' mode +} diff --git a/datastore/datastore/dsimpl/getobservations.go b/datastore/datastore/dsimpl/getobservations.go index fbefd06d..ded0637a 100644 --- a/datastore/datastore/dsimpl/getobservations.go +++ b/datastore/datastore/dsimpl/getobservations.go @@ -3,24 +3,95 @@ package dsimpl import ( "context" "fmt" + "strconv" + "strings" + "time" + "datastore/common" "datastore/datastore" ) +// getTemporalSpec derives and validates a temporal specification from request. +// +// Returns (spec, nil) upon success, otherwise (..., error). +func getTemporalSpec(request *datastore.GetObsRequest) (common.TemporalSpec, error) { + + tspec := common.TemporalSpec{} + + // get mode + tspec.IntervalMode = true // assume 'interval' mode by default + if tmode0 := request.GetTemporalMode(); tmode0 != "" { + tmode := strings.ToLower(strings.TrimSpace(tmode0)) + switch { + case tmode == "latest": + tspec.IntervalMode = false + case tmode != "interval": + return common.TemporalSpec{}, fmt.Errorf( + "expected either 'interval' or 'latest' for temporal_mode; found '%s'", tmode) + } + } + + if tspec.IntervalMode { // validate and initialize for 'interval' mode + + ti := request.GetTemporalInterval() + + // do general validation of interval + if ti != nil { + if ti.Start != nil && ti.End != nil { + if ti.End.AsTime().Before(ti.Start.AsTime()) { + return common.TemporalSpec{}, fmt.Errorf("end(%v) < start(%v)", ti.End, ti.Start) + } + } + } + + tspec.Interval = ti // valid (but possibly nil to specify a fully open interval!) + + } else { // validate and initialize for 'latest' mode + + // latest_limit ... + if limit0 := request.GetLatestLimit(); limit0 != "" { + limit, err := strconv.Atoi(limit0) + if err != nil { + return common.TemporalSpec{}, + fmt.Errorf("failed to convert latest_limit to an int: %v", limit0) + } + + if limit < 0 { + return common.TemporalSpec{}, + fmt.Errorf("latest_limit cannot be negative: %d", limit) + } + + tspec.LatestLimit = limit // valid + + } else { + // by default return the single latest obs for a time series + tspec.LatestLimit = 1 // default + } + + // latest_maxage ... + if maxage0 := request.GetLatestMaxage(); maxage0 != "" { + // ### TODO! (convert maxage0 (an ISO-8601 period) into a time.Duration and assign to + // tspec.LatestMaxage) + return common.TemporalSpec{}, fmt.Errorf("latest_maxage unimplemented") + } else { + // by default don't consider any observation as too old + tspec.LatestMaxage = time.Duration(-1) + } + } + + return tspec, nil +} + func (svcInfo *ServiceInfo) GetObservations( ctx context.Context, request *datastore.GetObsRequest) ( *datastore.GetObsResponse, error) { - // do general validation of any obs time interval - if ti := request.GetTemporalInterval(); ti != nil { - if ti.Start != nil && ti.End != nil { - if ti.End.AsTime().Before(ti.Start.AsTime()) { - return nil, fmt.Errorf("end(%v) < start(%v)", ti.End, ti.Start) - } - } + tspec, err := getTemporalSpec(request) + if err != nil { + return nil, fmt.Errorf("dsimpl.GetTemporalSpec() failed: %v", err) } - response, err := svcInfo.Sbe.GetObservations(request) + response, err := svcInfo.Sbe.GetObservations(request, tspec) if err != nil { return nil, fmt.Errorf("svcInfo.Sbe.GetObservations() failed: %v", err) } diff --git a/datastore/datastore/storagebackend/postgresql/getobservations.go b/datastore/datastore/storagebackend/postgresql/getobservations.go index b10de184..ef9c507e 100644 --- a/datastore/datastore/storagebackend/postgresql/getobservations.go +++ b/datastore/datastore/storagebackend/postgresql/getobservations.go @@ -11,7 +11,6 @@ import ( "github.com/cridenour/go-postgis" "github.com/lib/pq" - _ "github.com/lib/pq" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -145,23 +144,32 @@ func getTSMetadata(db *sql.DB, tsIDs []string, tsMdatas map[int64]*datastore.TSM return nil } -// getTimeFilter derives from ti the expression used in a WHERE clause for filtering on obs time. +// getTimeFilter derives from tspec the expression used in a WHERE clause for overall +// (i.e. not time series specific) filtering on obs time. +// // Returns expression. -func getTimeFilter(ti *datastore.TimeInterval) string { +func getTimeFilter(tspec common.TemporalSpec) string { + timeExpr := "TRUE" // by default, don't filter on obs time at all - if ti != nil { - timeExprs := []string{} - if start := ti.GetStart(); start != nil { - timeExprs = append(timeExprs, fmt.Sprintf( - "obstime_instant >= to_timestamp(%f)", common.Tstamp2float64Secs(start))) - } - if end := ti.GetEnd(); end != nil { - timeExprs = append(timeExprs, fmt.Sprintf( - "obstime_instant < to_timestamp(%f)", common.Tstamp2float64Secs(end))) - } - if len(timeExprs) > 0 { - timeExpr = fmt.Sprintf("(%s)", strings.Join(timeExprs, " AND ")) + if tspec.IntervalMode { + // the 'interval' filter is applied in the same way to all time series and can thus be + // part of the WHERE clause (this is contrast to the 'latest' filter which must be applied + // individually to each time series at a later stage) + ti := tspec.Interval + if ti != nil { + timeExprs := []string{} + if start := ti.GetStart(); start != nil { + timeExprs = append(timeExprs, fmt.Sprintf( + "obstime_instant >= to_timestamp(%f)", common.Tstamp2float64Secs(start))) + } + if end := ti.GetEnd(); end != nil { + timeExprs = append(timeExprs, fmt.Sprintf( + "obstime_instant < to_timestamp(%f)", common.Tstamp2float64Secs(end))) + } + if len(timeExprs) > 0 { + timeExpr = fmt.Sprintf("(%s)", strings.Join(timeExprs, " AND ")) + } } } @@ -303,7 +311,7 @@ func getStringMdataFilter( return getMdataFilter(stringFilterInfos, phVals), nil } -// createObsQueryVals creates from 'request' values used for querying observations. +// createObsQueryVals creates from request and tspec values used for querying observations. // // Values to be used for query placeholders are appended to phVals. // @@ -314,9 +322,10 @@ func getStringMdataFilter( // - nil, // otherwise (..., ..., ..., error). func createObsQueryVals( - request *datastore.GetObsRequest, phVals *[]interface{}) (string, string, string, error) { + request *datastore.GetObsRequest, tspec common.TemporalSpec, phVals *[]interface{}) ( + string, string, string, error) { - timeFilter := getTimeFilter(request.GetTemporalInterval()) + timeFilter := getTimeFilter(tspec) geoFilter, err := getGeoFilter(request.GetSpatialArea(), phVals) if err != nil { @@ -389,11 +398,13 @@ func scanObsRow(rows *sql.Rows) (*datastore.ObsMetadata, int64, error) { // getObs gets into obs all observations that match request. // Returns nil upon success, otherwise error. -func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Metadata2) (retErr error) { +func getObs( + db *sql.DB, request *datastore.GetObsRequest, tspec common.TemporalSpec, + obs *[]*datastore.Metadata2) (retErr error) { // get values needed for query phVals := []interface{}{} // placeholder values - timeFilter, geoFilter, stringMdataFilter, err := createObsQueryVals(request, &phVals) + timeFilter, geoFilter, stringMdataFilter, err := createObsQueryVals(request, tspec, &phVals) if err != nil { return fmt.Errorf("createQueryVals() failed: %v", err) } @@ -411,7 +422,7 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta JOIN time_series on time_series.id = observation.ts_id JOIN geo_point ON observation.geo_point_id = geo_point.id WHERE %s AND %s AND %s - ORDER BY ts_id, obstime_instant + ORDER BY ts_id, obstime_instant DESC `, strings.Join(obsStringMdataCols, ","), timeFilter, geoFilter, stringMdataFilter) rows, err := db.Query(query, phVals...) @@ -422,6 +433,17 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta obsMdatas := make(map[int64][]*datastore.ObsMetadata) // observations per time series ID + // set oldest allowable obs time when in 'latest' mode + oldestTime := time.Time{} + if (!tspec.IntervalMode) { + loTime, hiTime := common.GetValidTimeRange() + if tspec.LatestMaxage < 0 { + oldestTime = loTime + } else { + oldestTime = hiTime.Add(-tspec.LatestMaxage) + } + } + // scan rows for rows.Next() { obsMdata, tsID, err := scanObsRow(rows) @@ -429,7 +451,20 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta return fmt.Errorf("scanObsRow() failed: %v", err) } - obsMdatas[tsID] = append(obsMdatas[tsID], obsMdata) + includeObs := true // by default include obs in this time series (in particular in + // 'interval' mode, since filtering for that has been done already) + if (!tspec.IntervalMode) { // 'latest' mode + switch { + case len(obsMdatas[tsID]) >= tspec.LatestLimit: + includeObs = false // reject, since not room for this obs + case obsMdata.GetObstimeInstant().AsTime().Before(oldestTime): + includeObs = false // reject, since obs too old + } + } + + if includeObs { // prepend obs to time series to get chronological order + obsMdatas[tsID] = append([]*datastore.ObsMetadata{obsMdata}, obsMdatas[tsID]...) + } } // get time series @@ -454,14 +489,15 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta } // GetObservations ... (see documentation in StorageBackend interface) -func (sbe *PostgreSQL) GetObservations(request *datastore.GetObsRequest) ( +func (sbe *PostgreSQL) GetObservations( + request *datastore.GetObsRequest, tspec common.TemporalSpec) ( *datastore.GetObsResponse, error) { var err error obs := []*datastore.Metadata2{} if err = getObs( - sbe.Db, request, &obs); err != nil { + sbe.Db, request, tspec, &obs); err != nil { return nil, fmt.Errorf("getObs() failed: %v", err) } diff --git a/datastore/datastore/storagebackend/storagebackend.go b/datastore/datastore/storagebackend/storagebackend.go index 21476ab6..f2c2676a 100644 --- a/datastore/datastore/storagebackend/storagebackend.go +++ b/datastore/datastore/storagebackend/storagebackend.go @@ -1,6 +1,7 @@ package storagebackend import ( + "datastore/common" "datastore/datastore" ) @@ -16,7 +17,8 @@ type StorageBackend interface { // GetObservations retrieves observations from the storage. // Returns nil upon success, otherwise error. - GetObservations(*datastore.GetObsRequest) (*datastore.GetObsResponse, error) + GetObservations(*datastore.GetObsRequest, common.TemporalSpec) ( + *datastore.GetObsResponse, error) // GetTSAttrGroups retrieves, for the non-default attributes in the input, the unique // combinations of attribute values currently represented in the storage. diff --git a/protobuf/datastore.proto b/protobuf/datastore.proto index 2620f915..82d07be5 100644 --- a/protobuf/datastore.proto +++ b/protobuf/datastore.proto @@ -169,7 +169,7 @@ message GetObsRequest { TimeInterval temporal_interval = 1; // only observations in this time range may be returned // temporal spec applicable when temporal_mode = 'latest' - int32 latest_limit = 5; // only this many of the latest observations may be returned per time + string latest_limit = 5; // only this many of the latest observations may be returned per time // series; default: 1 string latest_maxage = 6; // ISO 8601 period (like 'PT1H') that limits the time window (ending // at the current time) from which the latest observations may be retrieved (use case: specify