diff --git a/src/common/async.h b/src/common/async.h index 9759b8c8b44..65a6f4f6507 100644 --- a/src/common/async.h +++ b/src/common/async.h @@ -27,6 +27,11 @@ #include #include +/** + * This class can be used to schedule either sql queries or tasks for work + * in a single thread pool. + * This can be used to obtain thread-safety without using locks. + */ class Async { public: diff --git a/src/world/conquest_system.cpp b/src/world/conquest_system.cpp index dd6847c3be2..d0d3efcc8af 100644 --- a/src/world/conquest_system.cpp +++ b/src/world/conquest_system.cpp @@ -23,50 +23,38 @@ along with this program. If not, see http://www.gnu.org/licenses/ #include "message_server.h" -namespace { - // Lockless queue used to perform any internal state changes. - // This is necessary due to how the class performs operations both on - // the time_server the message_server threads. - moodycamel::ConcurrentQueue> action_queue; -} - -void process_conquest_actions(const bool& requestExit); - /** * ConquestSystem both handles messages from map servers and * updates the database with the latest conquest data periodically. - * + * * This class is guided by the following pattern: - * - All public methods that may modify the database are enqueued in the action queue. - * - The action queue is processed in its own thread - * - Private methods are not guarded via the action queue, but are only called from - * public methods, so we can assume that they are always called from the action queue. -*/ -ConquestSystem::ConquestSystem(const std::atomic_bool& requestExit) : -actionQueueThread(std::make_unique(std::bind(process_conquest_actions, std::ref(requestExit)))) + * - All public methods that may modify the database are enqueued in the task system. + * - The task system processes tasks in its own thread + * - Private methods are not guarded via the task system, but are only called from + * public methods, so we can assume that they are always called from the task system. + */ +ConquestSystem::ConquestSystem() { - action_queue.enqueue([this]() - { - sql = std::make_unique(); - }); + submit([this]() + { sql = std::make_unique(); }); } -bool ConquestSystem::handleMessage(const std::vector& payload, - in_addr from_addr, - uint16 from_port) +bool ConquestSystem::handleMessage(const std::vector& payload, + in_addr from_addr, + uint16 from_port) { const uint8 conquestMsgType = payload[1]; if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_GM_WEEKLY_UPDATE) { - // updateWeekConquest already goes through action queue - updateWeekConquest(); + // updateWeekConquest already goes through task system + updateWeekConquest(); return true; } if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_ADD_INFLUENCE_POINTS) { - action_queue.enqueue([this, payload]() - { + submit([this, payload]() + { int32 points = 0; uint32 nation = 0; uint8 region = 0; @@ -77,23 +65,21 @@ bool ConquestSystem::handleMessage(const std::vector& payload, // We update influence but do not immediately send this update to all map servers // Influence updates are sent periodically via time_server instead. // It is okay for map servers to be eventually consistent. - updateInfluencePoints(points, nation, (REGION_TYPE)region); - }); + updateInfluencePoints(points, nation, (REGION_TYPE)region); }); return true; } if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_GM_CONQUEST_UPDATE) { - action_queue.enqueue([this, payload, from_addr, from_port]() - { - // Convert from_addr to ip + port - uint64 ipp = from_addr.s_addr; - ipp |= (((uint64)from_port) << 32); + // Convert from_addr to ip + port + uint64 ipp = from_addr.s_addr; + ipp |= (((uint64)from_port) << 32); + submit([this, ipp]() + { // Send influence data to the requesting map server - sendInfluencesMsg(true, ipp); - }); + sendInfluencesMsg(true, ipp); }); return true; } @@ -107,7 +93,8 @@ bool ConquestSystem::handleMessage(const std::vector& payload, void ConquestSystem::updateWeekConquest() { - action_queue.enqueue([this]() { + submit([this]() + { TracyZoneScoped; // 1- Notify all zones that tally started @@ -129,22 +116,19 @@ void ConquestSystem::updateWeekConquest() } // 3- Send tally end Msg - sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END); - }); + sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END); }); } void ConquestSystem::updateHourlyConquest() { - action_queue.enqueue([this]() { - sendInfluencesMsg(true); - }); + submit([this]() + { sendInfluencesMsg(true); }); } void ConquestSystem::updateVanaHourlyConquest() { - action_queue.enqueue([this]() { - sendInfluencesMsg(false); - }); + submit([this]() + { sendInfluencesMsg(false); }); } void ConquestSystem::sendTallyStartMsg() @@ -339,19 +323,3 @@ auto ConquestSystem::getRegionControls() -> std::vector const return controllers; } - -void process_conquest_actions(const bool& requestExit) -{ - while (!requestExit) - { - std::function action; - if (action_queue.try_dequeue(action)) - { - action(); - } - else - { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - } -} \ No newline at end of file diff --git a/src/world/conquest_system.h b/src/world/conquest_system.h index 0107bd59b55..1aec2d07af2 100644 --- a/src/world/conquest_system.h +++ b/src/world/conquest_system.h @@ -21,9 +21,6 @@ along with this program. If not, see http://www.gnu.org/licenses/ #pragma once -#include -#include - #include "common/sql.h" #include "map/conquest_system.h" #include "map/zone.h" @@ -36,7 +33,7 @@ along with this program. If not, see http://www.gnu.org/licenses/ class ConquestSystem : public IMessageHandler { public: - ConquestSystem(const std::atomic_bool& requestExit); + ConquestSystem(); ~ConquestSystem() override = default; /** @@ -67,9 +64,6 @@ class ConquestSystem : public IMessageHandler private: std::unique_ptr sql; - // Thread used for the action queue operations - std::unique_ptr actionQueueThread; - bool updateInfluencePoints(int points, unsigned int nation, REGION_TYPE region); auto getRegionalInfluences() -> std::vector const; diff --git a/src/world/message_handler.h b/src/world/message_handler.h index 48b69f0ce9d..f92c9ba3c3b 100644 --- a/src/world/message_handler.h +++ b/src/world/message_handler.h @@ -23,14 +23,56 @@ along with this program. If not, see http://www.gnu.org/licenses/ #include "common/mmo.h" +#include +#ifdef WIN32 +#include +#else +#include +#endif + +/** + * Class responsible for handling messages recieved from the message server. + * + * Implementations of this class are responsible for queueing any work + * that affects internal state / sql queries via the given task system, using + * the submit() method. + * + * This is done to ensure thread-safety outside of the message_server thread. + */ class IMessageHandler { public: + IMessageHandler() + { + ts = new ts::task_system(1); + } + virtual ~IMessageHandler() { + delete ts; } + /** + * Handles messages recieved from the map servers. + */ virtual bool handleMessage(const std::vector& payload, in_addr from_addr, uint16 from_port) = 0; + + /** + * Submits work to be executed asynchronously by the task system. + * This work will happen serialy, on a single thread. + */ + void submit(std::function func) + { + // clang-format off + ts->schedule([func]() + { + func(); + }); + // clang-format on + } + +private: + ts::task_system* ts; }; diff --git a/src/world/message_server.cpp b/src/world/message_server.cpp index 87dc9e6cd7b..9a5cdaa58f1 100644 --- a/src/world/message_server.cpp +++ b/src/world/message_server.cpp @@ -53,11 +53,11 @@ namespace moodycamel::ConcurrentQueue outgoing_queue; - std::unique_ptr sql; - std::unordered_map regionalMsgHandlers; - std::unordered_map zoneSettingsMap; - std::vector mapEndpoints; - std::vector yellMapEndpoints; + std::unique_ptr sql; + std::unordered_map regionalMsgHandlers; + std::unordered_map zoneSettingsMap; + std::vector mapEndpoints; + std::vector yellMapEndpoints; } // namespace void queue_message(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet) @@ -370,7 +370,8 @@ void message_server_init(WorldServer* worldServer, const bool& requestExit) sql = std::make_unique(); // Handler map registrations. - regionalMsgHandlers[REGIONALMSGTYPE::REGIONAL_EVT_MSG_CONQUEST] = [worldServer](std::vector&& bytes, in_addr from_ip, uint16 from_port) { + regionalMsgHandlers[REGIONALMSGTYPE::REGIONAL_EVT_MSG_CONQUEST] = [worldServer](std::vector&& bytes, in_addr from_ip, uint16 from_port) + { worldServer->conquestSystem->handleMessage(std::move(bytes), from_ip, from_port); }; diff --git a/src/world/message_server.h b/src/world/message_server.h index 1e5ddda4dd1..025254c2c4e 100644 --- a/src/world/message_server.h +++ b/src/world/message_server.h @@ -21,10 +21,10 @@ along with this program. If not, see http://www.gnu.org/licenses/ #pragma once +#include "besieged_system.h" #include "common/mmo.h" #include "common/socket.h" #include "common/sql.h" -#include "besieged_system.h" #include "conquest_system.h" #include "message_handler.h" diff --git a/src/world/world_server.cpp b/src/world/world_server.cpp index a731d50110f..146045a364b 100644 --- a/src/world/world_server.cpp +++ b/src/world/world_server.cpp @@ -30,8 +30,8 @@ WorldServer::WorldServer(int argc, char** argv) , sql(std::make_unique()) , httpServer(std::make_unique()) , messageServer(std::make_unique(this, std::ref(m_RequestExit))) -, conquestSystem(std::make_unique(std::ref(m_RequestExit))) -, besiegedSystem(std::make_unique(std::ref(m_RequestExit))) +, conquestSystem(std::make_unique()) +, besiegedSystem(std::make_unique()) , campaignSystem(std::make_unique()) , colonizationSystem(std::make_unique()) {