Skip to content

Commit

Permalink
Remove concurrent queues and use task system instead
Browse files Browse the repository at this point in the history
  • Loading branch information
xkoredev committed Oct 20, 2023
1 parent 1245e1f commit e2cb738
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 78 deletions.
5 changes: 5 additions & 0 deletions src/common/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
#include <functional>
#include <string>

/**
* This class can be used to schedule either sql queries or tasks for work
* in a single thread pool.
* This can be used to obtain thread-safety without using locks.
*/
class Async
{
public:
Expand Down
92 changes: 30 additions & 62 deletions src/world/conquest_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,50 +23,38 @@ along with this program. If not, see http://www.gnu.org/licenses/

#include "message_server.h"

namespace {
// Lockless queue used to perform any internal state changes.
// This is necessary due to how the class performs operations both on
// the time_server the message_server threads.
moodycamel::ConcurrentQueue<std::function<void()>> action_queue;
}

void process_conquest_actions(const bool& requestExit);

/**
* ConquestSystem both handles messages from map servers and
* updates the database with the latest conquest data periodically.
*
*
* This class is guided by the following pattern:
* - All public methods that may modify the database are enqueued in the action queue.
* - The action queue is processed in its own thread
* - Private methods are not guarded via the action queue, but are only called from
* public methods, so we can assume that they are always called from the action queue.
*/
ConquestSystem::ConquestSystem(const std::atomic_bool& requestExit) :
actionQueueThread(std::make_unique<nonstd::jthread>(std::bind(process_conquest_actions, std::ref(requestExit))))
* - All public methods that may modify the database are enqueued in the task system.
* - The task system processes tasks in its own thread
* - Private methods are not guarded via the task system, but are only called from
* public methods, so we can assume that they are always called from the task system.
*/
ConquestSystem::ConquestSystem()
{
action_queue.enqueue([this]()
{
sql = std::make_unique<SqlConnection>();
});
submit([this]()
{ sql = std::make_unique<SqlConnection>(); });
}

bool ConquestSystem::handleMessage(const std::vector<uint8>& payload,
in_addr from_addr,
uint16 from_port)
bool ConquestSystem::handleMessage(const std::vector<uint8>& payload,
in_addr from_addr,
uint16 from_port)
{
const uint8 conquestMsgType = payload[1];
if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_GM_WEEKLY_UPDATE)
{
// updateWeekConquest already goes through action queue
updateWeekConquest();
// updateWeekConquest already goes through task system
updateWeekConquest();
return true;
}

if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_ADD_INFLUENCE_POINTS)
{
action_queue.enqueue([this, payload]()
{
submit([this, payload]()
{
int32 points = 0;
uint32 nation = 0;
uint8 region = 0;
Expand All @@ -77,23 +65,21 @@ bool ConquestSystem::handleMessage(const std::vector<uint8>& payload,
// We update influence but do not immediately send this update to all map servers
// Influence updates are sent periodically via time_server instead.
// It is okay for map servers to be eventually consistent.
updateInfluencePoints(points, nation, (REGION_TYPE)region);
});
updateInfluencePoints(points, nation, (REGION_TYPE)region); });

return true;
}

if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_GM_CONQUEST_UPDATE)
{
action_queue.enqueue([this, payload, from_addr, from_port]()
{
// Convert from_addr to ip + port
uint64 ipp = from_addr.s_addr;
ipp |= (((uint64)from_port) << 32);
// Convert from_addr to ip + port
uint64 ipp = from_addr.s_addr;
ipp |= (((uint64)from_port) << 32);

submit([this, ipp]()
{
// Send influence data to the requesting map server
sendInfluencesMsg(true, ipp);
});
sendInfluencesMsg(true, ipp); });

return true;
}
Expand All @@ -107,7 +93,8 @@ bool ConquestSystem::handleMessage(const std::vector<uint8>& payload,

void ConquestSystem::updateWeekConquest()
{
action_queue.enqueue([this]() {
submit([this]()
{
TracyZoneScoped;

// 1- Notify all zones that tally started
Expand All @@ -129,22 +116,19 @@ void ConquestSystem::updateWeekConquest()
}

// 3- Send tally end Msg
sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END);
});
sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END); });
}

void ConquestSystem::updateHourlyConquest()
{
action_queue.enqueue([this]() {
sendInfluencesMsg(true);
});
submit([this]()
{ sendInfluencesMsg(true); });
}

void ConquestSystem::updateVanaHourlyConquest()
{
action_queue.enqueue([this]() {
sendInfluencesMsg(false);
});
submit([this]()
{ sendInfluencesMsg(false); });
}

void ConquestSystem::sendTallyStartMsg()
Expand Down Expand Up @@ -339,19 +323,3 @@ auto ConquestSystem::getRegionControls() -> std::vector<region_control_t> const

return controllers;
}

