Skip to content

Commit

Permalink
Implement system client for TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 11, 2024
1 parent f11378a commit 1d07422
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/main/java/rs/iggy/clients/blocking/SystemClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public interface SystemClient {

ClientInfoDetails getMe();

ClientInfoDetails getClient(String clientId);
ClientInfoDetails getClient(Long clientId);

List<ClientInfo> getClients();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public ClientInfoDetails getMe() {
}

@Override
public ClientInfoDetails getClient(String clientId) {
public ClientInfoDetails getClient(Long clientId) {
var request = httpClient.prepareGetRequest(CLIENTS + "/" + clientId);
return httpClient.execute(request, ClientInfoDetails.class);
}
Expand Down
92 changes: 92 additions & 0 deletions src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import rs.iggy.partition.Partition;
import rs.iggy.stream.StreamBase;
import rs.iggy.stream.StreamDetails;
import rs.iggy.system.ClientInfo;
import rs.iggy.system.ClientInfoDetails;
import rs.iggy.system.ConsumerGroupInfo;
import rs.iggy.system.Stats;
import rs.iggy.topic.CompressionAlgorithm;
import rs.iggy.topic.Topic;
import rs.iggy.topic.TopicDetails;
Expand Down Expand Up @@ -167,6 +171,94 @@ static Message readMessage(ByteBuf response) {
return new Message(offset, state, timestamp, id, checksum, headers, payload);
}

static Stats readStats(ByteBuf response) {
var processId = response.readUnsignedIntLE();
var cpuUsage = response.readFloatLE();
var totalCpuUsage = response.readFloatLE();
var memoryUsage = readU64AsBigInteger(response);
var totalMemory = readU64AsBigInteger(response);
var availableMemory = readU64AsBigInteger(response);
var runTime = readU64AsBigInteger(response);
var startTime = readU64AsBigInteger(response);
var readBytes = readU64AsBigInteger(response);
var writtenBytes = readU64AsBigInteger(response);
var messagesSizeBytes = readU64AsBigInteger(response);
var streamsCount = response.readUnsignedIntLE();
var topicsCount = response.readUnsignedIntLE();
var partitionsCount = response.readUnsignedIntLE();
var segmentsCount = response.readUnsignedIntLE();
var messagesCount = readU64AsBigInteger(response);
var clientsCount = response.readUnsignedIntLE();
var consumerGroupsCount = response.readUnsignedIntLE();
var hostnameLength = response.readUnsignedIntLE();
var hostname = response.readCharSequence(toInt(hostnameLength), StandardCharsets.UTF_8).toString();
var osNameLength = response.readUnsignedIntLE();
var osName = response.readCharSequence(toInt(osNameLength), StandardCharsets.UTF_8).toString();
var osVersionLength = response.readUnsignedIntLE();
var osVersion = response.readCharSequence(toInt(osVersionLength), StandardCharsets.UTF_8).toString();
var kernelVersionLength = response.readUnsignedIntLE();
var kernelVersion = response.readCharSequence(toInt(kernelVersionLength), StandardCharsets.UTF_8).toString();

return new Stats(processId,
cpuUsage,
totalCpuUsage,
memoryUsage.toString(),
totalMemory.toString(),
availableMemory.toString(),
runTime,
startTime,
readBytes.toString(),
writtenBytes.toString(),
messagesSizeBytes.toString(),
streamsCount,
topicsCount,
partitionsCount,
segmentsCount,
messagesCount,
clientsCount,
consumerGroupsCount,
hostname,
osName,
osVersion,
kernelVersion);
}

static ClientInfoDetails readClientInfoDetails(ByteBuf response) {
var clientInfo = readClientInfo(response);
var consumerGroups = new ArrayList<ConsumerGroupInfo>();
for (int i = 0; i < clientInfo.consumerGroupsCount(); i++) {
consumerGroups.add(readConsumerGroupInfo(response));
}

return new ClientInfoDetails(clientInfo, consumerGroups);
}

static ClientInfo readClientInfo(ByteBuf response) {
var clientId = response.readUnsignedIntLE();
var userId = response.readUnsignedIntLE();
var userIdOptional = Optional.<Long>empty();
if (userId != 0) {
userIdOptional = Optional.of(userId);
}
var transport = response.readByte();
var transportString = "Tcp";
if (transport == 2) {
transportString = "Quic";
}
var addressLength = response.readUnsignedIntLE();
var address = response.readCharSequence(toInt(addressLength), StandardCharsets.UTF_8).toString();
var consumerGroupsCount = response.readUnsignedIntLE();
return new ClientInfo(clientId, userIdOptional, address, transportString, consumerGroupsCount);
}

static ConsumerGroupInfo readConsumerGroupInfo(ByteBuf response) {
var streamId = response.readUnsignedIntLE();
var topicId = response.readUnsignedIntLE();
var groupId = response.readUnsignedIntLE();

return new ConsumerGroupInfo(streamId, topicId, groupId);
}

private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[9];
buffer.readBytes(bytesArray, 0, 8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class IggyTcpClient implements IggyClient {
private final ConsumerGroupsTcpClient consumerGroupsClient;
private final ConsumerOffsetTcpClient consumerOffsetsClient;
private final MessagesTcpClient messagesClient;
private final SystemTcpClient systemClient;

public IggyTcpClient(String host, Integer port) {
TcpConnectionHandler connection = new TcpConnectionHandler(host, port);
Expand All @@ -21,11 +22,12 @@ public IggyTcpClient(String host, Integer port) {
consumerGroupsClient = new ConsumerGroupsTcpClient(connection);
consumerOffsetsClient = new ConsumerOffsetTcpClient(connection);
messagesClient = new MessagesTcpClient(connection);
systemClient = new SystemTcpClient(connection);
}

@Override
public SystemClient system() {
throw new UnsupportedOperationException();
return systemClient;
}

@Override
Expand Down
60 changes: 60 additions & 0 deletions src/main/java/rs/iggy/clients/blocking/tcp/SystemTcpClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package rs.iggy.clients.blocking.tcp;

import io.netty.buffer.Unpooled;
import rs.iggy.clients.blocking.SystemClient;
import rs.iggy.system.ClientInfo;
import rs.iggy.system.ClientInfoDetails;
import rs.iggy.system.Stats;
import java.util.ArrayList;
import java.util.List;
import static rs.iggy.clients.blocking.tcp.BytesDeserializer.*;

class SystemTcpClient implements SystemClient {
private static final int PING_CODE = 1;
private static final int GET_STATS_CODE = 10;
private static final int GET_ME_CODE = 20;
private static final int GET_CLIENT_CODE = 21;
private static final int GET_CLIENTS_CODE = 22;

private final TcpConnectionHandler connection;

SystemTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
}

@Override
public Stats getStats() {
var response = connection.send(GET_STATS_CODE);
return readStats(response);
}

@Override
public ClientInfoDetails getMe() {
var response = connection.send(GET_ME_CODE);
return readClientInfoDetails(response);
}

@Override
public ClientInfoDetails getClient(Long clientId) {
var payload = Unpooled.buffer(4);
payload.writeIntLE(clientId.intValue());
var response = connection.send(GET_CLIENT_CODE, payload);
return readClientInfoDetails(response);
}

@Override
public List<ClientInfo> getClients() {
var response = connection.send(GET_CLIENTS_CODE);
List<ClientInfo> clients = new ArrayList<>();
while (response.isReadable()) {
clients.add(readClientInfo(response));
}
return clients;
}

@Override
public String ping() {
connection.send(PING_CODE);
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,16 @@

class UsersTcpClient implements UsersClient {

private static final int GET_USER_CODE = 31;
private static final int GET_USERS_CODE = 32;
private static final int CREATE_USER_CODE = 33;
private static final int DELETE_USER_CODE = 34;
private static final int UPDATE_USER_CODE = 35;
private static final int UPDATE_PERMISSIONS_CODE = 36;
private static final int CHANGE_PASSWORD_CODE = 37;
private static final int LOGIN_USER_CODE = 38;
private static final int LOGOUT_USER_CODE = 39;

private final TcpConnectionHandler connection;

UsersTcpClient(TcpConnectionHandler connection) {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/rs/iggy/system/ClientInfoDetails.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package rs.iggy.system;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

Expand All @@ -11,4 +12,7 @@ public record ClientInfoDetails(
Long consumerGroupsCount,
List<ConsumerGroupInfo> consumerGroups
) {
public ClientInfoDetails(ClientInfo clientInfo, ArrayList<ConsumerGroupInfo> consumerGroups) {
this(clientInfo.clientId(), clientInfo.userId(), clientInfo.address(), clientInfo.transport(), clientInfo.consumerGroupsCount(), consumerGroups);
}
}
13 changes: 7 additions & 6 deletions src/main/java/rs/iggy/system/Stats.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
public record Stats(
Long processId,
Float cpuUsage,
BigInteger memoryUsage,
BigInteger totalMemory,
BigInteger availableMemory,
Float totalCpuUsage,
String memoryUsage,
String totalMemory,
String availableMemory,
BigInteger runTime,
BigInteger startTime,
BigInteger readBytes,
BigInteger writtenBytes,
BigInteger messagesSizeBytes,
String readBytes,
String writtenBytes,
String messagesSizeBytes,
Long streamsCount,
Long topicsCount,
Long partitionsCount,
Expand Down
27 changes: 27 additions & 0 deletions src/test/java/rs/iggy/clients/blocking/SystemClientBaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package rs.iggy.clients.blocking;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;

public abstract class SystemClientBaseTest extends IntegrationTest {

protected SystemClient systemClient;

@BeforeEach
void beforeEachBase() {
systemClient = client.system();

login();
}

@Test
void shouldGetStats() {
// when
var stats = systemClient.getStats();

// then
assertThat(stats).isNotNull();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package rs.iggy.clients.blocking.http;

import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.SystemClientBaseTest;

class SystemHttpClientTest extends SystemClientBaseTest {

@Override
protected IggyClient getClient() {
return HttpClientFactory.create(iggyServer);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package rs.iggy.clients.blocking.tcp;

import org.junit.jupiter.api.Test;
import rs.iggy.clients.blocking.IggyClient;
import rs.iggy.clients.blocking.SystemClientBaseTest;
import rs.iggy.system.ClientInfo;
import rs.iggy.system.ClientInfoDetails;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;

class SystemTcpClientTest extends SystemClientBaseTest {

@Override
protected IggyClient getClient() {
return TcpClientFactory.create(iggyServer);
}

@Test
void shouldGetMeAndClient() {
// when
ClientInfoDetails me = systemClient.getMe();

// then
assertThat(me).isNotNull();

// when
var clientInfo = systemClient.getClient(me.clientId());

// then
assertThat(clientInfo).isNotNull();
}

@Test
void shouldGetClients() {
// when
List<ClientInfo> clients = systemClient.getClients();

// then
assertThat(clients).isNotNull();
assertThat(clients.size()).isEqualTo(1);
}
}

0 comments on commit 1d07422

Please sign in to comment.