Skip to content

Commit

Permalink
Merge branch 'main' into add_remaining_acdd_attrs_issue_54
Browse files Browse the repository at this point in the history
  • Loading branch information
jo-asplin-met-no committed Dec 19, 2023
2 parents 240c538 + 2480e94 commit aaac335
Show file tree
Hide file tree
Showing 22 changed files with 1,037 additions and 57 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
.run/

# Python
.pytest_cache/
__pycache__/
venv/

Expand Down
3 changes: 2 additions & 1 deletion api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ def get_datetime_range(datetime_string: str | None) -> Tuple[Timestamp, Timestam
else:
start_datetime.FromDatetime(datetime.min)
if datetimes[1] != "..":
end_datetime.FromDatetime(aware_datetime_type_adapter.validate_python(datetimes[1]))
# HACK add one second so that the end_datetime is included in the interval.
end_datetime.FromDatetime(aware_datetime_type_adapter.validate_python(datetimes[1]) + timedelta(seconds=1))
else:
end_datetime.FromDatetime(datetime.max)

Expand Down
14 changes: 14 additions & 0 deletions database/healthcheck_postgis_uptime.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/env bash

CONNECTION_STRING=$1 # Postgres connection string
UPTIME_AMOUNT=${2:-1} # Number of e.g. hours, minutes, seconds
UPTIME_TYPE=${3:-"minute"} # E.g. hour, minute, second

# Return exit code based on the uptime of postgres
if [[ $(psql "${CONNECTION_STRING}" -XtAc \
"SELECT COUNT(*) FROM (SELECT current_timestamp - pg_postmaster_start_time() AS uptime) AS t WHERE t.uptime > interval '${UPTIME_AMOUNT} ${UPTIME_TYPE}'") == 1 ]];
then
exit 0
else
exit 1
fi
85 changes: 48 additions & 37 deletions datastore/storagebackend/postgresql/gettsattrgroups.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,25 @@ func getTSAllPBNames() []string {
return pbNames
}

// getTSDBColumns returns names of database columns corresponding to pbNames.
func getTSDBColumns(pbNames []string) ([]string, error) {
// validateAttrs validates pbNames. Returns nil if valid, otherwise error.
func validateAttrs(pbNames []string) error {
seen := map[string]struct{}{}
cols := []string{}

for _, pbName := range pbNames {
if _, found := tspb2go[pbName]; !found {
return nil, fmt.Errorf(
return fmt.Errorf(
"attribute not found: %s; supported attributes: %s",
pbName, strings.Join(getTSAllPBNames(), ", "))
}

if _, found := seen[pbName]; found {
return nil, fmt.Errorf("attribute %s specified more than once", pbName)
return fmt.Errorf("attribute %s specified more than once", pbName)
}

cols = append(cols, pbName)
seen[pbName] = struct{}{}
}

return cols, nil
return nil
}

// getTSMdata creates a TSMetadata object initialized from colVals.
Expand Down Expand Up @@ -205,17 +203,21 @@ func getCombo(tsMdata1 *datastore.TSMetadata, goNames []string) (*datastore.TSMe
return &tsMdata2, nil
}

// getTSAttrGroupsIncInstances populates groups from cols such that each group contains all
// instances that match a unique combination of database values corresponding to cols.
// getTSAttrGroupsIncInstances creates an array of groups from cols such that each group contains
// all instances that match a unique combination of database values corresponding to cols.
// All attributes, including those in cols, are set to the actual values found in the database.
// Returns nil upon success, otherwise error.
//
// NOTE: cols is assumed to be sanitized by validateAttrs, so there is no risk of SQL injection
// in the below query.
//
// Returns (array of groups, nil) upon success, otherwise (..., error).
func getTSAttrGroupsIncInstances(
db *sql.DB, cols []string, groups *[]*datastore.TSMdataGroup) error {
db *sql.DB, cols []string) ([]*datastore.TSMdataGroup, error) {
allCols := getTSAllPBNames() // get all protobuf names of TSMetadata message

goNames, err := getTSGoNamesFromPBNames(cols)
if err != nil {
return fmt.Errorf("getTSGoNamesFromPBNames() failed: %v", err)
return nil, fmt.Errorf("getTSGoNamesFromPBNames() failed: %v", err)
}

// query database for all columns in time_series, ordered by cols
Expand All @@ -224,32 +226,34 @@ func getTSAttrGroupsIncInstances(
query := fmt.Sprintf("SELECT %s FROM time_series ORDER BY %s", allColsS, colsS)
rows, err := db.Query(query)
if err != nil {
return fmt.Errorf("db.Query() failed: %v", err)
return nil, fmt.Errorf("db.Query() failed: %v", err)
}
defer rows.Close()

groups := []*datastore.TSMdataGroup{}

// aggregate rows into groups
currInstances := []*datastore.TSMetadata{} // initial current instance set
for rows.Next() {
// extract tsMdata from current result row
tsMdata, err := scanTsRow(rows, allCols)
if err != nil {
return fmt.Errorf("scanTsMdata() failed: %v", err)
return nil, fmt.Errorf("scanTsMdata() failed: %v", err)
}

if len(currInstances) > 0 { // check if we should create a new current instance set
equal, err := tsMdataEqual(tsMdata, currInstances[0], goNames)
if err != nil {
return fmt.Errorf("tsMdataEqual() failed: %v", err)
return nil, fmt.Errorf("tsMdataEqual() failed: %v", err)
}

if !equal { // ts metadata changed wrt. cols
// add next group with current instance set
currCombo, err := getCombo(currInstances[0], goNames)
if err != nil {
return fmt.Errorf("getCombo() failed (1): %v", err)
return nil, fmt.Errorf("getCombo() failed (1): %v", err)
}
*groups = append(*groups, &datastore.TSMdataGroup{
groups = append(groups, &datastore.TSMdataGroup{
Combo: currCombo,
Instances: currInstances,
})
Expand All @@ -264,63 +268,70 @@ func getTSAttrGroupsIncInstances(
// add final group with current instance set
currCombo, err := getCombo(currInstances[0], goNames)
if err != nil {
return fmt.Errorf("getCombo() failed (2): %v", err)
return nil, fmt.Errorf("getCombo() failed (2): %v", err)
}
*groups = append(*groups, &datastore.TSMdataGroup{
groups = append(groups, &datastore.TSMdataGroup{
Combo: currCombo,
Instances: currInstances,
})

return nil
return groups, nil
}

// getTSAttrGroupsComboOnly populates groups from cols such that each group contains a single,
// unique combination of database values corresponding to cols. Other attributes than those in cols
// have the default value for the type (i.e. "" for string, etc.).
// Returns nil upon success, otherwise error.
func getTSAttrGroupsComboOnly(db *sql.DB, cols []string, groups *[]*datastore.TSMdataGroup) error {
// getTSAttrGroupsComboOnly creates an array of groups from cols such that each group contains a
// single, unique combination of database values corresponding to cols. Other attributes than those
// in cols have the default value for the type (i.e. "" for string, etc.).
//
// NOTE: cols is assumed to be sanitized by validateAttrs, so there is no risk of SQL injection
// in the below query.
//
// Returns (array of groups, nil) upon success, otherwise (..., error).
func getTSAttrGroupsComboOnly(db *sql.DB, cols []string) ([]*datastore.TSMdataGroup, error) {

// query database for unique combinations of cols in time_series, ordered by cols
colsS := strings.Join(cols, ",")
query := fmt.Sprintf("SELECT DISTINCT %s FROM time_series ORDER BY %s", colsS, colsS)
rows, err := db.Query(query)
if err != nil {
return fmt.Errorf("db.Query() failed: %v", err)
return nil, fmt.Errorf("db.Query() failed: %v", err)
}
defer rows.Close()

groups := []*datastore.TSMdataGroup{}

// aggregate rows into groups
for rows.Next() {
// extract tsMdata from current result row
tsMdata, err := scanTsRow(rows, cols)
if err != nil {
return fmt.Errorf("scanTsMdata() failed: %v", err)
return nil, fmt.Errorf("scanTsMdata() failed: %v", err)
}

// add new group with tsMData as the combo (and leaving the Instances array unset)
*groups = append(*groups, &datastore.TSMdataGroup{Combo: tsMdata})
groups = append(groups, &datastore.TSMdataGroup{Combo: tsMdata})
}

return nil
return groups, nil
}

// GetTSAttrGroups ... (see documentation in StorageBackend interface)
func (sbe *PostgreSQL) GetTSAttrGroups(request *datastore.GetTSAGRequest) (
*datastore.GetTSAGResponse, error) {

cols, err := getTSDBColumns(request.Attrs) // get database column names for requested attributes
if err != nil {
return nil, fmt.Errorf("getTSAttrCols() failed: %v", err)
if err := validateAttrs(request.Attrs); err != nil {
return nil, fmt.Errorf("validateAttrs() failed: %v", err)
}

groups := []*datastore.TSMdataGroup{}
var groups []*datastore.TSMdataGroup
var err error

if request.IncludeInstances {
if err := getTSAttrGroupsIncInstances(sbe.Db, cols, &groups); err != nil {
return nil, fmt.Errorf("getTSAGroupsIncInstances() failed: %v", err)
if groups, err = getTSAttrGroupsIncInstances(sbe.Db, request.Attrs); err != nil {
return nil, fmt.Errorf("getTSAttrGroupsIncInstances() failed: %v", err)
}
} else {
if err := getTSAttrGroupsComboOnly(sbe.Db, cols, &groups); err != nil {
return nil, fmt.Errorf("getTSAGroupsComboOnly() failed: %v", err)
if groups, err = getTSAttrGroupsComboOnly(sbe.Db, request.Attrs); err != nil {
return nil, fmt.Errorf("getTSAttrGroupsComboOnly() failed: %v", err)
}
}

Expand Down
19 changes: 15 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,29 @@ services:
volumes:
# - ts-data:/home/postgres/pgdata/data # for timescale image
- ts-data:/var/lib/postgresql/data # for postgres image
- ./datastore/ts-init.sql:/docker-entrypoint-initdb.d/init.sql
- ./database/healthcheck_postgis_uptime.sh:/healthcheck_postgis_uptime.sh # for the healthcheck
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=mysecretpassword
- POSTGRES_DB=data
restart: on-failure
healthcheck:
test: [ "CMD-SHELL", "psql postgresql://postgres:mysecretpassword@localhost/data -c \"SELECT COUNT(*) from OBSERVATION\"" ]
# HACK Due to the installation of Postgis extension the database is restarted, the healthcheck checks if the database is up for longer than specified time.
test: ["CMD-SHELL", "/healthcheck_postgis_uptime.sh postgresql://postgres:mysecretpassword@localhost/data 10 second"]
interval: 5s
timeout: 1s
retries: 3
start_period: 30s # Failures in 30 seconds do not mark container as unhealthy

migrate:
image: migrate/migrate:4
volumes:
- ./migrate/data/migrations:/data/migrations
command: ["-path", "/data/migrations", "-database", "postgres://postgres:mysecretpassword@db:5432/data?sslmode=disable", "up"]
depends_on:
db:
condition: service_healthy

store:
build:
context: datastore
Expand All @@ -45,8 +55,8 @@ services:
retries: 3
start_period: 30s # Failures in 30 seconds do not mark container as unhealthy
depends_on:
db:
condition: service_healthy
migrate:
condition: service_completed_successfully

api:
build:
Expand Down Expand Up @@ -93,6 +103,7 @@ services:
environment:
- DSHOST=store
- DSPORT=50050
- BASE_URL=http://api:8000
depends_on:
store:
condition: service_healthy
Expand Down
3 changes: 3 additions & 0 deletions integration-test/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ RUN python -m grpc_tools.protoc \

COPY "${PROJECT_PYTHON_PATH}/test_knmi.py" "${DOCKER_PATH}/test_knmi.py"
COPY "${PROJECT_PYTHON_PATH}/test_delete.py" "${DOCKER_PATH}/test_delete.py"
COPY "${PROJECT_PYTHON_PATH}/test_api.py" "${DOCKER_PATH}/test_api.py"

COPY "${PROJECT_PYTHON_PATH}/response/" "${DOCKER_PATH}/response/"

WORKDIR "${DOCKER_PATH}"
CMD ["pytest"]
2 changes: 2 additions & 0 deletions integration-test/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@
# Install using:
# pip-sync

deepdiff~=6.2
grpcio-tools~=1.56
pytest~=7.4
requests~=2.31
24 changes: 19 additions & 5 deletions integration-test/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,34 @@
#
# pip-compile --no-emit-index-url
#
grpcio==1.58.0
certifi==2023.11.17
# via requests
charset-normalizer==3.3.2
# via requests
deepdiff==6.7.1
# via -r requirements.in
grpcio==1.59.3
# via grpcio-tools
grpcio-tools==1.58.0
grpcio-tools==1.59.3
# via -r requirements.in
idna==3.6
# via requests
iniconfig==2.0.0
# via pytest
packaging==23.1
ordered-set==4.1.0
# via deepdiff
packaging==23.2
# via pytest
pluggy==1.3.0
# via pytest
protobuf==4.24.3
protobuf==4.25.1
# via grpcio-tools
pytest==7.4.2
pytest==7.4.3
# via -r requirements.in
requests==2.31.0
# via -r requirements.in
urllib3==2.1.0
# via requests

# The following packages are considered to be unsafe in a requirements file:
# setuptools
Loading

0 comments on commit aaac335

Please sign in to comment.