Skip to content

Commit

Permalink
Schedule conquest tasks using task system
Browse files Browse the repository at this point in the history
  • Loading branch information
xkoredev committed Oct 20, 2023
1 parent b61c149 commit 9f8862c
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 78 deletions.
5 changes: 5 additions & 0 deletions src/common/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
#include <functional>
#include <string>

/**
* This class can be used to schedule either sql queries or tasks for work
* in a single thread pool.
* This can be used to obtain thread-safety without using locks.
*/
class Async
{
public:
Expand Down
129 changes: 75 additions & 54 deletions src/world/conquest_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,50 @@ along with this program. If not, see http://www.gnu.org/licenses/

#include "message_server.h"

/**
* ConquestSystem both handles messages from map servers and
* updates the database with the latest conquest data periodically.
*
* This class is guided by the following pattern:
* - All public methods that may modify the database are enqueued in the task system.
* - The task system processes tasks in its own thread
* - Private methods are not guarded via the task system, but are only called from
* public methods, so we can assume that they are always called from the task system.
*/
ConquestSystem::ConquestSystem()
: sql(std::make_unique<SqlConnection>())
{
submit([this]()
{ sql = std::make_unique<SqlConnection>(); });
}

bool ConquestSystem::handleMessage(std::vector<uint8> payload,
in_addr from_addr,
uint16 from_port)
bool ConquestSystem::handleMessage(const std::vector<uint8>& payload,
in_addr from_addr,
uint16 from_port)
{
const uint8 conquestMsgType = payload[1];
if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_GM_WEEKLY_UPDATE)
{
// updateWeekConquest already goes through task system
updateWeekConquest();
return true;
}

if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_ADD_INFLUENCE_POINTS)
{
// const int32 points = ref<int32>(data, 2);
int32 points = 0;
uint32 nation = 0;
uint8 region = 0;
std::memcpy(&points, payload.data() + 2, sizeof(int32));
std::memcpy(&nation, payload.data() + 6, sizeof(uint32));
std::memcpy(&region, payload.data() + 10, sizeof(uint8));

// We update influence but do not immediately send this update to all map servers
// Influence updates are sent periodically via time_server instead.
// It is okay for map servers to be eventually consistent.
updateInfluencePoints(points, nation, (REGION_TYPE)region);
submit([this, payload]()
{
int32 points = 0;
uint32 nation = 0;
uint8 region = 0;
std::memcpy(&points, payload.data() + 2, sizeof(int32));
std::memcpy(&nation, payload.data() + 6, sizeof(uint32));
std::memcpy(&region, payload.data() + 10, sizeof(uint8));

// We update influence but do not immediately send this update to all map servers
// Influence updates are sent periodically via time_server instead.
// It is okay for map servers to be eventually consistent.
updateInfluencePoints(points, nation, (REGION_TYPE)region); });

return true;
}

Expand All @@ -62,8 +76,11 @@ bool ConquestSystem::handleMessage(std::vector<uint8> payload,
uint64 ipp = from_addr.s_addr;
ipp |= (((uint64)from_port) << 32);

// Send influence data to the requesting map server
sendInfluencesMsg(true, ipp);
submit([this, ipp]()
{
// Send influence data to the requesting map server
sendInfluencesMsg(true, ipp); });

return true;
}

Expand All @@ -74,6 +91,46 @@ bool ConquestSystem::handleMessage(std::vector<uint8> payload,
return false;
}

void ConquestSystem::updateWeekConquest()
{
submit([this]()
{
TracyZoneScoped;

// 1- Notify all zones that tally started
sendTallyStartMsg();

// 2- Do the actual db update
const char* Query = "UPDATE conquest_system SET region_control = \
IF(sandoria_influence > bastok_influence AND sandoria_influence > windurst_influence AND \
sandoria_influence > beastmen_influence, 0, \
IF(bastok_influence > sandoria_influence AND bastok_influence > windurst_influence AND \
bastok_influence > beastmen_influence, 1, \
IF(windurst_influence > bastok_influence AND windurst_influence > sandoria_influence AND \
windurst_influence > beastmen_influence, 2, 3)));";

int ret = sql->Query(Query);
if (ret == SQL_ERROR)
{
ShowError("handleWeeklyUpdate() failed");
}

// 3- Send tally end Msg
sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END); });
}

