diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index ea7d29a06f..5f288d370f 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -131,8 +131,22 @@ include(cmake/thirdparty/get_thread_pool.cmake) # ################################################################################################## # * library targets -------------------------------------------------------------------------------- -set(SOURCES "src/file_handle.cpp" "src/cufile/config.cpp" "src/cufile/driver.cpp" - "src/shim/cuda.cpp" "src/shim/cufile.cpp" "src/shim/libcurl.cpp" "src/shim/utils.cpp" +set(SOURCES + "src/batch.cpp" + "src/bounce_buffer.cpp" + "src/buffer.cpp" + "src/cufile/config.cpp" + "src/cufile/driver.cpp" + "src/defaults.cpp" + "src/error.cpp" + "src/file_handle.cpp" + "src/posix_io.cpp" + "src/shim/cuda.cpp" + "src/shim/cufile.cpp" + "src/shim/libcurl.cpp" + "src/shim/utils.cpp" + "src/stream.cpp" + "src/utils.cpp" ) if(KvikIO_REMOTE_SUPPORT) diff --git a/cpp/include/kvikio/batch.hpp b/cpp/include/kvikio/batch.hpp index 7eebbd4df0..9927962a65 100644 --- a/cpp/include/kvikio/batch.hpp +++ b/cpp/include/kvikio/batch.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,36 +73,22 @@ class BatchHandle { * * @param max_num_events The maximum number of operations supported by this instance. */ - BatchHandle(int max_num_events) : _initialized{true}, _max_num_events{max_num_events} - { - CUFILE_TRY(cuFileAPI::instance().BatchIOSetUp(&_handle, max_num_events)); - } + BatchHandle(int max_num_events); /** * @brief BatchHandle support move semantic but isn't copyable */ BatchHandle(const BatchHandle&) = delete; BatchHandle& operator=(BatchHandle const&) = delete; - BatchHandle(BatchHandle&& o) noexcept - : _initialized{std::exchange(o._initialized, false)}, - _max_num_events{std::exchange(o._max_num_events, 0)} - { - _handle = std::exchange(o._handle, CUfileBatchHandle_t{}); - } - ~BatchHandle() noexcept { close(); } + BatchHandle(BatchHandle&& o) noexcept; + ~BatchHandle() noexcept; - [[nodiscard]] bool closed() const noexcept { return !_initialized; } + [[nodiscard]] bool closed() const noexcept; /** * @brief Destroy the batch handle and free up resources */ - void close() noexcept - { - if (closed()) { return; } - _initialized = false; - - cuFileAPI::instance().BatchIODestroy(_handle); - } + void close() noexcept; /** * @brief Submit a vector of batch operations @@ -110,31 +96,7 @@ class BatchHandle { * @param operations The vector of batch operations, which must not exceed the * `max_num_events`. */ - void submit(const std::vector& operations) - { - if (convert_size2ssize(operations.size()) > _max_num_events) { - throw CUfileException("Cannot submit more than the max_num_events)"); - } - std::vector io_batch_params; - io_batch_params.reserve(operations.size()); - for (const auto& op : operations) { - if (op.file_handle.is_compat_mode_preferred()) { - throw CUfileException("Cannot submit a FileHandle opened in compatibility mode"); - } - - io_batch_params.push_back(CUfileIOParams_t{.mode = CUFILE_BATCH, - .u = {.batch = {.devPtr_base = op.devPtr_base, - .file_offset = op.file_offset, - .devPtr_offset = op.devPtr_offset, - .size = op.size}}, - .fh = op.file_handle.handle(), - .opcode = op.opcode, - .cookie = nullptr}); - } - - CUFILE_TRY(cuFileAPI::instance().BatchIOSubmit( - _handle, io_batch_params.size(), io_batch_params.data(), 0)); - } + void submit(const std::vector& operations); /** * @brief Get status of submitted operations @@ -148,16 +110,9 @@ class BatchHandle { */ std::vector status(unsigned min_nr, unsigned max_nr, - struct timespec* timeout = nullptr) - { - std::vector ret; - ret.resize(_max_num_events); - CUFILE_TRY(cuFileAPI::instance().BatchIOGetStatus(_handle, min_nr, &max_nr, &ret[0], timeout)); - ret.resize(max_nr); - return ret; - } - - void cancel() { CUFILE_TRY(cuFileAPI::instance().BatchIOCancel(_handle)); } + struct timespec* timeout = nullptr); + + void cancel(); }; #else @@ -166,24 +121,19 @@ class BatchHandle { public: BatchHandle() noexcept = default; - BatchHandle(int max_num_events) - { - throw CUfileException("BatchHandle requires cuFile's batch API, please build with CUDA v12.1+"); - } + BatchHandle(int max_num_events); - [[nodiscard]] bool closed() const noexcept { return true; } + [[nodiscard]] bool closed() const noexcept; - void close() noexcept {} + void close() noexcept; - void submit(const std::vector& operations) {} + void submit(const std::vector& operations); std::vector status(unsigned min_nr, unsigned max_nr, - struct timespec* timeout = nullptr) - { - return std::vector{}; - } - void cancel() {} + struct timespec* timeout = nullptr); + + void cancel(); }; #endif diff --git a/cpp/include/kvikio/bounce_buffer.hpp b/cpp/include/kvikio/bounce_buffer.hpp index 498f1d6f5f..5a7623a6a4 100644 --- a/cpp/include/kvikio/bounce_buffer.hpp +++ b/cpp/include/kvikio/bounce_buffer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ */ #pragma once -#include #include #include @@ -47,18 +46,15 @@ class AllocRetain { std::size_t const _size; public: - Alloc(AllocRetain* manager, void* alloc, std::size_t size) - : _manager(manager), _alloc{alloc}, _size{size} - { - } + Alloc(AllocRetain* manager, void* alloc, std::size_t size); Alloc(Alloc const&) = delete; Alloc& operator=(Alloc const&) = delete; Alloc(Alloc&& o) = delete; Alloc& operator=(Alloc&& o) = delete; - ~Alloc() noexcept { _manager->put(_alloc, _size); } - void* get() noexcept { return _alloc; } - void* get(std::ptrdiff_t offset) noexcept { return static_cast(_alloc) + offset; } - std::size_t size() noexcept { return _size; } + ~Alloc() noexcept; + void* get() noexcept; + void* get(std::ptrdiff_t offset) noexcept; + std::size_t size() noexcept; }; AllocRetain() = default; @@ -77,80 +73,28 @@ class AllocRetain { * * @return The number of bytes cleared */ - std::size_t _clear() - { - std::size_t ret = _free_allocs.size() * _size; - while (!_free_allocs.empty()) { - CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top())); - _free_allocs.pop(); - } - return ret; - } + std::size_t _clear(); /** * @brief Ensure the sizes of the retained allocations match `defaults::bounce_buffer_size()` * * NB: `_mutex` must be taken prior to calling this function. */ - void _ensure_alloc_size() - { - auto const bounce_buffer_size = defaults::bounce_buffer_size(); - if (_size != bounce_buffer_size) { - _clear(); - _size = bounce_buffer_size; - } - } + void _ensure_alloc_size(); public: - [[nodiscard]] Alloc get() - { - std::lock_guard const lock(_mutex); - _ensure_alloc_size(); - - // Check if we have an allocation available - if (!_free_allocs.empty()) { - void* ret = _free_allocs.top(); - _free_allocs.pop(); - return Alloc(this, ret, _size); - } - - // If no available allocation, allocate and register a new one - void* alloc{}; - // Allocate page-locked host memory - CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(&alloc, _size, CU_MEMHOSTREGISTER_PORTABLE)); - return Alloc(this, alloc, _size); - } - - void put(void* alloc, std::size_t size) - { - std::lock_guard const lock(_mutex); - _ensure_alloc_size(); - - // If the size of `alloc` matches the sizes of the retained allocations, - // it is added to the set of free allocation otherwise it is freed. - if (size == _size) { - _free_allocs.push(alloc); - } else { - CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(alloc)); - } - } + [[nodiscard]] Alloc get(); + + void put(void* alloc, std::size_t size); /** * @brief Free all retained allocations * * @return The number of bytes cleared */ - std::size_t clear() - { - std::lock_guard const lock(_mutex); - return _clear(); - } - - KVIKIO_EXPORT static AllocRetain& instance() - { - static AllocRetain _instance; - return _instance; - } + std::size_t clear(); + + KVIKIO_EXPORT static AllocRetain& instance(); AllocRetain(AllocRetain const&) = delete; AllocRetain& operator=(AllocRetain const&) = delete; diff --git a/cpp/include/kvikio/buffer.hpp b/cpp/include/kvikio/buffer.hpp index 85c60b3f90..9cef45a6f0 100644 --- a/cpp/include/kvikio/buffer.hpp +++ b/cpp/include/kvikio/buffer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,17 +15,8 @@ */ #pragma once -#include -#include -#include #include -#include -#include -#include -#include -#include - namespace kvikio { /** @@ -44,32 +35,17 @@ namespace kvikio { * streaming buffer that is reused across multiple cuFile IO operations. */ /*NOLINTNEXTLINE(readability-function-cognitive-complexity)*/ -inline void buffer_register(const void* devPtr_base, - std::size_t size, - int flags = 0, - const std::vector& errors_to_ignore = std::vector()) -{ - if (defaults::is_compat_mode_preferred()) { return; } - CUfileError_t status = cuFileAPI::instance().BufRegister(devPtr_base, size, flags); - if (status.err != CU_FILE_SUCCESS) { - // Check if `status.err` is in `errors_to_ignore` - if (std::find(errors_to_ignore.begin(), errors_to_ignore.end(), status.err) == - errors_to_ignore.end()) { - CUFILE_TRY(status); - } - } -} +void buffer_register(const void* devPtr_base, + std::size_t size, + int flags = 0, + const std::vector& errors_to_ignore = std::vector()); /** * @brief deregister an already registered device memory from cuFile * * @param devPtr_base device pointer to deregister */ -inline void buffer_deregister(const void* devPtr_base) -{ - if (defaults::is_compat_mode_preferred()) { return; } - CUFILE_TRY(cuFileAPI::instance().BufDeregister(devPtr_base)); -} +void buffer_deregister(const void* devPtr_base); /** * @brief Register device memory allocation which is part of devPtr. Use this @@ -85,23 +61,15 @@ inline void buffer_deregister(const void* devPtr_base) * @warning This API is intended for usecases where the memory is used as * streaming buffer that is reused across multiple cuFile IO operations. */ -inline void memory_register(const void* devPtr, - int flags = 0, - const std::vector& errors_to_ignore = {}) -{ - auto [base, nbytes, offset] = get_alloc_info(devPtr); - buffer_register(base, nbytes, flags, errors_to_ignore); -} +void memory_register(const void* devPtr, + int flags = 0, + const std::vector& errors_to_ignore = {}); /** * @brief deregister an already registered device memory from cuFile. * * @param devPtr device pointer to deregister */ -inline void memory_deregister(const void* devPtr) -{ - auto [base, nbytes, offset] = get_alloc_info(devPtr); - buffer_deregister(base); -} +void memory_deregister(const void* devPtr); } // namespace kvikio diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 91071cbb28..4c87724445 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. + * Copyright (c) 2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,13 +14,13 @@ * limitations under the License. */ +// Enable documentation of the enum. /** * @file */ #pragma once -#include #include #include #include @@ -55,25 +55,7 @@ namespace detail { * - `AUTO` * @return A CompatMode enum. */ -inline CompatMode parse_compat_mode_str(std::string_view compat_mode_str) -{ - // Convert to lowercase - std::string tmp{compat_mode_str}; - std::transform( - tmp.begin(), tmp.end(), tmp.begin(), [](unsigned char c) { return std::tolower(c); }); - - CompatMode res{}; - if (tmp == "on" || tmp == "true" || tmp == "yes" || tmp == "1") { - res = CompatMode::ON; - } else if (tmp == "off" || tmp == "false" || tmp == "no" || tmp == "0") { - res = CompatMode::OFF; - } else if (tmp == "auto") { - res = CompatMode::AUTO; - } else { - throw std::invalid_argument("Unknown compatibility mode: " + std::string{tmp}); - } - return res; -} +CompatMode parse_compat_mode_str(std::string_view compat_mode_str); template T getenv_or(std::string_view env_var_name, T default_val) @@ -92,44 +74,10 @@ T getenv_or(std::string_view env_var_name, T default_val) } template <> -inline bool getenv_or(std::string_view env_var_name, bool default_val) -{ - const auto* env_val = std::getenv(env_var_name.data()); - if (env_val == nullptr) { return default_val; } - try { - // Try parsing `env_var_name` as a integer - return static_cast(std::stoi(env_val)); - } catch (const std::invalid_argument&) { - } - // Convert to lowercase - std::string str{env_val}; - // Special considerations regarding the case conversion: - // - std::tolower() is not an addressable function. Passing it to std::transform() as - // a function pointer, if the compile turns out successful, causes the program behavior - // "unspecified (possibly ill-formed)", hence the lambda. ::tolower() is addressable - // and does not have this problem, but the following item still applies. - // - To avoid UB in std::tolower() or ::tolower(), the character must be cast to unsigned char. - std::transform( - str.begin(), str.end(), str.begin(), [](unsigned char c) { return std::tolower(c); }); - // Trim whitespaces - std::stringstream trimmer; - trimmer << str; - str.clear(); - trimmer >> str; - // Match value - if (str == "true" || str == "on" || str == "yes") { return true; } - if (str == "false" || str == "off" || str == "no") { return false; } - throw std::invalid_argument("unknown config value " + std::string{env_var_name} + "=" + - std::string{env_val}); -} +bool getenv_or(std::string_view env_var_name, bool default_val); template <> -inline CompatMode getenv_or(std::string_view env_var_name, CompatMode default_val) -{ - auto* env_val = std::getenv(env_var_name.data()); - if (env_val == nullptr) { return default_val; } - return parse_compat_mode_str(env_val); -} +CompatMode getenv_or(std::string_view env_var_name, CompatMode default_val); } // namespace detail @@ -145,54 +93,11 @@ class defaults { std::size_t _gds_threshold; std::size_t _bounce_buffer_size; - static unsigned int get_num_threads_from_env() - { - const int ret = detail::getenv_or("KVIKIO_NTHREADS", 1); - if (ret <= 0) { - throw std::invalid_argument("KVIKIO_NTHREADS has to be a positive integer greater than zero"); - } - return ret; - } + static unsigned int get_num_threads_from_env(); - defaults() - { - // Determine the default value of `compat_mode` - { - _compat_mode = detail::getenv_or("KVIKIO_COMPAT_MODE", CompatMode::AUTO); - } - // Determine the default value of `task_size` - { - const ssize_t env = detail::getenv_or("KVIKIO_TASK_SIZE", 4 * 1024 * 1024); - if (env <= 0) { - throw std::invalid_argument( - "KVIKIO_TASK_SIZE has to be a positive integer greater than zero"); - } - _task_size = env; - } - // Determine the default value of `gds_threshold` - { - const ssize_t env = detail::getenv_or("KVIKIO_GDS_THRESHOLD", 1024 * 1024); - if (env < 0) { - throw std::invalid_argument("KVIKIO_GDS_THRESHOLD has to be a positive integer"); - } - _gds_threshold = env; - } - // Determine the default value of `bounce_buffer_size` - { - const ssize_t env = detail::getenv_or("KVIKIO_BOUNCE_BUFFER_SIZE", 16 * 1024 * 1024); - if (env <= 0) { - throw std::invalid_argument( - "KVIKIO_BOUNCE_BUFFER_SIZE has to be a positive integer greater than zero"); - } - _bounce_buffer_size = env; - } - } + defaults(); - KVIKIO_EXPORT static defaults* instance() - { - static defaults _instance; - return &_instance; - } + KVIKIO_EXPORT static defaults* instance(); public: /** @@ -213,7 +118,7 @@ class defaults { * * @return Compatibility mode. */ - [[nodiscard]] static CompatMode compat_mode() { return instance()->_compat_mode; } + [[nodiscard]] static CompatMode compat_mode(); /** * @brief Reset the value of `kvikio::defaults::compat_mode()`. @@ -223,7 +128,7 @@ class defaults { * * @param compat_mode Compatibility mode. */ - static void compat_mode_reset(CompatMode compat_mode) { instance()->_compat_mode = compat_mode; } + static void compat_mode_reset(CompatMode compat_mode); /** * @brief Infer the `AUTO` compatibility mode from the system runtime. @@ -234,16 +139,7 @@ class defaults { * (`ON`/`OFF`/`AUTO`) to two (`ON`/`OFF`) so as to determine the actual I/O path. This function * is lightweight as the inferred result is cached. */ - static CompatMode infer_compat_mode_if_auto(CompatMode compat_mode) - { - if (compat_mode == CompatMode::AUTO) { - static auto inferred_compat_mode_for_auto = []() -> CompatMode { - return is_cufile_available() ? CompatMode::OFF : CompatMode::ON; - }(); - return inferred_compat_mode_for_auto; - } - return compat_mode; - } + static CompatMode infer_compat_mode_if_auto(CompatMode compat_mode); /** * @brief Given a requested compatibility mode, whether it is expected to reduce to `ON`. @@ -260,12 +156,7 @@ class defaults { * @param compat_mode Compatibility mode. * @return Boolean answer. */ - static bool is_compat_mode_preferred(CompatMode compat_mode) - { - return compat_mode == CompatMode::ON || - (compat_mode == CompatMode::AUTO && - defaults::infer_compat_mode_if_auto(compat_mode) == CompatMode::ON); - } + static bool is_compat_mode_preferred(CompatMode compat_mode); /** * @brief Whether the global compatibility mode from class defaults is expected to be `ON`. @@ -281,7 +172,7 @@ class defaults { * * @return Boolean answer. */ - static bool is_compat_mode_preferred() { return is_compat_mode_preferred(compat_mode()); } + static bool is_compat_mode_preferred(); /** * @brief Get the default thread pool. @@ -292,7 +183,7 @@ class defaults { * * @return The the default thread pool instance. */ - [[nodiscard]] static BS::thread_pool& thread_pool() { return instance()->_thread_pool; } + [[nodiscard]] static BS::thread_pool& thread_pool(); /** * @brief Get the number of threads in the default thread pool. @@ -302,10 +193,7 @@ class defaults { * * @return The number of threads. */ - [[nodiscard]] static unsigned int thread_pool_nthreads() - { - return thread_pool().get_thread_count(); - } + [[nodiscard]] static unsigned int thread_pool_nthreads(); /** * @brief Reset the number of threads in the default thread pool. Waits for all currently running @@ -316,13 +204,7 @@ class defaults { * * @param nthreads The number of threads to use. */ - static void thread_pool_nthreads_reset(unsigned int nthreads) - { - if (nthreads == 0) { - throw std::invalid_argument("number of threads must be a positive integer greater than zero"); - } - thread_pool().reset(nthreads); - } + static void thread_pool_nthreads_reset(unsigned int nthreads); /** * @brief Get the default task size used for parallel IO operations. @@ -332,20 +214,14 @@ class defaults { * * @return The default task size in bytes. */ - [[nodiscard]] static std::size_t task_size() { return instance()->_task_size; } + [[nodiscard]] static std::size_t task_size(); /** * @brief Reset the default task size used for parallel IO operations. * * @param nbytes The default task size in bytes. */ - static void task_size_reset(std::size_t nbytes) - { - if (nbytes == 0) { - throw std::invalid_argument("task size must be a positive integer greater than zero"); - } - instance()->_task_size = nbytes; - } + static void task_size_reset(std::size_t nbytes); /** * @brief Get the default GDS threshold, which is the minimum size to use GDS (in bytes). @@ -358,13 +234,13 @@ class defaults { * * @return The default GDS threshold size in bytes. */ - [[nodiscard]] static std::size_t gds_threshold() { return instance()->_gds_threshold; } + [[nodiscard]] static std::size_t gds_threshold(); /** * @brief Reset the default GDS threshold, which is the minimum size to use GDS (in bytes). * @param nbytes The default GDS threshold size in bytes. */ - static void gds_threshold_reset(std::size_t nbytes) { instance()->_gds_threshold = nbytes; } + static void gds_threshold_reset(std::size_t nbytes); /** * @brief Get the size of the bounce buffer used to stage data in host memory. @@ -374,21 +250,14 @@ class defaults { * * @return The bounce buffer size in bytes. */ - [[nodiscard]] static std::size_t bounce_buffer_size() { return instance()->_bounce_buffer_size; } + [[nodiscard]] static std::size_t bounce_buffer_size(); /** * @brief Reset the size of the bounce buffer used to stage data in host memory. * * @param nbytes The bounce buffer size in bytes. */ - static void bounce_buffer_size_reset(std::size_t nbytes) - { - if (nbytes == 0) { - throw std::invalid_argument( - "size of the bounce buffer must be a positive integer greater than zero"); - } - instance()->_bounce_buffer_size = nbytes; - } + static void bounce_buffer_size_reset(std::size_t nbytes); }; } // namespace kvikio diff --git a/cpp/include/kvikio/posix_io.hpp b/cpp/include/kvikio/posix_io.hpp index 4327a301ec..7675285d09 100644 --- a/cpp/include/kvikio/posix_io.hpp +++ b/cpp/include/kvikio/posix_io.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. + * Copyright (c) 2022-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,31 +65,9 @@ class StreamsByThread { // cuDevicePrimaryCtxReset() or cudaDeviceReset() before program termination. ~StreamsByThread() = default; - KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id) - { - static StreamsByThread _instance; + KVIKIO_EXPORT static CUstream get(CUcontext ctx, std::thread::id thd_id); - // If no current context, we return the null/default stream - if (ctx == nullptr) { return nullptr; } - auto key = std::make_pair(ctx, thd_id); - - // Create a new stream if `ctx` doesn't have one. - if (auto search = _instance._streams.find(key); search == _instance._streams.end()) { - CUstream stream{}; - CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT)); - _instance._streams[key] = stream; - return stream; - } else { - return search->second; - } - } - - static CUstream get() - { - CUcontext ctx{nullptr}; - CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); - return get(ctx, std::this_thread::get_id()); - } + static CUstream get(); StreamsByThread(const StreamsByThread&) = delete; StreamsByThread& operator=(StreamsByThread const&) = delete; @@ -251,16 +229,11 @@ std::size_t posix_host_write(int fd, const void* buf, std::size_t size, std::siz * @param devPtr_offset Offset relative to the `devPtr_base` pointer to read into. * @return Size of bytes that were successfully read. */ -inline std::size_t posix_device_read(int fd, - const void* devPtr_base, - std::size_t size, - std::size_t file_offset, - std::size_t devPtr_offset) -{ - KVIKIO_NVTX_SCOPED_RANGE("posix_device_read()", size); - return detail::posix_device_io( - fd, devPtr_base, size, file_offset, devPtr_offset); -} +std::size_t posix_device_read(int fd, + const void* devPtr_base, + std::size_t size, + std::size_t file_offset, + std::size_t devPtr_offset); /** * @brief Write device memory to disk using POSIX @@ -275,15 +248,10 @@ inline std::size_t posix_device_read(int fd, * @param devPtr_offset Offset relative to the `devPtr_base` pointer to write into. * @return Size of bytes that were successfully written. */ -inline std::size_t posix_device_write(int fd, - const void* devPtr_base, - std::size_t size, - std::size_t file_offset, - std::size_t devPtr_offset) -{ - KVIKIO_NVTX_SCOPED_RANGE("posix_device_write()", size); - return detail::posix_device_io( - fd, devPtr_base, size, file_offset, devPtr_offset); -} +std::size_t posix_device_write(int fd, + const void* devPtr_base, + std::size_t size, + std::size_t file_offset, + std::size_t devPtr_offset); } // namespace kvikio::detail diff --git a/cpp/include/kvikio/remote_handle.hpp b/cpp/include/kvikio/remote_handle.hpp index e1b152b23c..ff0741ae3f 100644 --- a/cpp/include/kvikio/remote_handle.hpp +++ b/cpp/include/kvikio/remote_handle.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,101 +33,6 @@ #include namespace kvikio { -namespace detail { - -/** - * @brief Bounce buffer in pinned host memory. - * - * @note Is not thread-safe. - */ -class BounceBufferH2D { - CUstream _stream; // The CUDA stream to use. - CUdeviceptr _dev; // The output device buffer. - AllocRetain::Alloc _host_buffer; // The host buffer to bounce data on. - std::ptrdiff_t _dev_offset{0}; // Number of bytes written to `_dev`. - std::ptrdiff_t _host_offset{0}; // Number of bytes written to `_host` (resets on flush). - - public: - /** - * @brief Create a bounce buffer for an output device buffer. - * - * @param stream The CUDA stream used throughout the lifetime of the bounce buffer. - * @param device_buffer The output device buffer (final destination of the data). - */ - BounceBufferH2D(CUstream stream, void* device_buffer) - : _stream{stream}, - _dev{convert_void2deviceptr(device_buffer)}, - _host_buffer{AllocRetain::instance().get()} - { - } - - /** - * @brief The bounce buffer if flushed to device on destruction. - */ - ~BounceBufferH2D() noexcept - { - try { - flush(); - } catch (CUfileException const& e) { - std::cerr << "BounceBufferH2D error on final flush: "; - std::cerr << e.what(); - std::cerr << std::endl; - } - } - - private: - /** - * @brief Write host memory to the output device buffer. - * - * @param src The host memory source. - * @param size Number of bytes to write. - */ - void write_to_device(void const* src, std::size_t size) - { - if (size > 0) { - CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(_dev + _dev_offset, src, size, _stream)); - CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream)); - _dev_offset += size; - } - } - - /** - * @brief Flush the bounce buffer by writing everything to the output device buffer. - */ - void flush() - { - write_to_device(_host_buffer.get(), _host_offset); - _host_offset = 0; - } - - public: - /** - * @brief Write host memory to the bounce buffer (also host memory). - * - * Only when the bounce buffer has been filled up is data copied to the output device buffer. - * - * @param data The host memory source. - * @param size Number of bytes to write. - */ - void write(char const* data, std::size_t size) - { - if (_host_buffer.size() - _host_offset < size) { // Not enough space left in the bounce buffer - flush(); - assert(_host_offset == 0); - } - if (_host_buffer.size() < size) { - // If still not enough space, we just copy the data to the device. This only happens when - // `defaults::bounce_buffer_size()` is smaller than 16kb thus no need to performance - // optimize for this case. - write_to_device(data, size); - } else if (size > 0) { - std::memcpy(_host_buffer.get(_host_offset), data, size); - _host_offset += size; - } - } -}; - -} // namespace detail class CurlHandle; // Prototype @@ -173,9 +78,9 @@ class HttpEndpoint : public RemoteEndpoint { * * @param url The full http url to the remote file. */ - HttpEndpoint(std::string url) : _url{std::move(url)} {} + HttpEndpoint(std::string url); void setopt(CurlHandle& curl) override; - std::string str() const override { return _url; } + std::string str() const override; ~HttpEndpoint() override = default; }; @@ -203,17 +108,7 @@ class S3Endpoint : public RemoteEndpoint { */ static std::string unwrap_or_default(std::optional aws_arg, std::string const& env_var, - std::string const& err_msg = "") - { - if (aws_arg.has_value()) { return std::move(*aws_arg); } - - char const* env = std::getenv(env_var.c_str()); - if (env == nullptr) { - if (err_msg.empty()) { return std::string(); } - throw std::invalid_argument(err_msg); - } - return std::string(env); - } + std::string const& err_msg = ""); public: /** @@ -234,22 +129,7 @@ class S3Endpoint : public RemoteEndpoint { static std::string url_from_bucket_and_object(std::string const& bucket_name, std::string const& object_name, std::optional const& aws_region, - std::optional aws_endpoint_url) - { - auto const endpoint_url = unwrap_or_default(std::move(aws_endpoint_url), "AWS_ENDPOINT_URL"); - std::stringstream ss; - if (endpoint_url.empty()) { - auto const region = - unwrap_or_default(std::move(aws_region), - "AWS_DEFAULT_REGION", - "S3: must provide `aws_region` if AWS_DEFAULT_REGION isn't set."); - // We default to the official AWS url scheme. - ss << "https://" << bucket_name << ".s3." << region << ".amazonaws.com/" << object_name; - } else { - ss << endpoint_url << "/" << bucket_name << "/" << object_name; - } - return ss.str(); - } + std::optional aws_endpoint_url); /** * @brief Given an url like "s3:///", return the name of the bucket and object. @@ -259,14 +139,7 @@ class S3Endpoint : public RemoteEndpoint { * @param s3_url S3 url. * @return Pair of strings: [bucket-name, object-name]. */ - [[nodiscard]] static std::pair parse_s3_url(std::string const& s3_url) - { - // Regular expression to match s3:/// - std::regex const pattern{R"(^s3://([^/]+)/(.+))", std::regex_constants::icase}; - std::smatch matches; - if (std::regex_match(s3_url, matches, pattern)) { return {matches[1].str(), matches[2].str()}; } - throw std::invalid_argument("Input string does not match the expected S3 URL format."); - } + [[nodiscard]] static std::pair parse_s3_url(std::string const& s3_url); /** * @brief Create a S3 endpoint from a url. @@ -284,46 +157,7 @@ class S3Endpoint : public RemoteEndpoint { S3Endpoint(std::string url, std::optional aws_region = std::nullopt, std::optional aws_access_key = std::nullopt, - std::optional aws_secret_access_key = std::nullopt) - : _url{std::move(url)} - { - // Regular expression to match http[s]:// - std::regex pattern{R"(^https?://.*)", std::regex_constants::icase}; - if (!std::regex_search(_url, pattern)) { - throw std::invalid_argument("url must start with http:// or https://"); - } - - auto const region = - unwrap_or_default(std::move(aws_region), - "AWS_DEFAULT_REGION", - "S3: must provide `aws_region` if AWS_DEFAULT_REGION isn't set."); - - auto const access_key = - unwrap_or_default(std::move(aws_access_key), - "AWS_ACCESS_KEY_ID", - "S3: must provide `aws_access_key` if AWS_ACCESS_KEY_ID isn't set."); - - auto const secret_access_key = unwrap_or_default( - std::move(aws_secret_access_key), - "AWS_SECRET_ACCESS_KEY", - "S3: must provide `aws_secret_access_key` if AWS_SECRET_ACCESS_KEY isn't set."); - - // Create the CURLOPT_AWS_SIGV4 option - { - std::stringstream ss; - ss << "aws:amz:" << region << ":s3"; - _aws_sigv4 = ss.str(); - } - // Create the CURLOPT_USERPWD option - // Notice, curl uses `secret_access_key` to generate a AWS V4 signature. It is NOT included - // in the http header. See - // - { - std::stringstream ss; - ss << access_key << ":" << secret_access_key; - _aws_userpwd = ss.str(); - } - } + std::optional aws_secret_access_key = std::nullopt); /** * @brief Create a S3 endpoint from a bucket and object name. @@ -346,17 +180,10 @@ class S3Endpoint : public RemoteEndpoint { std::optional aws_region = std::nullopt, std::optional aws_access_key = std::nullopt, std::optional aws_secret_access_key = std::nullopt, - std::optional aws_endpoint_url = std::nullopt) - : S3Endpoint(url_from_bucket_and_object( - bucket_name, object_name, aws_region, std::move(aws_endpoint_url)), - std::move(aws_region), - std::move(aws_access_key), - std::move(aws_secret_access_key)) - { - } + std::optional aws_endpoint_url = std::nullopt); void setopt(CurlHandle& curl) override; - std::string str() const override { return _url; } + std::string str() const override; ~S3Endpoint() override = default; }; @@ -375,10 +202,7 @@ class RemoteHandle { * @param endpoint Remote endpoint used for subsequent IO. * @param nbytes The size of the remote file (in bytes). */ - RemoteHandle(std::unique_ptr endpoint, std::size_t nbytes) - : _endpoint{std::move(endpoint)}, _nbytes{nbytes} - { - } + RemoteHandle(std::unique_ptr endpoint, std::size_t nbytes); /** * @brief Create a new remote handle from an endpoint (infers the file size). @@ -402,14 +226,14 @@ class RemoteHandle { * * @return The number of bytes. */ - [[nodiscard]] std::size_t nbytes() const noexcept { return _nbytes; } + std::size_t nbytes() const noexcept; /** * @brief Get a const reference to the underlying remote endpoint. * * @return The remote endpoint. */ - [[nodiscard]] RemoteEndpoint const& endpoint() const noexcept { return *_endpoint; } + [[nodiscard]] RemoteEndpoint const& endpoint() const noexcept; /** * @brief Read from remote source into buffer (host or device memory). diff --git a/cpp/include/kvikio/stream.hpp b/cpp/include/kvikio/stream.hpp index 9eb9942b7a..4b3e37a980 100644 --- a/cpp/include/kvikio/stream.hpp +++ b/cpp/include/kvikio/stream.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * Copyright (c) 2023-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,13 +17,11 @@ #include #include -#include -#include -#include -#include #include #include +#include + namespace kvikio { /** @@ -63,38 +61,15 @@ class StreamFuture { StreamFuture() noexcept = default; StreamFuture( - void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream) - : _devPtr_base{devPtr_base}, _stream{stream} - { - // Notice, we allocate the arguments using malloc() as specified in the cuFile docs: - // - if ((_val = static_cast(std::malloc(sizeof(ArgByVal)))) == nullptr) { - throw std::bad_alloc{}; - } - *_val = { - .size = size, .file_offset = file_offset, .devPtr_offset = devPtr_offset, .bytes_done = 0}; - } + void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream); /** * @brief StreamFuture support move semantic but isn't copyable */ StreamFuture(const StreamFuture&) = delete; StreamFuture& operator=(StreamFuture& o) = delete; - StreamFuture(StreamFuture&& o) noexcept - : _devPtr_base{std::exchange(o._devPtr_base, nullptr)}, - _stream{std::exchange(o._stream, nullptr)}, - _val{std::exchange(o._val, nullptr)}, - _stream_synchronized{o._stream_synchronized} - { - } - StreamFuture& operator=(StreamFuture&& o) noexcept - { - _devPtr_base = std::exchange(o._devPtr_base, nullptr); - _stream = std::exchange(o._stream, nullptr); - _val = std::exchange(o._val, nullptr); - _stream_synchronized = o._stream_synchronized; - return *this; - } + StreamFuture(StreamFuture&& o) noexcept; + StreamFuture& operator=(StreamFuture&& o) noexcept; /** * @brief Return the arguments of the future call @@ -102,18 +77,7 @@ class StreamFuture { * @return Tuple of the arguments in the order matching `FileHandle.read()` and * `FileHandle.write()` */ - std::tuple get_args() const - { - if (_val == nullptr) { - throw kvikio::CUfileException("cannot get arguments from an uninitialized StreamFuture"); - } - return {_devPtr_base, - &_val->size, - &_val->file_offset, - &_val->devPtr_offset, - &_val->bytes_done, - _stream}; - } + std::tuple get_args() const; /** * @brief Return the number of bytes read or written by the future operation. @@ -122,38 +86,13 @@ class StreamFuture { * * @return Number of bytes read or written by the future operation. */ - std::size_t check_bytes_done() - { - if (_val == nullptr) { - throw kvikio::CUfileException("cannot check bytes done on an uninitialized StreamFuture"); - } - - if (!_stream_synchronized) { - _stream_synchronized = true; - CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream)); - } - - CUFILE_CHECK_BYTES_DONE(_val->bytes_done); - // At this point, we know `_val->bytes_done` is a positive value otherwise - // CUFILE_CHECK_BYTES_DONE() would have raised an exception. - return static_cast(_val->bytes_done); - } + std::size_t check_bytes_done(); /** * @brief Free the by-value arguments and make sure the associated CUDA stream has been * synchronized. */ - ~StreamFuture() noexcept - { - if (_val != nullptr) { - try { - check_bytes_done(); - } catch (const kvikio::CUfileException& e) { - std::cerr << e.what() << std::endl; - } - std::free(_val); - } - } + ~StreamFuture() noexcept; }; } // namespace kvikio diff --git a/cpp/src/batch.cpp b/cpp/src/batch.cpp new file mode 100644 index 0000000000..8ced70cbd8 --- /dev/null +++ b/cpp/src/batch.cpp @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace kvikio { + +#ifdef KVIKIO_CUFILE_BATCH_API_FOUND + +BatchHandle::BatchHandle(int max_num_events) : _initialized{true}, _max_num_events{max_num_events} +{ + CUFILE_TRY(cuFileAPI::instance().BatchIOSetUp(&_handle, max_num_events)); +} + +BatchHandle::BatchHandle(BatchHandle&& o) noexcept + : _initialized{std::exchange(o._initialized, false)}, + _max_num_events{std::exchange(o._max_num_events, 0)} +{ + _handle = std::exchange(o._handle, CUfileBatchHandle_t{}); +} + +BatchHandle::~BatchHandle() noexcept { close(); } + +bool BatchHandle::closed() const noexcept { return !_initialized; } + +void BatchHandle::close() noexcept +{ + if (closed()) { return; } + _initialized = false; + + cuFileAPI::instance().BatchIODestroy(_handle); +} + +void BatchHandle::submit(const std::vector& operations) +{ + if (convert_size2ssize(operations.size()) > _max_num_events) { + throw CUfileException("Cannot submit more than the max_num_events)"); + } + std::vector io_batch_params; + io_batch_params.reserve(operations.size()); + for (const auto& op : operations) { + if (op.file_handle.is_compat_mode_preferred()) { + throw CUfileException("Cannot submit a FileHandle opened in compatibility mode"); + } + + io_batch_params.push_back(CUfileIOParams_t{.mode = CUFILE_BATCH, + .u = {.batch = {.devPtr_base = op.devPtr_base, + .file_offset = op.file_offset, + .devPtr_offset = op.devPtr_offset, + .size = op.size}}, + .fh = op.file_handle.handle(), + .opcode = op.opcode, + .cookie = nullptr}); + } + + CUFILE_TRY(cuFileAPI::instance().BatchIOSubmit( + _handle, io_batch_params.size(), io_batch_params.data(), 0)); +} + +std::vector BatchHandle::status(unsigned min_nr, + unsigned max_nr, + struct timespec* timeout) +{ + std::vector ret; + ret.resize(_max_num_events); + CUFILE_TRY(cuFileAPI::instance().BatchIOGetStatus(_handle, min_nr, &max_nr, &ret[0], timeout)); + ret.resize(max_nr); + return ret; +} + +void BatchHandle::cancel() { CUFILE_TRY(cuFileAPI::instance().BatchIOCancel(_handle)); } + +#else + +BatchHandle::BatchHandle(int max_num_events) +{ + throw CUfileException("BatchHandle requires cuFile's batch API, please build with CUDA v12.1+"); +} + +bool BatchHandle::closed() const noexcept { return true; } + +void BatchHandle::close() noexcept {} + +void BatchHandle::submit(const std::vector& operations) {} + +std::vector BatchHandle::status(unsigned min_nr, + unsigned max_nr, + struct timespec* timeout) +{ + return std::vector{}; +} + +void BatchHandle::cancel() {} + +#endif + +} // namespace kvikio diff --git a/cpp/src/bounce_buffer.cpp b/cpp/src/bounce_buffer.cpp new file mode 100644 index 0000000000..65ca1aaa52 --- /dev/null +++ b/cpp/src/bounce_buffer.cpp @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include +#include +#include + +namespace kvikio { + +AllocRetain::Alloc::Alloc(AllocRetain* manager, void* alloc, std::size_t size) + : _manager(manager), _alloc{alloc}, _size{size} +{ +} + +AllocRetain::Alloc::~Alloc() noexcept { _manager->put(_alloc, _size); } + +void* AllocRetain::Alloc::get() noexcept { return _alloc; } + +void* AllocRetain::Alloc::get(std::ptrdiff_t offset) noexcept +{ + return static_cast(_alloc) + offset; +} + +std::size_t AllocRetain::Alloc::size() noexcept { return _size; } + +std::size_t AllocRetain::_clear() +{ + std::size_t ret = _free_allocs.size() * _size; + while (!_free_allocs.empty()) { + CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top())); + _free_allocs.pop(); + } + return ret; +} + +void AllocRetain::_ensure_alloc_size() +{ + auto const bounce_buffer_size = defaults::bounce_buffer_size(); + if (_size != bounce_buffer_size) { + _clear(); + _size = bounce_buffer_size; + } +} + +AllocRetain::Alloc AllocRetain::get() +{ + std::lock_guard const lock(_mutex); + _ensure_alloc_size(); + + // Check if we have an allocation available + if (!_free_allocs.empty()) { + void* ret = _free_allocs.top(); + _free_allocs.pop(); + return Alloc(this, ret, _size); + } + + // If no available allocation, allocate and register a new one + void* alloc{}; + // Allocate page-locked host memory + CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(&alloc, _size, CU_MEMHOSTREGISTER_PORTABLE)); + return Alloc(this, alloc, _size); +} + +void AllocRetain::put(void* alloc, std::size_t size) +{ + std::lock_guard const lock(_mutex); + _ensure_alloc_size(); + + // If the size of `alloc` matches the sizes of the retained allocations, + // it is added to the set of free allocation otherwise it is freed. + if (size == _size) { + _free_allocs.push(alloc); + } else { + CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(alloc)); + } +} + +std::size_t AllocRetain::clear() +{ + std::lock_guard const lock(_mutex); + return _clear(); +} + +AllocRetain& AllocRetain::instance() +{ + static AllocRetain _instance; + return _instance; +} + +} // namespace kvikio diff --git a/cpp/src/buffer.cpp b/cpp/src/buffer.cpp new file mode 100644 index 0000000000..0aa772d50f --- /dev/null +++ b/cpp/src/buffer.cpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace kvikio { + +void buffer_register(const void* devPtr_base, + std::size_t size, + int flags, + const std::vector& errors_to_ignore) +{ + if (defaults::is_compat_mode_preferred()) { return; } + CUfileError_t status = cuFileAPI::instance().BufRegister(devPtr_base, size, flags); + if (status.err != CU_FILE_SUCCESS) { + // Check if `status.err` is in `errors_to_ignore` + if (std::find(errors_to_ignore.begin(), errors_to_ignore.end(), status.err) == + errors_to_ignore.end()) { + CUFILE_TRY(status); + } + } +} + +void buffer_deregister(const void* devPtr_base) +{ + if (defaults::is_compat_mode_preferred()) { return; } + CUFILE_TRY(cuFileAPI::instance().BufDeregister(devPtr_base)); +} + +void memory_register(const void* devPtr, int flags, const std::vector& errors_to_ignore) +{ + auto [base, nbytes, offset] = get_alloc_info(devPtr); + buffer_register(base, nbytes, flags, errors_to_ignore); +} + +void memory_deregister(const void* devPtr) +{ + auto [base, nbytes, offset] = get_alloc_info(devPtr); + buffer_deregister(base); +} + +} // namespace kvikio diff --git a/cpp/src/cufile/config.cpp b/cpp/src/cufile/config.cpp index 7566c11532..2abbf33e92 100644 --- a/cpp/src/cufile/config.cpp +++ b/cpp/src/cufile/config.cpp @@ -21,7 +21,7 @@ #include namespace kvikio { -namespace detail { +namespace { [[nodiscard]] inline const char* lookup_config_path() { @@ -31,11 +31,11 @@ namespace detail { return ""; } -} // namespace detail +} // namespace const std::string& config_path() { - static const std::string ret = detail::lookup_config_path(); + static const std::string ret = lookup_config_path(); return ret; } diff --git a/cpp/src/cufile/driver.cpp b/cpp/src/cufile/driver.cpp index 127050ed06..13a23f547c 100644 --- a/cpp/src/cufile/driver.cpp +++ b/cpp/src/cufile/driver.cpp @@ -23,7 +23,7 @@ #include namespace kvikio { -namespace detail { +namespace { [[nodiscard]] inline bool get_driver_flag(unsigned int prop, unsigned int flag) noexcept { @@ -38,7 +38,7 @@ inline void set_driver_flag(unsigned int& prop, unsigned int flag, bool val) noe prop &= ~(1U << flag); } } -} // namespace detail +} // namespace #ifdef KVIKIO_CUFILE_FOUND @@ -70,31 +70,31 @@ bool DriverProperties::is_gds_available() return !(get_nvfs_major_version() == 0 && get_nvfs_minor_version() == 0); } -[[nodiscard]] unsigned int DriverProperties::get_nvfs_major_version() +unsigned int DriverProperties::get_nvfs_major_version() { lazy_init(); return _props.nvfs.major_version; } -[[nodiscard]] unsigned int DriverProperties::get_nvfs_minor_version() +unsigned int DriverProperties::get_nvfs_minor_version() { lazy_init(); return _props.nvfs.minor_version; } -[[nodiscard]] bool DriverProperties::get_nvfs_allow_compat_mode() +bool DriverProperties::get_nvfs_allow_compat_mode() { lazy_init(); - return detail::get_driver_flag(_props.nvfs.dcontrolflags, CU_FILE_ALLOW_COMPAT_MODE); + return get_driver_flag(_props.nvfs.dcontrolflags, CU_FILE_ALLOW_COMPAT_MODE); } -[[nodiscard]] bool DriverProperties::get_nvfs_poll_mode() +bool DriverProperties::get_nvfs_poll_mode() { lazy_init(); - return detail::get_driver_flag(_props.nvfs.dcontrolflags, CU_FILE_USE_POLL_MODE); + return get_driver_flag(_props.nvfs.dcontrolflags, CU_FILE_USE_POLL_MODE); } -[[nodiscard]] std::size_t DriverProperties::get_nvfs_poll_thresh_size() +std::size_t DriverProperties::get_nvfs_poll_thresh_size() { lazy_init(); return _props.nvfs.poll_thresh_size; @@ -104,7 +104,7 @@ void DriverProperties::set_nvfs_poll_mode(bool enable) { lazy_init(); CUFILE_TRY(cuFileAPI::instance().DriverSetPollMode(enable, get_nvfs_poll_thresh_size())); - detail::set_driver_flag(_props.nvfs.dcontrolflags, CU_FILE_USE_POLL_MODE, enable); + set_driver_flag(_props.nvfs.dcontrolflags, CU_FILE_USE_POLL_MODE, enable); } void DriverProperties::set_nvfs_poll_thresh_size(std::size_t size_in_kb) @@ -114,20 +114,20 @@ void DriverProperties::set_nvfs_poll_thresh_size(std::size_t size_in_kb) _props.nvfs.poll_thresh_size = size_in_kb; } -[[nodiscard]] std::vector DriverProperties::get_nvfs_statusflags() +std::vector DriverProperties::get_nvfs_statusflags() { lazy_init(); std::vector ret; - if (detail::get_driver_flag(_props.nvfs.dcontrolflags, CU_FILE_USE_POLL_MODE)) { + if (get_driver_flag(_props.nvfs.dcontrolflags, CU_FILE_USE_POLL_MODE)) { ret.push_back(CU_FILE_USE_POLL_MODE); } - if (detail::get_driver_flag(_props.nvfs.dcontrolflags, CU_FILE_ALLOW_COMPAT_MODE)) { + if (get_driver_flag(_props.nvfs.dcontrolflags, CU_FILE_ALLOW_COMPAT_MODE)) { ret.push_back(CU_FILE_ALLOW_COMPAT_MODE); } return ret; } -[[nodiscard]] std::size_t DriverProperties::get_max_device_cache_size() +std::size_t DriverProperties::get_max_device_cache_size() { lazy_init(); return _props.max_device_cache_size; @@ -140,13 +140,13 @@ void DriverProperties::set_max_device_cache_size(std::size_t size_in_kb) _props.max_device_cache_size = size_in_kb; } -[[nodiscard]] std::size_t DriverProperties::get_per_buffer_cache_size() +std::size_t DriverProperties::get_per_buffer_cache_size() { lazy_init(); return _props.per_buffer_cache_size; } -[[nodiscard]] std::size_t DriverProperties::get_max_pinned_memory_size() +std::size_t DriverProperties::get_max_pinned_memory_size() { lazy_init(); return _props.max_device_pinned_mem_size; @@ -159,7 +159,7 @@ void DriverProperties::set_max_pinned_memory_size(std::size_t size_in_kb) _props.max_device_pinned_mem_size = size_in_kb; } -[[nodiscard]] std::size_t DriverProperties::get_max_batch_io_size() +std::size_t DriverProperties::get_max_batch_io_size() { #ifdef KVIKIO_CUFILE_BATCH_API_FOUND lazy_init(); @@ -176,27 +176,27 @@ DriverProperties::DriverProperties() {} bool DriverProperties::is_gds_available() { return false; } -[[nodiscard]] unsigned int DriverProperties::get_nvfs_major_version() +unsigned int DriverProperties::get_nvfs_major_version() { throw CUfileException("KvikIO not compiled with cuFile.h"); } -[[nodiscard]] unsigned int DriverProperties::get_nvfs_minor_version() +unsigned int DriverProperties::get_nvfs_minor_version() { throw CUfileException("KvikIO not compiled with cuFile.h"); } -[[nodiscard]] bool DriverProperties::get_nvfs_allow_compat_mode() +bool DriverProperties::get_nvfs_allow_compat_mode() { throw CUfileException("KvikIO not compiled with cuFile.h"); } -[[nodiscard]] bool DriverProperties::get_nvfs_poll_mode() +bool DriverProperties::get_nvfs_poll_mode() { throw CUfileException("KvikIO not compiled with cuFile.h"); } -[[nodiscard]] std::size_t DriverProperties::get_nvfs_poll_thresh_size() +std::size_t DriverProperties::get_nvfs_poll_thresh_size() { throw CUfileException("KvikIO not compiled with cuFile.h"); } @@ -211,12 +211,12 @@ void DriverProperties::set_nvfs_poll_thresh_size(std::size_t size_in_kb) throw CUfileException("KvikIO not compiled with cuFile.h"); } -[[nodiscard]] std::vector DriverProperties::get_nvfs_statusflags() +std::vector DriverProperties::get_nvfs_statusflags() { throw CUfileException("KvikIO not compiled with cuFile.h"); } -[[nodiscard]] std::size_t DriverProperties::get_max_device_cache_size() +std::size_t DriverProperties::get_max_device_cache_size() { throw CUfileException("KvikIO not compiled with cuFile.h"); } @@ -226,12 +226,12 @@ void DriverProperties::set_max_device_cache_size(std::size_t size_in_kb) throw CUfileException("KvikIO not compiled with cuFile.h"); } -[[nodiscard]] std::size_t DriverProperties::get_per_buffer_cache_size() +std::size_t DriverProperties::get_per_buffer_cache_size() { throw CUfileException("KvikIO not compiled with cuFile.h"); } -[[nodiscard]] std::size_t DriverProperties::get_max_pinned_memory_size() +std::size_t DriverProperties::get_max_pinned_memory_size() { throw CUfileException("KvikIO not compiled with cuFile.h"); } @@ -241,7 +241,7 @@ void DriverProperties::set_max_pinned_memory_size(std::size_t size_in_kb) throw CUfileException("KvikIO not compiled with cuFile.h"); } -[[nodiscard]] std::size_t DriverProperties::get_max_batch_io_size() +std::size_t DriverProperties::get_max_batch_io_size() { throw CUfileException("KvikIO not compiled with cuFile.h"); } diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp new file mode 100644 index 0000000000..f249a8b361 --- /dev/null +++ b/cpp/src/defaults.cpp @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +namespace kvikio { + +namespace detail { +CompatMode parse_compat_mode_str(std::string_view compat_mode_str) +{ + // Convert to lowercase + std::string tmp{compat_mode_str}; + std::transform( + tmp.begin(), tmp.end(), tmp.begin(), [](unsigned char c) { return std::tolower(c); }); + + CompatMode res{}; + if (tmp == "on" || tmp == "true" || tmp == "yes" || tmp == "1") { + res = CompatMode::ON; + } else if (tmp == "off" || tmp == "false" || tmp == "no" || tmp == "0") { + res = CompatMode::OFF; + } else if (tmp == "auto") { + res = CompatMode::AUTO; + } else { + throw std::invalid_argument("Unknown compatibility mode: " + std::string{tmp}); + } + return res; +} + +template <> +inline bool getenv_or(std::string_view env_var_name, bool default_val) +{ + const auto* env_val = std::getenv(env_var_name.data()); + if (env_val == nullptr) { return default_val; } + try { + // Try parsing `env_var_name` as a integer + return static_cast(std::stoi(env_val)); + } catch (const std::invalid_argument&) { + } + // Convert to lowercase + std::string str{env_val}; + // Special considerations regarding the case conversion: + // - std::tolower() is not an addressable function. Passing it to std::transform() as + // a function pointer, if the compile turns out successful, causes the program behavior + // "unspecified (possibly ill-formed)", hence the lambda. ::tolower() is addressable + // and does not have this problem, but the following item still applies. + // - To avoid UB in std::tolower() or ::tolower(), the character must be cast to unsigned char. + std::transform( + str.begin(), str.end(), str.begin(), [](unsigned char c) { return std::tolower(c); }); + // Trim whitespaces + std::stringstream trimmer; + trimmer << str; + str.clear(); + trimmer >> str; + // Match value + if (str == "true" || str == "on" || str == "yes") { return true; } + if (str == "false" || str == "off" || str == "no") { return false; } + throw std::invalid_argument("unknown config value " + std::string{env_var_name} + "=" + + std::string{env_val}); +} + +template <> +inline CompatMode getenv_or(std::string_view env_var_name, CompatMode default_val) +{ + auto* env_val = std::getenv(env_var_name.data()); + if (env_val == nullptr) { return default_val; } + return parse_compat_mode_str(env_val); +} + +} // namespace detail + +unsigned int defaults::get_num_threads_from_env() +{ + const int ret = detail::getenv_or("KVIKIO_NTHREADS", 1); + if (ret <= 0) { + throw std::invalid_argument("KVIKIO_NTHREADS has to be a positive integer greater than zero"); + } + return ret; +} + +defaults::defaults() +{ + // Determine the default value of `compat_mode` + { + _compat_mode = detail::getenv_or("KVIKIO_COMPAT_MODE", CompatMode::AUTO); + } + // Determine the default value of `task_size` + { + const ssize_t env = detail::getenv_or("KVIKIO_TASK_SIZE", 4 * 1024 * 1024); + if (env <= 0) { + throw std::invalid_argument( + "KVIKIO_TASK_SIZE has to be a positive integer greater than zero"); + } + _task_size = env; + } + // Determine the default value of `gds_threshold` + { + const ssize_t env = detail::getenv_or("KVIKIO_GDS_THRESHOLD", 1024 * 1024); + if (env < 0) { + throw std::invalid_argument("KVIKIO_GDS_THRESHOLD has to be a positive integer"); + } + _gds_threshold = env; + } + // Determine the default value of `bounce_buffer_size` + { + const ssize_t env = detail::getenv_or("KVIKIO_BOUNCE_BUFFER_SIZE", 16 * 1024 * 1024); + if (env <= 0) { + throw std::invalid_argument( + "KVIKIO_BOUNCE_BUFFER_SIZE has to be a positive integer greater than zero"); + } + _bounce_buffer_size = env; + } +} + +defaults* defaults::instance() +{ + static defaults _instance; + return &_instance; +} +CompatMode defaults::compat_mode() { return instance()->_compat_mode; } + +void defaults::compat_mode_reset(CompatMode compat_mode) { instance()->_compat_mode = compat_mode; } + +CompatMode defaults::infer_compat_mode_if_auto(CompatMode compat_mode) +{ + if (compat_mode == CompatMode::AUTO) { + static auto inferred_compat_mode_for_auto = []() -> CompatMode { + return is_cufile_available() ? CompatMode::OFF : CompatMode::ON; + }(); + return inferred_compat_mode_for_auto; + } + return compat_mode; +} + +bool defaults::is_compat_mode_preferred(CompatMode compat_mode) +{ + return compat_mode == CompatMode::ON || + (compat_mode == CompatMode::AUTO && + defaults::infer_compat_mode_if_auto(compat_mode) == CompatMode::ON); +} + +bool defaults::is_compat_mode_preferred() { return is_compat_mode_preferred(compat_mode()); } + +BS::thread_pool& defaults::thread_pool() { return instance()->_thread_pool; } + +unsigned int defaults::thread_pool_nthreads() { return thread_pool().get_thread_count(); } + +void defaults::thread_pool_nthreads_reset(unsigned int nthreads) +{ + if (nthreads == 0) { + throw std::invalid_argument("number of threads must be a positive integer greater than zero"); + } + thread_pool().reset(nthreads); +} + +std::size_t defaults::task_size() { return instance()->_task_size; } + +void defaults::task_size_reset(std::size_t nbytes) +{ + if (nbytes == 0) { + throw std::invalid_argument("task size must be a positive integer greater than zero"); + } + instance()->_task_size = nbytes; +} + +std::size_t defaults::gds_threshold() { return instance()->_gds_threshold; } + +void defaults::gds_threshold_reset(std::size_t nbytes) { instance()->_gds_threshold = nbytes; } + +std::size_t defaults::bounce_buffer_size() { return instance()->_bounce_buffer_size; } + +void defaults::bounce_buffer_size_reset(std::size_t nbytes) +{ + if (nbytes == 0) { + throw std::invalid_argument( + "size of the bounce buffer must be a positive integer greater than zero"); + } + instance()->_bounce_buffer_size = nbytes; +} + +} // namespace kvikio diff --git a/cpp/src/error.cpp b/cpp/src/error.cpp new file mode 100644 index 0000000000..21ce736a65 --- /dev/null +++ b/cpp/src/error.cpp @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include diff --git a/cpp/src/file_handle.cpp b/cpp/src/file_handle.cpp index f2b7fa15db..37e0f7729b 100644 --- a/cpp/src/file_handle.cpp +++ b/cpp/src/file_handle.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -163,9 +163,9 @@ FileHandle::FileHandle(const std::string& file_path, } } -[[nodiscard]] int FileHandle::fd_open_flags() const { return open_flags(_fd_direct_off); } +int FileHandle::fd_open_flags() const { return open_flags(_fd_direct_off); } -[[nodiscard]] std::size_t FileHandle::nbytes() const +std::size_t FileHandle::nbytes() const { if (closed()) { return 0; } if (_nbytes == 0) { _nbytes = get_file_size(_fd_direct_off); } diff --git a/cpp/src/posix_io.cpp b/cpp/src/posix_io.cpp new file mode 100644 index 0000000000..d4ee2944e5 --- /dev/null +++ b/cpp/src/posix_io.cpp @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace kvikio::detail { + +CUstream StreamsByThread::get(CUcontext ctx, std::thread::id thd_id) +{ + static StreamsByThread _instance; + + // If no current context, we return the null/default stream + if (ctx == nullptr) { return nullptr; } + auto key = std::make_pair(ctx, thd_id); + + // Create a new stream if `ctx` doesn't have one. + if (auto search = _instance._streams.find(key); search == _instance._streams.end()) { + CUstream stream{}; + CUDA_DRIVER_TRY(cudaAPI::instance().StreamCreate(&stream, CU_STREAM_DEFAULT)); + _instance._streams[key] = stream; + return stream; + } else { + return search->second; + } +} + +CUstream StreamsByThread::get() +{ + CUcontext ctx{nullptr}; + CUDA_DRIVER_TRY(cudaAPI::instance().CtxGetCurrent(&ctx)); + return get(ctx, std::this_thread::get_id()); +} + +std::size_t posix_device_read(int fd, + const void* devPtr_base, + std::size_t size, + std::size_t file_offset, + std::size_t devPtr_offset) +{ + KVIKIO_NVTX_SCOPED_RANGE("posix_device_read()", size); + return detail::posix_device_io( + fd, devPtr_base, size, file_offset, devPtr_offset); +} + +std::size_t posix_device_write(int fd, + const void* devPtr_base, + std::size_t size, + std::size_t file_offset, + std::size_t devPtr_offset) +{ + KVIKIO_NVTX_SCOPED_RANGE("posix_device_write()", size); + return detail::posix_device_io( + fd, devPtr_base, size, file_offset, devPtr_offset); +} + +} // namespace kvikio::detail diff --git a/cpp/src/remote_handle.cpp b/cpp/src/remote_handle.cpp index adcf56befc..9fd0690891 100644 --- a/cpp/src/remote_handle.cpp +++ b/cpp/src/remote_handle.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,106 @@ namespace kvikio { +namespace { + +/** + * @brief Bounce buffer in pinned host memory. + * + * @note Is not thread-safe. + */ +class BounceBufferH2D { + CUstream _stream; // The CUDA stream to use. + CUdeviceptr _dev; // The output device buffer. + AllocRetain::Alloc _host_buffer; // The host buffer to bounce data on. + std::ptrdiff_t _dev_offset{0}; // Number of bytes written to `_dev`. + std::ptrdiff_t _host_offset{0}; // Number of bytes written to `_host` (resets on flush). + + public: + /** + * @brief Create a bounce buffer for an output device buffer. + * + * @param stream The CUDA stream used throughout the lifetime of the bounce buffer. + * @param device_buffer The output device buffer (final destination of the data). + */ + BounceBufferH2D(CUstream stream, void* device_buffer) + : _stream{stream}, + _dev{convert_void2deviceptr(device_buffer)}, + _host_buffer{AllocRetain::instance().get()} + { + } + + /** + * @brief The bounce buffer if flushed to device on destruction. + */ + ~BounceBufferH2D() noexcept + { + try { + flush(); + } catch (CUfileException const& e) { + std::cerr << "BounceBufferH2D error on final flush: "; + std::cerr << e.what(); + std::cerr << std::endl; + } + } + + private: + /** + * @brief Write host memory to the output device buffer. + * + * @param src The host memory source. + * @param size Number of bytes to write. + */ + void write_to_device(void const* src, std::size_t size) + { + if (size > 0) { + CUDA_DRIVER_TRY(cudaAPI::instance().MemcpyHtoDAsync(_dev + _dev_offset, src, size, _stream)); + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream)); + _dev_offset += size; + } + } + + /** + * @brief Flush the bounce buffer by writing everything to the output device buffer. + */ + void flush() + { + write_to_device(_host_buffer.get(), _host_offset); + _host_offset = 0; + } + + public: + /** + * @brief Write host memory to the bounce buffer (also host memory). + * + * Only when the bounce buffer has been filled up is data copied to the output device buffer. + * + * @param data The host memory source. + * @param size Number of bytes to write. + */ + void write(char const* data, std::size_t size) + { + if (_host_buffer.size() - _host_offset < size) { // Not enough space left in the bounce buffer + flush(); + assert(_host_offset == 0); + } + if (_host_buffer.size() < size) { + // If still not enough space, we just copy the data to the device. This only happens when + // `defaults::bounce_buffer_size()` is smaller than 16kb thus no need to performance + // optimize for this case. + write_to_device(data, size); + } else if (size > 0) { + std::memcpy(_host_buffer.get(_host_offset), data, size); + _host_offset += size; + } + } +}; + +} // namespace + +HttpEndpoint::HttpEndpoint(std::string url) : _url{std::move(url)} {} + +std::string HttpEndpoint::str() const { return _url; } + void HttpEndpoint::setopt(CurlHandle& curl) { curl.setopt(CURLOPT_URL, _url.c_str()); } void S3Endpoint::setopt(CurlHandle& curl) @@ -42,6 +142,114 @@ void S3Endpoint::setopt(CurlHandle& curl) curl.setopt(CURLOPT_USERPWD, _aws_userpwd.c_str()); } +std::string S3Endpoint::unwrap_or_default(std::optional aws_arg, + std::string const& env_var, + std::string const& err_msg) +{ + if (aws_arg.has_value()) { return std::move(*aws_arg); } + + char const* env = std::getenv(env_var.c_str()); + if (env == nullptr) { + if (err_msg.empty()) { return std::string(); } + throw std::invalid_argument(err_msg); + } + return std::string(env); +} + +std::string S3Endpoint::url_from_bucket_and_object(std::string const& bucket_name, + std::string const& object_name, + std::optional const& aws_region, + std::optional aws_endpoint_url) +{ + auto const endpoint_url = unwrap_or_default(std::move(aws_endpoint_url), "AWS_ENDPOINT_URL"); + std::stringstream ss; + if (endpoint_url.empty()) { + auto const region = + unwrap_or_default(std::move(aws_region), + "AWS_DEFAULT_REGION", + "S3: must provide `aws_region` if AWS_DEFAULT_REGION isn't set."); + // We default to the official AWS url scheme. + ss << "https://" << bucket_name << ".s3." << region << ".amazonaws.com/" << object_name; + } else { + ss << endpoint_url << "/" << bucket_name << "/" << object_name; + } + return ss.str(); +} + +std::pair S3Endpoint::parse_s3_url(std::string const& s3_url) +{ + // Regular expression to match s3:/// + std::regex const pattern{R"(^s3://([^/]+)/(.+))", std::regex_constants::icase}; + std::smatch matches; + if (std::regex_match(s3_url, matches, pattern)) { return {matches[1].str(), matches[2].str()}; } + throw std::invalid_argument("Input string does not match the expected S3 URL format."); +} + +S3Endpoint::S3Endpoint(std::string url, + std::optional aws_region, + std::optional aws_access_key, + std::optional aws_secret_access_key) + : _url{std::move(url)} +{ + // Regular expression to match http[s]:// + std::regex pattern{R"(^https?://.*)", std::regex_constants::icase}; + if (!std::regex_search(_url, pattern)) { + throw std::invalid_argument("url must start with http:// or https://"); + } + + auto const region = + unwrap_or_default(std::move(aws_region), + "AWS_DEFAULT_REGION", + "S3: must provide `aws_region` if AWS_DEFAULT_REGION isn't set."); + + auto const access_key = + unwrap_or_default(std::move(aws_access_key), + "AWS_ACCESS_KEY_ID", + "S3: must provide `aws_access_key` if AWS_ACCESS_KEY_ID isn't set."); + + auto const secret_access_key = unwrap_or_default( + std::move(aws_secret_access_key), + "AWS_SECRET_ACCESS_KEY", + "S3: must provide `aws_secret_access_key` if AWS_SECRET_ACCESS_KEY isn't set."); + + // Create the CURLOPT_AWS_SIGV4 option + { + std::stringstream ss; + ss << "aws:amz:" << region << ":s3"; + _aws_sigv4 = ss.str(); + } + // Create the CURLOPT_USERPWD option + // Notice, curl uses `secret_access_key` to generate a AWS V4 signature. It is NOT included + // in the http header. See + // + { + std::stringstream ss; + ss << access_key << ":" << secret_access_key; + _aws_userpwd = ss.str(); + } +} + +S3Endpoint::S3Endpoint(std::string const& bucket_name, + std::string const& object_name, + std::optional aws_region, + std::optional aws_access_key, + std::optional aws_secret_access_key, + std::optional aws_endpoint_url) + : S3Endpoint( + url_from_bucket_and_object(bucket_name, object_name, aws_region, std::move(aws_endpoint_url)), + std::move(aws_region), + std::move(aws_access_key), + std::move(aws_secret_access_key)) +{ +} + +std::string S3Endpoint::str() const { return _url; } + +RemoteHandle::RemoteHandle(std::unique_ptr endpoint, std::size_t nbytes) + : _endpoint{std::move(endpoint)}, _nbytes{nbytes} +{ +} + RemoteHandle::RemoteHandle(std::unique_ptr endpoint) { auto curl = create_curl_handle(); @@ -60,6 +268,10 @@ RemoteHandle::RemoteHandle(std::unique_ptr endpoint) _endpoint = std::move(endpoint); } +std::size_t RemoteHandle::nbytes() const noexcept { return _nbytes; } + +RemoteEndpoint const& RemoteHandle::endpoint() const noexcept { return *_endpoint; } + namespace { /** @@ -74,7 +286,7 @@ struct CallbackContext { : buf{static_cast(buf)}, size{size}, offset{0}, overflow_error{0} { } - detail::BounceBufferH2D* bounce_buffer{nullptr}; // Only used by callback_device_memory + BounceBufferH2D* bounce_buffer{nullptr}; // Only used by callback_device_memory }; /** @@ -166,7 +378,7 @@ std::size_t RemoteHandle::read(void* buf, std::size_t size, std::size_t file_off PushAndPopContext c(get_context_from_pointer(buf)); // We use a bounce buffer to avoid many small memory copies to device. Libcurl has a // maximum chunk size of 16kb (`CURL_MAX_WRITE_SIZE`) but chunks are often much smaller. - detail::BounceBufferH2D bounce_buffer(detail::StreamsByThread::get(), buf); + BounceBufferH2D bounce_buffer(detail::StreamsByThread::get(), buf); ctx.bounce_buffer = &bounce_buffer; curl.perform(); } diff --git a/cpp/src/shim/utils.cpp b/cpp/src/shim/utils.cpp index ab9afbf648..314fb5382a 100644 --- a/cpp/src/shim/utils.cpp +++ b/cpp/src/shim/utils.cpp @@ -45,7 +45,7 @@ void* load_library(const std::vector& names, int mode) throw std::runtime_error("cannot open shared object file, tried: " + ss.str()); } -[[nodiscard]] bool is_running_in_wsl() +bool is_running_in_wsl() { struct utsname buf {}; int err = ::uname(&buf); @@ -57,7 +57,7 @@ void* load_library(const std::vector& names, int mode) return false; } -[[nodiscard]] bool run_udev_readable() +bool run_udev_readable() { try { return std::filesystem::is_directory("/run/udev"); diff --git a/cpp/src/stream.cpp b/cpp/src/stream.cpp new file mode 100644 index 0000000000..ac0f8138e4 --- /dev/null +++ b/cpp/src/stream.cpp @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace kvikio { + +StreamFuture::StreamFuture( + void* devPtr_base, std::size_t size, off_t file_offset, off_t devPtr_offset, CUstream stream) + : _devPtr_base{devPtr_base}, _stream{stream} +{ + // Notice, we allocate the arguments using malloc() as specified in the cuFile docs: + // + if ((_val = static_cast(std::malloc(sizeof(ArgByVal)))) == nullptr) { + throw std::bad_alloc{}; + } + *_val = { + .size = size, .file_offset = file_offset, .devPtr_offset = devPtr_offset, .bytes_done = 0}; +} + +StreamFuture::StreamFuture(StreamFuture&& o) noexcept + : _devPtr_base{std::exchange(o._devPtr_base, nullptr)}, + _stream{std::exchange(o._stream, nullptr)}, + _val{std::exchange(o._val, nullptr)}, + _stream_synchronized{o._stream_synchronized} +{ +} + +StreamFuture& StreamFuture::operator=(StreamFuture&& o) noexcept +{ + _devPtr_base = std::exchange(o._devPtr_base, nullptr); + _stream = std::exchange(o._stream, nullptr); + _val = std::exchange(o._val, nullptr); + _stream_synchronized = o._stream_synchronized; + return *this; +} + +std::tuple StreamFuture::get_args() const +{ + if (_val == nullptr) { + throw kvikio::CUfileException("cannot get arguments from an uninitialized StreamFuture"); + } + return {_devPtr_base, + &_val->size, + &_val->file_offset, + &_val->devPtr_offset, + &_val->bytes_done, + _stream}; +} + +std::size_t StreamFuture::check_bytes_done() +{ + if (_val == nullptr) { + throw kvikio::CUfileException("cannot check bytes done on an uninitialized StreamFuture"); + } + + if (!_stream_synchronized) { + _stream_synchronized = true; + CUDA_DRIVER_TRY(cudaAPI::instance().StreamSynchronize(_stream)); + } + + CUFILE_CHECK_BYTES_DONE(_val->bytes_done); + // At this point, we know `_val->bytes_done` is a positive value otherwise + // CUFILE_CHECK_BYTES_DONE() would have raised an exception. + return static_cast(_val->bytes_done); +} + +StreamFuture::~StreamFuture() noexcept +{ + if (_val != nullptr) { + try { + check_bytes_done(); + } catch (const kvikio::CUfileException& e) { + std::cerr << e.what() << std::endl; + } + std::free(_val); + } +} + +} // namespace kvikio diff --git a/cpp/src/utils.cpp b/cpp/src/utils.cpp index 32834cf3a4..4ba5a757a2 100644 --- a/cpp/src/utils.cpp +++ b/cpp/src/utils.cpp @@ -13,7 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#pragma once #include #include @@ -163,9 +162,7 @@ PushAndPopContext::~PushAndPopContext() } } -// Find the base and offset of the memory allocation `devPtr` is in -std::tuple get_alloc_info(const void* devPtr, - CUcontext* ctx = nullptr) +std::tuple get_alloc_info(const void* devPtr, CUcontext* ctx) { auto dev = convert_void2deviceptr(devPtr); CUdeviceptr base_ptr{};