Skip to content

Commit

Permalink
Use concurrent queues to schedule conquest tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
xkoredev committed Oct 20, 2023
1 parent 125eb1a commit 1245e1f
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 74 deletions.
171 changes: 112 additions & 59 deletions src/world/conquest_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,78 @@ along with this program. If not, see http://www.gnu.org/licenses/

#include "message_server.h"

ConquestSystem::ConquestSystem()
: sql(std::make_unique<SqlConnection>())
namespace {
// Lockless queue used to perform any internal state changes.
// This is necessary due to how the class performs operations both on
// the time_server the message_server threads.
moodycamel::ConcurrentQueue<std::function<void()>> action_queue;
}

void process_conquest_actions(const bool& requestExit);

/**
* ConquestSystem both handles messages from map servers and
* updates the database with the latest conquest data periodically.
*
* This class is guided by the following pattern:
* - All public methods that may modify the database are enqueued in the action queue.
* - The action queue is processed in its own thread
* - Private methods are not guarded via the action queue, but are only called from
* public methods, so we can assume that they are always called from the action queue.
*/
ConquestSystem::ConquestSystem(const std::atomic_bool& requestExit) :
actionQueueThread(std::make_unique<nonstd::jthread>(std::bind(process_conquest_actions, std::ref(requestExit))))
{
action_queue.enqueue([this]()
{
sql = std::make_unique<SqlConnection>();
});
}

bool ConquestSystem::handleMessage(std::vector<uint8> payload,
in_addr from_addr,
uint16 from_port)
bool ConquestSystem::handleMessage(const std::vector<uint8>& payload,
in_addr from_addr,
uint16 from_port)
{
const uint8 conquestMsgType = payload[1];
if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_GM_WEEKLY_UPDATE)
{
updateWeekConquest();
// updateWeekConquest already goes through action queue
updateWeekConquest();
return true;
}

if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_ADD_INFLUENCE_POINTS)
{
// const int32 points = ref<int32>(data, 2);
int32 points = 0;
uint32 nation = 0;
uint8 region = 0;
std::memcpy(&points, payload.data() + 2, sizeof(int32));
std::memcpy(&nation, payload.data() + 6, sizeof(uint32));
std::memcpy(&region, payload.data() + 10, sizeof(uint8));

// We update influence but do not immediately send this update to all map servers
// Influence updates are sent periodically via time_server instead.
// It is okay for map servers to be eventually consistent.
updateInfluencePoints(points, nation, (REGION_TYPE)region);
action_queue.enqueue([this, payload]()
{
int32 points = 0;
uint32 nation = 0;
uint8 region = 0;
std::memcpy(&points, payload.data() + 2, sizeof(int32));
std::memcpy(&nation, payload.data() + 6, sizeof(uint32));
std::memcpy(&region, payload.data() + 10, sizeof(uint8));

// We update influence but do not immediately send this update to all map servers
// Influence updates are sent periodically via time_server instead.
// It is okay for map servers to be eventually consistent.
updateInfluencePoints(points, nation, (REGION_TYPE)region);
});

return true;
}

if (conquestMsgType == CONQUESTMSGTYPE::CONQUEST_MAP2WORLD_GM_CONQUEST_UPDATE)
{
// Convert from_addr to ip + port
uint64 ipp = from_addr.s_addr;
ipp |= (((uint64)from_port) << 32);
action_queue.enqueue([this, payload, from_addr, from_port]()
{
// Convert from_addr to ip + port
uint64 ipp = from_addr.s_addr;
ipp |= (((uint64)from_port) << 32);

// Send influence data to the requesting map server
sendInfluencesMsg(true, ipp);
});

// Send influence data to the requesting map server
sendInfluencesMsg(true, ipp);
return true;
}

Expand All @@ -74,6 +105,48 @@ bool ConquestSystem::handleMessage(std::vector<uint8> payload,
return false;
}

void ConquestSystem::updateWeekConquest()
{
action_queue.enqueue([this]() {
TracyZoneScoped;

// 1- Notify all zones that tally started
sendTallyStartMsg();

// 2- Do the actual db update
const char* Query = "UPDATE conquest_system SET region_control = \
IF(sandoria_influence > bastok_influence AND sandoria_influence > windurst_influence AND \
sandoria_influence > beastmen_influence, 0, \
IF(bastok_influence > sandoria_influence AND bastok_influence > windurst_influence AND \
bastok_influence > beastmen_influence, 1, \
IF(windurst_influence > bastok_influence AND windurst_influence > sandoria_influence AND \
windurst_influence > beastmen_influence, 2, 3)));";

int ret = sql->Query(Query);
if (ret == SQL_ERROR)
{
ShowError("handleWeeklyUpdate() failed");
}

// 3- Send tally end Msg
sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END);
});
}

void ConquestSystem::updateHourlyConquest()
{
action_queue.enqueue([this]() {
sendInfluencesMsg(true);
});
}

