Skip to content

Commit

Permalink
Complete the initial separation
Browse files Browse the repository at this point in the history
  • Loading branch information
kingcrimsontianyu committed Jan 8, 2025
1 parent 074ccff commit 76830b9
Show file tree
Hide file tree
Showing 21 changed files with 1,054 additions and 679 deletions.
18 changes: 16 additions & 2 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,22 @@ include(cmake/thirdparty/get_thread_pool.cmake)
# ##################################################################################################
# * library targets --------------------------------------------------------------------------------

set(SOURCES "src/file_handle.cpp" "src/cufile/config.cpp" "src/cufile/driver.cpp"
"src/shim/cuda.cpp" "src/shim/cufile.cpp" "src/shim/libcurl.cpp" "src/shim/utils.cpp"
set(SOURCES
"src/batch.cpp"
"src/bounce_buffer.cpp"
"src/buffer.cpp"
"src/cufile/config.cpp"
"src/cufile/driver.cpp"
"src/defaults.cpp"
"src/error.cpp"
"src/file_handle.cpp"
"src/posix_io.cpp"
"src/shim/cuda.cpp"
"src/shim/cufile.cpp"
"src/shim/libcurl.cpp"
"src/shim/utils.cpp"
"src/stream.cpp"
"src/utils.cpp"
)

if(KvikIO_REMOTE_SUPPORT)
Expand Down
84 changes: 17 additions & 67 deletions cpp/include/kvikio/batch.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,68 +73,30 @@ class BatchHandle {
*
* @param max_num_events The maximum number of operations supported by this instance.
*/
BatchHandle(int max_num_events) : _initialized{true}, _max_num_events{max_num_events}
{
CUFILE_TRY(cuFileAPI::instance().BatchIOSetUp(&_handle, max_num_events));
}
BatchHandle(int max_num_events);

/**
* @brief BatchHandle support move semantic but isn't copyable
*/
BatchHandle(const BatchHandle&) = delete;
BatchHandle& operator=(BatchHandle const&) = delete;
BatchHandle(BatchHandle&& o) noexcept
: _initialized{std::exchange(o._initialized, false)},
_max_num_events{std::exchange(o._max_num_events, 0)}
{
_handle = std::exchange(o._handle, CUfileBatchHandle_t{});
}
~BatchHandle() noexcept { close(); }
BatchHandle(BatchHandle&& o) noexcept;
~BatchHandle() noexcept;

[[nodiscard]] bool closed() const noexcept { return !_initialized; }
[[nodiscard]] bool closed() const noexcept;

/**
* @brief Destroy the batch handle and free up resources
*/
void close() noexcept
{
if (closed()) { return; }
_initialized = false;

cuFileAPI::instance().BatchIODestroy(_handle);
}
void close() noexcept;

/**
* @brief Submit a vector of batch operations
*
* @param operations The vector of batch operations, which must not exceed the
* `max_num_events`.
*/
void submit(const std::vector<BatchOp>& operations)
{
if (convert_size2ssize(operations.size()) > _max_num_events) {
throw CUfileException("Cannot submit more than the max_num_events)");
}
std::vector<CUfileIOParams_t> io_batch_params;
io_batch_params.reserve(operations.size());
for (const auto& op : operations) {
if (op.file_handle.is_compat_mode_preferred()) {
throw CUfileException("Cannot submit a FileHandle opened in compatibility mode");
}

io_batch_params.push_back(CUfileIOParams_t{.mode = CUFILE_BATCH,
.u = {.batch = {.devPtr_base = op.devPtr_base,
.file_offset = op.file_offset,
.devPtr_offset = op.devPtr_offset,
.size = op.size}},
.fh = op.file_handle.handle(),
.opcode = op.opcode,
.cookie = nullptr});
}

CUFILE_TRY(cuFileAPI::instance().BatchIOSubmit(
_handle, io_batch_params.size(), io_batch_params.data(), 0));
}
void submit(const std::vector<BatchOp>& operations);

/**
* @brief Get status of submitted operations
Expand All @@ -148,16 +110,9 @@ class BatchHandle {
*/
std::vector<CUfileIOEvents_t> status(unsigned min_nr,
unsigned max_nr,
struct timespec* timeout = nullptr)
{
std::vector<CUfileIOEvents_t> ret;
ret.resize(_max_num_events);
CUFILE_TRY(cuFileAPI::instance().BatchIOGetStatus(_handle, min_nr, &max_nr, &ret[0], timeout));
ret.resize(max_nr);
return ret;
}

void cancel() { CUFILE_TRY(cuFileAPI::instance().BatchIOCancel(_handle)); }
struct timespec* timeout = nullptr);

void cancel();
};

