Skip to content

Commit

Permalink
Update consumer groups client to a new model with group name
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 2, 2023
1 parent ffc65ff commit bf33182
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/main/java/rs/iggy/consumergroup/ConsumerGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

public record ConsumerGroup(
Long id,
String name,
Long partitionsCount,
Long membersCount
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

public record ConsumerGroupDetails(
Long id,
String name,
Long partitionsCount,
Long membersCount,
List<ConsumerGroupMember> members
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/rs/iggy/consumergroup/ConsumerGroupsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@ public interface ConsumerGroupsClient {

ConsumerGroupDetails getConsumerGroup(Long streamId, Long topicId, Long consumerGroupId);

ConsumerGroupDetails getConsumerGroup(Long streamId, Long topicId, String consumerGroupName);

List<ConsumerGroup> getConsumerGroups(Long streamId, Long topicId);

void createConsumerGroup(Long streamId, Long topicId, Long consumerGroupId);
void createConsumerGroup(Long streamId, Long topicId, Long consumerGroupId, String consumerGroupName);

void deleteConsumerGroup(Long streamId, Long topicId, Long consumerGroupId);

void deleteConsumerGroup(Long streamId, Long topicId, String consumerGroupName);

void joinConsumerGroup(Long streamId, Long topicId, Long consumerGroupId);

void joinConsumerGroup(Long streamId, Long topicId, String consumerGroupName);

void leaveConsumerGroup(Long streamId, Long topicId, Long consumerGroupId);

void leaveConsumerGroup(Long streamId, Long topicId, String consumerGroupName);

}
29 changes: 26 additions & 3 deletions src/main/java/rs/iggy/http/ConsumerGroupsHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ public ConsumerGroupDetails getConsumerGroup(Long streamId, Long topicId, Long c
return httpClient.execute(request, ConsumerGroupDetails.class);
}

@Override
public ConsumerGroupDetails getConsumerGroup(Long streamId, Long topicId, String consumerGroupName) {
var request = httpClient.prepareGetRequest(path(streamId, topicId) + "/" + consumerGroupName);
return httpClient.execute(request, ConsumerGroupDetails.class);
}

@Override
public List<ConsumerGroup> getConsumerGroups(Long streamId, Long topicId) {
var request = httpClient.prepareGetRequest(path(streamId, topicId));
Expand All @@ -28,8 +34,9 @@ public List<ConsumerGroup> getConsumerGroups(Long streamId, Long topicId) {
}

@Override
public void createConsumerGroup(Long streamId, Long topicId, Long consumerGroupId) {
var request = httpClient.preparePostRequest(path(streamId, topicId), new CreateConsumerGroup(consumerGroupId));
public void createConsumerGroup(Long streamId, Long topicId, Long consumerGroupId, String consumerGroupName) {
var request = httpClient.preparePostRequest(path(streamId, topicId),
new CreateConsumerGroup(consumerGroupId, consumerGroupName));
httpClient.execute(request);
}

Expand All @@ -39,21 +46,37 @@ public void deleteConsumerGroup(Long streamId, Long topicId, Long consumerGroupI
httpClient.execute(request);
}

@Override
public void deleteConsumerGroup(Long streamId, Long topicId, String consumerGroupName) {
var request = httpClient.prepareDeleteRequest(path(streamId, topicId) + "/" + consumerGroupName);
httpClient.execute(request);
}

@Override
public void joinConsumerGroup(Long streamId, Long topicId, Long consumerGroupId) {
throw new UnsupportedOperationException("Method not available in HTTP client");
}

@Override
public void joinConsumerGroup(Long streamId, Long topicId, String consumerGroupName) {
throw new UnsupportedOperationException("Method not available in HTTP client");
}

@Override
public void leaveConsumerGroup(Long streamId, Long topicId, Long consumerGroupId) {
throw new UnsupportedOperationException("Method not available in HTTP client");
}

@Override
public void leaveConsumerGroup(Long streamId, Long topicId, String consumerGroupName) {
throw new UnsupportedOperationException("Method not available in HTTP client");
}

private static String path(Long streamId, Long topicId) {
return "/streams/" + streamId + "/topics/" + topicId + "/consumer-groups";
}

private record CreateConsumerGroup(Long consumerGroupId) {
private record CreateConsumerGroup(Long consumerGroupId, String name) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package rs.iggy.consumergroup;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import rs.iggy.IntegrationTest;
import static org.assertj.core.api.Assertions.assertThat;

public abstract class ConsumerGroupsClientBaseTest extends IntegrationTest {

protected ConsumerGroupsClient consumerGroupsClient;

@BeforeEach
void beforeEachBase() {
consumerGroupsClient = client.consumerGroups();

login();
}

@Test
void shouldCreateAndDeleteConsumerGroup() {
// given
setUpStreamAndTopic();

// when
consumerGroupsClient.createConsumerGroup(42L, 42L, 42L, "consumer-group-42");

var consumerGroupById = consumerGroupsClient.getConsumerGroup(42L, 42L, 42L);
var consumerGroupByName = consumerGroupsClient.getConsumerGroup(42L, 42L, 42L);

// then
assertThat(consumerGroupById).isNotNull();
assertThat(consumerGroupById.id()).isEqualTo(42L);
assertThat(consumerGroupById.name()).isEqualTo("consumer-group-42");
assertThat(consumerGroupById).isEqualTo(consumerGroupByName);

// when
consumerGroupsClient.deleteConsumerGroup(42L, 42L, 42L);

// then
assertThat(consumerGroupsClient.getConsumerGroups(42L, 42L)).isEmpty();
}

@Test
void shouldDeleteConsumerGroupByName() {
// given
setUpStreamAndTopic();
consumerGroupsClient.createConsumerGroup(42L, 42L, 42L, "consumer-group-42");
var consumerGroup = consumerGroupsClient.getConsumerGroup(42L, 42L, 42L);
assert consumerGroup != null;

// when
consumerGroupsClient.deleteConsumerGroup(42L, 42L, "consumer-group-42");

// then
assertThat(consumerGroupsClient.getConsumerGroups(42L, 42L)).isEmpty();
}

@Test
void shouldGetAllConsumerGroups() {
// given
setUpStreamAndTopic();

consumerGroupsClient.createConsumerGroup(42L, 42L, 42L, "consumer-group-42");
consumerGroupsClient.createConsumerGroup(42L, 42L, 43L, "consumer-group-43");
consumerGroupsClient.createConsumerGroup(42L, 42L, 44L, "consumer-group-44");

// when
var consumerGroups = consumerGroupsClient.getConsumerGroups(42L, 42L);

// then
assertThat(consumerGroups).hasSize(3);
assertThat(consumerGroups)
.map(ConsumerGroup::name)
.containsExactlyInAnyOrder("consumer-group-42", "consumer-group-43", "consumer-group-44");
}

}
13 changes: 13 additions & 0 deletions src/test/java/rs/iggy/http/ConsumerGroupsHttpClientTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package rs.iggy.http;

import rs.iggy.IggyClient;
import rs.iggy.consumergroup.ConsumerGroupsClientBaseTest;

class ConsumerGroupsHttpClientTest extends ConsumerGroupsClientBaseTest {

@Override
protected IggyClient getClient() {
return HttpClientFactory.create(iggyServer);
}

}

0 comments on commit bf33182

Please sign in to comment.