diff --git a/src/game/scheduling/dispatcher.cpp b/src/game/scheduling/dispatcher.cpp index 638e69aaeb8..1bb7512f11d 100644 --- a/src/game/scheduling/dispatcher.cpp +++ b/src/game/scheduling/dispatcher.cpp @@ -92,6 +92,7 @@ void Dispatcher::executeEvents(std::unique_lock &asyncLock) { if (groupId == static_cast(TaskGroup::Serial)) { executeSerialEvents(tasks); + mergeEvents(); // merge request, as there may be async event requests } else { executeParallelEvents(tasks, groupId, asyncLock); } @@ -99,8 +100,11 @@ void Dispatcher::executeEvents(std::unique_lock &asyncLock) { } void Dispatcher::executeScheduledEvents() { - for (uint_fast64_t i = 0, max = scheduledTasks.size(); i < max && !scheduledTasks.empty(); ++i) { - const auto &task = scheduledTasks.top(); + auto &threadScheduledTasks = getThreadTask()->scheduledTasks; + + auto it = scheduledTasks.begin(); + while (it != scheduledTasks.end()) { + const auto &task = *it; if (task->getTime() > Task::TIME_NOW) { break; } @@ -111,12 +115,16 @@ void Dispatcher::executeScheduledEvents() { if (task->execute() && task->isCycle()) { task->updateTime(); - scheduledTasks.emplace(task); + threadScheduledTasks.emplace_back(task); } else { - scheduledTasksRef.erase(task->getEventId()); + scheduledTasksRef.erase(task->getId()); } - scheduledTasks.pop(); + ++it; + } + + if (it != scheduledTasks.begin()) { + scheduledTasks.erase(scheduledTasks.begin(), it); } dispacherContext.reset(); @@ -126,18 +134,15 @@ void Dispatcher::executeScheduledEvents() { void Dispatcher::mergeEvents() { for (const auto &thread : threads) { std::scoped_lock lock(thread->mutex); - if (!thread->tasks.empty()) { - for (uint_fast8_t i = 0; i < static_cast(TaskGroup::Last); ++i) { + for (uint_fast8_t i = 0; i < static_cast(TaskGroup::Last); ++i) { + if (!thread->tasks[i].empty()) { m_tasks[i].insert(m_tasks[i].end(), make_move_iterator(thread->tasks[i].begin()), make_move_iterator(thread->tasks[i].end())); thread->tasks[i].clear(); } } if (!thread->scheduledTasks.empty()) { - for (auto &task : thread->scheduledTasks) { - scheduledTasks.emplace(task); - scheduledTasksRef.emplace(task->getEventId(), task); - } + scheduledTasks.insert(make_move_iterator(thread->scheduledTasks.begin()), make_move_iterator(thread->scheduledTasks.end())); thread->scheduledTasks.clear(); } } @@ -153,23 +158,24 @@ std::chrono::nanoseconds Dispatcher::timeUntilNextScheduledTask() const { return CHRONO_MILI_MAX; } - const auto &task = scheduledTasks.top(); + const auto &task = *scheduledTasks.begin(); const auto timeRemaining = task->getTime() - Task::TIME_NOW; return std::max(timeRemaining, CHRONO_NANO_0); } void Dispatcher::addEvent(std::function &&f, std::string_view context, uint32_t expiresAfterMs) { - const auto &thread = threads[getThreadId()]; + const auto &thread = getThreadTask(); std::scoped_lock lock(thread->mutex); thread->tasks[static_cast(TaskGroup::Serial)].emplace_back(expiresAfterMs, std::move(f), context); notify(); } uint64_t Dispatcher::scheduleEvent(const std::shared_ptr &task) { - const auto &thread = threads[getThreadId()]; + const auto &thread = getThreadTask(); std::scoped_lock lock(thread->mutex); + auto eventId = scheduledTasksRef - .emplace(task->generateId(), thread->scheduledTasks.emplace_back(task)) + .emplace(task->getId(), thread->scheduledTasks.emplace_back(task)) .first->first; notify(); @@ -177,14 +183,14 @@ uint64_t Dispatcher::scheduleEvent(const std::shared_ptr &task) { } void Dispatcher::asyncEvent(std::function &&f, TaskGroup group) { - const auto &thread = threads[getThreadId()]; + const auto &thread = getThreadTask(); std::scoped_lock lock(thread->mutex); thread->tasks[static_cast(group)].emplace_back(0, std::move(f), dispacherContext.taskName); notify(); } void Dispatcher::stopEvent(uint64_t eventId) { - auto it = scheduledTasksRef.find(eventId); + const auto &it = scheduledTasksRef.find(eventId); if (it != scheduledTasksRef.end()) { it->second->cancel(); scheduledTasksRef.erase(it); diff --git a/src/game/scheduling/dispatcher.hpp b/src/game/scheduling/dispatcher.hpp index f360566c275..73ba38ed224 100644 --- a/src/game/scheduling/dispatcher.hpp +++ b/src/game/scheduling/dispatcher.hpp @@ -78,8 +78,8 @@ class Dispatcher { public: explicit Dispatcher(ThreadPool &threadPool) : threadPool(threadPool) { - threads.reserve(std::thread::hardware_concurrency() + 1); - for (uint_fast16_t i = 0; i < std::thread::hardware_concurrency() + 1; ++i) { + threads.reserve(threadPool.getNumberOfThreads() + 1); + for (uint_fast16_t i = 0; i < threads.capacity(); ++i) { threads.emplace_back(std::make_unique()); } }; @@ -133,17 +133,9 @@ class Dispatcher { Task::TIME_NOW = std::chrono::system_clock::now(); } - static int16_t getThreadId() { - static std::atomic_int16_t lastId = -1; - thread_local static int16_t id = -1; - - if (id == -1) { - lastId.fetch_add(1); - id = lastId.load(); - } - - return id; - }; + const auto &getThreadTask() const { + return threads[ThreadPool::getThreadId()]; + } uint64_t scheduleEvent(uint32_t delay, std::function &&f, std::string_view context, bool cycle, bool log = true) { return scheduleEvent(std::make_shared(std::move(f), context, delay, cycle, log)); @@ -204,7 +196,7 @@ class Dispatcher { // Main Events std::array, static_cast(TaskGroup::Last)> m_tasks; - std::priority_queue, std::deque>, Task::Compare> scheduledTasks; + phmap::btree_multiset, Task::Compare> scheduledTasks; phmap::parallel_flat_hash_map_m> scheduledTasksRef; friend class CanaryServer; diff --git a/src/game/scheduling/task.cpp b/src/game/scheduling/task.cpp index 59e007935ef..c9e6157ab9e 100644 --- a/src/game/scheduling/task.cpp +++ b/src/game/scheduling/task.cpp @@ -15,9 +15,15 @@ std::chrono::system_clock::time_point Task::TIME_NOW = SYSTEM_TIME_ZERO; std::atomic_uint_fast64_t Task::LAST_EVENT_ID = 0; bool Task::execute() const { - if (!func || hasExpired()) { + if (isCanceled()) { return false; } + + if (hasExpired()) { + g_logger().info("The task '{}' has expired, it has not been executed in {} ms.", getContext(), expiration - utime); + return false; + } + if (log) { if (hasTraceableContext()) { g_logger().trace("Executing task {}.", getContext()); diff --git a/src/game/scheduling/task.hpp b/src/game/scheduling/task.hpp index 6a2d33b465b..f42602242c5 100644 --- a/src/game/scheduling/task.hpp +++ b/src/game/scheduling/task.hpp @@ -9,6 +9,7 @@ #pragma once #include "utils/tools.hpp" +#include static constexpr auto SYSTEM_TIME_ZERO = std::chrono::system_clock::time_point(std::chrono::milliseconds(0)); @@ -17,24 +18,27 @@ class Task { static std::chrono::system_clock::time_point TIME_NOW; Task(uint32_t expiresAfterMs, std::function &&f, std::string_view context) : - expiration(expiresAfterMs > 0 ? TIME_NOW + std::chrono::milliseconds(expiresAfterMs) : SYSTEM_TIME_ZERO), - context(context), func(std::move(f)) { + func(std::move(f)), context(context), utime(TIME_NOW), expiration(expiresAfterMs > 0 ? TIME_NOW + std::chrono::milliseconds(expiresAfterMs) : SYSTEM_TIME_ZERO) { assert(!this->context.empty() && "Context cannot be empty!"); } Task(std::function &&f, std::string_view context, uint32_t delay, bool cycle = false, bool log = true) : - cycle(cycle), log(log), delay(delay), utime(TIME_NOW + std::chrono::milliseconds(delay)), context(context), func(std::move(f)) { + func(std::move(f)), context(context), utime(TIME_NOW + std::chrono::milliseconds(delay)), delay(delay), cycle(cycle), log(log) { assert(!this->context.empty() && "Context cannot be empty!"); } ~Task() = default; - void setEventId(uint64_t id) { - eventId = id; - } + uint64_t getId() { + if (id == 0) { + if (++LAST_EVENT_ID == 0) { + LAST_EVENT_ID = 1; + } + + id = LAST_EVENT_ID; + } - uint64_t getEventId() const { - return eventId; + return id; } uint32_t getDelay() const { @@ -58,43 +62,24 @@ class Task { } bool isCanceled() const { - return canceled; + return func == nullptr; } void cancel() { - canceled = true; func = nullptr; } bool execute() const; +private: + static std::atomic_uint_fast64_t LAST_EVENT_ID; + void updateTime() { utime = TIME_NOW + std::chrono::milliseconds(delay); } - uint64_t generateId() { - if (eventId == 0) { - if (++LAST_EVENT_ID == 0) { - LAST_EVENT_ID = 1; - } - - eventId = LAST_EVENT_ID; - } - - return eventId; - } - - struct Compare { - bool operator()(const std::shared_ptr &a, const std::shared_ptr &b) const { - return b->utime < a->utime; - } - }; - -private: - static std::atomic_uint_fast64_t LAST_EVENT_ID; - bool hasTraceableContext() const { - const static auto tasksContext = phmap::flat_hash_set({ + const static auto tasksContext = std::unordered_set({ "Creature::checkCreatureWalk", "Decay::checkDecay", "Dispatcher::asyncEvent", @@ -123,16 +108,23 @@ class Task { return tasksContext.contains(context); } - bool canceled = false; - bool cycle = false; - bool log = true; + struct Compare { + bool operator()(const std::shared_ptr &a, const std::shared_ptr &b) const { + return a->utime < b->utime; + } + }; - uint32_t delay = 0; - uint64_t eventId = 0; + std::function func = nullptr; + std::string_view context; std::chrono::system_clock::time_point utime = SYSTEM_TIME_ZERO; std::chrono::system_clock::time_point expiration = SYSTEM_TIME_ZERO; - std::string_view context; - std::function func = nullptr; + uint64_t id = 0; + uint32_t delay = 0; + + bool cycle = false; + bool log = true; + + friend class Dispatcher; }; diff --git a/src/lib/thread/thread_pool.cpp b/src/lib/thread/thread_pool.cpp index a88d73fc6c4..93b730b0b8f 100644 --- a/src/lib/thread/thread_pool.cpp +++ b/src/lib/thread/thread_pool.cpp @@ -29,7 +29,7 @@ void ThreadPool::start() { * will make processing non-blocking in some way and that would allow * single core computers to process things concurrently, but not in parallel. */ - int nThreads = std::max(static_cast(getNumberOfCores()), DEFAULT_NUMBER_OF_THREADS); + nThreads = std::max(static_cast(getNumberOfCores()), DEFAULT_NUMBER_OF_THREADS); for (std::size_t i = 0; i < nThreads; ++i) { threads.emplace_back([this] { ioService.run(); }); diff --git a/src/lib/thread/thread_pool.hpp b/src/lib/thread/thread_pool.hpp index 90a57bae16a..8ad60f14676 100644 --- a/src/lib/thread/thread_pool.hpp +++ b/src/lib/thread/thread_pool.hpp @@ -23,9 +23,27 @@ class ThreadPool { asio::io_context &getIoContext(); void addLoad(const std::function &load); + uint16_t getNumberOfThreads() const { + return nThreads; + } + + static int16_t getThreadId() { + static std::atomic_int16_t lastId = -1; + thread_local static int16_t id = -1; + + if (id == -1) { + lastId.fetch_add(1); + id = lastId.load(); + } + + return id; + }; + private: Logger &logger; asio::io_context ioService; std::vector threads; asio::io_context::work work { ioService }; + + uint16_t nThreads = 0; };