diff --git a/cpp/array_record_reader_test.cc b/cpp/array_record_reader_test.cc index acb809e..50e692b 100644 --- a/cpp/array_record_reader_test.cc +++ b/cpp/array_record_reader_test.cc @@ -82,7 +82,16 @@ class ArrayRecordReaderTest TEST_P(ArrayRecordReaderTest, MoveTest) { std::string encoded; - auto writer_options = GetWriterOptions().set_group_size(2); + auto writer_options = GetWriterOptions(); + int32_t group_size; + if (optimize_for_random_access()) { + group_size = 1; + writer_options.set_groups_awaiting_flush(256); + } else { + group_size = 3; + writer_options.set_groups_awaiting_flush(1); + } + writer_options.set_group_size(group_size); auto writer = ArrayRecordWriter( riegeli::Maker(&encoded), writer_options, nullptr); @@ -113,7 +122,7 @@ TEST_P(ArrayRecordReaderTest, MoveTest) { }) .ok()); - EXPECT_EQ(reader_before_move.RecordGroupSize(), 2); + EXPECT_EQ(reader_before_move.RecordGroupSize(), group_size); ArrayRecordReader reader = std::move(reader_before_move); // Once a reader is moved, it is closed. @@ -159,7 +168,7 @@ TEST_P(ArrayRecordReaderTest, MoveTest) { EXPECT_FALSE(reader.ReadRecord(&record_view)); EXPECT_TRUE(reader.ok()); - EXPECT_EQ(reader.RecordGroupSize(), 2); + EXPECT_EQ(reader.RecordGroupSize(), group_size); ASSERT_TRUE(reader.Close()); } @@ -172,11 +181,21 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) { size_t len = dist(bitgen); records[i] = MTRandomBytes(bitgen, len); } + auto writer_options = GetWriterOptions(); + int32_t group_size; + if (optimize_for_random_access()) { + group_size = 1; + writer_options.set_groups_awaiting_flush(1024); + } else { + group_size = 1024; + writer_options.set_groups_awaiting_flush(1); + } + writer_options.set_group_size(group_size); std::string encoded; auto writer = ArrayRecordWriter(riegeli::Maker(&encoded), - GetWriterOptions(), get_pool()); + writer_options, get_pool()); for (auto i : Seq(kDatasetSize)) { EXPECT_TRUE(writer.WriteRecord(records[i])); } @@ -193,27 +212,30 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) { reader_opt, use_thread_pool() ? get_pool() : nullptr); ASSERT_TRUE(reader.status().ok()); EXPECT_EQ(reader.NumRecords(), kDatasetSize); - uint64_t group_size = - std::min(ArrayRecordWriterBase::Options::kDefaultGroupSize, kDatasetSize); EXPECT_EQ(reader.RecordGroupSize(), group_size); - std::vector read_all_records(kDatasetSize, false); + std::vector read_all_records(kDatasetSize, 0); ASSERT_TRUE(reader .ParallelReadRecords( [&](uint64_t record_index, absl::string_view result_view) -> absl::Status { EXPECT_EQ(result_view, records[record_index]); EXPECT_FALSE(read_all_records[record_index]); - read_all_records[record_index] = true; + read_all_records[record_index] = 1; return absl::OkStatus(); }) .ok()); - for (bool record_was_read : read_all_records) { - EXPECT_TRUE(record_was_read); + uint32_t records_read = 0; + for (auto record_was_read : read_all_records) { + if (record_was_read) { + records_read++; + } } + EXPECT_EQ(records_read, kDatasetSize); + EXPECT_TRUE(reader.SeekRecord(0)); std::vector indices = {0, 3, 5, 7, 101, 2000}; - std::vector read_indexed_records(indices.size(), false); + std::vector read_indexed_records(indices.size(), 0); ASSERT_TRUE(reader .ParallelReadRecordsWithIndices( indices, @@ -221,16 +243,16 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) { absl::string_view result_view) -> absl::Status { EXPECT_EQ(result_view, records[indices[indices_idx]]); EXPECT_FALSE(read_indexed_records[indices_idx]); - read_indexed_records[indices_idx] = true; + read_indexed_records[indices_idx] = 1; return absl::OkStatus(); }) .ok()); - for (bool record_was_read : read_indexed_records) { + for (auto record_was_read : read_indexed_records) { EXPECT_TRUE(record_was_read); } uint64_t begin = 10, end = 101; - std::vector read_range_records(end - begin, false); + std::vector read_range_records(end - begin, 0); ASSERT_TRUE(reader .ParallelReadRecordsInRange( begin, end, @@ -238,13 +260,17 @@ TEST_P(ArrayRecordReaderTest, RandomDatasetTest) { absl::string_view result_view) -> absl::Status { EXPECT_EQ(result_view, records[record_index]); EXPECT_FALSE(read_range_records[record_index - begin]); - read_range_records[record_index - begin] = true; + read_range_records[record_index - begin] = 1; return absl::OkStatus(); }) .ok()); - for (bool record_was_read : read_range_records) { - EXPECT_TRUE(record_was_read); + records_read = 0; + for (auto record_was_read : read_range_records) { + if (record_was_read) { + records_read++; + } } + EXPECT_EQ(records_read, end - begin); // Test sequential read absl::string_view result_view; diff --git a/cpp/array_record_writer.cc b/cpp/array_record_writer.cc index e61a5e6..6179ea3 100644 --- a/cpp/array_record_writer.cc +++ b/cpp/array_record_writer.cc @@ -131,6 +131,9 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) { // Group options_parser.AddOption( "group_size", ValueParser::Int(1, INT32_MAX, &options.group_size_)); + options_parser.AddOption( + "groups_awaiting_flush", + ValueParser::Int(0, INT32_MAX, &options.groups_awaiting_flush_)); int32_t max_parallelism = 0; options_parser.AddOption( "max_parallelism", @@ -196,6 +199,7 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) { std::string ArrayRecordWriterBase::Options::ToString() const { std::string option; absl::StrAppend(&option, "group_size:", this->group_size_, + ",groups_awaiting_flush:", this->groups_awaiting_flush_, ",transpose:", this->transpose_ ? "true" : "false", ",pad_to_block_boundary:", this->pad_to_block_boundary_ ? "true" : "false"); @@ -362,6 +366,7 @@ void ArrayRecordWriterBase::Initialize() { } // Add callback only after we serialize header and metadata. writer->set_submit_chunk_callback(submit_chunk_callback_.get()); + // writer->set_chunks_awaiting_flush(options_.groups_awaiting_flush()); } void ArrayRecordWriterBase::Done() { diff --git a/cpp/array_record_writer.h b/cpp/array_record_writer.h index a47c39f..3d5aa51 100644 --- a/cpp/array_record_writer.h +++ b/cpp/array_record_writer.h @@ -94,6 +94,7 @@ class ArrayRecordWriterBase : public riegeli::Object { // options ::= option? ("," option?)* // option ::= // "group_size" ":" group_size | + // "groups_awaiting_flush" ":" groups_awaiting_flush | // "max_parallelism" ":" max_parallelism | // "saturation_delay_ms" : saturation_delay_ms | // "uncompressed" | @@ -105,7 +106,11 @@ class ArrayRecordWriterBase : public riegeli::Object { // "window_log" : window_log | // "pad_to_block_boundary" (":" ("true" | "false"))? // group_size ::= positive integer which specifies number of records to be - // grouped into a chunk before compression. (default 65536) + // grouped into a chunk before compression. (default 1) + // groups_awaiting_flush ::= positive integer which specify the number of + // groups stored in a write buffer before sending to the storage IO. + // This option reduce the total IOPs for small group_size. (default + // 1024) // saturation_delay_ms ::= positive integer which specifies a delay in // milliseconds when the parallel writing queue is saturated. // max_parallelism ::= `auto` or positive integers which specifies @@ -123,13 +128,23 @@ class ArrayRecordWriterBase : public riegeli::Object { // // The larger the value, the denser the file, at the cost of more expansive // random accessing. - static constexpr uint32_t kDefaultGroupSize = 65536; + // TODO(fchern) group_size=1 can trigger writer bugs.. + static constexpr uint32_t kDefaultGroupSize = 1; Options& set_group_size(uint32_t group_size) { group_size_ = group_size; return *this; } uint32_t group_size() const { return group_size_; } + // Set the number of gruops pending in the write buffer before sending to + // the storage IO. This option reduces the total IOPs for small group size. + static constexpr uint32_t kDefaultGroupsAwaitingFlush = 1024; + Options& set_groups_awaiting_flush(uint32_t groups_awaiting_flush) { + groups_awaiting_flush_ = groups_awaiting_flush; + return *this; + } + uint32_t groups_awaiting_flush() const { return groups_awaiting_flush_; } + // Specifies max number of concurrent chunk encoders allowed. Default to the // thread pool size. Options& set_max_parallelism(std::optional max_parallelism) { @@ -290,6 +305,7 @@ class ArrayRecordWriterBase : public riegeli::Object { private: int32_t group_size_ = kDefaultGroupSize; + int32_t groups_awaiting_flush_ = kDefaultGroupsAwaitingFlush; riegeli::CompressorOptions compressor_options_; std::optional metadata_; bool pad_to_block_boundary_ = false; diff --git a/cpp/array_record_writer_test.cc b/cpp/array_record_writer_test.cc index a8b1414..632423b 100644 --- a/cpp/array_record_writer_test.cc +++ b/cpp/array_record_writer_test.cc @@ -209,6 +209,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { auto option = ArrayRecordWriterBase::Options::FromString("").value(); EXPECT_EQ(option.group_size(), ArrayRecordWriterBase::Options::kDefaultGroupSize); + EXPECT_EQ(option.groups_awaiting_flush(), + ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush); EXPECT_FALSE(option.transpose()); EXPECT_EQ(option.max_parallelism(), std::nullopt); EXPECT_EQ(option.compressor_options().compression_type(), @@ -218,7 +220,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_FALSE(option.pad_to_block_boundary()); EXPECT_EQ(option.ToString(), - "group_size:65536," + "group_size:1," + "groups_awaiting_flush:1024," "transpose:false," "pad_to_block_boundary:false," "zstd:3," @@ -230,6 +233,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { auto option = ArrayRecordWriterBase::Options::FromString("default").value(); EXPECT_EQ(option.group_size(), ArrayRecordWriterBase::Options::kDefaultGroupSize); + EXPECT_EQ(option.groups_awaiting_flush(), + ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush); EXPECT_FALSE(option.transpose()); EXPECT_EQ(option.max_parallelism(), std::nullopt); EXPECT_EQ(option.compressor_options().compression_type(), @@ -239,7 +244,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_FALSE(option.pad_to_block_boundary()); EXPECT_EQ(option.ToString(), - "group_size:65536," + "group_size:1," + "groups_awaiting_flush:1024," "transpose:false," "pad_to_block_boundary:false," "zstd:3," @@ -248,10 +254,12 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { ArrayRecordWriterBase::Options::FromString(option.ToString()).ok()); } { - auto option = ArrayRecordWriterBase::Options::FromString( - "group_size:32,transpose,window_log:20") - .value(); + auto option = + ArrayRecordWriterBase::Options::FromString( + "group_size:32,groups_awaiting_flush:256,transpose,window_log:20") + .value(); EXPECT_EQ(option.group_size(), 32); + EXPECT_EQ(option.groups_awaiting_flush(), 256); EXPECT_TRUE(option.transpose()); EXPECT_EQ(option.max_parallelism(), std::nullopt); EXPECT_EQ(option.compressor_options().compression_type(), @@ -261,6 +269,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_EQ(option.ToString(), "group_size:32," + "groups_awaiting_flush:256," "transpose:true," "pad_to_block_boundary:false," "transpose_bucket_size:256," @@ -274,6 +283,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { "brotli:6,group_size:32,transpose,window_log:25") .value(); EXPECT_EQ(option.group_size(), 32); + EXPECT_EQ(option.groups_awaiting_flush(), + ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush); EXPECT_TRUE(option.transpose()); EXPECT_EQ(option.max_parallelism(), std::nullopt); EXPECT_EQ(option.compressor_options().compression_type(), @@ -283,6 +294,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_EQ(option.ToString(), "group_size:32," + "groups_awaiting_flush:1024," "transpose:true," "pad_to_block_boundary:false," "transpose_bucket_size:256," @@ -296,6 +308,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { "group_size:32,transpose,zstd:5") .value(); EXPECT_EQ(option.group_size(), 32); + EXPECT_EQ(option.groups_awaiting_flush(), + ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush); EXPECT_TRUE(option.transpose()); EXPECT_EQ(option.max_parallelism(), std::nullopt); EXPECT_EQ(option.compressor_options().compression_type(), @@ -306,6 +320,7 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_EQ(option.ToString(), "group_size:32," + "groups_awaiting_flush:1024," "transpose:true," "pad_to_block_boundary:false," "transpose_bucket_size:256," @@ -320,6 +335,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { .value(); EXPECT_EQ(option.group_size(), ArrayRecordWriterBase::Options::kDefaultGroupSize); + EXPECT_EQ(option.groups_awaiting_flush(), + ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush); EXPECT_FALSE(option.transpose()); EXPECT_EQ(option.max_parallelism(), std::nullopt); EXPECT_EQ(option.compressor_options().compression_type(), @@ -327,7 +344,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_TRUE(option.pad_to_block_boundary()); EXPECT_EQ(option.ToString(), - "group_size:65536," + "group_size:1," + "groups_awaiting_flush:1024," "transpose:false," "pad_to_block_boundary:true," "uncompressed"); @@ -340,6 +358,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { .value(); EXPECT_EQ(option.group_size(), ArrayRecordWriterBase::Options::kDefaultGroupSize); + EXPECT_EQ(option.groups_awaiting_flush(), + ArrayRecordWriterBase::Options::kDefaultGroupsAwaitingFlush); EXPECT_FALSE(option.transpose()); EXPECT_EQ(option.max_parallelism(), std::nullopt); EXPECT_EQ(option.compressor_options().compression_type(), @@ -347,7 +367,8 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) { EXPECT_TRUE(option.pad_to_block_boundary()); EXPECT_EQ(option.ToString(), - "group_size:65536," + "group_size:1," + "groups_awaiting_flush:1024," "transpose:false," "pad_to_block_boundary:true," "snappy"); diff --git a/cpp/sequenced_chunk_writer.cc b/cpp/sequenced_chunk_writer.cc index 916c031..0e5932d 100644 --- a/cpp/sequenced_chunk_writer.cc +++ b/cpp/sequenced_chunk_writer.cc @@ -149,7 +149,8 @@ void SequencedChunkWriterBase::Initialize() { "Failed to create the file header")); } if (!chunk_writer->Flush(riegeli::FlushType::kFromObject)) { - Fail(riegeli::Annotate(chunk_writer->status(), "Could not flush")); + Fail(riegeli::Annotate(chunk_writer->status(), + "Could not flush the file header.")); } } @@ -159,6 +160,11 @@ void SequencedChunkWriterBase::Done() { return; } auto* chunk_writer = get_writer(); + // if (!chunk_writer->Flush(riegeli::FlushType::kFromObject)) { + // Fail(riegeli::Annotate(chunk_writer->status(), + // "Could not flush before close.")); + // return; + // } if (!chunk_writer->Close()) { Fail(riegeli::Annotate(chunk_writer->status(), "Failed to close chunk_writer")); diff --git a/cpp/sequenced_chunk_writer.h b/cpp/sequenced_chunk_writer.h index b255ca2..89a8042 100644 --- a/cpp/sequenced_chunk_writer.h +++ b/cpp/sequenced_chunk_writer.h @@ -145,6 +145,11 @@ class SequencedChunkWriterBase : public riegeli::Object { return pad_to_block_boundary_; } + void set_chunks_awaiting_flush(uint32_t chunks_awaiting_flush) { + absl::MutexLock l(&mu_); + chunks_awaiting_flush_ = chunks_awaiting_flush; + } + // Setup a callback for each committed chunk. See CommitChunkCallback // comments for details. void set_submit_chunk_callback(SubmitChunkCallback* callback) { @@ -182,6 +187,8 @@ class SequencedChunkWriterBase : public riegeli::Object { // Records the sequence number of submitted chunks. uint64_t submitted_chunks_ ABSL_GUARDED_BY(mu_) = 0; + uint64_t chunks_awaiting_flush_ ABSL_GUARDED_BY(mu_) = 0; + // Queue for storing the future chunks. std::queue>> queue_ ABSL_GUARDED_BY(mu_); diff --git a/python/array_record_data_source_test.py b/python/array_record_data_source_test.py index 5196d99..6b268f3 100644 --- a/python/array_record_data_source_test.py +++ b/python/array_record_data_source_test.py @@ -51,15 +51,7 @@ def test_check_default_group_size(self): writer.write(b"foobar") writer.close() reader = array_record_module.ArrayRecordReader(filename) - with self.assertLogs(level="ERROR") as log_output: - array_record_data_source._check_group_size(filename, reader) - self.assertRegex( - log_output.output[0], - ( - r"File .* was created with group size 65536. Grain requires group" - r" size 1 for good performance" - ), - ) + self.assertRegex(reader.writer_options_string(), r"group_size:1") def test_check_valid_group_size(self): filename = os.path.join(FLAGS.test_tmpdir, "test.array_record") diff --git a/python/array_record_module_test.py b/python/array_record_module_test.py index 4df8ff2..4f2e88a 100644 --- a/python/array_record_module_test.py +++ b/python/array_record_module_test.py @@ -139,8 +139,10 @@ def test_writer_options(self): # Includes default options. self.assertEqual( reader.writer_options_string(), - "group_size:42,transpose:false,pad_to_block_boundary:false,zstd:3," - "window_log:20,max_parallelism:1") + "group_size:42,groups_awaiting_flush:1024," + "transpose:false,pad_to_block_boundary:false,zstd:3," + "window_log:20,max_parallelism:1", + ) if __name__ == "__main__": absltest.main()