Skip to content

Commit

Permalink
Add new writer option to reduce IOPs for small groups.
Browse files Browse the repository at this point in the history
Also change the default group size to 1 as this is the most common use case.

PiperOrigin-RevId: 606995301
  • Loading branch information
dryman authored and copybara-github committed Oct 29, 2024
1 parent ee89b74 commit ec3d6d5
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 29 deletions.
60 changes: 43 additions & 17 deletions cpp/array_record_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,16 @@ class ArrayRecordReaderTest

TEST_P(ArrayRecordReaderTest, MoveTest) {
std::string encoded;
auto writer_options = GetWriterOptions().set_group_size(2);
auto writer_options = GetWriterOptions();
int32_t group_size;
if (optimize_for_random_access()) {
group_size = 1;
writer_options.set_groups_awaiting_flush(256);
} else {
group_size = 3;
writer_options.set_groups_awaiting_flush(1);
}
writer_options.set_group_size(group_size);
auto writer = ArrayRecordWriter(
riegeli::Maker<riegeli::StringWriter>(&encoded), writer_options, nullptr);

Expand Down Expand Up @@ -113,7 +122,7 @@ TEST_P(ArrayRecordReaderTest, MoveTest) {
})
.ok());

EXPECT_EQ(reader_before_move.RecordGroupSize(), 2);
EXPECT_EQ(reader_before_move.RecordGroupSize(), group_size);

ArrayRecordReader reader = std::move(reader_before_move);
// Once a reader is moved, it is closed.
Expand Down Expand Up @@ -159,7 +168,7 @@ TEST_P(ArrayRecordReaderTest, MoveTest) {
EXPECT_FALSE(reader.ReadRecord(&record_view));
EXPECT_TRUE(reader.ok());

EXPECT_EQ(reader.RecordGroupSize(), 2);
EXPECT_EQ(reader.RecordGroupSize(), group_size);

ASSERT_TRUE(reader.Close());
}
Expand All @@ -172,11 +181,21 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) {
size_t len = dist(bitgen);
records[i] = MTRandomBytes(bitgen, len);
}
auto writer_options = GetWriterOptions();
int32_t group_size;
if (optimize_for_random_access()) {
group_size = 1;
writer_options.set_groups_awaiting_flush(1024);
} else {
group_size = 1024;
writer_options.set_groups_awaiting_flush(1);
}
writer_options.set_group_size(group_size);

std::string encoded;
auto writer =
ArrayRecordWriter(riegeli::Maker<riegeli::StringWriter>(&encoded),
GetWriterOptions(), get_pool());
writer_options, get_pool());
for (auto i : Seq(kDatasetSize)) {
EXPECT_TRUE(writer.WriteRecord(records[i]));
}
Expand All @@ -193,58 +212,65 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) {
reader_opt, use_thread_pool() ? get_pool() : nullptr);
ASSERT_TRUE(reader.status().ok());
EXPECT_EQ(reader.NumRecords(), kDatasetSize);
uint64_t group_size =
std::min(ArrayRecordWriterBase::Options::kDefaultGroupSize, kDatasetSize);
EXPECT_EQ(reader.RecordGroupSize(), group_size);

