Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
beats-dh committed Sep 10, 2024
1 parent b330a33 commit 32cb762
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 76 deletions.
164 changes: 96 additions & 68 deletions src/server/network/connection/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "game/scheduling/dispatcher.hpp"
#include "server/server.hpp"

Connection_ptr ConnectionManager::createConnection(asio::io_service &io_service, ConstServicePort_ptr servicePort) {
Connection_ptr ConnectionManager::createConnection(asio::io_service &io_service, const ConstServicePort_ptr &servicePort) {
auto connection = std::make_shared<Connection>(io_service, servicePort);
connections.emplace(connection);
return connection;
Expand Down Expand Up @@ -53,19 +53,21 @@ Connection::Connection(asio::io_service &initIoService, ConstServicePort_ptr ini
void Connection::close(bool force) {
ConnectionManager::getInstance().releaseConnection(shared_from_this());

std::scoped_lock lock(connectionLock);
ip = 0;
ip.store(0, std::memory_order_relaxed);

if (connectionState == CONNECTION_STATE_CLOSED) {
ConnectionState_t expectedState = CONNECTION_STATE_OPEN;
if (!connectionState.compare_exchange_weak(expectedState, CONNECTION_STATE_CLOSED)) {
return;
}
connectionState = CONNECTION_STATE_CLOSED;

if (protocol) {
g_dispatcher().addEvent([protocol = protocol] { protocol->release(); }, "Protocol::release", std::chrono::milliseconds(CONNECTION_WRITE_TIMEOUT * 1000).count());
}

if (messageQueue.empty() || force) {
writing.store(false);

OutputMessage_ptr outputMessage = messageQueue.dequeue();
if (!outputMessage || force) {
closeSocket();
}
}
Expand Down Expand Up @@ -120,11 +122,11 @@ void Connection::acceptInternal(bool toggleParseHeader) {
close(FORCE_CLOSE);
}
}

void Connection::parseProxyIdentification(const std::error_code &error) {
std::scoped_lock lock(connectionLock);
readTimer.cancel();

if (error || connectionState == CONNECTION_STATE_CLOSED) {
if (error || connectionState.load(std::memory_order_relaxed) == CONNECTION_STATE_CLOSED) {
if (error != asio::error::operation_aborted && error != asio::error::eof && error != asio::error::connection_reset) {
g_logger().error("[Connection::parseProxyIdentification] - Read error: {}", error.message());
}
Expand All @@ -133,39 +135,42 @@ void Connection::parseProxyIdentification(const std::error_code &error) {
}

uint8_t* msgBuffer = msg.getBuffer();
auto charData = static_cast<char*>(static_cast<void*>(msgBuffer));
std::string serverName = g_configManager().getString(SERVER_NAME, __FUNCTION__) + "\n";
if (connectionState == CONNECTION_STATE_IDENTIFYING) {
const auto charData = static_cast<char*>(static_cast<void*>(msgBuffer));
const std::string serverName = g_configManager().getString(SERVER_NAME, __FUNCTION__) + "\n";

if (connectionState.load(std::memory_order_relaxed) == CONNECTION_STATE_IDENTIFYING) {
if (msgBuffer[1] == 0x00 || strncasecmp(charData, &serverName[0], 2) != 0) {
// Probably not proxy identification so let's try standard parsing method
connectionState = CONNECTION_STATE_OPEN;
connectionState.store(CONNECTION_STATE_OPEN, std::memory_order_relaxed);
parseHeader(error);
return;
} else {
size_t remainder = serverName.length() - 2;
const size_t remainder = serverName.length() - 2;
if (remainder > 0) {
connectionState = CONNECTION_STATE_READINGS;
connectionState.store(CONNECTION_STATE_READINGS, std::memory_order_relaxed);
try {
readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
readTimer.async_wait([self = std::weak_ptr<Connection>(shared_from_this())](const std::error_code &error) { Connection::handleTimeout(self, error); });
readTimer.async_wait([self = std::weak_ptr<Connection>(shared_from_this())](const std::error_code &error) {
Connection::handleTimeout(self, error);
});

// Read the remainder of proxy identification
asio::async_read(socket, asio::buffer(msg.getBuffer(), remainder), [self = shared_from_this()](const std::error_code &error, std::size_t N) { self->parseProxyIdentification(error); });
asio::async_read(socket, asio::buffer(msg.getBuffer(), remainder), [self = shared_from_this()](const std::error_code &error, std::size_t N) {
self->parseProxyIdentification(error);
});
} catch (const std::system_error &e) {
g_logger().error("Connection::parseProxyIdentification] - error: {}", e.what());
g_logger().error("[Connection::parseProxyIdentification] - Error: {}", e.what());
close(FORCE_CLOSE);
}
return;
} else {
connectionState = CONNECTION_STATE_OPEN;
connectionState.store(CONNECTION_STATE_OPEN, std::memory_order_relaxed);
}
}
} else if (connectionState == CONNECTION_STATE_READINGS) {
size_t remainder = serverName.length() - 2;
} else if (connectionState.load(std::memory_order_relaxed) == CONNECTION_STATE_READINGS) {
const size_t remainder = serverName.length() - 2;
if (strncasecmp(charData, &serverName[2], remainder) == 0) {
connectionState = CONNECTION_STATE_OPEN;
connectionState.store(CONNECTION_STATE_OPEN, std::memory_order_relaxed);
} else {
g_logger().error("Connection::parseProxyIdentification] Invalid Client Login! Server Name mismatch!");
g_logger().error("[Connection::parseProxyIdentification] Invalid Client Login! Server Name mismatch!");
close(FORCE_CLOSE);
return;
}
Expand All @@ -175,7 +180,6 @@ void Connection::parseProxyIdentification(const std::error_code &error) {
}

void Connection::parseHeader(const std::error_code &error) {
std::scoped_lock lock(connectionLock);
readTimer.cancel();

if (error) {
Expand All @@ -184,11 +188,11 @@ void Connection::parseHeader(const std::error_code &error) {
}
close(FORCE_CLOSE);
return;
} else if (connectionState == CONNECTION_STATE_CLOSED) {
} else if (connectionState.load(std::memory_order_relaxed) == CONNECTION_STATE_CLOSED) {
return;
}

uint32_t timePassed = std::max<uint32_t>(1, (time(nullptr) - timeConnected) + 1);
const uint32_t timePassed = std::max<uint32_t>(1, (time(nullptr) - timeConnected) + 1);
if ((++packetsSent / timePassed) > static_cast<uint32_t>(g_configManager().getNumber(MAX_PACKETS_PER_SECOND, __FUNCTION__))) {
g_logger().warn("[Connection::parseHeader] - {} disconnected for exceeding packet per second limit.", convertIPToString(getIP()));
close();
Expand All @@ -200,31 +204,34 @@ void Connection::parseHeader(const std::error_code &error) {
packetsSent = 0;
}

uint16_t size = msg.getLengthHeader();
const uint16_t size = msg.getLengthHeader();
if (size == 0 || size > INPUTMESSAGE_MAXSIZE) {
close(FORCE_CLOSE);
return;
}

try {
readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
readTimer.async_wait([self = std::weak_ptr<Connection>(shared_from_this())](const std::error_code &error) { Connection::handleTimeout(self, error); });
readTimer.async_wait([self = std::weak_ptr<Connection>(shared_from_this())](const std::error_code &error) {
Connection::handleTimeout(self, error);
});

// Read packet content
msg.setLength(size + HEADER_LENGTH);
// Read the remainder of proxy identification
asio::async_read(socket, asio::buffer(msg.getBodyBuffer(), size), [self = shared_from_this()](const std::error_code &error, std::size_t N) { self->parsePacket(error); });

asio::async_read(socket, asio::buffer(msg.getBodyBuffer(), size), [self = shared_from_this()](const std::error_code &error, std::size_t N) {
self->parsePacket(error);
});
} catch (const std::system_error &e) {
g_logger().error("[Connection::parseHeader] - error: {}", e.what());
close(FORCE_CLOSE);
}
}

void Connection::parsePacket(const std::error_code &error) {
std::scoped_lock lock(connectionLock);
readTimer.cancel();

if (error || connectionState == CONNECTION_STATE_CLOSED) {
if (error || connectionState.load(std::memory_order_relaxed) == CONNECTION_STATE_CLOSED) {
if (error) {
g_logger().error("[Connection::parsePacket] - Read error: {}", error.message());
}
Expand All @@ -233,21 +240,22 @@ void Connection::parsePacket(const std::error_code &error) {
}

bool skipReadingNextPacket = false;

if (!receivedFirst) {
// First message received
receivedFirst = true;

if (!protocol) {
// Check packet checksum
uint32_t checksum;
if (int32_t len = msg.getLength() - msg.getBufferPosition() - CHECKSUM_LENGTH;
if (const int32_t len = msg.getLength() - msg.getBufferPosition() - CHECKSUM_LENGTH;
len > 0) {
checksum = adlerChecksum(msg.getBuffer() + msg.getBufferPosition() + CHECKSUM_LENGTH, len);
} else {
checksum = 0;
}

uint32_t recvChecksum = msg.get<uint32_t>();
const auto recvChecksum = msg.get<uint32_t>();
if (recvChecksum != checksum) {
// it might not have been the checksum, step back
msg.skipBytes(-CHECKSUM_LENGTH);
Expand Down Expand Up @@ -275,11 +283,15 @@ void Connection::parsePacket(const std::error_code &error) {

try {
readTimer.expires_from_now(std::chrono::seconds(CONNECTION_READ_TIMEOUT));
readTimer.async_wait([self = std::weak_ptr<Connection>(shared_from_this())](const std::error_code &error) { Connection::handleTimeout(self, error); });
readTimer.async_wait([self = std::weak_ptr<Connection>(shared_from_this())](const std::error_code &error) {
Connection::handleTimeout(self, error);
});

if (!skipReadingNextPacket) {
// Wait to the next packet
asio::async_read(socket, asio::buffer(msg.getBuffer(), HEADER_LENGTH), [self = shared_from_this()](const std::error_code &error, std::size_t N) { self->parseHeader(error); });
asio::async_read(socket, asio::buffer(msg.getBuffer(), HEADER_LENGTH), [self = shared_from_this()](const std::error_code &error, std::size_t N) {
self->parseHeader(error);
});
}
} catch (const std::system_error &e) {
g_logger().error("[Connection::parsePacket] - error: {}", e.what());
Expand All @@ -300,18 +312,24 @@ void Connection::resumeWork() {
}

void Connection::send(const OutputMessage_ptr &outputMessage) {
std::scoped_lock lock(connectionLock);
if (connectionState == CONNECTION_STATE_CLOSED) {
if (connectionState.load(std::memory_order_relaxed) == CONNECTION_STATE_CLOSED) {
return;
}

bool noPendingWrite = messageQueue.empty();
messageQueue.emplace_back(outputMessage);
// Enfileira a nova mensagem
messageQueue.enqueue(outputMessage);

// Simula a lógica de noPendingWrite: verifica se não há uma operação de escrita em andamento
bool expected = false;
if (writing.compare_exchange_strong(expected, true)) {
// Se writing foi atualizado para true, não havia uma operação pendente de escrita

if (noPendingWrite) {
if (socket.is_open()) {
try {
asio::post(socket.get_executor(), [self = shared_from_this()] { self->internalWorker(); });
// Inicia a operação de escrita
asio::post(socket.get_executor(), [self = shared_from_this()] {
self->internalWorker(); // Inicia o processamento da fila
});
} catch (const std::system_error &e) {
g_logger().error("[Connection::send] - Exception in posting write operation: {}", e.what());
close(FORCE_CLOSE);
Expand All @@ -324,36 +342,41 @@ void Connection::send(const OutputMessage_ptr &outputMessage) {
}

void Connection::internalWorker() {
std::unique_lock lock(connectionLock);
if (messageQueue.empty()) {
if (connectionState == CONNECTION_STATE_CLOSED) {
OutputMessage_ptr outputMessage = messageQueue.dequeue(); // Correto agora
if (!outputMessage) {
if (connectionState.load(std::memory_order_relaxed) == CONNECTION_STATE_CLOSED) {
closeSocket();
}
writing.store(false);
return;
}

const auto &outputMessage = messageQueue.front();
lock.unlock();
protocol->onSendMessage(outputMessage);
lock.lock();

internalSend(outputMessage);
try {
asio::post(socket.get_executor(), [self = shared_from_this(), outputMessage] { self->internalSend(outputMessage); });
} catch (std::system_error &e) {
g_logger().error("[Connection::internalWorker] - Exception: {}", e.what());
close(FORCE_CLOSE);
}
}

uint32_t Connection::getIP() {
std::scoped_lock lock(connectionLock);
uint32_t currentIP = ip.load(std::memory_order_relaxed);

if (ip == 1) {
if (currentIP == 1) {
std::error_code error;
asio::ip::tcp::endpoint endpoint = socket.remote_endpoint(error);
const asio::ip::tcp::endpoint endpoint = socket.remote_endpoint(error);
if (error) {
g_logger().error("[Connection::getIP] - Failed to get remote endpoint: {}", error.message());
ip = 0;
ip.store(0, std::memory_order_relaxed);
} else {
ip = htonl(endpoint.address().to_v4().to_uint());
currentIP = htonl(endpoint.address().to_v4().to_uint());
ip.store(currentIP, std::memory_order_relaxed);
}
}
return ip;

return ip.load(std::memory_order_relaxed);
}

void Connection::internalSend(const OutputMessage_ptr &outputMessage) {
Expand All @@ -369,35 +392,40 @@ void Connection::internalSend(const OutputMessage_ptr &outputMessage) {
}

void Connection::onWriteOperation(const std::error_code &error) {
std::unique_lock lock(connectionLock);
writeTimer.cancel();

if (error) {
g_logger().error("[Connection::onWriteOperation] - Write error: {}", error.message());
messageQueue.clear();
close(FORCE_CLOSE);
writing.store(false);
return;
}

messageQueue.pop_front();
OutputMessage_ptr outputMessage = messageQueue.dequeue();

if (!messageQueue.empty()) {
const auto &outputMessage = messageQueue.front();
lock.unlock();
protocol->onSendMessage(outputMessage);
lock.lock();
internalSend(outputMessage);
} else if (connectionState == CONNECTION_STATE_CLOSED) {
if (!outputMessage) {
writing.store(false);
return;
}

if (outputMessage) {
try {
asio::post(socket.get_executor(), [self = shared_from_this(), outputMessage] { self->internalSend(outputMessage); });
} catch (std::system_error &e) {
g_logger().error("[Connection::onWriteOperation] - error: {}", e.what());
close(FORCE_CLOSE);
}
} else if (connectionState.load(std::memory_order_relaxed) == CONNECTION_STATE_CLOSED) {
closeSocket();
}
}

void Connection::handleTimeout(ConnectionWeak_ptr connectionWeak, const std::error_code &error) {
void Connection::handleTimeout(const ConnectionWeak_ptr &connectionWeak, const std::error_code &error) {
if (error == asio::error::operation_aborted) {
return;
}

if (auto connection = connectionWeak.lock()) {
if (const auto connection = connectionWeak.lock()) {
if (!error) {
g_logger().debug("Connection Timeout, IP: {}", convertIPToString(connection->getIP()));
} else {
Expand Down
Loading

0 comments on commit 32cb762

Please sign in to comment.