Skip to content

Commit

Permalink
Implement optional response for get consumer group
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 13, 2024
1 parent 500b481 commit 5fbe5bd
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@

public interface ConsumerGroupsClient {

default ConsumerGroupDetails getConsumerGroup(Long streamId, Long topicId, Long groupId) {
default Optional<ConsumerGroupDetails> getConsumerGroup(Long streamId, Long topicId, Long groupId) {
return getConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), ConsumerId.of(groupId));
}

ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId);
Optional<ConsumerGroupDetails> getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId);

default List<ConsumerGroup> getConsumerGroups(Long streamId, Long topicId) {
return getConsumerGroups(StreamId.of(streamId), TopicId.of(topicId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ public ConsumerGroupsHttpClient(HttpClient httpClient) {
}

@Override
public ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) {
public Optional<ConsumerGroupDetails> getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) {
var request = httpClient.prepareGetRequest(path(streamId, topicId) + "/" + groupId);
return httpClient.execute(request, ConsumerGroupDetails.class);
return httpClient.executeWithOptionalResponse(request, ConsumerGroupDetails.class);
}

@Override
Expand Down
52 changes: 32 additions & 20 deletions src/main/java/rs/iggy/clients/blocking/http/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -42,33 +43,39 @@ <T> T execute(ClassicHttpRequest request, TypeReference<T> typeReference) {
}

private <T> T execute(ClassicHttpRequest request, JavaType type) {
try (var client = HttpClients.createDefault()) {
return client.execute(request, response -> {
handleErrorResponse(response);
return objectMapper.readValue(response.getEntity().getContent(), type);
});
} catch (IOException e) {
throw new RuntimeException(e);
}
return executeRequest(request, response -> handleTypedResponse(response, type));
}

public <T> Optional<T> executeWithOptionalResponse(ClassicHttpRequest request, Class<T> clazz) {
return executeWithOptionalResponse(request, objectMapper.constructType(clazz));
}

private <T> Optional<T> executeWithOptionalResponse(ClassicHttpRequest request, JavaType type) {
return executeRequest(request, response -> {
if (response.getCode() == 404) {
return Optional.empty();
}
return Optional.of(handleTypedResponse(response, type));
});
}

String executeWithStringResponse(ClassicHttpRequest request) {
try (var client = HttpClients.createDefault()) {
return client.execute(request, response -> {
handleErrorResponse(response);
return new String(response.getEntity().getContent().readAllBytes());
});
} catch (IOException e) {
throw new RuntimeException(e);
}
return executeRequest(request, response -> {
handleErrorResponse(response);
return new String(response.getEntity().getContent().readAllBytes());
});
}

void execute(ClassicHttpRequest request) {
executeRequest(request, response -> {
handleErrorResponse(response);
return "";
});
}

private <T> T executeRequest(ClassicHttpRequest request, HttpClientResponseHandler<T> responseHandler) {
try (var client = HttpClients.createDefault()) {
client.execute(request, response -> {
handleErrorResponse(response);
return "";
});
return client.execute(request, responseHandler);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -114,6 +121,11 @@ private ClassicHttpRequest addRequestBody(ClassicRequestBuilder requestBuilder,

}

private <T> T handleTypedResponse(ClassicHttpResponse response, JavaType type) throws IOException {
handleErrorResponse(response);
return objectMapper.readValue(response.getEntity().getContent(), type);
}

private void handleErrorResponse(ClassicHttpResponse response) throws IOException {
if (!isSuccessful(response.getCode())) {
var error = objectMapper.readValue(response.getEntity().getContent(), IggyHttpError.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ public ConsumerGroupsTcpClient(TcpConnectionHandler connection) {
}

@Override
public ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) {
public Optional<ConsumerGroupDetails> getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));
var response = connection.send(GET_CONSUMER_GROUP_CODE, payload);
return readConsumerGroupDetails(response);
if (response.isReadable()) {
return Optional.of(readConsumerGroupDetails(response));
}
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,17 @@ void shouldCreateAndDeleteConsumerGroup() {
ConsumerId.of("consumer-group-42"));

// then
assertThat(consumerGroupById).isNotNull();
assertThat(consumerGroupById.id()).isEqualTo(42L);
assertThat(consumerGroupById.name()).isEqualTo("consumer-group-42");
assertThat(consumerGroupById).isPresent();
assertThat(consumerGroupById.get().id()).isEqualTo(42L);
assertThat(consumerGroupById.get().name()).isEqualTo("consumer-group-42");
assertThat(consumerGroupById).isEqualTo(consumerGroupByName);

// when
consumerGroupsClient.deleteConsumerGroup(42L, 42L, 42L);

// then
assertThat(consumerGroupsClient.getConsumerGroups(42L, 42L)).isEmpty();
assertThat(consumerGroupsClient.getConsumerGroup(42L, 42L, 42L)).isEmpty();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ void shouldJoinAndLeaveConsumerGroup() {
consumerGroupsClient.joinConsumerGroup(42L, 42L, group.id());

// then
group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id());
group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id()).get();
assertThat(group.membersCount()).isEqualTo(1);

// when
consumerGroupsClient.leaveConsumerGroup(42L, 42L, group.id());

// then
group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id());
group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id()).get();
assertThat(group.membersCount()).isEqualTo(0);
}

Expand Down

0 comments on commit 5fbe5bd

Please sign in to comment.