Skip to content

Commit

Permalink
Get rid of Segment& segment()
Browse files Browse the repository at this point in the history
  • Loading branch information
poodlewars committed Aug 10, 2024
1 parent 498c331 commit 2058aef
Show file tree
Hide file tree
Showing 39 changed files with 91 additions and 89 deletions.
3 changes: 1 addition & 2 deletions cpp/arcticdb/async/async_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ std::pair<entity::VariantKey, std::optional<Segment>> lookup_match_in_dedup_map(
ARCTICDB_DEBUG(log::version(),
"No existing key with same contents: writing new object {}",
key_seg.atom_key());
return std::make_pair(std::move(key_seg.atom_key()), std::make_optional(std::move(key_seg.segment())));

return std::make_pair(key_seg.atom_key(), std::make_optional(std::move(*key_seg.release_segment())));
} else {
ARCTICDB_DEBUG(log::version(),
"Found existing key with same contents: using existing object {}",
Expand Down
10 changes: 5 additions & 5 deletions cpp/arcticdb/async/tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ namespace arcticdb::async {

pipelines::SegmentAndSlice DecodeSliceTask::decode_into_slice(storage::KeySegmentPair&& key_segment_pair) {
auto key = key_segment_pair.atom_key();
auto& seg = key_segment_pair.segment();
auto seg = key_segment_pair.segment_ptr();
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}",
seg.size(),
seg->size(),
key);
auto &hdr = seg.header();
const auto& desc = seg.descriptor();
auto &hdr = seg->header();
const auto& desc = seg->descriptor();
auto descriptor = async::get_filtered_descriptor(desc, columns_to_decode_);
ranges_and_key_.col_range_.second = ranges_and_key_.col_range_.first + (descriptor.field_count() - descriptor.index().field_count());
ARCTICDB_TRACE(log::codec(), "Creating segment");
SegmentInMemory segment_in_memory(std::move(descriptor));
decode_into_memory_segment(seg, hdr, segment_in_memory, desc);
decode_into_memory_segment(*seg, hdr, segment_in_memory, desc);
return pipelines::SegmentAndSlice(std::move(ranges_and_key_), std::move(segment_in_memory));
}

Expand Down
15 changes: 10 additions & 5 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,12 @@ struct CopyCompressedTask : BaseTask {
VariantKey copy() {
return std::visit([that = this](const auto &source_key) {
auto key_seg = that->lib_->read(source_key);
auto target_key_seg = stream::make_target_key<ClockType>(that->key_type_, that->stream_id_, that->version_id_, source_key, std::move(key_seg.segment()));
auto target_key_seg = stream::make_target_key<ClockType>(
that->key_type_,
that->stream_id_,
that->version_id_,
source_key,
std::move(*key_seg.release_segment()));
auto return_key = target_key_seg.variant_key();
that->lib_->write(Composite<storage::KeySegmentPair>{std::move(target_key_seg) });
return return_key;
Expand Down Expand Up @@ -406,7 +411,7 @@ struct DecodeSegmentTask : BaseTask {
ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment with key {}",
variant_key_view(key_seg.variant_key()));

return {key_seg.variant_key(), decode_segment(std::move(key_seg.segment()))};
return {key_seg.variant_key(), decode_segment(*key_seg.segment_ptr())};
}
};

Expand Down Expand Up @@ -543,7 +548,7 @@ struct DecodeTimeseriesDescriptorTask : BaseTask {
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeTimeseriesDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto maybe_desc = decode_timeseries_descriptor(key_seg.segment());
auto maybe_desc = decode_timeseries_descriptor(*key_seg.segment_ptr());

util::check(static_cast<bool>(maybe_desc), "Failed to decode timeseries descriptor");
return std::make_pair(
Expand All @@ -566,7 +571,7 @@ struct DecodeTimeseriesDescriptorForIncompletesTask : BaseTask {
"DecodeTimeseriesDescriptorForIncompletesTask decoding segment with key {}",
variant_key_view(key_seg.variant_key()));

auto maybe_desc = decode_timeseries_descriptor_for_incompletes(key_seg.segment());
auto maybe_desc = decode_timeseries_descriptor_for_incompletes(*key_seg.segment_ptr());

util::check(static_cast<bool>(maybe_desc), "Failed to decode timeseries descriptor");
return std::make_pair(
Expand All @@ -585,7 +590,7 @@ struct DecodeMetadataAndDescriptorTask : BaseTask {
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(log::storage(), "DecodeMetadataAndDescriptorTask decoding segment with key {}", variant_key_view(key_seg.variant_key()));

auto [any, descriptor] = decode_metadata_and_descriptor_fields(key_seg.segment());
auto [any, descriptor] = decode_metadata_and_descriptor_fields(*key_seg.segment_ptr());
return std::make_tuple(
std::move(key_seg.variant_key()),
std::move(any),
Expand Down
3 changes: 1 addition & 2 deletions cpp/arcticdb/codec/codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,7 @@ void decode_into_memory_segment(
decode_v1(segment, hdr, res, desc);
}

SegmentInMemory decode_segment(Segment&& s) {
auto segment = std::move(s);
SegmentInMemory decode_segment(Segment& segment) {
auto &hdr = segment.header();
ARCTICDB_TRACE(log::codec(), "Decoding descriptor: {}", segment.descriptor());
auto descriptor = segment.descriptor();
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/codec/codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ EncodedFieldCollection decode_encoded_fields(
const uint8_t* data,
const uint8_t* begin ARCTICDB_UNUSED);

SegmentInMemory decode_segment(Segment&& segment);
SegmentInMemory decode_segment(Segment& segment);

void decode_into_memory_segment(
const Segment& segment,
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/codec/test/test_codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ TYPED_TEST(SegmentStringEncodingTest, EncodeSingleString) {
constexpr EncodingVersion encoding_version = TypeParam::value;
Segment seg = encode_dispatch(s.clone(), opt, encoding_version);

SegmentInMemory res = decode_segment(std::move(seg));
SegmentInMemory res = decode_segment(seg);
ASSERT_EQ(copy.string_at(0, 1), res.string_at(0, 1));
ASSERT_EQ(std::string("happy"), res.string_at(0, 1));
}
Expand All @@ -346,7 +346,7 @@ TYPED_TEST(SegmentStringEncodingTest, EncodeStringsBasic) {
constexpr EncodingVersion encoding_version = TypeParam::value;
Segment seg = encode_dispatch(SegmentInMemory{s}, opt, encoding_version);

SegmentInMemory res = decode_segment(std::move(seg));
SegmentInMemory res = decode_segment(seg);
ASSERT_EQ(copy.string_at(0, 1), res.string_at(0, 1));
ASSERT_EQ(std::string("happy"), res.string_at(0, 1));
ASSERT_EQ(copy.string_at(1, 3), res.string_at(1, 3));
Expand Down
10 changes: 4 additions & 6 deletions cpp/arcticdb/pipeline/read_frame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,8 @@ bool remaining_fields_empty(IteratorType it, const PipelineContextRow& context)
void decode_into_frame_static(
SegmentInMemory &frame,
PipelineContextRow &context,
Segment &&s,
Segment& seg,
const std::shared_ptr<BufferHolder>& buffers) {
auto seg = std::move(s);
ARCTICDB_SAMPLE_DEFAULT(DecodeIntoFrame)
const uint8_t *data = seg.buffer().data();
const uint8_t *begin = data;
Expand Down Expand Up @@ -379,11 +378,10 @@ void decode_into_frame_static(
void decode_into_frame_dynamic(
SegmentInMemory& frame,
PipelineContextRow& context,
Segment&& s,
Segment& seg,
const std::shared_ptr<BufferHolder>& buffers
) {
ARCTICDB_SAMPLE_DEFAULT(DecodeIntoFrame)
auto seg = std::move(s);
const uint8_t *data = seg.buffer().data();
const uint8_t *begin = data;
const uint8_t *end = begin + seg.buffer().bytes();
Expand Down Expand Up @@ -1170,9 +1168,9 @@ folly::Future<std::vector<VariantKey>> fetch_data(
[row=row, frame=frame, dynamic_schema=dynamic_schema, buffers](auto &&ks) mutable {
auto key_seg = std::forward<storage::KeySegmentPair>(ks);
if(dynamic_schema)
decode_into_frame_dynamic(frame, row, std::move(key_seg.segment()), buffers);
decode_into_frame_dynamic(frame, row, *key_seg.segment_ptr(), buffers);
else
decode_into_frame_static(frame, row, std::move(key_seg.segment()), buffers);
decode_into_frame_static(frame, row, *key_seg.segment_ptr(), buffers);
return key_seg.variant_key();
});
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/pipeline/read_frame.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ folly::Future<std::vector<VariantKey>> fetch_data(
void decode_into_frame_static(
SegmentInMemory &frame,
PipelineContextRow &context,
Segment &&seg,
Segment& seg,
const std::shared_ptr<BufferHolder>& buffers
);

void decode_into_frame_dynamic(
SegmentInMemory &frame,
PipelineContextRow &context,
Segment &&seg,
Segment& seg,
const std::shared_ptr<BufferHolder>& buffers
);

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/azure/azure_client_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class AzureClientWrapper {
using Config = arcticdb::proto::azure_storage::Config;
virtual void write_blob(
const std::string& blob_name,
Segment&& segment,
Segment& segment,
const Azure::Storage::Blobs::UploadBlockBlobFromOptions& upload_option,
unsigned int request_timeout) = 0;

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/azure/azure_mock_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ std::optional<Azure::Core::RequestFailedException> has_failure_trigger(const std

void MockAzureClient::write_blob(
const std::string& blob_name,
arcticdb::Segment&& segment,
arcticdb::Segment& segment,
const Azure::Storage::Blobs::UploadBlockBlobFromOptions&,
unsigned int) {

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/azure/azure_mock_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class MockAzureClient : public AzureClientWrapper {

void write_blob(
const std::string& blob_name,
Segment&& segment,
Segment& segment,
const Azure::Storage::Blobs::UploadBlockBlobFromOptions& upload_option,
unsigned int request_timeout) override;

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/azure/azure_real_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Azure::Storage::Blobs::BlobClientOptions RealAzureClient::get_client_options(con

void RealAzureClient::write_blob(
const std::string& blob_name,
Segment&& segment,
Segment& segment,
const Azure::Storage::Blobs::UploadBlockBlobFromOptions& upload_option,
unsigned int request_timeout) {

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/azure/azure_real_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class RealAzureClient : public AzureClientWrapper {

void write_blob(
const std::string& blob_name,
Segment&& segment,
Segment& segment,
const Azure::Storage::Blobs::UploadBlockBlobFromOptions& upload_option,
unsigned int request_timeout) override;

Expand Down
3 changes: 1 addition & 2 deletions cpp/arcticdb/storage/azure/azure_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,9 @@ void do_write_impl(
for (auto& kv : group.values()) {
auto& k = kv.variant_key();
auto blob_name = object_path(b.bucketize(key_type_dir, k), k);
auto& seg = kv.segment();

try {
azure_client.write_blob(blob_name, std::move(seg), upload_option, request_timeout);
azure_client.write_blob(blob_name, *kv.segment_ptr(), upload_option, request_timeout);
}
catch (const Azure::Core::RequestFailedException& e) {
raise_azure_exception(e, blob_name);
Expand Down
8 changes: 4 additions & 4 deletions cpp/arcticdb/storage/file/mapped_file_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void MappedFileStorage::init() {

SegmentInMemory MappedFileStorage::read_segment(size_t offset, size_t bytes) const {
auto index_segment = Segment::from_bytes(file_.data() + offset, bytes);
return decode_segment(std::move(index_segment));
return decode_segment(index_segment);
}

void MappedFileStorage::do_load_header(size_t header_offset, size_t header_size) {
Expand All @@ -75,7 +75,7 @@ uint64_t MappedFileStorage::get_data_offset(const Segment& seg) {
return previous_offset;
}

uint64_t MappedFileStorage::write_segment(Segment&& seg) {
uint64_t MappedFileStorage::write_segment(Segment& seg) {
auto segment = std::move(seg);
auto offset = get_data_offset(segment);
auto* data = file_.data() + offset;
Expand All @@ -89,7 +89,7 @@ void MappedFileStorage::do_write(Composite<KeySegmentPair>&& kvs) {
ARCTICDB_SAMPLE(MappedFileStorageWriteValues, 0)
auto key_values = std::move(kvs);
key_values.broadcast([this] (auto key_seg) {
const auto offset = write_segment(std::move(key_seg.segment()));
const auto offset = write_segment(*key_seg.segment_ptr());
const auto size = key_seg.segment().size();
multi_segment_header_.add_key_and_offset(key_seg.atom_key(), offset, size);
});
Expand Down Expand Up @@ -129,7 +129,7 @@ void MappedFileStorage::do_finalize(KeyData key_data) {
auto header_segment = encode_dispatch(multi_segment_header_.detach_segment(),
config_.codec_opts(),
EncodingVersion{static_cast<uint16_t>(config_.encoding_version())});
write_segment(std::move(header_segment));
write_segment(header_segment);
auto pos = reinterpret_cast<KeyData*>(file_.data() + offset_);
*pos = key_data;
ARCTICDB_DEBUG(log::storage(), "Finalizing mapped file, writing key data {}", *pos);
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/file/mapped_file_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class MappedFileStorage final : public SingleFileStorage {

void do_load_header(size_t header_offset, size_t header_size) override;

uint64_t write_segment(Segment&& seg);
uint64_t write_segment(Segment& seg);

uint8_t* do_read_raw(size_t offset, size_t bytes) override;

Expand Down
12 changes: 9 additions & 3 deletions cpp/arcticdb/storage/key_segment_pair.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ namespace arcticdb::storage {

ARCTICDB_MOVE_COPY_DEFAULT(KeySegmentPair)

// TODO aseaton remove
Segment& segment() {
std::unique_ptr<Segment> release_segment() {
util::check(segment_, "Attempting to access segment_ but it has not been set");
return *segment_;
auto tmp = std::make_unique<Segment>(std::move(*segment_));
segment_ = std::make_shared<Segment>();
return tmp;
}

[[nodiscard]] std::shared_ptr<Segment> segment_ptr() const {
util::check(segment_, "Attempting to access segment_ptr it is empty");
return segment_;
}

Expand All @@ -50,6 +52,10 @@ namespace arcticdb::storage {
key_ = std::make_shared<VariantKey>(std::forward<T>(key));
}

void set_segment(Segment&& segment) {
segment_ = std::make_shared<Segment>(std::move(segment));
}

[[nodiscard]] const AtomKey &atom_key() const {
util::check(std::holds_alternative<AtomKey>(variant_key()), "Expected atom key access");
return std::get<AtomKey >(variant_key());
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/library.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class Library {
KeySegmentPair res{VariantKey{key}};
util::check(!std::holds_alternative<StringId>(variant_key_id(key)) || !std::get<StringId>(variant_key_id(key)).empty(), "Unexpected empty id");
const ReadVisitor& visitor = [&res](const VariantKey&, Segment&& value) {
res.segment() = std::move(value);
res.set_segment(std::move(value));
};

read(Composite<VariantKey>(std::move(key)), visitor, opts);
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/lmdb/lmdb_client_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class LmdbClientWrapper {
virtual void write(
const std::string& db_name,
std::string& path,
Segment&& segment,
Segment& segment,
::lmdb::txn& txn,
::lmdb::dbi& dbi,
int64_t overwrite_flag) = 0;
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/storage/lmdb/lmdb_mock_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ std::optional<Segment> MockLmdbClient::read(const std::string& db_name, std::str
return std::make_optional<Segment>(lmdb_contents_.at(key).clone());
}

void MockLmdbClient::write(const std::string& db_name, std::string& path, arcticdb::Segment&& segment,
void MockLmdbClient::write(const std::string& db_name, std::string& path, arcticdb::Segment& segment,
::lmdb::txn&, ::lmdb::dbi&, int64_t) {
LmdbKey key = {db_name, path};
raise_if_has_failure_trigger(key, StorageOperation::WRITE);

if(has_key(key)) {
raise_key_exists_error(lmdb_operation_string(StorageOperation::WRITE));
} else {
lmdb_contents_.try_emplace(key, std::move(segment));
lmdb_contents_.try_emplace(key, segment.clone());
}
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/lmdb/lmdb_mock_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class MockLmdbClient : public LmdbClientWrapper {
void write(
const std::string& db_name,
std::string& path,
Segment&& segment,
Segment& segment,
::lmdb::txn& txn,
::lmdb::dbi& dbi,
int64_t overwrite_flag) override;
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/lmdb/lmdb_real_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ std::optional<Segment> RealLmdbClient::read(const std::string&, std::string& pat
return segment;
}

void RealLmdbClient::write(const std::string&, std::string& path, arcticdb::Segment&& seg,
void RealLmdbClient::write(const std::string&, std::string& path, arcticdb::Segment& seg,
::lmdb::txn& txn, ::lmdb::dbi& dbi, int64_t overwrite_flag) {
MDB_val mdb_key{path.size(), path.data()};

Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/lmdb/lmdb_real_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class RealLmdbClient : public LmdbClientWrapper {
void write(
const std::string& db_name,
std::string& path,
Segment&& segment,
Segment& segment,
::lmdb::txn& txn,
::lmdb::dbi& dbi,
int64_t overwrite_flag) override;
Expand Down
3 changes: 1 addition & 2 deletions cpp/arcticdb/storage/lmdb/lmdb_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,9 @@ void LmdbStorage::do_write_internal(Composite<KeySegmentPair>&& kvs, ::lmdb::txn
for (auto &kv : group.values()) {
ARCTICDB_DEBUG(log::storage(), "Lmdb storage writing segment with key {}", kv.key_view());
auto k = to_serialized_key(kv.variant_key());
auto &seg = kv.segment();
int64_t overwrite_flag = std::holds_alternative<RefKey>(kv.variant_key()) ? 0 : MDB_NOOVERWRITE;
try {
lmdb_client_->write(db_name, k, std::move(seg), txn, dbi, overwrite_flag);
lmdb_client_->write(db_name, k, *kv.segment_ptr(), txn, dbi, overwrite_flag);
} catch (const ::lmdb::key_exist_error& e) {
throw DuplicateKeyException(fmt::format("Key already exists: {}: {}", kv.variant_key(), e.what()));
} catch (const ::lmdb::error& ex) {
Expand Down
Loading

0 comments on commit 2058aef

Please sign in to comment.