Skip to content

Commit

Permalink
NOISSUE - Switch to Consumers interface (#1316)
Browse files Browse the repository at this point in the history
* Replace Writer with Consumer

Signed-off-by: dusanb94 <[email protected]>

* Add Notifications package

Signed-off-by: dusanb94 <[email protected]>

* Update Consumer Start

Signed-off-by: dusanb94 <[email protected]>

* Fix Readers

Signed-off-by: dusanb94 <[email protected]>

* Fix Consumer naming

Signed-off-by: dusanb94 <[email protected]>

* Add repo to Notify

Signed-off-by: dusanb94 <[email protected]>

* Remove notify

Signed-off-by: dusanb94 <[email protected]>

* Rename consumer field in middlewares

Signed-off-by: dusanb94 <[email protected]>

* Fix remarks and add Readme

Signed-off-by: dusanb94 <[email protected]>
  • Loading branch information
dborovcanin authored Jan 11, 2021
1 parent 973ca17 commit 6b7dc54
Show file tree
Hide file tree
Showing 45 changed files with 187 additions and 166 deletions.
10 changes: 5 additions & 5 deletions cmd/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/consumers/api"
"github.com/mainflux/mainflux/consumers/writers/cassandra"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/cassandra"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -82,7 +82,7 @@ func main() {
repo := newService(session, logger)
st := senml.New(cfg.contentType)

if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err))
}

Expand Down Expand Up @@ -134,7 +134,7 @@ func connectToCassandra(dbCfg cassandra.DBConfig, logger logger.Logger) *gocql.S
return session
}

func newService(session *gocql.Session, logger logger.Logger) writers.MessageRepository {
func newService(session *gocql.Session, logger logger.Logger) consumers.Consumer {
repo := cassandra.New(session)
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(
Expand Down
8 changes: 4 additions & 4 deletions cmd/influxdb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/consumers/api"
"github.com/mainflux/mainflux/consumers/writers/influxdb"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/influxdb"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -91,7 +91,7 @@ func main() {
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New(cfg.contentType)

if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err))
os.Exit(1)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/mongodb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/consumers/api"
"github.com/mainflux/mainflux/consumers/writers/mongodb"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/mongodb"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand Down Expand Up @@ -88,7 +88,7 @@ func main() {
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New(cfg.contentType)

if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err))
os.Exit(1)
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/postgres-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/jmoiron/sqlx"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/consumers/api"
"github.com/mainflux/mainflux/consumers/writers/postgres"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/postgres"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -88,7 +88,7 @@ func main() {
repo := newService(db, logger)
st := senml.New(cfg.contentType)

if err = writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
if err = consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err))
}

Expand Down Expand Up @@ -138,7 +138,7 @@ func connectToDB(dbConfig postgres.Config, logger logger.Logger) *sqlx.DB {
return db
}

