Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse the same reader across ReadChunk calls so that prefetched blocks are not discarded. #82

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ cc_library(
"@com_google_riegeli//riegeli/base:options_parser",
"@com_google_riegeli//riegeli/base:status",
"@com_google_riegeli//riegeli/bytes:reader",
"@com_google_riegeli//riegeli/bytes:wrapped_reader",
"@com_google_riegeli//riegeli/chunk_encoding:chunk_decoder",
"@com_google_riegeli//riegeli/records:chunk_reader",
"@com_google_riegeli//riegeli/records:record_position",
Expand Down
29 changes: 14 additions & 15 deletions cpp/array_record_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ limitations under the License.
#include "riegeli/base/options_parser.h"
#include "riegeli/base/status.h"
#include "riegeli/bytes/reader.h"
#include "riegeli/bytes/wrapped_reader.h"
#include "riegeli/chunk_encoding/chunk_decoder.h"
#include "riegeli/records/chunk_reader.h"
#include "riegeli/records/record_position.h"
Expand All @@ -70,6 +71,7 @@ using riegeli::ChunkReader;
using riegeli::OptionsParser;
using riegeli::Reader;
using riegeli::ValueParser;
using riegeli::WrappedReader;

template <class T>
T CeilOfRatio(T x, T d) {
Expand Down Expand Up @@ -159,20 +161,15 @@ ArrayRecordReaderBase& ArrayRecordReaderBase::operator=(
return *this;
}

// After the first access to the underlying `riegeli::Reader`, the lazily
// evaluated variables for random access are all initialized. Therefore it's
// safe to access the reader from multiple threads later on, even though the
// methods weren't const.
ChunkDecoder ReadChunk(const ThreadCompatibleSharedPtr<Reader>& reader,
size_t pos, size_t len) {
ChunkDecoder ReadChunk(Reader* mutable_reader, size_t pos, size_t len) {
ChunkDecoder decoder;
if (!reader->ok()) {
decoder.Fail(reader->status());
if (!mutable_reader->ok()) {
decoder.Fail(mutable_reader->status());
return decoder;
}
Reader* mutable_reader =
const_cast<Reader*>(reinterpret_cast<const Reader*>(reader.get()));
MaskedReader masked_reader(mutable_reader->NewReader(pos), len);
mutable_reader->Seek(pos);
MaskedReader masked_reader(std::make_unique<WrappedReader<>>(mutable_reader),
len);
if (!masked_reader.ok()) {
decoder.Fail(masked_reader.status());
return decoder;
Expand Down Expand Up @@ -227,7 +224,7 @@ void ArrayRecordReaderBase::Initialize() {
}
RiegeliPostscript postscript;
auto postscript_decoder =
ReadChunk(reader, size - kRiegeliBlockSize, kRiegeliBlockSize);
ReadChunk(mutable_reader, size - kRiegeliBlockSize, kRiegeliBlockSize);
if (!postscript_decoder.ReadRecord(postscript)) {
Fail(Annotate(postscript_decoder.status(),
"Failed to read RiegeliPostscript"));
Expand All @@ -245,7 +242,7 @@ void ArrayRecordReaderBase::Initialize() {
}
state_->footer_offset = postscript.footer_offset();
footer_decoder =
ReadChunk(reader, postscript.footer_offset(),
ReadChunk(mutable_reader, postscript.footer_offset(),
size - kRiegeliBlockSize - postscript.footer_offset());

if (!footer_decoder.ReadRecord(footer_metadata)) {
Expand Down Expand Up @@ -670,11 +667,13 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) {
uint64_t chunk_end = std::min(state_->chunk_offsets.size(),
(buffer_idx + 1) * state_->chunk_group_size);
const auto reader = get_backing_reader();
Reader* mutable_reader =
const_cast<Reader*>(reinterpret_cast<const Reader*>(reader.get()));
for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) {
uint64_t chunk_offset = state_->chunk_offsets[chunk_idx];
uint64_t chunk_end_offset = state_->ChunkEndOffset(chunk_idx);
decoders.push_back(
ReadChunk(reader, chunk_offset, chunk_end_offset - chunk_offset));
decoders.push_back(ReadChunk(mutable_reader, chunk_offset,
chunk_end_offset - chunk_offset));
}
state_->buffer_idx = buffer_idx;
state_->current_decoders = std::move(decoders);
Expand Down
Loading