std::vector<bool> read_all_records(kDatasetSize, false);
std::vector<int32_t> read_all_records(kDatasetSize, 0);
ASSERT_TRUE(reader
.ParallelReadRecords(
[&](uint64_t record_index,
absl::string_view result_view) -> absl::Status {
EXPECT_EQ(result_view, records[record_index]);
EXPECT_FALSE(read_all_records[record_index]);
read_all_records[record_index] = true;
read_all_records[record_index] = 1;
return absl::OkStatus();
})
.ok());
for (bool record_was_read : read_all_records) {
EXPECT_TRUE(record_was_read);
uint32_t records_read = 0;
for (auto record_was_read : read_all_records) {
if (record_was_read) {
records_read++;
}
}
EXPECT_EQ(records_read, kDatasetSize);
EXPECT_TRUE(reader.SeekRecord(0));

std::vector<uint64_t> indices = {0, 3, 5, 7, 101, 2000};
std::vector<bool> read_indexed_records(indices.size(), false);
std::vector<int32_t> read_indexed_records(indices.size(), 0);
ASSERT_TRUE(reader
.ParallelReadRecordsWithIndices(
indices,
[&](uint64_t indices_idx,
absl::string_view result_view) -> absl::Status {
EXPECT_EQ(result_view, records[indices[indices_idx]]);
EXPECT_FALSE(read_indexed_records[indices_idx]);
read_indexed_records[indices_idx] = true;
read_indexed_records[indices_idx] = 1;
return absl::OkStatus();
})
.ok());
for (bool record_was_read : read_indexed_records) {
for (auto record_was_read : read_indexed_records) {
EXPECT_TRUE(record_was_read);
}

uint64_t begin = 10, end = 101;
std::vector<bool> read_range_records(end - begin, false);
std::vector<int32_t> read_range_records(end - begin, 0);
ASSERT_TRUE(reader
.ParallelReadRecordsInRange(
begin, end,
[&](uint64_t record_index,
absl::string_view result_view) -> absl::Status {
EXPECT_EQ(result_view, records[record_index]);
EXPECT_FALSE(read_range_records[record_index - begin]);
read_range_records[record_index - begin] = true;
read_range_records[record_index - begin] = 1;
return absl::OkStatus();
})
.ok());
for (bool record_was_read : read_range_records) {
EXPECT_TRUE(record_was_read);
records_read = 0;
for (auto record_was_read : read_range_records) {
if (record_was_read) {
records_read++;
}
}
EXPECT_EQ(records_read, end - begin);

// Test sequential read
absl::string_view result_view;
Expand Down
5 changes: 5 additions & 0 deletions cpp/array_record_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
// Group
options_parser.AddOption(
"group_size", ValueParser::Int(1, INT32_MAX, &options.group_size_));
options_parser.AddOption(
"groups_awaiting_flush",
ValueParser::Int(0, INT32_MAX, &options.groups_awaiting_flush_));
int32_t max_parallelism = 0;
options_parser.AddOption(
"max_parallelism",
Expand Down Expand Up @@ -196,6 +199,7 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
std::string ArrayRecordWriterBase::Options::ToString() const {
std::string option;
absl::StrAppend(&option, "group_size:", this->group_size_,
",groups_awaiting_flush:", this->groups_awaiting_flush_,
",transpose:", this->transpose_ ? "true" : "false",
",pad_to_block_boundary:",
this->pad_to_block_boundary_ ? "true" : "false");
Expand Down Expand Up @@ -362,6 +366,7 @@ void ArrayRecordWriterBase::Initialize() {
}
// Add callback only after we serialize header and metadata.
writer->set_submit_chunk_callback(submit_chunk_callback_.get());
// writer->set_chunks_awaiting_flush(options_.groups_awaiting_flush());
}

void ArrayRecordWriterBase::Done() {
Expand Down
20 changes: 18 additions & 2 deletions cpp/array_record_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
// options ::= option? ("," option?)*
// option ::=
// "group_size" ":" group_size |
// "groups_awaiting_flush" ":" groups_awaiting_flush |
// "max_parallelism" ":" max_parallelism |
// "saturation_delay_ms" : saturation_delay_ms |
// "uncompressed" |
Expand All @@ -105,7 +106,11 @@ class ArrayRecordWriterBase : public riegeli::Object {
// "window_log" : window_log |
// "pad_to_block_boundary" (":" ("true" | "false"))?
// group_size ::= positive integer which specifies number of records to be
// grouped into a chunk before compression. (default 65536)
// grouped into a chunk before compression. (default 1)
// groups_awaiting_flush ::= positive integer which specifies the number of
// groups stored in a write buffer before sending to the storage IO.
// This option reduces the total IOPs for small group_size. (default
// 1024)
// saturation_delay_ms ::= positive integer which specifies a delay in
// milliseconds when the parallel writing queue is saturated.
// max_parallelism ::= `auto` or positive integers which specifies
Expand All @@ -123,13 +128,23 @@ class ArrayRecordWriterBase : public riegeli::Object {
//
// The larger the value, the denser the file, at the cost of more expensive
// random access.
static constexpr uint32_t kDefaultGroupSize = 65536;
// TODO(fchern) group_size=1 can trigger writer bugs..
static constexpr uint32_t kDefaultGroupSize = 1;
Options& set_group_size(uint32_t group_size) {
  // Record the requested number of records per group.
  this->group_size_ = group_size;
  // Return this Options object so that setter calls can be chained.
  Options& self = *this;
  return self;
}
uint32_t group_size() const { return group_size_; }

// Set the number of groups pending in the write buffer before sending to
// the storage IO. This option reduces the total IOPs for small group size.
static constexpr uint32_t kDefaultGroupsAwaitingFlush = 1024;
Options& set_groups_awaiting_flush(uint32_t groups_awaiting_flush) {
  // Record the requested value.
  this->groups_awaiting_flush_ = groups_awaiting_flush;
  // Return this Options object so that setter calls can be chained.
  Options& self = *this;
  return self;
}
uint32_t groups_awaiting_flush() const { return groups_awaiting_flush_; }

// Specifies max number of concurrent chunk encoders allowed. Default to the
// thread pool size.
Options& set_max_parallelism(std::optional<uint32_t> max_parallelism) {
Expand Down Expand Up @@ -290,6 +305,7 @@ class ArrayRecordWriterBase : public riegeli::Object {

private:
int32_t group_size_ = kDefaultGroupSize;
int32_t groups_awaiting_flush_ = kDefaultGroupsAwaitingFlush;
riegeli::CompressorOptions compressor_options_;
std::optional<riegeli::RecordsMetadata> metadata_;
bool pad_to_block_boundary_ = false;
Expand Down
35 changes: 28 additions & 7 deletions cpp/array_record_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
auto option = ArrayRecordWriterBase::Options::FromString("").value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
Expand All @@ -218,7 +220,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"group_size:1,"
"groups_awaiting_flush:1024,"
"transpose:false,"
"pad_to_block_boundary:false,"
"zstd:3,"
Expand All @@ -230,6 +233,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
auto option = ArrayRecordWriterBase::Options::FromString("default").value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
Expand All @@ -239,7 +244,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"group_size:1,"
"groups_awaiting_flush:1024,"
"transpose:false,"
"pad_to_block_boundary:false,"
"zstd:3,"
Expand All @@ -248,10 +254,12 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
"group_size:32,transpose,window_log:20")
.value();
auto option =
ArrayRecordWriterBase::Options::FromString(
"group_size:32,groups_awaiting_flush:256,transpose,window_log:20")
.value();
EXPECT_EQ(option.group_size(), 32);
EXPECT_EQ(option.groups_awaiting_flush(), 256);
EXPECT_TRUE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
Expand All @@ -261,6 +269,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {

EXPECT_EQ(option.ToString(),
"group_size:32,"
"groups_awaiting_flush:256,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
Expand All @@ -274,6 +283,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
"brotli:6,group_size:32,transpose,window_log:25")
.value();
EXPECT_EQ(option.group_size(), 32);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_TRUE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
Expand All @@ -283,6 +294,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {

EXPECT_EQ(option.ToString(),
"group_size:32,"
"groups_awaiting_flush:1024,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
Expand All @@ -296,6 +308,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
"group_size:32,transpose,zstd:5")
.value();
EXPECT_EQ(option.group_size(), 32);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_TRUE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
Expand All @@ -306,6 +320,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {

EXPECT_EQ(option.ToString(),
"group_size:32,"
"groups_awaiting_flush:1024,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
Expand All @@ -320,14 +335,17 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
.value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kNone);
EXPECT_TRUE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"group_size:1,"
"groups_awaiting_flush:1024,"
"transpose:false,"
"pad_to_block_boundary:true,"
"uncompressed");
Expand All @@ -340,14 +358,17 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
.value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_EQ(option.groups_awaiting_flush(),
ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kSnappy);
EXPECT_TRUE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"group_size:1,"
"groups_awaiting_flush:1024,"
"transpose:false,"
"pad_to_block_boundary:true,"
"snappy");
Expand Down
8 changes: 7 additions & 1 deletion cpp/sequenced_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ void SequencedChunkWriterBase::Initialize() {
"Failed to create the file header"));
}
if (!chunk_writer->Flush(riegeli::FlushType::kFromObject)) {
Fail(riegeli::Annotate(chunk_writer->status(), "Could not flush"));
Fail(riegeli::Annotate(chunk_writer->status(),
"Could not flush the file header."));
}
}

Expand All @@ -159,6 +160,11 @@ void SequencedChunkWriterBase::Done() {
return;
}
auto* chunk_writer = get_writer();
// if (!chunk_writer->Flush(riegeli::FlushType::kFromObject)) {
// Fail(riegeli::Annotate(chunk_writer->status(),
// "Could not flush before close."));
// return;
// }
if (!chunk_writer->Close()) {
Fail(riegeli::Annotate(chunk_writer->status(),
"Failed to close chunk_writer"));
Expand Down
7 changes: 7 additions & 0 deletions cpp/sequenced_chunk_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ class SequencedChunkWriterBase : public riegeli::Object {
return pad_to_block_boundary_;
}

void set_chunks_awaiting_flush(uint32_t chunks_awaiting_flush) {
absl::MutexLock l(&mu_);
chunks_awaiting_flush_ = chunks_awaiting_flush;
}

// Setup a callback for each committed chunk. See CommitChunkCallback
// comments for details.
void set_submit_chunk_callback(SubmitChunkCallback* callback) {
Expand Down Expand Up @@ -182,6 +187,8 @@ class SequencedChunkWriterBase : public riegeli::Object {
// Records the sequence number of submitted chunks.
uint64_t submitted_chunks_ ABSL_GUARDED_BY(mu_) = 0;

uint64_t chunks_awaiting_flush_ ABSL_GUARDED_BY(mu_) = 0;

// Queue for storing the future chunks.
std::queue<std::future<absl::StatusOr<riegeli::Chunk>>> queue_
ABSL_GUARDED_BY(mu_);
Expand Down
Loading

0 comments on commit ec3d6d5

Please sign in to comment.