Skip to content

Commit

Permalink
enhance: Add buffered writer to reduce fwrite syscall (#38570)
Browse files Browse the repository at this point in the history
Related to previous PR #38157

If mmapped row is too small, frequent fwrite call still cost too much
cpu time for context switching. This PR add buffered write to avoid this
bad case with extra buffer per variable field.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Dec 27, 2024
1 parent 4df444e commit 19052ef
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 43 deletions.
92 changes: 84 additions & 8 deletions internal/core/src/common/File.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
#include <unistd.h>

namespace milvus {

#define THROW_FILE_WRITE_ERROR \
PanicInfo(ErrorCode::FileWriteFailed, \
fmt::format("write data to file {} failed, error code {}", \
file_.Path(), \
strerror(errno)));

class File {
public:
File(const File& file) = delete;
Expand All @@ -36,12 +43,26 @@ class File {

static File
Open(const std::string_view filepath, int flags) {
// using default buf size = 4096
return Open(filepath, flags, 4096);
}

static File
Open(const std::string_view filepath, int flags, size_t buf_size) {
int fd = open(filepath.data(), flags, S_IRUSR | S_IWUSR);
AssertInfo(fd != -1,
"failed to create mmap file {}: {}",
filepath,
strerror(errno));
return File(fd, std::string(filepath));
FILE* fs = fdopen(fd, "wb+");
AssertInfo(fs != nullptr,
"failed to open file {}: {}",
filepath,
strerror(errno));
auto f = File(fd, fs, std::string(filepath));
// setup buffer size file stream will use
setvbuf(f.fs_, nullptr, _IOFBF, buf_size);
return f;
}

int
Expand Down Expand Up @@ -94,16 +115,71 @@ class File {
}

private:
explicit File(int fd, const std::string& filepath)
: fd_(fd), filepath_(filepath) {
fs_ = fdopen(fd_, "wb+");
AssertInfo(fs_ != nullptr,
"failed to open file {}: {}",
filepath,
strerror(errno));
explicit File(int fd, FILE* fs, const std::string& filepath)
: fd_(fd), filepath_(filepath), fs_(fs) {
}
int fd_{-1};
FILE* fs_;
std::string filepath_;
};

class BufferedWriter {
public:
// Constructor: Initialize with the file pointer and the buffer size (default 4KB).
explicit BufferedWriter(File& file, size_t buffer_size = 4096)
: file_(file),
buffer_size_(buffer_size),
buffer_(new char[buffer_size]) {
}

~BufferedWriter() {
// Ensure the buffer is flushed when the object is destroyed
flush();
delete[] buffer_;
}

// Write method to handle data larger than the buffer
void
Write(const void* data, size_t size) {
if (size > buffer_size_) {
flush();
ssize_t written_data_size = file_.FWrite(data, size);
if (written_data_size != size) {
THROW_FILE_WRITE_ERROR
}
return;
}

if (buffer_pos_ + size > buffer_size_) {
flush();
}

std::memcpy(buffer_ + buffer_pos_, data, size);
buffer_pos_ += size;
}

template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
void
WriteInt(T value) {
Write(&value, sizeof(value));
}

// Flush method: Write the contents of the buffer to the file
void
flush() {
if (buffer_pos_ > 0) {
ssize_t written_data_size = file_.FWrite(buffer_, buffer_pos_);
if (written_data_size != buffer_pos_) {
THROW_FILE_WRITE_ERROR
}
buffer_pos_ = 0;
}
}

private:
File& file_; // File pointer
size_t buffer_size_; // Size of the internal buffer
char* buffer_; // The buffer itself
size_t buffer_pos_{0}; // Current position in the buffer
};
} // namespace milvus
52 changes: 17 additions & 35 deletions internal/core/src/mmap/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ WriteFieldData(File& file,
std::vector<std::vector<uint64_t>>& element_indices,
FixedVector<bool>& valid_data) {
if (IsVariableDataType(data_type)) {
// use buffered writer to reduce fwrite/write syscall
// buffer size = 1024*1024 = 1MB
BufferedWriter bw = BufferedWriter(file, 1048576);
switch (data_type) {
case DataType::VARCHAR:
case DataType::STRING: {
Expand All @@ -101,17 +104,10 @@ WriteFieldData(File& file,
indices.push_back(total_written);
auto str =
static_cast<const std::string*>(data->RawValue(i));
ssize_t written_data_size =
file.FWriteInt<uint32_t>(uint32_t(str->size()));
if (written_data_size != sizeof(uint32_t)) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data_size;
auto written_data = file.FWrite(str->data(), str->size());
if (written_data < str->size()) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data;
bw.WriteInt<uint32_t>(static_cast<uint32_t>(str->size()));
total_written += sizeof(uint32_t);
bw.Write(str->data(), str->size());
total_written += str->size();
}
break;
}
Expand All @@ -121,18 +117,11 @@ WriteFieldData(File& file,
indices.push_back(total_written);
auto padded_string =
static_cast<const Json*>(data->RawValue(i))->data();
ssize_t written_data_size = file.FWriteInt<uint32_t>(
uint32_t(padded_string.size()));
if (written_data_size != sizeof(uint32_t)) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data_size;
ssize_t written_data =
file.FWrite(padded_string.data(), padded_string.size());
if (written_data < padded_string.size()) {
THROW_FILE_WRITE_ERROR
}
total_written += written_data;
bw.WriteInt<uint32_t>(
static_cast<uint32_t>(padded_string.size()));
total_written += padded_string.size();
bw.Write(padded_string.data(), padded_string.size());
total_written += padded_string.size();
}
break;
}
Expand All @@ -141,13 +130,9 @@ WriteFieldData(File& file,
for (size_t i = 0; i < data->get_num_rows(); ++i) {
indices.push_back(total_written);
auto array = static_cast<const Array*>(data->RawValue(i));
ssize_t written =
file.FWrite(array->data(), array->byte_size());
if (written < array->byte_size()) {
THROW_FILE_WRITE_ERROR
}
bw.Write(array->data(), array->byte_size());
element_indices.emplace_back(array->get_offsets());
total_written += written;
total_written += array->byte_size();
}
break;
}
Expand All @@ -157,12 +142,8 @@ WriteFieldData(File& file,
auto vec =
static_cast<const knowhere::sparse::SparseRow<float>*>(
data->RawValue(i));
ssize_t written =
file.FWrite(vec->data(), vec->data_byte_size());
if (written < vec->data_byte_size()) {
break;
}
total_written += written;
bw.Write(vec->data(), vec->data_byte_size());
total_written += vec->data_byte_size();
}
break;
}
Expand All @@ -171,6 +152,7 @@ WriteFieldData(File& file,
"not supported data type {}",
GetDataTypeName(data_type));
}
bw.flush();
} else {
// write as: data|data|data|data|data|data......
size_t written = file.FWrite(data->Data(), data->DataSize());
Expand Down

0 comments on commit 19052ef

Please sign in to comment.