diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index d990cd8bcf..aeff5b4e50 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -16,12 +16,12 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/gocql/gocql" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/consumers" + "github.com/mainflux/mainflux/consumers/api" + "github.com/mainflux/mainflux/consumers/writers/cassandra" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" - "github.com/mainflux/mainflux/writers/api" - "github.com/mainflux/mainflux/writers/cassandra" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -82,7 +82,7 @@ func main() { repo := newService(session, logger) st := senml.New(cfg.contentType) - if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { + if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err)) } @@ -134,7 +134,7 @@ func connectToCassandra(dbCfg cassandra.DBConfig, logger logger.Logger) *gocql.S return session } -func newService(session *gocql.Session, logger logger.Logger) writers.MessageRepository { +func newService(session *gocql.Session, logger logger.Logger) consumers.Consumer { repo := cassandra.New(session) repo = api.LoggingMiddleware(repo, logger) repo = api.MetricsMiddleware( diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index 298383fcc3..e876345926 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -14,12 +14,12 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" influxdata "github.com/influxdata/influxdb/client/v2" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/consumers" + "github.com/mainflux/mainflux/consumers/api" + "github.com/mainflux/mainflux/consumers/writers/influxdb" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" - "github.com/mainflux/mainflux/writers/api" - "github.com/mainflux/mainflux/writers/influxdb" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -91,7 +91,7 @@ func main() { repo = api.MetricsMiddleware(repo, counter, latency) st := senml.New(cfg.contentType) - if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { + if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err)) os.Exit(1) } diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index ab786e2e32..406c6adc51 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -14,12 +14,12 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/consumers" + "github.com/mainflux/mainflux/consumers/api" + "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" - "github.com/mainflux/mainflux/writers/api" - "github.com/mainflux/mainflux/writers/mongodb" stdprometheus "github.com/prometheus/client_golang/prometheus" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -88,7 +88,7 @@ func main() { repo = api.MetricsMiddleware(repo, counter, latency) st := senml.New(cfg.contentType) - if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { + if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err)) os.Exit(1) } diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index 98e56277eb..f4e3797d02 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -14,12 +14,12 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/jmoiron/sqlx" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/consumers" + "github.com/mainflux/mainflux/consumers/api" + "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" - "github.com/mainflux/mainflux/writers/api" - "github.com/mainflux/mainflux/writers/postgres" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -88,7 +88,7 @@ func main() { repo := newService(db, logger) st := senml.New(cfg.contentType) - if err = writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { + if err = consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err)) } @@ -138,7 +138,7 @@ func connectToDB(dbConfig postgres.Config, logger logger.Logger) *sqlx.DB { return db } -func newService(db *sqlx.DB, logger logger.Logger) writers.MessageRepository { +func newService(db *sqlx.DB, logger logger.Logger) consumers.Consumer { svc := postgres.New(db) svc = api.LoggingMiddleware(svc, logger) svc = api.MetricsMiddleware( diff --git a/consumers/README.md b/consumers/README.md new file mode 100644 index 0000000000..b89351abdb --- /dev/null +++ b/consumers/README.md @@ -0,0 +1,15 @@ +# Consumers + +Consumers provide an abstraction of various `Mainflux consumers`. +Mainflux consumer is a generic service that can handle received messages - consume them. +The message is not necessarily a Mainflux message - before consuming, Mainflux message can +be transformed into any valid format that specific consumer can understand. For example, +writers are consumers that can take a SenML or JSON message and store it. + +Consumers are optional services and are treated as plugins. In order to +run consumer services, core services must be up and running. + +For an in-depth explanation of the usage of `consumers`, as well as thorough +understanding of Mainflux, please check out the [official documentation][doc]. + +[doc]: http://mainflux.readthedocs.io diff --git a/consumers/api/logging.go b/consumers/api/logging.go new file mode 100644 index 0000000000..7c2e5eb25f --- /dev/null +++ b/consumers/api/logging.go @@ -0,0 +1,42 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// +build !test + +package api + +import ( + "fmt" + "time" + + "github.com/mainflux/mainflux/consumers" + log "github.com/mainflux/mainflux/logger" +) + +var _ consumers.Consumer = (*loggingMiddleware)(nil) + +type loggingMiddleware struct { + logger log.Logger + consumer consumers.Consumer +} + +// LoggingMiddleware adds logging facilities to the adapter. +func LoggingMiddleware(consumer consumers.Consumer, logger log.Logger) consumers.Consumer { + return &loggingMiddleware{ + logger: logger, + consumer: consumer, + } +} + +func (lm *loggingMiddleware) Consume(msgs interface{}) (err error) { + defer func(begin time.Time) { + message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin)) + if err != nil { + lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) + return + } + lm.logger.Info(fmt.Sprintf("%s without errors.", message)) + }(time.Now()) + + return lm.consumer.Consume(msgs) +} diff --git a/consumers/api/metrics.go b/consumers/api/metrics.go new file mode 100644 index 0000000000..8021f1734c --- /dev/null +++ b/consumers/api/metrics.go @@ -0,0 +1,37 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "time" + + "github.com/go-kit/kit/metrics" + "github.com/mainflux/mainflux/consumers" +) + +var _ consumers.Consumer = (*metricsMiddleware)(nil) + +type metricsMiddleware struct { + counter metrics.Counter + latency metrics.Histogram + consumer consumers.Consumer +} + +// MetricsMiddleware returns new message repository +// with Save method wrapped to expose metrics. +func MetricsMiddleware(consumer consumers.Consumer, counter metrics.Counter, latency metrics.Histogram) consumers.Consumer { + return &metricsMiddleware{ + counter: counter, + latency: latency, + consumer: consumer, + } +} + +func (mm *metricsMiddleware) Consume(msgs interface{}) error { + defer func(begin time.Time) { + mm.counter.With("method", "consume").Add(1) + mm.latency.With("method", "consume").Observe(time.Since(begin).Seconds()) + }(time.Now()) + return mm.consumer.Consume(msgs) +} diff --git a/writers/api/transport.go b/consumers/api/transport.go similarity index 100% rename from writers/api/transport.go rename to consumers/api/transport.go diff --git a/consumers/consumer.go b/consumers/consumer.go new file mode 100644 index 0000000000..00a359f689 --- /dev/null +++ b/consumers/consumer.go @@ -0,0 +1,11 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package consumers + +// Consumer specifies message consuming API. +type Consumer interface { + // Consume method is used to consumed received messages. + // A non-nil error is returned to indicate operation failure. + Consume(messages interface{}) error +} diff --git a/consumers/docs.go b/consumers/docs.go new file mode 100644 index 0000000000..a8a532ce94 --- /dev/null +++ b/consumers/docs.go @@ -0,0 +1,6 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package consumers contain the domain concept definitions needed to +// support Mainflux consumer services functionality. +package consumers diff --git a/writers/writer.go b/consumers/messages.go similarity index 72% rename from writers/writer.go rename to consumers/messages.go index 2ea05a548b..8791d32972 100644 --- a/writers/writer.go +++ b/consumers/messages.go @@ -1,7 +1,7 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 -package writers +package consumers import ( "fmt" @@ -21,42 +21,36 @@ var ( errMessageConversion = errors.New("error conversing transformed messages") ) -type consumer struct { - repo MessageRepository - transformer transformers.Transformer - logger logger.Logger -} - // Start method starts consuming messages received from NATS. // This method transforms messages to SenML format before // using MessageRepository to store them. -func Start(sub messaging.Subscriber, repo MessageRepository, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error { - c := consumer{ - repo: repo, - transformer: transformer, - logger: logger, - } - +func Start(sub messaging.Subscriber, consumer Consumer, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error { subjects, err := loadSubjectsConfig(subjectsCfgPath) if err != nil { logger.Warn(fmt.Sprintf("Failed to load subjects: %s", err)) } for _, subject := range subjects { - if err := sub.Subscribe(subject, c.handler); err != nil { + if err := sub.Subscribe(subject, handler(transformer, consumer)); err != nil { return err } } return nil } -func (c *consumer) handler(msg messaging.Message) error { - t, err := c.transformer.Transform(msg) - if err != nil { - return err - } +func handler(t transformers.Transformer, c Consumer) messaging.MessageHandler { + return func(msg messaging.Message) error { + m := interface{}(msg) + var err error + if t != nil { + m, err = t.Transform(msg) + if err != nil { + return err + } + } - return c.repo.Save(t) + return c.Consume(m) + } } type filterConfig struct { diff --git a/writers/README.md b/consumers/writers/README.md similarity index 100% rename from writers/README.md rename to consumers/writers/README.md diff --git a/writers/cassandra/README.md b/consumers/writers/cassandra/README.md similarity index 100% rename from writers/cassandra/README.md rename to consumers/writers/cassandra/README.md diff --git a/writers/cassandra/messages.go b/consumers/writers/cassandra/consumer.go similarity index 91% rename from writers/cassandra/messages.go rename to consumers/writers/cassandra/consumer.go index 5649172ad1..ce935f3eb4 100644 --- a/writers/cassandra/messages.go +++ b/consumers/writers/cassandra/consumer.go @@ -8,28 +8,28 @@ import ( "fmt" "github.com/gocql/gocql" + "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/pkg/errors" mfjson "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" ) var ( errSaveMessage = errors.New("failed to save message to cassandra database") errNoTable = errors.New("table does not exist") ) -var _ writers.MessageRepository = (*cassandraRepository)(nil) +var _ consumers.Consumer = (*cassandraRepository)(nil) type cassandraRepository struct { session *gocql.Session } // New instantiates Cassandra message repository. -func New(session *gocql.Session) writers.MessageRepository { +func New(session *gocql.Session) consumers.Consumer { return &cassandraRepository{session} } -func (cr *cassandraRepository) Save(message interface{}) error { +func (cr *cassandraRepository) Consume(message interface{}) error { switch m := message.(type) { case mfjson.Messages: return cr.saveJSON(m) diff --git a/writers/cassandra/messages_test.go b/consumers/writers/cassandra/consumer_test.go similarity index 94% rename from writers/cassandra/messages_test.go rename to consumers/writers/cassandra/consumer_test.go index e89bdd2356..50d1c1685a 100644 --- a/writers/cassandra/messages_test.go +++ b/consumers/writers/cassandra/consumer_test.go @@ -8,8 +8,8 @@ import ( "testing" "time" + "github.com/mainflux/mainflux/consumers/writers/cassandra" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers/cassandra" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -68,6 +68,6 @@ func TestSave(t *testing.T) { msgs = append(msgs, msg) } - err = repo.Save(msgs) + err = repo.Consume(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err)) } diff --git a/writers/cassandra/doc.go b/consumers/writers/cassandra/doc.go similarity index 100% rename from writers/cassandra/doc.go rename to consumers/writers/cassandra/doc.go diff --git a/writers/cassandra/init.go b/consumers/writers/cassandra/init.go similarity index 100% rename from writers/cassandra/init.go rename to consumers/writers/cassandra/init.go diff --git a/writers/cassandra/setup_test.go b/consumers/writers/cassandra/setup_test.go similarity index 96% rename from writers/cassandra/setup_test.go rename to consumers/writers/cassandra/setup_test.go index c1cfeccbbb..536b4e5a93 100644 --- a/writers/cassandra/setup_test.go +++ b/consumers/writers/cassandra/setup_test.go @@ -9,8 +9,8 @@ import ( "testing" "github.com/gocql/gocql" + "github.com/mainflux/mainflux/consumers/writers/cassandra" log "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/writers/cassandra" dockertest "github.com/ory/dockertest/v3" ) diff --git a/writers/docs.go b/consumers/writers/docs.go similarity index 100% rename from writers/docs.go rename to consumers/writers/docs.go diff --git a/writers/influxdb/README.md b/consumers/writers/influxdb/README.md similarity index 100% rename from writers/influxdb/README.md rename to consumers/writers/influxdb/README.md diff --git a/writers/influxdb/messages.go b/consumers/writers/influxdb/consumer.go similarity index 91% rename from writers/influxdb/messages.go rename to consumers/writers/influxdb/consumer.go index 1eea0f397d..85298b15e6 100644 --- a/writers/influxdb/messages.go +++ b/consumers/writers/influxdb/consumer.go @@ -7,10 +7,10 @@ import ( "math" "time" + "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" influxdata "github.com/influxdata/influxdb/client/v2" ) @@ -24,7 +24,7 @@ var ( errSaveMessage = errors.New("failed to save message to influxdb database") errMessageFormat = errors.New("invalid message format") ) -var _ writers.MessageRepository = (*influxRepo)(nil) +var _ consumers.Consumer = (*influxRepo)(nil) type influxRepo struct { client influxdata.Client @@ -32,7 +32,7 @@ type influxRepo struct { } // New returns new InfluxDB writer. -func New(client influxdata.Client, database string) writers.MessageRepository { +func New(client influxdata.Client, database string) consumers.Consumer { return &influxRepo{ client: client, cfg: influxdata.BatchPointsConfig{ @@ -41,7 +41,7 @@ func New(client influxdata.Client, database string) writers.MessageRepository { } } -func (repo *influxRepo) Save(message interface{}) error { +func (repo *influxRepo) Consume(message interface{}) error { pts, err := influxdata.NewBatchPoints(repo.cfg) if err != nil { return errors.Wrap(errSaveMessage, err) diff --git a/writers/influxdb/messages_test.go b/consumers/writers/influxdb/consumer_test.go similarity index 96% rename from writers/influxdb/messages_test.go rename to consumers/writers/influxdb/consumer_test.go index cee0f2e5c7..06cd7a79d0 100644 --- a/writers/influxdb/messages_test.go +++ b/consumers/writers/influxdb/consumer_test.go @@ -10,9 +10,9 @@ import ( "time" influxdata "github.com/influxdata/influxdb/client/v2" + writer "github.com/mainflux/mainflux/consumers/writers/influxdb" log "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/transformers/senml" - writer "github.com/mainflux/mainflux/writers/influxdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -120,7 +120,7 @@ func TestSave(t *testing.T) { msgs = append(msgs, msg) } - err = repo.Save(msgs) + err = repo.Consume(msgs) assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) row, err := queryDB(selectMsgs) diff --git a/writers/influxdb/doc.go b/consumers/writers/influxdb/doc.go similarity index 100% rename from writers/influxdb/doc.go rename to consumers/writers/influxdb/doc.go diff --git a/writers/influxdb/fields.go b/consumers/writers/influxdb/fields.go similarity index 100% rename from writers/influxdb/fields.go rename to consumers/writers/influxdb/fields.go diff --git a/writers/influxdb/setup_test.go b/consumers/writers/influxdb/setup_test.go similarity index 100% rename from writers/influxdb/setup_test.go rename to consumers/writers/influxdb/setup_test.go diff --git a/writers/influxdb/tags.go b/consumers/writers/influxdb/tags.go similarity index 100% rename from writers/influxdb/tags.go rename to consumers/writers/influxdb/tags.go diff --git a/writers/mongodb/README.md b/consumers/writers/mongodb/README.md similarity index 100% rename from writers/mongodb/README.md rename to consumers/writers/mongodb/README.md diff --git a/writers/mongodb/messages.go b/consumers/writers/mongodb/consumer.go similarity index 87% rename from writers/mongodb/messages.go rename to consumers/writers/mongodb/consumer.go index dbfc3b6855..942f0c41b7 100644 --- a/writers/mongodb/messages.go +++ b/consumers/writers/mongodb/consumer.go @@ -8,10 +8,10 @@ import ( "go.mongodb.org/mongo-driver/mongo" + "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" ) const ( @@ -24,18 +24,18 @@ var ( errMessageFormat = errors.New("invalid message format") ) -var _ writers.MessageRepository = (*mongoRepo)(nil) +var _ consumers.Consumer = (*mongoRepo)(nil) type mongoRepo struct { db *mongo.Database } // New returns new MongoDB writer. -func New(db *mongo.Database) writers.MessageRepository { +func New(db *mongo.Database) consumers.Consumer { return &mongoRepo{db} } -func (repo *mongoRepo) Save(message interface{}) error { +func (repo *mongoRepo) Consume(message interface{}) error { switch m := message.(type) { case json.Messages: return repo.saveJSON(m) diff --git a/writers/mongodb/messages_test.go b/consumers/writers/mongodb/consumer_test.go similarity index 96% rename from writers/mongodb/messages_test.go rename to consumers/writers/mongodb/consumer_test.go index 10cf58a9b0..7bc4f20de6 100644 --- a/writers/mongodb/messages_test.go +++ b/consumers/writers/mongodb/consumer_test.go @@ -13,8 +13,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers/mongodb" log "github.com/mainflux/mainflux/logger" "go.mongodb.org/mongo-driver/bson" @@ -82,7 +82,7 @@ func TestSave(t *testing.T) { msgs = append(msgs, msg) } - err = repo.Save(msgs) + err = repo.Consume(msgs) assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) count, err := db.Collection(collection).CountDocuments(context.Background(), bson.D{}) diff --git a/writers/mongodb/doc.go b/consumers/writers/mongodb/doc.go similarity index 100% rename from writers/mongodb/doc.go rename to consumers/writers/mongodb/doc.go diff --git a/writers/mongodb/setup_test.go b/consumers/writers/mongodb/setup_test.go similarity index 100% rename from writers/mongodb/setup_test.go rename to consumers/writers/mongodb/setup_test.go diff --git a/writers/postgres/README.md b/consumers/writers/postgres/README.md similarity index 100% rename from writers/postgres/README.md rename to consumers/writers/postgres/README.md diff --git a/writers/postgres/messages.go b/consumers/writers/postgres/consumer.go similarity index 96% rename from writers/postgres/messages.go rename to consumers/writers/postgres/consumer.go index 6fd424ccc5..a79c4c0757 100644 --- a/writers/postgres/messages.go +++ b/consumers/writers/postgres/consumer.go @@ -11,10 +11,10 @@ import ( "github.com/gofrs/uuid" "github.com/jmoiron/sqlx" "github.com/lib/pq" // required for DB access + "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/pkg/errors" mfjson "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" ) const ( @@ -31,18 +31,18 @@ var ( errNoTable = errors.New("relation does not exist") ) -var _ writers.MessageRepository = (*postgresRepo)(nil) +var _ consumers.Consumer = (*postgresRepo)(nil) type postgresRepo struct { db *sqlx.DB } // New returns new PostgreSQL writer. -func New(db *sqlx.DB) writers.MessageRepository { +func New(db *sqlx.DB) consumers.Consumer { return &postgresRepo{db: db} } -func (pr postgresRepo) Save(message interface{}) (err error) { +func (pr postgresRepo) Consume(message interface{}) (err error) { switch m := message.(type) { case mfjson.Messages: return pr.saveJSON(m) diff --git a/writers/postgres/messages_test.go b/consumers/writers/postgres/consumer_test.go similarity index 93% rename from writers/postgres/messages_test.go rename to consumers/writers/postgres/consumer_test.go index 600b5eb5e7..88ad29cdb0 100644 --- a/writers/postgres/messages_test.go +++ b/consumers/writers/postgres/consumer_test.go @@ -8,8 +8,8 @@ import ( "testing" "time" + "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers/postgres" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -67,6 +67,6 @@ func TestMessageSave(t *testing.T) { msgs = append(msgs, msg) } - err = messageRepo.Save(msgs) + err = messageRepo.Consume(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/writers/postgres/doc.go b/consumers/writers/postgres/doc.go similarity index 100% rename from writers/postgres/doc.go rename to consumers/writers/postgres/doc.go diff --git a/writers/postgres/init.go b/consumers/writers/postgres/init.go similarity index 100% rename from writers/postgres/init.go rename to consumers/writers/postgres/init.go diff --git a/writers/postgres/setup_test.go b/consumers/writers/postgres/setup_test.go similarity index 96% rename from writers/postgres/setup_test.go rename to consumers/writers/postgres/setup_test.go index f5edd656ba..2394135032 100644 --- a/writers/postgres/setup_test.go +++ b/consumers/writers/postgres/setup_test.go @@ -12,8 +12,8 @@ import ( "testing" "github.com/jmoiron/sqlx" + "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/writers/postgres" dockertest "github.com/ory/dockertest/v3" ) diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index 994c21a94e..233881f128 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -8,10 +8,10 @@ import ( "testing" "time" + writer "github.com/mainflux/mainflux/consumers/writers/cassandra" "github.com/mainflux/mainflux/pkg/transformers/senml" "github.com/mainflux/mainflux/readers" - creaders "github.com/mainflux/mainflux/readers/cassandra" - cwriters "github.com/mainflux/mainflux/writers/cassandra" + reader "github.com/mainflux/mainflux/readers/cassandra" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -42,13 +42,13 @@ var ( ) func TestReadSenml(t *testing.T) { - session, err := creaders.Connect(creaders.DBConfig{ + session, err := reader.Connect(reader.DBConfig{ Hosts: []string{addr}, Keyspace: keyspace, }) require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err)) defer session.Close() - writer := cwriters.New(session) + writer := writer.New(session) messages := []senml.Message{} subtopicMsgs := []senml.Message{} @@ -78,10 +78,10 @@ func TestReadSenml(t *testing.T) { } } - err = writer.Save(messages) + err = writer.Consume(messages) require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) - reader := creaders.New(session) + reader := reader.New(session) // Since messages are not saved in natural order, // cases that return subset of messages are only diff --git a/readers/cassandra/setup_test.go b/readers/cassandra/setup_test.go index c1cfeccbbb..536b4e5a93 100644 --- a/readers/cassandra/setup_test.go +++ b/readers/cassandra/setup_test.go @@ -9,8 +9,8 @@ import ( "testing" "github.com/gocql/gocql" + "github.com/mainflux/mainflux/consumers/writers/cassandra" log "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/writers/cassandra" dockertest "github.com/ory/dockertest/v3" ) diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index a881da9678..e6eb3f1e22 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -6,11 +6,11 @@ import ( "time" influxdata "github.com/influxdata/influxdb/client/v2" + iwriter "github.com/mainflux/mainflux/consumers/writers/influxdb" "github.com/mainflux/mainflux/pkg/transformers/senml" uuidProvider "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" ireader "github.com/mainflux/mainflux/readers/influxdb" - iwriter "github.com/mainflux/mainflux/writers/influxdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -96,7 +96,7 @@ func TestReadAll(t *testing.T) { messages = append(messages, msg) } - err = writer.Save(messages) + err = writer.Consume(messages) require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err)) reader := ireader.New(client, testDB) diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index 40051f4e36..add30cffc1 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -10,10 +10,10 @@ import ( "testing" "time" + writer "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/pkg/transformers/senml" "github.com/mainflux/mainflux/readers" - mreader "github.com/mainflux/mainflux/readers/mongodb" - mwriter "github.com/mainflux/mainflux/writers/mongodb" + reader "github.com/mainflux/mainflux/readers/mongodb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -54,7 +54,7 @@ func TestReadSenml(t *testing.T) { require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) db := client.Database(testDB) - writer := mwriter.New(db) + writer := writer.New(db) messages := []senml.Message{} subtopicMsgs := []senml.Message{} @@ -82,9 +82,9 @@ func TestReadSenml(t *testing.T) { subtopicMsgs = append(subtopicMsgs, msg) } } - err = writer.Save(messages) + err = writer.Consume(messages) require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err)) - reader := mreader.New(db) + reader := reader.New(db) cases := map[string]struct { chanID string diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index 087e139a3d..8f775efed4 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -8,11 +8,11 @@ import ( "testing" "time" + pwriter "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/pkg/transformers/senml" uuidProvider "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" preader "github.com/mainflux/mainflux/readers/postgres" - pwriter "github.com/mainflux/mainflux/writers/postgres" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -36,7 +36,7 @@ var ( ) func TestReadSenml(t *testing.T) { - messageRepo := pwriter.New(db) + writer := pwriter.New(db) chanID, err := uuidProvider.New().ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -92,7 +92,7 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = messageRepo.Save(messages) + err = writer.Consume(messages) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) reader := preader.New(db) diff --git a/writers/api/logging.go b/writers/api/logging.go deleted file mode 100644 index 1d254e2d1e..0000000000 --- a/writers/api/logging.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -// +build !test - -package api - -import ( - "fmt" - "time" - - log "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/writers" -) - -var _ writers.MessageRepository = (*loggingMiddleware)(nil) - -type loggingMiddleware struct { - logger log.Logger - svc writers.MessageRepository -} - -// LoggingMiddleware adds logging facilities to the adapter. -func LoggingMiddleware(svc writers.MessageRepository, logger log.Logger) writers.MessageRepository { - return &loggingMiddleware{logger, svc} -} - -func (lm *loggingMiddleware) Save(msgs interface{}) (err error) { - defer func(begin time.Time) { - message := fmt.Sprintf("Method save took %s to complete", time.Since(begin)) - if err != nil { - lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) - return - } - lm.logger.Info(fmt.Sprintf("%s without errors.", message)) - }(time.Now()) - - return lm.svc.Save(msgs) -} diff --git a/writers/api/metrics.go b/writers/api/metrics.go deleted file mode 100644 index 07eab1d744..0000000000 --- a/writers/api/metrics.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package api - -import ( - "time" - - "github.com/go-kit/kit/metrics" - "github.com/mainflux/mainflux/writers" -) - -type metricsMiddleware struct { - counter metrics.Counter - latency metrics.Histogram - repo writers.MessageRepository -} - -// MetricsMiddleware returns new message repository with Save method wrapped to expose metrics. -func MetricsMiddleware(repo writers.MessageRepository, counter metrics.Counter, latency metrics.Histogram) writers.MessageRepository { - return &metricsMiddleware{ - counter: counter, - latency: latency, - repo: repo, - } -} - -func (mm *metricsMiddleware) Save(msgs interface{}) error { - defer func(begin time.Time) { - mm.counter.With("method", "handle_message").Add(1) - mm.latency.With("method", "handle_message").Observe(time.Since(begin).Seconds()) - }(time.Now()) - return mm.repo.Save(msgs) -} diff --git a/writers/messages.go b/writers/messages.go deleted file mode 100644 index ed4b42fc9e..0000000000 --- a/writers/messages.go +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package writers - -// MessageRepository specifies message writing API. -type MessageRepository interface { - // Save method is used to save published message. A non-nil - // error is returned to indicate operation failure. - Save(messages interface{}) error -}