Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new writer option to reduce IOPs for small groups. #99

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 43 additions & 17 deletions cpp/array_record_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,16 @@ class ArrayRecordReaderTest

TEST_P(ArrayRecordReaderTest, MoveTest) {
std::string encoded;
auto writer_options = GetWriterOptions().set_group_size(2);
auto writer_options = GetWriterOptions();
int32_t group_size;
if (optimize_for_random_access()) {
group_size = 1;
writer_options.set_groups_awaiting_flush(256);
} else {
group_size = 3;
writer_options.set_groups_awaiting_flush(1);
}
writer_options.set_group_size(group_size);
auto writer = ArrayRecordWriter(
riegeli::Maker<riegeli::StringWriter>(&encoded), writer_options, nullptr);

Expand Down Expand Up @@ -113,7 +122,7 @@ TEST_P(ArrayRecordReaderTest, MoveTest) {
})
.ok());

EXPECT_EQ(reader_before_move.RecordGroupSize(), 2);
EXPECT_EQ(reader_before_move.RecordGroupSize(), group_size);

ArrayRecordReader reader = std::move(reader_before_move);
// Once a reader is moved, it is closed.
Expand Down Expand Up @@ -159,7 +168,7 @@ TEST_P(ArrayRecordReaderTest, MoveTest) {
EXPECT_FALSE(reader.ReadRecord(&record_view));
EXPECT_TRUE(reader.ok());

EXPECT_EQ(reader.RecordGroupSize(), 2);
EXPECT_EQ(reader.RecordGroupSize(), group_size);

ASSERT_TRUE(reader.Close());
}
Expand All @@ -172,11 +181,21 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) {
size_t len = dist(bitgen);
records[i] = MTRandomBytes(bitgen, len);
}
auto writer_options = GetWriterOptions();
int32_t group_size;
if (optimize_for_random_access()) {
group_size = 1;
writer_options.set_groups_awaiting_flush(1024);
} else {
group_size = 1024;
writer_options.set_groups_awaiting_flush(1);
}
writer_options.set_group_size(group_size);

std::string encoded;
auto writer =
ArrayRecordWriter(riegeli::Maker<riegeli::StringWriter>(&encoded),
GetWriterOptions(), get_pool());
writer_options, get_pool());
for (auto i : Seq(kDatasetSize)) {
EXPECT_TRUE(writer.WriteRecord(records[i]));
}
Expand All @@ -193,58 +212,65 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) {
reader_opt, use_thread_pool() ? get_pool() : nullptr);
ASSERT_TRUE(reader.status().ok());
EXPECT_EQ(reader.NumRecords(), kDatasetSize);
uint64_t group_size =
std::min(ArrayRecordWriterBase::Options::kDefaultGroupSize, kDatasetSize);
EXPECT_EQ(reader.RecordGroupSize(), group_size);