void process_conquest_actions(const bool& requestExit)
{
while (!requestExit)
{
std::function<void()> action;
if (action_queue.try_dequeue(action))
{
action();
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}
8 changes: 1 addition & 7 deletions src/world/conquest_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ along with this program. If not, see http://www.gnu.org/licenses/

#pragma once

#include <concurrentqueue.h>
#include <nonstd/jthread.hpp>

#include "common/sql.h"
#include "map/conquest_system.h"
#include "map/zone.h"
Expand All @@ -36,7 +33,7 @@ along with this program. If not, see http://www.gnu.org/licenses/
class ConquestSystem : public IMessageHandler
{
public:
ConquestSystem(const std::atomic_bool& requestExit);
ConquestSystem();
~ConquestSystem() override = default;

/**
Expand Down Expand Up @@ -67,9 +64,6 @@ class ConquestSystem : public IMessageHandler
private:
std::unique_ptr<SqlConnection> sql;

// Thread used for the action queue operations
std::unique_ptr<nonstd::jthread> actionQueueThread;

bool updateInfluencePoints(int points, unsigned int nation, REGION_TYPE region);

auto getRegionalInfluences() -> std::vector<influence_t> const;
Expand Down
42 changes: 42 additions & 0 deletions src/world/message_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,56 @@ along with this program. If not, see http://www.gnu.org/licenses/

#include "common/mmo.h"

#include <task_system.hpp>
#ifdef WIN32
#include <winsock2.h>
#else
#include <netinet/in.h>
#endif

/**
* Class responsible for handling messages recieved from the message server.
*
* Implementations of this class are responsible for queueing any work
* that affects internal state / sql queries via the given task system, using
* the submit() method.
*
* This is done to ensure thread-safety outside of the message_server thread.
*/
class IMessageHandler
{
public:
IMessageHandler()
{
ts = new ts::task_system(1);
}

virtual ~IMessageHandler()
{
delete ts;
}

/**
* Handles messages recieved from the map servers.
*/
virtual bool handleMessage(const std::vector<uint8>& payload,
in_addr from_addr,
uint16 from_port) = 0;

/**
* Submits work to be executed asynchronously by the task system.
* This work will happen serialy, on a single thread.
*/
void submit(std::function<void()> func)
{
// clang-format off
ts->schedule([func]()
{
func();
});
// clang-format on
}

private:
ts::task_system* ts;
};
13 changes: 7 additions & 6 deletions src/world/message_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ namespace

moodycamel::ConcurrentQueue<chat_message_t> outgoing_queue;

std::unique_ptr<SqlConnection> sql;
std::unordered_map<REGIONALMSGTYPE, InternalHandler> regionalMsgHandlers;
std::unordered_map<uint16, zone_settings_t> zoneSettingsMap;
std::vector<uint64> mapEndpoints;
std::vector<uint64> yellMapEndpoints;
std::unique_ptr<SqlConnection> sql;
std::unordered_map<REGIONALMSGTYPE, InternalHandler> regionalMsgHandlers;
std::unordered_map<uint16, zone_settings_t> zoneSettingsMap;
std::vector<uint64> mapEndpoints;
std::vector<uint64> yellMapEndpoints;
} // namespace

void queue_message(uint64 ipp, MSGSERVTYPE type, zmq::message_t* extra, zmq::message_t* packet)
Expand Down Expand Up @@ -370,7 +370,8 @@ void message_server_init(WorldServer* worldServer, const bool& requestExit)
sql = std::make_unique<SqlConnection>();

// Handler map registrations.
regionalMsgHandlers[REGIONALMSGTYPE::REGIONAL_EVT_MSG_CONQUEST] = [worldServer](std::vector<uint8>&& bytes, in_addr from_ip, uint16 from_port) {
regionalMsgHandlers[REGIONALMSGTYPE::REGIONAL_EVT_MSG_CONQUEST] = [worldServer](std::vector<uint8>&& bytes, in_addr from_ip, uint16 from_port)
{
worldServer->conquestSystem->handleMessage(std::move(bytes), from_ip, from_port);
};

Expand Down
2 changes: 1 addition & 1 deletion src/world/message_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ along with this program. If not, see http://www.gnu.org/licenses/

#pragma once

#include "besieged_system.h"
#include "common/mmo.h"
#include "common/socket.h"
#include "common/sql.h"
#include "besieged_system.h"
#include "conquest_system.h"
#include "message_handler.h"

Expand Down
4 changes: 2 additions & 2 deletions src/world/world_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ WorldServer::WorldServer(int argc, char** argv)
, sql(std::make_unique<SqlConnection>())
, httpServer(std::make_unique<HTTPServer>())
, messageServer(std::make_unique<message_server_wrapper_t>(this, std::ref(m_RequestExit)))
, conquestSystem(std::make_unique<ConquestSystem>(std::ref(m_RequestExit)))
, besiegedSystem(std::make_unique<BesiegedSystem>(std::ref(m_RequestExit)))
, conquestSystem(std::make_unique<ConquestSystem>())
, besiegedSystem(std::make_unique<BesiegedSystem>())
, campaignSystem(std::make_unique<CampaignSystem>())
, colonizationSystem(std::make_unique<ColonizationSystem>())
{
Expand Down

0 comments on commit e2cb738

Please sign in to comment.