From 836f5ed86a974d5e93b257b0d2da1c0a13f25930 Mon Sep 17 00:00:00 2001 From: Maciej Modzelewski Date: Fri, 11 Oct 2024 21:44:18 +0200 Subject: [PATCH] Implement joining and leaving consumer group --- README.md | 1 - .../blocking/tcp/ConsumerGroupsTcpClient.java | 12 +++++++-- .../tcp/ConsumerGroupsTcpClientTest.java | 27 +++++++++++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c003095..43ca8ef 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,6 @@ Official Java client SDK for [Iggy.rs](https://iggy.rs) message streaming. ## To do - Transport protocols - - TCP - QUIC - Publish maven package - Add examples diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java index dbb79d3..888d6a4 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java @@ -77,12 +77,20 @@ public void deleteConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId g @Override public void joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) { - throw new UnsupportedOperationException(); + var payload = toBytes(streamId); + payload.writeBytes(toBytes(topicId)); + payload.writeBytes(toBytes(groupId)); + + connection.send(JOIN_CONSUMER_GROUP_CODE, payload); } @Override public void leaveConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) { - throw new UnsupportedOperationException(); + var payload = toBytes(streamId); + payload.writeBytes(toBytes(topicId)); + payload.writeBytes(toBytes(groupId)); + + connection.send(LEAVE_CONSUMER_GROUP_CODE, payload); } } diff --git a/src/test/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClientTest.java b/src/test/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClientTest.java index f440b70..aa81c9c 100644 --- a/src/test/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClientTest.java +++ b/src/test/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClientTest.java @@ -1,7 +1,10 @@ package rs.iggy.clients.blocking.tcp; +import org.junit.jupiter.api.Test; import rs.iggy.clients.blocking.ConsumerGroupsClientBaseTest; import rs.iggy.clients.blocking.IggyClient; +import java.util.Optional; +import static org.assertj.core.api.Assertions.assertThat; class ConsumerGroupsTcpClientTest extends ConsumerGroupsClientBaseTest { @@ -10,4 +13,28 @@ protected IggyClient getClient() { return TcpClientFactory.create(iggyServer); } + @Test + void shouldJoinAndLeaveConsumerGroup() { + // given + setUpStreamAndTopic(); + var group = consumerGroupsClient.createConsumerGroup(42L, + 42L, + Optional.of(42L), + "consumer-group-42"); + + // when + consumerGroupsClient.joinConsumerGroup(42L, 42L, group.id()); + + // then + group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id()); + assertThat(group.membersCount()).isEqualTo(1); + + // when + consumerGroupsClient.leaveConsumerGroup(42L, 42L, group.id()); + + // then + group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id()); + assertThat(group.membersCount()).isEqualTo(0); + } + }