diff --git a/datastore/api/main.py b/datastore/api/main.py index 2313b9f..5287a0d 100644 --- a/datastore/api/main.py +++ b/datastore/api/main.py @@ -192,7 +192,7 @@ def get_locations(bbox: str = Query(..., example="5.0,52.0,6.0,52.1")) -> Featur with grpc.insecure_channel(f"{os.getenv('DSHOST', 'localhost')}:{os.getenv('DSPORT', '50050')}") as channel: grpc_stub = dstore_grpc.DatastoreStub(channel) ts_request = dstore.GetObsRequest( - instruments=["tn"], # Hack + instrument=["tn"], # Hack inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]), ) ts_response = grpc_stub.GetObservations(ts_request) @@ -227,8 +227,8 @@ def get_data_location_id( # This is just a quick and dirty demo range = get_datetime_range(datetime) get_obs_request = dstore.GetObsRequest( - platforms=[location_id], - instruments=list(map(str.strip, parameter_name.split(","))), + platform=[location_id], + instrument=list(map(str.strip, parameter_name.split(","))), interval=dstore.TimeInterval(start=range[0], end=range[1]) if range else None, ) return get_data_for_time_series(get_obs_request) @@ -266,7 +266,7 @@ def get_data_area( assert poly.geom_type == "Polygon" range = get_datetime_range(datetime) get_obs_request = dstore.GetObsRequest( - instruments=list(map(str.strip, parameter_name.split(","))), + instrument=list(map(str.strip, parameter_name.split(","))), inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]), interval=dstore.TimeInterval(start=range[0], end=range[1]) if range else None, ) diff --git a/datastore/datastore/protobuf/datastore.proto b/datastore/datastore/protobuf/datastore.proto index 4fe56f0..5f3296d 100644 --- a/datastore/datastore/protobuf/datastore.proto +++ b/datastore/datastore/protobuf/datastore.proto @@ -6,7 +6,8 @@ import "google/protobuf/timestamp.proto"; option go_package = "./datastore"; -// Notes: +// NOTES: +// // - A _time series_ is a context defined by a set of 
metadata (defined in TSMetadata below) that // usually does not vary with observaion (time). // @@ -71,47 +72,58 @@ message Link { } message TSMetadata { - string version = 1; - string type = 2; - string title = 3; - string summary = 4; - string keywords = 5; - string keywords_vocabulary = 6 [json_name = "keywords_vocabulary"]; - string license = 7; - string conventions = 8; - string naming_authority = 9 [json_name = "naming_authority"]; - string creator_type = 10 [json_name = "creator_type"]; - string creator_name = 11 [json_name = "creator_name"]; - string creator_email = 12 [json_name = "creator_email"]; - string creator_url = 13 [json_name = "creator_url"]; - string institution = 14; - string project = 15; - string source = 16; - string platform = 17; - string platform_vocabulary = 18 [json_name = "platform_vocabulary"]; - string standard_name = 19 [json_name = "standard_name"]; - string unit = 20; - string instrument = 21; - string instrument_vocabulary = 22 [json_name = "instrument_vocabulary"]; - repeated Link links = 23; + // --- BEGIN non-string metadata ----------------- + repeated Link links = 1; + // --- END non-string metadata ----------------- + + // --- BEGIN string metadata (handleable with reflection) ----------------- + string version = 2; + string type = 3; + string title = 4; + string summary = 5; + string keywords = 6; + string keywords_vocabulary = 7 [json_name = "keywords_vocabulary"]; + string license = 8; + string conventions = 9; + string naming_authority = 10 [json_name = "naming_authority"]; + string creator_type = 11 [json_name = "creator_type"]; + string creator_name = 12 [json_name = "creator_name"]; + string creator_email = 13 [json_name = "creator_email"]; + string creator_url = 14 [json_name = "creator_url"]; + string institution = 15; + string project = 16; + string source = 17; + string platform = 18; + string platform_vocabulary = 19 [json_name = "platform_vocabulary"]; + string standard_name = 20 [json_name = "standard_name"]; 
+ string unit = 21; + string instrument = 22; + string instrument_vocabulary = 23 [json_name = "instrument_vocabulary"]; + // --- END string metadata ----------------- } message ObsMetadata { - string id = 1; + // --- BEGIN non-string metadata ----------------- oneof geometry { - Point geo_point = 2 [json_name = "geo_point"]; - Polygon geo_polygon = 3 [json_name = "geo_polygon"]; + Point geo_point = 1 [json_name = "geo_point"]; + Polygon geo_polygon = 2 [json_name = "geo_polygon"]; } - google.protobuf.Timestamp pubtime = 4; - string data_id = 5 [json_name = "data_id"]; - string history = 6; - string metadata_id = 7 [json_name = "metadata_id"]; oneof obstime { - google.protobuf.Timestamp obstime_instant = 8 [json_name = "obstime_instant"]; - //TimeInterval obstime_interval = 9 [json_name = "obstime_interval"]; -- unsupported for now + google.protobuf.Timestamp obstime_instant = 3 [json_name = "obstime_instant"]; + //TimeInterval obstime_interval = 4 [json_name = "obstime_interval"]; -- unsupported for now } + google.protobuf.Timestamp pubtime = 5; + // --- END non-string metadata ----------------- + + // --- BEGIN string metadata (handleable with reflection) ----------------- + string id = 6; + string data_id = 7 [json_name = "data_id"]; + string history = 8; + string metadata_id = 9 [json_name = "metadata_id"]; string processing_level = 10 [json_name = "processing_level"]; - string value = 11; + // --- END string metadata ----------------- + + string value = 11; // obs value (not metadata in a strict sense) } //--------------------------------------------------------------------------- @@ -140,13 +152,57 @@ message PutObsResponse { //--------------------------------------------------------------------------- message GetObsRequest { + // --- BEGIN non-string metadata ------------------------- + + // temporal search TimeInterval interval = 1; // only return observations in this time range + + // spatial search Polygon inside = 2; // if specified, only return 
observations in this area - repeated string platforms = 3; // if specified, only return observations matching any of these platform patterns - repeated string standard_names = 4 [json_name = "standard_names"]; // if specified, only return observations matching any of these standard names - repeated string instruments = 5; // if specified, only return observations matching any of these instruments - repeated string processing_levels = 6 [json_name = "processing_levels"]; // if specified, only return observations matching any of these processing levels - // TODO: add search filters for other metadata + + // search wrt. TSMetadata.links + // TODO - needs special handling + + // --- END non-string metadata ------------------------- + + + // --- BEGIN string metadata (handleable with reflection) ------------------------- + // + // - field names must correspond exactly with string field names in TSMetadata or ObsMetadata + // + // - if the field F is specified (where F is for example 'platform'), only observations + // matching at least one these values for F will be returned + // + // - if the field F is not specified, filtering on F is effectively disabled + + repeated string version = 3; + repeated string type = 4; + repeated string title = 5; + repeated string summary = 6; + repeated string keywords = 7; + repeated string keywords_vocabulary = 8 [json_name = "keywords_vocabulary"]; + repeated string license = 9; + repeated string conventions = 10; + repeated string naming_authority = 11 [json_name = "naming_authority"]; + repeated string creator_type = 12 [json_name = "creator_type"]; + repeated string creator_name = 13 [json_name = "creator_name"]; + repeated string creator_email = 14 [json_name = "creator_email"]; + repeated string creator_url = 15 [json_name = "creator_url"]; + repeated string institution = 16; + repeated string project = 17; + repeated string source = 18; + repeated string platform = 19; + repeated string platform_vocabulary = 20 [json_name = 
"platform_vocabulary"]; + repeated string standard_name = 21 [json_name = "standard_name"]; + repeated string unit = 22; + repeated string instrument = 23; + repeated string instrument_vocabulary = 24 [json_name = "instrument_vocabulary"]; + repeated string id = 25; + repeated string data_id = 26 [json_name = "data_id"]; + repeated string history = 27; + repeated string metadata_id = 28 [json_name = "metadata_id"]; + repeated string processing_level = 29 [json_name = "processing_level"]; + // --- END string metadata ------------------------------------- } message GetObsResponse { diff --git a/datastore/datastore/storagebackend/postgresql/getobservations.go b/datastore/datastore/storagebackend/postgresql/getobservations.go index 2e01a49..b4b0394 100644 --- a/datastore/datastore/storagebackend/postgresql/getobservations.go +++ b/datastore/datastore/storagebackend/postgresql/getobservations.go @@ -5,6 +5,7 @@ import ( "datastore/common" "datastore/datastore" "fmt" + "reflect" "strings" "time" @@ -14,6 +15,69 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) +var ( + tsStringMdataGoNames []string // Go names for time series metadata of type string + tsStringMdataPBNames []string // protobuf names for time series metadata of type string + + obspb2go map[string]string // association between observation specific protobuf name and + // (generated by protoc) Go name. + // NOTES: + // - Protobuf names are field names of the ObsMetadata message in datastore.proto. + // - Only fields of string type are included + // - The observation value field is not included + // - Protobuf names are identical to corresponding database column names. + // - Protobuf names are snake case (aaa_bbb) whereas Go names are camel case (aaaBbb or AaaBbb). 
+ + obsStringMdataGoNames []string // Go names for observation metadata of type string + obsStringMdataCols []string // column names qualified with table name 'observation' +) + +func init() { + tsStringMdataGoNames = []string{} + tsStringMdataPBNames = []string{} + for _, field := range reflect.VisibleFields(reflect.TypeOf(datastore.TSMetadata{})) { + if field.IsExported() && (field.Type.Kind() == reflect.String) { + goName := field.Name + tsStringMdataGoNames = append(tsStringMdataGoNames, goName) + tsStringMdataPBNames = append(tsStringMdataPBNames, common.ToSnakeCase(goName)) + } + } + + obspb2go = map[string]string{} + obsStringMdataGoNames = []string{} + obsStringMdataCols = []string{} + for _, field := range reflect.VisibleFields(reflect.TypeOf(datastore.ObsMetadata{})) { + if field.IsExported() && (field.Type.Kind() == reflect.String) && + (strings.ToLower(field.Name) != "value") { // (obs value not considered metadata here) + goName := field.Name + pbName := common.ToSnakeCase(goName) + obspb2go[pbName] = goName + obsStringMdataGoNames = append(obsStringMdataGoNames, goName) + obsStringMdataCols = append(obsStringMdataCols, fmt.Sprintf("observation.%s", pbName)) + } + } +} + +// addStringMdata assigns the values of colVals to the corresponding struct fields in +// stringMdataGoNames (value i corresponds to field i ...). The struct is represented by rv. +// Returns nil upon success, otherwise error. 
+func addStringMdata(rv reflect.Value, stringMdataGoNames []string, colVals []interface{}) error { + for i, goName := range stringMdataGoNames { + val, ok := colVals[i].(string) + if !ok { + return fmt.Errorf("colVals[%d] not string: %v (type: %T)", i, colVals[i], colVals[i]) + } + + field := rv.Elem().FieldByName(goName) + + // NOTE: we assume the following assignment will never panic, hence we don't do + // any pre-validation of field + field.SetString(val) + } + + return nil +} + // addWhereCondMatchAnyPattern appends to whereExpr an expression of the form // "(cond1 OR cond2 OR ... OR condN)" where condi tests if the ith pattern in patterns matches // colName. Matching is case-insensitive and an asterisk in a pattern matches zero or more @@ -38,11 +102,68 @@ func addWhereCondMatchAnyPattern( *whereExpr = append(*whereExpr, fmt.Sprintf("(%s)", strings.Join(whereExprOR, " OR "))) } -// getTSMetadata retrieves into tsMdata metadata of time series in table time_series that match -// tsIDs. The keys of tsMdata are the time series IDs. -// Returns nil upon success, otherwise error -func getTSMetadata(db *sql.DB, tsIDs []string, tsMdata map[int64]*datastore.TSMetadata) error { +// scanTSRow scans all columns from the current result row in rows and converts to a TSMetadata +// object. +// Returns (TSMetadata object, time series ID, nil) upon success, otherwise (..., ..., error). 
+func scanTSRow(rows *sql.Rows) (*datastore.TSMetadata, int64, error) { + var ( + tsID int64 + linkHref pq.StringArray + linkRel pq.StringArray + linkType pq.StringArray + linkHrefLang pq.StringArray + linkTitle pq.StringArray + ) + // initialize colValPtrs with non-string metadata + colValPtrs := []interface{}{ + &tsID, + &linkHref, + &linkRel, + &linkType, + &linkHrefLang, + &linkTitle, + } + + // complete colValPtrs with string metadata (handleable with reflection) + colVals0 := make([]interface{}, len(tsStringMdataGoNames)) + for i := range tsStringMdataGoNames { + colValPtrs = append(colValPtrs, &colVals0[i]) + } + + // scan row into column value pointers + if err := rows.Scan(colValPtrs...); err != nil { + return nil, -1, fmt.Errorf("rows.Scan() failed: %v", err) + } + + // initialize tsMdata with non-string metadata + links := []*datastore.Link{} + for i := 0; i < len(linkHref); i++ { + links = append(links, &datastore.Link{ + Href: linkHref[i], + Rel: linkRel[i], + Type: linkType[i], + Hreflang: linkHrefLang[i], + Title: linkTitle[i], + }) + } + tsMdata := datastore.TSMetadata{ + Links: links, + } + + // complete tsMdata with string metadata (handleable with reflection) + err := addStringMdata(reflect.ValueOf(&tsMdata), tsStringMdataGoNames, colVals0) + if err != nil { + return nil, -1, fmt.Errorf("addStringMdata() failed: %v", err) + } + + return &tsMdata, tsID, nil +} + +// getTSMetadata retrieves into tsMdatas metadata of time series in table time_series that match +// tsIDs. The keys of tsMdatas are the time series IDs. 
+// Returns nil upon success, otherwise error +func getTSMetadata(db *sql.DB, tsIDs []string, tsMdatas map[int64]*datastore.TSMetadata) error { query := fmt.Sprintf( `SELECT id, %s FROM time_series WHERE %s`, strings.Join(getTSMdataCols(), ","), @@ -56,61 +177,12 @@ func getTSMetadata(db *sql.DB, tsIDs []string, tsMdata map[int64]*datastore.TSMe defer rows.Close() for rows.Next() { - var tsID int64 - var tsMdata0 datastore.TSMetadata - - linkHref := pq.StringArray{} - linkRel := pq.StringArray{} - linkType := pq.StringArray{} - linkHrefLang := pq.StringArray{} - linkTitle := pq.StringArray{} - - if err := rows.Scan( - &tsID, - &tsMdata0.Version, - &tsMdata0.Type, - &tsMdata0.Title, - &tsMdata0.Summary, - &tsMdata0.Keywords, - &tsMdata0.KeywordsVocabulary, - &tsMdata0.License, - &tsMdata0.Conventions, - &tsMdata0.NamingAuthority, - &tsMdata0.CreatorType, - &tsMdata0.CreatorName, - &tsMdata0.CreatorEmail, - &tsMdata0.CreatorUrl, - &tsMdata0.Institution, - &tsMdata0.Project, - &tsMdata0.Source, - &tsMdata0.Platform, - &tsMdata0.PlatformVocabulary, - &tsMdata0.StandardName, - &tsMdata0.Unit, - &tsMdata0.Instrument, - &tsMdata0.InstrumentVocabulary, - &linkHref, - &linkRel, - &linkType, - &linkHrefLang, - &linkTitle, - ); err != nil { - return fmt.Errorf("rows.Scan() failed: %v", err) - } - - links := []*datastore.Link{} - for i := 0; i < len(linkHref); i++ { - links = append(links, &datastore.Link{ - Href: linkHref[i], - Rel: linkRel[i], - Type: linkType[i], - Hreflang: linkHrefLang[i], - Title: linkTitle[i], - }) + tsMdata, tsID, err := scanTSRow(rows) + if err != nil { + return fmt.Errorf("scanTSRow() failed: %v", err) } - tsMdata0.Links = links - tsMdata[tsID] = &tsMdata0 + tsMdatas[tsID] = tsMdata } return nil @@ -144,13 +216,15 @@ func getTimeFilter(ti *datastore.TimeInterval) string { return timeExpr } -type filterInfo struct { +type stringFilterInfo struct { colName string - patterns []string // NOTE: only []string supported for now + patterns []string } -// 
getMdataFilter derives from filterInfos the expression used in a WHERE clause for "match any" -// filtering on a set of attributes. +// TODO: add filter info for non-string types + +// getMdataFilter derives from stringFilterInfos the expression used in a WHERE clause for +// "match any" filtering on a set of attributes. // // The expression will be of the form // @@ -163,13 +237,13 @@ type filterInfo struct { // Values to be used for query placeholders are appended to phVals. // // Returns expression. -func getMdataFilter(filterInfos []filterInfo, phVals *[]interface{}) string { +func getMdataFilter(stringFilterInfos []stringFilterInfo, phVals *[]interface{}) string { whereExprAND := []string{} - for _, fi := range filterInfos { + for _, sfi := range stringFilterInfos { addWhereCondMatchAnyPattern( - fi.colName, fi.patterns, &whereExprAND, phVals) + sfi.colName, sfi.patterns, &whereExprAND, phVals) } whereExpr := "TRUE" // by default, don't filter @@ -182,6 +256,9 @@ func getMdataFilter(filterInfos []filterInfo, phVals *[]interface{}) string { // getGeoFilter derives from 'inside' the expression used in a WHERE clause for keeping // observations inside this polygon. +// +// Values to be used for query placeholders are appended to phVals. +// // Returns expression. func getGeoFilter(inside *datastore.Polygon, phVals *[]interface{}) (string, error) { whereExpr := "TRUE" // by default, don't filter @@ -219,40 +296,175 @@ func getGeoFilter(inside *datastore.Polygon, phVals *[]interface{}) (string, err return whereExpr, nil } -// getObs gets into obs all observations that match request. -// Returns nil upon success, otherwise error. 
-func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Metadata2) error { +type stringFieldInfo struct { + field reflect.StructField + tableName string + method reflect.Value + methodName string +} - phVals := []interface{}{} // placeholder values +// getStringMdataFilter creates from 'request' the string metadata filter used for querying +// observations. +// +// Values to be used for query placeholders are appended to phVals. +// +// Returns upon success (string metadata filter used in a 'WHERE ... AND ...' clause (possibly +// just 'TRUE'), nil), otherwise (..., error). +func getStringMdataFilter( + request *datastore.GetObsRequest, phVals *[]interface{}) (string, error) { + + rv := reflect.ValueOf(request) + + stringFilterInfos := []stringFilterInfo{} + + stringFieldInfos := []stringFieldInfo{} + + addStringFields := func(s interface{}, tableName string) { + for _, field := range reflect.VisibleFields(reflect.TypeOf(s)) { + mtdName := fmt.Sprintf("Get%s", field.Name) + mtd := rv.MethodByName(mtdName) + if field.IsExported() && (field.Type.Kind() == reflect.String) && (mtd.IsValid()) { + stringFieldInfos = append(stringFieldInfos, stringFieldInfo{ + field: field, + tableName: tableName, + method: mtd, + methodName: mtdName, + }) + } + } + } + addStringFields(datastore.TSMetadata{}, "time_series") + addStringFields(datastore.ObsMetadata{}, "observation") + + for _, sfInfo := range stringFieldInfos { + patterns, ok := sfInfo.method.Call([]reflect.Value{})[0].Interface().([]string) + if !ok { + return "", fmt.Errorf( + "sfInfo.method.Call() failed for method %s; failed to return []string", + sfInfo.methodName) + } + if len(patterns) > 0 { + stringFilterInfos = append(stringFilterInfos, stringFilterInfo{ + colName: fmt.Sprintf( + "%s.%s", sfInfo.tableName, common.ToSnakeCase(sfInfo.field.Name)), + patterns: patterns, + }) + } + } - timeExpr := getTimeFilter(request.GetInterval()) + return getMdataFilter(stringFilterInfos, phVals), nil +} - 
tsMdataExpr := getMdataFilter([]filterInfo{ - {"platform", request.GetPlatforms()}, - {"standard_name", request.GetStandardNames()}, - {"instrument", request.GetInstruments()}, - // TODO: add search filters for more columns in table 'time_series' - }, &phVals) +// createObsQueryVals creates from 'request' values used for querying observations. +// +// Values to be used for query placeholders are appended to phVals. +// +// Upon success the function returns four values: +// - time filter used in a 'WHERE ... AND ...' clause (possibly just 'TRUE') +// - geo filter ... ditto +// - string metadata ... ditto +// - nil, +// otherwise (..., ..., ..., error). +func createObsQueryVals( + request *datastore.GetObsRequest, phVals *[]interface{}) (string, string, string, error) { + + timeFilter := getTimeFilter(request.GetInterval()) + + geoFilter, err := getGeoFilter(request.Inside, phVals) + if err != nil { + return "", "", "", fmt.Errorf("getGeoFilter() failed: %v", err) + } - obsMdataExpr := getMdataFilter([]filterInfo{ - {"processing_level", request.GetProcessingLevels()}, - // TODO: add search filters for more columns in table 'observation' - }, &phVals) + stringMdataFilter, err := getStringMdataFilter(request, phVals) + if err != nil { + return "", "", "", fmt.Errorf("getStringMdataFilter() failed: %v", err) + } + + return timeFilter, geoFilter, stringMdataFilter, nil +} + +// scanObsRow scans all columns from the current result row in rows and converts to an ObsMetadata +// object. +// Returns (ObsMetadata object, time series ID, nil) upon success, otherwise (..., ..., error). 
+func scanObsRow(rows *sql.Rows) (*datastore.ObsMetadata, int64, error) { + var ( + tsID int64 + obsTimeInstant0 time.Time + pubTime0 time.Time + value string + point postgis.PointS + ) - geoExpr, err := getGeoFilter(request.Inside, &phVals) + // initialize colValPtrs with non-string metadata + colValPtrs := []interface{}{ + &tsID, + &obsTimeInstant0, + &pubTime0, + &value, + &point, + } + + // complete colValPtrs with string metadata (handleable with reflection) + colVals0 := make([]interface{}, len(obsStringMdataGoNames)) + for i := range obsStringMdataGoNames { + colValPtrs = append(colValPtrs, &colVals0[i]) + } + + // scan row into column value pointers + if err := rows.Scan(colValPtrs...); err != nil { + return nil, -1, fmt.Errorf("rows.Scan() failed: %v", err) + } + + // initialize obsMdata with non-string metadata and obs value + obsMdata := datastore.ObsMetadata{ + Geometry: &datastore.ObsMetadata_GeoPoint{ + GeoPoint: &datastore.Point{ + Lon: point.X, + Lat: point.Y, + }, + }, + Obstime: &datastore.ObsMetadata_ObstimeInstant{ + ObstimeInstant: timestamppb.New(obsTimeInstant0), + }, + Pubtime: timestamppb.New(pubTime0), + Value: value, + } + + // complete obsMdata with string metadata (handleable with reflection) + err := addStringMdata(reflect.ValueOf(&obsMdata), obsStringMdataGoNames, colVals0) + if err != nil { + return nil, -1, fmt.Errorf("addStringMdata() failed: %v", err) + } + + return &obsMdata, tsID, nil +} + +// getObs gets into obs all observations that match request. +// Returns nil upon success, otherwise error. 
+func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Metadata2) (retErr error) { + + // get values needed for query + phVals := []interface{}{} // placeholder values + timeFilter, geoFilter, stringMdataFilter, err := createObsQueryVals(request, &phVals) if err != nil { - return fmt.Errorf("getGeoFilter() failed: %v", err) + return fmt.Errorf("createObsQueryVals() failed: %v", err) } + // define and execute query query := fmt.Sprintf(` - SELECT ts_id, observation.id, geo_point_id, pubtime, data_id, history, metadata_id, - obstime_instant, processing_level, value, point + SELECT + ts_id, + obstime_instant, + pubtime, + value, + point, + %s FROM observation - JOIN geo_point gp ON observation.geo_point_id = gp.id - JOIN time_series ts on ts.id = observation.ts_id - WHERE %s AND %s AND %s AND %s + JOIN time_series on time_series.id = observation.ts_id + JOIN geo_point ON observation.geo_point_id = geo_point.id + WHERE %s AND %s AND %s ORDER BY ts_id, obstime_instant - `, timeExpr, tsMdataExpr, obsMdataExpr, geoExpr) + `, strings.Join(obsStringMdataCols, ","), timeFilter, geoFilter, stringMdataFilter) rows, err := db.Query(query, phVals...) 
if err != nil { @@ -260,59 +472,32 @@ func getObs(db *sql.DB, request *datastore.GetObsRequest, obs *[]*datastore.Meta } defer rows.Close() - obsMap := make(map[int64][]*datastore.ObsMetadata) + obsMdatas := make(map[int64][]*datastore.ObsMetadata) // observations per time series ID + + // scan rows for rows.Next() { - var ( - tsID int64 - id string - gpID int64 - pubTime0 time.Time - dataID string - history string - metadataID string - obsTimeInstant0 time.Time - processingLevel string - value string - point postgis.PointS - ) - if err := rows.Scan(&tsID, &id, &gpID, &pubTime0, &dataID, &history, &metadataID, - &obsTimeInstant0, &processingLevel, &value, &point); err != nil { - return fmt.Errorf("rows.Scan() failed: %v", err) + obsMdata, tsID, err := scanObsRow(rows) + if err != nil { + return fmt.Errorf("scanObsRow() failed: %v", err) } - obsMdata := &datastore.ObsMetadata{ - Id: id, - Geometry: &datastore.ObsMetadata_GeoPoint{ - GeoPoint: &datastore.Point{ - Lon: point.X, - Lat: point.Y}, - }, - Pubtime: timestamppb.New(pubTime0), - DataId: dataID, - History: history, - MetadataId: metadataID, - Obstime: &datastore.ObsMetadata_ObstimeInstant{ - ObstimeInstant: timestamppb.New(obsTimeInstant0), - }, - ProcessingLevel: processingLevel, - Value: value, - } - obsMap[tsID] = append(obsMap[tsID], obsMdata) + obsMdatas[tsID] = append(obsMdatas[tsID], obsMdata) } // get time series - tsMdata := map[int64]*datastore.TSMetadata{} + tsMdatas := map[int64]*datastore.TSMetadata{} tsIDs := []string{} - for id := range obsMap { - tsIDs = append(tsIDs, fmt.Sprintf("%d", id)) + for tsID := range obsMdatas { + tsIDs = append(tsIDs, fmt.Sprintf("%d", tsID)) } - if err = getTSMetadata(db, tsIDs, tsMdata); err != nil { + if err = getTSMetadata(db, tsIDs, tsMdatas); err != nil { return fmt.Errorf("getTSMetadata() failed: %v", err) } - for tsID, obsMdata := range obsMap { + // assemble final output + for tsID, obsMdata := range obsMdatas { *obs = append(*obs, &datastore.Metadata2{ - 
TsMdata: tsMdata[tsID], + TsMdata: tsMdatas[tsID], ObsMdata: obsMdata, }) } diff --git a/datastore/datastore/storagebackend/postgresql/gettsattrgroups.go b/datastore/datastore/storagebackend/postgresql/gettsattrgroups.go index ae7c093..28222b6 100644 --- a/datastore/datastore/storagebackend/postgresql/gettsattrgroups.go +++ b/datastore/datastore/storagebackend/postgresql/gettsattrgroups.go @@ -22,10 +22,10 @@ var ( func init() { tspb2go = map[string]string{} - for _, f := range reflect.VisibleFields(reflect.TypeOf(datastore.TSMetadata{})) { - if f.IsExported() && (f.Type.Kind() == reflect.String) { + for _, field := range reflect.VisibleFields(reflect.TypeOf(datastore.TSMetadata{})) { + if field.IsExported() && (field.Type.Kind() == reflect.String) { // TODO: support non-string types, like the 'links' attribute - goName := f.Name + goName := field.Name pbName := common.ToSnakeCase(goName) tspb2go[pbName] = goName } @@ -75,9 +75,9 @@ func validateAttrs(pbNames []string) error { return nil } -// getTSMdata returns a TSMetadata object initialized from colVals. +// getTSMdata creates a TSMetadata object initialized from colVals. +// Returns (TSMetadata object, nil) upon success, otherwise (..., error). func getTSMdata(colVals map[string]interface{}) (*datastore.TSMetadata, error) { - tsMData := datastore.TSMetadata{} tp := reflect.ValueOf(&tsMData) @@ -117,12 +117,10 @@ func getTSMdata(colVals map[string]interface{}) (*datastore.TSMetadata, error) { return &tsMData, nil } -// scanTsMdata scans the current row in rows and converts the result into a TSMetadata object. -// It is assumed that cols contains exactly the columns (names, types, order) that were used for -// the query that resulted in rows. +// scanTsRow scans columns cols from the current result row in rows and converts to a TSMetadata +// object. // Returns (TSMetadata object, nil) upon success, otherwise (..., error). 
-func scanTsMdata(rows *sql.Rows, cols []string) (*datastore.TSMetadata, error) { - +func scanTsRow(rows *sql.Rows, cols []string) (*datastore.TSMetadata, error) { colVals0 := make([]interface{}, len(cols)) // column values colValPtrs := make([]interface{}, len(cols)) // pointers to column values for i := range colVals0 { @@ -238,9 +236,9 @@ func getTSAttrGroupsIncInstances( currInstances := []*datastore.TSMetadata{} // initial current instance set for rows.Next() { // extract tsMdata from current result row - tsMdata, err := scanTsMdata(rows, allCols) + tsMdata, err := scanTsRow(rows, allCols) if err != nil { - return nil, fmt.Errorf("scanTsMdata() failed: %v", err) + return nil, fmt.Errorf("scanTsRow() failed: %v", err) } if len(currInstances) > 0 { // check if we should create a new current instance set @@ -304,13 +302,12 @@ func getTSAttrGroupsComboOnly(db *sql.DB, cols []string) ([]*datastore.TSMdataGr // aggregate rows into groups for rows.Next() { // extract tsMdata from current result row - tsMdata, err := scanTsMdata(rows, cols) + tsMdata, err := scanTsRow(rows, cols) if err != nil { - return nil, fmt.Errorf("scanTsMdata() failed: %v", err) + return nil, fmt.Errorf("scanTsRow() failed: %v", err) } - // add new group with tsMData as the combo (and leaving the instances - // array unset) + // add new group with tsMData as the combo (and leaving the Instances array unset) groups = append(groups, &datastore.TSMdataGroup{Combo: tsMdata}) } diff --git a/datastore/datastore/storagebackend/postgresql/postgresql.go b/datastore/datastore/storagebackend/postgresql/postgresql.go index 3dbb4d3..7b7197f 100644 --- a/datastore/datastore/storagebackend/postgresql/postgresql.go +++ b/datastore/datastore/storagebackend/postgresql/postgresql.go @@ -92,37 +92,20 @@ func NewPostgreSQL() (*PostgreSQL, error) { // getTSMdataCols returns time series metadata column names. 
func getTSMdataCols() []string { - return []string{ - // main section - "version", - "type", - "title", - "summary", - "keywords", - "keywords_vocabulary", - "license", - "conventions", - "naming_authority", - "creator_type", - "creator_name", - "creator_email", - "creator_url", - "institution", - "project", - "source", - "platform", - "platform_vocabulary", - "standard_name", - "unit", - "instrument", - "instrument_vocabulary", - // links section + + // initialize cols with non-string metadata + cols := []string{ "link_href", "link_rel", "link_type", "link_hreflang", "link_title", } + + // complete cols with string metadata (handleable with reflection) + cols = append(cols, tsStringMdataPBNames...) + + return cols } // createPlaceholders returns the list of n placeholder strings for diff --git a/datastore/datastore/storagebackend/postgresql/putobservations.go b/datastore/datastore/storagebackend/postgresql/putobservations.go index 183ac79..9066d16 100644 --- a/datastore/datastore/storagebackend/postgresql/putobservations.go +++ b/datastore/datastore/storagebackend/postgresql/putobservations.go @@ -6,6 +6,7 @@ import ( "datastore/datastore" "fmt" "log" + "reflect" "strconv" "strings" @@ -41,37 +42,9 @@ func initPutObsLimit() { // getTSColVals gets the time series metadata column values from tsMdata. // Returns (column values, nil) upon success, otherwise (..., error). 
func getTSColVals(tsMdata *datastore.TSMetadata) ([]interface{}, error) { - colVals := []interface{}{} - // main section - - colVals = []interface{}{ - tsMdata.GetVersion(), - tsMdata.GetType(), - tsMdata.GetTitle(), - tsMdata.GetSummary(), - tsMdata.GetKeywords(), - tsMdata.GetKeywordsVocabulary(), - tsMdata.GetLicense(), - tsMdata.GetConventions(), - tsMdata.GetNamingAuthority(), - tsMdata.GetCreatorType(), - tsMdata.GetCreatorName(), - tsMdata.GetCreatorEmail(), - tsMdata.GetCreatorUrl(), - tsMdata.GetInstitution(), - tsMdata.GetProject(), - tsMdata.GetSource(), - tsMdata.GetPlatform(), - tsMdata.GetPlatformVocabulary(), - tsMdata.GetStandardName(), - tsMdata.GetUnit(), - tsMdata.GetInstrument(), - tsMdata.GetInstrumentVocabulary(), - } - - // links section + // --- BEGIN non-string metadata --------------------------- getLinkVals := func(key string) ([]string, error) { linkVals := []string{} @@ -104,6 +77,26 @@ func getTSColVals(tsMdata *datastore.TSMetadata) ([]interface{}, error) { } } + // --- END non-string metadata --------------------------- + + // --- BEGIN string metadata (handleable with reflection) --------------------------- + + rv := reflect.ValueOf(tsMdata) + for _, field := range reflect.VisibleFields(reflect.TypeOf(datastore.TSMetadata{})) { + methodName := fmt.Sprintf("Get%s", field.Name) + method := rv.MethodByName(methodName) + if field.IsExported() && (field.Type.Kind() == reflect.String) && (method.IsValid()) { + val, ok := method.Call([]reflect.Value{})[0].Interface().(string) + if !ok { + return nil, fmt.Errorf( + "method.Call() failed for method %s; failed to return string", methodName) + } + colVals = append(colVals, val) + } + } + + // --- END string metadata --------------------------- + return colVals, nil } @@ -155,6 +148,8 @@ func getTimeSeriesID( cols[0], cols[0], ) + fmt.Printf("insertCmd: %s; len(cols): %d; len(phs): %d\n", + insertCmd, len(cols), len(createPlaceholders(formats))) _, err = tx.Exec(insertCmd, colVals...) 
if err != nil { diff --git a/datastore/integration-test/test_knmi.py b/datastore/integration-test/test_knmi.py index 31e965a..dfe500a 100644 --- a/datastore/integration-test/test_knmi.py +++ b/datastore/integration-test/test_knmi.py @@ -20,7 +20,7 @@ def grpc_stub(): def test_find_series_single_station_single_parameter(grpc_stub): - request = dstore.GetObsRequest(platforms=["06260"], instruments=["rh"]) + request = dstore.GetObsRequest(platform=["06260"], instrument=["rh"]) response = grpc_stub.GetObservations(request) assert len(response.observations) == 1 @@ -30,21 +30,21 @@ def test_find_series_single_station_single_parameter(grpc_stub): def test_find_series_all_stations_single_parameter(grpc_stub): - request = dstore.GetObsRequest(instruments=["rh"]) + request = dstore.GetObsRequest(instrument=["rh"]) response = grpc_stub.GetObservations(request) assert len(response.observations) == 46 # Not all station have RH def test_find_series_single_station_all_parameters(grpc_stub): - request = dstore.GetObsRequest(platforms=["06260"]) + request = dstore.GetObsRequest(platform=["06260"]) response = grpc_stub.GetObservations(request) assert len(response.observations) == 42 # Station 06260 doesn't have all parameters def test_get_values_single_station_single_parameter(grpc_stub): - ts_request = dstore.GetObsRequest(platforms=["06260"], instruments=["rh"]) + ts_request = dstore.GetObsRequest(platform=["06260"], instrument=["rh"]) response = grpc_stub.GetObservations(ts_request) assert len(response.observations) == 1 @@ -60,7 +60,7 @@ def test_get_values_single_station_single_parameter_one_hour(grpc_stub): end_datetime.FromDatetime(datetime(2022, 12, 31, 12)) ts_request = dstore.GetObsRequest( - platforms=["06260"], instruments=["rh"], interval=dstore.TimeInterval(start=start_datetime, end=end_datetime) + platform=["06260"], instrument=["rh"], interval=dstore.TimeInterval(start=start_datetime, end=end_datetime) ) response = grpc_stub.GetObservations(ts_request) @@ -148,7 
+148,7 @@ def test_get_values_single_station_single_parameter_one_hour(grpc_stub): @pytest.mark.parametrize("coords,param_ids,expected_station_ids", input_params_polygon) def test_get_observations_with_polygon(grpc_stub, coords, param_ids, expected_station_ids): polygon = dstore.Polygon(points=[dstore.Point(lat=lat, lon=lon) for lat, lon in coords]) - get_obs_request = dstore.GetObsRequest(inside=polygon, instruments=param_ids) + get_obs_request = dstore.GetObsRequest(inside=polygon, instrument=param_ids) get_obs_response = grpc_stub.GetObservations(get_obs_request) actual_station_ids = sorted({ts.ts_mdata.platform for ts in get_obs_response.observations}) diff --git a/datastore/load-test/locustfile.py b/datastore/load-test/locustfile.py index 81e966f..3bb4892 100644 --- a/datastore/load-test/locustfile.py +++ b/datastore/load-test/locustfile.py @@ -44,8 +44,8 @@ def get_data_for_single_timeserie(self): request = dstore.GetObsRequest( interval=dstore.TimeInterval(start=from_time, end=to_time), - platforms=[random.choice(stations)], - instruments=[random.choice(parameters)], + platform=[random.choice(stations)], + instrument=[random.choice(parameters)], ) response = self.stub.GetObservations(request) assert len(response.observations) == 1 @@ -63,7 +63,7 @@ def get_data_single_station_through_bbox(self): request = dstore.GetObsRequest( interval=dstore.TimeInterval(start=from_time, end=to_time), - instruments=[random.choice(parameters)], + instrument=[random.choice(parameters)], inside=dstore.Polygon(points=[dstore.Point(lat=coord[1], lon=coord[0]) for coord in poly.exterior.coords]), ) response = self.stub.GetObservations(request) diff --git a/datastore/migrate/data/migrations/1701872471_initialise_schema.up.sql b/datastore/migrate/data/migrations/1701872471_initialise_schema.up.sql index dd7ae0c..0950742 100644 --- a/datastore/migrate/data/migrations/1701872471_initialise_schema.up.sql +++ b/datastore/migrate/data/migrations/1701872471_initialise_schema.up.sql @@ 
-4,33 +4,40 @@ CREATE TABLE time_series ( id BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, -- --- BEGIN metadata fields that usually don't vary with obs time --- - version TEXT NOT NULL, -- required - type TEXT NOT NULL, -- required - title TEXT, - summary TEXT NOT NULL, -- required - keywords TEXT NOT NULL, -- required - keywords_vocabulary TEXT NOT NULL, -- required - license TEXT NOT NULL, -- required - conventions TEXT NOT NULL, -- required - naming_authority TEXT NOT NULL, -- required - creator_type TEXT, - creator_name TEXT, - creator_email TEXT, - creator_url TEXT, - institution TEXT, - project TEXT, - source TEXT, - platform TEXT NOT NULL, -- required - platform_vocabulary TEXT NOT NULL, -- required - standard_name TEXT, - unit TEXT, - instrument TEXT NOT NULL, - instrument_vocabulary TEXT NOT NULL, + + -- --- BEGIN non-string metadata ----------------- link_href TEXT[], link_rel TEXT[], link_type TEXT[], link_hreflang TEXT[], link_title TEXT[], + -- --- END non-string metadata ----------------- + + -- --- BEGIN string metadata (handleable with reflection) ----------------- + version TEXT NOT NULL, -- required + type TEXT NOT NULL, -- required + title TEXT, + summary TEXT NOT NULL, -- required + keywords TEXT NOT NULL, -- required + keywords_vocabulary TEXT NOT NULL, -- required + license TEXT NOT NULL, -- required + conventions TEXT NOT NULL, -- required + naming_authority TEXT NOT NULL, -- required + creator_type TEXT, + creator_name TEXT, + creator_email TEXT, + creator_url TEXT, + institution TEXT, + project TEXT, + source TEXT, + platform TEXT NOT NULL, -- required + platform_vocabulary TEXT NOT NULL, -- required + standard_name TEXT, + unit TEXT, + instrument TEXT NOT NULL, + instrument_vocabulary TEXT NOT NULL, + -- --- END string metadata ----------------- + -- --- END metadata fields that usually don't vary with obs time --- CONSTRAINT unique_main UNIQUE NULLS NOT DISTINCT (version, type, title, summary, keywords, @@ -51,23 +58,29 @@ CREATE 
TABLE observation ( ts_id BIGINT NOT NULL REFERENCES time_series(id) ON DELETE CASCADE, -- --- BEGIN metadata fields that usually vary with obs time --- - id TEXT NOT NULL, -- required + -- --- BEGIN non-string metadata ----------------- -- Refer to geometry via a foreign key to ensure that each distinct geometry is -- stored only once in the geo_* table, thus speeding up geo search. geo_point_id BIGINT NOT NULL REFERENCES geo_point(id) ON DELETE CASCADE, + -- --- BEGIN for now support only a single instant for obs time --------- + obstime_instant timestamptz, -- NOT NULL, but implied by being part of PK; obs time variant 1: single instant + -- --- END for now support only a single instant for obs time --------- + pubtime timestamptz NOT NULL, -- required + -- --- END non-string metadata ----------------- + + -- --- BEGIN string metadata (handleable with reflection) ----------------- + id TEXT NOT NULL, -- required data_id TEXT NOT NULL, -- required history TEXT, metadata_id TEXT NOT NULL, -- required + processing_level TEXT, + -- --- END string metadata ----------------- - -- --- BEGIN for now support only a single instant for obs time --------- - obstime_instant timestamptz, -- NOT NULL, but implied by being part of PK; obs time variant 1: single instant - -- --- END for now support only a single instant for obs time --------- + value TEXT NOT NULL, -- obs value (not metadata in a strict sense) - processing_level TEXT, - value TEXT NOT NULL, -- obs value -- --- END metadata fields that usually vary with obs time --- PRIMARY KEY (ts_id, obstime_instant)