func newService(db *sqlx.DB, logger logger.Logger) writers.MessageRepository {
func newService(db *sqlx.DB, logger logger.Logger) consumers.Consumer {
svc := postgres.New(db)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
Expand Down
15 changes: 15 additions & 0 deletions consumers/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Consumers

Consumers provide an abstraction of various `Mainflux consumers`.
Mainflux consumer is a generic service that can handle received messages - consume them.
The message is not necessarily a Mainflux message - before consuming, Mainflux message can
be transformed into any valid format that specific consumer can understand. For example,
writers are consumers that can take a SenML or JSON message and store it.

Consumers are optional services and are treated as plugins. In order to
run consumer services, core services must be up and running.

For an in-depth explanation of the usage of `consumers`, as well as thorough
understanding of Mainflux, please check out the [official documentation][doc].

[doc]: http://mainflux.readthedocs.io
42 changes: 42 additions & 0 deletions consumers/api/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

// +build !test

package api

import (
"fmt"
"time"

"github.com/mainflux/mainflux/consumers"
log "github.com/mainflux/mainflux/logger"
)

var _ consumers.Consumer = (*loggingMiddleware)(nil)

type loggingMiddleware struct {
logger log.Logger
consumer consumers.Consumer
}

// LoggingMiddleware adds logging facilities to the adapter.
func LoggingMiddleware(consumer consumers.Consumer, logger log.Logger) consumers.Consumer {
return &loggingMiddleware{
logger: logger,
consumer: consumer,
}
}

func (lm *loggingMiddleware) Consume(msgs interface{}) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.consumer.Consume(msgs)
}
37 changes: 37 additions & 0 deletions consumers/api/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package api

import (
"time"

"github.com/go-kit/kit/metrics"
"github.com/mainflux/mainflux/consumers"
)

var _ consumers.Consumer = (*metricsMiddleware)(nil)

type metricsMiddleware struct {
counter metrics.Counter
latency metrics.Histogram
consumer consumers.Consumer
}

// MetricsMiddleware returns new message repository
// with Save method wrapped to expose metrics.
func MetricsMiddleware(consumer consumers.Consumer, counter metrics.Counter, latency metrics.Histogram) consumers.Consumer {
return &metricsMiddleware{
counter: counter,
latency: latency,
consumer: consumer,
}
}

func (mm *metricsMiddleware) Consume(msgs interface{}) error {
defer func(begin time.Time) {
mm.counter.With("method", "consume").Add(1)
mm.latency.With("method", "consume").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.consumer.Consume(msgs)
}
File renamed without changes.
11 changes: 11 additions & 0 deletions consumers/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package consumers

// Consumer specifies message consuming API.
type Consumer interface {
// Consume method is used to consumed received messages.
// A non-nil error is returned to indicate operation failure.
Consume(messages interface{}) error
}
6 changes: 6 additions & 0 deletions consumers/docs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

// Package consumers contain the domain concept definitions needed to
// support Mainflux consumer services functionality.
package consumers
36 changes: 15 additions & 21 deletions writers/writer.go → consumers/messages.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package writers
package consumers

import (
"fmt"
Expand All @@ -21,42 +21,36 @@ var (
errMessageConversion = errors.New("error conversing transformed messages")
)

type consumer struct {
repo MessageRepository
transformer transformers.Transformer
logger logger.Logger
}

// Start method starts consuming messages received from NATS.
// This method transforms messages to SenML format before
// using MessageRepository to store them.
func Start(sub messaging.Subscriber, repo MessageRepository, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error {
c := consumer{
repo: repo,
transformer: transformer,
logger: logger,
}

func Start(sub messaging.Subscriber, consumer Consumer, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error {
subjects, err := loadSubjectsConfig(subjectsCfgPath)
if err != nil {
logger.Warn(fmt.Sprintf("Failed to load subjects: %s", err))
}

for _, subject := range subjects {
if err := sub.Subscribe(subject, c.handler); err != nil {
if err := sub.Subscribe(subject, handler(transformer, consumer)); err != nil {
return err
}
}
return nil
}

func (c *consumer) handler(msg messaging.Message) error {
t, err := c.transformer.Transform(msg)
if err != nil {
return err
}
func handler(t transformers.Transformer, c Consumer) messaging.MessageHandler {
return func(msg messaging.Message) error {
m := interface{}(msg)
var err error
if t != nil {
m, err = t.Transform(msg)
if err != nil {
return err
}
}

return c.repo.Save(t)
return c.Consume(m)
}
}

type filterConfig struct {
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@ import (
"fmt"

"github.com/gocql/gocql"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/pkg/errors"
mfjson "github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
)

var (
errSaveMessage = errors.New("failed to save message to cassandra database")
errNoTable = errors.New("table does not exist")
)
var _ writers.MessageRepository = (*cassandraRepository)(nil)
var _ consumers.Consumer = (*cassandraRepository)(nil)

type cassandraRepository struct {
session *gocql.Session
}

// New instantiates Cassandra message repository.
func New(session *gocql.Session) writers.MessageRepository {
func New(session *gocql.Session) consumers.Consumer {
return &cassandraRepository{session}
}

func (cr *cassandraRepository) Save(message interface{}) error {
func (cr *cassandraRepository) Consume(message interface{}) error {
switch m := message.(type) {
case mfjson.Messages:
return cr.saveJSON(m)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"testing"
"time"

"github.com/mainflux/mainflux/consumers/writers/cassandra"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers/cassandra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -68,6 +68,6 @@ func TestSave(t *testing.T) {
msgs = append(msgs, msg)
}

err = repo.Save(msgs)
err = repo.Consume(msgs)
assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err))
}
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"testing"

"github.com/gocql/gocql"
"github.com/mainflux/mainflux/consumers/writers/cassandra"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/writers/cassandra"
dockertest "github.com/ory/dockertest/v3"
)

Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"math"
"time"

"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"

influxdata "github.com/influxdata/influxdb/client/v2"
)
Expand All @@ -24,15 +24,15 @@ var (
errSaveMessage = errors.New("failed to save message to influxdb database")
errMessageFormat = errors.New("invalid message format")
)
var _ writers.MessageRepository = (*influxRepo)(nil)
var _ consumers.Consumer = (*influxRepo)(nil)

type influxRepo struct {
client influxdata.Client
cfg influxdata.BatchPointsConfig
}

// New returns new InfluxDB writer.
func New(client influxdata.Client, database string) writers.MessageRepository {
func New(client influxdata.Client, database string) consumers.Consumer {
return &influxRepo{
client: client,
cfg: influxdata.BatchPointsConfig{
Expand All @@ -41,7 +41,7 @@ func New(client influxdata.Client, database string) writers.MessageRepository {
}
}

func (repo *influxRepo) Save(message interface{}) error {
func (repo *influxRepo) Consume(message interface{}) error {
pts, err := influxdata.NewBatchPoints(repo.cfg)
if err != nil {
return errors.Wrap(errSaveMessage, err)
Expand Down
Loading

0 comments on commit 6b7dc54

Please sign in to comment.