#else
Expand All @@ -166,24 +121,19 @@ class BatchHandle {
public:
BatchHandle() noexcept = default;

BatchHandle(int max_num_events)
{
throw CUfileException("BatchHandle requires cuFile's batch API, please build with CUDA v12.1+");
}
BatchHandle(int max_num_events);

[[nodiscard]] bool closed() const noexcept { return true; }
[[nodiscard]] bool closed() const noexcept;

void close() noexcept {}
void close() noexcept;

void submit(const std::vector<BatchOp>& operations) {}
void submit(const std::vector<BatchOp>& operations);

std::vector<CUfileIOEvents_t> status(unsigned min_nr,
unsigned max_nr,
struct timespec* timeout = nullptr)
{
return std::vector<CUfileIOEvents_t>{};
}
void cancel() {}
struct timespec* timeout = nullptr);

void cancel();
};

#endif
Expand Down
84 changes: 14 additions & 70 deletions cpp/include/kvikio/bounce_buffer.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,6 @@
*/
#pragma once

#include <mutex>
#include <stack>

#include <kvikio/defaults.hpp>
Expand Down Expand Up @@ -47,18 +46,15 @@ class AllocRetain {
std::size_t const _size;

public:
Alloc(AllocRetain* manager, void* alloc, std::size_t size)
: _manager(manager), _alloc{alloc}, _size{size}
{
}
Alloc(AllocRetain* manager, void* alloc, std::size_t size);
Alloc(Alloc const&) = delete;
Alloc& operator=(Alloc const&) = delete;
Alloc(Alloc&& o) = delete;
Alloc& operator=(Alloc&& o) = delete;
~Alloc() noexcept { _manager->put(_alloc, _size); }
void* get() noexcept { return _alloc; }
void* get(std::ptrdiff_t offset) noexcept { return static_cast<char*>(_alloc) + offset; }
std::size_t size() noexcept { return _size; }
~Alloc() noexcept;
void* get() noexcept;
void* get(std::ptrdiff_t offset) noexcept;
std::size_t size() noexcept;
};

AllocRetain() = default;
Expand All @@ -77,80 +73,28 @@ class AllocRetain {
*
* @return The number of bytes cleared
*/
std::size_t _clear()
{
std::size_t ret = _free_allocs.size() * _size;
while (!_free_allocs.empty()) {
CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(_free_allocs.top()));
_free_allocs.pop();
}
return ret;
}
std::size_t _clear();

/**
* @brief Ensure the sizes of the retained allocations match `defaults::bounce_buffer_size()`
*
* NB: `_mutex` must be taken prior to calling this function.
*/
void _ensure_alloc_size()
{
auto const bounce_buffer_size = defaults::bounce_buffer_size();
if (_size != bounce_buffer_size) {
_clear();
_size = bounce_buffer_size;
}
}
void _ensure_alloc_size();

public:
[[nodiscard]] Alloc get()
{
std::lock_guard const lock(_mutex);
_ensure_alloc_size();

// Check if we have an allocation available
if (!_free_allocs.empty()) {
void* ret = _free_allocs.top();
_free_allocs.pop();
return Alloc(this, ret, _size);
}

// If no available allocation, allocate and register a new one
void* alloc{};
// Allocate page-locked host memory
CUDA_DRIVER_TRY(cudaAPI::instance().MemHostAlloc(&alloc, _size, CU_MEMHOSTREGISTER_PORTABLE));
return Alloc(this, alloc, _size);
}

