Skip to content

Commit

Permalink
Merge pull request #75 from EURODEO/issue_64_follow_up
Browse files Browse the repository at this point in the history
Issue 64 follow up
  • Loading branch information
jo-asplin-met-no authored Dec 5, 2023
2 parents de3b141 + 11a5203 commit 98f1976
Showing 1 changed file with 56 additions and 40 deletions.
96 changes: 56 additions & 40 deletions datastore/storagebackend/postgresql/gettsattrgroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
tspb2go map[string]string // association between time series specific protobuf name and
// (generated by protoc) Go name.
// NOTES:
// - Protobuf names are field names the TSMetadata message in datastore.proto.
// - Protobuf names are field names of the TSMetadata message in datastore.proto.
// - Protobuf names are identical to corresponding database column names.
// - Protobuf names are snake case (aaa_bbb) whereas Go names are camel case (aaaBbb or AaaBbb).
)
Expand Down Expand Up @@ -54,27 +54,25 @@ func getTSAllPBNames() []string {
return pbNames
}

// getTSDBColumns returns names of database columns corresponding to pbNames.
func getTSDBColumns(pbNames []string) ([]string, error) {
// validateAttrs validates pbNames. Returns nil if valid, otherwise error.
func validateAttrs(pbNames []string) error {
seen := map[string]struct{}{}
cols := []string{}

for _, pbName := range pbNames {
if _, found := tspb2go[pbName]; !found {
return nil, fmt.Errorf(
return fmt.Errorf(
"attribute not found: %s; supported attributes: %s",
pbName, strings.Join(getTSAllPBNames(), ", "))
}

if _, found := seen[pbName]; found {
return nil, fmt.Errorf("attribute %s specified more than once", pbName)
return fmt.Errorf("attribute %s specified more than once", pbName)
}

cols = append(cols, pbName)
seen[pbName] = struct{}{}
}

return cols, nil
return nil
}

// getTSMdata returns a TSMetadata object initialized from colVals.
Expand Down Expand Up @@ -119,24 +117,29 @@ func getTSMdata(colVals map[string]interface{}) (*datastore.TSMetadata, error) {
return &tsMData, nil
}

// scanTsMdata scans column cols from current result row in rows and converts to a
// TSMetadata object.
// scanTsMdata scans the current row in rows and converts the result into a TSMetadata object.
// It is assumed that cols contains exactly the columns (names, types, order) that were used for
// the query that resulted in rows.
// Returns (TSMetadata object, nil) upon success, otherwise (..., error).
func scanTsMdata(rows *sql.Rows, cols []string) (*datastore.TSMetadata, error) {

colVals0 := make([]interface{}, len(cols)) // column values
colValPtrs := make([]interface{}, len(cols)) // pointers to column values
for i := range colVals0 {
colValPtrs[i] = &colVals0[i]
}

// scan row into column value pointers
if err := rows.Scan(colValPtrs...); err != nil {
return nil, fmt.Errorf("rows.Scan() failed: %v", err)
}

// combine column names and -values into a map
colVals := map[string]interface{}{}
for i, col := range cols {
colVals[col] = colVals0[i]
}

// convert to a TSMetadata object
tsMdata, err := getTSMdata(colVals)
if err != nil {
Expand Down Expand Up @@ -202,17 +205,21 @@ func getCombo(tsMdata1 *datastore.TSMetadata, goNames []string) (*datastore.TSMe
return &tsMdata2, nil
}

// getTSAttrGroupsIncInstances populates groups from cols such that each group contains all
// instances that match a unique combination of database values corresponding to cols.
// getTSAttrGroupsIncInstances creates an array of groups from cols such that each group contains
// all instances that match a unique combination of database values corresponding to cols.
// All attributes, including those in cols, are set to the actual values found in the database.
// Returns nil upon success, otherwise error.
//
// NOTE: cols is assumed to be sanitized by validateAttrs, so there is no risk of SQL injection
// in the below query.
//
// Returns (array of groups, nil) upon success, otherwise (..., error).
func getTSAttrGroupsIncInstances(
db *sql.DB, cols []string, groups *[]*datastore.TSMdataGroup) error {
db *sql.DB, cols []string) ([]*datastore.TSMdataGroup, error) {
allCols := getTSAllPBNames() // get all protobuf names of TSMetadata message

goNames, err := getTSGoNamesFromPBNames(cols)
if err != nil {
return fmt.Errorf("getTSGoNamesFromPBNames() failed: %v", err)
return nil, fmt.Errorf("getTSGoNamesFromPBNames() failed: %v", err)
}

// query database for all columns in time_series, ordered by cols
Expand All @@ -221,32 +228,34 @@ func getTSAttrGroupsIncInstances(
query := fmt.Sprintf("SELECT %s FROM time_series ORDER BY %s", allColsS, colsS)
rows, err := db.Query(query)
if err != nil {
return fmt.Errorf("db.Query() failed: %v", err)
return nil, fmt.Errorf("db.Query() failed: %v", err)
}
defer rows.Close()

groups := []*datastore.TSMdataGroup{}

// aggregate rows into groups
currInstances := []*datastore.TSMetadata{} // initial current instance set
for rows.Next() {
// extract tsMdata from current result row
tsMdata, err := scanTsMdata(rows, allCols)
if err != nil {
return fmt.Errorf("scanTsMdata() failed: %v", err)
return nil, fmt.Errorf("scanTsMdata() failed: %v", err)
}

if len(currInstances) > 0 { // check if we should create a new current instance set
equal, err := tsMdataEqual(tsMdata, currInstances[0], goNames)
if err != nil {
return fmt.Errorf("tsMdataEqual() failed: %v", err)
return nil, fmt.Errorf("tsMdataEqual() failed: %v", err)
}

if !equal { // ts metadata changed wrt. cols
// add next group with current instance set
currCombo, err := getCombo(currInstances[0], goNames)
if err != nil {
return fmt.Errorf("getCombo() failed (1): %v", err)
return nil, fmt.Errorf("getCombo() failed (1): %v", err)
}
*groups = append(*groups, &datastore.TSMdataGroup{
groups = append(groups, &datastore.TSMdataGroup{
Combo: currCombo,
Instances: currInstances,
})
Expand All @@ -261,64 +270,71 @@ func getTSAttrGroupsIncInstances(
// add final group with current instance set
currCombo, err := getCombo(currInstances[0], goNames)
if err != nil {
return fmt.Errorf("getCombo() failed (2): %v", err)
return nil, fmt.Errorf("getCombo() failed (2): %v", err)
}
*groups = append(*groups, &datastore.TSMdataGroup{
groups = append(groups, &datastore.TSMdataGroup{
Combo: currCombo,
Instances: currInstances,
})

return nil
return groups, nil
}

// getTSAttrGroupsComboOnly populates groups from cols such that each group contains a single,
// unique combination of database values corresponding to cols. Other attributes than those in cols
// have the default value for the type (i.e. "" for string, etc.).
// Returns nil upon success, otherwise error.
func getTSAttrGroupsComboOnly(db *sql.DB, cols []string, groups *[]*datastore.TSMdataGroup) error {
// getTSAttrGroupsComboOnly creates an array of groups from cols such that each group contains a
// single, unique combination of database values corresponding to cols. Other attributes than those
// in cols have the default value for the type (i.e. "" for string, etc.).
//
// NOTE: cols is assumed to be sanitized by validateAttrs, so there is no risk of SQL injection
// in the below query.
//
// Returns (array of groups, nil) upon success, otherwise (..., error).
func getTSAttrGroupsComboOnly(db *sql.DB, cols []string) ([]*datastore.TSMdataGroup, error) {

// query database for unique combinations of cols in time_series, ordered by cols
colsS := strings.Join(cols, ",")
query := fmt.Sprintf("SELECT DISTINCT %s FROM time_series ORDER BY %s", colsS, colsS)
rows, err := db.Query(query)
if err != nil {
return fmt.Errorf("db.Query() failed: %v", err)
return nil, fmt.Errorf("db.Query() failed: %v", err)
}
defer rows.Close()

groups := []*datastore.TSMdataGroup{}

// aggregate rows into groups
for rows.Next() {
// extract tsMdata from current result row
tsMdata, err := scanTsMdata(rows, cols)
if err != nil {
return fmt.Errorf("scanTsMdata() failed: %v", err)
return nil, fmt.Errorf("scanTsMdata() failed: %v", err)
}

// add new group with tsMData as the combo (and leaving the instances
// array unset)
*groups = append(*groups, &datastore.TSMdataGroup{Combo: tsMdata})
groups = append(groups, &datastore.TSMdataGroup{Combo: tsMdata})
}

return nil
return groups, nil
}

// GetTSAttrGroups ... (see documentation in StorageBackend interface)
func (sbe *PostgreSQL) GetTSAttrGroups(request *datastore.GetTSAGRequest) (
*datastore.GetTSAGResponse, error) {

cols, err := getTSDBColumns(request.Attrs) // get database column names for requested attributes
if err != nil {
return nil, fmt.Errorf("getTSAttrCols() failed: %v", err)
if err := validateAttrs(request.Attrs); err != nil {
return nil, fmt.Errorf("validateAttrs() failed: %v", err)
}

groups := []*datastore.TSMdataGroup{}
var groups []*datastore.TSMdataGroup
var err error

if request.IncludeInstances {
if err := getTSAttrGroupsIncInstances(sbe.Db, cols, &groups); err != nil {
return nil, fmt.Errorf("getTSAGroupsIncInstances() failed: %v", err)
if groups, err = getTSAttrGroupsIncInstances(sbe.Db, request.Attrs); err != nil {
return nil, fmt.Errorf("getTSAttrGroupsIncInstances() failed: %v", err)
}
} else {
if err := getTSAttrGroupsComboOnly(sbe.Db, cols, &groups); err != nil {
return nil, fmt.Errorf("getTSAGroupsComboOnly() failed: %v", err)
if groups, err = getTSAttrGroupsComboOnly(sbe.Db, request.Attrs); err != nil {
return nil, fmt.Errorf("getTSAttrGroupsComboOnly() failed: %v", err)
}
}

Expand Down

0 comments on commit 98f1976

Please sign in to comment.