diff --git a/src/main/java/rs/iggy/clients/blocking/SystemClient.java b/src/main/java/rs/iggy/clients/blocking/SystemClient.java index 1558903..27b6c6b 100644 --- a/src/main/java/rs/iggy/clients/blocking/SystemClient.java +++ b/src/main/java/rs/iggy/clients/blocking/SystemClient.java @@ -11,7 +11,7 @@ public interface SystemClient { ClientInfoDetails getMe(); - ClientInfoDetails getClient(String clientId); + ClientInfoDetails getClient(Long clientId); List getClients(); diff --git a/src/main/java/rs/iggy/clients/blocking/http/SystemHttpClient.java b/src/main/java/rs/iggy/clients/blocking/http/SystemHttpClient.java index 2e811e7..60f6082 100644 --- a/src/main/java/rs/iggy/clients/blocking/http/SystemHttpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/http/SystemHttpClient.java @@ -30,7 +30,7 @@ public ClientInfoDetails getMe() { } @Override - public ClientInfoDetails getClient(String clientId) { + public ClientInfoDetails getClient(Long clientId) { var request = httpClient.prepareGetRequest(CLIENTS + "/" + clientId); return httpClient.execute(request, ClientInfoDetails.class); } diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java b/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java index e6bdc50..91ebea8 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java @@ -10,6 +10,10 @@ import rs.iggy.partition.Partition; import rs.iggy.stream.StreamBase; import rs.iggy.stream.StreamDetails; +import rs.iggy.system.ClientInfo; +import rs.iggy.system.ClientInfoDetails; +import rs.iggy.system.ConsumerGroupInfo; +import rs.iggy.system.Stats; import rs.iggy.topic.CompressionAlgorithm; import rs.iggy.topic.Topic; import rs.iggy.topic.TopicDetails; @@ -167,6 +171,94 @@ static Message readMessage(ByteBuf response) { return new Message(offset, state, timestamp, id, checksum, headers, payload); } + static Stats readStats(ByteBuf response) { + var processId = response.readUnsignedIntLE(); + var cpuUsage = response.readFloatLE(); + var totalCpuUsage = response.readFloatLE(); + var memoryUsage = readU64AsBigInteger(response); + var totalMemory = readU64AsBigInteger(response); + var availableMemory = readU64AsBigInteger(response); + var runTime = readU64AsBigInteger(response); + var startTime = readU64AsBigInteger(response); + var readBytes = readU64AsBigInteger(response); + var writtenBytes = readU64AsBigInteger(response); + var messagesSizeBytes = readU64AsBigInteger(response); + var streamsCount = response.readUnsignedIntLE(); + var topicsCount = response.readUnsignedIntLE(); + var partitionsCount = response.readUnsignedIntLE(); + var segmentsCount = response.readUnsignedIntLE(); + var messagesCount = readU64AsBigInteger(response); + var clientsCount = response.readUnsignedIntLE(); + var consumerGroupsCount = response.readUnsignedIntLE(); + var hostnameLength = response.readUnsignedIntLE(); + var hostname = response.readCharSequence(toInt(hostnameLength), StandardCharsets.UTF_8).toString(); + var osNameLength = response.readUnsignedIntLE(); + var osName = response.readCharSequence(toInt(osNameLength), StandardCharsets.UTF_8).toString(); + var osVersionLength = response.readUnsignedIntLE(); + var osVersion = response.readCharSequence(toInt(osVersionLength), StandardCharsets.UTF_8).toString(); + var kernelVersionLength = response.readUnsignedIntLE(); + var kernelVersion = response.readCharSequence(toInt(kernelVersionLength), StandardCharsets.UTF_8).toString(); + + return new Stats(processId, + cpuUsage, + totalCpuUsage, + memoryUsage.toString(), + totalMemory.toString(), + availableMemory.toString(), + runTime, + startTime, + readBytes.toString(), + writtenBytes.toString(), + messagesSizeBytes.toString(), + streamsCount, + topicsCount, + partitionsCount, + segmentsCount, + messagesCount, + clientsCount, + consumerGroupsCount, + hostname, + osName, + osVersion, + kernelVersion); + } + + static ClientInfoDetails readClientInfoDetails(ByteBuf response) { + var clientInfo = readClientInfo(response); + var consumerGroups = new ArrayList(); + for (int i = 0; i < clientInfo.consumerGroupsCount(); i++) { + consumerGroups.add(readConsumerGroupInfo(response)); + } + + return new ClientInfoDetails(clientInfo, consumerGroups); + } + + static ClientInfo readClientInfo(ByteBuf response) { + var clientId = response.readUnsignedIntLE(); + var userId = response.readUnsignedIntLE(); + var userIdOptional = Optional.empty(); + if (userId != 0) { + userIdOptional = Optional.of(userId); + } + var transport = response.readByte(); + var transportString = "Tcp"; + if (transport == 2) { + transportString = "Quic"; + } + var addressLength = response.readUnsignedIntLE(); + var address = response.readCharSequence(toInt(addressLength), StandardCharsets.UTF_8).toString(); + var consumerGroupsCount = response.readUnsignedIntLE(); + return new ClientInfo(clientId, userIdOptional, address, transportString, consumerGroupsCount); + } + + static ConsumerGroupInfo readConsumerGroupInfo(ByteBuf response) { + var streamId = response.readUnsignedIntLE(); + var topicId = response.readUnsignedIntLE(); + var groupId = response.readUnsignedIntLE(); + + return new ConsumerGroupInfo(streamId, topicId, groupId); + } + private static BigInteger readU64AsBigInteger(ByteBuf buffer) { var bytesArray = new byte[9]; buffer.readBytes(bytesArray, 0, 8); diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java index 0cbdb41..a82a931 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java @@ -11,6 +11,7 @@ public class IggyTcpClient implements IggyClient { private final ConsumerGroupsTcpClient consumerGroupsClient; private final ConsumerOffsetTcpClient consumerOffsetsClient; private final MessagesTcpClient messagesClient; + private final SystemTcpClient systemClient; public IggyTcpClient(String host, Integer port) { TcpConnectionHandler connection = new TcpConnectionHandler(host, port); @@ -21,11 +22,12 @@ public IggyTcpClient(String host, Integer port) { consumerGroupsClient = new ConsumerGroupsTcpClient(connection); consumerOffsetsClient = new ConsumerOffsetTcpClient(connection); messagesClient = new MessagesTcpClient(connection); + systemClient = new SystemTcpClient(connection); } @Override public SystemClient system() { - throw new UnsupportedOperationException(); + return systemClient; } @Override diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/SystemTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/SystemTcpClient.java new file mode 100644 index 0000000..ca6f01f --- /dev/null +++ b/src/main/java/rs/iggy/clients/blocking/tcp/SystemTcpClient.java @@ -0,0 +1,60 @@ +package rs.iggy.clients.blocking.tcp; + +import io.netty.buffer.Unpooled; +import rs.iggy.clients.blocking.SystemClient; +import rs.iggy.system.ClientInfo; +import rs.iggy.system.ClientInfoDetails; +import rs.iggy.system.Stats; +import java.util.ArrayList; +import java.util.List; +import static rs.iggy.clients.blocking.tcp.BytesDeserializer.*; + +class SystemTcpClient implements SystemClient { + private static final int PING_CODE = 1; + private static final int GET_STATS_CODE = 10; + private static final int GET_ME_CODE = 20; + private static final int GET_CLIENT_CODE = 21; + private static final int GET_CLIENTS_CODE = 22; + + private final TcpConnectionHandler connection; + + SystemTcpClient(TcpConnectionHandler connection) { + this.connection = connection; + } + + @Override + public Stats getStats() { + var response = connection.send(GET_STATS_CODE); + return readStats(response); + } + + @Override + public ClientInfoDetails getMe() { + var response = connection.send(GET_ME_CODE); + return readClientInfoDetails(response); + } + + @Override + public ClientInfoDetails getClient(Long clientId) { + var payload = Unpooled.buffer(4); + payload.writeIntLE(clientId.intValue()); + var response = connection.send(GET_CLIENT_CODE, payload); + return readClientInfoDetails(response); + } + + @Override + public List getClients() { + var response = connection.send(GET_CLIENTS_CODE); + List clients = new ArrayList<>(); + while (response.isReadable()) { + clients.add(readClientInfo(response)); + } + return clients; + } + + @Override + public String ping() { + connection.send(PING_CODE); + return ""; + } +} diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java index b75a5c4..7f02716 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java @@ -10,7 +10,16 @@ class UsersTcpClient implements UsersClient { + private static final int GET_USER_CODE = 31; + private static final int GET_USERS_CODE = 32; + private static final int CREATE_USER_CODE = 33; + private static final int DELETE_USER_CODE = 34; + private static final int UPDATE_USER_CODE = 35; + private static final int UPDATE_PERMISSIONS_CODE = 36; + private static final int CHANGE_PASSWORD_CODE = 37; private static final int LOGIN_USER_CODE = 38; + private static final int LOGOUT_USER_CODE = 39; + private final TcpConnectionHandler connection; UsersTcpClient(TcpConnectionHandler connection) { diff --git a/src/main/java/rs/iggy/system/ClientInfoDetails.java b/src/main/java/rs/iggy/system/ClientInfoDetails.java index e73d520..9714408 100644 --- a/src/main/java/rs/iggy/system/ClientInfoDetails.java +++ b/src/main/java/rs/iggy/system/ClientInfoDetails.java @@ -1,5 +1,6 @@ package rs.iggy.system; +import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -11,4 +12,7 @@ public record ClientInfoDetails( Long consumerGroupsCount, List consumerGroups ) { + public ClientInfoDetails(ClientInfo clientInfo, ArrayList consumerGroups) { + this(clientInfo.clientId(), clientInfo.userId(), clientInfo.address(), clientInfo.transport(), clientInfo.consumerGroupsCount(), consumerGroups); + } } diff --git a/src/main/java/rs/iggy/system/Stats.java b/src/main/java/rs/iggy/system/Stats.java index 52a9b1f..1c10798 100644 --- a/src/main/java/rs/iggy/system/Stats.java +++ b/src/main/java/rs/iggy/system/Stats.java @@ -5,14 +5,15 @@ public record Stats( Long processId, Float cpuUsage, - BigInteger memoryUsage, - BigInteger totalMemory, - BigInteger availableMemory, + Float totalCpuUsage, + String memoryUsage, + String totalMemory, + String availableMemory, BigInteger runTime, BigInteger startTime, - BigInteger readBytes, - BigInteger writtenBytes, - BigInteger messagesSizeBytes, + String readBytes, + String writtenBytes, + String messagesSizeBytes, Long streamsCount, Long topicsCount, Long partitionsCount, diff --git a/src/test/java/rs/iggy/clients/blocking/SystemClientBaseTest.java b/src/test/java/rs/iggy/clients/blocking/SystemClientBaseTest.java new file mode 100644 index 0000000..4b13742 --- /dev/null +++ b/src/test/java/rs/iggy/clients/blocking/SystemClientBaseTest.java @@ -0,0 +1,27 @@ +package rs.iggy.clients.blocking; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class SystemClientBaseTest extends IntegrationTest { + + protected SystemClient systemClient; + + @BeforeEach + void beforeEachBase() { + systemClient = client.system(); + + login(); + } + + @Test + void shouldGetStats() { + // when + var stats = systemClient.getStats(); + + // then + assertThat(stats).isNotNull(); + } + +} diff --git a/src/test/java/rs/iggy/clients/blocking/http/SystemHttpClientTest.java b/src/test/java/rs/iggy/clients/blocking/http/SystemHttpClientTest.java new file mode 100644 index 0000000..ee38c1e --- /dev/null +++ b/src/test/java/rs/iggy/clients/blocking/http/SystemHttpClientTest.java @@ -0,0 +1,13 @@ +package rs.iggy.clients.blocking.http; + +import rs.iggy.clients.blocking.IggyClient; +import rs.iggy.clients.blocking.SystemClientBaseTest; + +class SystemHttpClientTest extends SystemClientBaseTest { + + @Override + protected IggyClient getClient() { + return HttpClientFactory.create(iggyServer); + } + +} diff --git a/src/test/java/rs/iggy/clients/blocking/tcp/SystemTcpClientTest.java b/src/test/java/rs/iggy/clients/blocking/tcp/SystemTcpClientTest.java new file mode 100644 index 0000000..4dff169 --- /dev/null +++ b/src/test/java/rs/iggy/clients/blocking/tcp/SystemTcpClientTest.java @@ -0,0 +1,42 @@ +package rs.iggy.clients.blocking.tcp; + +import org.junit.jupiter.api.Test; +import rs.iggy.clients.blocking.IggyClient; +import rs.iggy.clients.blocking.SystemClientBaseTest; +import rs.iggy.system.ClientInfo; +import rs.iggy.system.ClientInfoDetails; +import java.util.List; +import static org.assertj.core.api.Assertions.assertThat; + +class SystemTcpClientTest extends SystemClientBaseTest { + + @Override + protected IggyClient getClient() { + return TcpClientFactory.create(iggyServer); + } + + @Test + void shouldGetMeAndClient() { + // when + ClientInfoDetails me = systemClient.getMe(); + + // then + assertThat(me).isNotNull(); + + // when + var clientInfo = systemClient.getClient(me.clientId()); + + // then + assertThat(clientInfo).isNotNull(); + } + + @Test + void shouldGetClients() { + // when + List clients = systemClient.getClients(); + + // then + assertThat(clients).isNotNull(); + assertThat(clients.size()).isEqualTo(1); + } +}