void put(void* alloc, std::size_t size)
{
std::lock_guard const lock(_mutex);
_ensure_alloc_size();

// If the size of `alloc` matches the sizes of the retained allocations,
// it is added to the set of free allocation otherwise it is freed.
if (size == _size) {
_free_allocs.push(alloc);
} else {
CUDA_DRIVER_TRY(cudaAPI::instance().MemFreeHost(alloc));
}
}
[[nodiscard]] Alloc get();

void put(void* alloc, std::size_t size);

/**
* @brief Free all retained allocations
*
* @return The number of bytes cleared
*/
std::size_t clear()
{
std::lock_guard const lock(_mutex);
return _clear();
}

KVIKIO_EXPORT static AllocRetain& instance()
{
static AllocRetain _instance;
return _instance;
}
std::size_t clear();

KVIKIO_EXPORT static AllocRetain& instance();

AllocRetain(AllocRetain const&) = delete;
AllocRetain& operator=(AllocRetain const&) = delete;
Expand Down
52 changes: 10 additions & 42 deletions cpp/include/kvikio/buffer.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
* Copyright (c) 2021-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,17 +15,8 @@
*/
#pragma once

#include <algorithm>
#include <iostream>
#include <map>
#include <vector>

#include <kvikio/defaults.hpp>
#include <kvikio/error.hpp>
#include <kvikio/shim/cufile.hpp>
#include <kvikio/shim/cufile_h_wrapper.hpp>
#include <kvikio/utils.hpp>

namespace kvikio {

/**
Expand All @@ -44,32 +35,17 @@ namespace kvikio {
* streaming buffer that is reused across multiple cuFile IO operations.
*/
/*NOLINTNEXTLINE(readability-function-cognitive-complexity)*/
inline void buffer_register(const void* devPtr_base,
std::size_t size,
int flags = 0,
const std::vector<int>& errors_to_ignore = std::vector<int>())
{
if (defaults::is_compat_mode_preferred()) { return; }
CUfileError_t status = cuFileAPI::instance().BufRegister(devPtr_base, size, flags);
if (status.err != CU_FILE_SUCCESS) {
// Check if `status.err` is in `errors_to_ignore`
if (std::find(errors_to_ignore.begin(), errors_to_ignore.end(), status.err) ==
errors_to_ignore.end()) {
CUFILE_TRY(status);
}
}
}
void buffer_register(const void* devPtr_base,
std::size_t size,
int flags = 0,
const std::vector<int>& errors_to_ignore = std::vector<int>());

/**
* @brief deregister an already registered device memory from cuFile
*
* @param devPtr_base device pointer to deregister
*/
inline void buffer_deregister(const void* devPtr_base)
{
if (defaults::is_compat_mode_preferred()) { return; }
CUFILE_TRY(cuFileAPI::instance().BufDeregister(devPtr_base));
}
void buffer_deregister(const void* devPtr_base);

/**
* @brief Register device memory allocation which is part of devPtr. Use this
Expand All @@ -85,23 +61,15 @@ inline void buffer_deregister(const void* devPtr_base)
* @warning This API is intended for usecases where the memory is used as
* streaming buffer that is reused across multiple cuFile IO operations.
*/
inline void memory_register(const void* devPtr,
int flags = 0,
const std::vector<int>& errors_to_ignore = {})
{
auto [base, nbytes, offset] = get_alloc_info(devPtr);
buffer_register(base, nbytes, flags, errors_to_ignore);
}
void memory_register(const void* devPtr,
int flags = 0,
const std::vector<int>& errors_to_ignore = {});

/**
* @brief deregister an already registered device memory from cuFile.
*
* @param devPtr device pointer to deregister
*/
inline void memory_deregister(const void* devPtr)
{
auto [base, nbytes, offset] = get_alloc_info(devPtr);
buffer_deregister(base);
}
void memory_deregister(const void* devPtr);

} // namespace kvikio
Loading

0 comments on commit 76830b9

Please sign in to comment.