std::vector<bool> read_all_records(kDatasetSize, false);
std::vector<int32_t> read_all_records(kDatasetSize, 0);
ASSERT_TRUE(reader
.ParallelReadRecords(
[&](uint64_t record_index,
absl::string_view result_view) -> absl::Status {
EXPECT_EQ(result_view, records[record_index]);
EXPECT_FALSE(read_all_records[record_index]);
read_all_records[record_index] = true;
read_all_records[record_index] = 1;
return absl::OkStatus();
})
.ok());
for (bool record_was_read : read_all_records) {
EXPECT_TRUE(record_was_read);
uint32_t records_read = 0;
for (auto record_was_read : read_all_records) {
if (record_was_read) {
records_read++;
}
}
EXPECT_EQ(records_read, kDatasetSize);
EXPECT_TRUE(reader.SeekRecord(0));

std::vector<uint64_t> indices = {0, 3, 5, 7, 101, 2000};
std::vector<bool> read_indexed_records(indices.size(), false);
std::vector<int32_t> read_indexed_records(indices.size(), 0);
ASSERT_TRUE(reader
.ParallelReadRecordsWithIndices(
indices,
[&](uint64_t indices_idx,
absl::string_view result_view) -> absl::Status {
EXPECT_EQ(result_view, records[indices[indices_idx]]);
EXPECT_FALSE(read_indexed_records[indices_idx]);
read_indexed_records[indices_idx] = true;
read_indexed_records[indices_idx] = 1;
return absl::OkStatus();
})
.ok());
for (bool record_was_read : read_indexed_records) {
for (auto record_was_read : read_indexed_records) {
EXPECT_TRUE(record_was_read);
}

uint64_t begin = 10, end = 101;
std::vector<bool> read_range_records(end - begin, false);
std::vector<int32_t> read_range_records(end - begin, 0);
ASSERT_TRUE(reader
.ParallelReadRecordsInRange(
begin, end,
[&](uint64_t record_index,
absl::string_view result_view) -> absl::Status {
EXPECT_EQ(result_view, records[record_index]);
EXPECT_FALSE(read_range_records[record_index - begin]);
read_range_records[record_index - begin] = true;
read_range_records[record_index - begin] = 1;
return absl::OkStatus();
})
.ok());
for (bool record_was_read : read_range_records) {
EXPECT_TRUE(record_was_read);
records_read = 0;
for (auto record_was_read : read_range_records) {
if (record_was_read) {
records_read++;
}
}
EXPECT_EQ(records_read, end - begin);

// Test sequential read
absl::string_view result_view;
Expand Down
5 changes: 5 additions & 0 deletions cpp/array_record_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
// Group
options_parser.AddOption(
"group_size", ValueParser::Int(1, INT32_MAX, &options.group_size_));
options_parser.AddOption(
"groups_awaiting_flush",
ValueParser::Int(0, INT32_MAX, &options.groups_awaiting_flush_));
int32_t max_parallelism = 0;
options_parser.AddOption(
"max_parallelism",
Expand Down Expand Up @@ -196,6 +199,7 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
std::string ArrayRecordWriterBase::Options::ToString() const {
std::string option;
absl::StrAppend(&option, "group_size:", this->group_size_,
",groups_awaiting_flush:", this->groups_awaiting_flush_,
",transpose:", this->transpose_ ? "true" : "false",
",pad_to_block_boundary:",
this->pad_to_block_boundary_ ? "true" : "false");
Expand Down Expand Up @@ -362,6 +366,7 @@ void ArrayRecordWriterBase::Initialize() {
}
// Add callback only after we serialize header and metadata.
writer->set_submit_chunk_callback(submit_chunk_callback_.get());
// writer->set_chunks_awaiting_flush(options_.groups_awaiting_flush());
}

void ArrayRecordWriterBase::Done() {
Expand Down
20 changes: 18 additions & 2 deletions cpp/array_record_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
// options ::= option? ("," option?)*
// option ::=
// "group_size" ":" group_size |
// "groups_awaiting_flush" ":" groups_awaiting_flush |
// "max_parallelism" ":" max_parallelism |
// "saturation_delay_ms" : saturation_delay_ms |
// "uncompressed" |
Expand All @@ -105,7 +106,11 @@ class ArrayRecordWriterBase : public riegeli::Object {
// "window_log" : window_log |
// "pad_to_block_boundary" (":" ("true" | "false"))?
// group_size ::= positive integer which specifies number of records to be
// grouped into a chunk before compression. (default 65536)
// grouped into a chunk before compression. (default 1)
// groups_awaiting_flush ::= positive integer which specify the number of
// groups stored in a write buffer before sending to the storage IO.
// This option reduce the total IOPs for small group_size. (default
// 1024)
// saturation_delay_ms ::= positive integer which specifies a delay in
// milliseconds when the parallel writing queue is saturated.
// max_parallelism ::= `auto` or positive integers which specifies
Expand All @@ -123,13 +128,23 @@ class ArrayRecordWriterBase : public riegeli::Object {
//
// The larger the value, the denser the file, at the cost of more expansive
// random accessing.
static constexpr uint32_t kDefaultGroupSize = 65536;
// TODO(fchern) group_size=1 can trigger writer bugs..
static constexpr uint32_t kDefaultGroupSize = 1;
Options& set_group_size(uint32_t group_size) {
group_size_ = group_size;
return *this;
}
uint32_t group_size() const { return group_size_; }

// Set the number of gruops pending in the write buffer before sending to
// the storage IO. This option reduces the total IOPs for small group size.
static constexpr uint32_t kDefaultGroupsAwaitingFlush = 1024;
Options& set_groups_awaiting_flush(uint32_t groups_awaiting_flush) {
groups_awaiting_flush_ = groups_awaiting_flush;
return *this;
}
uint32_t groups_awaiting_flush() const { return groups_awaiting_flush_; }

// Specifies max number of concurrent chunk encoders allowed. Default to the
// thread pool size.
Options& set_max_parallelism(std::optional<uint32_t> max_parallelism) {
Expand Down Expand Up @@ -290,6 +305,7 @@ class ArrayRecordWriterBase : public riegeli::Object {

private:
int32_t group_size_ = kDefaultGroupSize;
int32_t groups_awaiting_flush_ = kDefaultGroupsAwaitingFlush;
riegeli::CompressorOptions compressor_options_;
std::optional<riegeli::RecordsMetadata> metadata_;
bool pad_to_block_boundary_ = false;
Expand Down
35 changes: 28 additions & 7 deletions cpp/array_record_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
auto option = ArrayRecordWriterBase::Options::FromString("").value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
Expand All @@ -218,7 +220,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"group_size:1,"
"groups_awaiting_flush:1024,"
"transpose:false,"
"pad_to_block_boundary:false,"
"zstd:3,"
Expand All @@ -230,6 +233,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
auto option = ArrayRecordWriterBase::Options::FromString("default").value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
Expand All @@ -239,7 +244,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"group_size:1,"
"groups_awaiting_flush:1024,"
"transpose:false,"
"pad_to_block_boundary:false,"
"zstd:3,"
Expand All @@ -248,10 +254,12 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
"group_size:32,transpose,window_log:20")
.value();
auto option =
ArrayRecordWriterBase::Options::FromString(
"group_size:32,groups_awaiting_flush:256,transpose,window_log:20")
.value();
EXPECT_EQ(option.group_size(), 32);
EXPECT_EQ(option.groups_awaiting_flush(), 256);
EXPECT_TRUE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
Expand All @@ -261,6 +269,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {

EXPECT_EQ(option.ToString(),
"group_size:32,"
"groups_awaiting_flush:256,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
Expand All @@ -274,6 +283,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
"brotli:6,group_size:32,transpose,window_log:25")
.value();
EXPECT_EQ(option.group_size(), 32);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_TRUE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
Expand All @@ -283,6 +294,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {

EXPECT_EQ(option.ToString(),
"group_size:32,"
"groups_awaiting_flush:1024,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
Expand All @@ -296,6 +308,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
"group_size:32,transpose,zstd:5")
.value();
EXPECT_EQ(option.group_size(), 32);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_TRUE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
Expand All @@ -306,6 +320,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {

EXPECT_EQ(option.ToString(),
"group_size:32,"
"groups_awaiting_flush:1024,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
Expand All @@ -320,14 +335,17 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
.value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kNone);
EXPECT_TRUE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"group_size:1,"
"groups_awaiting_flush:1024,"
"transpose:false,"
"pad_to_block_boundary:true,"
"uncompressed");
Expand All @@ -340,14 +358,17 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
.value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kSnappy);
EXPECT_TRUE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"group_size:1,"
"groups_awaiting_flush:1024,"
"transpose:false,"
"pad_to_block_boundary:true,"
"snappy");
Expand Down
8 changes: 7 additions & 1 deletion cpp/sequenced_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ void SequencedChunkWriterBase::Initialize() {
"Failed to create the file header"));
}
if (!chunk_writer->Flush(riegeli::FlushType::kFromObject)) {
Fail(riegeli::Annotate(chunk_writer->status(), "Could not flush"));
Fail(riegeli::Annotate(chunk_writer->status(),
"Could not flush the file header."));
}
}

Expand All @@ -159,6 +160,11 @@ void SequencedChunkWriterBase::Done() {
return;
}
auto* chunk_writer = get_writer();
// if (!chunk_writer->Flush(riegeli::FlushType::kFromObject)) {
// Fail(riegeli::Annotate(chunk_writer->status(),
// "Could not flush before close."));
// return;
// }
if (!chunk_writer->Close()) {
Fail(riegeli::Annotate(chunk_writer->status(),
"Failed to close chunk_writer"));
Expand Down
7 changes: 7 additions & 0 deletions cpp/sequenced_chunk_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ class SequencedChunkWriterBase : public riegeli::Object {
return pad_to_block_boundary_;
}

void set_chunks_awaiting_flush(uint32_t chunks_awaiting_flush) {
absl::MutexLock l(&mu_);
chunks_awaiting_flush_ = chunks_awaiting_flush;
}

// Setup a callback for each committed chunk. See CommitChunkCallback
// comments for details.
void set_submit_chunk_callback(SubmitChunkCallback* callback) {
Expand Down Expand Up @@ -182,6 +187,8 @@ class SequencedChunkWriterBase : public riegeli::Object {
// Records the sequence number of submitted chunks.
uint64_t submitted_chunks_ ABSL_GUARDED_BY(mu_) = 0;

uint64_t chunks_awaiting_flush_ ABSL_GUARDED_BY(mu_) = 0;

// Queue for storing the future chunks.
std::queue<std::future<absl::StatusOr<riegeli::Chunk>>> queue_
ABSL_GUARDED_BY(mu_);
Expand Down
Loading
Loading