Skip to content

Commit

Permalink
improve: dispatcher (opentibiabr#1732)
Browse files Browse the repository at this point in the history
This PR consists of:

1. Removing redundant code
2. Task-expiration logging
3. `priority_queue` replaced with `btree_multiset`
4. `flat_hash_set` replaced with `unordered_set` in `hasTraceableContext`
5. Scheduled tasks now use batch erase and insert
6. Some other minor adjustments

**Benchmark:**


![image](https://github.com/opentibiabr/canary/assets/2267386/0f3dd190-2d19-40f6-b61e-5f8bd34dd2d1)
Note: unfortunately, `priority_queue` does not support batch insertion.

<details>

```c++
static constexpr auto max_stress = 999999;
const auto time_now = OTSYS_TIME();

std::multiset<std::shared_ptr<Task>, Task::Compare> multiset;
phmap::btree_multiset<std::shared_ptr<Task>, Task::Compare> btreemultiset;
std::priority_queue<std::shared_ptr<Task>, std::deque<std::shared_ptr<Task>>, Task::Compare> priority;

std::vector<std::shared_ptr<Task>> tasks;
for (int i = 0; ++i < max_stress;) {
	tasks.emplace_back(std::make_shared<Task>([] {}, "priority", time_now + (100 + (rand() % 2000))));
}

const auto &benchmark = [&](std::string_view type, auto set, uint64_t stress) {
	stress = std::min<uint64_t>(stress, max_stress);
	Benchmark bm;
	for (int i = 0; ++i <= 3;) {
		set.insert(tasks.begin(), tasks.begin() + stress);		
		while (!set.empty()) {
			auto it = set.begin();
			// top:
			auto s = *it;
			// pop:
			set.erase(it);
		}
	}
	g_logger().info("{}({}) - {}ms", type, stress, bm.duration());
};

const auto &benchmarkPriority = [&](auto priority, uint64_t stress) {
	stress = std::min<uint64_t>(stress, max_stress);
	Benchmark bm;
	for (int i = 0; ++i <= 3;) {
		for (int d = 0; ++d <= stress;) {
			priority.emplace(tasks[d]);
		}
		while (!priority.empty()) {
			auto t = priority.top();
			priority.pop();
		}
	}
	g_logger().info("{}({}) - {}ms", "priority", stress, bm.duration());
};

benchmark("multiset", multiset, 999);
benchmark("multiset", multiset, 99999);
benchmark("multiset", multiset, 999999);
benchmark("btreemultiset", btreemultiset, 999);
benchmark("btreemultiset", btreemultiset, 99999);
benchmark("btreemultiset", btreemultiset, 999999);

benchmarkPriority(priority, 999);
benchmarkPriority(priority, 99999);
benchmarkPriority(priority, 999999);
```
</details>


![image](https://github.com/opentibiabr/canary/assets/2267386/39fd3f5e-629b-4151-8315-9e07e5cd41b1)
<details>

```c++
static constexpr auto max_stress = 999999;

const std::vector < std::string_view> tasks ({
	"Creature::checkCreatureWalk",
	"Decay::checkDecay",
	"Dispatcher::asyncEvent",
	"Game::checkCreatureAttack",
	"Game::checkCreatures",
	"Game::checkImbuements",
	"Game::checkLight",
	"Game::createFiendishMonsters",
	"Game::createInfluencedMonsters",
	"Game::updateCreatureWalk",
	"Game::updateForgeableMonsters",
	"GlobalEvents::think",
	"LuaEnvironment::executeTimerEvent",
	"Modules::executeOnRecvbyte",
	"OutputMessagePool::sendAll",
	"ProtocolGame::addGameTask",
	"ProtocolGame::parsePacketFromDispatcher",
	"Raids::checkRaids",
	"SpawnMonster::checkSpawnMonster",
	"SpawnMonster::scheduleSpawn",
	"SpawnNpc::checkSpawnNpc",
	"Webhook::run",
	"sendRecvMessageCallback",
});

const std::set<std::string_view> set(tasks.begin(), tasks.end());
const std::unordered_set<std::string_view> unordered_set(tasks.begin(), tasks.end());
const phmap::btree_set<std::string_view> btree_set(tasks.begin(), tasks.end());
const phmap::flat_hash_set<std::string_view> flat_hash_set(tasks.begin(), tasks.end());

	// stress
for (int i = 0; ++i <= 9999999;) { }

const auto &benchmark = [&](std::string_view type, auto set, uint64_t stress) {
	stress = std::min<uint64_t>(stress, max_stress);
	Benchmark bm;
	for (int i = 0; ++i <= stress;) {
		set.contains(tasks[rand() % (tasks.size() - 1)]);
	}
	g_logger().info("{}({}) - {}ms", type, stress, bm.duration());
};

benchmark("set", set, 999);
benchmark("set", set, 99999);
benchmark("set", set, 999999);
benchmark("unordered_set", unordered_set, 999);
benchmark("unordered_set", unordered_set, 99999);
benchmark("unordered_set", unordered_set, 999999);

benchmark("btree_set", btree_set, 999);
benchmark("btree_set", btree_set, 99999);
benchmark("btree_set", btree_set, 999999);

benchmark("flat_hash_set", flat_hash_set, 999);
benchmark("flat_hash_set", flat_hash_set, 99999);
benchmark("flat_hash_set", flat_hash_set, 999999);
```
</details>
  • Loading branch information
mehah authored Oct 24, 2023
1 parent cc7aeed commit 2a464ad
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 72 deletions.
40 changes: 23 additions & 17 deletions src/game/scheduling/dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,19 @@ void Dispatcher::executeEvents(std::unique_lock<std::mutex> &asyncLock) {

if (groupId == static_cast<uint8_t>(TaskGroup::Serial)) {
executeSerialEvents(tasks);
mergeEvents(); // merge request, as there may be async event requests
} else {
executeParallelEvents(tasks, groupId, asyncLock);
}
}
}

void Dispatcher::executeScheduledEvents() {
for (uint_fast64_t i = 0, max = scheduledTasks.size(); i < max && !scheduledTasks.empty(); ++i) {
const auto &task = scheduledTasks.top();
auto &threadScheduledTasks = getThreadTask()->scheduledTasks;

auto it = scheduledTasks.begin();
while (it != scheduledTasks.end()) {
const auto &task = *it;
if (task->getTime() > Task::TIME_NOW) {
break;
}
Expand All @@ -111,12 +115,16 @@ void Dispatcher::executeScheduledEvents() {

if (task->execute() && task->isCycle()) {
task->updateTime();
scheduledTasks.emplace(task);
threadScheduledTasks.emplace_back(task);
} else {
scheduledTasksRef.erase(task->getEventId());
scheduledTasksRef.erase(task->getId());
}

scheduledTasks.pop();
++it;
}

if (it != scheduledTasks.begin()) {
scheduledTasks.erase(scheduledTasks.begin(), it);
}

dispacherContext.reset();
Expand All @@ -126,18 +134,15 @@ void Dispatcher::executeScheduledEvents() {
void Dispatcher::mergeEvents() {
for (const auto &thread : threads) {
std::scoped_lock lock(thread->mutex);
if (!thread->tasks.empty()) {
for (uint_fast8_t i = 0; i < static_cast<uint8_t>(TaskGroup::Last); ++i) {
for (uint_fast8_t i = 0; i < static_cast<uint8_t>(TaskGroup::Last); ++i) {
if (!thread->tasks[i].empty()) {
m_tasks[i].insert(m_tasks[i].end(), make_move_iterator(thread->tasks[i].begin()), make_move_iterator(thread->tasks[i].end()));
thread->tasks[i].clear();
}
}

if (!thread->scheduledTasks.empty()) {
for (auto &task : thread->scheduledTasks) {
scheduledTasks.emplace(task);
scheduledTasksRef.emplace(task->getEventId(), task);
}
scheduledTasks.insert(make_move_iterator(thread->scheduledTasks.begin()), make_move_iterator(thread->scheduledTasks.end()));
thread->scheduledTasks.clear();
}
}
Expand All @@ -153,38 +158,39 @@ std::chrono::nanoseconds Dispatcher::timeUntilNextScheduledTask() const {
return CHRONO_MILI_MAX;
}

const auto &task = scheduledTasks.top();
const auto &task = *scheduledTasks.begin();
const auto timeRemaining = task->getTime() - Task::TIME_NOW;
return std::max<std::chrono::nanoseconds>(timeRemaining, CHRONO_NANO_0);
}

void Dispatcher::addEvent(std::function<void(void)> &&f, std::string_view context, uint32_t expiresAfterMs) {
const auto &thread = threads[getThreadId()];
const auto &thread = getThreadTask();
std::scoped_lock lock(thread->mutex);
thread->tasks[static_cast<uint8_t>(TaskGroup::Serial)].emplace_back(expiresAfterMs, std::move(f), context);
notify();
}

uint64_t Dispatcher::scheduleEvent(const std::shared_ptr<Task> &task) {
const auto &thread = threads[getThreadId()];
const auto &thread = getThreadTask();
std::scoped_lock lock(thread->mutex);

auto eventId = scheduledTasksRef
.emplace(task->generateId(), thread->scheduledTasks.emplace_back(task))
.emplace(task->getId(), thread->scheduledTasks.emplace_back(task))
.first->first;

notify();
return eventId;
}

void Dispatcher::asyncEvent(std::function<void(void)> &&f, TaskGroup group) {
const auto &thread = threads[getThreadId()];
const auto &thread = getThreadTask();
std::scoped_lock lock(thread->mutex);
thread->tasks[static_cast<uint8_t>(group)].emplace_back(0, std::move(f), dispacherContext.taskName);
notify();
}

void Dispatcher::stopEvent(uint64_t eventId) {
auto it = scheduledTasksRef.find(eventId);
const auto &it = scheduledTasksRef.find(eventId);
if (it != scheduledTasksRef.end()) {
it->second->cancel();
scheduledTasksRef.erase(it);
Expand Down
20 changes: 6 additions & 14 deletions src/game/scheduling/dispatcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ class Dispatcher {
public:
explicit Dispatcher(ThreadPool &threadPool) :
threadPool(threadPool) {
threads.reserve(std::thread::hardware_concurrency() + 1);
for (uint_fast16_t i = 0; i < std::thread::hardware_concurrency() + 1; ++i) {
threads.reserve(threadPool.getNumberOfThreads() + 1);
for (uint_fast16_t i = 0; i < threads.capacity(); ++i) {
threads.emplace_back(std::make_unique<ThreadTask>());
}
};
Expand Down Expand Up @@ -133,17 +133,9 @@ class Dispatcher {
Task::TIME_NOW = std::chrono::system_clock::now();
}

static int16_t getThreadId() {
static std::atomic_int16_t lastId = -1;
thread_local static int16_t id = -1;

if (id == -1) {
lastId.fetch_add(1);
id = lastId.load();
}

return id;
};
const auto &getThreadTask() const {
return threads[ThreadPool::getThreadId()];
}

uint64_t scheduleEvent(uint32_t delay, std::function<void(void)> &&f, std::string_view context, bool cycle, bool log = true) {
return scheduleEvent(std::make_shared<Task>(std::move(f), context, delay, cycle, log));
Expand Down Expand Up @@ -204,7 +196,7 @@ class Dispatcher {

// Main Events
std::array<std::vector<Task>, static_cast<uint8_t>(TaskGroup::Last)> m_tasks;
std::priority_queue<std::shared_ptr<Task>, std::deque<std::shared_ptr<Task>>, Task::Compare> scheduledTasks;
phmap::btree_multiset<std::shared_ptr<Task>, Task::Compare> scheduledTasks;
phmap::parallel_flat_hash_map_m<uint64_t, std::shared_ptr<Task>> scheduledTasksRef;

friend class CanaryServer;
Expand Down
8 changes: 7 additions & 1 deletion src/game/scheduling/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@ std::chrono::system_clock::time_point Task::TIME_NOW = SYSTEM_TIME_ZERO;
std::atomic_uint_fast64_t Task::LAST_EVENT_ID = 0;

bool Task::execute() const {
if (!func || hasExpired()) {
if (isCanceled()) {
return false;
}

if (hasExpired()) {
g_logger().info("The task '{}' has expired, it has not been executed in {} ms.", getContext(), expiration - utime);
return false;
}

if (log) {
if (hasTraceableContext()) {
g_logger().trace("Executing task {}.", getContext());
Expand Down
70 changes: 31 additions & 39 deletions src/game/scheduling/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#pragma once
#include "utils/tools.hpp"
#include <unordered_set>

static constexpr auto SYSTEM_TIME_ZERO = std::chrono::system_clock::time_point(std::chrono::milliseconds(0));

Expand All @@ -17,24 +18,27 @@ class Task {
static std::chrono::system_clock::time_point TIME_NOW;

Task(uint32_t expiresAfterMs, std::function<void(void)> &&f, std::string_view context) :
expiration(expiresAfterMs > 0 ? TIME_NOW + std::chrono::milliseconds(expiresAfterMs) : SYSTEM_TIME_ZERO),
context(context), func(std::move(f)) {
func(std::move(f)), context(context), utime(TIME_NOW), expiration(expiresAfterMs > 0 ? TIME_NOW + std::chrono::milliseconds(expiresAfterMs) : SYSTEM_TIME_ZERO) {
assert(!this->context.empty() && "Context cannot be empty!");
}

Task(std::function<void(void)> &&f, std::string_view context, uint32_t delay, bool cycle = false, bool log = true) :
cycle(cycle), log(log), delay(delay), utime(TIME_NOW + std::chrono::milliseconds(delay)), context(context), func(std::move(f)) {
func(std::move(f)), context(context), utime(TIME_NOW + std::chrono::milliseconds(delay)), delay(delay), cycle(cycle), log(log) {
assert(!this->context.empty() && "Context cannot be empty!");
}

~Task() = default;

void setEventId(uint64_t id) {
eventId = id;
}
uint64_t getId() {
if (id == 0) {
if (++LAST_EVENT_ID == 0) {
LAST_EVENT_ID = 1;
}

id = LAST_EVENT_ID;
}

uint64_t getEventId() const {
return eventId;
return id;
}

uint32_t getDelay() const {
Expand All @@ -58,43 +62,24 @@ class Task {
}

bool isCanceled() const {
return canceled;
return func == nullptr;
}

void cancel() {
canceled = true;
func = nullptr;
}

bool execute() const;

private:
static std::atomic_uint_fast64_t LAST_EVENT_ID;

void updateTime() {
utime = TIME_NOW + std::chrono::milliseconds(delay);
}

uint64_t generateId() {
if (eventId == 0) {
if (++LAST_EVENT_ID == 0) {
LAST_EVENT_ID = 1;
}

eventId = LAST_EVENT_ID;
}

return eventId;
}

struct Compare {
bool operator()(const std::shared_ptr<Task> &a, const std::shared_ptr<Task> &b) const {
return b->utime < a->utime;
}
};

private:
static std::atomic_uint_fast64_t LAST_EVENT_ID;

bool hasTraceableContext() const {
const static auto tasksContext = phmap::flat_hash_set<std::string>({
const static auto tasksContext = std::unordered_set<std::string_view>({
"Creature::checkCreatureWalk",
"Decay::checkDecay",
"Dispatcher::asyncEvent",
Expand Down Expand Up @@ -123,16 +108,23 @@ class Task {
return tasksContext.contains(context);
}

bool canceled = false;
bool cycle = false;
bool log = true;
struct Compare {
bool operator()(const std::shared_ptr<Task> &a, const std::shared_ptr<Task> &b) const {
return a->utime < b->utime;
}
};

uint32_t delay = 0;
uint64_t eventId = 0;
std::function<void(void)> func = nullptr;
std::string_view context;

std::chrono::system_clock::time_point utime = SYSTEM_TIME_ZERO;
std::chrono::system_clock::time_point expiration = SYSTEM_TIME_ZERO;

std::string_view context;
std::function<void(void)> func = nullptr;
uint64_t id = 0;
uint32_t delay = 0;

bool cycle = false;
bool log = true;

friend class Dispatcher;
};
2 changes: 1 addition & 1 deletion src/lib/thread/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void ThreadPool::start() {
* will make processing non-blocking in some way and that would allow
* single core computers to process things concurrently, but not in parallel.
*/
int nThreads = std::max<int>(static_cast<int>(getNumberOfCores()), DEFAULT_NUMBER_OF_THREADS);
nThreads = std::max<uint16_t>(static_cast<int>(getNumberOfCores()), DEFAULT_NUMBER_OF_THREADS);

for (std::size_t i = 0; i < nThreads; ++i) {
threads.emplace_back([this] { ioService.run(); });
Expand Down
18 changes: 18 additions & 0 deletions src/lib/thread/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,27 @@ class ThreadPool {
asio::io_context &getIoContext();
void addLoad(const std::function<void(void)> &load);

uint16_t getNumberOfThreads() const {
return nThreads;
}

static int16_t getThreadId() {
static std::atomic_int16_t lastId = -1;
thread_local static int16_t id = -1;

if (id == -1) {
lastId.fetch_add(1);
id = lastId.load();
}

return id;
};

private:
Logger &logger;
asio::io_context ioService;
std::vector<std::jthread> threads;
asio::io_context::work work { ioService };

uint16_t nThreads = 0;
};

0 comments on commit 2a464ad

Please sign in to comment.