From 9f8862c8d9d01d382d9d6b79878b89c8c2c64032 Mon Sep 17 00:00:00 2001 From: Kore Date: Wed, 18 Oct 2023 15:18:55 -0700 Subject: [PATCH] Schedule conquest tasks using task system --- src/common/async.h | 5 ++ src/world/conquest_system.cpp | 129 ++++++++++++++++++++-------------- src/world/conquest_system.h | 8 +-- src/world/message_handler.h | 50 +++++++++++-- src/world/message_server.cpp | 24 ++++--- src/world/message_server.h | 10 ++- src/world/world_server.cpp | 2 +- 7 files changed, 150 insertions(+), 78 deletions(-) diff --git a/src/common/async.h b/src/common/async.h index 9759b8c8b44..65a6f4f6507 100644 --- a/src/common/async.h +++ b/src/common/async.h @@ -27,6 +27,11 @@ #include #include +/** + * This class can be used to schedule either sql queries or tasks for work + * in a single thread pool. + * This can be used to obtain thread-safety without using locks. + */ class Async { public: diff --git a/src/world/conquest_system.cpp b/src/world/conquest_system.cpp index 2ad8f4c0c9a..d0d3efcc8af 100644 --- a/src/world/conquest_system.cpp +++ b/src/world/conquest_system.cpp @@ -23,36 +23,50 @@ along with this program. If not, see http://www.gnu.org/licenses/ #include "message_server.h" +/** + * ConquestSystem both handles messages from map servers and + * updates the database with the latest conquest data periodically. + * + * This class is guided by the following pattern: + * - All public methods that may modify the database are enqueued in the task system. + * - The task system processes tasks in its own thread + * - Private methods are not guarded via the task system, but are only called from + * public methods, so we can assume that they are always called from the task system. + */ ConquestSystem::ConquestSystem() -: sql(std::make_unique()) { + submit([this]() + { sql = std::make_unique(); }); } -bool ConquestSystem::handleMessage(std::vector payload, - in_addr from_addr, - uint16 from_port) +bool ConquestSystem::handleMessage(const std::vector& payload, + in_addr from_addr, + uint16 from_port) { const uint8 conquestMsgType = payload[1]; if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_GM_WEEKLY_UPDATE) { + // updateWeekConquest already goes through task system updateWeekConquest(); return true; } if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_ADD_INFLUENCE_POINTS) { - // const int32 points = ref(data, 2); - int32 points = 0; - uint32 nation = 0; - uint8 region = 0; - std::memcpy(&points, payload.data() + 2, sizeof(int32)); - std::memcpy(&nation, payload.data() + 6, sizeof(uint32)); - std::memcpy(®ion, payload.data() + 10, sizeof(uint8)); - - // We update influence but do not immediately send this update to all map servers - // Influence updates are sent periodically via time_server instead. - // It is okay for map servers to be eventually consistent. - updateInfluencePoints(points, nation, (REGION_TYPE)region); + submit([this, payload]() + { + int32 points = 0; + uint32 nation = 0; + uint8 region = 0; + std::memcpy(&points, payload.data() + 2, sizeof(int32)); + std::memcpy(&nation, payload.data() + 6, sizeof(uint32)); + std::memcpy(®ion, payload.data() + 10, sizeof(uint8)); + + // We update influence but do not immediately send this update to all map servers + // Influence updates are sent periodically via time_server instead. + // It is okay for map servers to be eventually consistent. + updateInfluencePoints(points, nation, (REGION_TYPE)region); }); + return true; } @@ -62,8 +76,11 @@ bool ConquestSystem::handleMessage(std::vector payload, uint64 ipp = from_addr.s_addr; ipp |= (((uint64)from_port) << 32); - // Send influence data to the requesting map server - sendInfluencesMsg(true, ipp); + submit([this, ipp]() + { + // Send influence data to the requesting map server + sendInfluencesMsg(true, ipp); }); + return true; } @@ -74,6 +91,46 @@ bool ConquestSystem::handleMessage(std::vector payload, return false; } +void ConquestSystem::updateWeekConquest() +{ + submit([this]() + { + TracyZoneScoped; + + // 1- Notify all zones that tally started + sendTallyStartMsg(); + + // 2- Do the actual db update + const char* Query = "UPDATE conquest_system SET region_control = \ + IF(sandoria_influence > bastok_influence AND sandoria_influence > windurst_influence AND \ + sandoria_influence > beastmen_influence, 0, \ + IF(bastok_influence > sandoria_influence AND bastok_influence > windurst_influence AND \ + bastok_influence > beastmen_influence, 1, \ + IF(windurst_influence > bastok_influence AND windurst_influence > sandoria_influence AND \ + windurst_influence > beastmen_influence, 2, 3)));"; + + int ret = sql->Query(Query); + if (ret == SQL_ERROR) + { + ShowError("handleWeeklyUpdate() failed"); + } + + // 3- Send tally end Msg + sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END); }); +} + +void ConquestSystem::updateHourlyConquest() +{ + submit([this]() + { sendInfluencesMsg(true); }); +} + +void ConquestSystem::updateVanaHourlyConquest() +{ + submit([this]() + { sendInfluencesMsg(false); }); +} + void ConquestSystem::sendTallyStartMsg() { // 1- Send message to all zones. We are starting update. @@ -223,42 +280,6 @@ bool ConquestSystem::updateInfluencePoints(int points, unsigned int nation, REGI return ret != SQL_ERROR; } -void ConquestSystem::updateWeekConquest() -{ - TracyZoneScoped; - - // 1- Notify all zones that tally started - sendTallyStartMsg(); - - // 2- Do the actual db update - const char* Query = "UPDATE conquest_system SET region_control = \ - IF(sandoria_influence > bastok_influence AND sandoria_influence > windurst_influence AND \ - sandoria_influence > beastmen_influence, 0, \ - IF(bastok_influence > sandoria_influence AND bastok_influence > windurst_influence AND \ - bastok_influence > beastmen_influence, 1, \ - IF(windurst_influence > bastok_influence AND windurst_influence > sandoria_influence AND \ - windurst_influence > beastmen_influence, 2, 3)));"; - - int ret = sql->Query(Query); - if (ret == SQL_ERROR) - { - ShowError("handleWeeklyUpdate() failed"); - } - - // 3- Send tally end Msg - sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END); -} - -void ConquestSystem::updateHourlyConquest() -{ - sendInfluencesMsg(true); -} - -void ConquestSystem::updateVanaHourlyConquest() -{ - sendInfluencesMsg(false); -} - auto ConquestSystem::getRegionalInfluences() -> std::vector const { const char* Query = "SELECT sandoria_influence, bastok_influence, windurst_influence, beastmen_influence FROM conquest_system;"; diff --git a/src/world/conquest_system.h b/src/world/conquest_system.h index 4570b985023..1aec2d07af2 100644 --- a/src/world/conquest_system.h +++ b/src/world/conquest_system.h @@ -38,12 +38,10 @@ class ConquestSystem : public IMessageHandler /** * IMessageHandler implementation. Used to handle messages from message_server. - * NOTE: The copy of payload here is intentional, since these systems will eventually - * : be moved to their own threads. */ - bool handleMessage(std::vector payload, - in_addr from_addr, - uint16 from_port) override; + bool handleMessage(const std::vector& payload, + in_addr from_addr, + uint16 from_port) override; /** * Called weekly, updates conquest data and sends regional control information diff --git a/src/world/message_handler.h b/src/world/message_handler.h index a257f028b20..f92c9ba3c3b 100644 --- a/src/world/message_handler.h +++ b/src/world/message_handler.h @@ -23,18 +23,56 @@ along with this program. If not, see http://www.gnu.org/licenses/ #include "common/mmo.h" +#include +#ifdef WIN32 +#include +#else +#include +#endif + +/** + * Class responsible for handling messages recieved from the message server. + * + * Implementations of this class are responsible for queueing any work + * that affects internal state / sql queries via the given task system, using + * the submit() method. + * + * This is done to ensure thread-safety outside of the message_server thread. + */ class IMessageHandler { public: + IMessageHandler() + { + ts = new ts::task_system(1); + } + virtual ~IMessageHandler() { + delete ts; } - /* - * NOTE: The copy of payload here is intentional, since these systems will eventually - * : be moved to their own threads. + /** + * Handles messages recieved from the map servers. */ - virtual bool handleMessage(std::vector payload, - in_addr from_addr, - uint16 from_port) = 0; + virtual bool handleMessage(const std::vector& payload, + in_addr from_addr, + uint16 from_port) = 0; + + /** + * Submits work to be executed asynchronously by the task system. + * This work will happen serialy, on a single thread. + */ + void submit(std::function func) + { + // clang-format off + ts->schedule([func]() + { + func(); + }); + // clang-format on + } + +private: + ts::task_system* ts; }; diff --git a/src/world/message_server.cpp b/src/world/message_server.cpp index fc598d5c10d..9a5cdaa58f1 100644 --- a/src/world/message_server.cpp +++ b/src/world/message_server.cpp @@ -27,6 +27,7 @@ along with this program. If not, see http://www.gnu.org/licenses/ #include "common/logging.h" #include "conquest_system.h" #include "message_server.h" +#include "world_server.h" struct chat_message_t { @@ -43,6 +44,8 @@ struct zone_settings_t uint32 misc = 0; }; +using InternalHandler = std::function&&, in_addr, uint16)>; + namespace { zmq::context_t zContext; @@ -50,11 +53,11 @@ namespace moodycamel::ConcurrentQueue outgoing_queue; - std::unique_ptr sql; - std::unordered_map> regionalMsgHandlers; - std::unordered_map zoneSettingsMap; - std::vector mapEndpoints; - std::vector yellMapEndpoints; + std::unique_ptr sql; + std::unordered_map regionalMsgHandlers; + std::unordered_map zoneSettingsMap; + std::vector mapEndpoints; + std::vector yellMapEndpoints; } // namespace void queue_message(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet) @@ -241,7 +244,7 @@ void message_server_parse(MSGSERVTYPE type, zmq::message_t* extra, zmq::message_ try { auto& handler = regionalMsgHandlers.at((REGIONALMSGTYPE)subType); - handler->handleMessage(bytes, from_ip, from_port); + handler(std::move(bytes), from_ip, from_port); } catch (const std::out_of_range& e) { @@ -359,15 +362,18 @@ void cache_zone_settings() std::copy(yellMapEndpointSet.begin(), yellMapEndpointSet.end(), std::back_inserter(yellMapEndpoints)); } -void message_server_init(const bool& requestExit) +void message_server_init(WorldServer* worldServer, const bool& requestExit) { TracySetThreadName("Message Server (ZMQ)"); // Setup SQL sql = std::make_unique(); - // Handler map registrations - regionalMsgHandlers[REGIONALMSGTYPE::REGIONAL_EVT_MSG_CONQUEST] = std::make_shared(); + // Handler map registrations. + regionalMsgHandlers[REGIONALMSGTYPE::REGIONAL_EVT_MSG_CONQUEST] = [worldServer](std::vector&& bytes, in_addr from_ip, uint16 from_port) + { + worldServer->conquestSystem->handleMessage(std::move(bytes), from_ip, from_port); + }; // Populate zoneSettingsCache with sql data cache_zone_settings(); diff --git a/src/world/message_server.h b/src/world/message_server.h index 4fbf320fda8..025254c2c4e 100644 --- a/src/world/message_server.h +++ b/src/world/message_server.h @@ -21,6 +21,7 @@ along with this program. If not, see http://www.gnu.org/licenses/ #pragma once +#include "besieged_system.h" #include "common/mmo.h" #include "common/socket.h" #include "common/sql.h" @@ -31,16 +32,19 @@ along with this program. If not, see http://www.gnu.org/licenses/ #include #include +class WorldServer; + void queue_message(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet = nullptr); void queue_message_broadcast(MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet = nullptr); -void message_server_init(const bool& requestExit); +void message_server_init(WorldServer* worldServer, const bool& requestExit); + void message_server_close(); struct message_server_wrapper_t { - message_server_wrapper_t(const std::atomic_bool& requestExit) - : m_thread(std::make_unique(std::bind(message_server_init, std::ref(requestExit)))) + message_server_wrapper_t(WorldServer* worldServer, const std::atomic_bool& requestExit) + : m_thread(std::make_unique(std::bind(message_server_init, worldServer, std::ref(requestExit)))) { } diff --git a/src/world/world_server.cpp b/src/world/world_server.cpp index 35b2217f6e4..146045a364b 100644 --- a/src/world/world_server.cpp +++ b/src/world/world_server.cpp @@ -29,7 +29,7 @@ WorldServer::WorldServer(int argc, char** argv) : Application("world", argc, argv) , sql(std::make_unique()) , httpServer(std::make_unique()) -, messageServer(std::make_unique(std::ref(m_RequestExit))) +, messageServer(std::make_unique(this, std::ref(m_RequestExit))) , conquestSystem(std::make_unique()) , besiegedSystem(std::make_unique()) , campaignSystem(std::make_unique())