diff --git a/cpp/BUILD b/cpp/BUILD index 1c77e47..20e4490 100644 --- a/cpp/BUILD +++ b/cpp/BUILD @@ -168,6 +168,7 @@ cc_library( "@com_google_riegeli//riegeli/base:options_parser", "@com_google_riegeli//riegeli/base:status", "@com_google_riegeli//riegeli/bytes:reader", + "@com_google_riegeli//riegeli/bytes:wrapped_reader", "@com_google_riegeli//riegeli/chunk_encoding:chunk_decoder", "@com_google_riegeli//riegeli/records:chunk_reader", "@com_google_riegeli//riegeli/records:record_position", diff --git a/cpp/array_record_reader.cc b/cpp/array_record_reader.cc index bc7675e..3a4d270 100644 --- a/cpp/array_record_reader.cc +++ b/cpp/array_record_reader.cc @@ -46,6 +46,7 @@ limitations under the License. #include "riegeli/base/options_parser.h" #include "riegeli/base/status.h" #include "riegeli/bytes/reader.h" +#include "riegeli/bytes/wrapped_reader.h" #include "riegeli/chunk_encoding/chunk_decoder.h" #include "riegeli/records/chunk_reader.h" #include "riegeli/records/record_position.h" @@ -70,6 +71,7 @@ using riegeli::ChunkReader; using riegeli::OptionsParser; using riegeli::Reader; using riegeli::ValueParser; +using riegeli::WrappedReader; template T CeilOfRatio(T x, T d) { @@ -159,20 +161,15 @@ ArrayRecordReaderBase& ArrayRecordReaderBase::operator=( return *this; } -// After the first access to the underlying `riegeli::Reader`, the lazily -// evaluated variables for random access are all initialized. Therefore it's -// safe to access the reader from multiple threads later on, even though the -// methods wasn't const. -ChunkDecoder ReadChunk(const ThreadCompatibleSharedPtr& reader, - size_t pos, size_t len) { +ChunkDecoder ReadChunk(Reader* mutable_reader, size_t pos, size_t len) { ChunkDecoder decoder; - if (!reader->ok()) { - decoder.Fail(reader->status()); + if (!mutable_reader->ok()) { + decoder.Fail(mutable_reader->status()); return decoder; } - Reader* mutable_reader = - const_cast(reinterpret_cast(reader.get())); - MaskedReader masked_reader(mutable_reader->NewReader(pos), len); + mutable_reader->Seek(pos); + MaskedReader masked_reader(std::make_unique>(mutable_reader), + len); if (!masked_reader.ok()) { decoder.Fail(masked_reader.status()); return decoder; @@ -227,7 +224,7 @@ void ArrayRecordReaderBase::Initialize() { } RiegeliPostscript postscript; auto postscript_decoder = - ReadChunk(reader, size - kRiegeliBlockSize, kRiegeliBlockSize); + ReadChunk(mutable_reader, size - kRiegeliBlockSize, kRiegeliBlockSize); if (!postscript_decoder.ReadRecord(postscript)) { Fail(Annotate(postscript_decoder.status(), "Failed to read RiegeliPostscript")); @@ -245,7 +242,7 @@ void ArrayRecordReaderBase::Initialize() { } state_->footer_offset = postscript.footer_offset(); footer_decoder = - ReadChunk(reader, postscript.footer_offset(), + ReadChunk(mutable_reader, postscript.footer_offset(), size - kRiegeliBlockSize - postscript.footer_offset()); if (!footer_decoder.ReadRecord(footer_metadata)) { @@ -670,11 +667,13 @@ bool ArrayRecordReaderBase::ReadAheadFromBuffer(uint64_t buffer_idx) { uint64_t chunk_end = std::min(state_->chunk_offsets.size(), (buffer_idx + 1) * state_->chunk_group_size); const auto reader = get_backing_reader(); + Reader* mutable_reader = + const_cast(reinterpret_cast(reader.get())); for (uint64_t chunk_idx = chunk_start; chunk_idx < chunk_end; ++chunk_idx) { uint64_t chunk_offset = state_->chunk_offsets[chunk_idx]; uint64_t chunk_end_offset = state_->ChunkEndOffset(chunk_idx); - decoders.push_back( - ReadChunk(reader, chunk_offset, chunk_end_offset - chunk_offset)); + decoders.push_back(ReadChunk(mutable_reader, chunk_offset, + chunk_end_offset - chunk_offset)); } state_->buffer_idx = buffer_idx; state_->current_decoders = std::move(decoders);