From 4ff11727c7037816f98f6c0e9d374500f07cffc5 Mon Sep 17 00:00:00 2001 From: Lindsey Cheng Date: Mon, 12 Aug 2024 15:06:54 +0800 Subject: [PATCH] feat: Add core-data event related db methods for postgres (#4855) * feat: Add core-data event related db methods for postgres Relates to #4847. Add core-data event related db methods for postgres db client. Signed-off-by: Lindsey Cheng * fix: Define json query string as query argument Define json query string as query argument instead of in sql statement. Signed-off-by: Lindsey Cheng * fix: Update core-data event table design Update event design based on the event struct fields to have multi columns. Signed-off-by: Lindsey Cheng --------- Signed-off-by: Lindsey Cheng --- cmd/core-data/res/db/sql/00-utils.sql | 7 + cmd/core-data/res/db/sql/01-tables.sql | 14 + .../res/configuration.yaml | 14 +- go.mod | 1 + go.sum | 2 + internal/pkg/bootstrap/handlers/database.go | 18 +- internal/pkg/db/postgres/client.go | 6 +- internal/pkg/db/postgres/utils.go | 4 + internal/pkg/infrastructure/postgres/event.go | 243 ++++++++++++++++++ .../pkg/infrastructure/postgres/reading.go | 83 ++++++ internal/pkg/infrastructure/postgres/sql.go | 75 +++++- 11 files changed, 447 insertions(+), 20 deletions(-) create mode 100644 cmd/core-data/res/db/sql/00-utils.sql create mode 100644 cmd/core-data/res/db/sql/01-tables.sql create mode 100644 internal/pkg/infrastructure/postgres/event.go create mode 100644 internal/pkg/infrastructure/postgres/reading.go diff --git a/cmd/core-data/res/db/sql/00-utils.sql b/cmd/core-data/res/db/sql/00-utils.sql new file mode 100644 index 0000000000..03e5df258d --- /dev/null +++ b/cmd/core-data/res/db/sql/00-utils.sql @@ -0,0 +1,7 @@ +-- +-- Copyright (C) 2024 IOTech Ltd +-- +-- SPDX-License-Identifier: Apache-2.0 + +-- schema for core-data related tables +CREATE SCHEMA IF NOT EXISTS core_data; diff --git a/cmd/core-data/res/db/sql/01-tables.sql b/cmd/core-data/res/db/sql/01-tables.sql new file mode 100644 index 0000000000..2e4308cdda --- /dev/null +++ b/cmd/core-data/res/db/sql/01-tables.sql @@ -0,0 +1,14 @@ +-- +-- Copyright (C) 2024 IOTech Ltd +-- +-- SPDX-License-Identifier: Apache-2.0 + +-- core_data.event is used to store the event information +CREATE TABLE IF NOT EXISTS core_data.event ( + id UUID PRIMARY KEY, + devicename TEXT, + profilename TEXT, + sourcename TEXT, + origin BIGINT, + tags JSONB +); diff --git a/cmd/security-secretstore-setup/res/configuration.yaml b/cmd/security-secretstore-setup/res/configuration.yaml index bbaf720e8f..eee2c51cae 100644 --- a/cmd/security-secretstore-setup/res/configuration.yaml +++ b/cmd/security-secretstore-setup/res/configuration.yaml @@ -49,25 +49,25 @@ Databases: Username: admin command: Service: core-command - Username: core-command + Username: core_command metadata: Service: core-metadata - Username: core-metadata + Username: core_metadata coredata: Service: core-data - Username: core-data + Username: core_data corekeeper: Service: core-keeper - Username: core-keeper + Username: core_keeper rulesengine: Service: app-rules-engine - Username: app-rules-engine + Username: app_rules_engine notifications: Service: support-notifications - Username: support-notifications + Username: support_notifications scheduler: Service: support-scheduler - Username: support-scheduler + Username: support_scheduler SecureMessageBus: Type: none KuiperConfigPath: /tmp/kuiper/edgex.yaml diff --git a/go.mod b/go.mod index f3082cd58a..f022d31f10 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/go-co-op/gocron/v2 v2.11.0 github.com/gomodule/redigo v1.9.2 github.com/google/uuid v1.6.0 + github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 github.com/jackc/pgx/v5 v5.6.0 github.com/labstack/echo/v4 v4.11.4 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 diff --git a/go.sum b/go.sum index 6bf471892f..ef88b95d19 100644 --- a/go.sum +++ b/go.sum @@ -327,6 +327,8 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= +github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= diff --git a/internal/pkg/bootstrap/handlers/database.go b/internal/pkg/bootstrap/handlers/database.go index 1f201b160b..dc4f87ccd2 100644 --- a/internal/pkg/bootstrap/handlers/database.go +++ b/internal/pkg/bootstrap/handlers/database.go @@ -1,5 +1,5 @@ // -// Copyright (C) 2020 IOTech Ltd +// Copyright (C) 2020-2024 IOTech Ltd // // SPDX-License-Identifier: Apache-2.0 @@ -24,6 +24,12 @@ import ( "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger" ) +const ( + baseScriptPath = "/res/db/sql" + redisDBType = "redisdb" + postgresDBType = "postgres" +) + // httpServer defines the contract used to determine whether or not the http httpServer is running. type httpServer interface { IsRunning() bool @@ -60,12 +66,12 @@ func (d Database) newDBClient( } switch databaseInfo.Type { - case "redisdb": + case redisDBType: return redis.NewClient(databaseConfig, lc) - case "postgres": + case postgresDBType: databaseConfig.Username = credentials.Username // TODO: The baseScriptPath and extScriptPath should be passed in from the configuration file - return postgres.NewClient(ctx, databaseConfig, "/res/db/sql", "", lc) + return postgres.NewClient(ctx, databaseConfig, baseScriptPath, "", lc) default: return nil, db.ErrUnsupportedDatabase } @@ -139,7 +145,7 @@ func (d Database) BootstrapHandler( }, }) - lc.Info("Database connected") + lc.Infof("%s database connected", dbInfo.Type) wg.Add(1) go func() { defer wg.Done() @@ -153,7 +159,7 @@ func (d Database) BootstrapHandler( } time.Sleep(time.Second) } - lc.Info("Database disconnected") + lc.Infof("%s database disconnected", dbInfo.Type) }() return true diff --git a/internal/pkg/db/postgres/client.go b/internal/pkg/db/postgres/client.go index b38f87df9a..f5b4cf9d95 100644 --- a/internal/pkg/db/postgres/client.go +++ b/internal/pkg/db/postgres/client.go @@ -63,13 +63,13 @@ func NewClient(ctx context.Context, config db.Configuration, baseScriptPath, ext // execute base DB scripts if edgeXerr = executeDBScripts(ctx, dc.ConnPool, baseScriptPath); edgeXerr != nil { - return nil, errors.NewCommonEdgeX(errors.Kind(edgeXerr), "failed to execute base DB scripts", edgeXerr) + return nil, errors.NewCommonEdgeX(errors.Kind(edgeXerr), "failed to execute Postgres base DB scripts", edgeXerr) } - lc.Info("successfully execute base DB scripts") + lc.Info("successfully execute Postgres base DB scripts") // execute extension DB scripts if edgeXerr = executeDBScripts(ctx, dc.ConnPool, extScriptPath); edgeXerr != nil { - return nil, errors.NewCommonEdgeX(errors.Kind(edgeXerr), "failed to execute extension DB scripts", edgeXerr) + return nil, errors.NewCommonEdgeX(errors.Kind(edgeXerr), "failed to execute Postgres extension DB scripts", edgeXerr) } return dc, nil diff --git a/internal/pkg/db/postgres/utils.go b/internal/pkg/db/postgres/utils.go index 95e6ca54bc..6499dfbd09 100644 --- a/internal/pkg/db/postgres/utils.go +++ b/internal/pkg/db/postgres/utils.go @@ -14,6 +14,7 @@ import ( "regexp" "sort" + "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" @@ -128,6 +129,9 @@ func sortedSqlFileNames(sqlFilesDir string) ([]string, errors.EdgeX) { func WrapDBError(message string, err error) errors.EdgeX { var pgErr *pgconn.PgError if goErrors.As(err, &pgErr) { + if pgerrcode.IsIntegrityConstraintViolation(pgErr.Code) { + return errors.NewCommonEdgeX(errors.KindDuplicateName, pgErr.Detail, nil) + } return errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("%s: %s %s", message, pgErr.Error(), pgErr.Detail), nil) } return errors.NewCommonEdgeX(errors.KindDatabaseError, message, err) diff --git a/internal/pkg/infrastructure/postgres/event.go b/internal/pkg/infrastructure/postgres/event.go new file mode 100644 index 0000000000..59a4802e6f --- /dev/null +++ b/internal/pkg/infrastructure/postgres/event.go @@ -0,0 +1,243 @@ +// +// Copyright (C) 2024 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + "context" + "encoding/json" + stdErrs "errors" + "fmt" + "time" + + pgClient "github.com/edgexfoundry/edgex-go/internal/pkg/db/postgres" + + "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" + model "github.com/edgexfoundry/go-mod-core-contracts/v3/models" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +const ( + eventTableName = "core_data.event" + + // constants relate to the event struct field names + deviceNameCol = "devicename" + profileNameCol = "profilename" + sourceNameCol = "sourcename" + originCol = "origin" + tagsCol = "tags" +) + +// AllEvents queries the events with the given range, offset, and limit +func (c *Client) AllEvents(offset, limit int) ([]model.Event, errors.EdgeX) { + ctx := context.Background() + + events, err := queryEvents(ctx, c.ConnPool, sqlQueryAllWithPagination(eventTableName), offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query all events", err) + } + + return events, nil +} + +// AddEvent adds a new event model to DB +func (c *Client) AddEvent(e model.Event) (model.Event, errors.EdgeX) { + ctx := context.Background() + + if e.Id == "" { + e.Id = uuid.NewString() + } + event := model.Event{ + Id: e.Id, + DeviceName: e.DeviceName, + ProfileName: e.ProfileName, + SourceName: e.SourceName, + Origin: e.Origin, + Tags: e.Tags, + } + tagsBytes, err := json.Marshal(event.Tags) + if err != nil { + return model.Event{}, errors.NewCommonEdgeX(errors.KindServerError, "unable to JSON marshal event tags", err) + } + + _, err = c.ConnPool.Exec( + ctx, + sqlInsert(eventTableName, idCol, deviceNameCol, profileNameCol, sourceNameCol, originCol, tagsCol), + event.Id, + event.DeviceName, + event.ProfileName, + event.SourceName, + event.Origin, + tagsBytes, + ) + if err != nil { + return model.Event{}, pgClient.WrapDBError("failed to insert event", err) + } + + // TODO: readings included in this event will be added to database in the following PRs + + return event, nil +} + +// EventById gets an event by id +func (c *Client) EventById(id string) (model.Event, errors.EdgeX) { + ctx := context.Background() + var event model.Event + + rows, err := c.ConnPool.Query(ctx, sqlQueryAllById(eventTableName), id) + if err != nil { + return event, pgClient.WrapDBError(fmt.Sprintf("failed to query event with id '%s'", id), err) + } + + event, err = pgx.CollectExactlyOneRow(rows, func(row pgx.CollectableRow) (model.Event, error) { + e, err := pgx.RowToStructByNameLax[model.Event](row) + + // TODO: readings data will be added to the event model in the following PRs + + return e, err + }) + if err != nil { + if stdErrs.Is(err, pgx.ErrNoRows) { + return event, errors.NewCommonEdgeX(errors.KindEntityDoesNotExist, fmt.Sprintf("no event with id '%s' found", id), err) + } + return event, pgClient.WrapDBError("failed to scan row to event model", err) + } + + return event, nil +} + +// EventTotalCount returns the total count of Event from db +func (c *Client) EventTotalCount() (uint32, errors.EdgeX) { + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCount(eventTableName)) +} + +// EventCountByDeviceName returns the count of Event associated a specific Device from db +func (c *Client) EventCountByDeviceName(deviceName string) (uint32, errors.EdgeX) { + sqlStatement := sqlQueryCountByCol(eventTableName, deviceNameCol) + + return getTotalRowsCount(context.Background(), c.ConnPool, sqlStatement, deviceName) +} + +// EventCountByTimeRange returns the count of Event by time range from db +func (c *Client) EventCountByTimeRange(start int, end int) (uint32, errors.EdgeX) { + return getTotalRowsCount(context.Background(), c.ConnPool, sqlQueryCountByTimeRangeCol(eventTableName, originCol), start, end) +} + +// EventsByDeviceName query events by offset, limit and device name +func (c *Client) EventsByDeviceName(offset int, limit int, name string) ([]model.Event, errors.EdgeX) { + sqlStatement := sqlQueryAllByColWithPagination(eventTableName, deviceNameCol) + + events, err := queryEvents(context.Background(), c.ConnPool, sqlStatement, name, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeX(errors.KindDatabaseError, fmt.Sprintf("failed to query events by device '%s'", name), err) + } + return events, nil +} + +// EventsByTimeRange query events by time range, offset, and limit +func (c *Client) EventsByTimeRange(start int, end int, offset int, limit int) ([]model.Event, errors.EdgeX) { + ctx := context.Background() + sqlStatement := sqlQueryAllWithPaginationAndTimeRangeDescByCol(eventTableName, originCol, originCol) + + events, err := queryEvents(ctx, c.ConnPool, sqlStatement, start, end, offset, limit) + if err != nil { + return nil, errors.NewCommonEdgeXWrapper(err) + } + return events, nil +} + +// DeleteEventById removes an event by id +func (c *Client) DeleteEventById(id string) errors.EdgeX { + sqlStatement := sqlDeleteById(eventTableName) + + err := deleteEvents(context.Background(), c.ConnPool, sqlStatement, id) + if err != nil { + return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed delete event with id '%s'", id), err) + } + + // TODO: delete related readings associated to the deleted events + + return nil +} + +// DeleteEventsByDeviceName deletes specific device's events and corresponding readings +// This function is implemented to starts up two goroutines to delete readings and events in the background to achieve better performance +func (c *Client) DeleteEventsByDeviceName(deviceName string) errors.EdgeX { + ctx := context.Background() + + sqlStatement := sqlDeleteByColumn(eventTableName, deviceNameCol) + + go func() { + err := deleteEvents(ctx, c.ConnPool, sqlStatement, deviceName) + if err != nil { + c.loggingClient.Errorf("failed delete event with device '%s': %v", deviceName, err) + } + }() + + // TODO: delete related readings associated to the deleted events + + return nil +} + +// DeleteEventsByAge deletes events and their corresponding readings that are older than age +// This function is implemented to starts up two goroutines to delete readings and events in the background to achieve better performance +func (c *Client) DeleteEventsByAge(age int64) errors.EdgeX { + ctx := context.Background() + expireTimestamp := time.Now().UnixNano() - age + sqlStatement := sqlDeleteTimeRangeByColumn(eventTableName, originCol) + + go func() { + err := deleteEvents(ctx, c.ConnPool, sqlStatement, expireTimestamp) + if err != nil { + c.loggingClient.Errorf("failed delete event by age '%d' nanoseconds: %v", age, err) + } + }() + + // TODO: delete related readings associated to the deleted events + + return nil +} + +// queryEvents queries the data rows with given sql statement and passed args, and unmarshal the data rows to the Event model slice +func queryEvents(ctx context.Context, connPool *pgxpool.Pool, sql string, args ...any) ([]model.Event, errors.EdgeX) { + rows, err := connPool.Query(ctx, sql, args...) + if err != nil { + return nil, pgClient.WrapDBError("query failed", err) + } + + var events []model.Event + events, err = pgx.CollectRows(rows, func(row pgx.CollectableRow) (model.Event, error) { + event, err := pgx.RowToStructByNameLax[model.Event](row) + + // TODO: readings data will be added to the event model in the following PRs + + return event, err + }) + + if err != nil { + return nil, pgClient.WrapDBError("failed to scan events", err) + } + + return events, nil +} + +// deleteEvents delete the data rows with given sql statement and passed args +func deleteEvents(ctx context.Context, connPool *pgxpool.Pool, sqlStatement string, args ...any) errors.EdgeX { + commandTag, err := connPool.Exec( + ctx, + sqlStatement, + args..., + ) + if commandTag.RowsAffected() == 0 { + return errors.NewCommonEdgeX(errors.KindContractInvalid, "no event found", nil) + } + if err != nil { + return pgClient.WrapDBError("event(s) delete failed", err) + } + return nil +} diff --git a/internal/pkg/infrastructure/postgres/reading.go b/internal/pkg/infrastructure/postgres/reading.go new file mode 100644 index 0000000000..6969f51211 --- /dev/null +++ b/internal/pkg/infrastructure/postgres/reading.go @@ -0,0 +1,83 @@ +// +// Copyright (C) 2024 IOTech Ltd +// +// SPDX-License-Identifier: Apache-2.0 + +package postgres + +import ( + "github.com/edgexfoundry/go-mod-core-contracts/v3/errors" + model "github.com/edgexfoundry/go-mod-core-contracts/v3/models" +) + +func (c *Client) ReadingTotalCount() (uint32, errors.EdgeX) { + return 0, nil +} + +func (c *Client) AllReadings(offset int, limit int) ([]model.Reading, errors.EdgeX) { + return nil, nil +} + +func (c *Client) ReadingsByTimeRange(start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { + return nil, nil +} + +func (c *Client) ReadingsByResourceName(offset int, limit int, resourceName string) ([]model.Reading, errors.EdgeX) { + return nil, nil +} + +func (c *Client) ReadingsByDeviceName(offset int, limit int, name string) ([]model.Reading, errors.EdgeX) { + return nil, nil +} + +func (c *Client) ReadingsByDeviceNameAndResourceName(deviceName string, resourceName string, offset int, limit int) ([]model.Reading, errors.EdgeX) { + return nil, nil +} + +func (c *Client) ReadingsByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { + return nil, nil +} + +func (c *Client) ReadingCountByDeviceName(deviceName string) (uint32, errors.EdgeX) { + return 0, nil +} + +func (c *Client) ReadingCountByResourceName(resourceName string) (uint32, errors.EdgeX) { + return 0, nil +} + +func (c *Client) ReadingCountByResourceNameAndTimeRange(resourceName string, start int, end int) (uint32, errors.EdgeX) { + return 0, nil +} + +func (c *Client) ReadingCountByDeviceNameAndResourceName(deviceName string, resourceName string) (uint32, errors.EdgeX) { + return 0, nil +} + +func (c *Client) ReadingCountByDeviceNameAndResourceNameAndTimeRange(deviceName string, resourceName string, start int, end int) (uint32, errors.EdgeX) { + return 0, nil +} + +func (c *Client) ReadingCountByTimeRange(start int, end int) (uint32, errors.EdgeX) { + return 0, nil +} + +func (c *Client) ReadingsByResourceNameAndTimeRange(resourceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { + return nil, nil +} + +func (c *Client) ReadingsByDeviceNameAndResourceNamesAndTimeRange(deviceName string, resourceNames []string, start, end, offset, limit int) ([]model.Reading, uint32, errors.EdgeX) { + return nil, 0, nil +} + +func (c *Client) ReadingsByDeviceNameAndTimeRange(deviceName string, start int, end int, offset int, limit int) ([]model.Reading, errors.EdgeX) { + return nil, nil +} + +func (c *Client) ReadingCountByDeviceNameAndTimeRange(deviceName string, start int, end int) (uint32, errors.EdgeX) { + return 0, nil +} + +func (c *Client) LatestReadingByOffset(offset uint32) (model.Reading, errors.EdgeX) { + return nil, nil +} diff --git a/internal/pkg/infrastructure/postgres/sql.go b/internal/pkg/infrastructure/postgres/sql.go index a9094a39d4..c790455281 100644 --- a/internal/pkg/infrastructure/postgres/sql.go +++ b/internal/pkg/infrastructure/postgres/sql.go @@ -70,11 +70,25 @@ func sqlQueryAllWithPagination(table string) string { // return fmt.Sprintf("SELECT * FROM %s ORDER BY %s DESC OFFSET $1 LIMIT $2", table, createdCol) //} +// sqlQueryAllByColWithPagination returns the SQL statement for selecting all rows from the table by the given columns with pagination +func sqlQueryAllByColWithPagination(table string, columns ...string) string { + columnCount := len(columns) + whereCondition := constructWhereCondition(columns...) + + return fmt.Sprintf("SELECT * FROM %s WHERE %s OFFSET $%d LIMIT $%d", table, whereCondition, columnCount+1, columnCount+2) +} + // sqlQueryAllWithPaginationAndTimeRange returns the SQL statement for selecting all rows from the table with pagination and a time range. func sqlQueryAllWithPaginationAndTimeRange(table string) string { return fmt.Sprintf("SELECT * FROM %s WHERE %s >= $1 AND %s <= $2 ORDER BY %s OFFSET $3 LIMIT $4", table, createdCol, createdCol, createdCol) } +// sqlQueryAllWithPaginationAndTimeRangeDescByCol returns the SQL statement for selecting all rows from the table +// with pagination and a time range by column1, desc by column2 +func sqlQueryAllWithPaginationAndTimeRangeDescByCol(table string, timeRangeCol string, descCol string) string { + return fmt.Sprintf("SELECT * FROM %s WHERE %s >= $1 AND %s <= $2 ORDER BY %s DESC OFFSET $3 LIMIT $4", table, timeRangeCol, timeRangeCol, descCol) +} + // sqlQueryAllByStatusWithPaginationAndTimeRange returns the SQL statement for selecting all rows from the table by status with pagination and a time range. func sqlQueryAllByStatusWithPaginationAndTimeRange(table string) string { return fmt.Sprintf("SELECT * FROM %s WHERE %s = $1 AND %s >= $2 AND %s <= $3 ORDER BY %s OFFSET $4 LIMIT $5", table, statusCol, createdCol, createdCol, createdCol) @@ -104,6 +118,26 @@ func sqlQueryAllById(table string) string { return fmt.Sprintf("SELECT * FROM %s WHERE %s = $1", table, idCol) } +// sqlQueryAllById returns the SQL statement for selecting content column by the specified id. +//func sqlQueryContentById(table string) string { +// return fmt.Sprintf("SELECT content FROM %s WHERE %s = $1", table, idCol) +//} + +// sqlQueryContentWithPagination returns the SQL statement for selecting content column from the table with pagination +//func sqlQueryContentWithPagination(table string) string { +// return fmt.Sprintf("SELECT content FROM %s ORDER BY created OFFSET $1 LIMIT $2", table) +//} + +// sqlQueryCountByJSONField returns the SQL statement for selecting content column in the table by the given JSON query string +//func sqlQueryContentByJSONField(table string) (string, errors.EdgeX) { +// return fmt.Sprintf("SELECT content FROM %s WHERE content @> $1::jsonb ORDER BY %s OFFSET $2 LIMIT $3", table, createdCol), nil +//} + +// sqlQueryCountByJSONField returns the SQL statement for selecting content column by the given time range of the JSON field name +//func sqlQueryContentByJSONFieldTimeRange(table string, field string) string { +// return fmt.Sprintf("SELECT content FROM %s WHERE (content->'%s')::bigint >= $1 AND (content->'%s')::bigint <= $2 ORDER BY %s OFFSET $3 LIMIT $4", table, field, field, createdCol) +//} + // sqlCheckExistsByName returns the SQL statement for checking if a row exists in the table by name. func sqlCheckExistsByName(table string) string { return fmt.Sprintf("SELECT EXISTS(SELECT 1 FROM %s WHERE %s = $1)", table, nameCol) @@ -122,9 +156,26 @@ func sqlQueryCount(table string) string { // sqlQueryCountByCol returns the SQL statement for counting the number of rows in the table by the given column name. func sqlQueryCountByCol(table string, columns ...string) string { whereCondition := constructWhereCondition(columns...) - return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s = $1", table, whereCondition) + return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s", table, whereCondition) } +// sqlQueryCountByTimeRangeCol returns the SQL statement for counting the number of rows in the table +// by the given time range of the specified column +func sqlQueryCountByTimeRangeCol(table string, column string) string { + return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE %s >= $1 AND %s <= $2", table, column, column) +} + +// sqlQueryCountByJSONField returns the SQL statement for counting the number of rows in the table by the given JSON query string +//func sqlQueryCountByJSONField(table string) (string, errors.EdgeX) { +// return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE content @> $1::jsonb", table), nil +//} + +// sqlQueryCountByJSONFieldTimeRange returns the SQL statement for counting the number of rows in the table +// by the given time range of the JSON field name +//func sqlQueryCountByJSONFieldTimeRange(table string, field string) string { +// return fmt.Sprintf("SELECT COUNT(*) FROM %s WHERE (content->'%s')::bigint >= $1 AND (content->'%s')::bigint <= $2", table, field, field) +//} + // ---------------------------------------------------------------------------------- // SQL statements for UPDATE operations // ---------------------------------------------------------------------------------- @@ -149,15 +200,31 @@ func sqlDeleteByName(table string) string { } // sqlDeleteById returns the SQL statement for deleting a row from the table by id. -//func sqlDeleteById(table string) string { -// return fmt.Sprintf("DELETE FROM %s WHERE %s = $1", table, idCol) -//} +func sqlDeleteById(table string) string { + return fmt.Sprintf("DELETE FROM %s WHERE %s = $1", table, idCol) +} // sqlDeleteByAge returns the SQL statement for deleting rows from the table by created timestamp. func sqlDeleteByAge(table string) string { return fmt.Sprintf("DELETE FROM %s WHERE %s < NOW() - INTERVAL '1 millisecond' * $1", table, createdCol) } +// sqlDeleteTimeRangeByColumn returns the SQL statement for deleting rows from the table by time range with the specified column +// the time range is calculated from the caller function since the interval unit might be different +func sqlDeleteTimeRangeByColumn(table string, column string) string { + return fmt.Sprintf("DELETE FROM %s WHERE %s <= $1", table, column) +} + +// sqlDeleteByColumn returns the SQL statement for deleting rows from the table by the specified column +func sqlDeleteByColumn(table string, column string) string { + return fmt.Sprintf("DELETE FROM %s WHERE %s = $1", table, column) +} + +// sqlDeleteByJSONField returns the SQL statement for deleting rows from the table by the given JSON query string +//func sqlDeleteByJSONField(table string) (string, errors.EdgeX) { +// return fmt.Sprintf("DELETE FROM %s WHERE content @> $1::jsonb", table), nil +//} + // ---------------------------------------------------------------------------------- // Utils // ----------------------------------------------------------------------------------