void ConquestSystem::updateHourlyConquest()
{
submit([this]()
{ sendInfluencesMsg(true); });
}

void ConquestSystem::updateVanaHourlyConquest()
{
submit([this]()
{ sendInfluencesMsg(false); });
}

void ConquestSystem::sendTallyStartMsg()
{
// 1- Send message to all zones. We are starting update.
Expand Down Expand Up @@ -223,42 +280,6 @@ bool ConquestSystem::updateInfluencePoints(int points, unsigned int nation, REGI
return ret != SQL_ERROR;
}

void ConquestSystem::updateWeekConquest()
{
TracyZoneScoped;

// 1- Notify all zones that tally started
sendTallyStartMsg();

// 2- Do the actual db update
const char* Query = "UPDATE conquest_system SET region_control = \
IF(sandoria_influence > bastok_influence AND sandoria_influence > windurst_influence AND \
sandoria_influence > beastmen_influence, 0, \
IF(bastok_influence > sandoria_influence AND bastok_influence > windurst_influence AND \
bastok_influence > beastmen_influence, 1, \
IF(windurst_influence > bastok_influence AND windurst_influence > sandoria_influence AND \
windurst_influence > beastmen_influence, 2, 3)));";

int ret = sql->Query(Query);
if (ret == SQL_ERROR)
{
ShowError("handleWeeklyUpdate() failed");
}

// 3- Send tally end Msg
sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END);
}

void ConquestSystem::updateHourlyConquest()
{
sendInfluencesMsg(true);
}

void ConquestSystem::updateVanaHourlyConquest()
{
sendInfluencesMsg(false);
}

