From 1245e1fd1fd549e76a4f867f68cad8fab32ed90b Mon Sep 17 00:00:00 2001 From: Kore Date: Thu, 19 Oct 2023 01:01:12 -0700 Subject: [PATCH] Use concurrent queues to schedule conquest tasks --- src/world/conquest_system.cpp | 171 ++++++++++++++++++++++------------ src/world/conquest_system.h | 16 ++-- src/world/message_handler.h | 10 +- src/world/world_server.cpp | 4 +- 4 files changed, 127 insertions(+), 74 deletions(-) diff --git a/src/world/conquest_system.cpp b/src/world/conquest_system.cpp index 2ad8f4c0c9a..dd6847c3be2 100644 --- a/src/world/conquest_system.cpp +++ b/src/world/conquest_system.cpp @@ -23,47 +23,78 @@ along with this program. If not, see http://www.gnu.org/licenses/ #include "message_server.h" -ConquestSystem::ConquestSystem() -: sql(std::make_unique()) +namespace { + // Lockless queue used to perform any internal state changes. + // This is necessary due to how the class performs operations both on + // the time_server the message_server threads. + moodycamel::ConcurrentQueue> action_queue; +} + +void process_conquest_actions(const bool& requestExit); + +/** + * ConquestSystem both handles messages from map servers and + * updates the database with the latest conquest data periodically. + * + * This class is guided by the following pattern: + * - All public methods that may modify the database are enqueued in the action queue. + * - The action queue is processed in its own thread + * - Private methods are not guarded via the action queue, but are only called from + * public methods, so we can assume that they are always called from the action queue. +*/ +ConquestSystem::ConquestSystem(const std::atomic_bool& requestExit) : +actionQueueThread(std::make_unique(std::bind(process_conquest_actions, std::ref(requestExit)))) { + action_queue.enqueue([this]() + { + sql = std::make_unique(); + }); } -bool ConquestSystem::handleMessage(std::vector payload, - in_addr from_addr, - uint16 from_port) +bool ConquestSystem::handleMessage(const std::vector& payload, + in_addr from_addr, + uint16 from_port) { const uint8 conquestMsgType = payload[1]; if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_GM_WEEKLY_UPDATE) { - updateWeekConquest(); + // updateWeekConquest already goes through action queue + updateWeekConquest(); return true; } if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_ADD_INFLUENCE_POINTS) { - // const int32 points = ref(data, 2); - int32 points = 0; - uint32 nation = 0; - uint8 region = 0; - std::memcpy(&points, payload.data() + 2, sizeof(int32)); - std::memcpy(&nation, payload.data() + 6, sizeof(uint32)); - std::memcpy(®ion, payload.data() + 10, sizeof(uint8)); - - // We update influence but do not immediately send this update to all map servers - // Influence updates are sent periodically via time_server instead. - // It is okay for map servers to be eventually consistent. - updateInfluencePoints(points, nation, (REGION_TYPE)region); + action_queue.enqueue([this, payload]() + { + int32 points = 0; + uint32 nation = 0; + uint8 region = 0; + std::memcpy(&points, payload.data() + 2, sizeof(int32)); + std::memcpy(&nation, payload.data() + 6, sizeof(uint32)); + std::memcpy(®ion, payload.data() + 10, sizeof(uint8)); + + // We update influence but do not immediately send this update to all map servers + // Influence updates are sent periodically via time_server instead. + // It is okay for map servers to be eventually consistent. + updateInfluencePoints(points, nation, (REGION_TYPE)region); + }); + return true; } if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_GM_CONQUEST_UPDATE) { - // Convert from_addr to ip + port - uint64 ipp = from_addr.s_addr; - ipp |= (((uint64)from_port) << 32); + action_queue.enqueue([this, payload, from_addr, from_port]() + { + // Convert from_addr to ip + port + uint64 ipp = from_addr.s_addr; + ipp |= (((uint64)from_port) << 32); + + // Send influence data to the requesting map server + sendInfluencesMsg(true, ipp); + }); - // Send influence data to the requesting map server - sendInfluencesMsg(true, ipp); return true; } @@ -74,6 +105,48 @@ bool ConquestSystem::handleMessage(std::vector payload, return false; } +void ConquestSystem::updateWeekConquest() +{ + action_queue.enqueue([this]() { + TracyZoneScoped; + + // 1- Notify all zones that tally started + sendTallyStartMsg(); + + // 2- Do the actual db update + const char* Query = "UPDATE conquest_system SET region_control = \ + IF(sandoria_influence > bastok_influence AND sandoria_influence > windurst_influence AND \ + sandoria_influence > beastmen_influence, 0, \ + IF(bastok_influence > sandoria_influence AND bastok_influence > windurst_influence AND \ + bastok_influence > beastmen_influence, 1, \ + IF(windurst_influence > bastok_influence AND windurst_influence > sandoria_influence AND \ + windurst_influence > beastmen_influence, 2, 3)));"; + + int ret = sql->Query(Query); + if (ret == SQL_ERROR) + { + ShowError("handleWeeklyUpdate() failed"); + } + + // 3- Send tally end Msg + sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END); + }); +} + +void ConquestSystem::updateHourlyConquest() +{ + action_queue.enqueue([this]() { + sendInfluencesMsg(true); + }); +} + +void ConquestSystem::updateVanaHourlyConquest() +{ + action_queue.enqueue([this]() { + sendInfluencesMsg(false); + }); +} + void ConquestSystem::sendTallyStartMsg() { // 1- Send message to all zones. We are starting update. @@ -223,42 +296,6 @@ bool ConquestSystem::updateInfluencePoints(int points, unsigned int nation, REGI return ret != SQL_ERROR; } -void ConquestSystem::updateWeekConquest() -{ - TracyZoneScoped; - - // 1- Notify all zones that tally started - sendTallyStartMsg(); - - // 2- Do the actual db update - const char* Query = "UPDATE conquest_system SET region_control = \ - IF(sandoria_influence > bastok_influence AND sandoria_influence > windurst_influence AND \ - sandoria_influence > beastmen_influence, 0, \ - IF(bastok_influence > sandoria_influence AND bastok_influence > windurst_influence AND \ - bastok_influence > beastmen_influence, 1, \ - IF(windurst_influence > bastok_influence AND windurst_influence > sandoria_influence AND \ - windurst_influence > beastmen_influence, 2, 3)));"; - - int ret = sql->Query(Query); - if (ret == SQL_ERROR) - { - ShowError("handleWeeklyUpdate() failed"); - } - - // 3- Send tally end Msg - sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END); -} - -void ConquestSystem::updateHourlyConquest() -{ - sendInfluencesMsg(true); -} - -void ConquestSystem::updateVanaHourlyConquest() -{ - sendInfluencesMsg(false); -} - auto ConquestSystem::getRegionalInfluences() -> std::vector const { const char* Query = "SELECT sandoria_influence, bastok_influence, windurst_influence, beastmen_influence FROM conquest_system;"; @@ -302,3 +339,19 @@ auto ConquestSystem::getRegionControls() -> std::vector const return controllers; } + +void process_conquest_actions(const bool& requestExit) +{ + while (!requestExit) + { + std::function action; + if (action_queue.try_dequeue(action)) + { + action(); + } + else + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } +} \ No newline at end of file diff --git a/src/world/conquest_system.h b/src/world/conquest_system.h index 4570b985023..0107bd59b55 100644 --- a/src/world/conquest_system.h +++ b/src/world/conquest_system.h @@ -21,6 +21,9 @@ along with this program. If not, see http://www.gnu.org/licenses/ #pragma once +#include +#include + #include "common/sql.h" #include "map/conquest_system.h" #include "map/zone.h" @@ -33,17 +36,15 @@ along with this program. If not, see http://www.gnu.org/licenses/ class ConquestSystem : public IMessageHandler { public: - ConquestSystem(); + ConquestSystem(const std::atomic_bool& requestExit); ~ConquestSystem() override = default; /** * IMessageHandler implementation. Used to handle messages from message_server. - * NOTE: The copy of payload here is intentional, since these systems will eventually - * : be moved to their own threads. */ - bool handleMessage(std::vector payload, - in_addr from_addr, - uint16 from_port) override; + bool handleMessage(const std::vector& payload, + in_addr from_addr, + uint16 from_port) override; /** * Called weekly, updates conquest data and sends regional control information @@ -66,6 +67,9 @@ class ConquestSystem : public IMessageHandler private: std::unique_ptr sql; + // Thread used for the action queue operations + std::unique_ptr actionQueueThread; + bool updateInfluencePoints(int points, unsigned int nation, REGION_TYPE region); auto getRegionalInfluences() -> std::vector const; diff --git a/src/world/message_handler.h b/src/world/message_handler.h index a257f028b20..48b69f0ce9d 100644 --- a/src/world/message_handler.h +++ b/src/world/message_handler.h @@ -30,11 +30,7 @@ class IMessageHandler { } - /* - * NOTE: The copy of payload here is intentional, since these systems will eventually - * : be moved to their own threads. - */ - virtual bool handleMessage(std::vector payload, - in_addr from_addr, - uint16 from_port) = 0; + virtual bool handleMessage(const std::vector& payload, + in_addr from_addr, + uint16 from_port) = 0; }; diff --git a/src/world/world_server.cpp b/src/world/world_server.cpp index 146045a364b..a731d50110f 100644 --- a/src/world/world_server.cpp +++ b/src/world/world_server.cpp @@ -30,8 +30,8 @@ WorldServer::WorldServer(int argc, char** argv) , sql(std::make_unique()) , httpServer(std::make_unique()) , messageServer(std::make_unique(this, std::ref(m_RequestExit))) -, conquestSystem(std::make_unique()) -, besiegedSystem(std::make_unique()) +, conquestSystem(std::make_unique(std::ref(m_RequestExit))) +, besiegedSystem(std::make_unique(std::ref(m_RequestExit))) , campaignSystem(std::make_unique()) , colonizationSystem(std::make_unique()) {