Skip to content

Commit

Permalink
NOISSUE - Introduce NATS Jetstream as Default ES (#1907)
Browse files Browse the repository at this point in the history
* Refactor message broker implementation

This commit refactors the Nats message broker implementation to include pubsub options. These changes include:

- Adding `Option` func that takes in the URL and prefix
- Implement `WithStream` option which can create a different stream for nats stream
- Implement `WithExchange` option which can create a different exchaange for rabbitmq channel
- Implement `WithPrefix` option which allows to you change the publisher prefix

These changes improve the organization and readability of the codebase.

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Use redis URL to configure username, password and db

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Make event store configurable on dev deployment

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Fix adds options to messaging `PubSub` interface

Adding options to PubSub interface allows the use of messaging
package to do es.

The changes in this commit ensure that the code handles errors
properly and provides more informative error messages when
encountering unexpected types.

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Add NATS event publisher implementation

This commit adds the implementation of the NATS event publisher.

The NATS event publisher is responsible for publishing events
to a NATS messaging system. It uses the `messaging`
package to interact with the messaging system.

The implementation includes the following features:

- Publishing events to NATS using the `Publish` method.
- Marshaling events to JSON before publishing.
- Setting the message subject and headers based on the event.
- Handling errors during publishing.

This implementation is built with the `!rabbitmq` build tag,
which means it will only be compiled if the `rabbitmq` build tag
is not present.

The NATS event publisher is part of the Mainflux events package
and provides support for the Mainflux NATS events source service functionality.

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Add RabbitMQ event publisher implementation

This commit adds the implementation of the RabbitMQ event publisher.

The RabbitMQ event publisher is responsible for publishing events
to a RabbitMQ messaging system. It uses the `messaging`
package to interact with the messaging system.

The implementation includes the following features:

- Publishing events to RabbitMQ using the `Publish` method.
- Marshaling events to JSON before publishing.
- Setting the message subject and headers based on the event.
- Handling errors during publishing.

This implementation is built with the `rabbitmq` build tag,
which means it will only be compiled if the `rabbitmq` build tag
is present.

The RabbitMQ event publisher is part of the Mainflux events package
and provides support for the Mainflux RabbitMQ events source service functionality.

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Add configurable implementation for events store

This commit adds a new file `brokers_*.go` which contains the implementation for the different event store. The file includes functions for creating a new publisher and subscriber using different es store.

This commit also includes an `init` function that logs a message indicating that the binary was built using the respective package as the events store.

The purpose of this commit is to add support for alternative events store.

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Fix build flags

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Refactor Makefile and Semaphore configuration

The Makefile has been refactored to include the `MF_ES_STORE_TYPE` tag in the `go build` command. Additionally, the Semaphore configuration has been updated to include a new task for compiling with Redis as the broker type.

This commit addresses the need to compile the codebase with Redis as the event store type and includes the necessary changes in the Makefile and Semaphore configuration.

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Reduced due to memory on testing

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Fix tests for es

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Fix grammar

Co-authored-by: Sammy Kerata Oina <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Fix linting

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* feat(docker): update environment variables for message broker

The commit updates the environment variable `MF_ES_STORE_TYPE` in the `docker/.env` file. The variable is changed from `${MF_MQTT_BROKER_TYPE}` to `${MF_MESSAGE_BROKER_TYPE}` to accurately reflect the type of message broker being used. This change ensures that the correct message broker is configured for the Event Store.

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* feat: Update docker environment variables

- Removed the unused MF_ES_URL variable in the .env file
- Updated the MF_ES_STORE_TYPE and MF_ES_STORE_URL variables in the .env file to match the MF_MESSAGE_BROKER_TYPE and MF_NATS_URL variables respectively

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>

* Fix after rebase

Signed-off-by: rodneyosodo <[email protected]>

* Add godocs for option parameters for brokers

Signed-off-by: rodneyosodo <[email protected]>

* pass by value exchange and prefix names

Signed-off-by: rodneyosodo <[email protected]>

* Rename option functions

Signed-off-by: rodneyosodo <[email protected]>

* move variables to constants

Signed-off-by: rodneyosodo <[email protected]>

* fix: option example comment

Signed-off-by: rodneyosodo <[email protected]>

---------

Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: Rodney Osodo <[email protected]>
Signed-off-by: rodneyosodo <[email protected]>
Co-authored-by: Sammy Kerata Oina <[email protected]>
  • Loading branch information
rodneyosodo and SammyOina authored Oct 24, 2023
1 parent 059b954 commit 43a263d
Show file tree
Hide file tree
Showing 51 changed files with 1,915 additions and 181 deletions.
86 changes: 47 additions & 39 deletions .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ blocks:
jobs:
- name: Setup Codecov
commands:
- 'curl -Os https://uploader.codecov.io/latest/linux/codecov'
- "curl -Os https://uploader.codecov.io/latest/linux/codecov"
- chmod +x codecov
- ./codecov
- cache store codecov ./codecov
- name: Setup Golangci-lint
commands:
- 'curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.53.3'
- "curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.53.3"
- cache store linter ./bin/golangci-lint
secrets:
- name: codecov
Expand All @@ -50,10 +50,10 @@ blocks:

- go install google.golang.org/protobuf/cmd/protoc-gen-go@$PROTOC_GEN_VERSION
- go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@$PROTOC_GRPC_VERSION

- export PATH=$PATH:/usr/local/bin/protoc
- export PATH=$PATH:$HOME/go/bin

- |
echo "Setting up Mainflux..."
for p in $(ls ./*.pb.go); do
Expand All @@ -70,7 +70,7 @@ blocks:
exit 1
fi
done
- |
- |
for p in $(ls pkg/messaging/*.pb.go); do
if ! cmp -s $p $p.tmp; then
echo "Proto file and generated Go file $p are out of sync!"
Expand All @@ -89,7 +89,7 @@ blocks:
commands:
- cd users
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Users
commands:
- cd users
Expand All @@ -110,7 +110,7 @@ blocks:
commands:
- make docker_users
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test Things
run:
when: "change_in(['things', 'cmd/things', 'auth/service.go', 'auth/api/grpc/client.go'])"
Expand All @@ -123,7 +123,7 @@ blocks:
commands:
- cd things
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Things
commands:
- cd things
Expand All @@ -144,7 +144,7 @@ blocks:
commands:
- make docker_things
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test CoAP
run:
when: "change_in(['coap', 'cmd/coap', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])"
Expand All @@ -157,7 +157,7 @@ blocks:
commands:
- cd coap
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test CoAP
commands:
- cd coap
Expand All @@ -178,7 +178,7 @@ blocks:
commands:
- make docker_coap
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test HTTP
run:
when: "change_in(['http', 'cmd/http', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])"
Expand All @@ -191,7 +191,7 @@ blocks:
commands:
- cd http
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test HTTP
commands:
- cd http
Expand All @@ -213,7 +213,7 @@ blocks:
- echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin
- docker push mainflux/http:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test MQTT
run:
when: "change_in(['mqtt', 'cmd/mqtt', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])"
Expand All @@ -226,7 +226,7 @@ blocks:
commands:
- cd mqtt
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test MQTT
commands:
- cd mqtt
Expand All @@ -248,7 +248,7 @@ blocks:
- echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin
- docker push mainflux/mqtt:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test WS
run:
when: "change_in(['ws', 'cmd/ws', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])"
Expand All @@ -261,7 +261,7 @@ blocks:
commands:
- cd ws
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test WS
commands:
- cd ws
Expand All @@ -283,7 +283,7 @@ blocks:
- echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin
- docker push mainflux/ws:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test Bootstrap
run:
when: "change_in(['bootstrap', 'cmd/bootstrap','things/policies/postgres/policies.go', 'things/policies/api/grpc/client.go'])"
Expand All @@ -296,7 +296,7 @@ blocks:
commands:
- cd bootstrap
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Bootstrap
commands:
- cd bootstrap
Expand All @@ -318,7 +318,7 @@ blocks:
- echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin
- docker push mainflux/bootstrap:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test Certs
run:
when: "change_in(['certs', 'cmd/certs', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])"
Expand All @@ -331,7 +331,7 @@ blocks:
commands:
- cd certs
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Certs
commands:
- cd certs
Expand All @@ -353,7 +353,7 @@ blocks:
- echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin
- docker push mainflux/certs:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test Provision
dependencies:
- Setup
Expand All @@ -366,7 +366,7 @@ blocks:
commands:
- cd provision
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Provision
commands:
- cd provision
Expand All @@ -388,7 +388,7 @@ blocks:
- echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin
- docker push mainflux/http:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test Twins
run:
when: "change_in(['twins', 'cmd/twins', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])"
Expand All @@ -401,7 +401,7 @@ blocks:
commands:
- cd twins
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Twins
commands:
- cd twins
Expand All @@ -423,7 +423,7 @@ blocks:
- echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin
- docker push mainflux/twins:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test Readers
run:
when: "change_in(['/readers', 'cmd/cassandra-reader', 'cmd/influxdb-reader', 'cmd/mongodb-reader', 'cmd/postgres-reader', 'cmd/timescale-reader', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])"
Expand All @@ -436,7 +436,7 @@ blocks:
commands:
- cd readers
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Twins
commands:
- cd readers
Expand All @@ -462,7 +462,7 @@ blocks:
- docker push mainflux/postgres-reader:latest
- docker push mainflux/timescale-reader:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test Consumers
run:
when: "change_in(['consumers', 'cmd/cassandra-writer', 'cmd/influxdb-writer', 'cmd/mongodb-writer', 'cmd/postgres-writer', 'cmd/timescale-writer', 'cmd/smpp-notifier', 'cmd/smtp-notifier', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])"
Expand All @@ -475,7 +475,7 @@ blocks:
commands:
- cd consumers
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Consumers
commands:
- cd consumers
Expand All @@ -502,7 +502,7 @@ blocks:
- docker push mainflux/timescale-writer:latest
- docker push mainflux/smtp-notifier:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test CLI
run:
when: "change_in(['cli', 'cmd/cli'])"
Expand All @@ -515,7 +515,7 @@ blocks:
commands:
- cd cli
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test CLI
commands:
- cd cli
Expand All @@ -537,7 +537,7 @@ blocks:
- echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin
- docker push mainflux/cli:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test LoRa
run:
when: "change_in(['lora', 'cmd/lora'])"
Expand All @@ -550,7 +550,7 @@ blocks:
commands:
- cd lora
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test LoRa
commands:
- cd lora
Expand All @@ -572,7 +572,7 @@ blocks:
- echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin
- docker push mainflux/lora:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test OPC-UA
run:
when: "change_in(['/opcua', 'cmd/opcua', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])"
Expand All @@ -585,7 +585,7 @@ blocks:
commands:
- cd opcua
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test OPC-UA
commands:
- cd opcua
Expand All @@ -607,7 +607,7 @@ blocks:
- echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin
- docker push mainflux/opcua:latest
secrets:
- name: docker_hub
- name: docker_hub
- name: Lint and Test Internal
run:
when: "change_in('/internal')"
Expand All @@ -620,7 +620,7 @@ blocks:
commands:
- cd internal
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Internal
commands:
- cd internal
Expand All @@ -640,7 +640,7 @@ blocks:
commands:
- cd logger
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Logger
commands:
- cd logger
Expand All @@ -660,7 +660,7 @@ blocks:
commands:
- cd pkg
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test PKG
commands:
- cd pkg
Expand All @@ -680,7 +680,7 @@ blocks:
commands:
- cd tools
- cache restore linter
- './bin/golangci-lint run'
- "./bin/golangci-lint run"
- name: Test Tools
commands:
- cd tools
Expand All @@ -697,3 +697,11 @@ blocks:
commands:
- MF_MESSAGE_BROKER_TYPE=rabbitmq make mqtt

- name: Compile Check For Redis
dependencies:
- Setup
task:
jobs:
- name: Compile For Redis
commands:
- MF_ES_STORE_TYPE=redis make mqtt
15 changes: 14 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ else
MF_MQTT_BROKER_TYPE=nats
endif

ifneq ($(MF_ES_STORE_TYPE),)
MF_ES_STORE_TYPE := $(MF_ES_STORE_TYPE)
else
MF_ES_STORE_TYPE=nats
endif

define compile_service
CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \
go build -mod=vendor -tags $(MF_MESSAGE_BROKER_TYPE) -ldflags "-s -w \
go build -mod=vendor -tags $(MF_MESSAGE_BROKER_TYPE) --tags $(MF_ES_STORE_TYPE) -ldflags "-s -w \
-X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \
-X 'github.com/mainflux/mainflux.Version=$(VERSION)' \
-X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \
Expand Down Expand Up @@ -226,7 +231,15 @@ else
endif

run: check_certs change_config
ifeq ($(MF_ES_STORE_TYPE), redis)
sed -i "s/MF_ES_STORE_TYPE=.*/MF_ES_STORE_TYPE=redis/" docker/.env
sed -i "s/MF_ES_STORE_URL=.*/MF_ES_STORE_URL=$$\{MF_REDIS_URL}/" docker/.env
docker-compose -f docker/docker-compose.yml --profile $(DOCKER_PROFILE) --profile redis -p $(DOCKER_PROJECT) $(DOCKER_COMPOSE_COMMAND) $(args)
else
sed -i "s,MF_ES_STORE_TYPE=.*,MF_ES_STORE_TYPE=$$\{MF_MESSAGE_BROKER_TYPE}," docker/.env
sed -i "s,MF_ES_STORE_URL=.*,MF_ES_STORE_URL=$$\{MF_$(shell echo ${MF_MESSAGE_BROKER_TYPE} | tr 'a-z' 'A-Z')_URL\}," docker/.env
docker-compose -f docker/docker-compose.yml --profile $(DOCKER_PROFILE) -p $(DOCKER_PROJECT) $(DOCKER_COMPOSE_COMMAND) $(args)
endif

run_addons: check_certs
$(call change_config)
Expand Down
Loading

0 comments on commit 43a263d

Please sign in to comment.