Skip to content

Commit

Permalink
Reuse the same reader across ReadChunk calls so that prefetched block…
Browse files Browse the repository at this point in the history
…s are not discared.

In the sequential non-parallel path (ReadRecord->ReadAheadFromBuffer->ReadChunk) ReadChunk reads the whole chunk into memory. Most readers will round up the read to the blocksize and thereby prefetch some data from the next chunk.

Currently ReadChunk recreates the reader each time and the previously prefetched data is lost and must be read again.

Instead use get_backing_reader() directly each time. We can do this since ReadChunk is only called in single threaded context (Initialize, ReadRecord (parallelism disabled)) so there cannot be any concurrent access to the underlying reader.

ReadTrace (before):
```
offset=64, size=1M <-- 1st chunk
offset=1M, size=1M
...
offset=8M, size=1M <-- block aligned read crosses chunk boundary
offset=8.3M, size=0.7M <-- redundant read for 2nd chunk
offset=9M, size=1M
...
```

ReadTrace (after):
```
offset=64, size=1M <-- 1st chunk
offset=1M, size=1M
...
offset=8M, size=1M <-- block aligned read crosses chunk boundary and is reused for the 2nd chunk
offset=9M, size=1M
...
```
PiperOrigin-RevId: 577312179
  • Loading branch information
ArrayRecord Team authored and copybara-github committed Oct 30, 2023
1 parent b1403b5 commit fa16989
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
1 change: 1 addition & 0 deletions cpp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ cc_library(
"@com_google_riegeli//riegeli/base:options_parser",
"@com_google_riegeli//riegeli/base:status",
"@com_google_riegeli//riegeli/bytes:reader",
"@com_google_riegeli//riegeli/bytes:wrapped_reader",
"@com_google_riegeli//riegeli/chunk_encoding:chunk_decoder",
"@com_google_riegeli//riegeli/records:chunk_reader",
"@com_google_riegeli//riegeli/records:record_position",
Expand Down
29 changes: 14 additions & 15 deletions cpp/array_record_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ limitations under the License.
#include "riegeli/base/options_parser.h"
#include "riegeli/base/status.h"
#include "riegeli/bytes/reader.h"
#include "riegeli/bytes/wrapped_reader.h"
#include "riegeli/chunk_encoding/chunk_decoder.h"
#include "riegeli/records/chunk_reader.h"
#include "riegeli/records/record_position.h"
Expand All @@ -70,6 +71,7 @@ using riegeli::ChunkReader;
using riegeli::OptionsParser;
using riegeli::Reader;
using riegeli::ValueParser;
using riegeli::WrappedReader;

template <class T>
T CeilOfRatio(T x, T d) {
Expand Down Expand Up @@ -159,20 +161,15 @@ ArrayRecordReaderBase& ArrayRecordReaderBase::operator=(
return *this;
}

// After the first access to the underlying `riegeli::Reader`, the lazily
// evaluated variables for random access are all initialized. Therefore it's
// safe to access the reader from multiple threads later on, even though the
// methods wasn't const.
ChunkDecoder ReadChunk(const ThreadCompatibleSharedPtr<Reader>& reader,
size_t pos, size_t len) {
ChunkDecoder ReadChunk(Reader* mutable_reader, size_t pos, size_t len) {
ChunkDecoder decoder;
if (!reader->ok()) {
decoder.Fail(reader->status());
if (!mutable_reader->ok()) {
decoder.Fail(mutable_reader->status());
return decoder;
}
Reader* mutable_reader =
const_cast<Reader*>(reinterpret_cast<const Reader*>(reader.get()));
MaskedReader masked_reader(mutable_reader->NewReader(pos), len);
mutable_reader->Seek(pos);
MaskedReader masked_reader(std::make_unique<WrappedReader<>>(mutable_reader),
len);
if (!masked_reader.ok()) {
decoder.Fail(masked_reader.status());
return decoder;
Expand Down Expand Up @@ -227,7 +224,7 @@ void ArrayRecordReaderBase::Initialize() {
}
RiegeliPostscript postscript;
auto postscript_decoder =
ReadChunk(reader, size - kRiegeliBlockSize, kRiegeliBlockSize);
ReadChunk(mutable_reader, size - kRiegeliBlockSize, kRiegeliBlockSize);
if (!postscript_decoder.ReadRecord(postscript)) {
Fail(Annotate(postscript_decoder.status(),
"Failed to read RiegeliPostscript"));
Expand All @@ -245,7 +242,7 @@ void ArrayRecordReaderBase::Initialize() {
}
state_->footer_offset = postscript.footer_offset();
footer_decoder =
ReadChunk(reader, postscript.footer_offset(),
ReadChunk(mutable_reader, postscript.footer_offset(),
size - kRiegeliBlockSize - postscript.footer_offset());

if (!footer_decoder.ReadRecord(footer_metadata)) {
Expand Down Expand Up @@ -670,11 +667,13 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
uint64_t chunk_end = std::min(state_->chunk_offsets.size(),
(buffer_idx + 1) * state_->chunk_group_size);
const auto reader = get_backing_reader();
Reader* mutable_reader =
const_cast<Reader*>(reinterpret_cast<const Reader*>(reader.get()));
for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) {
uint64_t chunk_offset = state_->chunk_offsets[chunk_idx];
uint64_t chunk_end_offset = state_->ChunkEndOffset(chunk_idx);
decoders.push_back(
ReadChunk(reader, chunk_offset, chunk_end_offset - chunk_offset));
decoders.push_back(ReadChunk(mutable_reader, chunk_offset,
chunk_end_offset - chunk_offset));
}
state_->buffer_idx = buffer_idx;
state_->current_decoders = std::move(decoders);
Expand Down

0 comments on commit fa16989

Please sign in to comment.