From bbec651ad388d1af0b8a671ca0f1f598b27f36f4 Mon Sep 17 00:00:00 2001 From: Maciej Modzelewski Date: Sun, 6 Oct 2024 14:28:05 +0200 Subject: [PATCH] Start streams tcp implementation, extract tcp connection handler --- .../clients/blocking/tcp/IggyTcpClient.java | 10 +-- .../blocking/tcp/StreamsTcpClient.java | 65 +++++++++++++++++++ .../blocking/tcp/TcpConnectionHandler.java | 61 +++++++++++++++++ .../clients/blocking/tcp/UsersTcpClient.java | 48 +------------- .../clients/blocking/IntegrationTest.java | 2 +- .../blocking/http/HttpClientFactory.java | 2 +- .../blocking/tcp/IggyTcpClientTest.java | 34 ---------- .../blocking/tcp/StreamTcpClientTest.java | 15 +++++ .../blocking/tcp/TcpClientFactory.java | 2 +- .../blocking/tcp/UsersTcpClientTest.java | 22 +++++++ 10 files changed, 174 insertions(+), 87 deletions(-) create mode 100644 src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java create mode 100644 src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java delete mode 100644 src/test/java/rs/iggy/clients/blocking/tcp/IggyTcpClientTest.java create mode 100644 src/test/java/rs/iggy/clients/blocking/tcp/StreamTcpClientTest.java create mode 100644 src/test/java/rs/iggy/clients/blocking/tcp/UsersTcpClientTest.java diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java index e15d37f..28914a1 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java @@ -1,16 +1,16 @@ package rs.iggy.clients.blocking.tcp; -import reactor.netty.Connection; -import reactor.netty.tcp.TcpClient; import rs.iggy.clients.blocking.*; public class IggyTcpClient implements IggyClient { private final UsersTcpClient usersClient; + private final StreamsTcpClient streamsClient; public IggyTcpClient(String host, Integer port) { - Connection tcpConnection = TcpClient.create().host(host).port(port).connectNow(); - usersClient = new UsersTcpClient(tcpConnection); + TcpConnectionHandler connection = new TcpConnectionHandler(host, port); + usersClient = new UsersTcpClient(connection); + streamsClient = new StreamsTcpClient(connection); } @Override @@ -20,7 +20,7 @@ public SystemClient system() { @Override public StreamsClient streams() { - throw new UnsupportedOperationException(); + return streamsClient; } @Override diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java new file mode 100644 index 0000000..16127ef --- /dev/null +++ b/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java @@ -0,0 +1,65 @@ +package rs.iggy.clients.blocking.tcp; + +import io.netty.buffer.Unpooled; +import rs.iggy.clients.blocking.StreamsClient; +import rs.iggy.identifier.StreamId; +import rs.iggy.stream.StreamBase; +import rs.iggy.stream.StreamDetails; +import java.util.List; +import java.util.Optional; + +class StreamsTcpClient implements StreamsClient { + + private static final int CREATE_STREAM_CODE = 202; + private final TcpConnectionHandler connection; + + StreamsTcpClient(TcpConnectionHandler connection) { + this.connection = connection; + } + + @Override + public void createStream(Optional streamId, String name) { + var payloadSize = 4 + 1 + name.length(); + var payload = Unpooled.buffer(payloadSize); + + payload.writeIntLE(streamId.orElse(0L).intValue()); + payload.writeByte(name.length()); + payload.writeBytes(name.getBytes()); + connection.send(CREATE_STREAM_CODE, payload); + } + + @Override + public StreamDetails getStream(Long streamId) { + throw new UnsupportedOperationException(); + } + + @Override + public StreamDetails getStream(StreamId streamId) { + throw new UnsupportedOperationException(); + } + + @Override + public List getStreams() { + throw new UnsupportedOperationException(); + } + + @Override + public void updateStream(Long streamId, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public void updateStream(StreamId streamId, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteStream(Long streamId) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteStream(StreamId streamId) { + throw new UnsupportedOperationException(); + } +} diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java b/src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java new file mode 100644 index 0000000..ef0db51 --- /dev/null +++ b/src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java @@ -0,0 +1,61 @@ +package rs.iggy.clients.blocking.tcp; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import reactor.core.publisher.Mono; +import reactor.netty.Connection; +import reactor.netty.tcp.TcpClient; + +final class TcpConnectionHandler { + + private static final int REQUEST_INITIAL_BYTES_LENGTH = 4; + private static final int COMMAND_LENGTH = 4; + private static final int RESPONSE_INITIAL_BYTES_LENGTH = 8; + + private final Connection connection; + + TcpConnectionHandler(String host, Integer port) { + this.connection = TcpClient.create().host(host).port(port).connectNow(); + } + + void send(int command, ByteBuf payload) { + var payloadSize = payload.readableBytes() + COMMAND_LENGTH; + var buffer = Unpooled.buffer(REQUEST_INITIAL_BYTES_LENGTH + payloadSize); + buffer.writeIntLE(payloadSize); + buffer.writeIntLE(command); + buffer.writeBytes(payload); + + connection.outbound().send(Mono.just(buffer)).then().block(); + } + + ByteBuf sendWithResponse(int command, ByteBuf payload) { + send(command, payload); + var response = connection.inbound().receive().asByteArray().blockFirst(); + if (response == null) { + throw new RuntimeException("No response"); + } + + var responseBuffer = Unpooled.wrappedBuffer(response); + if (!responseBuffer.isReadable(RESPONSE_INITIAL_BYTES_LENGTH)) { + throw new RuntimeException("Received an invalid or empty response"); + } + + var status = responseBuffer.readUnsignedIntLE(); + var responseLengthL = responseBuffer.readUnsignedIntLE(); + // unsafe cast + var responseLength = (int) responseLengthL; + + return handleResponse(status, responseLength, responseBuffer); + } + + ByteBuf handleResponse(long status, int responseLength, ByteBuf responseBuffer) { + if (status != 0) { + throw new RuntimeException("Received an invalid response with status " + status); + } + if (responseLength == 0) { + return Unpooled.EMPTY_BUFFER; + } + return responseBuffer.readBytes(responseLength); + } + +} diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java index 2f7c797..9c29cb2 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java @@ -1,9 +1,6 @@ package rs.iggy.clients.blocking.tcp; -import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; import rs.iggy.clients.blocking.UsersClient; import rs.iggy.identifier.UserId; import rs.iggy.user.*; @@ -12,13 +9,10 @@ class UsersTcpClient implements UsersClient { - private static final int REQUEST_INITIAL_BYTES_LENGTH = 4; private static final int LOGIN_USER_CODE = 38; - private static final int COMMAND_LENGTH = 4; - private static final int RESPONSE_INITIAL_BYTES_LENGTH = 8; - private final Connection connection; + private final TcpConnectionHandler connection; - public UsersTcpClient(Connection connection) { + UsersTcpClient(TcpConnectionHandler connection) { this.connection = connection; } @@ -98,48 +92,12 @@ public IdentityInfo login(String username, String password) { payload.writeIntLE(context.length()); payload.writeBytes(context.getBytes()); - var response = sendWithResponse(LOGIN_USER_CODE, payload); + var response = connection.sendWithResponse(LOGIN_USER_CODE, payload); var userId = response.readUnsignedIntLE(); return new IdentityInfo(userId, Optional.empty()); } - private ByteBuf sendWithResponse(int command, ByteBuf payload) { - var payloadSize = payload.readableBytes() + COMMAND_LENGTH; - var buffer = Unpooled.buffer(REQUEST_INITIAL_BYTES_LENGTH + payloadSize); - buffer.writeIntLE(payloadSize); - buffer.writeIntLE(command); - buffer.writeBytes(payload); - - connection.outbound().send(Mono.just(buffer)).then().block(); - var response = connection.inbound().receive().asByteArray().blockFirst(); - if (response == null) { - throw new RuntimeException("No response"); - } - - var responseBuffer = Unpooled.wrappedBuffer(response); - if (!responseBuffer.isReadable(RESPONSE_INITIAL_BYTES_LENGTH)) { - throw new RuntimeException("Received an invalid or empty response"); - } - - var status = responseBuffer.readUnsignedIntLE(); - var responseLengthL = responseBuffer.readUnsignedIntLE(); - // unsafe cast - var responseLength = (int) responseLengthL; - - return handleResponse(status, responseLength, responseBuffer); - } - - private ByteBuf handleResponse(long status, int responseLength, ByteBuf responseBuffer) { - if (status != 0) { - throw new RuntimeException("Received an invalid response with status " + status); - } - if (responseLength == 0) { - return Unpooled.EMPTY_BUFFER; - } - return responseBuffer.readBytes(responseLength); - } - @Override public void logout() { throw new UnsupportedOperationException(); diff --git a/src/test/java/rs/iggy/clients/blocking/IntegrationTest.java b/src/test/java/rs/iggy/clients/blocking/IntegrationTest.java index dc564d8..cbf41ad 100644 --- a/src/test/java/rs/iggy/clients/blocking/IntegrationTest.java +++ b/src/test/java/rs/iggy/clients/blocking/IntegrationTest.java @@ -17,7 +17,7 @@ public abstract class IntegrationTest { public static final int TCP_PORT = 8090; @Container - protected final GenericContainer iggyServer = new GenericContainer(DockerImageName.parse("iggyrs/iggy:latest")) + protected final GenericContainer iggyServer = new GenericContainer<>(DockerImageName.parse("iggyrs/iggy:latest")) .withExposedPorts(HTTP_PORT, TCP_PORT); protected IggyClient client; diff --git a/src/test/java/rs/iggy/clients/blocking/http/HttpClientFactory.java b/src/test/java/rs/iggy/clients/blocking/http/HttpClientFactory.java index d5d4170..1b653fa 100644 --- a/src/test/java/rs/iggy/clients/blocking/http/HttpClientFactory.java +++ b/src/test/java/rs/iggy/clients/blocking/http/HttpClientFactory.java @@ -5,7 +5,7 @@ class HttpClientFactory { - static IggyHttpClient create(GenericContainer iggyServer) { + static IggyHttpClient create(GenericContainer iggyServer) { String address = iggyServer.getHost(); Integer port = iggyServer.getMappedPort(HTTP_PORT); return new IggyHttpClient("http://" + address + ":" + port); diff --git a/src/test/java/rs/iggy/clients/blocking/tcp/IggyTcpClientTest.java b/src/test/java/rs/iggy/clients/blocking/tcp/IggyTcpClientTest.java deleted file mode 100644 index 3dceb13..0000000 --- a/src/test/java/rs/iggy/clients/blocking/tcp/IggyTcpClientTest.java +++ /dev/null @@ -1,34 +0,0 @@ -package rs.iggy.clients.blocking.tcp; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import rs.iggy.clients.blocking.IggyClient; -import rs.iggy.clients.blocking.IntegrationTest; -import rs.iggy.clients.blocking.UsersClient; -import static org.assertj.core.api.Assertions.assertThat; - -class IggyTcpClientTest extends IntegrationTest { - - protected UsersClient usersClient; - - @Override - protected IggyClient getClient() { - return TcpClientFactory.create(iggyServer); - } - - @BeforeEach - void beforeEach() { - usersClient = client.users(); - } - - @Test - void shouldLogin() { - // when - var identityInfo = usersClient.login("iggy", "iggy"); - - // then - assertThat(identityInfo).isNotNull(); - assertThat(identityInfo.userId()).isEqualTo(1L); - } - -} diff --git a/src/test/java/rs/iggy/clients/blocking/tcp/StreamTcpClientTest.java b/src/test/java/rs/iggy/clients/blocking/tcp/StreamTcpClientTest.java new file mode 100644 index 0000000..5a4f8fd --- /dev/null +++ b/src/test/java/rs/iggy/clients/blocking/tcp/StreamTcpClientTest.java @@ -0,0 +1,15 @@ +package rs.iggy.clients.blocking.tcp; + +import org.junit.jupiter.api.Disabled; +import rs.iggy.clients.blocking.IggyClient; +import rs.iggy.clients.blocking.StreamClientBaseTest; + +@Disabled +class StreamTcpClientTest extends StreamClientBaseTest { + + @Override + protected IggyClient getClient() { + return TcpClientFactory.create(iggyServer); + } + +} diff --git a/src/test/java/rs/iggy/clients/blocking/tcp/TcpClientFactory.java b/src/test/java/rs/iggy/clients/blocking/tcp/TcpClientFactory.java index 69fd8be..54fec51 100644 --- a/src/test/java/rs/iggy/clients/blocking/tcp/TcpClientFactory.java +++ b/src/test/java/rs/iggy/clients/blocking/tcp/TcpClientFactory.java @@ -5,7 +5,7 @@ class TcpClientFactory { - static IggyTcpClient create(GenericContainer iggyServer) { + static IggyTcpClient create(GenericContainer iggyServer) { String address = iggyServer.getHost(); Integer port = iggyServer.getMappedPort(TCP_PORT); return new IggyTcpClient(address, port); diff --git a/src/test/java/rs/iggy/clients/blocking/tcp/UsersTcpClientTest.java b/src/test/java/rs/iggy/clients/blocking/tcp/UsersTcpClientTest.java new file mode 100644 index 0000000..136727c --- /dev/null +++ b/src/test/java/rs/iggy/clients/blocking/tcp/UsersTcpClientTest.java @@ -0,0 +1,22 @@ +package rs.iggy.clients.blocking.tcp; + +import org.junit.jupiter.api.BeforeEach; +import rs.iggy.clients.blocking.IggyClient; +import rs.iggy.clients.blocking.UsersClient; +import rs.iggy.clients.blocking.UsersClientBaseTest; + +class UsersTcpClientTest extends UsersClientBaseTest { + + protected UsersClient usersClient; + + @Override + protected IggyClient getClient() { + return TcpClientFactory.create(iggyServer); + } + + @BeforeEach + void beforeEach() { + usersClient = client.users(); + } + +}