From 7be1b628fd70b969a98f444b92bd46419f8ad478 Mon Sep 17 00:00:00 2001 From: dusanb94 Date: Tue, 22 Dec 2020 15:38:05 +0100 Subject: [PATCH 1/9] Replace Writer with Consumer Signed-off-by: dusanb94 --- cmd/cassandra-writer/main.go | 10 +-- cmd/influxdb-writer/main.go | 8 +-- cmd/mongodb-writer/main.go | 6 +- cmd/postgres-writer/main.go | 10 +-- {writers => consumers}/README.md | 0 {writers => consumers}/api/logging.go | 19 ++--- consumers/api/metrics.go | 37 ++++++++++ {writers => consumers}/api/transport.go | 0 consumers/consumer.go | 72 +++++++++++++++++++ consumers/docs.go | 6 ++ consumers/messages.go | 11 +++ consumers/writers/README.md | 16 +++++ .../writers}/cassandra/README.md | 0 .../writers}/cassandra/doc.go | 0 .../writers}/cassandra/init.go | 0 .../writers}/cassandra/messages.go | 8 +-- .../writers}/cassandra/messages_test.go | 4 +- .../writers}/cassandra/setup_test.go | 2 +- {writers => consumers/writers}/docs.go | 0 .../writers}/influxdb/README.md | 0 .../writers}/influxdb/doc.go | 0 .../writers}/influxdb/fields.go | 0 .../writers}/influxdb/messages.go | 8 +-- .../writers}/influxdb/messages_test.go | 4 +- .../writers}/influxdb/setup_test.go | 0 .../writers}/influxdb/tags.go | 0 {writers => consumers/writers}/messages.go | 0 .../writers}/mongodb/README.md | 0 {writers => consumers/writers}/mongodb/doc.go | 0 .../writers}/mongodb/messages.go | 8 +-- .../writers}/mongodb/messages_test.go | 4 +- .../writers}/mongodb/setup_test.go | 0 .../writers}/postgres/README.md | 0 .../writers}/postgres/doc.go | 0 .../writers}/postgres/init.go | 0 .../writers}/postgres/messages.go | 8 +-- .../writers}/postgres/messages_test.go | 4 +- .../writers}/postgres/setup_test.go | 2 +- {writers => consumers/writers}/writer.go | 17 ++--- writers/api/metrics.go | 34 --------- 40 files changed, 205 insertions(+), 93 deletions(-) rename {writers => consumers}/README.md (100%) rename {writers => consumers}/api/logging.go (51%) create mode 100644 consumers/api/metrics.go rename {writers => consumers}/api/transport.go (100%) create mode 100644 consumers/consumer.go create mode 100644 consumers/docs.go create mode 100644 consumers/messages.go create mode 100644 consumers/writers/README.md rename {writers => consumers/writers}/cassandra/README.md (100%) rename {writers => consumers/writers}/cassandra/doc.go (100%) rename {writers => consumers/writers}/cassandra/init.go (100%) rename {writers => consumers/writers}/cassandra/messages.go (91%) rename {writers => consumers/writers}/cassandra/messages_test.go (94%) rename {writers => consumers/writers}/cassandra/setup_test.go (96%) rename {writers => consumers/writers}/docs.go (100%) rename {writers => consumers/writers}/influxdb/README.md (100%) rename {writers => consumers/writers}/influxdb/doc.go (100%) rename {writers => consumers/writers}/influxdb/fields.go (100%) rename {writers => consumers/writers}/influxdb/messages.go (91%) rename {writers => consumers/writers}/influxdb/messages_test.go (96%) rename {writers => consumers/writers}/influxdb/setup_test.go (100%) rename {writers => consumers/writers}/influxdb/tags.go (100%) rename {writers => consumers/writers}/messages.go (100%) rename {writers => consumers/writers}/mongodb/README.md (100%) rename {writers => consumers/writers}/mongodb/doc.go (100%) rename {writers => consumers/writers}/mongodb/messages.go (87%) rename {writers => consumers/writers}/mongodb/messages_test.go (96%) rename {writers => consumers/writers}/mongodb/setup_test.go (100%) rename {writers => consumers/writers}/postgres/README.md (100%) rename {writers => consumers/writers}/postgres/doc.go (100%) rename {writers => consumers/writers}/postgres/init.go (100%) rename {writers => consumers/writers}/postgres/messages.go (96%) rename {writers => consumers/writers}/postgres/messages_test.go (93%) rename {writers => consumers/writers}/postgres/setup_test.go (96%) rename {writers => consumers/writers}/writer.go (79%) delete mode 100644 writers/api/metrics.go diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index d990cd8bcf..3df5396808 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -16,12 +16,12 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/gocql/gocql" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/consumers" + "github.com/mainflux/mainflux/consumers/api" + "github.com/mainflux/mainflux/consumers/writers/cassandra" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" - "github.com/mainflux/mainflux/writers/api" - "github.com/mainflux/mainflux/writers/cassandra" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -82,7 +82,7 @@ func main() { repo := newService(session, logger) st := senml.New(cfg.contentType) - if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { + if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err)) } @@ -134,7 +134,7 @@ func connectToCassandra(dbCfg cassandra.DBConfig, logger logger.Logger) *gocql.S return session } -func newService(session *gocql.Session, logger logger.Logger) writers.MessageRepository { +func newService(session *gocql.Session, logger logger.Logger) consumers.MessageConsumer { repo := cassandra.New(session) repo = api.LoggingMiddleware(repo, logger) repo = api.MetricsMiddleware( diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index 298383fcc3..e876345926 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -14,12 +14,12 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" influxdata "github.com/influxdata/influxdb/client/v2" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/consumers" + "github.com/mainflux/mainflux/consumers/api" + "github.com/mainflux/mainflux/consumers/writers/influxdb" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" - "github.com/mainflux/mainflux/writers/api" - "github.com/mainflux/mainflux/writers/influxdb" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -91,7 +91,7 @@ func main() { repo = api.MetricsMiddleware(repo, counter, latency) st := senml.New(cfg.contentType) - if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { + if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err)) os.Exit(1) } diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index ab786e2e32..35e15ad1bd 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -14,12 +14,12 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/consumers/api" + "github.com/mainflux/mainflux/consumers/writers" + "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" - "github.com/mainflux/mainflux/writers/api" - "github.com/mainflux/mainflux/writers/mongodb" stdprometheus "github.com/prometheus/client_golang/prometheus" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index 98e56277eb..d5e61cdeda 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -14,12 +14,12 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/jmoiron/sqlx" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/consumers" + "github.com/mainflux/mainflux/consumers/api" + "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" - "github.com/mainflux/mainflux/writers/api" - "github.com/mainflux/mainflux/writers/postgres" stdprometheus "github.com/prometheus/client_golang/prometheus" ) @@ -88,7 +88,7 @@ func main() { repo := newService(db, logger) st := senml.New(cfg.contentType) - if err = writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { + if err = consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err)) } @@ -138,7 +138,7 @@ func connectToDB(dbConfig postgres.Config, logger logger.Logger) *sqlx.DB { return db } -func newService(db *sqlx.DB, logger logger.Logger) writers.MessageRepository { +func newService(db *sqlx.DB, logger logger.Logger) consumers.MessageConsumer { svc := postgres.New(db) svc = api.LoggingMiddleware(svc, logger) svc = api.MetricsMiddleware( diff --git a/writers/README.md b/consumers/README.md similarity index 100% rename from writers/README.md rename to consumers/README.md diff --git a/writers/api/logging.go b/consumers/api/logging.go similarity index 51% rename from writers/api/logging.go rename to consumers/api/logging.go index 1d254e2d1e..e6fcccce5a 100644 --- a/writers/api/logging.go +++ b/consumers/api/logging.go @@ -9,25 +9,28 @@ import ( "fmt" "time" + "github.com/mainflux/mainflux/consumers" log "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/writers" ) -var _ writers.MessageRepository = (*loggingMiddleware)(nil) +var _ consumers.MessageConsumer = (*loggingMiddleware)(nil) type loggingMiddleware struct { logger log.Logger - svc writers.MessageRepository + c consumers.MessageConsumer } // LoggingMiddleware adds logging facilities to the adapter. -func LoggingMiddleware(svc writers.MessageRepository, logger log.Logger) writers.MessageRepository { - return &loggingMiddleware{logger, svc} +func LoggingMiddleware(c consumers.MessageConsumer, logger log.Logger) consumers.MessageConsumer { + return &loggingMiddleware{ + logger: logger, + c: c, + } } -func (lm *loggingMiddleware) Save(msgs interface{}) (err error) { +func (lm *loggingMiddleware) Consume(msgs interface{}) (err error) { defer func(begin time.Time) { - message := fmt.Sprintf("Method save took %s to complete", time.Since(begin)) + message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -35,5 +38,5 @@ func (lm *loggingMiddleware) Save(msgs interface{}) (err error) { lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.Save(msgs) + return lm.c.Consume(msgs) } diff --git a/consumers/api/metrics.go b/consumers/api/metrics.go new file mode 100644 index 0000000000..6140f45de3 --- /dev/null +++ b/consumers/api/metrics.go @@ -0,0 +1,37 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "time" + + "github.com/go-kit/kit/metrics" + "github.com/mainflux/mainflux/consumers" +) + +var _ consumers.MessageConsumer = (*metricsMiddleware)(nil) + +type metricsMiddleware struct { + counter metrics.Counter + latency metrics.Histogram + c consumers.MessageConsumer +} + +// MetricsMiddleware returns new message repository +// with Save method wrapped to expose metrics. +func MetricsMiddleware(c consumers.MessageConsumer, counter metrics.Counter, latency metrics.Histogram) consumers.MessageConsumer { + return &metricsMiddleware{ + counter: counter, + latency: latency, + c: c, + } +} + +func (mm *metricsMiddleware) Consume(msgs interface{}) error { + defer func(begin time.Time) { + mm.counter.With("method", "consume").Add(1) + mm.latency.With("method", "consume").Observe(time.Since(begin).Seconds()) + }(time.Now()) + return mm.c.Consume(msgs) +} diff --git a/writers/api/transport.go b/consumers/api/transport.go similarity index 100% rename from writers/api/transport.go rename to consumers/api/transport.go diff --git a/consumers/consumer.go b/consumers/consumer.go new file mode 100644 index 0000000000..ab0b4fc13a --- /dev/null +++ b/consumers/consumer.go @@ -0,0 +1,72 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package consumers + +import ( + "fmt" + "io/ioutil" + + "github.com/BurntSushi/toml" + "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/errors" + "github.com/mainflux/mainflux/pkg/messaging" + pubsub "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/transformers" +) + +var ( + errOpenConfFile = errors.New("unable to open configuration file") + errParseConfFile = errors.New("unable to parse configuration file") + errMessageConversion = errors.New("error conversing transformed messages") +) + +// Start method starts consuming messages received from NATS. +// This method transforms messages to SenML format before +// using MessageRepository to store them. +func Start(sub messaging.Subscriber, consumer MessageConsumer, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error { + subjects, err := loadSubjectsConfig(subjectsCfgPath) + if err != nil { + logger.Warn(fmt.Sprintf("Failed to load subjects: %s", err)) + } + + for _, subject := range subjects { + if err := sub.Subscribe(subject, handle(transformer, consumer)); err != nil { + return err + } + } + return nil +} + +func handle(t transformers.Transformer, c MessageConsumer) messaging.MessageHandler { + return func(msg messaging.Message) error { + m, err := t.Transform(msg) + if err != nil { + return err + } + + return c.Consume(m) + } +} + +type filterConfig struct { + Filter []string `toml:"filter"` +} + +type subjectsConfig struct { + Subjects filterConfig `toml:"subjects"` +} + +func loadSubjectsConfig(subjectsConfigPath string) ([]string, error) { + data, err := ioutil.ReadFile(subjectsConfigPath) + if err != nil { + return []string{pubsub.SubjectAllChannels}, errors.Wrap(errOpenConfFile, err) + } + + var subjectsCfg subjectsConfig + if err := toml.Unmarshal(data, &subjectsCfg); err != nil { + return []string{pubsub.SubjectAllChannels}, errors.Wrap(errParseConfFile, err) + } + + return subjectsCfg.Subjects.Filter, nil +} diff --git a/consumers/docs.go b/consumers/docs.go new file mode 100644 index 0000000000..b119b8ff9b --- /dev/null +++ b/consumers/docs.go @@ -0,0 +1,6 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package writers contain the domain concept definitions needed to +// support Mainflux writer services functionality. +package consumers diff --git a/consumers/messages.go b/consumers/messages.go new file mode 100644 index 0000000000..5f6d83d93b --- /dev/null +++ b/consumers/messages.go @@ -0,0 +1,11 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package consumers + +// MessageConsumer specifies message writing API. +type MessageConsumer interface { + // MessageConsumer method is used to save published message. A non-nil + // error is returned to indicate operation failure. + Consume(messages interface{}) error +} diff --git a/consumers/writers/README.md b/consumers/writers/README.md new file mode 100644 index 0000000000..b715a11901 --- /dev/null +++ b/consumers/writers/README.md @@ -0,0 +1,16 @@ +# Writers + +Writers provide an implementation of various `message writers`. +Message writers are services that normalize (in `SenML` format) +Mainflux messages and store them in specific data store. + +Writers are optional services and are treated as plugins. In order to +run writer services, core services must be up and running. For more info +on the platform core services with its dependencies, please check out +the [Docker Compose][compose] file. + +For an in-depth explanation of the usage of `writers`, as well as thorough +understanding of Mainflux, please check out the [official documentation][doc]. + +[doc]: http://mainflux.readthedocs.io +[compose]: ../docker/docker-compose.yml diff --git a/writers/cassandra/README.md b/consumers/writers/cassandra/README.md similarity index 100% rename from writers/cassandra/README.md rename to consumers/writers/cassandra/README.md diff --git a/writers/cassandra/doc.go b/consumers/writers/cassandra/doc.go similarity index 100% rename from writers/cassandra/doc.go rename to consumers/writers/cassandra/doc.go diff --git a/writers/cassandra/init.go b/consumers/writers/cassandra/init.go similarity index 100% rename from writers/cassandra/init.go rename to consumers/writers/cassandra/init.go diff --git a/writers/cassandra/messages.go b/consumers/writers/cassandra/messages.go similarity index 91% rename from writers/cassandra/messages.go rename to consumers/writers/cassandra/messages.go index 5649172ad1..e003c3f8af 100644 --- a/writers/cassandra/messages.go +++ b/consumers/writers/cassandra/messages.go @@ -8,28 +8,28 @@ import ( "fmt" "github.com/gocql/gocql" + "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/pkg/errors" mfjson "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" ) var ( errSaveMessage = errors.New("failed to save message to cassandra database") errNoTable = errors.New("table does not exist") ) -var _ writers.MessageRepository = (*cassandraRepository)(nil) +var _ consumers.MessageConsumer = (*cassandraRepository)(nil) type cassandraRepository struct { session *gocql.Session } // New instantiates Cassandra message repository. -func New(session *gocql.Session) writers.MessageRepository { +func New(session *gocql.Session) consumers.MessageConsumer { return &cassandraRepository{session} } -func (cr *cassandraRepository) Save(message interface{}) error { +func (cr *cassandraRepository) Consume(message interface{}) error { switch m := message.(type) { case mfjson.Messages: return cr.saveJSON(m) diff --git a/writers/cassandra/messages_test.go b/consumers/writers/cassandra/messages_test.go similarity index 94% rename from writers/cassandra/messages_test.go rename to consumers/writers/cassandra/messages_test.go index e89bdd2356..50d1c1685a 100644 --- a/writers/cassandra/messages_test.go +++ b/consumers/writers/cassandra/messages_test.go @@ -8,8 +8,8 @@ import ( "testing" "time" + "github.com/mainflux/mainflux/consumers/writers/cassandra" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers/cassandra" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -68,6 +68,6 @@ func TestSave(t *testing.T) { msgs = append(msgs, msg) } - err = repo.Save(msgs) + err = repo.Consume(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err)) } diff --git a/writers/cassandra/setup_test.go b/consumers/writers/cassandra/setup_test.go similarity index 96% rename from writers/cassandra/setup_test.go rename to consumers/writers/cassandra/setup_test.go index c1cfeccbbb..536b4e5a93 100644 --- a/writers/cassandra/setup_test.go +++ b/consumers/writers/cassandra/setup_test.go @@ -9,8 +9,8 @@ import ( "testing" "github.com/gocql/gocql" + "github.com/mainflux/mainflux/consumers/writers/cassandra" log "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/writers/cassandra" dockertest "github.com/ory/dockertest/v3" ) diff --git a/writers/docs.go b/consumers/writers/docs.go similarity index 100% rename from writers/docs.go rename to consumers/writers/docs.go diff --git a/writers/influxdb/README.md b/consumers/writers/influxdb/README.md similarity index 100% rename from writers/influxdb/README.md rename to consumers/writers/influxdb/README.md diff --git a/writers/influxdb/doc.go b/consumers/writers/influxdb/doc.go similarity index 100% rename from writers/influxdb/doc.go rename to consumers/writers/influxdb/doc.go diff --git a/writers/influxdb/fields.go b/consumers/writers/influxdb/fields.go similarity index 100% rename from writers/influxdb/fields.go rename to consumers/writers/influxdb/fields.go diff --git a/writers/influxdb/messages.go b/consumers/writers/influxdb/messages.go similarity index 91% rename from writers/influxdb/messages.go rename to consumers/writers/influxdb/messages.go index 1eea0f397d..c329a6ba7d 100644 --- a/writers/influxdb/messages.go +++ b/consumers/writers/influxdb/messages.go @@ -7,10 +7,10 @@ import ( "math" "time" + "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" influxdata "github.com/influxdata/influxdb/client/v2" ) @@ -24,7 +24,7 @@ var ( errSaveMessage = errors.New("failed to save message to influxdb database") errMessageFormat = errors.New("invalid message format") ) -var _ writers.MessageRepository = (*influxRepo)(nil) +var _ consumers.MessageConsumer = (*influxRepo)(nil) type influxRepo struct { client influxdata.Client @@ -32,7 +32,7 @@ type influxRepo struct { } // New returns new InfluxDB writer. -func New(client influxdata.Client, database string) writers.MessageRepository { +func New(client influxdata.Client, database string) consumers.MessageConsumer { return &influxRepo{ client: client, cfg: influxdata.BatchPointsConfig{ @@ -41,7 +41,7 @@ func New(client influxdata.Client, database string) writers.MessageRepository { } } -func (repo *influxRepo) Save(message interface{}) error { +func (repo *influxRepo) Consume(message interface{}) error { pts, err := influxdata.NewBatchPoints(repo.cfg) if err != nil { return errors.Wrap(errSaveMessage, err) diff --git a/writers/influxdb/messages_test.go b/consumers/writers/influxdb/messages_test.go similarity index 96% rename from writers/influxdb/messages_test.go rename to consumers/writers/influxdb/messages_test.go index cee0f2e5c7..06cd7a79d0 100644 --- a/writers/influxdb/messages_test.go +++ b/consumers/writers/influxdb/messages_test.go @@ -10,9 +10,9 @@ import ( "time" influxdata "github.com/influxdata/influxdb/client/v2" + writer "github.com/mainflux/mainflux/consumers/writers/influxdb" log "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/transformers/senml" - writer "github.com/mainflux/mainflux/writers/influxdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -120,7 +120,7 @@ func TestSave(t *testing.T) { msgs = append(msgs, msg) } - err = repo.Save(msgs) + err = repo.Consume(msgs) assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) row, err := queryDB(selectMsgs) diff --git a/writers/influxdb/setup_test.go b/consumers/writers/influxdb/setup_test.go similarity index 100% rename from writers/influxdb/setup_test.go rename to consumers/writers/influxdb/setup_test.go diff --git a/writers/influxdb/tags.go b/consumers/writers/influxdb/tags.go similarity index 100% rename from writers/influxdb/tags.go rename to consumers/writers/influxdb/tags.go diff --git a/writers/messages.go b/consumers/writers/messages.go similarity index 100% rename from writers/messages.go rename to consumers/writers/messages.go diff --git a/writers/mongodb/README.md b/consumers/writers/mongodb/README.md similarity index 100% rename from writers/mongodb/README.md rename to consumers/writers/mongodb/README.md diff --git a/writers/mongodb/doc.go b/consumers/writers/mongodb/doc.go similarity index 100% rename from writers/mongodb/doc.go rename to consumers/writers/mongodb/doc.go diff --git a/writers/mongodb/messages.go b/consumers/writers/mongodb/messages.go similarity index 87% rename from writers/mongodb/messages.go rename to consumers/writers/mongodb/messages.go index dbfc3b6855..1afbe4a2f3 100644 --- a/writers/mongodb/messages.go +++ b/consumers/writers/mongodb/messages.go @@ -8,10 +8,10 @@ import ( "go.mongodb.org/mongo-driver/mongo" + "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" ) const ( @@ -24,18 +24,18 @@ var ( errMessageFormat = errors.New("invalid message format") ) -var _ writers.MessageRepository = (*mongoRepo)(nil) +var _ consumers.MessageConsumer = (*mongoRepo)(nil) type mongoRepo struct { db *mongo.Database } // New returns new MongoDB writer. -func New(db *mongo.Database) writers.MessageRepository { +func New(db *mongo.Database) consumers.MessageConsumer { return &mongoRepo{db} } -func (repo *mongoRepo) Save(message interface{}) error { +func (repo *mongoRepo) Consume(message interface{}) error { switch m := message.(type) { case json.Messages: return repo.saveJSON(m) diff --git a/writers/mongodb/messages_test.go b/consumers/writers/mongodb/messages_test.go similarity index 96% rename from writers/mongodb/messages_test.go rename to consumers/writers/mongodb/messages_test.go index 10cf58a9b0..7bc4f20de6 100644 --- a/writers/mongodb/messages_test.go +++ b/consumers/writers/mongodb/messages_test.go @@ -13,8 +13,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers/mongodb" log "github.com/mainflux/mainflux/logger" "go.mongodb.org/mongo-driver/bson" @@ -82,7 +82,7 @@ func TestSave(t *testing.T) { msgs = append(msgs, msg) } - err = repo.Save(msgs) + err = repo.Consume(msgs) assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) count, err := db.Collection(collection).CountDocuments(context.Background(), bson.D{}) diff --git a/writers/mongodb/setup_test.go b/consumers/writers/mongodb/setup_test.go similarity index 100% rename from writers/mongodb/setup_test.go rename to consumers/writers/mongodb/setup_test.go diff --git a/writers/postgres/README.md b/consumers/writers/postgres/README.md similarity index 100% rename from writers/postgres/README.md rename to consumers/writers/postgres/README.md diff --git a/writers/postgres/doc.go b/consumers/writers/postgres/doc.go similarity index 100% rename from writers/postgres/doc.go rename to consumers/writers/postgres/doc.go diff --git a/writers/postgres/init.go b/consumers/writers/postgres/init.go similarity index 100% rename from writers/postgres/init.go rename to consumers/writers/postgres/init.go diff --git a/writers/postgres/messages.go b/consumers/writers/postgres/messages.go similarity index 96% rename from writers/postgres/messages.go rename to consumers/writers/postgres/messages.go index 6fd424ccc5..81c5f08fd0 100644 --- a/writers/postgres/messages.go +++ b/consumers/writers/postgres/messages.go @@ -11,10 +11,10 @@ import ( "github.com/gofrs/uuid" "github.com/jmoiron/sqlx" "github.com/lib/pq" // required for DB access + "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/pkg/errors" mfjson "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers" ) const ( @@ -31,18 +31,18 @@ var ( errNoTable = errors.New("relation does not exist") ) -var _ writers.MessageRepository = (*postgresRepo)(nil) +var _ consumers.MessageConsumer = (*postgresRepo)(nil) type postgresRepo struct { db *sqlx.DB } // New returns new PostgreSQL writer. -func New(db *sqlx.DB) writers.MessageRepository { +func New(db *sqlx.DB) consumers.MessageConsumer { return &postgresRepo{db: db} } -func (pr postgresRepo) Save(message interface{}) (err error) { +func (pr postgresRepo) Consume(message interface{}) (err error) { switch m := message.(type) { case mfjson.Messages: return pr.saveJSON(m) diff --git a/writers/postgres/messages_test.go b/consumers/writers/postgres/messages_test.go similarity index 93% rename from writers/postgres/messages_test.go rename to consumers/writers/postgres/messages_test.go index 600b5eb5e7..88ad29cdb0 100644 --- a/writers/postgres/messages_test.go +++ b/consumers/writers/postgres/messages_test.go @@ -8,8 +8,8 @@ import ( "testing" "time" + "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/writers/postgres" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -67,6 +67,6 @@ func TestMessageSave(t *testing.T) { msgs = append(msgs, msg) } - err = messageRepo.Save(msgs) + err = messageRepo.Consume(msgs) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) } diff --git a/writers/postgres/setup_test.go b/consumers/writers/postgres/setup_test.go similarity index 96% rename from writers/postgres/setup_test.go rename to consumers/writers/postgres/setup_test.go index f5edd656ba..2394135032 100644 --- a/writers/postgres/setup_test.go +++ b/consumers/writers/postgres/setup_test.go @@ -12,8 +12,8 @@ import ( "testing" "github.com/jmoiron/sqlx" + "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/writers/postgres" dockertest "github.com/ory/dockertest/v3" ) diff --git a/writers/writer.go b/consumers/writers/writer.go similarity index 79% rename from writers/writer.go rename to consumers/writers/writer.go index 2ea05a548b..cf62b9c3e5 100644 --- a/writers/writer.go +++ b/consumers/writers/writer.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "github.com/BurntSushi/toml" + "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/messaging" @@ -21,8 +22,8 @@ var ( errMessageConversion = errors.New("error conversing transformed messages") ) -type consumer struct { - repo MessageRepository +type writer struct { + consumer consumers.MessageConsumer transformer transformers.Transformer logger logger.Logger } @@ -30,9 +31,9 @@ type consumer struct { // Start method starts consuming messages received from NATS. // This method transforms messages to SenML format before // using MessageRepository to store them. -func Start(sub messaging.Subscriber, repo MessageRepository, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error { - c := consumer{ - repo: repo, +func Start(sub messaging.Subscriber, consumer consumers.MessageConsumer, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error { + w := writer{ + consumer: consumer, transformer: transformer, logger: logger, } @@ -43,20 +44,20 @@ func Start(sub messaging.Subscriber, repo MessageRepository, transformer transfo } for _, subject := range subjects { - if err := sub.Subscribe(subject, c.handler); err != nil { + if err := sub.Subscribe(subject, w.handler); err != nil { return err } } return nil } -func (c *consumer) handler(msg messaging.Message) error { +func (c *writer) handler(msg messaging.Message) error { t, err := c.transformer.Transform(msg) if err != nil { return err } - return c.repo.Save(t) + return c.consumer.Consume(t) } type filterConfig struct { diff --git a/writers/api/metrics.go b/writers/api/metrics.go deleted file mode 100644 index 07eab1d744..0000000000 --- a/writers/api/metrics.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package api - -import ( - "time" - - "github.com/go-kit/kit/metrics" - "github.com/mainflux/mainflux/writers" -) - -type metricsMiddleware struct { - counter metrics.Counter - latency metrics.Histogram - repo writers.MessageRepository -} - -// MetricsMiddleware returns new message repository with Save method wrapped to expose metrics. -func MetricsMiddleware(repo writers.MessageRepository, counter metrics.Counter, latency metrics.Histogram) writers.MessageRepository { - return &metricsMiddleware{ - counter: counter, - latency: latency, - repo: repo, - } -} - -func (mm *metricsMiddleware) Save(msgs interface{}) error { - defer func(begin time.Time) { - mm.counter.With("method", "handle_message").Add(1) - mm.latency.With("method", "handle_message").Observe(time.Since(begin).Seconds()) - }(time.Now()) - return mm.repo.Save(msgs) -} From 691f373d7b886dba685fda0e1de31f875cf510d2 Mon Sep 17 00:00:00 2001 From: dusanb94 Date: Thu, 24 Dec 2020 15:42:14 +0100 Subject: [PATCH 2/9] Add Notifications package Signed-off-by: dusanb94 --- .../notifications/smtp/cassandra/README.md | 72 +++++++++++++++++++ .../notifications/smtp/cassandra/consumer.go | 29 ++++++++ consumers/notifications/smtp/cassandra/doc.go | 6 ++ 3 files changed, 107 insertions(+) create mode 100644 consumers/notifications/smtp/cassandra/README.md create mode 100644 consumers/notifications/smtp/cassandra/consumer.go create mode 100644 consumers/notifications/smtp/cassandra/doc.go diff --git a/consumers/notifications/smtp/cassandra/README.md b/consumers/notifications/smtp/cassandra/README.md new file mode 100644 index 0000000000..df9a232ddf --- /dev/null +++ b/consumers/notifications/smtp/cassandra/README.md @@ -0,0 +1,72 @@ +# SMTP Notifications + +SMTP Notifications provides a service for sending notification emails. + +## Configuration + +The service is configured using the environment variables presented in the +following table. Note that any unset variables will be replaced with their +default values. + +| Variable | Description | Default | +| -------------------------------- | --------------------------------------------------------- | ---------------------- | +| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | +| MF_SMTP_NOTIFICATIONS_LOG_LEVEL | Log level for Cassandra writer (debug, info, warn, error) | error | +| MF_SMTP_NOTIFICATIONS_PORT | Service HTTP port | 8180 | +| MF_SMTP_NOTIFICATIONS_CONFIG_PATH | Configuration file path with NATS subjects list | /config.toml | + +## Deployment + +```yaml + version: "3.7" + cassandra-writer: + image: mainflux/cassandra-writer:[version] + container_name: [instance name] + expose: + - [Service HTTP port] + restart: on-failure + environment: + MF_NATS_URL: [NATS instance URL] + MF_SMTP_NOTIFICATIONS_LOG_LEVEL: [Cassandra writer log level] + MF_SMTP_NOTIFICATIONS_PORT: [Service HTTP port] + MF_SMTP_NOTIFICATIONS_CONFIG_PATH: [Configuration file path with NATS subjects list] + ports: + - [host machine port]:[configured HTTP port] + volume: + - ./config.toml:/config.toml +``` + +To start the service, execute the following shell script: + +```bash +# download the latest version of the service +git clone https://github.com/mainflux/mainflux + +cd mainflux + +# compile the cassandra writer +make cassandra-writer + +# copy binary to bin +make install + +# Set the environment variables and run the service +MF_NATS_URL=[NATS instance URL] \ +MF_SMTP_NOTIFICATIONS_LOG_LEVEL=[Cassandra writer log level] \ +MF_SMTP_NOTIFICATIONS_PORT=[Service HTTP port] \ +MF_SMTP_NOTIFICATIONS_CONFIG_PATH=[Configuration file path with NATS subjects list] \ +$GOBIN/mainflux-cassandra-writer +``` + +### Using docker-compose + +This service can be deployed using docker containers. Docker compose file is +available in `/docker/addons/smtp-notifications/docker-compose.yml`. +In order to run all Mainflux core services, as well as mentioned optional ones, +execute following command: + +## Usage + +Starting service will start consuming messages. + +[doc]: http://mainflux.readthedocs.io diff --git a/consumers/notifications/smtp/cassandra/consumer.go b/consumers/notifications/smtp/cassandra/consumer.go new file mode 100644 index 0000000000..b5d05cf014 --- /dev/null +++ b/consumers/notifications/smtp/cassandra/consumer.go @@ -0,0 +1,29 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package smtp + +import ( + "github.com/mainflux/mainflux/consumers" + "github.com/mainflux/mainflux/internal/email" + "github.com/mainflux/mainflux/pkg/errors" +) + +var ( + errSaveMessage = errors.New("failed to save message to cassandra database") + errNoTable = errors.New("table does not exist") +) +var _ consumers.MessageConsumer = (*emailer)(nil) + +type emailer struct { + agent *email.Agent +} + +// New instantiates Cassandra message repository. +func New(agent *email.Agent) consumers.MessageConsumer { + return &emailer{agent: agent} +} + +func (c emailer) Consume(message interface{}) error { + return c.agent.Send(nil, "", "Password reset", "", "", "") +} diff --git a/consumers/notifications/smtp/cassandra/doc.go b/consumers/notifications/smtp/cassandra/doc.go new file mode 100644 index 0000000000..e94bc450c4 --- /dev/null +++ b/consumers/notifications/smtp/cassandra/doc.go @@ -0,0 +1,6 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package smtp contains the domain concept definitions needed to +// support Mainflux SMTP notifications. +package smtp From d7c20a0882e8d53921c613a5683f048d6b1a044c Mon Sep 17 00:00:00 2001 From: dusanb94 Date: Mon, 28 Dec 2020 11:45:04 +0100 Subject: [PATCH 3/9] Update Consumer Start Signed-off-by: dusanb94 --- consumers/consumer.go | 10 +++++++--- .../smtp/cassandra => notify/smtp}/README.md | 0 .../smtp/cassandra => notify/smtp}/consumer.go | 0 .../smtp/cassandra => notify/smtp}/doc.go | 0 4 files changed, 7 insertions(+), 3 deletions(-) rename consumers/{notifications/smtp/cassandra => notify/smtp}/README.md (100%) rename consumers/{notifications/smtp/cassandra => notify/smtp}/consumer.go (100%) rename consumers/{notifications/smtp/cassandra => notify/smtp}/doc.go (100%) diff --git a/consumers/consumer.go b/consumers/consumer.go index ab0b4fc13a..e440e40426 100644 --- a/consumers/consumer.go +++ b/consumers/consumer.go @@ -40,9 +40,13 @@ func Start(sub messaging.Subscriber, consumer MessageConsumer, transformer trans func handle(t transformers.Transformer, c MessageConsumer) messaging.MessageHandler { return func(msg messaging.Message) error { - m, err := t.Transform(msg) - if err != nil { - return err + m := interface{}(msg) + var err error + if t != nil { + m, err = t.Transform(msg) + if err != nil { + return err + } } return c.Consume(m) diff --git a/consumers/notifications/smtp/cassandra/README.md b/consumers/notify/smtp/README.md similarity index 100% rename from consumers/notifications/smtp/cassandra/README.md rename to consumers/notify/smtp/README.md diff --git a/consumers/notifications/smtp/cassandra/consumer.go b/consumers/notify/smtp/consumer.go similarity index 100% rename from consumers/notifications/smtp/cassandra/consumer.go rename to consumers/notify/smtp/consumer.go diff --git a/consumers/notifications/smtp/cassandra/doc.go b/consumers/notify/smtp/doc.go similarity index 100% rename from consumers/notifications/smtp/cassandra/doc.go rename to consumers/notify/smtp/doc.go From 26ab94b0b6c19833af0f1c82230362a3d1cd52aa Mon Sep 17 00:00:00 2001 From: dusanb94 Date: Thu, 31 Dec 2020 14:06:30 +0100 Subject: [PATCH 4/9] Fix Readers Signed-off-by: dusanb94 --- readers/cassandra/messages_test.go | 12 ++++++------ readers/cassandra/setup_test.go | 2 +- readers/influxdb/messages_test.go | 2 +- readers/mongodb/messages_test.go | 10 +++++----- readers/postgres/messages_test.go | 4 ++-- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index 994c21a94e..233881f128 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -8,10 +8,10 @@ import ( "testing" "time" + writer "github.com/mainflux/mainflux/consumers/writers/cassandra" "github.com/mainflux/mainflux/pkg/transformers/senml" "github.com/mainflux/mainflux/readers" - creaders "github.com/mainflux/mainflux/readers/cassandra" - cwriters "github.com/mainflux/mainflux/writers/cassandra" + reader "github.com/mainflux/mainflux/readers/cassandra" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -42,13 +42,13 @@ var ( ) func TestReadSenml(t *testing.T) { - session, err := creaders.Connect(creaders.DBConfig{ + session, err := reader.Connect(reader.DBConfig{ Hosts: []string{addr}, Keyspace: keyspace, }) require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err)) defer session.Close() - writer := cwriters.New(session) + writer := writer.New(session) messages := []senml.Message{} subtopicMsgs := []senml.Message{} @@ -78,10 +78,10 @@ func TestReadSenml(t *testing.T) { } } - err = writer.Save(messages) + err = writer.Consume(messages) require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) - reader := creaders.New(session) + reader := reader.New(session) // Since messages are not saved in natural order, // cases that return subset of messages are only diff --git a/readers/cassandra/setup_test.go b/readers/cassandra/setup_test.go index c1cfeccbbb..536b4e5a93 100644 --- a/readers/cassandra/setup_test.go +++ b/readers/cassandra/setup_test.go @@ -9,8 +9,8 @@ import ( "testing" "github.com/gocql/gocql" + "github.com/mainflux/mainflux/consumers/writers/cassandra" log "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/writers/cassandra" dockertest "github.com/ory/dockertest/v3" ) diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index a881da9678..4a6c0e9c37 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -96,7 +96,7 @@ func TestReadAll(t *testing.T) { messages = append(messages, msg) } - err = writer.Save(messages) + err = writer.Consume(messages) require.Nil(t, err, fmt.Sprintf("failed to store message to InfluxDB: %s", err)) reader := ireader.New(client, testDB) diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index 40051f4e36..add30cffc1 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -10,10 +10,10 @@ import ( "testing" "time" + writer "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/pkg/transformers/senml" "github.com/mainflux/mainflux/readers" - mreader "github.com/mainflux/mainflux/readers/mongodb" - mwriter "github.com/mainflux/mainflux/writers/mongodb" + reader "github.com/mainflux/mainflux/readers/mongodb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -54,7 +54,7 @@ func TestReadSenml(t *testing.T) { require.Nil(t, err, fmt.Sprintf("Creating new MongoDB client expected to succeed: %s.\n", err)) db := client.Database(testDB) - writer := mwriter.New(db) + writer := writer.New(db) messages := []senml.Message{} subtopicMsgs := []senml.Message{} @@ -82,9 +82,9 @@ func TestReadSenml(t *testing.T) { subtopicMsgs = append(subtopicMsgs, msg) } } - err = writer.Save(messages) + err = writer.Consume(messages) require.Nil(t, err, fmt.Sprintf("failed to store message to MongoDB: %s", err)) - reader := mreader.New(db) + reader := reader.New(db) cases := map[string]struct { chanID string diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index 087e139a3d..dee457c648 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -36,7 +36,7 @@ var ( ) func TestReadSenml(t *testing.T) { - messageRepo := pwriter.New(db) + writer := pwriter.New(db) chanID, err := uuidProvider.New().ID() require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) @@ -92,7 +92,7 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = messageRepo.Save(messages) + err = writer.Consume(messages) assert.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) reader := preader.New(db) From e28e8239d809e084ad86498aac6586c8e0a39f94 Mon Sep 17 00:00:00 2001 From: dusanb94 Date: Tue, 5 Jan 2021 12:09:34 +0100 Subject: [PATCH 5/9] Fix Consumer naming Signed-off-by: dusanb94 --- cmd/cassandra-writer/main.go | 2 +- cmd/mongodb-writer/main.go | 4 +- cmd/postgres-writer/main.go | 2 +- consumers/api/logging.go | 6 +- consumers/api/metrics.go | 6 +- consumers/consumer.go | 75 ++--------------- consumers/docs.go | 2 +- consumers/messages.go | 75 +++++++++++++++-- consumers/notify/smtp/consumer.go | 4 +- .../cassandra/{messages.go => consumer.go} | 4 +- .../{messages_test.go => consumer_test.go} | 0 .../influxdb/{messages.go => consumer.go} | 4 +- .../{messages_test.go => consumer_test.go} | 0 consumers/writers/messages.go | 11 --- .../mongodb/{messages.go => consumer.go} | 4 +- .../{messages_test.go => consumer_test.go} | 0 .../postgres/{messages.go => consumer.go} | 4 +- .../{messages_test.go => consumer_test.go} | 0 consumers/writers/writer.go | 83 ------------------- 19 files changed, 96 insertions(+), 190 deletions(-) rename consumers/writers/cassandra/{messages.go => consumer.go} (95%) rename consumers/writers/cassandra/{messages_test.go => consumer_test.go} (100%) rename consumers/writers/influxdb/{messages.go => consumer.go} (94%) rename consumers/writers/influxdb/{messages_test.go => consumer_test.go} (100%) delete mode 100644 consumers/writers/messages.go rename consumers/writers/mongodb/{messages.go => consumer.go} (93%) rename consumers/writers/mongodb/{messages_test.go => consumer_test.go} (100%) rename consumers/writers/postgres/{messages.go => consumer.go} (98%) rename consumers/writers/postgres/{messages_test.go => consumer_test.go} (100%) delete mode 100644 consumers/writers/writer.go diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index 3df5396808..aeff5b4e50 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -134,7 +134,7 @@ func connectToCassandra(dbCfg cassandra.DBConfig, logger logger.Logger) *gocql.S return session } -func newService(session *gocql.Session, logger logger.Logger) consumers.MessageConsumer { +func newService(session *gocql.Session, logger logger.Logger) consumers.Consumer { repo := cassandra.New(session) repo = api.LoggingMiddleware(repo, logger) repo = api.MetricsMiddleware( diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index 35e15ad1bd..406c6adc51 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -14,8 +14,8 @@ import ( kitprometheus "github.com/go-kit/kit/metrics/prometheus" "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/consumers" "github.com/mainflux/mainflux/consumers/api" - "github.com/mainflux/mainflux/consumers/writers" "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/nats" @@ -88,7 +88,7 @@ func main() { repo = api.MetricsMiddleware(repo, counter, latency) st := senml.New(cfg.contentType) - if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { + if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err)) os.Exit(1) } diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index d5e61cdeda..f4e3797d02 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -138,7 +138,7 @@ func connectToDB(dbConfig postgres.Config, logger logger.Logger) *sqlx.DB { return db } -func newService(db *sqlx.DB, logger logger.Logger) consumers.MessageConsumer { +func newService(db *sqlx.DB, logger logger.Logger) consumers.Consumer { svc := postgres.New(db) svc = api.LoggingMiddleware(svc, logger) svc = api.MetricsMiddleware( diff --git a/consumers/api/logging.go b/consumers/api/logging.go index e6fcccce5a..f873c9c02e 100644 --- a/consumers/api/logging.go +++ b/consumers/api/logging.go @@ -13,15 +13,15 @@ import ( log "github.com/mainflux/mainflux/logger" ) -var _ consumers.MessageConsumer = (*loggingMiddleware)(nil) +var _ consumers.Consumer = (*loggingMiddleware)(nil) type loggingMiddleware struct { logger log.Logger - c consumers.MessageConsumer + c consumers.Consumer } // LoggingMiddleware adds logging facilities to the adapter. -func LoggingMiddleware(c consumers.MessageConsumer, logger log.Logger) consumers.MessageConsumer { +func LoggingMiddleware(c consumers.Consumer, logger log.Logger) consumers.Consumer { return &loggingMiddleware{ logger: logger, c: c, diff --git a/consumers/api/metrics.go b/consumers/api/metrics.go index 6140f45de3..3b55503d02 100644 --- a/consumers/api/metrics.go +++ b/consumers/api/metrics.go @@ -10,17 +10,17 @@ import ( "github.com/mainflux/mainflux/consumers" ) -var _ consumers.MessageConsumer = (*metricsMiddleware)(nil) +var _ consumers.Consumer = (*metricsMiddleware)(nil) type metricsMiddleware struct { counter metrics.Counter latency metrics.Histogram - c consumers.MessageConsumer + c consumers.Consumer } // MetricsMiddleware returns new message repository // with Save method wrapped to expose metrics. -func MetricsMiddleware(c consumers.MessageConsumer, counter metrics.Counter, latency metrics.Histogram) consumers.MessageConsumer { +func MetricsMiddleware(c consumers.Consumer, counter metrics.Counter, latency metrics.Histogram) consumers.Consumer { return &metricsMiddleware{ counter: counter, latency: latency, diff --git a/consumers/consumer.go b/consumers/consumer.go index e440e40426..f4ea0037a9 100644 --- a/consumers/consumer.go +++ b/consumers/consumer.go @@ -3,74 +3,9 @@ package consumers -import ( - "fmt" - "io/ioutil" - - "github.com/BurntSushi/toml" - "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging" - pubsub "github.com/mainflux/mainflux/pkg/messaging/nats" - "github.com/mainflux/mainflux/pkg/transformers" -) - -var ( - errOpenConfFile = errors.New("unable to open configuration file") - errParseConfFile = errors.New("unable to parse configuration file") - errMessageConversion = errors.New("error conversing transformed messages") -) - -// Start method starts consuming messages received from NATS. -// This method transforms messages to SenML format before -// using MessageRepository to store them. -func Start(sub messaging.Subscriber, consumer MessageConsumer, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error { - subjects, err := loadSubjectsConfig(subjectsCfgPath) - if err != nil { - logger.Warn(fmt.Sprintf("Failed to load subjects: %s", err)) - } - - for _, subject := range subjects { - if err := sub.Subscribe(subject, handle(transformer, consumer)); err != nil { - return err - } - } - return nil -} - -func handle(t transformers.Transformer, c MessageConsumer) messaging.MessageHandler { - return func(msg messaging.Message) error { - m := interface{}(msg) - var err error - if t != nil { - m, err = t.Transform(msg) - if err != nil { - return err - } - } - - return c.Consume(m) - } -} - -type filterConfig struct { - Filter []string `toml:"filter"` -} - -type subjectsConfig struct { - Subjects filterConfig `toml:"subjects"` -} - -func loadSubjectsConfig(subjectsConfigPath string) ([]string, error) { - data, err := ioutil.ReadFile(subjectsConfigPath) - if err != nil { - return []string{pubsub.SubjectAllChannels}, errors.Wrap(errOpenConfFile, err) - } - - var subjectsCfg subjectsConfig - if err := toml.Unmarshal(data, &subjectsCfg); err != nil { - return []string{pubsub.SubjectAllChannels}, errors.Wrap(errParseConfFile, err) - } - - return subjectsCfg.Subjects.Filter, nil +// Consumer specifies message writing API. +type Consumer interface { + // MessageConsumer method is used to save published message. A non-nil + // error is returned to indicate operation failure. + Consume(messages interface{}) error } diff --git a/consumers/docs.go b/consumers/docs.go index b119b8ff9b..d19b7d7c3a 100644 --- a/consumers/docs.go +++ b/consumers/docs.go @@ -1,6 +1,6 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 -// Package writers contain the domain concept definitions needed to +// Package consumers contain the domain concept definitions needed to // support Mainflux writer services functionality. package consumers diff --git a/consumers/messages.go b/consumers/messages.go index 5f6d83d93b..dc900489f8 100644 --- a/consumers/messages.go +++ b/consumers/messages.go @@ -3,9 +3,74 @@ package consumers -// MessageConsumer specifies message writing API. -type MessageConsumer interface { - // MessageConsumer method is used to save published message. A non-nil - // error is returned to indicate operation failure. - Consume(messages interface{}) error +import ( + "fmt" + "io/ioutil" + + "github.com/BurntSushi/toml" + "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/errors" + "github.com/mainflux/mainflux/pkg/messaging" + pubsub "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/mainflux/mainflux/pkg/transformers" +) + +var ( + errOpenConfFile = errors.New("unable to open configuration file") + errParseConfFile = errors.New("unable to parse configuration file") + errMessageConversion = errors.New("error conversing transformed messages") +) + +// Start method starts consuming messages received from NATS. +// This method transforms messages to SenML format before +// using MessageRepository to store them. +func Start(sub messaging.Subscriber, consumer Consumer, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error { + subjects, err := loadSubjectsConfig(subjectsCfgPath) + if err != nil { + logger.Warn(fmt.Sprintf("Failed to load subjects: %s", err)) + } + + for _, subject := range subjects { + if err := sub.Subscribe(subject, handle(transformer, consumer)); err != nil { + return err + } + } + return nil +} + +func handle(t transformers.Transformer, c Consumer) messaging.MessageHandler { + return func(msg messaging.Message) error { + m := interface{}(msg) + var err error + if t != nil { + m, err = t.Transform(msg) + if err != nil { + return err + } + } + + return c.Consume(m) + } +} + +type filterConfig struct { + Filter []string `toml:"filter"` +} + +type subjectsConfig struct { + Subjects filterConfig `toml:"subjects"` +} + +func loadSubjectsConfig(subjectsConfigPath string) ([]string, error) { + data, err := ioutil.ReadFile(subjectsConfigPath) + if err != nil { + return []string{pubsub.SubjectAllChannels}, errors.Wrap(errOpenConfFile, err) + } + + var subjectsCfg subjectsConfig + if err := toml.Unmarshal(data, &subjectsCfg); err != nil { + return []string{pubsub.SubjectAllChannels}, errors.Wrap(errParseConfFile, err) + } + + return subjectsCfg.Subjects.Filter, nil } diff --git a/consumers/notify/smtp/consumer.go b/consumers/notify/smtp/consumer.go index b5d05cf014..233f00c4ec 100644 --- a/consumers/notify/smtp/consumer.go +++ b/consumers/notify/smtp/consumer.go @@ -13,14 +13,14 @@ var ( errSaveMessage = errors.New("failed to save message to cassandra database") errNoTable = errors.New("table does not exist") ) -var _ consumers.MessageConsumer = (*emailer)(nil) +var _ consumers.Consumer = (*emailer)(nil) type emailer struct { agent *email.Agent } // New instantiates Cassandra message repository. -func New(agent *email.Agent) consumers.MessageConsumer { +func New(agent *email.Agent) consumers.Consumer { return &emailer{agent: agent} } diff --git a/consumers/writers/cassandra/messages.go b/consumers/writers/cassandra/consumer.go similarity index 95% rename from consumers/writers/cassandra/messages.go rename to consumers/writers/cassandra/consumer.go index e003c3f8af..ce935f3eb4 100644 --- a/consumers/writers/cassandra/messages.go +++ b/consumers/writers/cassandra/consumer.go @@ -18,14 +18,14 @@ var ( errSaveMessage = errors.New("failed to save message to cassandra database") errNoTable = errors.New("table does not exist") ) -var _ consumers.MessageConsumer = (*cassandraRepository)(nil) +var _ consumers.Consumer = (*cassandraRepository)(nil) type cassandraRepository struct { session *gocql.Session } // New instantiates Cassandra message repository. -func New(session *gocql.Session) consumers.MessageConsumer { +func New(session *gocql.Session) consumers.Consumer { return &cassandraRepository{session} } diff --git a/consumers/writers/cassandra/messages_test.go b/consumers/writers/cassandra/consumer_test.go similarity index 100% rename from consumers/writers/cassandra/messages_test.go rename to consumers/writers/cassandra/consumer_test.go diff --git a/consumers/writers/influxdb/messages.go b/consumers/writers/influxdb/consumer.go similarity index 94% rename from consumers/writers/influxdb/messages.go rename to consumers/writers/influxdb/consumer.go index c329a6ba7d..85298b15e6 100644 --- a/consumers/writers/influxdb/messages.go +++ b/consumers/writers/influxdb/consumer.go @@ -24,7 +24,7 @@ var ( errSaveMessage = errors.New("failed to save message to influxdb database") errMessageFormat = errors.New("invalid message format") ) -var _ consumers.MessageConsumer = (*influxRepo)(nil) +var _ consumers.Consumer = (*influxRepo)(nil) type influxRepo struct { client influxdata.Client @@ -32,7 +32,7 @@ type influxRepo struct { } // New returns new InfluxDB writer. -func New(client influxdata.Client, database string) consumers.MessageConsumer { +func New(client influxdata.Client, database string) consumers.Consumer { return &influxRepo{ client: client, cfg: influxdata.BatchPointsConfig{ diff --git a/consumers/writers/influxdb/messages_test.go b/consumers/writers/influxdb/consumer_test.go similarity index 100% rename from consumers/writers/influxdb/messages_test.go rename to consumers/writers/influxdb/consumer_test.go diff --git a/consumers/writers/messages.go b/consumers/writers/messages.go deleted file mode 100644 index ed4b42fc9e..0000000000 --- a/consumers/writers/messages.go +++ /dev/null @@ -1,11 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package writers - -// MessageRepository specifies message writing API. -type MessageRepository interface { - // Save method is used to save published message. A non-nil - // error is returned to indicate operation failure. - Save(messages interface{}) error -} diff --git a/consumers/writers/mongodb/messages.go b/consumers/writers/mongodb/consumer.go similarity index 93% rename from consumers/writers/mongodb/messages.go rename to consumers/writers/mongodb/consumer.go index 1afbe4a2f3..942f0c41b7 100644 --- a/consumers/writers/mongodb/messages.go +++ b/consumers/writers/mongodb/consumer.go @@ -24,14 +24,14 @@ var ( errMessageFormat = errors.New("invalid message format") ) -var _ consumers.MessageConsumer = (*mongoRepo)(nil) +var _ consumers.Consumer = (*mongoRepo)(nil) type mongoRepo struct { db *mongo.Database } // New returns new MongoDB writer. -func New(db *mongo.Database) consumers.MessageConsumer { +func New(db *mongo.Database) consumers.Consumer { return &mongoRepo{db} } diff --git a/consumers/writers/mongodb/messages_test.go b/consumers/writers/mongodb/consumer_test.go similarity index 100% rename from consumers/writers/mongodb/messages_test.go rename to consumers/writers/mongodb/consumer_test.go diff --git a/consumers/writers/postgres/messages.go b/consumers/writers/postgres/consumer.go similarity index 98% rename from consumers/writers/postgres/messages.go rename to consumers/writers/postgres/consumer.go index 81c5f08fd0..a79c4c0757 100644 --- a/consumers/writers/postgres/messages.go +++ b/consumers/writers/postgres/consumer.go @@ -31,14 +31,14 @@ var ( errNoTable = errors.New("relation does not exist") ) -var _ consumers.MessageConsumer = (*postgresRepo)(nil) +var _ consumers.Consumer = (*postgresRepo)(nil) type postgresRepo struct { db *sqlx.DB } // New returns new PostgreSQL writer. -func New(db *sqlx.DB) consumers.MessageConsumer { +func New(db *sqlx.DB) consumers.Consumer { return &postgresRepo{db: db} } diff --git a/consumers/writers/postgres/messages_test.go b/consumers/writers/postgres/consumer_test.go similarity index 100% rename from consumers/writers/postgres/messages_test.go rename to consumers/writers/postgres/consumer_test.go diff --git a/consumers/writers/writer.go b/consumers/writers/writer.go deleted file mode 100644 index cf62b9c3e5..0000000000 --- a/consumers/writers/writer.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package writers - -import ( - "fmt" - "io/ioutil" - - "github.com/BurntSushi/toml" - "github.com/mainflux/mainflux/consumers" - "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/pkg/errors" - "github.com/mainflux/mainflux/pkg/messaging" - pubsub "github.com/mainflux/mainflux/pkg/messaging/nats" - "github.com/mainflux/mainflux/pkg/transformers" -) - -var ( - errOpenConfFile = errors.New("unable to open configuration file") - errParseConfFile = errors.New("unable to parse configuration file") - errMessageConversion = errors.New("error conversing transformed messages") -) - -type writer struct { - consumer consumers.MessageConsumer - transformer transformers.Transformer - logger logger.Logger -} - -// Start method starts consuming messages received from NATS. -// This method transforms messages to SenML format before -// using MessageRepository to store them. -func Start(sub messaging.Subscriber, consumer consumers.MessageConsumer, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error { - w := writer{ - consumer: consumer, - transformer: transformer, - logger: logger, - } - - subjects, err := loadSubjectsConfig(subjectsCfgPath) - if err != nil { - logger.Warn(fmt.Sprintf("Failed to load subjects: %s", err)) - } - - for _, subject := range subjects { - if err := sub.Subscribe(subject, w.handler); err != nil { - return err - } - } - return nil -} - -func (c *writer) handler(msg messaging.Message) error { - t, err := c.transformer.Transform(msg) - if err != nil { - return err - } - - return c.consumer.Consume(t) -} - -type filterConfig struct { - Filter []string `toml:"filter"` -} - -type subjectsConfig struct { - Subjects filterConfig `toml:"subjects"` -} - -func loadSubjectsConfig(subjectsConfigPath string) ([]string, error) { - data, err := ioutil.ReadFile(subjectsConfigPath) - if err != nil { - return []string{pubsub.SubjectAllChannels}, errors.Wrap(errOpenConfFile, err) - } - - var subjectsCfg subjectsConfig - if err := toml.Unmarshal(data, &subjectsCfg); err != nil { - return []string{pubsub.SubjectAllChannels}, errors.Wrap(errParseConfFile, err) - } - - return subjectsCfg.Subjects.Filter, nil -} From 1ec96bdf6d97dbabf4e0fb432c82bfc6c8ace43f Mon Sep 17 00:00:00 2001 From: dusanb94 Date: Tue, 5 Jan 2021 17:59:23 +0100 Subject: [PATCH 6/9] Add repo to Notify Signed-off-by: dusanb94 --- consumers/notify/notifications.go | 32 +++++++++++++++++++++++++++++++ consumers/notify/smtp/consumer.go | 2 ++ 2 files changed, 34 insertions(+) create mode 100644 consumers/notify/notifications.go diff --git a/consumers/notify/notifications.go b/consumers/notify/notifications.go new file mode 100644 index 0000000000..76c9444f9a --- /dev/null +++ b/consumers/notify/notifications.go @@ -0,0 +1,32 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package notify + +import "context" + +// Subscription represents a user Subscription. +type Subscription struct { + ID string + OwnerID string + OwnerEmail string + Topic string +} + +// SubscriptionRepository specifies a Subscription persistence API. +type SubscriptionRepository interface { + // Save persists a subscription. Successful operation is indicated by non-nil + // error response. + Save(ctx context.Context, sub Subscription) (string, error) + + // Retrieve retrieves the subscription for the given owner and topic. + Retrieve(ctx context.Context, ownerID, topic string) error + + // Remove removes the subscription having the provided identifier, that is owned + // by the specified user. + RetrieveAll(ctx context.Context, topic string) ([]Subscription, error) + + // Remove removes the subscription having the provided identifier, that is owned + // by the specified user. + Remove(ctx context.Context, ownerID, id string) error +} diff --git a/consumers/notify/smtp/consumer.go b/consumers/notify/smtp/consumer.go index 233f00c4ec..30500189d1 100644 --- a/consumers/notify/smtp/consumer.go +++ b/consumers/notify/smtp/consumer.go @@ -5,6 +5,7 @@ package smtp import ( "github.com/mainflux/mainflux/consumers" + "github.com/mainflux/mainflux/consumers/notify" "github.com/mainflux/mainflux/internal/email" "github.com/mainflux/mainflux/pkg/errors" ) @@ -17,6 +18,7 @@ var _ consumers.Consumer = (*emailer)(nil) type emailer struct { agent *email.Agent + repo notify.SubscriptionRepository } // New instantiates Cassandra message repository. From a3708facb75cfa2fa7643a4b5a4420b06ed53dc0 Mon Sep 17 00:00:00 2001 From: dusanb94 Date: Tue, 5 Jan 2021 18:00:47 +0100 Subject: [PATCH 7/9] Remove notify Signed-off-by: dusanb94 --- consumers/docs.go | 2 +- consumers/notify/notifications.go | 32 -------------- consumers/notify/smtp/README.md | 72 ------------------------------- consumers/notify/smtp/consumer.go | 31 ------------- consumers/notify/smtp/doc.go | 6 --- 5 files changed, 1 insertion(+), 142 deletions(-) delete mode 100644 consumers/notify/notifications.go delete mode 100644 consumers/notify/smtp/README.md delete mode 100644 consumers/notify/smtp/consumer.go delete mode 100644 consumers/notify/smtp/doc.go diff --git a/consumers/docs.go b/consumers/docs.go index d19b7d7c3a..a8a532ce94 100644 --- a/consumers/docs.go +++ b/consumers/docs.go @@ -2,5 +2,5 @@ // SPDX-License-Identifier: Apache-2.0 // Package consumers contain the domain concept definitions needed to -// support Mainflux writer services functionality. +// support Mainflux consumer services functionality. package consumers diff --git a/consumers/notify/notifications.go b/consumers/notify/notifications.go deleted file mode 100644 index 76c9444f9a..0000000000 --- a/consumers/notify/notifications.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package notify - -import "context" - -// Subscription represents a user Subscription. -type Subscription struct { - ID string - OwnerID string - OwnerEmail string - Topic string -} - -// SubscriptionRepository specifies a Subscription persistence API. -type SubscriptionRepository interface { - // Save persists a subscription. Successful operation is indicated by non-nil - // error response. - Save(ctx context.Context, sub Subscription) (string, error) - - // Retrieve retrieves the subscription for the given owner and topic. - Retrieve(ctx context.Context, ownerID, topic string) error - - // Remove removes the subscription having the provided identifier, that is owned - // by the specified user. - RetrieveAll(ctx context.Context, topic string) ([]Subscription, error) - - // Remove removes the subscription having the provided identifier, that is owned - // by the specified user. - Remove(ctx context.Context, ownerID, id string) error -} diff --git a/consumers/notify/smtp/README.md b/consumers/notify/smtp/README.md deleted file mode 100644 index df9a232ddf..0000000000 --- a/consumers/notify/smtp/README.md +++ /dev/null @@ -1,72 +0,0 @@ -# SMTP Notifications - -SMTP Notifications provides a service for sending notification emails. - -## Configuration - -The service is configured using the environment variables presented in the -following table. Note that any unset variables will be replaced with their -default values. - -| Variable | Description | Default | -| -------------------------------- | --------------------------------------------------------- | ---------------------- | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_SMTP_NOTIFICATIONS_LOG_LEVEL | Log level for Cassandra writer (debug, info, warn, error) | error | -| MF_SMTP_NOTIFICATIONS_PORT | Service HTTP port | 8180 | -| MF_SMTP_NOTIFICATIONS_CONFIG_PATH | Configuration file path with NATS subjects list | /config.toml | - -## Deployment - -```yaml - version: "3.7" - cassandra-writer: - image: mainflux/cassandra-writer:[version] - container_name: [instance name] - expose: - - [Service HTTP port] - restart: on-failure - environment: - MF_NATS_URL: [NATS instance URL] - MF_SMTP_NOTIFICATIONS_LOG_LEVEL: [Cassandra writer log level] - MF_SMTP_NOTIFICATIONS_PORT: [Service HTTP port] - MF_SMTP_NOTIFICATIONS_CONFIG_PATH: [Configuration file path with NATS subjects list] - ports: - - [host machine port]:[configured HTTP port] - volume: - - ./config.toml:/config.toml -``` - -To start the service, execute the following shell script: - -```bash -# download the latest version of the service -git clone https://github.com/mainflux/mainflux - -cd mainflux - -# compile the cassandra writer -make cassandra-writer - -# copy binary to bin -make install - -# Set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] \ -MF_SMTP_NOTIFICATIONS_LOG_LEVEL=[Cassandra writer log level] \ -MF_SMTP_NOTIFICATIONS_PORT=[Service HTTP port] \ -MF_SMTP_NOTIFICATIONS_CONFIG_PATH=[Configuration file path with NATS subjects list] \ -$GOBIN/mainflux-cassandra-writer -``` - -### Using docker-compose - -This service can be deployed using docker containers. Docker compose file is -available in `/docker/addons/smtp-notifications/docker-compose.yml`. -In order to run all Mainflux core services, as well as mentioned optional ones, -execute following command: - -## Usage - -Starting service will start consuming messages. - -[doc]: http://mainflux.readthedocs.io diff --git a/consumers/notify/smtp/consumer.go b/consumers/notify/smtp/consumer.go deleted file mode 100644 index 30500189d1..0000000000 --- a/consumers/notify/smtp/consumer.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package smtp - -import ( - "github.com/mainflux/mainflux/consumers" - "github.com/mainflux/mainflux/consumers/notify" - "github.com/mainflux/mainflux/internal/email" - "github.com/mainflux/mainflux/pkg/errors" -) - -var ( - errSaveMessage = errors.New("failed to save message to cassandra database") - errNoTable = errors.New("table does not exist") -) -var _ consumers.Consumer = (*emailer)(nil) - -type emailer struct { - agent *email.Agent - repo notify.SubscriptionRepository -} - -// New instantiates Cassandra message repository. -func New(agent *email.Agent) consumers.Consumer { - return &emailer{agent: agent} -} - -func (c emailer) Consume(message interface{}) error { - return c.agent.Send(nil, "", "Password reset", "", "", "") -} diff --git a/consumers/notify/smtp/doc.go b/consumers/notify/smtp/doc.go deleted file mode 100644 index e94bc450c4..0000000000 --- a/consumers/notify/smtp/doc.go +++ /dev/null @@ -1,6 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -// Package smtp contains the domain concept definitions needed to -// support Mainflux SMTP notifications. -package smtp From 2fc215b3178343e43524b750b68d556dffb0161b Mon Sep 17 00:00:00 2001 From: dusanb94 Date: Tue, 5 Jan 2021 18:39:08 +0100 Subject: [PATCH 8/9] Rename consumer field in middlewares Signed-off-by: dusanb94 --- consumers/api/logging.go | 12 ++++++------ consumers/api/metrics.go | 16 ++++++++-------- consumers/consumer.go | 6 +++--- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/consumers/api/logging.go b/consumers/api/logging.go index f873c9c02e..7c2e5eb25f 100644 --- a/consumers/api/logging.go +++ b/consumers/api/logging.go @@ -16,15 +16,15 @@ import ( var _ consumers.Consumer = (*loggingMiddleware)(nil) type loggingMiddleware struct { - logger log.Logger - c consumers.Consumer + logger log.Logger + consumer consumers.Consumer } // LoggingMiddleware adds logging facilities to the adapter. -func LoggingMiddleware(c consumers.Consumer, logger log.Logger) consumers.Consumer { +func LoggingMiddleware(consumer consumers.Consumer, logger log.Logger) consumers.Consumer { return &loggingMiddleware{ - logger: logger, - c: c, + logger: logger, + consumer: consumer, } } @@ -38,5 +38,5 @@ func (lm *loggingMiddleware) Consume(msgs interface{}) (err error) { lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.c.Consume(msgs) + return lm.consumer.Consume(msgs) } diff --git a/consumers/api/metrics.go b/consumers/api/metrics.go index 3b55503d02..8021f1734c 100644 --- a/consumers/api/metrics.go +++ b/consumers/api/metrics.go @@ -13,18 +13,18 @@ import ( var _ consumers.Consumer = (*metricsMiddleware)(nil) type metricsMiddleware struct { - counter metrics.Counter - latency metrics.Histogram - c consumers.Consumer + counter metrics.Counter + latency metrics.Histogram + consumer consumers.Consumer } // MetricsMiddleware returns new message repository // with Save method wrapped to expose metrics. -func MetricsMiddleware(c consumers.Consumer, counter metrics.Counter, latency metrics.Histogram) consumers.Consumer { +func MetricsMiddleware(consumer consumers.Consumer, counter metrics.Counter, latency metrics.Histogram) consumers.Consumer { return &metricsMiddleware{ - counter: counter, - latency: latency, - c: c, + counter: counter, + latency: latency, + consumer: consumer, } } @@ -33,5 +33,5 @@ func (mm *metricsMiddleware) Consume(msgs interface{}) error { mm.counter.With("method", "consume").Add(1) mm.latency.With("method", "consume").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.c.Consume(msgs) + return mm.consumer.Consume(msgs) } diff --git a/consumers/consumer.go b/consumers/consumer.go index f4ea0037a9..c33b5b7f3c 100644 --- a/consumers/consumer.go +++ b/consumers/consumer.go @@ -3,9 +3,9 @@ package consumers -// Consumer specifies message writing API. +// Consumer specifies message consuming API. type Consumer interface { - // MessageConsumer method is used to save published message. A non-nil - // error is returned to indicate operation failure. + // Consume method is used to consumed received messages. + // A non-nil error is returned to indicate operation failure. Consume(messages interface{}) error } From adc70d89f9628531b471815243f954d52af02a23 Mon Sep 17 00:00:00 2001 From: dusanb94 Date: Mon, 11 Jan 2021 13:36:44 +0100 Subject: [PATCH 9/9] Fix remarks and add Readme Signed-off-by: dusanb94 --- consumers/README.md | 19 +++++++++---------- consumers/consumer.go | 2 +- consumers/messages.go | 4 ++-- readers/influxdb/messages_test.go | 2 +- readers/postgres/messages_test.go | 2 +- 5 files changed, 14 insertions(+), 15 deletions(-) diff --git a/consumers/README.md b/consumers/README.md index b715a11901..b89351abdb 100644 --- a/consumers/README.md +++ b/consumers/README.md @@ -1,16 +1,15 @@ -# Writers +# Consumers -Writers provide an implementation of various `message writers`. -Message writers are services that normalize (in `SenML` format) -Mainflux messages and store them in specific data store. +Consumers provide an abstraction of various `Mainflux consumers`. +Mainflux consumer is a generic service that can handle received messages - consume them. +The message is not necessarily a Mainflux message - before consuming, Mainflux message can +be transformed into any valid format that specific consumer can understand. For example, +writers are consumers that can take a SenML or JSON message and store it. -Writers are optional services and are treated as plugins. In order to -run writer services, core services must be up and running. For more info -on the platform core services with its dependencies, please check out -the [Docker Compose][compose] file. +Consumers are optional services and are treated as plugins. In order to +run consumer services, core services must be up and running. -For an in-depth explanation of the usage of `writers`, as well as thorough +For an in-depth explanation of the usage of `consumers`, as well as thorough understanding of Mainflux, please check out the [official documentation][doc]. [doc]: http://mainflux.readthedocs.io -[compose]: ../docker/docker-compose.yml diff --git a/consumers/consumer.go b/consumers/consumer.go index c33b5b7f3c..00a359f689 100644 --- a/consumers/consumer.go +++ b/consumers/consumer.go @@ -6,6 +6,6 @@ package consumers // Consumer specifies message consuming API. type Consumer interface { // Consume method is used to consumed received messages. - // A non-nil error is returned to indicate operation failure. + // A non-nil error is returned to indicate operation failure. Consume(messages interface{}) error } diff --git a/consumers/messages.go b/consumers/messages.go index dc900489f8..8791d32972 100644 --- a/consumers/messages.go +++ b/consumers/messages.go @@ -31,14 +31,14 @@ func Start(sub messaging.Subscriber, consumer Consumer, transformer transformers } for _, subject := range subjects { - if err := sub.Subscribe(subject, handle(transformer, consumer)); err != nil { + if err := sub.Subscribe(subject, handler(transformer, consumer)); err != nil { return err } } return nil } -func handle(t transformers.Transformer, c Consumer) messaging.MessageHandler { +func handler(t transformers.Transformer, c Consumer) messaging.MessageHandler { return func(msg messaging.Message) error { m := interface{}(msg) var err error diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index 4a6c0e9c37..e6eb3f1e22 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -6,11 +6,11 @@ import ( "time" influxdata "github.com/influxdata/influxdb/client/v2" + iwriter "github.com/mainflux/mainflux/consumers/writers/influxdb" "github.com/mainflux/mainflux/pkg/transformers/senml" uuidProvider "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" ireader "github.com/mainflux/mainflux/readers/influxdb" - iwriter "github.com/mainflux/mainflux/writers/influxdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index dee457c648..8f775efed4 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -8,11 +8,11 @@ import ( "testing" "time" + pwriter "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/pkg/transformers/senml" uuidProvider "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" preader "github.com/mainflux/mainflux/readers/postgres" - pwriter "github.com/mainflux/mainflux/writers/postgres" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" )