diff --git a/src/server/network/message/outputmessage.cpp b/src/server/network/message/outputmessage.cpp
index f0f2530fa39..ee39907730e 100644
--- a/src/server/network/message/outputmessage.cpp
+++ b/src/server/network/message/outputmessage.cpp
@@ -14,8 +14,9 @@
 #include "game/scheduling/dispatcher.hpp"
 #include "utils/lockfree.hpp"
 
-constexpr uint16_t OUTPUTMESSAGE_FREE_LIST_CAPACITY = 2048;
+constexpr size_t OUTPUTMESSAGE_FREE_LIST_CAPACITY = 2048;
 constexpr std::chrono::milliseconds OUTPUTMESSAGE_AUTOSEND_DELAY { 10 };
+static inline LockfreePoolingAllocator<OutputMessage, OUTPUTMESSAGE_FREE_LIST_CAPACITY> outputMessageAllocator;
 
 OutputMessagePool &OutputMessagePool::getInstance() {
 	return inject<OutputMessagePool>();
@@ -59,5 +60,19 @@ void OutputMessagePool::removeProtocolFromAutosend(const Protocol_ptr &protocol)
 }
 
 OutputMessage_ptr OutputMessagePool::getOutputMessage() {
-	return std::allocate_shared<OutputMessage>(LockfreePoolingAllocator<OutputMessage, OUTPUTMESSAGE_FREE_LIST_CAPACITY>());
+	OutputMessage* rawPtr = outputMessageAllocator.allocate(1);
+
+	try {
+		new (rawPtr) OutputMessage();
+	} catch (...) {
+		outputMessageAllocator.deallocate(rawPtr, 1);
+		throw;
+	}
+
+	return { rawPtr, [](OutputMessage* ptr) {
+				if (ptr != nullptr) {
+					ptr->~OutputMessage();
+					outputMessageAllocator.deallocate(ptr, 1);
+				}
+			} };
 }
diff --git a/src/server/network/protocol/protocol.cpp b/src/server/network/protocol/protocol.cpp
index 61eda412147..c0073a1e6ec 100644
--- a/src/server/network/protocol/protocol.cpp
+++ b/src/server/network/protocol/protocol.cpp
@@ -16,6 +16,8 @@
 #include "game/scheduling/dispatcher.hpp"
 #include "utils/tools.hpp"
 
+#include <memory>
+
 Protocol::Protocol(const Connection_ptr &initConnection) :
 	connectionPtr(initConnection) { }
 
