From eea9f0d1b126793903fc1e7f73559c642e5633df Mon Sep 17 00:00:00 2001 From: ArrayRecord Team Date: Fri, 27 Oct 2023 14:53:00 -0700 Subject: [PATCH] Reuse the same reader across ReadChunk calls so that prefetched blocks are not discared. In the sequential non-parallel path (ReadRecord->ReadAheadFromBuffer->ReadChunk) ReadChunk reads the whole chunk into memory. Most readers will round up the read to the blocksize and thereby prefetch some data from the next chunk. Currently ReadChunk recreates the reader each time and the previously prefetched data is lost and must be read again. Instead use get_backing_reader() directly each time. We can do this since ReadChunk is only called in single threaded context (Initialize, ReadRecord (parallelism disabled)) so there cannot be any concurrent access to the underlying reader. ReadTrace (before): ``` offset=64, size=1M <-- 1st chunk offset=1M, size=1M ... offset=8M, size=1M <-- block aligned read crosses chunk boundary offset=8.3M, size=0.7M <-- redundant read for 2nd chunk offset=9M, size=1M ... ``` ReadTrace (after): ``` offset=64, size=1M <-- 1st chunk offset=1M, size=1M ... offset=8M, size=1M <-- block aligned read crosses chunk boundary and is reused for the 2nd chunk offset=9M, size=1M ... ``` PiperOrigin-RevId: 577312179 --- cpp/BUILD | 1 + cpp/array_record_reader.cc | 29 ++++++++++++++--------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/cpp/BUILD b/cpp/BUILD index 1c77e47..20e4490 100644 --- a/cpp/BUILD +++ b/cpp/BUILD @@ -168,6 +168,7 @@ cc_library( "@com_google_riegeli//riegeli/base:options_parser", "@com_google_riegeli//riegeli/base:status", "@com_google_riegeli//riegeli/bytes:reader", + "@com_google_riegeli//riegeli/bytes:wrapped_reader", "@com_google_riegeli//riegeli/chunk_encoding:chunk_decoder", "@com_google_riegeli//riegeli/records:chunk_reader", "@com_google_riegeli//riegeli/records:record_position", diff --git a/cpp/array_record_reader.cc b/cpp/array_record_reader.cc index bc7675e..3a4d270 100644 --- a/cpp/array_record_reader.cc +++ b/cpp/array_record_reader.cc @@ -46,6 +46,7 @@ limitations under the License. #include "riegeli/base/options_parser.h" #include "riegeli/base/status.h" #include "riegeli/bytes/reader.h" +#include "riegeli/bytes/wrapped_reader.h" #include "riegeli/chunk_encoding/chunk_decoder.h" #include "riegeli/records/chunk_reader.h" #include "riegeli/records/record_position.h" @@ -70,6 +71,7 @@ using riegeli::ChunkReader; using riegeli::OptionsParser; using riegeli::Reader; using riegeli::ValueParser; +using riegeli::WrappedReader; template T CeilOfRatio(T x, T d) { @@ -159,20 +161,15 @@ ArrayRecordReaderBase& ArrayRecordReaderBase::operator=( return *this; } -// After the first access to the underlying `riegeli::Reader`, the lazily -// evaluated variables for random access are all initialized. Therefore it's -// safe to access the reader from multiple threads later on, even though the -// methods wasn't const. -ChunkDecoder ReadChunk(const ThreadCompatibleSharedPtr& reader, - size_t pos, size_t len) { +ChunkDecoder ReadChunk(Reader* mutable_reader, size_t pos, size_t len) { ChunkDecoder decoder; - if (!reader->ok()) { - decoder.Fail(reader->status()); + if (!mutable_reader->ok()) { + decoder.Fail(mutable_reader->status()); return decoder; } - Reader* mutable_reader = - const_cast(reinterpret_cast(reader.get())); - MaskedReader masked_reader(mutable_reader->NewReader(pos), len); + mutable_reader->Seek(pos); + MaskedReader masked_reader(std::make_unique>(mutable_reader), + len); if (!masked_reader.ok()) { decoder.Fail(masked_reader.status()); return decoder; @@ -227,7 +224,7 @@ void ArrayRecordReaderBase::Initialize() { } RiegeliPostscript postscript; auto postscript_decoder = - ReadChunk(reader, size - kRiegeliBlockSize, kRiegeliBlockSize); + ReadChunk(mutable_reader, size - kRiegeliBlockSize, kRiegeliBlockSize); if (!postscript_decoder.ReadRecord(postscript)) { Fail(Annotate(postscript_decoder.status(), "Failed to read RiegeliPostscript")); @@ -245,7 +242,7 @@ void ArrayRecordReaderBase::Initialize() { } state_->footer_offset = postscript.footer_offset(); footer_decoder = - ReadChunk(reader, postscript.footer_offset(), + ReadChunk(mutable_reader, postscript.footer_offset(), size - kRiegeliBlockSize - postscript.footer_offset()); if (!footer_decoder.ReadRecord(footer_metadata)) { @@ -670,11 +667,13 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { uint64_t chunk_end = std::min(state_->chunk_offsets.size(), (buffer_idx + 1) * state_->chunk_group_size); const auto reader = get_backing_reader(); + Reader* mutable_reader = + const_cast(reinterpret_cast(reader.get())); for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) { uint64_t chunk_offset = state_->chunk_offsets[chunk_idx]; uint64_t chunk_end_offset = state_->ChunkEndOffset(chunk_idx); - decoders.push_back( - ReadChunk(reader, chunk_offset, chunk_end_offset - chunk_offset)); + decoders.push_back(ReadChunk(mutable_reader, chunk_offset, + chunk_end_offset - chunk_offset)); } state_->buffer_idx = buffer_idx; state_->current_decoders = std::move(decoders);