From e7df7b4f468a9a59ab296b0cdf20dd9949ed750d Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Mon, 10 Oct 2022 14:31:28 +0200 Subject: [PATCH] refactor: prefix kafka group with application name --- mqtt-integration/src/service/session.rs | 2 +- websocket-integration/src/service.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mqtt-integration/src/service/session.rs b/mqtt-integration/src/service/session.rs index eae965b5f..077877db7 100644 --- a/mqtt-integration/src/service/session.rs +++ b/mqtt-integration/src/service/session.rs @@ -190,7 +190,7 @@ impl Session { .kafka_target(KafkaEventType::Events, &self.config.kafka) .map(|target| target.into()) .map_err(|_| v5::codec::SubscribeAckReason::UnspecifiedError)?, - consumer_group: group_id.map(|s| s.to_string()), + consumer_group: group_id.map(|s| format!("{app}.{s}")), }; let event_stream = EventStream::::new(stream_config).map_err(|err| { log::info!("Failed to subscribe to Kafka topic: {}", err); diff --git a/websocket-integration/src/service.rs b/websocket-integration/src/service.rs index 3e8620f0e..462c825b4 100644 --- a/websocket-integration/src/service.rs +++ b/websocket-integration/src/service.rs @@ -147,7 +147,7 @@ impl Service { .kafka_target(KafkaEventType::Events, kafka_config) .map_err(|_| ServiceError::InternalError("This should be infallible".into()))? .into(), - consumer_group: group_id, + consumer_group: group_id.map(|group_id| format!("{application}.{group_id}")), }) .map_err(|err| { log::info!("Failed to subscribe to Kafka topic: {}", err);