@@ -120,8 +122,9 @@ void Protocol::send(OutputMessage_ptr msg) const {
 }
 
 void Protocol::disconnect() const {
-	if (const auto connection = getConnection()) {
-		connection->close();
+	auto conn = connectionPtr.lock();
+	if (conn) {
+		conn->close();
 	}
 }
 
@@ -234,7 +237,11 @@ bool Protocol::isConnectionExpired() const {
 }
 
 Connection_ptr Protocol::getConnection() const {
-	return connectionPtr.lock();
+	auto conn = connectionPtr.lock();
+	if (!conn) {
+		return nullptr;
+	}
+	return conn;
 }
 
 uint32_t Protocol::getIP() const {
diff --git a/src/server/network/protocol/protocol.hpp b/src/server/network/protocol/protocol.hpp
index 172c9afba32..2b139e79ee8 100644
--- a/src/server/network/protocol/protocol.hpp
+++ b/src/server/network/protocol/protocol.hpp
@@ -79,7 +79,9 @@ class Protocol : public std::enable_shared_from_this<Protocol> {
 		ZStream() noexcept;
 
 		~ZStream() {
-			deflateEnd(stream.get());
+			if (stream) {
+				deflateEnd(stream.get());
+			}
 		}
 
 		std::unique_ptr<z_stream> stream;
diff --git a/src/utils/lockfree.hpp b/src/utils/lockfree.hpp
index be245c051b0..46d820107b9 100644
--- a/src/utils/lockfree.hpp
+++ b/src/utils/lockfree.hpp
@@ -1,37 +1,258 @@
 /**
- * Canary - A free and open-source MMORPG server emulator
- * Copyright (©) 2019-2024 OpenTibiaBR <opentibiabr@outlook.com>
- * Repository: https://github.com/opentibiabr/canary
- * License: https://github.com/opentibiabr/canary/blob/main/LICENSE
- * Contributors: https://github.com/opentibiabr/canary/graphs/contributors
- * Website: https://docs.opentibiabr.com/
+ * Lock-free Memory Management Implementation
+ * ----------------------------------------
+ *
+ * This implementation provides a high-performance, lock-free memory management system
+ * optimized for multi-threaded environments. It uses a combination of thread-local
+ * caching and lock-free queues to minimize contention and maximize throughput.
+ *
+ * Key Features:
+ * - Lock-free operations using atomic queues
+ * - Thread-local caching for fast allocation/deallocation
+ * - Memory prefetching for improved performance
+ * - Cache-line alignment to prevent false sharing
+ * - Batch operations for efficient memory management
+ *
+ * Flow:
+ * 1. Allocation:
+ *    - First tries to get memory from thread-local cache
+ *    - If local cache is empty, refills it from the global free list
+ *    - If global free list is empty, triggers growth
+ *
+ * 2. Deallocation:
+ *    - First attempts to store in thread-local cache
+ *    - If local cache is full, flushes half to global free list
+ *    - Maintains balance between local and global storage
+ *
+ * Usage Examples:
+ *
+ * 1. Basic Raw Pointer Usage:
+ * @code
+ * // Define a lock-free pool for MyClass with capacity 1000
+ * using MyPool = LockfreeFreeList<MyClass, 1000>;
+ *
+ * // Pre-allocate some objects
+ * MyPool::preallocate(100);
+ *
+ * // Allocate an object
+ * MyClass* obj = MyPool::fast_allocate();
+ * if (obj) {
+ *     // Use the object
+ *     // ...
+ *
+ *     // Deallocate when done
+ *     MyPool::fast_deallocate(obj);
+ * }
+ * @endcode
+ *
+ * 2. Integration with Smart Pointers and Allocators:
+ * @code
+ * // Define custom allocator using the lock-free pool
+ * template <typename T, size_t CAPACITY>
+ * class PoolAllocator : public std::pmr::memory_resource {
+ *     void* do_allocate(size_t bytes, size_t alignment) override {
+ *         return LockfreeFreeList<T, CAPACITY>::fast_allocate();
+ *     }
+ *     void do_deallocate(void* p, size_t bytes, size_t alignment) override {
+ *         LockfreeFreeList<T, CAPACITY>::fast_deallocate(static_cast<T*>(p));
+ *     }
+ * };
+ *
+ * // Create an allocator
+ * PoolAllocator<Message, 2048> messageAllocator;
+ *
+ * // Use with shared_ptr
+ * Message_ptr getMessage() {
+ *     return std::allocate_shared<Message>(messageAllocator);
+ * }
+ * @endcode
+ *
+ * 3. Using with Custom Memory Resource:
+ * @code
+ * // Create a custom memory resource
+ * class CustomResource : public std::pmr::memory_resource {
+ *     using Pool = LockfreeFreeList<Message, 2048>;
+ *
+ *     void* do_allocate(size_t bytes, size_t alignment) override {
+ *         return Pool::fast_allocate();
+ *     }
+ *     void do_deallocate(void* p, size_t, size_t) override {
+ *         Pool::fast_deallocate(static_cast<Message*>(p));
+ *     }
+ * };
+ *
+ * // Use with polymorphic allocator
+ * std::pmr::polymorphic_allocator<Message> polyAlloc{ &customResource };
+ * auto msg = std::allocate_shared<Message>(polyAlloc);
+ * @endcode
+ *
+ * Performance Considerations:
+ * - Uses CACHE_LINE_SIZE alignment to prevent false sharing
+ * - Implements prefetching for better cache utilization
+ * - Batch operations reduce contention on the global free list
+ * - Thread-local caching minimizes inter-thread synchronization
+ *
+ * Memory Management:
+ * - DEFAULT_BATCH_SIZE controls the size of thread-local caches
+ * - STATIC_PREALLOCATION_SIZE sets initial pool size
+ * - Memory is aligned to prevent cache-line sharing
+ * - Uses std::pmr for flexible memory resource management
  */
 
 #pragma once
 
 #include <atomic_queue/atomic_queue.h>
+#include <memory_resource>
+
+// Cache line size to avoid false sharing
+#define CACHE_LINE_SIZE 64
+
+// Prefetch optimization
+#ifdef _MSC_VER
+	#include <xmmintrin.h>
+	#define PREFETCH(addr) _mm_prefetch((const char*)(addr), _MM_HINT_T0)
+#else
+	#define PREFETCH(addr) __builtin_prefetch(addr)
+#endif
+
+// Compiler optimizations
+#if defined(__GNUC__) || defined(__clang__)
+	#define LIKELY(x) __builtin_expect(!!(x), 1)
+	#define UNLIKELY(x) __builtin_expect(!!(x), 0)
+#else
+	#define LIKELY(x) (x)
+	#define UNLIKELY(x) (x)
+#endif
+
+constexpr size_t STATIC_PREALLOCATION_SIZE = 500;
 
 template <typename T, size_t CAPACITY>
 struct LockfreeFreeList {
 	using FreeList = atomic_queue::AtomicQueue2<T*, CAPACITY>;
 
-	static FreeList &get() {
+	// Increased for better cache utilization
+	static constexpr size_t DEFAULT_BATCH_SIZE = 128;
+	static constexpr size_t PREFETCH_DISTANCE = 4;
+
+	// Aligned structure to avoid false sharing
+	struct alignas(CACHE_LINE_SIZE) AlignedCounters {
+		std::atomic<size_t> count;
+		char padding[CACHE_LINE_SIZE - sizeof(std::atomic<size_t>)];
+
+		AlignedCounters() :
+			count(0) { }
+	};
+
+	static AlignedCounters allocated_count;
+	static AlignedCounters failed_allocations;
+
+	// Thread-local memory pool
+	static thread_local std::array<T*, DEFAULT_BATCH_SIZE> local_cache;
+	static thread_local size_t local_cache_size;
+
+	[[nodiscard]] static FreeList &get() noexcept {
 		static FreeList freeList;
 		return freeList;
 	}
 
-	static void preallocate(size_t count) {
+	static void preallocate(const size_t count, std::pmr::memory_resource* resource = std::pmr::get_default_resource()) noexcept {
+		auto &freeList = get();
+		T* batch[DEFAULT_BATCH_SIZE];
+		size_t successful = 0;
+
+		for (size_t i = 0; i < count; i += DEFAULT_BATCH_SIZE) {
+			const size_t batchSize = std::min(DEFAULT_BATCH_SIZE, count - i);
+
+			// Pre-allocate with prefetch
+			for (size_t j = 0; j < batchSize; ++j) {
+				if (j + PREFETCH_DISTANCE < batchSize) {
+					PREFETCH(&batch[j + PREFETCH_DISTANCE]);
+				}
+				batch[j] = static_cast<T*>(resource->allocate(sizeof(T), alignof(T)));
+			}
+
+			// Optimized insertion
+			for (size_t j = 0; j < batchSize; ++j) {
+				if (UNLIKELY(!freeList.try_push(batch[j]))) {
+					for (size_t k = j; k < batchSize; ++k) {
+						resource->deallocate(batch[k], sizeof(T), alignof(T));
+					}
+					failed_allocations.count.fetch_add(1, std::memory_order_relaxed);
+					return;
+				}
+				++successful;
+			}
+		}
+		allocated_count.count.fetch_add(successful, std::memory_order_release);
+	}
+
+	// Thread-local cache
+	[[nodiscard]] static T* fast_allocate() noexcept {
+		if (LIKELY(local_cache_size > 0)) {
+			return local_cache[--local_cache_size];
+		}
+
+		// Refill local cache
 		auto &freeList = get();
-		for (size_t i = 0; i < count; ++i) {
-			auto p = static_cast<T*>(::operator new(sizeof(T), static_cast<std::align_val_t>(alignof(T))));
-			if (!freeList.try_push(p)) {
-				::operator delete(p, static_cast<std::align_val_t>(alignof(T)));
+		size_t fetched = 0;
+		while (fetched < DEFAULT_BATCH_SIZE) {
+			T* ptr;
+			if (!freeList.try_pop(ptr)) {
 				break;
 			}
+			local_cache[fetched++] = ptr;
+		}
+
+		local_cache_size = fetched;
+		return (fetched > 0) ? local_cache[--local_cache_size] : nullptr;
+	}
+
+	static bool fast_deallocate(T* ptr) noexcept {
+		if (LIKELY(local_cache_size < DEFAULT_BATCH_SIZE)) {
+			local_cache[local_cache_size++] = ptr;
+			return true;
+		}
+
+		// Local cache full, try to empty half
+		auto &freeList = get();
+		const size_t half = DEFAULT_BATCH_SIZE / 2;
+		for (size_t i = 0; i < half; ++i) {
+			if (!freeList.try_push(local_cache[i])) {
+				return false;
+			}
+		}
+
+		// Move the other half to the beginning
+		std::move(local_cache.begin() + half, local_cache.begin() + DEFAULT_BATCH_SIZE, local_cache.begin());
+		local_cache_size = half;
+		local_cache[local_cache_size++] = ptr;
+		return true;
+	}
+
+	[[nodiscard]] static size_t get_allocated_count() noexcept {
+		return allocated_count.count.load(std::memory_order_relaxed);
+	}
+
+	static void try_grow() noexcept {
+		const size_t current = get_allocated_count();
+		if (LIKELY(CAPACITY - current >= DEFAULT_BATCH_SIZE)) {
+			preallocate(DEFAULT_BATCH_SIZE);
 		}
 	}
 };
 
+template <typename T, size_t CAPACITY>
+typename LockfreeFreeList<T, CAPACITY>::AlignedCounters LockfreeFreeList<T, CAPACITY>::allocated_count;
+
+template <typename T, size_t CAPACITY>
+typename LockfreeFreeList<T, CAPACITY>::AlignedCounters LockfreeFreeList<T, CAPACITY>::failed_allocations;
+
+template <typename T, size_t CAPACITY>
+thread_local std::array<T*, LockfreeFreeList<T, CAPACITY>::DEFAULT_BATCH_SIZE> LockfreeFreeList<T, CAPACITY>::local_cache;
+
+template <typename T, size_t CAPACITY>
+thread_local size_t LockfreeFreeList<T, CAPACITY>::local_cache_size = 0;
+
 template <typename T, size_t CAPACITY>
 class LockfreePoolingAllocator {
 public:
@@ -42,27 +263,45 @@ class LockfreePoolingAllocator {
 	using value_type = T;
 
 	template <typename U>
 	struct rebind {
 		using other = LockfreePoolingAllocator<U, CAPACITY>;
 	};
 
-	LockfreePoolingAllocator() noexcept = default;
+	LockfreePoolingAllocator() noexcept {
+		preallocateOnce();
+	}
 
 	template <typename U>
 	explicit LockfreePoolingAllocator(const LockfreePoolingAllocator<U, CAPACITY> &) noexcept { }
 
-	~LockfreePoolingAllocator() = default;
+	[[nodiscard]] T* allocate(std::size_t n) {
+		if (LIKELY(n == 1)) {
+			if (T* p = LockfreeFreeList<T, CAPACITY>::fast_allocate()) {
+				return p;
+			}
 
-	T* allocate(std::size_t n) {
-		if (n == 1) {
-			T* p;
-			if (LockfreeFreeList<T, CAPACITY>::get().try_pop(p)) {
+			LockfreeFreeList<T, CAPACITY>::try_grow();
+			if (T* p = LockfreeFreeList<T, CAPACITY>::fast_allocate()) {
 				return p;
 			}
 		}
-		return static_cast<T*>(::operator new(n * sizeof(T)));
+		return static_cast<T*>(std::pmr::get_default_resource()->allocate(n * sizeof(T), alignof(T)));
 	}
 
-	void deallocate(T* p, std::size_t n) const noexcept {
-		if (n == 1 && LockfreeFreeList<T, CAPACITY>::get().try_push(p)) {
-			return;
+	void deallocate(T* p, std::size_t n) noexcept {
+		if (LIKELY(n == 1)) {
+			if (LockfreeFreeList<T, CAPACITY>::fast_deallocate(p)) {
+				return;
+			}
 		}
-		::operator delete(p);
+		std::pmr::get_default_resource()->deallocate(p, n * sizeof(T), alignof(T));
+	}
+
+private:
+	static void preallocateOnce() {
+		std::call_once(preallocationFlag, []() {
+			LockfreeFreeList<T, CAPACITY>::preallocate(STATIC_PREALLOCATION_SIZE);
+		});
 	}
+
+	static std::once_flag preallocationFlag;
 };
+
+template <typename T, size_t CAPACITY>
+std::once_flag LockfreePoolingAllocator<T, CAPACITY>::preallocationFlag;
diff --git a/vcpkg.json b/vcpkg.json
index e5a83b21263..34bfee7a84b 100644
--- a/vcpkg.json
+++ b/vcpkg.json
@@ -5,6 +5,7 @@
@@ "abseil", "argon2", "asio", + "atomic-queue", "bext-di", "bext-ut", "curl", @@ -23,7 +24,7 @@ { "name": "opentelemetry-cpp", "default-features": true, - "features": ["otlp-http", "prometheus"] + "features": [ "otlp-http", "prometheus" ] }, { "name": "gmp",