Skip to content

Commit

Permalink
Internal changes.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 480067689
  • Loading branch information
dryman authored and copybara-github committed Oct 10, 2022
1 parent 68d6e0f commit f8a72d5
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 8 deletions.
11 changes: 11 additions & 0 deletions cpp/array_record_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ struct ArrayRecordReaderBase::ArrayRecordReaderState {
std::queue<IndexedPair<std::future<std::vector<ChunkDecoder>>>>
future_decoders;

// Writer options for debugging purposes.
std::optional<std::string> writer_options = std::nullopt;

uint64_t ChunkEndOffset(uint64_t chunk_idx) const {
if (chunk_idx == footer.size() - 1) {
return footer_offset;
Expand Down Expand Up @@ -263,6 +266,10 @@ void ArrayRecordReaderBase::Initialize() {
return;
}
state_->num_records = footer_metadata.array_record_metadata().num_records();
if (footer_metadata.array_record_metadata().has_writer_options()) {
state_->writer_options =
footer_metadata.array_record_metadata().writer_options();
}
}
{
AR_ENDO_SCOPE("Reading footer body");
Expand Down Expand Up @@ -757,4 +764,8 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
return true;
}

std::optional<std::string> ArrayRecordReaderBase::WriterOptionsString() const {
return state_->writer_options;
}

} // namespace array_record
3 changes: 3 additions & 0 deletions cpp/array_record_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ class ArrayRecordReaderBase : public riegeli::Object {
// `false` (when `!ok()`) - failure
bool ReadRecord(absl::string_view* record);

// Returns the writer options if presented.
std::optional<std::string> WriterOptionsString() const;

protected:
explicit ArrayRecordReaderBase(Options options, ARThreadPool* pool);
~ArrayRecordReaderBase() override;
Expand Down
9 changes: 9 additions & 0 deletions cpp/array_record_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ TEST_P(ArrayRecordReaderTest, MoveTest) {
// Once a reader is moved, it is closed.
ASSERT_FALSE(reader_before_move.is_open()); // NOLINT

auto recorded_writer_options = ArrayRecordWriterBase::Options::FromString(
reader.WriterOptionsString().value())
.value();
EXPECT_EQ(writer_options.compression_type(),
recorded_writer_options.compression_type());
EXPECT_EQ(writer_options.compression_level(),
recorded_writer_options.compression_level());
EXPECT_EQ(writer_options.transpose(), recorded_writer_options.transpose());

std::vector<uint64_t> indices = {1, 2, 4};
ASSERT_TRUE(reader
.ParallelReadRecordsWithIndices(
Expand Down
65 changes: 61 additions & 4 deletions cpp/array_record_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ limitations under the License.
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "absl/types/span.h"
Expand Down Expand Up @@ -63,6 +64,8 @@ constexpr uint32_t kZstdDefaultWindowLog = 20;
// Generated from `echo 'ArrayRecord' | md5sum | cut -b 1-16`
constexpr uint64_t kMagic = 0x71930e704fdae05eULL;

constexpr char kArrayRecordDefaultCompression[] = "zstd:3";

using riegeli::Chunk;
using riegeli::ChunkType;
using riegeli::CompressorOptions;
Expand Down Expand Up @@ -109,11 +112,18 @@ absl::StatusOr<Chunk> ChunkFromSpan(CompressorOptions compression_options,

} // namespace

ArrayRecordWriterBase::Options::Options() {
DCHECK_OK(
this->compressor_options_.FromString(kArrayRecordDefaultCompression));
this->compressor_options_.set_window_log(kZstdDefaultWindowLog);
}

// static
absl::StatusOr<ArrayRecordWriterBase::Options>
ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
ArrayRecordWriterBase::Options options;
OptionsParser options_parser;
options_parser.AddOption("default", ValueParser::FailIfAnySeen());
// Group
options_parser.AddOption(
"group_size", ValueParser::Int(1, INT32_MAX, &options.group_size_));
Expand Down Expand Up @@ -151,6 +161,15 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
if (!options_parser.FromString(text)) {
return options_parser.status();
}
// From our benchmarks we figured zstd:3 gives the best trade-off for both the
// compression and decomopression speed.
if (text == "default" ||
(!absl::StrContains(compressor_text, "uncompressed") &&
!absl::StrContains(compressor_text, "brotli") &&
!absl::StrContains(compressor_text, "snappy") &&
!absl::StrContains(compressor_text, "zstd"))) {
absl::StrAppend(&compressor_text, ",", kArrayRecordDefaultCompression);
}
// max_parallelism is set after options_parser.FromString()
if (max_parallelism > 0) {
options.set_max_parallelism(max_parallelism);
Expand All @@ -167,13 +186,48 @@ ArrayRecordWriterBase::Options::FromString(absl::string_view text) {
return options;
}

std::string ArrayRecordWriterBase::Options::ToString() const {
std::string option;
absl::StrAppend(&option, "group_size:", this->group_size_,
",transpose:", this->transpose_ ? "true" : "false",
",pad_to_block_boundary:",
this->pad_to_block_boundary_ ? "true" : "false");
if (this->transpose_) {
absl::StrAppend(&option,
",transpose_bucket_size:", this->transpose_bucket_size_);
}
switch (this->compressor_options().compression_type()) {
case riegeli::CompressionType::kNone:
absl::StrAppend(&option, ",uncompressed");
break;
case riegeli::CompressionType::kBrotli:
absl::StrAppend(
&option, ",brotli:", this->compressor_options().compression_level());
break;
case riegeli::CompressionType::kZstd:
absl::StrAppend(&option,
",zstd:", this->compressor_options().compression_level());
break;
case riegeli::CompressionType::kSnappy:
absl::StrAppend(&option, ",snappy");
break;
}
if (this->compressor_options().window_log().has_value()) {
absl::StrAppend(&option, ",window_log:",
this->compressor_options().window_log().value());
}
if (max_parallelism_.has_value()) {
absl::StrAppend(&option, ",max_parallelism:", max_parallelism_.value());
}
return option;
}

// Thread compatible callback guarded by SequencedChunkWriter's mutex.
class ArrayRecordWriterBase::SubmitChunkCallback
: public SequencedChunkWriterBase::SubmitChunkCallback {
public:
explicit SubmitChunkCallback(const ArrayRecordWriterBase::Options options)
: compression_options_(options.compressor_options()),
max_parallelism_(options.max_parallelism().value()) {
: options_(options), max_parallelism_(options.max_parallelism().value()) {
constexpr uint64_t kDefaultDecodedDataSize = (1 << 20);
last_decoded_data_size_.store(kDefaultDecodedDataSize);
}
Expand All @@ -200,7 +254,7 @@ class ArrayRecordWriterBase::SubmitChunkCallback
void WriteFooterAndPostscript(SequencedChunkWriterBase* writer);

private:
const CompressorOptions compression_options_;
const Options options_;

absl::Mutex mu_;
const int32_t max_parallelism_;
Expand Down Expand Up @@ -456,8 +510,11 @@ ArrayRecordWriterBase::SubmitChunkCallback::CreateFooterChunk() {
footer_metadata.mutable_array_record_metadata()->set_num_chunks(
array_footer_.size());
footer_metadata.mutable_array_record_metadata()->set_num_records(num_records);
footer_metadata.mutable_array_record_metadata()->set_writer_options(
options_.ToString());
// Perhaps we can compress the footer
return ChunkFromSpan(compression_options_, absl::MakeConstSpan(array_footer_),
return ChunkFromSpan(options_.compressor_options(),
absl::MakeConstSpan(array_footer_),
std::optional(footer_metadata));
}

Expand Down
5 changes: 4 additions & 1 deletion cpp/array_record_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ArrayRecordWriterBase : public riegeli::Object {
public:
class Options {
public:
Options() {}
Options();

// Parses options from text:
// ```
Expand Down Expand Up @@ -268,6 +268,9 @@ class ArrayRecordWriterBase : public riegeli::Object {
return metadata_;
}

// Serialize the options to a string.
std::string ToString() const;

private:
int32_t group_size_ = kDefaultGroupSize;
riegeli::CompressorOptions compressor_options_;
Expand Down
120 changes: 117 additions & 3 deletions cpp/array_record_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,59 @@ INSTANTIATE_TEST_SUITE_P(
testing::Bool(), testing::Bool(), testing::Bool()));

TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
{
auto option = ArrayRecordWriterBase::Options();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kZstd);
EXPECT_EQ(option.compressor_options().compression_level(), 3);
EXPECT_EQ(option.compressor_options().window_log().value(), 20);
EXPECT_FALSE(option.pad_to_block_boundary());
}
{
auto option = ArrayRecordWriterBase::Options::FromString("").value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kBrotli);
riegeli::CompressionType::kZstd);
EXPECT_EQ(option.compressor_options().compression_level(), 3);
EXPECT_EQ(option.compressor_options().window_log().value(), 20);
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"transpose:false,"
"pad_to_block_boundary:false,"
"zstd:3,"
"window_log:20");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString("default").value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kZstd);
EXPECT_EQ(option.compressor_options().compression_level(), 3);
EXPECT_EQ(option.compressor_options().window_log().value(), 20);
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"transpose:false,"
"pad_to_block_boundary:false,"
"zstd:3,"
"window_log:20");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
Expand All @@ -210,10 +254,42 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
EXPECT_EQ(option.group_size(), 32);
EXPECT_TRUE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kZstd);
EXPECT_EQ(option.compressor_options().window_log(), 20);
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:32,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
"zstd:3,"
"window_log:20");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
"brotli:6,group_size:32,transpose,window_log:25")
.value();
EXPECT_EQ(option.group_size(), 32);
EXPECT_TRUE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kBrotli);
EXPECT_EQ(option.compressor_options().brotli_window_log(), 20);
EXPECT_EQ(option.compressor_options().window_log(), 25);
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:32,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
"brotli:6,"
"window_log:25");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
Expand All @@ -224,9 +300,19 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kZstd);
EXPECT_EQ(option.compressor_options().zstd_window_log(), 20);
EXPECT_EQ(option.compressor_options().window_log(), 20);
EXPECT_EQ(option.compressor_options().compression_level(), 5);
EXPECT_FALSE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:32,"
"transpose:true,"
"pad_to_block_boundary:false,"
"transpose_bucket_size:256,"
"zstd:5,"
"window_log:20");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
Expand All @@ -239,6 +325,34 @@ TEST(ArrayRecordWriterOptionsTest, ParsingTest) {
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kNone);
EXPECT_TRUE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"transpose:false,"
"pad_to_block_boundary:true,"
"uncompressed");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
{
auto option = ArrayRecordWriterBase::Options::FromString(
"snappy,pad_to_block_boundary:true")
.value();
EXPECT_EQ(option.group_size(),
ArrayRecordWriterBase::Options::kDefaultGroupSize);
EXPECT_FALSE(option.transpose());
EXPECT_EQ(option.max_parallelism(), std::nullopt);
EXPECT_EQ(option.compressor_options().compression_type(),
riegeli::CompressionType::kSnappy);
EXPECT_TRUE(option.pad_to_block_boundary());

EXPECT_EQ(option.ToString(),
"group_size:65536,"
"transpose:false,"
"pad_to_block_boundary:true,"
"snappy");
EXPECT_TRUE(
ArrayRecordWriterBase::Options::FromString(option.ToString()).ok());
}
}

Expand Down
2 changes: 2 additions & 0 deletions cpp/layout.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ message RiegeliFooterMetadata {
optional uint32 version = 1;
optional uint64 num_chunks = 2;
optional uint64 num_records = 3;
// Writer options for debugging purposes.
optional string writer_options = 4;
}

oneof metadata {
Expand Down

0 comments on commit f8a72d5

Please sign in to comment.