Skip to content

Commit

Permalink
Implement joining and leaving consumer group
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 11, 2024
1 parent 5f8949b commit 836f5ed
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ Official Java client SDK for [Iggy.rs](https://iggy.rs) message streaming.
## To do

- Transport protocols
- TCP
- QUIC
- Publish maven package
- Add examples
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,20 @@ public void deleteConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId g

@Override
public void joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) {
throw new UnsupportedOperationException();
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));

connection.send(JOIN_CONSUMER_GROUP_CODE, payload);
}

@Override
public void leaveConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) {
throw new UnsupportedOperationException();
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));

connection.send(LEAVE_CONSUMER_GROUP_CODE, payload);
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package rs.iggy.clients.blocking.tcp;

import org.junit.jupiter.api.Test;
import rs.iggy.clients.blocking.ConsumerGroupsClientBaseTest;
import rs.iggy.clients.blocking.IggyClient;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;

class ConsumerGroupsTcpClientTest extends ConsumerGroupsClientBaseTest {

Expand All @@ -10,4 +13,28 @@ protected IggyClient getClient() {
return TcpClientFactory.create(iggyServer);
}

@Test
void shouldJoinAndLeaveConsumerGroup() {
// given
setUpStreamAndTopic();
var group = consumerGroupsClient.createConsumerGroup(42L,
42L,
Optional.of(42L),
"consumer-group-42");

// when
consumerGroupsClient.joinConsumerGroup(42L, 42L, group.id());

// then
group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id());
assertThat(group.membersCount()).isEqualTo(1);

// when
consumerGroupsClient.leaveConsumerGroup(42L, 42L, group.id());

// then
group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id());
assertThat(group.membersCount()).isEqualTo(0);
}

}

0 comments on commit 836f5ed

Please sign in to comment.