Skip to content

Commit

Permalink
Start streams tcp implementation, extract tcp connection handler
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 6, 2024
1 parent 9ae782b commit bbec651
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 87 deletions.
10 changes: 5 additions & 5 deletions src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package rs.iggy.clients.blocking.tcp;

import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;
import rs.iggy.clients.blocking.*;

public class IggyTcpClient implements IggyClient {

private final UsersTcpClient usersClient;
private final StreamsTcpClient streamsClient;

public IggyTcpClient(String host, Integer port) {
Connection tcpConnection = TcpClient.create().host(host).port(port).connectNow();
usersClient = new UsersTcpClient(tcpConnection);
TcpConnectionHandler connection = new TcpConnectionHandler(host, port);
usersClient = new UsersTcpClient(connection);
streamsClient = new StreamsTcpClient(connection);
}

@Override
Expand All @@ -20,7 +20,7 @@ public SystemClient system() {

@Override
public StreamsClient streams() {
throw new UnsupportedOperationException();
return streamsClient;
}

@Override
Expand Down
65 changes: 65 additions & 0 deletions src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rs.iggy.clients.blocking.tcp;

import io.netty.buffer.Unpooled;
import rs.iggy.clients.blocking.StreamsClient;
import rs.iggy.identifier.StreamId;
import rs.iggy.stream.StreamBase;
import rs.iggy.stream.StreamDetails;
import java.util.List;
import java.util.Optional;

class StreamsTcpClient implements StreamsClient {

private static final int CREATE_STREAM_CODE = 202;
private final TcpConnectionHandler connection;

StreamsTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
}

@Override
public void createStream(Optional<Long> streamId, String name) {
var payloadSize = 4 + 1 + name.length();
var payload = Unpooled.buffer(payloadSize);

payload.writeIntLE(streamId.orElse(0L).intValue());
payload.writeByte(name.length());
payload.writeBytes(name.getBytes());
connection.send(CREATE_STREAM_CODE, payload);
}

@Override
public StreamDetails getStream(Long streamId) {
throw new UnsupportedOperationException();
}

@Override
public StreamDetails getStream(StreamId streamId) {
throw new UnsupportedOperationException();
}

@Override
public List<StreamBase> getStreams() {
throw new UnsupportedOperationException();
}

@Override
public void updateStream(Long streamId, String name) {
throw new UnsupportedOperationException();
}

@Override
public void updateStream(StreamId streamId, String name) {
throw new UnsupportedOperationException();
}

@Override
public void deleteStream(Long streamId) {
throw new UnsupportedOperationException();
}

@Override
public void deleteStream(StreamId streamId) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package rs.iggy.clients.blocking.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

final class TcpConnectionHandler {

private static final int REQUEST_INITIAL_BYTES_LENGTH = 4;
private static final int COMMAND_LENGTH = 4;
private static final int RESPONSE_INITIAL_BYTES_LENGTH = 8;

private final Connection connection;

TcpConnectionHandler(String host, Integer port) {
this.connection = TcpClient.create().host(host).port(port).connectNow();
}

void send(int command, ByteBuf payload) {
var payloadSize = payload.readableBytes() + COMMAND_LENGTH;
var buffer = Unpooled.buffer(REQUEST_INITIAL_BYTES_LENGTH + payloadSize);
buffer.writeIntLE(payloadSize);
buffer.writeIntLE(command);
buffer.writeBytes(payload);

connection.outbound().send(Mono.just(buffer)).then().block();
}

ByteBuf sendWithResponse(int command, ByteBuf payload) {
send(command, payload);
var response = connection.inbound().receive().asByteArray().blockFirst();
if (response == null) {
throw new RuntimeException("No response");
}

var responseBuffer = Unpooled.wrappedBuffer(response);
if (!responseBuffer.isReadable(RESPONSE_INITIAL_BYTES_LENGTH)) {
throw new RuntimeException("Received an invalid or empty response");
}

var status = responseBuffer.readUnsignedIntLE();
var responseLengthL = responseBuffer.readUnsignedIntLE();
// unsafe cast
var responseLength = (int) responseLengthL;

return handleResponse(status, responseLength, responseBuffer);
}

ByteBuf handleResponse(long status, int responseLength, ByteBuf responseBuffer) {
if (status != 0) {
throw new RuntimeException("Received an invalid response with status " + status);
}
if (responseLength == 0) {
return Unpooled.EMPTY_BUFFER;
}
return responseBuffer.readBytes(responseLength);
}

}
48 changes: 3 additions & 45 deletions src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package rs.iggy.clients.blocking.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import rs.iggy.clients.blocking.UsersClient;
import rs.iggy.identifier.UserId;
import rs.iggy.user.*;
Expand All @@ -12,13 +9,10 @@

class UsersTcpClient implements UsersClient {

private static final int REQUEST_INITIAL_BYTES_LENGTH = 4;
private static final int LOGIN_USER_CODE = 38;
private static final int COMMAND_LENGTH = 4;
private static final int RESPONSE_INITIAL_BYTES_LENGTH = 8;
private final Connection connection;
private final TcpConnectionHandler connection;

public UsersTcpClient(Connection connection) {
UsersTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
}

Expand Down Expand Up @@ -98,48 +92,12 @@ public IdentityInfo login(String username, String password) {
payload.writeIntLE(context.length());
payload.writeBytes(context.getBytes());

var response = sendWithResponse(LOGIN_USER_CODE, payload);
var response = connection.sendWithResponse(LOGIN_USER_CODE, payload);

var userId = response.readUnsignedIntLE();
return new IdentityInfo(userId, Optional.empty());
}

private ByteBuf sendWithResponse(int command, ByteBuf payload) {
var payloadSize = payload.readableBytes() + COMMAND_LENGTH;
var buffer = Unpooled.buffer(REQUEST_INITIAL_BYTES_LENGTH + payloadSize);
buffer.writeIntLE(payloadSize);
buffer.writeIntLE(command);
buffer.writeBytes(payload);

connection.outbound().send(Mono.just(buffer)).then().block();
var response = connection.inbound().receive().asByteArray().blockFirst();
if (response == null) {
throw new RuntimeException("No response");
}

var responseBuffer = Unpooled.wrappedBuffer(response);
if (!responseBuffer.isReadable(RESPONSE_INITIAL_BYTES_LENGTH)) {
throw new RuntimeException("Received an invalid or empty response");
}

var status = responseBuffer.readUnsignedIntLE();
var responseLengthL = responseBuffer.readUnsignedIntLE();
// unsafe cast
var responseLength = (int) responseLengthL;

return handleResponse(status, responseLength, responseBuffer);
}

private ByteBuf handleResponse(long status, int responseLength, ByteBuf responseBuffer) {
if (status != 0) {
throw new RuntimeException("Received an invalid response with status " + status);
}
if (responseLength == 0) {
return Unpooled.EMPTY_BUFFER;
}
return responseBuffer.readBytes(responseLength);
}

@Override
public void logout() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public abstract class IntegrationTest {
public static final int TCP_PORT = 8090;

@Container
protected final GenericContainer<?> iggyServer = new GenericContainer(DockerImageName.parse("iggyrs/iggy:latest"))
protected final GenericContainer<?> iggyServer = new GenericContainer<>(DockerImageName.parse("iggyrs/iggy:latest"))
.withExposedPorts(HTTP_PORT, TCP_PORT);

protected IggyClient client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HttpClientFactory {

static IggyHttpClient create(GenericContainer iggyServer) {
static IggyHttpClient create(GenericContainer<?> iggyServer) {
String address = iggyServer.getHost();
Integer port = iggyServer.getMappedPort(HTTP_PORT);
return new IggyHttpClient("http://" + address + ":" + port);
Expand Down
34 changes: 0 additions & 34 deletions src/test/java/rs/iggy/clients/blocking/tcp/IggyTcpClientTest.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package rs.iggy.clients.blocking.tcp;

import org.junit.jupiter.api.Disabled;
import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.StreamClientBaseTest;

@Disabled
class StreamTcpClientTest extends StreamClientBaseTest {

@Override
protected IggyClient getClient() {
return TcpClientFactory.create(iggyServer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class TcpClientFactory {

static IggyTcpClient create(GenericContainer iggyServer) {
static IggyTcpClient create(GenericContainer<?> iggyServer) {
String address = iggyServer.getHost();
Integer port = iggyServer.getMappedPort(TCP_PORT);
return new IggyTcpClient(address, port);
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/rs/iggy/clients/blocking/tcp/UsersTcpClientTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package rs.iggy.clients.blocking.tcp;

import org.junit.jupiter.api.BeforeEach;
import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.UsersClient;
import rs.iggy.clients.blocking.UsersClientBaseTest;

class UsersTcpClientTest extends UsersClientBaseTest {

protected UsersClient usersClient;

@Override
protected IggyClient getClient() {
return TcpClientFactory.create(iggyServer);
}

@BeforeEach
void beforeEach() {
usersClient = client.users();
}

}

0 comments on commit bbec651

Please sign in to comment.