auto ConquestSystem::getRegionalInfluences() -> std::vector<influence_t> const
{
const char* Query = "SELECT sandoria_influence, bastok_influence, windurst_influence, beastmen_influence FROM conquest_system;";
Expand Down
8 changes: 3 additions & 5 deletions src/world/conquest_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ class ConquestSystem : public IMessageHandler

/**
* IMessageHandler implementation. Used to handle messages from message_server.
* NOTE: The copy of payload here is intentional, since these systems will eventually
* : be moved to their own threads.
*/
bool handleMessage(std::vector<uint8> payload,
in_addr from_addr,
uint16 from_port) override;
bool handleMessage(const std::vector<uint8>& payload,
in_addr from_addr,
uint16 from_port) override;

/**
* Called weekly, updates conquest data and sends regional control information
Expand Down
50 changes: 44 additions & 6 deletions src/world/message_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,56 @@ along with this program. If not, see http://www.gnu.org/licenses/

#include "common/mmo.h"

#include <task_system.hpp>
#ifdef WIN32
#include <winsock2.h>
#else
#include <netinet/in.h>
#endif

/**
* Class responsible for handling messages recieved from the message server.
*
* Implementations of this class are responsible for queueing any work
* that affects internal state / sql queries via the given task system, using
* the submit() method.
*
* This is done to ensure thread-safety outside of the message_server thread.
*/
class IMessageHandler
{
public:
IMessageHandler()
{
ts = new ts::task_system(1);
}

virtual ~IMessageHandler()
{
delete ts;
}

/*
* NOTE: The copy of payload here is intentional, since these systems will eventually
* : be moved to their own threads.
/**
* Handles messages recieved from the map servers.
*/
virtual bool handleMessage(std::vector<uint8> payload,
in_addr from_addr,
uint16 from_port) = 0;
virtual bool handleMessage(const std::vector<uint8>& payload,
in_addr from_addr,
uint16 from_port) = 0;

/**
* Submits work to be executed asynchronously by the task system.
* This work will happen serialy, on a single thread.
*/
void submit(std::function<void()> func)
{
// clang-format off
ts->schedule([func]()
{
func();
});
// clang-format on
}

private:
ts::task_system* ts;
};
24 changes: 15 additions & 9 deletions src/world/message_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ along with this program. If not, see http://www.gnu.org/licenses/
#include "common/logging.h"
#include "conquest_system.h"
#include "message_server.h"
#include "world_server.h"

struct chat_message_t
{
Expand All @@ -43,18 +44,20 @@ struct zone_settings_t
uint32 misc = 0;
};

using InternalHandler = std::function<void(std::vector<uint8>&&, in_addr, uint16)>;

namespace
{
zmq::context_t zContext;
std::unique_ptr<zmq::socket_t> zSocket;

moodycamel::ConcurrentQueue<chat_message_t> outgoing_queue;

std::unique_ptr<SqlConnection> sql;
std::unordered_map<REGIONALMSGTYPE, std::shared_ptr<IMessageHandler>> regionalMsgHandlers;
std::unordered_map<uint16, zone_settings_t> zoneSettingsMap;
std::vector<uint64> mapEndpoints;
std::vector<uint64> yellMapEndpoints;
std::unique_ptr<SqlConnection> sql;
std::unordered_map<REGIONALMSGTYPE, InternalHandler> regionalMsgHandlers;
std::unordered_map<uint16, zone_settings_t> zoneSettingsMap;
std::vector<uint64> mapEndpoints;
std::vector<uint64> yellMapEndpoints;
} // namespace

void queue_message(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet)
Expand Down Expand Up @@ -241,7 +244,7 @@ void message_server_parse(MSGSERVTYPE type, zmq::message_t* extra, zmq::message_
try
{
auto& handler = regionalMsgHandlers.at((REGIONALMSGTYPE)subType);
handler->handleMessage(bytes, from_ip, from_port);
handler(std::move(bytes), from_ip, from_port);
}
catch (const std::out_of_range& e)
{
Expand Down Expand Up @@ -359,15 +362,18 @@ void cache_zone_settings()
std::copy(yellMapEndpointSet.begin(), yellMapEndpointSet.end(), std::back_inserter(yellMapEndpoints));
}

void message_server_init(const bool& requestExit)
void message_server_init(WorldServer* worldServer, const bool& requestExit)
{
TracySetThreadName("Message Server (ZMQ)");

// Setup SQL
sql = std::make_unique<SqlConnection>();

// Handler map registrations
regionalMsgHandlers[REGIONALMSGTYPE::REGIONAL_EVT_MSG_CONQUEST] = std::make_shared<ConquestSystem>();
// Handler map registrations.
regionalMsgHandlers[REGIONALMSGTYPE::REGIONAL_EVT_MSG_CONQUEST] = [worldServer](std::vector<uint8>&& bytes, in_addr from_ip, uint16 from_port)
{
worldServer->conquestSystem->handleMessage(std::move(bytes), from_ip, from_port);
};

// Populate zoneSettingsCache with sql data
cache_zone_settings();
Expand Down
10 changes: 7 additions & 3 deletions src/world/message_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ along with this program. If not, see http://www.gnu.org/licenses/

#pragma once

#include "besieged_system.h"
#include "common/mmo.h"
#include "common/socket.h"
#include "common/sql.h"
Expand All @@ -31,16 +32,19 @@ along with this program. If not, see http://www.gnu.org/licenses/
#include <zmq.hpp>
#include <zmq_addon.hpp>

class WorldServer;

void queue_message(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet = nullptr);
void queue_message_broadcast(MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet = nullptr);

void message_server_init(const bool& requestExit);
void message_server_init(WorldServer* worldServer, const bool& requestExit);

void message_server_close();

struct message_server_wrapper_t
{
message_server_wrapper_t(const std::atomic_bool& requestExit)
: m_thread(std::make_unique<nonstd::jthread>(std::bind(message_server_init, std::ref(requestExit))))
message_server_wrapper_t(WorldServer* worldServer, const std::atomic_bool& requestExit)
: m_thread(std::make_unique<nonstd::jthread>(std::bind(message_server_init, worldServer, std::ref(requestExit))))
{
}

Expand Down
2 changes: 1 addition & 1 deletion src/world/world_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ WorldServer::WorldServer(int argc, char** argv)
: Application("world", argc, argv)
, sql(std::make_unique<SqlConnection>())
, httpServer(std::make_unique<HTTPServer>())
, messageServer(std::make_unique<message_server_wrapper_t>(std::ref(m_RequestExit)))
, messageServer(std::make_unique<message_server_wrapper_t>(this, std::ref(m_RequestExit)))
, conquestSystem(std::make_unique<ConquestSystem>())
, besiegedSystem(std::make_unique<BesiegedSystem>())
, campaignSystem(std::make_unique<CampaignSystem>())
Expand Down

0 comments on commit 9f8862c

Please sign in to comment.