void ConquestSystem::updateVanaHourlyConquest()
{
action_queue.enqueue([this]() {
sendInfluencesMsg(false);
});
}

void ConquestSystem::sendTallyStartMsg()
{
// 1- Send message to all zones. We are starting update.
Expand Down Expand Up @@ -223,42 +296,6 @@ bool ConquestSystem::updateInfluencePoints(int points, unsigned int nation, REGI
return ret != SQL_ERROR;
}

void ConquestSystem::updateWeekConquest()
{
TracyZoneScoped;

// 1- Notify all zones that tally started
sendTallyStartMsg();

// 2- Do the actual db update
const char* Query = "UPDATE conquest_system SET region_control = \
IF(sandoria_influence > bastok_influence AND sandoria_influence > windurst_influence AND \
sandoria_influence > beastmen_influence, 0, \
IF(bastok_influence > sandoria_influence AND bastok_influence > windurst_influence AND \
bastok_influence > beastmen_influence, 1, \
IF(windurst_influence > bastok_influence AND windurst_influence > sandoria_influence AND \
windurst_influence > beastmen_influence, 2, 3)));";

int ret = sql->Query(Query);
if (ret == SQL_ERROR)
{
ShowError("handleWeeklyUpdate() failed");
}

// 3- Send tally end Msg
sendRegionControlsMsg(CONQUEST_WORLD2MAP_WEEKLY_UPDATE_END);
}

void ConquestSystem::updateHourlyConquest()
{
sendInfluencesMsg(true);
}

void ConquestSystem::updateVanaHourlyConquest()
{
sendInfluencesMsg(false);
}

auto ConquestSystem::getRegionalInfluences() -> std::vector<influence_t> const
{
const char* Query = "SELECT sandoria_influence, bastok_influence, windurst_influence, beastmen_influence FROM conquest_system;";
Expand Down Expand Up @@ -302,3 +339,19 @@ auto ConquestSystem::getRegionControls() -> std::vector<region_control_t> const

return controllers;
}

void process_conquest_actions(const bool& requestExit)
{
while (!requestExit)
{
std::function<void()> action;
if (action_queue.try_dequeue(action))
{
action();
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}
16 changes: 10 additions & 6 deletions src/world/conquest_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ along with this program. If not, see http://www.gnu.org/licenses/

#pragma once

#include <concurrentqueue.h>
#include <nonstd/jthread.hpp>

#include "common/sql.h"
#include "map/conquest_system.h"
#include "map/zone.h"
Expand All @@ -33,17 +36,15 @@ along with this program. If not, see http://www.gnu.org/licenses/
class ConquestSystem : public IMessageHandler
{
public:
ConquestSystem();
ConquestSystem(const std::atomic_bool& requestExit);
~ConquestSystem() override = default;

/**
* IMessageHandler implementation. Used to handle messages from message_server.
* NOTE: The copy of payload here is intentional, since these systems will eventually
* : be moved to their own threads.
*/
bool handleMessage(std::vector<uint8> payload,
in_addr from_addr,
uint16 from_port) override;
bool handleMessage(const std::vector<uint8>& payload,
in_addr from_addr,
uint16 from_port) override;

/**
* Called weekly, updates conquest data and sends regional control information
Expand All @@ -66,6 +67,9 @@ class ConquestSystem : public IMessageHandler
private:
std::unique_ptr<SqlConnection> sql;

// Thread used for the action queue operations
std::unique_ptr<nonstd::jthread> actionQueueThread;

bool updateInfluencePoints(int points, unsigned int nation, REGION_TYPE region);

auto getRegionalInfluences() -> std::vector<influence_t> const;
Expand Down
10 changes: 3 additions & 7 deletions src/world/message_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,7 @@ class IMessageHandler
{
}

/*
* NOTE: The copy of payload here is intentional, since these systems will eventually
* : be moved to their own threads.
*/
virtual bool handleMessage(std::vector<uint8> payload,
in_addr from_addr,
uint16 from_port) = 0;
virtual bool handleMessage(const std::vector<uint8>& payload,
in_addr from_addr,
uint16 from_port) = 0;
};
4 changes: 2 additions & 2 deletions src/world/world_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ WorldServer::WorldServer(int argc, char** argv)
, sql(std::make_unique<SqlConnection>())
, httpServer(std::make_unique<HTTPServer>())
, messageServer(std::make_unique<message_server_wrapper_t>(this, std::ref(m_RequestExit)))
, conquestSystem(std::make_unique<ConquestSystem>())
, besiegedSystem(std::make_unique<BesiegedSystem>())
, conquestSystem(std::make_unique<ConquestSystem>(std::ref(m_RequestExit)))
, besiegedSystem(std::make_unique<BesiegedSystem>(std::ref(m_RequestExit)))
, campaignSystem(std::make_unique<CampaignSystem>())
, colonizationSystem(std::make_unique<ColonizationSystem>())
{
Expand Down

0 comments on commit 1245e1f

Please sign in to comment.