diff --git a/src/main/java/rs/iggy/clients/blocking/ConsumerGroupsClient.java b/src/main/java/rs/iggy/clients/blocking/ConsumerGroupsClient.java index a12373c..09b0144 100644 --- a/src/main/java/rs/iggy/clients/blocking/ConsumerGroupsClient.java +++ b/src/main/java/rs/iggy/clients/blocking/ConsumerGroupsClient.java @@ -10,11 +10,11 @@ public interface ConsumerGroupsClient { - default ConsumerGroupDetails getConsumerGroup(Long streamId, Long topicId, Long groupId) { + default Optional getConsumerGroup(Long streamId, Long topicId, Long groupId) { return getConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), ConsumerId.of(groupId)); } - ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId); + Optional getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId); default List getConsumerGroups(Long streamId, Long topicId) { return getConsumerGroups(StreamId.of(streamId), TopicId.of(topicId)); diff --git a/src/main/java/rs/iggy/clients/blocking/http/ConsumerGroupsHttpClient.java b/src/main/java/rs/iggy/clients/blocking/http/ConsumerGroupsHttpClient.java index 3d6cc7d..126970f 100644 --- a/src/main/java/rs/iggy/clients/blocking/http/ConsumerGroupsHttpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/http/ConsumerGroupsHttpClient.java @@ -19,9 +19,9 @@ public ConsumerGroupsHttpClient(HttpClient httpClient) { } @Override - public ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) { + public Optional getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) { var request = httpClient.prepareGetRequest(path(streamId, topicId) + "/" + groupId); - return httpClient.execute(request, ConsumerGroupDetails.class); + return httpClient.executeWithOptionalResponse(request, ConsumerGroupDetails.class); } @Override diff --git a/src/main/java/rs/iggy/clients/blocking/http/HttpClient.java b/src/main/java/rs/iggy/clients/blocking/http/HttpClient.java index b901011..bb828b2 100644 --- a/src/main/java/rs/iggy/clients/blocking/http/HttpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/http/HttpClient.java @@ -8,6 +8,7 @@ import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.io.HttpClientResponseHandler; import org.apache.hc.core5.http.io.support.ClassicRequestBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,33 +43,39 @@ T execute(ClassicHttpRequest request, TypeReference typeReference) { } private T execute(ClassicHttpRequest request, JavaType type) { - try (var client = HttpClients.createDefault()) { - return client.execute(request, response -> { - handleErrorResponse(response); - return objectMapper.readValue(response.getEntity().getContent(), type); - }); - } catch (IOException e) { - throw new RuntimeException(e); - } + return executeRequest(request, response -> handleTypedResponse(response, type)); + } + + public Optional executeWithOptionalResponse(ClassicHttpRequest request, Class clazz) { + return executeWithOptionalResponse(request, objectMapper.constructType(clazz)); + } + + private Optional executeWithOptionalResponse(ClassicHttpRequest request, JavaType type) { + return executeRequest(request, response -> { + if (response.getCode() == 404) { + return Optional.empty(); + } + return Optional.of(handleTypedResponse(response, type)); + }); } String executeWithStringResponse(ClassicHttpRequest request) { - try (var client = HttpClients.createDefault()) { - return client.execute(request, response -> { - handleErrorResponse(response); - return new String(response.getEntity().getContent().readAllBytes()); - }); - } catch (IOException e) { - throw new RuntimeException(e); - } + return executeRequest(request, response -> { + handleErrorResponse(response); + return new String(response.getEntity().getContent().readAllBytes()); + }); } void execute(ClassicHttpRequest request) { + executeRequest(request, response -> { + handleErrorResponse(response); + return ""; + }); + } + + private T executeRequest(ClassicHttpRequest request, HttpClientResponseHandler responseHandler) { try (var client = HttpClients.createDefault()) { - client.execute(request, response -> { - handleErrorResponse(response); - return ""; - }); + return client.execute(request, responseHandler); } catch (IOException e) { throw new RuntimeException(e); } @@ -114,6 +121,11 @@ private ClassicHttpRequest addRequestBody(ClassicRequestBuilder requestBuilder, } + private T handleTypedResponse(ClassicHttpResponse response, JavaType type) throws IOException { + handleErrorResponse(response); + return objectMapper.readValue(response.getEntity().getContent(), type); + } + private void handleErrorResponse(ClassicHttpResponse response) throws IOException { if (!isSuccessful(response.getCode())) { var error = objectMapper.readValue(response.getEntity().getContent(), IggyHttpError.class); diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java index 888d6a4..f75a3ab 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java @@ -32,12 +32,15 @@ public ConsumerGroupsTcpClient(TcpConnectionHandler connection) { } @Override - public ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) { + public Optional getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) { var payload = toBytes(streamId); payload.writeBytes(toBytes(topicId)); payload.writeBytes(toBytes(groupId)); var response = connection.send(GET_CONSUMER_GROUP_CODE, payload); - return readConsumerGroupDetails(response); + if (response.isReadable()) { + return Optional.of(readConsumerGroupDetails(response)); + } + return Optional.empty(); } @Override diff --git a/src/test/java/rs/iggy/clients/blocking/ConsumerGroupsClientBaseTest.java b/src/test/java/rs/iggy/clients/blocking/ConsumerGroupsClientBaseTest.java index 2778ad0..6cffa96 100644 --- a/src/test/java/rs/iggy/clients/blocking/ConsumerGroupsClientBaseTest.java +++ b/src/test/java/rs/iggy/clients/blocking/ConsumerGroupsClientBaseTest.java @@ -39,9 +39,9 @@ void shouldCreateAndDeleteConsumerGroup() { ConsumerId.of("consumer-group-42")); // then - assertThat(consumerGroupById).isNotNull(); - assertThat(consumerGroupById.id()).isEqualTo(42L); - assertThat(consumerGroupById.name()).isEqualTo("consumer-group-42"); + assertThat(consumerGroupById).isPresent(); + assertThat(consumerGroupById.get().id()).isEqualTo(42L); + assertThat(consumerGroupById.get().name()).isEqualTo("consumer-group-42"); assertThat(consumerGroupById).isEqualTo(consumerGroupByName); // when @@ -49,6 +49,7 @@ void shouldCreateAndDeleteConsumerGroup() { // then assertThat(consumerGroupsClient.getConsumerGroups(42L, 42L)).isEmpty(); + assertThat(consumerGroupsClient.getConsumerGroup(42L, 42L, 42L)).isEmpty(); } @Test diff --git a/src/test/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClientTest.java b/src/test/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClientTest.java index aa81c9c..04c5251 100644 --- a/src/test/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClientTest.java +++ b/src/test/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClientTest.java @@ -26,14 +26,14 @@ void shouldJoinAndLeaveConsumerGroup() { consumerGroupsClient.joinConsumerGroup(42L, 42L, group.id()); // then - group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id()); + group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id()).get(); assertThat(group.membersCount()).isEqualTo(1); // when consumerGroupsClient.leaveConsumerGroup(42L, 42L, group.id()); // then - group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id()); + group = consumerGroupsClient.getConsumerGroup(42L, 42L, group.id()).get(); assertThat(group.membersCount()).isEqualTo(0); }