diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 22779b558..a270f5f92 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -19,13 +19,13 @@ blocks: jobs: - name: Setup Codecov commands: - - 'curl -Os https://uploader.codecov.io/latest/linux/codecov' + - "curl -Os https://uploader.codecov.io/latest/linux/codecov" - chmod +x codecov - ./codecov - cache store codecov ./codecov - name: Setup Golangci-lint commands: - - 'curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.53.3' + - "curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.53.3" - cache store linter ./bin/golangci-lint secrets: - name: codecov @@ -50,10 +50,10 @@ blocks: - go install google.golang.org/protobuf/cmd/protoc-gen-go@$PROTOC_GEN_VERSION - go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@$PROTOC_GRPC_VERSION - + - export PATH=$PATH:/usr/local/bin/protoc - export PATH=$PATH:$HOME/go/bin - + - | echo "Setting up Mainflux..." for p in $(ls ./*.pb.go); do @@ -70,7 +70,7 @@ blocks: exit 1 fi done - - | + - | for p in $(ls pkg/messaging/*.pb.go); do if ! cmp -s $p $p.tmp; then echo "Proto file and generated Go file $p are out of sync!" @@ -89,7 +89,7 @@ blocks: commands: - cd users - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Users commands: - cd users @@ -110,7 +110,7 @@ blocks: commands: - make docker_users secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Things run: when: "change_in(['things', 'cmd/things', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -123,7 +123,7 @@ blocks: commands: - cd things - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Things commands: - cd things @@ -144,7 +144,7 @@ blocks: commands: - make docker_things secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test CoAP run: when: "change_in(['coap', 'cmd/coap', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -157,7 +157,7 @@ blocks: commands: - cd coap - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test CoAP commands: - cd coap @@ -178,7 +178,7 @@ blocks: commands: - make docker_coap secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test HTTP run: when: "change_in(['http', 'cmd/http', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -191,7 +191,7 @@ blocks: commands: - cd http - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test HTTP commands: - cd http @@ -213,7 +213,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/http:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test MQTT run: when: "change_in(['mqtt', 'cmd/mqtt', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -226,7 +226,7 @@ blocks: commands: - cd mqtt - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test MQTT commands: - cd mqtt @@ -248,7 +248,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/mqtt:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test WS run: when: "change_in(['ws', 'cmd/ws', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -261,7 +261,7 @@ blocks: commands: - cd ws - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test WS commands: - cd ws @@ -283,7 +283,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/ws:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Bootstrap run: when: "change_in(['bootstrap', 'cmd/bootstrap','things/policies/postgres/policies.go', 'things/policies/api/grpc/client.go'])" @@ -296,7 +296,7 @@ blocks: commands: - cd bootstrap - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Bootstrap commands: - cd bootstrap @@ -318,7 +318,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/bootstrap:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Certs run: when: "change_in(['certs', 'cmd/certs', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -331,7 +331,7 @@ blocks: commands: - cd certs - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Certs commands: - cd certs @@ -353,7 +353,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/certs:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Provision dependencies: - Setup @@ -366,7 +366,7 @@ blocks: commands: - cd provision - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Provision commands: - cd provision @@ -388,7 +388,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/http:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Twins run: when: "change_in(['twins', 'cmd/twins', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -401,7 +401,7 @@ blocks: commands: - cd twins - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Twins commands: - cd twins @@ -423,7 +423,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/twins:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Readers run: when: "change_in(['/readers', 'cmd/cassandra-reader', 'cmd/influxdb-reader', 'cmd/mongodb-reader', 'cmd/postgres-reader', 'cmd/timescale-reader', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -436,7 +436,7 @@ blocks: commands: - cd readers - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Twins commands: - cd readers @@ -462,7 +462,7 @@ blocks: - docker push mainflux/postgres-reader:latest - docker push mainflux/timescale-reader:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Consumers run: when: "change_in(['consumers', 'cmd/cassandra-writer', 'cmd/influxdb-writer', 'cmd/mongodb-writer', 'cmd/postgres-writer', 'cmd/timescale-writer', 'cmd/smpp-notifier', 'cmd/smtp-notifier', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -475,7 +475,7 @@ blocks: commands: - cd consumers - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Consumers commands: - cd consumers @@ -502,7 +502,7 @@ blocks: - docker push mainflux/timescale-writer:latest - docker push mainflux/smtp-notifier:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test CLI run: when: "change_in(['cli', 'cmd/cli'])" @@ -515,7 +515,7 @@ blocks: commands: - cd cli - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test CLI commands: - cd cli @@ -537,7 +537,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/cli:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test LoRa run: when: "change_in(['lora', 'cmd/lora'])" @@ -550,7 +550,7 @@ blocks: commands: - cd lora - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test LoRa commands: - cd lora @@ -572,7 +572,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/lora:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test OPC-UA run: when: "change_in(['/opcua', 'cmd/opcua', 'things/api/grpc', 'auth/service.go', 'auth/api/grpc/client.go'])" @@ -585,7 +585,7 @@ blocks: commands: - cd opcua - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test OPC-UA commands: - cd opcua @@ -607,7 +607,7 @@ blocks: - echo $DOCKER_TOKEN | docker login --username "$DOCKER_USERNAME" --password-stdin - docker push mainflux/opcua:latest secrets: - - name: docker_hub + - name: docker_hub - name: Lint and Test Internal run: when: "change_in('/internal')" @@ -620,7 +620,7 @@ blocks: commands: - cd internal - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Internal commands: - cd internal @@ -640,7 +640,7 @@ blocks: commands: - cd logger - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Logger commands: - cd logger @@ -660,7 +660,7 @@ blocks: commands: - cd pkg - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test PKG commands: - cd pkg @@ -680,7 +680,7 @@ blocks: commands: - cd tools - cache restore linter - - './bin/golangci-lint run' + - "./bin/golangci-lint run" - name: Test Tools commands: - cd tools @@ -697,3 +697,11 @@ blocks: commands: - MF_MESSAGE_BROKER_TYPE=rabbitmq make mqtt + - name: Compile Check For Redis + dependencies: + - Setup + task: + jobs: + - name: Compile For Redis + commands: + - MF_ES_STORE_TYPE=redis make mqtt diff --git a/Makefile b/Makefile index 398e08e21..eaf648fca 100644 --- a/Makefile +++ b/Makefile @@ -35,10 +35,15 @@ else MF_MQTT_BROKER_TYPE=nats endif +ifneq ($(MF_ES_STORE_TYPE),) + MF_ES_STORE_TYPE := $(MF_ES_STORE_TYPE) +else + MF_ES_STORE_TYPE=nats +endif define compile_service CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \ - go build -mod=vendor -tags $(MF_MESSAGE_BROKER_TYPE) -ldflags "-s -w \ + go build -mod=vendor -tags $(MF_MESSAGE_BROKER_TYPE) --tags $(MF_ES_STORE_TYPE) -ldflags "-s -w \ -X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \ -X 'github.com/mainflux/mainflux.Version=$(VERSION)' \ -X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \ @@ -226,7 +231,15 @@ else endif run: check_certs change_config +ifeq ($(MF_ES_STORE_TYPE), redis) + sed -i "s/MF_ES_STORE_TYPE=.*/MF_ES_STORE_TYPE=redis/" docker/.env + sed -i "s/MF_ES_STORE_URL=.*/MF_ES_STORE_URL=$$\{MF_REDIS_URL}/" docker/.env + docker-compose -f docker/docker-compose.yml --profile $(DOCKER_PROFILE) --profile redis -p $(DOCKER_PROJECT) $(DOCKER_COMPOSE_COMMAND) $(args) +else + sed -i "s,MF_ES_STORE_TYPE=.*,MF_ES_STORE_TYPE=$$\{MF_MESSAGE_BROKER_TYPE}," docker/.env + sed -i "s,MF_ES_STORE_URL=.*,MF_ES_STORE_URL=$$\{MF_$(shell echo ${MF_MESSAGE_BROKER_TYPE} | tr 'a-z' 'A-Z')_URL\}," docker/.env docker-compose -f docker/docker-compose.yml --profile $(DOCKER_PROFILE) -p $(DOCKER_PROJECT) $(DOCKER_COMPOSE_COMMAND) $(args) +endif run_addons: check_certs $(call change_config) diff --git a/bootstrap/events/producer/streams.go b/bootstrap/events/producer/streams.go index 461ebd3e7..54e88cb71 100644 --- a/bootstrap/events/producer/streams.go +++ b/bootstrap/events/producer/streams.go @@ -8,7 +8,7 @@ import ( "github.com/mainflux/mainflux/bootstrap" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" ) const streamID = "mainflux.bootstrap" @@ -23,7 +23,7 @@ type eventStore struct { // NewEventStoreMiddleware returns wrapper around bootstrap service that sends // events to event store. func NewEventStoreMiddleware(ctx context.Context, svc bootstrap.Service, url string) (bootstrap.Service, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err } diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index 56053b4c8..b22e8c5e8 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -28,7 +28,7 @@ import ( "github.com/mainflux/mainflux/internal/server" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" mfsdk "github.com/mainflux/mainflux/pkg/sdk/go" "github.com/mainflux/mainflux/pkg/uuid" "go.opentelemetry.io/otel/trace" @@ -184,7 +184,7 @@ func newService(ctx context.Context, auth mainflux.AuthServiceClient, db *sqlx.D } func subscribeToThingsES(ctx context.Context, svc bootstrap.Service, cfg config, logger mflog.Logger) error { - subscriber, err := redis.NewSubscriber(cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) if err != nil { return err } diff --git a/cmd/lora/main.go b/cmd/lora/main.go index 20b64809a..33cf00f26 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -26,7 +26,7 @@ import ( "github.com/mainflux/mainflux/lora/api" "github.com/mainflux/mainflux/lora/events" "github.com/mainflux/mainflux/lora/mqtt" - mfredis "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging/brokers" brokerstracing "github.com/mainflux/mainflux/pkg/messaging/brokers/tracing" @@ -195,7 +195,7 @@ func subscribeToLoRaBroker(svc lora.Service, mc mqttpaho.Client, timeout time.Du } func subscribeToThingsES(ctx context.Context, svc lora.Service, cfg config, logger mflog.Logger) error { - subscriber, err := mfredis.NewSubscriber(cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) if err != nil { return err } diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index 61489a50d..30a8f87c8 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -25,7 +25,7 @@ import ( "github.com/mainflux/mainflux/opcua/db" "github.com/mainflux/mainflux/opcua/events" "github.com/mainflux/mainflux/opcua/gopcua" - mfredis "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/pkg/messaging/brokers" brokerstracing "github.com/mainflux/mainflux/pkg/messaging/brokers/tracing" "github.com/mainflux/mainflux/pkg/uuid" @@ -178,7 +178,7 @@ func subscribeToStoredSubs(ctx context.Context, sub opcua.Subscriber, cfg opcua. } func subscribeToThingsES(ctx context.Context, svc opcua.Service, cfg config, logger mflog.Logger) error { - subscriber, err := mfredis.NewSubscriber(cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) if err != nil { return err } diff --git a/docker/.env b/docker/.env index c9e907b85..c1100ec6e 100644 --- a/docker/.env +++ b/docker/.env @@ -52,7 +52,8 @@ MF_REDIS_TCP_PORT=6379 MF_REDIS_URL=redis://es-redis:${MF_REDIS_TCP_PORT}/0 ## Event Store -MF_ES_URL=${MF_REDIS_URL} +MF_ES_STORE_TYPE=${MF_MESSAGE_BROKER_TYPE} +MF_ES_STORE_URL=${MF_NATS_URL} ## Jaeger MF_JAEGER_PORT=6831 diff --git a/docker/addons/bootstrap/docker-compose.yml b/docker/addons/bootstrap/docker-compose.yml index 0e2c100fd..1383aa915 100644 --- a/docker/addons/bootstrap/docker-compose.yml +++ b/docker/addons/bootstrap/docker-compose.yml @@ -41,7 +41,7 @@ services: MF_BOOTSTRAP_LOG_LEVEL: ${MF_BOOTSTRAP_LOG_LEVEL} MF_BOOTSTRAP_ENCRYPT_KEY: ${MF_BOOTSTRAP_ENCRYPT_KEY} MF_BOOTSTRAP_EVENT_CONSUMER: ${MF_BOOTSTRAP_EVENT_CONSUMER} - MF_BOOTSTRAP_ES_URL: ${MF_ES_URL} + MF_BOOTSTRAP_ES_URL: ${MF_ES_STORE_URL} MF_BOOTSTRAP_HTTP_HOST: ${MF_BOOTSTRAP_HTTP_HOST} MF_BOOTSTRAP_HTTP_PORT: ${MF_BOOTSTRAP_HTTP_PORT} MF_BOOTSTRAP_HTTP_SERVER_CERT: ${MF_BOOTSTRAP_HTTP_SERVER_CERT} diff --git a/docker/addons/lora-adapter/docker-compose.yml b/docker/addons/lora-adapter/docker-compose.yml index 5c18705f1..99ab3cc75 100644 --- a/docker/addons/lora-adapter/docker-compose.yml +++ b/docker/addons/lora-adapter/docker-compose.yml @@ -36,7 +36,7 @@ services: MF_LORA_ADAPTER_HTTP_SERVER_CERT: ${MF_LORA_ADAPTER_HTTP_SERVER_CERT} MF_LORA_ADAPTER_HTTP_SERVER_KEY: ${MF_LORA_ADAPTER_HTTP_SERVER_KEY} MF_LORA_ADAPTER_ROUTE_MAP_URL: ${MF_LORA_ADAPTER_ROUTE_MAP_URL} - MF_LORA_ADAPTER_ES_URL: ${MF_ES_URL} + MF_LORA_ADAPTER_ES_URL: ${MF_ES_STORE_URL} MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} diff --git a/docker/addons/opcua-adapter/docker-compose.yml b/docker/addons/opcua-adapter/docker-compose.yml index c0bf5a0c1..7bdfe44b9 100644 --- a/docker/addons/opcua-adapter/docker-compose.yml +++ b/docker/addons/opcua-adapter/docker-compose.yml @@ -36,7 +36,7 @@ services: MF_OPCUA_ADAPTER_HTTP_PORT: ${MF_OPCUA_ADAPTER_HTTP_PORT} MF_OPCUA_ADAPTER_HTTP_SERVER_CERT: ${MF_OPCUA_ADAPTER_HTTP_SERVER_CERT} MF_OPCUA_ADAPTER_HTTP_SERVER_KEY: ${MF_OPCUA_ADAPTER_HTTP_SERVER_KEY} - MF_OPCUA_ADAPTER_ES_URL: ${MF_ES_URL} + MF_OPCUA_ADAPTER_ES_URL: ${MF_ES_STORE_URL} MF_OPCUA_ADAPTER_ROUTE_MAP_URL: ${MF_OPCUA_ADAPTER_ROUTE_MAP_URL} MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} diff --git a/docker/addons/twins/docker-compose.yml b/docker/addons/twins/docker-compose.yml index 336dde19b..eb93683d3 100644 --- a/docker/addons/twins/docker-compose.yml +++ b/docker/addons/twins/docker-compose.yml @@ -52,7 +52,7 @@ services: MF_TWINS_HTTP_SERVER_CERT: ${MF_TWINS_HTTP_SERVER_CERT} MF_TWINS_HTTP_SERVER_KEY: ${MF_TWINS_HTTP_SERVER_KEY} MF_TWINS_CACHE_URL: ${MF_TWINS_CACHE_URL} - MF_TWINS_ES_URL: ${MF_ES_URL} + MF_TWINS_ES_URL: ${MF_ES_STORE_URL} MF_THINGS_STANDALONE_ID: ${MF_THINGS_STANDALONE_ID} MF_THINGS_STANDALONE_TOKEN: ${MF_THINGS_STANDALONE_TOKEN} MF_TWINS_DB_HOST: ${MF_TWINS_DB_HOST} diff --git a/docker/brokers/profiles/nats_rabbitmq.yml b/docker/brokers/profiles/nats_rabbitmq.yml deleted file mode 100644 index f928ad3db..000000000 --- a/docker/brokers/profiles/nats_rabbitmq.yml +++ /dev/null @@ -1,15 +0,0 @@ -# This file is used to configure NATS broker. -# It used when running nats as an MQTT broker and RabbitMQ as a Message broker. -services: - nats: - extends: - file: ../nats.yml - service: broker - container_name: mainflux-nats - restart: on-failure - networks: - - mainflux-base-net - volumes: - - mainflux-mqtt-broker-volume:/data - profiles: - - nats_rabbitmq diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index b1aae17a8..325331c61 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -12,15 +12,17 @@ volumes: mainflux-users-db-volume: mainflux-things-db-volume: mainflux-things-redis-volume: - mainflux-es-redis-volume: mainflux-mqtt-broker-volume: mainflux-broker-volume: + mainflux-es-volume: mainflux-spicedb-db-volume: mainflux-auth-db-volume: include: - path: brokers/docker-compose.yml env_file: docker/.env + - path: es/docker-compose.yml + env_file: docker/.env services: spicedb: @@ -190,7 +192,7 @@ services: MF_THINGS_AUTH_GRPC_SERVER_KEY: ${MF_THINGS_AUTH_GRPC_SERVER_KEY:+/things-grpc-server.key} MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/things-grpc-server-ca.crt} MF_THINGS_AUTH_GRPC_CLIENT_CA_CERTS: ${MF_THINGS_AUTH_GRPC_CLIENT_CA_CERTS:+/things-grpc-client-ca.crt} - MF_THINGS_ES_URL: ${MF_ES_URL} + MF_THINGS_ES_URL: ${MF_ES_STORE_URL} MF_THINGS_CACHE_URL: ${MF_THINGS_CACHE_URL} MF_THINGS_DB_HOST: ${MF_THINGS_DB_HOST} MF_THINGS_DB_PORT: ${MF_THINGS_DB_PORT} @@ -308,7 +310,7 @@ services: MF_EMAIL_FROM_ADDRESS: ${MF_EMAIL_FROM_ADDRESS} MF_EMAIL_FROM_NAME: ${MF_EMAIL_FROM_NAME} MF_EMAIL_TEMPLATE: ${MF_EMAIL_TEMPLATE} - MF_USERS_ES_URL: ${MF_ES_URL} + MF_USERS_ES_URL: ${MF_ES_STORE_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} MF_AUTH_GRPC_URL: ${MF_AUTH_GRPC_URL} @@ -373,7 +375,7 @@ services: MF_MQTT_ADAPTER_WS_TARGET_PORT: ${MF_MQTT_ADAPTER_WS_TARGET_PORT} MF_MQTT_ADAPTER_WS_TARGET_PATH: ${MF_MQTT_ADAPTER_WS_TARGET_PATH} MF_MQTT_ADAPTER_INSTANCE: ${MF_MQTT_ADAPTER_INSTANCE} - MF_MQTT_ADAPTER_ES_URL: ${MF_ES_URL} + MF_MQTT_ADAPTER_ES_URL: ${MF_ES_STORE_URL} MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} MF_THINGS_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/things-grpc-client.crt} @@ -446,15 +448,6 @@ services: bind: create_host_path: true - es-redis: - image: redis:7.2.0-alpine - container_name: mainflux-es-redis - restart: on-failure - networks: - - mainflux-base-net - volumes: - - mainflux-es-redis-volume:/data - coap-adapter: image: mainflux/coap:${MF_RELEASE_TAG} container_name: mainflux-coap diff --git a/docker/es/docker-compose.yml b/docker/es/docker-compose.yml new file mode 100644 index 000000000..93065d3e5 --- /dev/null +++ b/docker/es/docker-compose.yml @@ -0,0 +1,14 @@ +volumes: + mainflux-es-redis-volume: + +services: + es-redis: + image: redis:7.2.0-alpine + container_name: mainflux-es-redis + restart: on-failure + networks: + - mainflux-base-net + volumes: + - mainflux-es-volume:/data + profiles: + - redis diff --git a/internal/groups/events/streams.go b/internal/groups/events/streams.go index 485b9008d..0881131b7 100644 --- a/internal/groups/events/streams.go +++ b/internal/groups/events/streams.go @@ -7,7 +7,7 @@ import ( "context" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/pkg/groups" ) @@ -23,7 +23,7 @@ type eventStore struct { // NewEventStoreMiddleware returns wrapper around things service that sends // events to event store. func NewEventStoreMiddleware(ctx context.Context, svc groups.Service, url string) (groups.Service, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err } diff --git a/mqtt/events/streams.go b/mqtt/events/streams.go index 0f966b63f..002137e07 100644 --- a/mqtt/events/streams.go +++ b/mqtt/events/streams.go @@ -7,7 +7,7 @@ import ( "context" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" ) const streamID = "mainflux.mqtt" @@ -26,7 +26,7 @@ type eventStore struct { // NewEventStore returns wrapper around mProxy service that sends // events to event store. func NewEventStore(ctx context.Context, url, instance string) (EventStore, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err } diff --git a/pkg/events/events.go b/pkg/events/events.go index 174e0796a..2da708306 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -11,8 +11,8 @@ import ( const ( UnpublishedEventsCheckInterval = 1 * time.Minute ConnCheckInterval = 100 * time.Millisecond - MaxUnpublishedEvents uint64 = 1e6 - MaxEventStreamLen int64 = 1e9 + MaxUnpublishedEvents uint64 = 1e4 + MaxEventStreamLen int64 = 1e6 ) // Event represents an event. diff --git a/pkg/events/nats/doc.go b/pkg/events/nats/doc.go new file mode 100644 index 000000000..c0e7bf948 --- /dev/null +++ b/pkg/events/nats/doc.go @@ -0,0 +1,8 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package redis contains the domain concept definitions needed to support +// Mainflux redis events source service functionality. +// +// It provides the abstraction of the redis stream and its operations. +package nats diff --git a/pkg/events/nats/publisher.go b/pkg/events/nats/publisher.go new file mode 100644 index 000000000..785ab7859 --- /dev/null +++ b/pkg/events/nats/publisher.go @@ -0,0 +1,90 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats + +import ( + "context" + "encoding/json" + "time" + + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/messaging" + broker "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +// Max message payload size is 1MB. +var reconnectBufSize = 1024 * 1024 * int(events.MaxUnpublishedEvents) + +type pubEventStore struct { + url string + conn *nats.Conn + publisher messaging.Publisher + stream string +} + +func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { + conn, err := nats.Connect(url, nats.MaxReconnects(maxReconnects), nats.ReconnectBufSize(reconnectBufSize)) + if err != nil { + return nil, err + } + js, err := jetstream.New(conn) + if err != nil { + return nil, err + } + if _, err := js.CreateStream(ctx, jsStreamConfig); err != nil { + return nil, err + } + + publisher, err := broker.NewPublisher(ctx, url, broker.Prefix(eventsPrefix), broker.JSStream(js)) + if err != nil { + return nil, err + } + + es := &pubEventStore{ + url: url, + conn: conn, + publisher: publisher, + stream: stream, + } + + go es.StartPublishingRoutine(ctx) + + return es, nil +} + +func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error { + values, err := event.Encode() + if err != nil { + return err + } + values["occurred_at"] = time.Now().UnixNano() + + data, err := json.Marshal(values) + if err != nil { + return err + } + + record := &messaging.Message{ + Payload: data, + } + + return es.publisher.Publish(ctx, es.stream, record) +} + +func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) { + // Nats doesn't need to check for unpublished events + // since the events are published to a buffer. + // The buffer is flushed when the connection is reestablished. + // https://docs.nats.io/using-nats/developer/connecting/reconnect/buffer + + <-ctx.Done() +} + +func (es *pubEventStore) Close() error { + es.conn.Close() + + return es.publisher.Close() +} diff --git a/pkg/events/nats/publisher_test.go b/pkg/events/nats/publisher_test.go new file mode 100644 index 000000000..7c93163a0 --- /dev/null +++ b/pkg/events/nats/publisher_test.go @@ -0,0 +1,302 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats_test + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/rand" + "testing" + "time" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/events/nats" + "github.com/stretchr/testify/assert" +) + +var ( + streamTopic = "test-topic" + eventsChan = make(chan map[string]interface{}) + logger = mflog.NewMock() + errFailed = errors.New("failed") +) + +type testEvent struct { + Data map[string]interface{} +} + +func (te testEvent) Encode() (map[string]interface{}, error) { + data := make(map[string]interface{}) + for k, v := range te.Data { + switch v.(type) { + case string: + data[k] = v + case float64: + data[k] = v + default: + b, err := json.Marshal(v) + if err != nil { + return nil, err + } + data[k] = string(b) + } + } + + return data, nil +} + +func TestPublish(t *testing.T) { + publisher, err := nats.NewPublisher(ctx, natsURL, stream) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + _, err = nats.NewSubscriber(ctx, "http://invaliurl.com", stream, consumer, logger) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + subcriber, err := nats.NewSubscriber(ctx, natsURL, stream, consumer, logger) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = subcriber.Subscribe(ctx, handler{}) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) + + cases := []struct { + desc string + event map[string]interface{} + err error + }{ + { + desc: "publish event successfully", + err: nil, + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": "Earth", + "status": "normal", + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "publish with nil event", + err: nil, + event: nil, + }, + { + desc: "publish event with invalid event location", + err: fmt.Errorf("json: unsupported type: chan int"), + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": make(chan int), + "status": "normal", + "timestamp": "invalid", + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "publish event with nested sting value", + err: nil, + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": map[string]string{ + "lat": fmt.Sprintf("%f", rand.Float64()), + "lng": fmt.Sprintf("%f", rand.Float64()), + }, + "status": "normal", + "timestamp": "invalid", + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + } + + for _, tc := range cases { + event := testEvent{Data: tc.event} + + err := publisher.Publish(ctx, event) + switch tc.err { + case nil: + assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) + + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(tc.event, "occurred_at") + } + + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"])) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"])) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"])) + assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"])) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"])) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"])) + + default: + assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) + } + } +} + +func TestUnavailablePublish(t *testing.T) { + client, err := startContainer() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on starting container: %s", err)) + + _, err = nats.NewPublisher(ctx, "http://invaliurl.com", stream) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + publisher, err := nats.NewPublisher(ctx, client.url, stream) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = client.pool.Client.PauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) + + spawnGoroutines(publisher, t) + + err = client.pool.Client.UnpauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) + + // Wait for the events to be published. + time.Sleep(events.UnpublishedEventsCheckInterval) + + err = publisher.Close() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) + + err = client.pool.Purge(client.container) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on purging container: %s", err)) +} + +func generateRandomEvent() testEvent { + return testEvent{ + Data: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), + "location": fmt.Sprintf("%f", rand.Float64()), + "status": fmt.Sprintf("%d", rand.Intn(1000)), + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + }, + } +} + +func spawnGoroutines(publisher events.Publisher, t *testing.T) { + for i := 0; i < 1e4; i++ { + go func() { + for i := 0; i < 10; i++ { + event := generateRandomEvent() + err := publisher.Publish(ctx, event) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + } + }() + } +} + +func TestPubsub(t *testing.T) { + subcases := []struct { + desc string + stream string + consumer string + errorMessage error + handler events.EventHandler + }{ + { + desc: "Subscribe to a stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + errorMessage: nats.ErrEmptyStream, + handler: handler{false}, + }, + { + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + errorMessage: nats.ErrEmptyStream, + handler: handler{false}, + }, + { + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: "", + errorMessage: nats.ErrEmptyConsumer, + handler: handler{false}, + }, + { + desc: "Subscribe to another stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{true}, + }, + } + + for _, pc := range subcases { + subcriber, err := nats.NewSubscriber(ctx, natsURL, pc.stream, pc.consumer, logger) + if err != nil { + assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + + continue + } + + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + + switch err := subcriber.Subscribe(context.TODO(), pc.handler); { + case err == nil: + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + default: + assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + } + + err = subcriber.Close() + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + } +} + +type handler struct { + fail bool +} + +func (h handler) Handle(_ context.Context, event events.Event) error { + if h.fail { + return errFailed + } + data, err := event.Encode() + if err != nil { + return err + } + + eventsChan <- data + + return nil +} diff --git a/pkg/events/nats/setup_test.go b/pkg/events/nats/setup_test.go new file mode 100644 index 000000000..6304b7b61 --- /dev/null +++ b/pkg/events/nats/setup_test.go @@ -0,0 +1,97 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats_test + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "testing" + + "github.com/mainflux/mainflux/pkg/events/nats" + "github.com/ory/dockertest/v3" +) + +type client struct { + url string + pool *dockertest.Pool + container *dockertest.Resource +} + +var ( + natsURL string + stream = "tests.events" + consumer = "tests-consumer" + ctx = context.Background() +) + +func TestMain(m *testing.M) { + client, err := startContainer() + if err != nil { + log.Fatalf(err.Error()) + } + natsURL = client.url + + code := m.Run() + + if err := client.pool.Purge(client.container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + + os.Exit(code) +} + +func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + go func() { + <-c + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + os.Exit(0) + }() +} + +func startContainer() (client, error) { + var cli client + var err error + cli.pool, err = dockertest.NewPool("") + if err != nil { + return client{}, fmt.Errorf("Could not connect to docker: %s", err) + } + + cli.container, err = cli.pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "nats", + Tag: "2.9.21-alpine", + Cmd: []string{"-DVV", "-js"}, + }) + if err != nil { + return client{}, fmt.Errorf("Could not start container: %s", err) + } + + handleInterrupt(cli.pool, cli.container) + + cli.url = fmt.Sprintf("nats://%s:%s", "localhost", cli.container.GetPort("4222/tcp")) + + if err := cli.pool.Retry(func() error { + _, err = nats.NewPublisher(ctx, cli.url, stream) + return err + }); err != nil { + return client{}, fmt.Errorf("Could not connect to docker: %s", err) + } + + if err := cli.pool.Retry(func() error { + _, err = nats.NewSubscriber(ctx, cli.url, stream, consumer, logger) + return err + }); err != nil { + return client{}, fmt.Errorf("Could not connect to docker: %s", err) + } + + return cli, nil +} diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go new file mode 100644 index 000000000..7dfa5d429 --- /dev/null +++ b/pkg/events/nats/subscriber.go @@ -0,0 +1,145 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/messaging" + broker "github.com/mainflux/mainflux/pkg/messaging/nats" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +const ( + maxReconnects = -1 +) + +var _ events.Subscriber = (*subEventStore)(nil) + +var ( + eventsPrefix = "events" + + jsStreamConfig = jetstream.StreamConfig{ + Name: "events", + Description: "Mainflux stream for sending and receiving messages in between Mainflux events", + Subjects: []string{"events.>"}, + Retention: jetstream.LimitsPolicy, + MaxMsgsPerSubject: 1e9, + MaxAge: time.Hour * 24, + MaxMsgSize: 1024 * 1024, + Discard: jetstream.DiscardOld, + Storage: jetstream.FileStorage, + } + + // ErrEmptyStream is returned when stream name is empty. + ErrEmptyStream = errors.New("stream name cannot be empty") + + // ErrEmptyConsumer is returned when consumer name is empty. + ErrEmptyConsumer = errors.New("consumer name cannot be empty") +) + +type subEventStore struct { + conn *nats.Conn + pubsub messaging.PubSub + stream string + consumer string + logger mflog.Logger +} + +func NewSubscriber(ctx context.Context, url, stream, consumer string, logger mflog.Logger) (events.Subscriber, error) { + if stream == "" { + return nil, ErrEmptyStream + } + + if consumer == "" { + return nil, ErrEmptyConsumer + } + + conn, err := nats.Connect(url, nats.MaxReconnects(maxReconnects)) + if err != nil { + return nil, err + } + js, err := jetstream.New(conn) + if err != nil { + return nil, err + } + jsStream, err := js.CreateStream(ctx, jsStreamConfig) + if err != nil { + return nil, err + } + + pubsub, err := broker.NewPubSub(ctx, url, logger, broker.Stream(jsStream)) + if err != nil { + return nil, err + } + + return &subEventStore{ + conn: conn, + pubsub: pubsub, + stream: stream, + consumer: consumer, + logger: logger, + }, nil +} + +func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error { + subCfg := messaging.SubscriberConfig{ + ID: es.consumer, + Topic: eventsPrefix + "." + es.stream, + Handler: &eventHandler{ + handler: handler, + ctx: ctx, + logger: es.logger, + }, + DeliveryPolicy: messaging.DeliverNewPolicy, + } + + return es.pubsub.Subscribe(ctx, subCfg) +} + +func (es *subEventStore) Close() error { + es.conn.Close() + return es.pubsub.Close() +} + +type event struct { + Data map[string]interface{} +} + +func (re event) Encode() (map[string]interface{}, error) { + return re.Data, nil +} + +type eventHandler struct { + handler events.EventHandler + ctx context.Context + logger mflog.Logger +} + +func (eh *eventHandler) Handle(msg *messaging.Message) error { + event := event{ + Data: make(map[string]interface{}), + } + + if err := json.Unmarshal(msg.GetPayload(), &event.Data); err != nil { + return err + } + + if err := eh.handler.Handle(eh.ctx, event); err != nil { + eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + } + + return nil +} + +func (eh *eventHandler) Cancel() error { + return nil +} diff --git a/pkg/events/rabbitmq/doc.go b/pkg/events/rabbitmq/doc.go new file mode 100644 index 000000000..b5eef9fbe --- /dev/null +++ b/pkg/events/rabbitmq/doc.go @@ -0,0 +1,8 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +// Package redis contains the domain concept definitions needed to support +// Mainflux redis events source service functionality. +// +// It provides the abstraction of the redis stream and its operations. +package rabbitmq diff --git a/pkg/events/rabbitmq/publisher.go b/pkg/events/rabbitmq/publisher.go new file mode 100644 index 000000000..a67c177f9 --- /dev/null +++ b/pkg/events/rabbitmq/publisher.go @@ -0,0 +1,111 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/messaging" + broker "github.com/mainflux/mainflux/pkg/messaging/rabbitmq" + amqp "github.com/rabbitmq/amqp091-go" +) + +type pubEventStore struct { + conn *amqp.Connection + publisher messaging.Publisher + unpublishedEvents chan amqp.Return + stream string + mu sync.Mutex +} + +func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { + conn, err := amqp.Dial(url) + if err != nil { + return nil, err + } + ch, err := conn.Channel() + if err != nil { + return nil, err + } + if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); err != nil { + return nil, err + } + + publisher, err := broker.NewPublisher(url, broker.Prefix(eventsPrefix), broker.Exchange(exchangeName), broker.Channel(ch)) + if err != nil { + return nil, err + } + + es := &pubEventStore{ + conn: conn, + publisher: publisher, + unpublishedEvents: make(chan amqp.Return, events.MaxUnpublishedEvents), + stream: stream, + } + + ch.NotifyReturn(es.unpublishedEvents) + + go es.StartPublishingRoutine(ctx) + + return es, nil +} + +func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error { + values, err := event.Encode() + if err != nil { + return err + } + values["occurred_at"] = time.Now().UnixNano() + + data, err := json.Marshal(values) + if err != nil { + return err + } + + record := &messaging.Message{ + Payload: data, + } + + return es.publisher.Publish(ctx, es.stream, record) +} + +func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) { + defer close(es.unpublishedEvents) + + ticker := time.NewTicker(events.UnpublishedEventsCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if ok := es.conn.IsClosed(); !ok { + es.mu.Lock() + for i := len(es.unpublishedEvents) - 1; i >= 0; i-- { + record := <-es.unpublishedEvents + msg := &messaging.Message{ + Payload: record.Body, + } + if err := es.publisher.Publish(ctx, es.stream, msg); err != nil { + es.unpublishedEvents <- record + + break + } + } + es.mu.Unlock() + } + case <-ctx.Done(): + return + } + } +} + +func (es *pubEventStore) Close() error { + es.conn.Close() + + return es.publisher.Close() +} diff --git a/pkg/events/rabbitmq/publisher_test.go b/pkg/events/rabbitmq/publisher_test.go new file mode 100644 index 000000000..1ed51c5da --- /dev/null +++ b/pkg/events/rabbitmq/publisher_test.go @@ -0,0 +1,302 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq_test + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/rand" + "testing" + "time" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/events/rabbitmq" + "github.com/stretchr/testify/assert" +) + +var ( + streamTopic = "test-topic" + eventsChan = make(chan map[string]interface{}) + logger = mflog.NewMock() + errFailed = errors.New("failed") +) + +type testEvent struct { + Data map[string]interface{} +} + +func (te testEvent) Encode() (map[string]interface{}, error) { + data := make(map[string]interface{}) + for k, v := range te.Data { + switch v.(type) { + case string: + data[k] = v + case float64: + data[k] = v + default: + b, err := json.Marshal(v) + if err != nil { + return nil, err + } + data[k] = string(b) + } + } + + return data, nil +} + +func TestPublish(t *testing.T) { + publisher, err := rabbitmq.NewPublisher(ctx, rabbitmqURL, stream) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + _, err = rabbitmq.NewSubscriber("http://invaliurl.com", stream, consumer, logger) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = subcriber.Subscribe(ctx, handler{}) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) + + cases := []struct { + desc string + event map[string]interface{} + err error + }{ + { + desc: "publish event successfully", + err: nil, + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": "Earth", + "status": "normal", + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "publish with nil event", + err: nil, + event: nil, + }, + { + desc: "publish event with invalid event location", + err: fmt.Errorf("json: unsupported type: chan int"), + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": make(chan int), + "status": "normal", + "timestamp": "invalid", + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + { + desc: "publish event with nested sting value", + err: nil, + event: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": "abc123", + "location": map[string]string{ + "lat": fmt.Sprintf("%f", rand.Float64()), + "lng": fmt.Sprintf("%f", rand.Float64()), + }, + "status": "normal", + "timestamp": "invalid", + "operation": "create", + "occurred_at": time.Now().UnixNano(), + }, + }, + } + + for _, tc := range cases { + event := testEvent{Data: tc.event} + + err := publisher.Publish(ctx, event) + switch tc.err { + case nil: + assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) + + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(tc.event, "occurred_at") + } + + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"])) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"])) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"])) + assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"])) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"])) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"])) + + default: + assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) + } + } +} + +func TestUnavailablePublish(t *testing.T) { + client, err := startContainer() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on starting container: %s", err)) + + _, err = rabbitmq.NewPublisher(ctx, "http://invaliurl.com", stream) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + publisher, err := rabbitmq.NewPublisher(ctx, client.url, stream) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = client.pool.Client.PauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) + + spawnGoroutines(publisher, t) + + err = client.pool.Client.UnpauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) + + // Wait for the events to be published. + time.Sleep(2 * events.UnpublishedEventsCheckInterval) + + err = publisher.Close() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) + + err = client.pool.Purge(client.container) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on purging container: %s", err)) +} + +func generateRandomEvent() testEvent { + return testEvent{ + Data: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), + "location": fmt.Sprintf("%f", rand.Float64()), + "status": fmt.Sprintf("%d", rand.Intn(1000)), + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + }, + } +} + +func spawnGoroutines(publisher events.Publisher, t *testing.T) { + for i := 0; i < 1e4; i++ { + go func() { + for i := 0; i < 10; i++ { + event := generateRandomEvent() + err := publisher.Publish(ctx, event) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + } + }() + } +} + +func TestPubsub(t *testing.T) { + subcases := []struct { + desc string + stream string + consumer string + errorMessage error + handler events.EventHandler + }{ + { + desc: "Subscribe to a stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + errorMessage: rabbitmq.ErrEmptyStream, + handler: handler{false}, + }, + { + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + errorMessage: rabbitmq.ErrEmptyStream, + handler: handler{false}, + }, + { + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: "", + errorMessage: rabbitmq.ErrEmptyConsumer, + handler: handler{false}, + }, + { + desc: "Subscribe to another stream", + stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("%s.%s", stream, streamTopic), + consumer: consumer, + errorMessage: nil, + handler: handler{true}, + }, + } + + for _, pc := range subcases { + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, pc.stream, pc.consumer, logger) + if err != nil { + assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + + continue + } + + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + + switch err := subcriber.Subscribe(ctx, pc.handler); { + case err == nil: + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + default: + assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + } + + err = subcriber.Close() + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + } +} + +type handler struct { + fail bool +} + +func (h handler) Handle(_ context.Context, event events.Event) error { + if h.fail { + return errFailed + } + data, err := event.Encode() + if err != nil { + return err + } + + eventsChan <- data + + return nil +} diff --git a/pkg/events/rabbitmq/setup_test.go b/pkg/events/rabbitmq/setup_test.go new file mode 100644 index 000000000..bd7babe47 --- /dev/null +++ b/pkg/events/rabbitmq/setup_test.go @@ -0,0 +1,93 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq_test + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "testing" + + "github.com/mainflux/mainflux/pkg/events/rabbitmq" + "github.com/ory/dockertest/v3" +) + +type client struct { + url string + pool *dockertest.Pool + container *dockertest.Resource +} + +var ( + rabbitmqURL string + stream = "tests.events" + consumer = "tests-consumer" + ctx = context.TODO() +) + +func TestMain(m *testing.M) { + client, err := startContainer() + if err != nil { + log.Fatalf(err.Error()) + } + rabbitmqURL = client.url + + code := m.Run() + + if err := client.pool.Purge(client.container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + + os.Exit(code) +} + +func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + go func() { + <-c + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + os.Exit(0) + }() +} + +func startContainer() (client, error) { + var cli client + var err error + cli.pool, err = dockertest.NewPool("") + if err != nil { + return client{}, fmt.Errorf("Could not connect to docker: %s", err) + } + + cli.container, err = cli.pool.Run("rabbitmq", "3.9.20", []string{}) + if err != nil { + log.Fatalf("Could not start container: %s", err) + } + + handleInterrupt(cli.pool, cli.container) + + cli.url = fmt.Sprintf("amqp://%s:%s", "localhost", cli.container.GetPort("5672/tcp")) + + if err := cli.pool.Retry(func() error { + _, err = rabbitmq.NewPublisher(ctx, cli.url, stream) + return err + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + if err := cli.pool.Retry(func() error { + _, err = rabbitmq.NewSubscriber(cli.url, stream, consumer, logger) + return err + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + return cli, nil +} diff --git a/pkg/events/rabbitmq/subscriber.go b/pkg/events/rabbitmq/subscriber.go new file mode 100644 index 000000000..4a44c2586 --- /dev/null +++ b/pkg/events/rabbitmq/subscriber.go @@ -0,0 +1,127 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/messaging" + broker "github.com/mainflux/mainflux/pkg/messaging/rabbitmq" + amqp "github.com/rabbitmq/amqp091-go" +) + +var _ events.Subscriber = (*subEventStore)(nil) + +var ( + exchangeName = "events" + eventsPrefix = "events" + + // ErrEmptyStream is returned when stream name is empty. + ErrEmptyStream = errors.New("stream name cannot be empty") + + // ErrEmptyConsumer is returned when consumer name is empty. + ErrEmptyConsumer = errors.New("consumer name cannot be empty") +) + +type subEventStore struct { + conn *amqp.Connection + pubsub messaging.PubSub + stream string + consumer string + logger mflog.Logger +} + +func NewSubscriber(url, stream, consumer string, logger mflog.Logger) (events.Subscriber, error) { + if stream == "" { + return nil, ErrEmptyStream + } + + if consumer == "" { + return nil, ErrEmptyConsumer + } + + conn, err := amqp.Dial(url) + if err != nil { + return nil, err + } + ch, err := conn.Channel() + if err != nil { + return nil, err + } + if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); err != nil { + return nil, err + } + + pubsub, err := broker.NewPubSub(url, logger, broker.Channel(ch), broker.Exchange(exchangeName)) + if err != nil { + return nil, err + } + + return &subEventStore{ + conn: conn, + pubsub: pubsub, + stream: stream, + consumer: consumer, + logger: logger, + }, nil +} + +func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error { + subCfg := messaging.SubscriberConfig{ + ID: es.consumer, + Topic: eventsPrefix + "." + es.stream, + Handler: &eventHandler{ + handler: handler, + ctx: ctx, + logger: es.logger, + }, + DeliveryPolicy: messaging.DeliverNewPolicy, + } + + return es.pubsub.Subscribe(ctx, subCfg) +} + +func (es *subEventStore) Close() error { + es.conn.Close() + return es.pubsub.Close() +} + +type event struct { + Data map[string]interface{} +} + +func (re event) Encode() (map[string]interface{}, error) { + return re.Data, nil +} + +type eventHandler struct { + handler events.EventHandler + ctx context.Context + logger mflog.Logger +} + +func (eh *eventHandler) Handle(msg *messaging.Message) error { + event := event{ + Data: make(map[string]interface{}), + } + + if err := json.Unmarshal(msg.GetPayload(), &event.Data); err != nil { + return err + } + + if err := eh.handler.Handle(eh.ctx, event); err != nil { + eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + } + + return nil +} + +func (eh *eventHandler) Cancel() error { + return nil +} diff --git a/pkg/events/redis/publisher.go b/pkg/events/redis/publisher.go index 570925dc0..4dfb40f14 100644 --- a/pkg/events/redis/publisher.go +++ b/pkg/events/redis/publisher.go @@ -1,6 +1,9 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 +//go:build !nats && !rabbitmq +// +build !nats,!rabbitmq + package redis import ( @@ -50,21 +53,22 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error Values: values, } - if err := es.checkRedisConnection(ctx); err != nil { + switch err := es.checkRedisConnection(ctx); err { + case nil: + return es.client.XAdd(ctx, record).Err() + default: es.mu.Lock() defer es.mu.Unlock() - select { - case es.unpublishedEvents <- record: - default: - // If the channel is full (rarely happens), drop the events. + // If the channel is full (rarely happens), drop the events. + if len(es.unpublishedEvents) == int(events.MaxUnpublishedEvents) { return nil } + es.unpublishedEvents <- record + return nil } - - return es.client.XAdd(ctx, record).Err() } func (es *pubEventStore) startPublishingRoutine(ctx context.Context) { diff --git a/pkg/events/redis/publisher_test.go b/pkg/events/redis/publisher_test.go index 3a698d7a4..c2cb897de 100644 --- a/pkg/events/redis/publisher_test.go +++ b/pkg/events/redis/publisher_test.go @@ -1,6 +1,9 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 +//go:build !nats && !rabbitmq +// +build !nats,!rabbitmq + package redis_test import ( @@ -26,6 +29,7 @@ var ( eventsChan = make(chan map[string]interface{}) logger = mflog.NewMock() errFailed = errors.New("failed") + ctx = context.TODO() ) type testEvent struct { @@ -53,16 +57,19 @@ func (te testEvent) Encode() (map[string]interface{}, error) { } func TestPublish(t *testing.T) { - err := redisClient.FlushAll(context.Background()).Err() + err := redisClient.FlushAll(ctx).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) - publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName) + publisher, err := redis.NewPublisher(ctx, redisURL, streamName) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - subcriber, err := redis.NewSubscriber(redisURL, streamName, consumer, logger) + subcriber, err := redis.NewSubscriber("http://invaliurl.com", streamName, consumer, logger) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + subcriber, err = redis.NewSubscriber(redisURL, streamName, consumer, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + err = subcriber.Subscribe(ctx, handler{}) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -125,7 +132,7 @@ func TestPublish(t *testing.T) { for _, tc := range cases { event := testEvent{Data: tc.event} - err := publisher.Publish(context.Background(), event) + err := publisher.Publish(ctx, event) switch tc.err { case nil: assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) @@ -152,44 +159,127 @@ func TestPublish(t *testing.T) { } } +func TestUnavailablePublish(t *testing.T) { + client, err := startContainer() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on starting container: %s", err)) + + err = client.Client.FlushAll(ctx).Err() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) + + publisher, err := redis.NewPublisher(ctx, "http://invaliurl.com", streamName) + assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) + + publisher, err = redis.NewPublisher(ctx, client.url, streamName) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + + err = client.pool.Client.PauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err)) + + spawnGoroutines(publisher, t) + + err = client.pool.Client.UnpauseContainer(client.container.Container.ID) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err)) + + // Wait for the events to be published. + time.Sleep(events.UnpublishedEventsCheckInterval) + + err = publisher.Close() + assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err)) + + err = client.pool.Purge(client.container) + assert.Nil(t, err, fmt.Sprintf("got unexpected error on purging container: %s", err)) +} + +func generateRandomEvent() testEvent { + return testEvent{ + Data: map[string]interface{}{ + "temperature": fmt.Sprintf("%f", rand.Float64()), + "humidity": fmt.Sprintf("%f", rand.Float64()), + "sensor_id": fmt.Sprintf("%d", rand.Intn(1000)), + "location": fmt.Sprintf("%f", rand.Float64()), + "status": fmt.Sprintf("%d", rand.Intn(1000)), + "timestamp": fmt.Sprintf("%d", time.Now().UnixNano()), + "operation": "create", + }, + } +} + +func spawnGoroutines(publisher events.Publisher, t *testing.T) { + for i := 0; i < 1e4; i++ { + go func() { + for i := 0; i < 10; i++ { + event := generateRandomEvent() + err := publisher.Publish(ctx, event) + assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + } + }() + } +} + func TestPubsub(t *testing.T) { - err := redisClient.FlushAll(context.Background()).Err() + err := redisClient.FlushAll(ctx).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) subcases := []struct { desc string stream string + consumer string errorMessage error handler events.EventHandler }{ { desc: "Subscribe to a stream", stream: fmt.Sprintf("%s.%s", streamName, streamTopic), + consumer: consumer, errorMessage: nil, handler: handler{false}, }, { desc: "Subscribe to the same stream", stream: fmt.Sprintf("%s.%s", streamName, streamTopic), + consumer: consumer, errorMessage: nil, handler: handler{false}, }, { - desc: "Subscribe to an empty stream", + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + errorMessage: redis.ErrEmptyStream, + handler: handler{false}, + }, + { + desc: "Subscribe to an empty stream with a valid consumer", stream: "", + consumer: consumer, errorMessage: redis.ErrEmptyStream, handler: handler{false}, }, + { + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("%s.%s", streamName, streamTopic), + consumer: "", + errorMessage: redis.ErrEmptyConsumer, + handler: handler{false}, + }, { desc: "Subscribe to another stream", stream: fmt.Sprintf("%s.%s", streamName, streamTopic+"1"), + consumer: consumer, + errorMessage: nil, + handler: handler{false}, + }, + { + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("%s.%s", streamName, streamTopic), + consumer: consumer, errorMessage: nil, handler: handler{true}, }, } for _, pc := range subcases { - subcriber, err := redis.NewSubscriber(redisURL, pc.stream, consumer, logger) + subcriber, err := redis.NewSubscriber(redisURL, pc.stream, pc.consumer, logger) if err != nil { assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) @@ -204,6 +294,9 @@ func TestPubsub(t *testing.T) { default: assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) } + + err = subcriber.Close() + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) } } diff --git a/pkg/events/redis/setup_test.go b/pkg/events/redis/setup_test.go index cdc77d4b4..f733987fa 100644 --- a/pkg/events/redis/setup_test.go +++ b/pkg/events/redis/setup_test.go @@ -1,54 +1,94 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 +//go:build !nats && !rabbitmq +// +build !nats,!rabbitmq + package redis_test import ( - "context" "fmt" "log" "os" + "os/signal" + "syscall" "testing" "github.com/go-redis/redis/v8" "github.com/ory/dockertest/v3" ) +type client struct { + *redis.Client + url string + pool *dockertest.Pool + container *dockertest.Resource +} + var ( redisClient *redis.Client redisURL string ) func TestMain(m *testing.M) { + client, err := startContainer() + if err != nil { + log.Fatalf(err.Error()) + } + redisClient = client.Client + redisURL = client.url + + code := m.Run() + + if err := client.pool.Purge(client.container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + + os.Exit(code) +} + +func startContainer() (client, error) { + var cli client pool, err := dockertest.NewPool("") if err != nil { - log.Fatalf("Could not connect to docker: %s", err) + return client{}, fmt.Errorf("Could not connect to docker: %s", err) } + cli.pool = pool - container, err := pool.Run("redis", "7.2.0-alpine", nil) + container, err := cli.pool.Run("redis", "7.2.0-alpine", nil) if err != nil { - log.Fatalf("Could not start container: %s", err) + return client{}, fmt.Errorf("Could not start container: %s", err) } + cli.container = container + + handleInterrupt(cli.pool, cli.container) - redisURL = fmt.Sprintf("redis://localhost:%s/0", container.GetPort("6379/tcp")) - opts, err := redis.ParseURL(redisURL) + cli.url = fmt.Sprintf("redis://localhost:%s/0", cli.container.GetPort("6379/tcp")) + opts, err := redis.ParseURL(cli.url) if err != nil { - log.Fatalf("Could not parse redis URL: %s", err) + return client{}, fmt.Errorf("Could not parse redis URL: %s", err) } if err := pool.Retry(func() error { - redisClient = redis.NewClient(opts) + cli.Client = redis.NewClient(opts) - return redisClient.Ping(context.Background()).Err() + return cli.Client.Ping(ctx).Err() }); err != nil { - log.Fatalf("Could not connect to docker: %s", err) + return client{}, fmt.Errorf("Could not connect to docker: %s", err) } - code := m.Run() + return cli, nil +} - if err := pool.Purge(container); err != nil { - log.Fatalf("Could not purge container: %s", err) - } +func handleInterrupt(pool *dockertest.Pool, container *dockertest.Resource) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) - os.Exit(code) + go func() { + <-c + if err := pool.Purge(container); err != nil { + log.Fatalf("Could not purge container: %s", err) + } + os.Exit(0) + }() } diff --git a/pkg/events/redis/subscriber.go b/pkg/events/redis/subscriber.go index e5c4d1980..f47676118 100644 --- a/pkg/events/redis/subscriber.go +++ b/pkg/events/redis/subscriber.go @@ -1,6 +1,9 @@ // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 +//go:build !nats && !rabbitmq +// +build !nats,!rabbitmq + package redis import ( diff --git a/pkg/events/store/brokers_nats.go b/pkg/events/store/brokers_nats.go new file mode 100644 index 000000000..cde4a8bf8 --- /dev/null +++ b/pkg/events/store/brokers_nats.go @@ -0,0 +1,38 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +//go:build nats +// +build nats + +package store + +import ( + "context" + "log" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/events/nats" +) + +func init() { + log.Println("The binary was build using nats as the events store") +} + +func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { + pb, err := nats.NewPublisher(ctx, url, stream) + if err != nil { + return nil, err + } + + return pb, nil +} + +func NewSubscriber(ctx context.Context, url, stream, consumer string, logger mflog.Logger) (events.Subscriber, error) { + pb, err := nats.NewSubscriber(ctx, url, stream, consumer, logger) + if err != nil { + return nil, err + } + + return pb, nil +} diff --git a/pkg/events/store/brokers_rabbitmq.go b/pkg/events/store/brokers_rabbitmq.go new file mode 100644 index 000000000..c190543bd --- /dev/null +++ b/pkg/events/store/brokers_rabbitmq.go @@ -0,0 +1,38 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +//go:build rabbitmq +// +build rabbitmq + +package store + +import ( + "context" + "log" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/events/rabbitmq" +) + +func init() { + log.Println("The binary was build using rabbitmq as the events store") +} + +func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { + pb, err := rabbitmq.NewPublisher(ctx, url, stream) + if err != nil { + return nil, err + } + + return pb, nil +} + +func NewSubscriber(_ context.Context, url, stream, consumer string, logger mflog.Logger) (events.Subscriber, error) { + pb, err := rabbitmq.NewSubscriber(url, stream, consumer, logger) + if err != nil { + return nil, err + } + + return pb, nil +} diff --git a/pkg/events/store/brokers_redis.go b/pkg/events/store/brokers_redis.go new file mode 100644 index 000000000..8a824384b --- /dev/null +++ b/pkg/events/store/brokers_redis.go @@ -0,0 +1,38 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +//go:build !nats && !rabbitmq +// +build !nats,!rabbitmq + +package store + +import ( + "context" + "log" + + mflog "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/pkg/events" + "github.com/mainflux/mainflux/pkg/events/redis" +) + +func init() { + log.Println("The binary was build using redis as the events store") +} + +func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, error) { + pb, err := redis.NewPublisher(ctx, url, stream) + if err != nil { + return nil, err + } + + return pb, nil +} + +func NewSubscriber(_ context.Context, url, stream, consumer string, logger mflog.Logger) (events.Subscriber, error) { + pb, err := redis.NewSubscriber(url, stream, consumer, logger) + if err != nil { + return nil, err + } + + return pb, nil +} diff --git a/pkg/messaging/brokers/brokers_nats.go b/pkg/messaging/brokers/brokers_nats.go index b8a15696d..8a787a1fb 100644 --- a/pkg/messaging/brokers/brokers_nats.go +++ b/pkg/messaging/brokers/brokers_nats.go @@ -22,8 +22,8 @@ func init() { log.Println("The binary was build using Nats as the message broker") } -func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) { - pb, err := nats.NewPublisher(ctx, url) +func NewPublisher(ctx context.Context, url string, opts ...messaging.Option) (messaging.Publisher, error) { + pb, err := nats.NewPublisher(ctx, url, opts...) if err != nil { return nil, err } @@ -31,8 +31,8 @@ func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) return pb, nil } -func NewPubSub(ctx context.Context, url string, logger mflog.Logger) (messaging.PubSub, error) { - pb, err := nats.NewPubSub(ctx, url, logger) +func NewPubSub(ctx context.Context, url string, logger mflog.Logger, opts ...messaging.Option) (messaging.PubSub, error) { + pb, err := nats.NewPubSub(ctx, url, logger, opts...) if err != nil { return nil, err } diff --git a/pkg/messaging/brokers/brokers_rabbitmq.go b/pkg/messaging/brokers/brokers_rabbitmq.go index 536279e6d..215ae3746 100644 --- a/pkg/messaging/brokers/brokers_rabbitmq.go +++ b/pkg/messaging/brokers/brokers_rabbitmq.go @@ -1,9 +1,9 @@ -//go:build rabbitmq -// +build rabbitmq - // Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 +//go:build rabbitmq +// +build rabbitmq + package brokers import ( @@ -22,8 +22,8 @@ func init() { log.Println("The binary was build using RabbitMQ as the message broker") } -func NewPublisher(_ context.Context, url string) (messaging.Publisher, error) { - pb, err := rabbitmq.NewPublisher(url) +func NewPublisher(_ context.Context, url string, opts ...messaging.Option) (messaging.Publisher, error) { + pb, err := rabbitmq.NewPublisher(url, opts...) if err != nil { return nil, err } @@ -31,8 +31,8 @@ func NewPublisher(_ context.Context, url string) (messaging.Publisher, error) { return pb, nil } -func NewPubSub(_ context.Context, url string, logger mflog.Logger) (messaging.PubSub, error) { - pb, err := rabbitmq.NewPubSub(url, logger) +func NewPubSub(_ context.Context, url string, logger mflog.Logger, opts ...messaging.Option) (messaging.PubSub, error) { + pb, err := rabbitmq.NewPubSub(url, logger, opts...) if err != nil { return nil, err } diff --git a/pkg/messaging/nats/options.go b/pkg/messaging/nats/options.go new file mode 100644 index 000000000..a24dbee72 --- /dev/null +++ b/pkg/messaging/nats/options.go @@ -0,0 +1,56 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package nats + +import ( + "errors" + + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/nats-io/nats.go/jetstream" +) + +// ErrInvalidType is returned when the provided value is not of the expected type. +var ErrInvalidType = errors.New("invalid type") + +// Prefix sets the prefix for the publisher. +func Prefix(prefix string) messaging.Option { + return func(val interface{}) error { + p, ok := val.(*publisher) + if !ok { + return ErrInvalidType + } + + p.prefix = prefix + + return nil + } +} + +// JSStream sets the JetStream for the publisher. +func JSStream(stream jetstream.JetStream) messaging.Option { + return func(val interface{}) error { + p, ok := val.(*publisher) + if !ok { + return ErrInvalidType + } + + p.js = stream + + return nil + } +} + +// Stream sets the Stream for the subscriber. +func Stream(stream jetstream.Stream) messaging.Option { + return func(val interface{}) error { + p, ok := val.(*pubsub) + if !ok { + return ErrInvalidType + } + + p.stream = stream + + return nil + } +} diff --git a/pkg/messaging/nats/publisher.go b/pkg/messaging/nats/publisher.go index fdaf62eed..ca5c522fb 100644 --- a/pkg/messaging/nats/publisher.go +++ b/pkg/messaging/nats/publisher.go @@ -7,30 +7,35 @@ import ( "context" "fmt" + "github.com/mainflux/mainflux/pkg/events" "github.com/mainflux/mainflux/pkg/messaging" broker "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "google.golang.org/protobuf/proto" ) -// A maximum number of reconnect attempts before NATS connection closes permanently. -// Value -1 represents an unlimited number of reconnect retries, i.e. the client -// will never give up on retrying to re-establish connection to NATS server. -const maxReconnects = -1 +const ( + // A maximum number of reconnect attempts before NATS connection closes permanently. + // Value -1 represents an unlimited number of reconnect retries, i.e. the client + // will never give up on retrying to re-establish connection to NATS server. + maxReconnects = -1 + + // reconnectBufSize is obtained from the maximum number of unpublished events + // multiplied by the approximate maximum size of a single event. + reconnectBufSize = events.MaxUnpublishedEvents * (1024 * 1024) +) var _ messaging.Publisher = (*publisher)(nil) type publisher struct { - js jetstream.JetStream - conn *broker.Conn + js jetstream.JetStream + conn *broker.Conn + prefix string } -// Publisher wraps messaging Publisher exposing -// Close() method for NATS connection. - // NewPublisher returns NATS message Publisher. -func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) { - conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects)) +func NewPublisher(ctx context.Context, url string, opts ...messaging.Option) (messaging.Publisher, error) { + conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects), broker.ReconnectBufSize(int(reconnectBufSize))) if err != nil { return nil, err } @@ -41,9 +46,17 @@ func NewPublisher(ctx context.Context, url string) (messaging.Publisher, error) if _, err := js.CreateStream(ctx, jsStreamConfig); err != nil { return nil, err } + ret := &publisher{ - js: js, - conn: conn, + js: js, + conn: conn, + prefix: chansPrefix, + } + + for _, opt := range opts { + if err := opt(ret); err != nil { + return nil, err + } } return ret, nil @@ -59,7 +72,7 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging. return err } - subject := fmt.Sprintf("%s.%s", chansPrefix, topic) + subject := fmt.Sprintf("%s.%s", pub.prefix, topic) if msg.Subtopic != "" { subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic) } diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index f59e2f7cb..a60557399 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -53,7 +53,7 @@ type pubsub struct { // from ordinary subscribe. For more information, please take a look // here: https://docs.nats.io/developing-with-nats/receiving/queues. // If the queue is empty, Subscribe will be used. -func NewPubSub(ctx context.Context, url string, logger mflog.Logger) (messaging.PubSub, error) { +func NewPubSub(ctx context.Context, url string, logger mflog.Logger, opts ...messaging.Option) (messaging.PubSub, error) { conn, err := broker.Connect(url, broker.MaxReconnects(maxReconnects)) if err != nil { return nil, err @@ -69,13 +69,20 @@ func NewPubSub(ctx context.Context, url string, logger mflog.Logger) (messaging. ret := &pubsub{ publisher: publisher{ - js: js, - conn: conn, + js: js, + conn: conn, + prefix: chansPrefix, }, stream: stream, logger: logger, } + for _, opt := range opts { + if err := opt(ret); err != nil { + return nil, err + } + } + return ret, nil } diff --git a/pkg/messaging/nats/setup_test.go b/pkg/messaging/nats/setup_test.go index 7c3e6d60f..1b1be179a 100644 --- a/pkg/messaging/nats/setup_test.go +++ b/pkg/messaging/nats/setup_test.go @@ -39,7 +39,7 @@ func TestMain(m *testing.M) { } handleInterrupt(pool, container) - address := fmt.Sprintf("%s:%s", "localhost", container.GetPort("4222/tcp")) + address := fmt.Sprintf("nats://%s:%s", "localhost", container.GetPort("4222/tcp")) if err := pool.Retry(func() error { publisher, err = nats.NewPublisher(context.Background(), address) return err diff --git a/pkg/messaging/pubsub.go b/pkg/messaging/pubsub.go index 7ed5c93e8..ed845b545 100644 --- a/pkg/messaging/pubsub.go +++ b/pkg/messaging/pubsub.go @@ -59,3 +59,22 @@ type PubSub interface { Publisher Subscriber } + +// Option represents optional configuration for message broker. +// +// This is used to provide optional configuration parameters to the +// underlying publisher and pubsub implementation so that it can be +// configured to meet the specific needs. +// +// For example, it can be used to set the message prefix so that +// brokers can be used for event sourcing as well as internal message broker. +// Using value of type interface is not recommended but is the most suitable +// for this use case as options should be compiled with respect to the +// underlying broker which can either be RabbitMQ or NATS. +// +// The example below shows how to set the prefix and jetstream stream for NATS. +// +// Example: +// +// broker.NewPublisher(ctx, url, broker.Prefix(eventsPrefix), broker.JSStream(js)) +type Option func(vals interface{}) error diff --git a/pkg/messaging/rabbitmq/options.go b/pkg/messaging/rabbitmq/options.go new file mode 100644 index 000000000..3d206535e --- /dev/null +++ b/pkg/messaging/rabbitmq/options.go @@ -0,0 +1,60 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package rabbitmq + +import ( + "errors" + + "github.com/mainflux/mainflux/pkg/messaging" + amqp "github.com/rabbitmq/amqp091-go" +) + +// ErrInvalidType is returned when the provided value is not of the expected type. +var ErrInvalidType = errors.New("invalid type") + +// Prefix sets the prefix for the publisher. +func Prefix(prefix string) messaging.Option { + return func(val interface{}) error { + p, ok := val.(*publisher) + if !ok { + return ErrInvalidType + } + + p.prefix = prefix + + return nil + } +} + +// Channel sets the channel for the publisher or subscriber. +func Channel(channel *amqp.Channel) messaging.Option { + return func(val interface{}) error { + switch v := val.(type) { + case *publisher: + v.channel = channel + case *pubsub: + v.channel = channel + default: + return ErrInvalidType + } + + return nil + } +} + +// Exchange sets the exchange for the publisher or subscriber. +func Exchange(exchange string) messaging.Option { + return func(val interface{}) error { + switch v := val.(type) { + case *publisher: + v.exchange = exchange + case *pubsub: + v.exchange = exchange + default: + return ErrInvalidType + } + + return nil + } +} diff --git a/pkg/messaging/rabbitmq/publisher.go b/pkg/messaging/rabbitmq/publisher.go index db4384911..bb9ca1ed1 100644 --- a/pkg/messaging/rabbitmq/publisher.go +++ b/pkg/messaging/rabbitmq/publisher.go @@ -16,17 +16,18 @@ import ( var _ messaging.Publisher = (*publisher)(nil) type publisher struct { - conn *amqp.Connection - ch *amqp.Channel + conn *amqp.Connection + channel *amqp.Channel + prefix string + exchange string } // NewPublisher returns RabbitMQ message Publisher. -func NewPublisher(url string) (messaging.Publisher, error) { +func NewPublisher(url string, opts ...messaging.Option) (messaging.Publisher, error) { conn, err := amqp.Dial(url) if err != nil { return nil, err } - ch, err := conn.Channel() if err != nil { return nil, err @@ -34,10 +35,20 @@ func NewPublisher(url string) (messaging.Publisher, error) { if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); err != nil { return nil, err } + ret := &publisher{ - conn: conn, - ch: ch, + conn: conn, + channel: ch, + prefix: chansPrefix, + exchange: exchangeName, + } + + for _, opt := range opts { + if err := opt(ret); err != nil { + return nil, err + } } + return ret, nil } @@ -49,15 +60,16 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging. if err != nil { return err } - subject := fmt.Sprintf("%s.%s", chansPrefix, topic) + + subject := fmt.Sprintf("%s.%s", pub.prefix, topic) if msg.Subtopic != "" { subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic) } subject = formatTopic(subject) - err = pub.ch.PublishWithContext( + err = pub.channel.PublishWithContext( ctx, - exchangeName, + pub.exchange, subject, false, false, @@ -76,9 +88,6 @@ func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging. } func (pub *publisher) Close() error { - if err := pub.ch.Close(); err != nil { - return err - } return pub.conn.Close() } diff --git a/pkg/messaging/rabbitmq/pubsub.go b/pkg/messaging/rabbitmq/pubsub.go index 2327344b0..e50cb6e97 100644 --- a/pkg/messaging/rabbitmq/pubsub.go +++ b/pkg/messaging/rabbitmq/pubsub.go @@ -16,10 +16,11 @@ import ( ) const ( - chansPrefix = "channels" // SubjectAllChannels represents subject to subscribe for all the channels. SubjectAllChannels = "channels.#" - exchangeName = "mainflux" + + exchangeName = "messages" + chansPrefix = "channels" ) var ( @@ -45,7 +46,7 @@ type pubsub struct { } // NewPubSub returns RabbitMQ message publisher/subscriber. -func NewPubSub(url string, logger mflog.Logger) (messaging.PubSub, error) { +func NewPubSub(url string, logger mflog.Logger, opts ...messaging.Option) (messaging.PubSub, error) { conn, err := amqp.Dial(url) if err != nil { return nil, err @@ -57,14 +58,24 @@ func NewPubSub(url string, logger mflog.Logger) (messaging.PubSub, error) { if err := ch.ExchangeDeclare(exchangeName, amqp.ExchangeTopic, true, false, false, false, nil); err != nil { return nil, err } + ret := &pubsub{ publisher: publisher{ - conn: conn, - ch: ch, + conn: conn, + channel: ch, + exchange: exchangeName, + prefix: chansPrefix, }, logger: logger, subscriptions: make(map[string]map[string]subscription), } + + for _, opt := range opts { + if err := opt(ret); err != nil { + return nil, err + } + } + return ret, nil } @@ -102,23 +113,23 @@ func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) clientID := fmt.Sprintf("%s-%s", cfg.Topic, cfg.ID) - queue, err := ps.ch.QueueDeclare(clientID, true, false, false, false, nil) + queue, err := ps.channel.QueueDeclare(clientID, true, false, false, false, nil) if err != nil { return err } - if err := ps.ch.QueueBind(queue.Name, cfg.Topic, exchangeName, false, nil); err != nil { + if err := ps.channel.QueueBind(queue.Name, cfg.Topic, ps.exchange, false, nil); err != nil { return err } - msgs, err := ps.ch.Consume(queue.Name, clientID, true, false, false, false, nil) + msgs, err := ps.channel.Consume(queue.Name, clientID, true, false, false, false, nil) if err != nil { return err } go ps.handle(msgs, cfg.Handler) s[cfg.ID] = subscription{ cancel: func() error { - if err := ps.ch.Cancel(clientID, false); err != nil { + if err := ps.channel.Cancel(clientID, false); err != nil { return err } return cfg.Handler.Cancel() @@ -154,7 +165,7 @@ func (ps *pubsub) Unsubscribe(ctx context.Context, id, topic string) error { return err } } - if err := ps.ch.QueueUnbind(topic, topic, exchangeName, nil); err != nil { + if err := ps.channel.QueueUnbind(topic, topic, exchangeName, nil); err != nil { return err } diff --git a/pkg/messaging/rabbitmq/pubsub_test.go b/pkg/messaging/rabbitmq/pubsub_test.go index 202d0baa9..ea960900b 100644 --- a/pkg/messaging/rabbitmq/pubsub_test.go +++ b/pkg/messaging/rabbitmq/pubsub_test.go @@ -22,7 +22,7 @@ const ( channel = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b" subtopic = "engine" clientID = "9b7b1b3f-b1b0-46a8-a717-b8213f9eda3b" - exchangeName = "mainflux" + exchangeName = "messages" ) var ( diff --git a/pkg/messaging/rabbitmq/setup_test.go b/pkg/messaging/rabbitmq/setup_test.go index 7f059996a..d9277b01f 100644 --- a/pkg/messaging/rabbitmq/setup_test.go +++ b/pkg/messaging/rabbitmq/setup_test.go @@ -65,6 +65,7 @@ func TestMain(m *testing.M) { } code := m.Run() + if err := pool.Purge(container); err != nil { log.Fatalf("Could not purge container: %s", err) } diff --git a/scripts/ci.sh b/scripts/ci.sh index f5dc9d1f7..35c0a470f 100755 --- a/scripts/ci.sh +++ b/scripts/ci.sh @@ -67,6 +67,8 @@ setup_mf() { done echo "Compile check for rabbitmq..." MF_MESSAGE_BROKER_TYPE=rabbitmq make http + echo "Compile check for redis..." + MF_ES_STORE_TYPE=redis make http make -j$NPROC } diff --git a/things/events/streams.go b/things/events/streams.go index 5ef9f842f..5a1eed5ff 100644 --- a/things/events/streams.go +++ b/things/events/streams.go @@ -9,7 +9,7 @@ import ( "github.com/mainflux/mainflux" mfclients "github.com/mainflux/mainflux/pkg/clients" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/things" ) @@ -25,7 +25,7 @@ type eventStore struct { // NewEventStoreMiddleware returns wrapper around things service that sends // events to event store. func NewEventStoreMiddleware(ctx context.Context, svc things.Service, url string) (things.Service, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err } diff --git a/twins/events/setup_test.go b/twins/events/setup_test.go index dd4e4bbf7..b1a5765a2 100644 --- a/twins/events/setup_test.go +++ b/twins/events/setup_test.go @@ -14,7 +14,10 @@ import ( "github.com/ory/dockertest/v3" ) -var redisClient *redis.Client +var ( + redisClient *redis.Client + redisURL string +) func TestMain(m *testing.M) { pool, err := dockertest.NewPool("") @@ -27,12 +30,14 @@ func TestMain(m *testing.M) { log.Fatalf("Could not start container: %s", err) } + redisURL = fmt.Sprintf("redis://localhost:%s/0", container.GetPort("6379/tcp")) + opts, err := redis.ParseURL(redisURL) + if err != nil { + log.Fatalf("Could not parse redis URL: %s", err) + } + if err := pool.Retry(func() error { - redisClient = redis.NewClient(&redis.Options{ - Addr: fmt.Sprintf("localhost:%s", container.GetPort("6379/tcp")), - Password: "", - DB: 0, - }) + redisClient = redis.NewClient(opts) return redisClient.Ping(context.Background()).Err() }); err != nil { diff --git a/twins/events/streams.go b/twins/events/streams.go index 72a6704c3..3466c7170 100644 --- a/twins/events/streams.go +++ b/twins/events/streams.go @@ -7,7 +7,7 @@ import ( "context" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/twins" ) @@ -24,7 +24,7 @@ type eventStore struct { // NewEventStoreMiddleware returns wrapper around things service that sends // events to event store. func NewEventStoreMiddleware(ctx context.Context, svc twins.Service, url string) (twins.Service, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err } diff --git a/users/events/streams.go b/users/events/streams.go index 5e2942bfe..c146124ef 100644 --- a/users/events/streams.go +++ b/users/events/streams.go @@ -9,7 +9,7 @@ import ( "github.com/mainflux/mainflux" mfclients "github.com/mainflux/mainflux/pkg/clients" "github.com/mainflux/mainflux/pkg/events" - "github.com/mainflux/mainflux/pkg/events/redis" + "github.com/mainflux/mainflux/pkg/events/store" "github.com/mainflux/mainflux/users" ) @@ -25,7 +25,7 @@ type eventStore struct { // NewEventStoreMiddleware returns wrapper around users service that sends // events to event store. func NewEventStoreMiddleware(ctx context.Context, svc users.Service, url string) (users.Service, error) { - publisher, err := redis.NewPublisher(ctx, url, streamID) + publisher, err := store.NewPublisher(ctx, url, streamID) if err != nil { return nil, err }