Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NOISSUE - Switch to Consumers interface #1316

Merged
merged 9 commits into from
Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cmd/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/consumers/api"
"github.com/mainflux/mainflux/consumers/writers/cassandra"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/cassandra"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -82,7 +82,7 @@ func main() {
repo := newService(session, logger)
st := senml.New(cfg.contentType)

if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err))
}

Expand Down Expand Up @@ -134,7 +134,7 @@ func connectToCassandra(dbCfg cassandra.DBConfig, logger logger.Logger) *gocql.S
return session
}

func newService(session *gocql.Session, logger logger.Logger) writers.MessageRepository {
func newService(session *gocql.Session, logger logger.Logger) consumers.Consumer {
repo := cassandra.New(session)
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(
Expand Down
8 changes: 4 additions & 4 deletions cmd/influxdb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/consumers/api"
"github.com/mainflux/mainflux/consumers/writers/influxdb"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/influxdb"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -91,7 +91,7 @@ func main() {
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New(cfg.contentType)

if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start InfluxDB writer: %s", err))
os.Exit(1)
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/mongodb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/consumers/api"
"github.com/mainflux/mainflux/consumers/writers/mongodb"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/mongodb"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand Down Expand Up @@ -88,7 +88,7 @@ func main() {
repo = api.MetricsMiddleware(repo, counter, latency)
st := senml.New(cfg.contentType)

if err := writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
if err := consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to start MongoDB writer: %s", err))
os.Exit(1)
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/postgres-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/jmoiron/sqlx"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/consumers/api"
"github.com/mainflux/mainflux/consumers/writers/postgres"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
"github.com/mainflux/mainflux/writers/api"
"github.com/mainflux/mainflux/writers/postgres"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -88,7 +88,7 @@ func main() {
repo := newService(db, logger)
st := senml.New(cfg.contentType)

if err = writers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
if err = consumers.Start(pubSub, repo, st, cfg.configPath, logger); err != nil {
logger.Error(fmt.Sprintf("Failed to create Postgres writer: %s", err))
}

Expand Down Expand Up @@ -138,7 +138,7 @@ func connectToDB(dbConfig postgres.Config, logger logger.Logger) *sqlx.DB {
return db
}

func newService(db *sqlx.DB, logger logger.Logger) writers.MessageRepository {
func newService(db *sqlx.DB, logger logger.Logger) consumers.Consumer {
svc := postgres.New(db)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
Expand Down
15 changes: 15 additions & 0 deletions consumers/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Consumers

Consumers provide an abstraction of various `Mainflux consumers`.
Mainflux consumer is a generic service that can handle received messages - consume them.
The message is not necessarily a Mainflux message - before consuming, Mainflux message can
be transformed into any valid format that specific consumer can understand. For example,
writers are consumers that can take a SenML or JSON message and store it.

Consumers are optional services and are treated as plugins. In order to
run consumer services, core services must be up and running.

For an in-depth explanation of the usage of `consumers`, as well as thorough
understanding of Mainflux, please check out the [official documentation][doc].

[doc]: http://mainflux.readthedocs.io
42 changes: 42 additions & 0 deletions consumers/api/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

// +build !test

package api

import (
"fmt"
"time"

"github.com/mainflux/mainflux/consumers"
log "github.com/mainflux/mainflux/logger"
)

var _ consumers.Consumer = (*loggingMiddleware)(nil)

type loggingMiddleware struct {
logger log.Logger
consumer consumers.Consumer
}

// LoggingMiddleware adds logging facilities to the adapter.
func LoggingMiddleware(consumer consumers.Consumer, logger log.Logger) consumers.Consumer {
return &loggingMiddleware{
logger: logger,
consumer: consumer,
}
}

func (lm *loggingMiddleware) Consume(msgs interface{}) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method consume took %s to complete", time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.consumer.Consume(msgs)
}
37 changes: 37 additions & 0 deletions consumers/api/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package api

import (
"time"

"github.com/go-kit/kit/metrics"
"github.com/mainflux/mainflux/consumers"
)

var _ consumers.Consumer = (*metricsMiddleware)(nil)

type metricsMiddleware struct {
counter metrics.Counter
latency metrics.Histogram
consumer consumers.Consumer
}

// MetricsMiddleware returns new message repository
// with Save method wrapped to expose metrics.
func MetricsMiddleware(consumer consumers.Consumer, counter metrics.Counter, latency metrics.Histogram) consumers.Consumer {
return &metricsMiddleware{
counter: counter,
latency: latency,
consumer: consumer,
}
}

func (mm *metricsMiddleware) Consume(msgs interface{}) error {
defer func(begin time.Time) {
mm.counter.With("method", "consume").Add(1)
mm.latency.With("method", "consume").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.consumer.Consume(msgs)
}
File renamed without changes.
11 changes: 11 additions & 0 deletions consumers/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package consumers

// Consumer specifies message consuming API.
type Consumer interface {
// Consume method is used to consumed received messages.
// A non-nil error is returned to indicate operation failure.
Consume(messages interface{}) error
}
6 changes: 6 additions & 0 deletions consumers/docs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

// Package consumers contain the domain concept definitions needed to
// support Mainflux consumer services functionality.
package consumers
36 changes: 15 additions & 21 deletions writers/writer.go → consumers/messages.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package writers
package consumers

import (
"fmt"
Expand All @@ -21,42 +21,36 @@ var (
errMessageConversion = errors.New("error conversing transformed messages")
)

type consumer struct {
repo MessageRepository
transformer transformers.Transformer
logger logger.Logger
}

// Start method starts consuming messages received from NATS.
// This method transforms messages to SenML format before
// using MessageRepository to store them.
func Start(sub messaging.Subscriber, repo MessageRepository, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error {
c := consumer{
repo: repo,
transformer: transformer,
logger: logger,
}

func Start(sub messaging.Subscriber, consumer Consumer, transformer transformers.Transformer, subjectsCfgPath string, logger logger.Logger) error {
subjects, err := loadSubjectsConfig(subjectsCfgPath)
if err != nil {
logger.Warn(fmt.Sprintf("Failed to load subjects: %s", err))
}

for _, subject := range subjects {
if err := sub.Subscribe(subject, c.handler); err != nil {
if err := sub.Subscribe(subject, handler(transformer, consumer)); err != nil {
return err
}
}
return nil
}

func (c *consumer) handler(msg messaging.Message) error {
t, err := c.transformer.Transform(msg)
if err != nil {
return err
}
func handler(t transformers.Transformer, c Consumer) messaging.MessageHandler {
return func(msg messaging.Message) error {
m := interface{}(msg)
var err error
if t != nil {
m, err = t.Transform(msg)
if err != nil {
return err
}
}

return c.repo.Save(t)
return c.Consume(m)
}
}

type filterConfig struct {
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@ import (
"fmt"

"github.com/gocql/gocql"
"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/pkg/errors"
mfjson "github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"
)

var (
errSaveMessage = errors.New("failed to save message to cassandra database")
errNoTable = errors.New("table does not exist")
)
var _ writers.MessageRepository = (*cassandraRepository)(nil)
var _ consumers.Consumer = (*cassandraRepository)(nil)

type cassandraRepository struct {
session *gocql.Session
}

// New instantiates Cassandra message repository.
func New(session *gocql.Session) writers.MessageRepository {
func New(session *gocql.Session) consumers.Consumer {
return &cassandraRepository{session}
}

func (cr *cassandraRepository) Save(message interface{}) error {
func (cr *cassandraRepository) Consume(message interface{}) error {
switch m := message.(type) {
case mfjson.Messages:
return cr.saveJSON(m)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"testing"
"time"

"github.com/mainflux/mainflux/consumers/writers/cassandra"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers/cassandra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -68,6 +68,6 @@ func TestSave(t *testing.T) {
msgs = append(msgs, msg)
}

err = repo.Save(msgs)
err = repo.Consume(msgs)
assert.Nil(t, err, fmt.Sprintf("expected no error, got %s", err))
}
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"testing"

"github.com/gocql/gocql"
"github.com/mainflux/mainflux/consumers/writers/cassandra"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/writers/cassandra"
dockertest "github.com/ory/dockertest/v3"
)

Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"math"
"time"

"github.com/mainflux/mainflux/consumers"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/transformers/json"
"github.com/mainflux/mainflux/pkg/transformers/senml"
"github.com/mainflux/mainflux/writers"

influxdata "github.com/influxdata/influxdb/client/v2"
)
Expand All @@ -24,15 +24,15 @@ var (
errSaveMessage = errors.New("failed to save message to influxdb database")
errMessageFormat = errors.New("invalid message format")
)
var _ writers.MessageRepository = (*influxRepo)(nil)
var _ consumers.Consumer = (*influxRepo)(nil)

type influxRepo struct {
client influxdata.Client
cfg influxdata.BatchPointsConfig
}

// New returns new InfluxDB writer.
func New(client influxdata.Client, database string) writers.MessageRepository {
func New(client influxdata.Client, database string) consumers.Consumer {
return &influxRepo{
client: client,
cfg: influxdata.BatchPointsConfig{
Expand All @@ -41,7 +41,7 @@ func New(client influxdata.Client, database string) writers.MessageRepository {
}
}

func (repo *influxRepo) Save(message interface{}) error {
func (repo *influxRepo) Consume(message interface{}) error {
pts, err := influxdata.NewBatchPoints(repo.cfg)
if err != nil {
return errors.Wrap(errSaveMessage, err)
Expand Down
Loading