From 498c3311dc08f5c8872ccac0ef3dfb278776699c Mon Sep 17 00:00:00 2001
From: Alex Seaton
Date: Fri, 9 Aug 2024 21:50:05 +0100
Subject: [PATCH] Fix Segment use-after-move when replicating to NFS (#1756)

## Motivation

(This section is copied from the previous, closed attempt: https://github.com/man-group/ArcticDB/pull/1746.)

The motivation for the change is to allow `arcticdb-enterprise` to copy blocks to NFS storages without a use-after-move. I explained this in https://github.com/man-group/arcticdb-enterprise/pull/139, but to have an open record: `CopyCompressedInterStoreTask` has:

```
// Don't bother copying the key segment pair when writing to the final target
if (it == std::prev(target_stores_.end())) {
    (*it)->write_compressed_sync(std::move(key_segment_pair));
} else {
    auto key_segment_pair_copy = key_segment_pair;
    (*it)->write_compressed_sync(std::move(key_segment_pair_copy));
}
```

`KeySegmentPair` holds a `shared_ptr` to an internal `KeySegmentData`, which we can think of here as just a `Segment`. The old `key_segment_pair_copy` is therefore shallow: the underlying `Segment` is the same object. But the segment eventually gets passed as an rvalue reference further down the stack. In `do_write_impl` we call `put_object`, which calls `serialize_header`. This modifies the segment in place and passes that buffer to the AWS SDK. In `NfsBackedStorage` we have:

```
void NfsBackedStorage::do_write(Composite<KeySegmentPair>&& kvs) {
    auto enc = kvs.transform([] (auto&& key_seg) {
        return KeySegmentPair{encode_object_id(key_seg.variant_key()), std::move(key_seg.segment())};
    });
    s3::detail::do_write_impl(std::move(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{});
}
```

where the segment gets moved from. Subsequent attempts to use the segment (e.g. copying it on to the next store) then fail; a self-contained sketch of this mechanism is included at the end of this description.

https://github.com/man-group/arcticdb-enterprise/pull/139 fixed this issue by cloning the segment, but this approach avoids the (expensive) clone.

## Logical Change

Copy the `KeySegmentPair`'s pointer to the `Segment` in `nfs_backed_storage.cpp` rather than moving from the segment.

## Refactor and Testing

### Copy Task

Move `CopyCompressedInterStoreTask` down to ArcticDB from arcticdb-enterprise, and add a test for it on NFS storage. I've verified that the tests in this commit fail without the fix in the HEAD~1 commit. The only changes to `CopyCompressedInterStoreTask` from enterprise are:

- Pass the `KeySegmentPair` by value into `write_compressed{_sync}`. The `KeySegmentPair` is cheap to copy (especially considering we are about to copy an object across storages, likely with a network hop).
- We have adopted the new `set_key` API of `KeySegmentPair`:

```
if (key_to_write_.has_value()) {
    key_segment_pair.set_key(*key_to_write_);
}
```

- We have namespaced the `ProcessingResult` struct into the task.

### KeySegmentPair

- Replace the methods returning mutable lvalue references to keys with a `set_key` method.
- Remove the `release_segment` method, as it dangerously leaves the `KeySegmentPair` pointing at a `Segment` object that has been moved from, and it is not actually necessary.

## Follow up work

The non-const `Segment& KeySegmentPair::segment()` API is still dangerous and error-prone. I have a follow-up change to remove it, but that API change affects very many files and will be best raised separately so that it doesn't block this fix for replication. A draft PR showing a proposal for that change is here: https://github.com/man-group/ArcticDB/pull/1757.
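## Appendix: illustrative sketches

To make the failure mode concrete, here is a minimal, self-contained sketch. `Segment` and `KeySegmentPair` below are stand-ins (a string holder and a `shared_ptr` wrapper), not the real ArcticDB classes, and the two `do_write` functions are simplified stand-ins for the old and new `NfsBackedStorage::do_write`:

```
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Stand-in for arcticdb::Segment: owns its buffer via unique_ptr so that a
// moved-from Segment is observably empty (data_ == nullptr is guaranteed).
struct Segment {
    std::unique_ptr<std::string> data_;
};

// Stand-in for KeySegmentPair: copies are shallow because the Segment is
// held through a shared_ptr, mirroring the real class's shared internal state.
class KeySegmentPair {
  public:
    explicit KeySegmentPair(std::string buf)
        : segment_(std::make_shared<Segment>(Segment{std::make_unique<std::string>(std::move(buf))})) {}

    Segment& segment() { return *segment_; }
    std::shared_ptr<Segment> segment_ptr() const { return segment_; }

  private:
    std::shared_ptr<Segment> segment_;
};

// Old behaviour: moves from the shared Segment, gutting it for every handle.
void do_write_moving(KeySegmentPair&& ks) {
    Segment taken{std::move(ks.segment())};
    (void)taken;  // serialize_header + upload would happen here
}

// New behaviour: copies the pointer; the Segment itself is left intact.
void do_write_pointer_copy(KeySegmentPair&& ks) {
    std::shared_ptr<Segment> seg = ks.segment_ptr();
    (void)seg;    // serialize_header + upload would happen here
}

int main() {
    // The "copy" made before writing to a non-final target is shallow...
    KeySegmentPair pair{"compressed block"};
    KeySegmentPair shallow = pair;
    do_write_moving(std::move(shallow));
    assert(pair.segment().data_ == nullptr);  // ...so the original was gutted too

    // With the fix, the original survives for the remaining target stores.
    KeySegmentPair pair2{"compressed block"};
    KeySegmentPair shallow2 = pair2;
    do_write_pointer_copy(std::move(shallow2));
    assert(*pair2.segment().data_ == "compressed block");
    return 0;
}
```

The first assertion is the bug: because the copy shares the same `Segment`, moving from it inside the first store's write destroys the data for every later target store. The fix is exactly the `segment_ptr()` line: copy the `shared_ptr`, never move from the shared `Segment`.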
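The diff also adds a small `BitRateStats` helper for logging replication bandwidth. Its header comments describe packing both counters into one 8-byte atomic (upper 5 bytes: bytes transferred; lower 3 bytes: elapsed milliseconds), so a single lock-free `fetch_add` accumulates both fields at once. A sketch of just that arithmetic, with a hypothetical `pack` helper standing in for the real `data_to_stat` (which additionally rejects samples too large to represent):

```
#include <cassert>
#include <cstdint>

constexpr uint64_t kTimeMaskMs = 0xFFFFFF;  // lower 3 bytes: total milliseconds

// Hypothetical equivalent of BitRateStats::data_to_stat: bytes go in the
// upper 5 bytes, elapsed milliseconds in the lower 3.
constexpr uint64_t pack(uint64_t bytes, uint64_t ms) {
    return (bytes << 24) + ms;
}

int main() {
    // Adding packed samples adds both fields simultaneously, which is what
    // makes a single atomic fetch_add sufficient (until a field overflows):
    uint64_t acc = pack(1'000'000, 80) + pack(2'000'000, 120);
    assert((acc >> 24) == 3'000'000);    // total bytes transferred
    assert((acc & kTimeMaskMs) == 200);  // total elapsed milliseconds

    // Field limits quoted in the header: 0xFFFFFFFFFF bytes ~= 1 TiB,
    // 0xFFFFFF ms ~= 4.66 hours.
    static_assert(0xFFFFFFFFFF == (1ULL << 40) - 1);
    return 0;
}
```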
---
 cpp/arcticdb/CMakeLists.txt                   |   2 +
 cpp/arcticdb/async/async_store.hpp            |   4 +-
 cpp/arcticdb/async/bit_rate_stats.cpp         |  50 ++++++
 cpp/arcticdb/async/bit_rate_stats.hpp         |  30 ++++
 cpp/arcticdb/async/tasks.cpp                  |   4 +-
 cpp/arcticdb/async/tasks.hpp                  | 131 ++++++++++++---
 cpp/arcticdb/async/test/test_async.cpp        | 105 +++++++++---
 cpp/arcticdb/storage/common.hpp               |   6 +
 cpp/arcticdb/storage/key_segment_pair.hpp     |  65 +++-----
 .../storage/s3/nfs_backed_storage.cpp         |  15 +-
 .../storage/s3/nfs_backed_storage.hpp         |   1 -
 cpp/arcticdb/storage/storage.hpp              |   2 +-
 cpp/arcticdb/storage/test/in_memory_store.hpp |   4 +-
 cpp/arcticdb/storage/test/test_embedded.cpp   |   6 +-
 .../storage/test/test_mongo_storage.cpp       |   6 +-
 cpp/arcticdb/stream/stream_sink.hpp           |   9 +-
 cpp/arcticdb/stream/test/mock_stores.hpp      | 151 ------------------
 cpp/arcticdb/toolbox/library_tool.cpp         |   2 +-
 cpp/arcticdb/util/preconditions.hpp           |  13 +-
 cpp/arcticdb/util/test/config_common.hpp      |  22 ++-
 cpp/arcticdb/version/snapshot.cpp             |   2 +-
 .../arcticc/pb2/nfs_backed_storage.proto      |   1 +
 22 files changed, 360 insertions(+), 271 deletions(-)
 create mode 100644 cpp/arcticdb/async/bit_rate_stats.cpp
 create mode 100644 cpp/arcticdb/async/bit_rate_stats.hpp
 delete mode 100644 cpp/arcticdb/stream/test/mock_stores.hpp

diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt
index 29521b0981..b072eafbe1 100644
--- a/cpp/arcticdb/CMakeLists.txt
+++ b/cpp/arcticdb/CMakeLists.txt
@@ -180,6 +180,7 @@ set(arcticdb_srcs
         # header files
         async/async_store.hpp
         async/batch_read_args.hpp
+        async/bit_rate_stats.hpp
         async/task_scheduler.hpp
         async/tasks.hpp
         codec/codec.hpp
@@ -389,6 +390,7 @@ set(arcticdb_srcs
         version/version_utils.hpp
         # CPP files
         async/async_store.cpp
+        async/bit_rate_stats.cpp
         async/task_scheduler.cpp
         async/tasks.cpp
         codec/codec.cpp
diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp
index 29b9428f33..e5a036b0cf 100644
--- a/cpp/arcticdb/async/async_store.hpp
+++ b/cpp/arcticdb/async/async_store.hpp
@@ -151,11 +151,11 @@ bool is_path_valid(const std::string_view path) const override {
     return library_->is_path_valid(path);
 }
 
-folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair &&ks) override {
+folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair ks) override {
     return async::submit_io_task(WriteCompressedTask{std::move(ks), library_});
 }
 
-void write_compressed_sync(storage::KeySegmentPair &&ks) override {
+void write_compressed_sync(storage::KeySegmentPair ks) override {
    library_->write(Composite(std::move(ks)));
 }
 
diff --git a/cpp/arcticdb/async/bit_rate_stats.cpp b/cpp/arcticdb/async/bit_rate_stats.cpp
new file mode 100644
index 0000000000..0ab72573fd
--- /dev/null
+++ b/cpp/arcticdb/async/bit_rate_stats.cpp
@@ -0,0 +1,50 @@
+#include "bit_rate_stats.hpp"
+
+#include
+
+#include "log/log.hpp"
+#include "util/format_bytes.hpp"
+#include "entity/performance_tracing.hpp"
+
+constexpr uint64_t max_bytes{0xFFFFFFFFFF};
+constexpr uint64_t max_time_ms{0xFFFFFF};
+constexpr arcticdb::entity::timestamp log_frequency_ns{60LL * 1000L * 1000L * 1000L};
+
+namespace arcticdb::async {
+
+    BitRateStats::BitRateStats():
+        last_log_time_ns_(util::SysClock::coarse_nanos_since_epoch())
+    {}
+
+    void BitRateStats::add_stat(std::size_t bytes, double time_ms) {
+        auto now = util::SysClock::coarse_nanos_since_epoch();
+        uint64_t stat = data_to_stat(bytes, time_ms);
+        auto previous_stats = stats_.fetch_add(stat);
+        auto current_stats = previous_stats + stat;
+        if (now - last_log_time_ns_ > log_frequency_ns && stats_.compare_exchange_strong(current_stats, 0)) {
+            last_log_time_ns_ = now;
+            log_stats(current_stats);
+        }
+    }
+
+    uint64_t BitRateStats::data_to_stat(std::size_t bytes, double time_ms) const {
+        if (UNLIKELY(bytes > max_bytes || time_ms > max_time_ms)) {
+            log::storage().warn("Bit rate stats provided too large to represent, ignoring: {} in {}ms",
+                                format_bytes(bytes),
+                                time_ms);
+            return 0;
+        }
+        uint64_t stat{(bytes << 24) + static_cast<uint64_t>(time_ms)};
+        return stat;
+    }
+
+    void BitRateStats::log_stats(uint64_t stats) const {
+        double time_s = static_cast<double>(stats & max_time_ms) / 1000;
+        double bytes = static_cast<double>(stats >> 24);
+        double bandwidth = bytes / time_s;
+        log::storage().info("Byte rate {}/s", format_bytes(bandwidth));
+        std::string log_msg = "Current BW is " + format_bytes(bandwidth)+"/s";
+        ARCTICDB_SAMPLE_LOG(log_msg.c_str());
+    }
+
+} // arcticdb::async
diff --git a/cpp/arcticdb/async/bit_rate_stats.hpp b/cpp/arcticdb/async/bit_rate_stats.hpp
new file mode 100644
index 0000000000..47e7375450
--- /dev/null
+++ b/cpp/arcticdb/async/bit_rate_stats.hpp
@@ -0,0 +1,30 @@
+#pragma once
+
+#include
+#include
+
+#include "arcticdb/util/clock.hpp"
+#include "arcticdb/util/constructors.hpp"
+
+namespace arcticdb::async {
+
+    class BitRateStats {
+    public:
+        BitRateStats();
+        void add_stat(std::size_t bytes, double time_ms);
+
+        ARCTICDB_NO_MOVE_OR_COPY(BitRateStats)
+    private:
+        uint64_t data_to_stat(std::size_t bytes, double time_ms) const;
+        void log_stats(uint64_t stats) const;
+
+        // Use an 8 byte atomic for lock free implementation
+        // Upper 5 bytes represent the number of bytes of data transferred (giving max representable value of 1TB)
+        // Lower 3 bytes represent the total time in milliseconds (giving max representable value of 4.5 hours)
+        std::atomic_uint64_t stats_{0};
+
+        entity::timestamp last_log_time_ns_;
+    };
+
+} // arcticdb::async
+
diff --git a/cpp/arcticdb/async/tasks.cpp b/cpp/arcticdb/async/tasks.cpp
index 2a8d4e8731..b209f8528e 100644
--- a/cpp/arcticdb/async/tasks.cpp
+++ b/cpp/arcticdb/async/tasks.cpp
@@ -36,8 +36,8 @@ namespace arcticdb::async {
     }
 
     pipelines::SegmentAndSlice DecodeSliceTask::decode_into_slice(storage::KeySegmentPair&& key_segment_pair) {
-        auto key = std::move(key_segment_pair.atom_key());
-        auto seg = std::move(key_segment_pair.release_segment());
+        auto key = key_segment_pair.atom_key();
+        auto& seg = key_segment_pair.segment();
         ARCTICDB_DEBUG(log::storage(), "ReadAndDecodeAtomTask decoding segment of size {} with key {}", seg.size(), key);
 
diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp
index 71c89f7a86..3e564e9504 100644
--- a/cpp/arcticdb/async/tasks.hpp
+++ b/cpp/arcticdb/async/tasks.hpp
@@ -10,6 +10,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -19,6 +20,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -248,32 +250,6 @@ struct PassThroughTask : BaseTask {
     }
 };
 
-struct ReadCompressedSlicesTask : BaseTask {
-    Composite slice_and_keys_;
-    std::shared_ptr lib_;
-
-    ReadCompressedSlicesTask(Composite&& sk, std::shared_ptr lib)
-        : slice_and_keys_(std::move(sk)),
-          lib_(std::move(lib)) {
-        ARCTICDB_DEBUG(log::storage(), "Creating read compressed slices task for slice and key {}",
-                       slice_and_keys_);
-    }
-
-    ARCTICDB_MOVE_ONLY_DEFAULT(ReadCompressedSlicesTask)
-
-    Composite> read() {
-        return slice_and_keys_.transform([that=this](const auto &sk){
-            ARCTICDB_DEBUG(log::version(), "Reading key {}", sk.key());
-            return std::make_pair(that->lib_->read(sk.key()).release_segment(), sk);
-        });
-    }
-
-    Composite> operator()() {
-        ARCTICDB_SAMPLE(ReadCompressed, 0)
-        return read();
-    }
-};
-
 template
 struct CopyCompressedTask : BaseTask {
     entity::VariantKey source_key_;
@@ -315,6 +291,109 @@ struct CopyCompressedTask : BaseTask {
     }
 };
 
+// Used in arcticdb-enterprise, do not remove without checking whether it is still used there
+struct CopyCompressedInterStoreTask : async::BaseTask {
+
+    using AllOk = std::monostate;
+    using FailedTargets = std::unordered_set<std::string>;
+    using ProcessingResult = std::variant<AllOk, FailedTargets>;
+
+    CopyCompressedInterStoreTask(entity::VariantKey key_to_read,
+                                 std::optional<entity::VariantKey> key_to_write,
+                                 bool check_key_exists_on_targets,
+                                 bool retry_on_failure,
+                                 std::shared_ptr<Store> source_store,
+                                 std::vector<std::shared_ptr<Store>> target_stores,
+                                 std::shared_ptr<BitRateStats> bit_rate_stats=nullptr)
+        : key_to_read_(std::move(key_to_read)),
+          key_to_write_(std::move(key_to_write)),
+          check_key_exists_on_targets_(check_key_exists_on_targets),
+          retry_on_failure_(retry_on_failure),
+          source_store_(std::move(source_store)),
+          target_stores_(std::move(target_stores)),
+          bit_rate_stats_(std::move(bit_rate_stats)){
+        ARCTICDB_DEBUG(log::storage(), "Creating copy compressed inter-store task from key {}: {} -> {}: {}",
+                       variant_key_type(key_to_read_),
+                       variant_key_view(key_to_read_),
+                       key_to_write_.has_value() ? variant_key_type(key_to_write_.value()) : variant_key_type(key_to_read_),
+                       key_to_write_.has_value() ? variant_key_view(key_to_write_.value()) : variant_key_view(key_to_read_));
+    }
+
+    ARCTICDB_MOVE_ONLY_DEFAULT(CopyCompressedInterStoreTask)
+
+    ProcessingResult operator()() {
+        auto res = copy();
+
+        if (!res.empty() && retry_on_failure_) {
+            res = copy();
+        }
+
+        if (!res.empty()) {
+            return res;
+        }
+
+        return AllOk{};
+    }
+
+private:
+    entity::VariantKey key_to_read_;
+    std::optional<entity::VariantKey> key_to_write_;
+    bool check_key_exists_on_targets_;
+    bool retry_on_failure_;
+    std::shared_ptr<Store> source_store_;
+    std::vector<std::shared_ptr<Store>> target_stores_;
+    std::shared_ptr<BitRateStats> bit_rate_stats_;
+
+    // Returns an empty set if the copy succeeds, otherwise the set contains the names of the target stores that failed
+    std::unordered_set<std::string> copy() {
+        ARCTICDB_SAMPLE(copy, 0)
+        std::size_t bytes{0};
+        interval timer;
+        timer.start();
+        if (check_key_exists_on_targets_) {
+            target_stores_.erase(std::remove_if(target_stores_.begin(), target_stores_.end(),
+                                                [that=this](const std::shared_ptr<Store>& target_store) {
+                                                    return target_store->key_exists_sync(that->key_to_read_);
+                                                }), target_stores_.end());
+        }
+        std::unordered_set<std::string> failed_targets;
+        if (!target_stores_.empty()) {
+            storage::KeySegmentPair key_segment_pair;
+            try {
+                key_segment_pair = source_store_->read_compressed_sync(key_to_read_, storage::ReadKeyOpts{});
+            } catch (const storage::KeyNotFoundException& e) {
+                log::storage().debug("Key {} not found on the source: {}", variant_key_view(key_to_read_), e.what());
+                return failed_targets;
+            }
+            bytes = key_segment_pair.segment().size();
+            if (key_to_write_.has_value()) {
+                key_segment_pair.set_key(*key_to_write_);
+            }
+
+            for (auto & target_store : target_stores_) {
+                try {
+                    target_store->write_compressed_sync(key_segment_pair);
+                } catch (const storage::DuplicateKeyException& e) {
+                    log::storage().debug("Key {} already exists on the target: {}", variant_key_view(key_to_read_), e.what());
+                } catch (const storage::KeyNotFoundException& e) {
+                    log::storage().debug("Key {} not found on the source: {}", variant_key_view(key_to_read_), e.what());
+                } catch (const std::exception& e) {
+                    auto name = target_store->name();
+                    log::storage().error("Failed to write key {} to store {}: {}", variant_key_view(key_to_read_), name, e.what());
+                    failed_targets.insert(name);
+                }
+            }
+        }
+        timer.end();
+        auto time_ms = timer.get_results_total() * 1000;
+        if (bit_rate_stats_) {
+            bit_rate_stats_->add_stat(bytes, time_ms);
+        }
+
+        return failed_targets;
+    }
+};
+
 struct DecodeSegmentTask : BaseTask {
     ARCTICDB_MOVE_ONLY_DEFAULT(DecodeSegmentTask)
 
diff --git a/cpp/arcticdb/async/test/test_async.cpp b/cpp/arcticdb/async/test/test_async.cpp
index e24d291037..a56793c679 100644
--- a/cpp/arcticdb/async/test/test_async.cpp
+++ b/cpp/arcticdb/async/test/test_async.cpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -20,7 +21,7 @@
 #include
 #include
 
-namespace ac = arcticdb;
+using namespace arcticdb;
 namespace aa = arcticdb::async;
 namespace as = arcticdb::storage;
 namespace asl = arcticdb::storage::lmdb;
@@ -41,18 +42,18 @@ TEST(Async, SinkBasic) {
 
     auto codec_opt = std::make_shared();
     aa::TaskScheduler sched{1};
-    auto seg = ac::SegmentInMemory();
+    auto seg = SegmentInMemory();
     aa::EncodeAtomTask enc{
-        ac::entity::KeyType::GENERATION, ac::entity::VersionId{6}, ac::NumericId{123}, ac::NumericId{456}, ac::timestamp{457}, ac::entity::NumericIndex{999}, std::move(seg), codec_opt, ac::EncodingVersion::V2
+        entity::KeyType::GENERATION, entity::VersionId{6}, NumericId{123}, NumericId{456}, timestamp{457}, entity::NumericIndex{999}, std::move(seg), codec_opt, EncodingVersion::V2
     };
 
     auto v = sched.submit_cpu_task(std::move(enc)).via(&aa::io_executor()).thenValue(aa::WriteSegmentTask{lib}).get();
 
-    ac::HashAccum h;
+    HashAccum h;
     auto default_content_hash = h.digest();
 
-    ASSERT_EQ(ac::entity::atom_key_builder().gen_id(6).start_index(456).end_index(457).creation_ts(999)
-                  .content_hash(default_content_hash).build(ac::NumericId{123}, ac::entity::KeyType::GENERATION),
+    ASSERT_EQ(entity::atom_key_builder().gen_id(6).start_index(456).end_index(457).creation_ts(999)
+                  .content_hash(default_content_hash).build(NumericId{123}, entity::KeyType::GENERATION),
               to_atom(v)
     );
 }
@@ -70,30 +71,30 @@ TEST(Async, DeDupTest) {
     as::UserAuth au{"abc"};
     auto lib = library_index.get_library(library_path, as::OpenMode::WRITE, au);
     auto codec_opt = std::make_shared();
-    aa::AsyncStore store(lib, *codec_opt, ac::EncodingVersion::V2);
+    aa::AsyncStore store(lib, *codec_opt, EncodingVersion::V2);
-    auto seg = ac::SegmentInMemory();
+    auto seg = SegmentInMemory();
 
-    std::vector> key_segments;
+    std::vector> key_segments;
 
-    key_segments.emplace_back(ast::StreamSink::PartialKey{ ac::entity::KeyType::TABLE_DATA, 1, "", ac::entity::NumericIndex{0}, ac::entity::NumericIndex{1} }, seg);
-    key_segments.emplace_back(ast::StreamSink::PartialKey{ ac::entity::KeyType::TABLE_DATA, 2, "", ac::entity::NumericIndex{1}, ac::entity::NumericIndex{2} }, seg);
+    key_segments.emplace_back(ast::StreamSink::PartialKey{ entity::KeyType::TABLE_DATA, 1, "", entity::NumericIndex{0}, entity::NumericIndex{1} }, seg);
+    key_segments.emplace_back(ast::StreamSink::PartialKey{ entity::KeyType::TABLE_DATA, 2, "", entity::NumericIndex{1}, entity::NumericIndex{2} }, seg);
 
-    ac::HashAccum h;
+    HashAccum h;
     auto default_content_hash = h.digest();
 
-    auto de_dup_map = std::make_shared();
-    auto k = ac::entity::atom_key_builder().gen_id(3).start_index(0).end_index(1).creation_ts(999)
-                 .content_hash(default_content_hash).build("", ac::entity::KeyType::TABLE_DATA);
+    auto de_dup_map = std::make_shared();
+    auto k = entity::atom_key_builder().gen_id(3).start_index(0).end_index(1).creation_ts(999)
+                 .content_hash(default_content_hash).build("", entity::KeyType::TABLE_DATA);
     de_dup_map->insert_key(k);
 
     std::vector> slice_key_futures;
     for(auto& [key, segment] : key_segments) {
-        auto input = std::make_tuple(std::move(key), std::move(segment), {});
+        auto input = std::make_tuple(std::move(key), std::move(segment), {});
         auto fut = folly::makeFuture(std::move(input));
         slice_key_futures.emplace_back(store.async_write(std::move(fut), de_dup_map));
     }
     auto slice_keys = folly::collect(slice_key_futures).get();
-    std::vector keys;
+    std::vector keys;
     for(const auto& slice_key : slice_keys)
         keys.emplace_back(slice_key.key());
 
@@ -290,4 +291,72 @@ TEST(Async, NumCoresCgroupV2) {
     ASSERT_THROW(arcticdb::async::get_default_num_cpus(test_path), std::invalid_argument);
 #endif
-}
\ No newline at end of file
+}
+
+std::shared_ptr create_store(const storage::LibraryPath &library_path,
+                             as::LibraryIndex &library_index,
+                             const storage::UserAuth &user_auth,
+                             std::shared_ptr &codec_opt) {
+    auto lib = library_index.get_library(library_path, as::OpenMode::WRITE, user_auth);
+    auto store = aa::AsyncStore(lib, *codec_opt, EncodingVersion::V1);
+    return std::make_shared>(std::move(store));
+}
+
+TEST(Async, CopyCompressedInterStore) {
+    using namespace arcticdb::async;
+
+    // Given
+    as::EnvironmentName environment_name{"research"};
+    as::StorageName storage_name("storage_name");
+    as::LibraryPath library_path{"a", "b"};
+    namespace ap = arcticdb::pipelines;
+
+    auto config = proto::nfs_backed_storage::Config();
+    config.set_use_mock_storage_for_testing(true);
+
+    auto env_config = arcticdb::get_test_environment_config(
+        library_path, storage_name, environment_name, std::make_optional(config));
+    auto config_resolver = as::create_in_memory_resolver(env_config);
+    as::LibraryIndex library_index{environment_name, config_resolver};
+
+    as::UserAuth user_auth{"abc"};
+    auto codec_opt = std::make_shared();
+
+    auto source_store = create_store(library_path, library_index, user_auth, codec_opt);
+
+    // When - we write a key to the source and copy it
+    const arcticdb::entity::RefKey& key = arcticdb::entity::RefKey{"abc", KeyType::VERSION_REF};
+    auto segment_in_memory = get_test_frame("symbol", {}, 10, 0).segment_;
+    auto row_count = segment_in_memory.row_count();
+    ASSERT_GT(row_count, 0);
+    auto segment = encode_dispatch(std::move(segment_in_memory), *codec_opt, arcticdb::EncodingVersion::V1);
+    (void)segment.calculate_size();
+    source_store->write_compressed_sync(as::KeySegmentPair{key, std::move(segment)});
+
+    auto targets = std::vector<std::shared_ptr<Store>>{
+        create_store(library_path, library_index, user_auth, codec_opt),
+        create_store(library_path, library_index, user_auth, codec_opt),
+        create_store(library_path, library_index, user_auth, codec_opt)
+    };
+
+    CopyCompressedInterStoreTask task{
+        key,
+        std::nullopt,
+        false,
+        false,
+        source_store,
+        targets,
+        std::shared_ptr<BitRateStats>()
+    };
+
+    arcticdb::async::TaskScheduler sched{1};
+    auto res = sched.submit_io_task(std::move(task)).get();
+
+    // Then
+    ASSERT_TRUE(std::holds_alternative<CopyCompressedInterStoreTask::AllOk>(res));
+    for (const auto& target_store : targets) {
+        auto read_result = target_store->read_sync(key);
+        ASSERT_EQ(std::get<entity::RefKey>(read_result.first), key);
+        ASSERT_EQ(read_result.second.row_count(), row_count);
+    }
+}
diff --git a/cpp/arcticdb/storage/common.hpp b/cpp/arcticdb/storage/common.hpp
index 97ef858af1..09072bf624 100644
--- a/cpp/arcticdb/storage/common.hpp
+++ b/cpp/arcticdb/storage/common.hpp
@@ -27,6 +27,12 @@
 using StorageName = util::StringWrappingValue;
 struct InstanceUriTag{};
 using InstanceUri = util::StringWrappingValue;
 
+template<typename T>
+requires std::is_same_v<T, StorageName> || std::is_same_v<T, InstanceUri>
+bool operator==(const T &l, const T &r) {
+    return l.value == r.value;
+}
+
 /*
  * Placeholder class for now
  */
diff --git a/cpp/arcticdb/storage/key_segment_pair.hpp b/cpp/arcticdb/storage/key_segment_pair.hpp
index 7833ee96bf..7baa92e6ed 100644
--- a/cpp/arcticdb/storage/key_segment_pair.hpp
+++ b/cpp/arcticdb/storage/key_segment_pair.hpp
@@ -18,36 +18,36 @@ namespace arcticdb::storage {
      * not contain any positioning information for the contained data.
      */
     class KeySegmentPair {
-        struct KeySegmentData {
-            VariantKey key_;
-            Segment segment_;
-            KeySegmentData() = default;
-            explicit KeySegmentData(VariantKey &&key) : key_(std::move(key)), segment_() {}
-            KeySegmentData(VariantKey &&key, Segment &&segment) : key_(std::move(key)), segment_(std::move(segment)) {}
+      public:
+        KeySegmentPair() = default;
+        explicit KeySegmentPair(VariantKey &&key)
+            : key_(std::make_shared<VariantKey>(std::move(key))) {}
 
-            ARCTICDB_NO_MOVE_OR_COPY(KeySegmentData)
-        };
+        KeySegmentPair(VariantKey &&key, Segment &&segment)
+            : key_(std::make_shared<VariantKey>(std::move(key))),
+              segment_(std::make_shared<Segment>(std::move(segment))) {}
 
-      public:
-        KeySegmentPair() : data_(std::make_shared<KeySegmentData>()) {}
-        explicit KeySegmentPair(VariantKey &&key) : data_(std::make_shared<KeySegmentData>(std::move(key))) {}
-        KeySegmentPair(VariantKey &&key, Segment&& segment) :
-            data_(std::make_shared<KeySegmentData>(std::move(key), std::move(segment))) {}
+        template<typename K>
+        KeySegmentPair(K &&key, std::shared_ptr<Segment> segment)
+            : key_(std::make_shared<VariantKey>(std::forward<K>(key))),
+              segment_(std::move(segment)) {}
 
         ARCTICDB_MOVE_COPY_DEFAULT(KeySegmentPair)
 
-        Segment &segment() {
-            return data_->segment_;
+        // TODO aseaton remove
+        Segment& segment() {
+            util::check(segment_, "Attempting to access segment_ but it has not been set");
+            return *segment_;
         }
 
-        Segment&& release_segment() {
-            return std::move(data_->segment_);
+        [[nodiscard]] std::shared_ptr<Segment> segment_ptr() const {
+            return segment_;
         }
 
-        AtomKey &atom_key() {
-            util::check(std::holds_alternative<AtomKey>(variant_key()), "Expected atom key access");
-            return std::get<AtomKey>(variant_key());
+        template<typename T>
+        void set_key(T&& key) {
+            key_ = std::make_shared<VariantKey>(std::forward<T>(key));
         }
 
         [[nodiscard]] const AtomKey &atom_key() const {
@@ -55,26 +55,19 @@
             return std::get<AtomKey>(variant_key());
         }
 
-        RefKey &ref_key() {
-            util::check(std::holds_alternative<RefKey>(variant_key()), "Expected ref key access");
-            return std::get<RefKey>(variant_key());
-        }
-
         [[nodiscard]] const RefKey &ref_key() const {
             util::check(std::holds_alternative<RefKey>(variant_key()), "Expected ref key access");
             return std::get<RefKey>(variant_key());
         }
 
-        VariantKey& variant_key() {
-            return data_->key_;
-        }
-
         [[nodiscard]] const VariantKey& variant_key() const {
-            return data_->key_;
+            util::check(key_, "Attempting to access key_ but it has not been set");
+            return *key_;
         }
 
         [[nodiscard]] const Segment &segment() const {
-            return data_->segment_;
+            util::check(segment_, "Attempting to access segment_ (const) but it has not been set");
+            return *segment_;
         }
 
         [[nodiscard]] bool has_segment() const {
@@ -90,14 +83,8 @@
         }
 
       private:
-        std::shared_ptr<KeySegmentData> data_;
+        std::shared_ptr<VariantKey> key_ = std::make_shared<VariantKey>();
+        std::shared_ptr<Segment> segment_ = std::make_shared<Segment>();
     };
 
-    template<typename T, std::enable_if_t<std::is_same_v<T, StorageName> || std::is_same_v<T, InstanceUri>, int> = 0>
-    bool operator==(const T &l, const T &r) {
-        return l.value == r.value;
-    }
-
-
 } //namespace arcticdb
\ No newline at end of file
diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp
index d3705844c8..5990d4bf10 100644
--- a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp
+++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp
@@ -10,6 +10,7 @@
 #include
 #include
 #include
+#include
 #include
 
 namespace arcticdb::storage::nfs_backed {
@@ -124,14 +125,14 @@ std::string NfsBackedStorage::name() const {
 
 void NfsBackedStorage::do_write(Composite<KeySegmentPair>&& kvs) {
     auto enc = kvs.transform([] (auto&& key_seg) {
-        return KeySegmentPair{encode_object_id(key_seg.variant_key()), std::move(key_seg.segment())};
+        return KeySegmentPair{encode_object_id(key_seg.variant_key()), key_seg.segment_ptr()};
     });
     s3::detail::do_write_impl(std::move(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{});
 }
 
 void NfsBackedStorage::do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts) {
     auto enc = kvs.transform([] (auto&& key_seg) {
-        return KeySegmentPair{encode_object_id(key_seg.variant_key()), std::move(key_seg.segment())};
+        return KeySegmentPair{encode_object_id(key_seg.variant_key()), key_seg.segment_ptr()};
     });
     s3::detail::do_update_impl(std::move(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{});
 }
@@ -183,11 +184,16 @@ bool NfsBackedStorage::do_key_exists(const VariantKey& key) {
 
 NfsBackedStorage::NfsBackedStorage(const LibraryPath &library_path, OpenMode mode, const Config &conf) :
     Storage(library_path, mode),
-    s3_api_(s3::S3ApiInstance::instance()),
     root_folder_(object_store_utils::get_root_folder(library_path)),
     bucket_name_(conf.bucket_name()),
     region_(conf.region()) {
-    s3_client_ = std::make_unique(s3::get_aws_credentials(conf), s3::get_s3_config(conf), Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
+
+    if (conf.use_mock_storage_for_testing()) {
+        log::storage().warn("Using Mock S3 storage for NfsBackedStorage");
+        s3_client_ = std::make_unique();
+    } else {
+        s3_client_ = std::make_unique(s3::get_aws_credentials(conf), s3::get_s3_config(conf), Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, false);
+    }
     if (!conf.prefix().empty()) {
         ARCTICDB_DEBUG(log::version(), "prefix found, using: {}", conf.prefix());
         auto prefix_path = LibraryPath::from_delim_path(conf.prefix(), '.');
@@ -201,7 +207,6 @@ NfsBackedStorage::NfsBackedStorage(const LibraryPath &library_path, OpenMode mod
     std::locale locale{ std::locale::classic(), new std::num_put()};
     (void)std::locale::global(locale);
     ARCTICDB_DEBUG(log::storage(), "Opened NFS backed storage at {}", root_folder_);
-    s3_api_.reset();
 }
 
 } //namespace arcticdb::storage::nfs_backed
diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
index 576e462a6a..a04c14e010 100644
--- a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
+++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp
@@ -60,7 +60,6 @@ class NfsBackedStorage final : public Storage {
     const std::string& root_folder() const { return root_folder_; }
     const std::string& region() const { return region_; }
 
-    std::shared_ptr s3_api_;
     std::unique_ptr s3_client_;
     std::string root_folder_;
     std::string bucket_name_;
diff --git a/cpp/arcticdb/storage/storage.hpp b/cpp/arcticdb/storage/storage.hpp
index d88d511f67..f58c007828 100644
--- a/cpp/arcticdb/storage/storage.hpp
+++ b/cpp/arcticdb/storage/storage.hpp
@@ -128,7 +128,7 @@ class Storage {
     KeySegmentPair read(KeyType&& key, ReadKeyOpts opts) {
         KeySegmentPair key_seg;
         const ReadVisitor& visitor = [&key_seg](const VariantKey & vk, Segment&& value) {
-            key_seg.variant_key() = vk;
+            key_seg.set_key(vk);
             key_seg.segment() = std::move(value);
         };
 
diff --git a/cpp/arcticdb/storage/test/in_memory_store.hpp b/cpp/arcticdb/storage/test/in_memory_store.hpp
index f232fbf35b..11fe0fc2e4 100644
--- a/cpp/arcticdb/storage/test/in_memory_store.hpp
+++ b/cpp/arcticdb/storage/test/in_memory_store.hpp
@@ -222,11 +222,11 @@ namespace arcticdb {
             return folly::makeFutureWith([&](){ return read_sync(key, opts); });
         }
 
-        folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair&&) override {
+        folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair) override {
             util::raise_rte("Not implemented");
         }
 
-        void write_compressed_sync(storage::KeySegmentPair&&) override {
+        void write_compressed_sync(storage::KeySegmentPair) override {
             util::raise_rte("Not implemented");
         }
 
diff --git a/cpp/arcticdb/storage/test/test_embedded.cpp b/cpp/arcticdb/storage/test/test_embedded.cpp
index afaaf0a06d..05cd52f9c5 100644
--- a/cpp/arcticdb/storage/test/test_embedded.cpp
+++ b/cpp/arcticdb/storage/test/test_embedded.cpp
@@ -108,7 +108,7 @@ TEST_P(SimpleTestSuite, Example) {
     as::KeySegmentPair res;
     storage->read(k, [&](auto &&k, auto &&seg) {
-        res.atom_key() = std::get(k);
+        res.set_key(k);
         res.segment() = std::move(seg);
         res.segment().force_own_buffer(); // necessary since the non-owning buffer won't survive the visit
     }, storage::ReadKeyOpts{});
@@ -132,7 +132,7 @@ TEST_P(SimpleTestSuite, Example) {
     as::KeySegmentPair update_res;
     storage->read(k, [&](auto &&k, auto &&seg) {
-        update_res.atom_key() = std::get(k);
+        update_res.set_key(k);
         update_res.segment() = std::move(seg);
         update_res.segment().force_own_buffer(); // necessary since the non-owning buffer won't survive the visit
     }, as::ReadKeyOpts{});
@@ -188,7 +188,7 @@ TEST_P(SimpleTestSuite, Strings) {
     as::KeySegmentPair res;
     storage->read(save_k, [&](auto &&k, auto &&seg) {
-        res.atom_key() = std::get(k);
+        res.set_key(k);
         res.segment() = std::move(seg);
         res.segment().force_own_buffer(); // necessary since the non-owning buffer won't survive the visit
     }, as::ReadKeyOpts{});
diff --git a/cpp/arcticdb/storage/test/test_mongo_storage.cpp b/cpp/arcticdb/storage/test/test_mongo_storage.cpp
index 3827fc0020..4a9de615ba 100644
--- a/cpp/arcticdb/storage/test/test_mongo_storage.cpp
+++ b/cpp/arcticdb/storage/test/test_mongo_storage.cpp
@@ -42,7 +42,7 @@ TEST(MongoStorage, ClientSession) {
     as::KeySegmentPair res;
     storage.read(k, [&](auto &&k, auto &&seg) {
-        res.atom_key() = std::get(k);
+        res.set_key(k);
         res.segment() = std::move(seg);
         res.segment().force_own_buffer(); // necessary since the non-owning buffer won't survive the visit
     }, as::ReadKeyOpts{});
@@ -70,7 +70,7 @@ TEST(MongoStorage, ClientSession) {
     as::KeySegmentPair update_res;
     storage.read(k, [&](auto &&k, auto &&seg) {
-        update_res.atom_key() = std::get(k);
+        update_res.set_key(k);
         update_res.segment() = std::move(seg);
         update_res.segment().force_own_buffer(); // necessary since the non-owning buffer won't survive the visit
     }, as::ReadKeyOpts{});
@@ -93,7 +93,7 @@ TEST(MongoStorage, ClientSession) {
     as::KeySegmentPair numeric_res;
     storage.read(numeric_k, [&](auto &&k, auto &&seg) {
-        numeric_res.atom_key() = std::get(k);
+        numeric_res.set_key(k);
         numeric_res.segment() = std::move(seg);
         numeric_res.segment().force_own_buffer(); // necessary since the non-owning buffer won't survive the visit
     }, as::ReadKeyOpts{});
diff --git a/cpp/arcticdb/stream/stream_sink.hpp b/cpp/arcticdb/stream/stream_sink.hpp
index 5b252e85a9..ed28a611ab 100644
--- a/cpp/arcticdb/stream/stream_sink.hpp
+++ b/cpp/arcticdb/stream/stream_sink.hpp
@@ -99,14 +99,9 @@ struct StreamSink {
         const StreamId &stream_id,
         SegmentInMemory &&segment) = 0;
 
-    struct BatchWriteArgs {
-        std::size_t lib_write_count = 0ULL;
-        BatchWriteArgs() : lib_write_count(0ULL) {}
-    };
-
-    [[nodiscard]] virtual folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair&& ks) = 0;
+    [[nodiscard]] virtual folly::Future<folly::Unit> write_compressed(storage::KeySegmentPair ks) = 0;
 
-    virtual void write_compressed_sync(storage::KeySegmentPair&& ks) = 0;
+    virtual void write_compressed_sync(storage::KeySegmentPair ks) = 0;
 
     [[nodiscard]] virtual folly::Future async_write(
         folly::Future> &&input_fut,
diff --git a/cpp/arcticdb/stream/test/mock_stores.hpp b/cpp/arcticdb/stream/test/mock_stores.hpp
deleted file mode 100644
index 61597e50b1..0000000000
--- a/cpp/arcticdb/stream/test/mock_stores.hpp
+++ /dev/null
@@ -1,151 +0,0 @@
-/* Copyright 2023 Man Group Operations Limited
- *
- * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
- *
- * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
- */
-
-#pragma once
-
-#include
-#include
-#include
-#include
-#include
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-#include
-
-namespace arcticdb {
-
-template
-class TypeFilteredStore :
-    public InMemoryStore {
-    folly::Future iterate_type(
-        KeyType kt, folly::Function func,
-        const std::string &/*prefix*/) override {
-        if (kt == key_type)
-            for (auto &item : seg_by_atom_key_) {
-                fmt::print("{}", item.first);
-                entity::AtomKey key(item.first);
-                func(std::move(key));
-            }
-    }
-
-    folly::Future write(
-        KeyType kt, VersionId version_id,
-        StreamId stream_id, IndexValue start_index, IndexValue end_index,
-        SegmentInMemory &&segment
-    ) override {
-        if (key_type == kt)
-            InMemoryStore::write(kt, version_id, stream_id, start_index, end_index, std::move(segment));
-
-        return atom_key_builder().gen_id(version_id).content_hash(content_hash_).creation_ts(creation_ts_)
-            .start_index(start_index).end_index(end_index).build(stream_id, key_type);
-    }
-
-};
-
-class NullStore :
-    public Store {
-
-  public:
-    NullStore() = default;
-
-    folly::Future> read(const AtomKey &key) override {
-        return std::make_pair(key, arcticdb::SegmentInMemory{});
-    }
-
-    std::atomic creation_ts{0};
-    HashedValue content_hash = 0x42;
-
-    folly::Future
-    write(
-        KeyType key_type, VersionId gen_id,
-        StreamId tsid, IndexValue start_index, IndexValue end_index,
-        SegmentInMemory &&
-    ) override {
-        auto key = atom_key_builder().gen_id(gen_id).content_hash(content_hash).creation_ts(creation_ts)
-            .start_index(start_index).end_index(end_index).build(tsid, key_type);
-        return folly::makeFuture(key);
-    }
-
-    folly::Future remove_key(const VariantKey &key) override {
-        return folly::makeFuture({});
-    }
-
-    void read_atoms(
-        const std::vector &,
-        folly::Function> &&)> &&
-    ) override {
-        throw std::runtime_error("Not implemented");
-    }
-
-    void read_refs(
-        const std::vector &,
-        folly::Function> &&)> &&
-    ) override {
-        throw std::runtime_error("Not implemented");
-    }
-
-    folly::Future iterate_type(KeyType, folly::Function, const std::string &) override {
-        throw std::runtime_error("Not implemented");
-    }
-
-    folly::Future
-    write(stream::KeyType /*key_type*/, StreamId /*stream_id*/, SegmentInMemory &&/*segment*/) override {
-        util::raise_rte("Not implemented");
-    }
-
-    folly::Future> read(const entity::RefKey &/*key*/) override {
-        util::raise_rte("Not implemented");
-    }
-
-    folly::Future> remove_keys(const std::vector&) override {
-        util::raise_rte("Not implemented");
-    }
-
-    folly::Future> remove_keys(std::vector&&) override {
-        util::raise_rte("Not implemented");
-    }
-
-    folly::Future> batch_write(
-        std::vector> &&,
-        const std::unordered_map&,
-        const BatchWriteArgs &
-    ) override {
-        throw std::runtime_error("Not implemented for tests");
-    }
-
-    std::vector> batch_key_exists(std::vector &) override {
-        throw std::runtime_error("Not implemented for tests");
-    }
-
-    folly::Future> batch_read_compressed(
-        std::vector> &&ks,
-        const BatchReadArgs&) override {
-        throw std::runtime_error("Not implemented for tests");
-    }
-
-    folly::Future>>
-
-    read_metadata(const entity::VariantKey &) override {
-        util::raise_rte("Not implemented for tests");
-    }
-
-    void set_failure_sim(const arcticdb::proto::storage::VersionStoreConfig::StorageFailureSimulator &) override {}
-
-    std::string name() const override {
-        return "NullStore";
-    }
-
-};
-
-} //namespace arcticdb
\ No newline at end of file
diff --git a/cpp/arcticdb/toolbox/library_tool.cpp b/cpp/arcticdb/toolbox/library_tool.cpp
index 14ca2ceffd..af1f9d3708 100644
--- a/cpp/arcticdb/toolbox/library_tool.cpp
+++ b/cpp/arcticdb/toolbox/library_tool.cpp
@@ -57,7 +57,7 @@ TimeseriesDescriptor LibraryTool::read_timeseries_descriptor(const VariantKey& k
 
 void LibraryTool::write(VariantKey key, Segment& segment) {
     storage::KeySegmentPair kv{std::move(key), std::move(segment)};
-    store_->write_compressed_sync(std::move(kv));
+    store_->write_compressed_sync(kv);
 }
 
 void LibraryTool::remove(VariantKey key) {
diff --git a/cpp/arcticdb/util/preconditions.hpp b/cpp/arcticdb/util/preconditions.hpp
index 83aebf939f..18c6a5c59e 100644
--- a/cpp/arcticdb/util/preconditions.hpp
+++ b/cpp/arcticdb/util/preconditions.hpp
@@ -52,19 +52,24 @@ struct Raise {
     }
 };
 
+template<typename T>
+concept Testable = requires(T a) {
+    !a; // contextual conversion of a to bool must be possible
+};
+
 template
 struct Check {
     static constexpr Raise raise{};
 
-    template<typename... Args>
-    void operator()(bool cond, fmt::format_string<Args...> format, Args&&...args) const {
+    template<Testable Cond, typename... Args>
+    void operator()(Cond cond, fmt::format_string<Args...> format, Args&&...args) const {
         if (ARCTICDB_UNLIKELY(!cond)) {
            raise(format, std::forward<Args>(args)...);
        }
    }

-    template<typename... Args>
-    void operator()(bool cond, FormatString format, Args&&...args) const {
+    template<Testable Cond, typename... Args>
+    void operator()(Cond cond, FormatString format, Args&&...args) const {
        if (ARCTICDB_UNLIKELY(!cond)) {
            raise(format, std::forward<Args>(args)...);
        }
diff --git a/cpp/arcticdb/util/test/config_common.hpp b/cpp/arcticdb/util/test/config_common.hpp
index fe2ef247a1..4dc4c533f5 100644
--- a/cpp/arcticdb/util/test/config_common.hpp
+++ b/cpp/arcticdb/util/test/config_common.hpp
@@ -14,20 +14,32 @@
 
 namespace arcticdb {
 
+inline auto get_test_lmdb_config(
+        ) {
+    arcticdb::proto::lmdb_storage::Config cfg;
+    cfg.set_path("./"); //TODO local path is a bit annoying. TMPDIR?
+    cfg.set_recreate_if_exists(true);
+    return cfg;
+}
+
+template
 inline auto get_test_environment_config(
     const arcticdb::storage::LibraryPath& path,
    const arcticdb::storage::StorageName& storage_name,
-    const arcticdb::storage::EnvironmentName& environment_name) {
+    const arcticdb::storage::EnvironmentName& environment_name,
+    const std::optional storage_config=std::nullopt) {
    using namespace arcticdb::storage;
    using MemoryConfig = storage::details::InMemoryConfigResolver::MemoryConfig;
    MemoryConfig mem_config = {};

    arcticdb::proto::storage::VariantStorage storage_conf;
-    arcticdb::proto::lmdb_storage::Config cfg;
-    cfg.set_path("./"); //TODO local path is a bit annoying. TMPDIR?
-    cfg.set_recreate_if_exists(true);
-    util::pack_to_any(cfg, *storage_conf.mutable_config());
+    if (storage_config) {
+        util::pack_to_any(*storage_config, *storage_conf.mutable_config());
+    } else {
+        auto default_lmdb_config = get_test_lmdb_config();
+        util::pack_to_any(default_lmdb_config, *storage_conf.mutable_config());
+    }
    mem_config.storages_.try_emplace(storage_name, storage_conf);

    arcticdb::proto::storage::LibraryDescriptor library_descriptor;
diff --git a/cpp/arcticdb/version/snapshot.cpp b/cpp/arcticdb/version/snapshot.cpp
index 9b5ae6c48c..a7f166b083 100644
--- a/cpp/arcticdb/version/snapshot.cpp
+++ b/cpp/arcticdb/version/snapshot.cpp
@@ -78,7 +78,7 @@ void tombstone_snapshot(
     }
     // Append a timestamp to the ID so that other snapshot(s) can reuse the same snapshot name before the cleanup job:
     std::string new_key = fmt::format("{}@{:x}", key_segment_pair.ref_key(), util::SysClock::coarse_nanos_since_epoch() / 1'000'000);
-    key_segment_pair.ref_key() = RefKey(new_key, KeyType::SNAPSHOT_TOMBSTONE);
+    key_segment_pair.set_key(RefKey(std::move(new_key), KeyType::SNAPSHOT_TOMBSTONE));
     store->write_compressed(std::move(key_segment_pair)).get();
 }
 
diff --git a/cpp/proto/arcticc/pb2/nfs_backed_storage.proto b/cpp/proto/arcticc/pb2/nfs_backed_storage.proto
index d8837d1aaa..b56bb4b398 100644
--- a/cpp/proto/arcticc/pb2/nfs_backed_storage.proto
+++ b/cpp/proto/arcticc/pb2/nfs_backed_storage.proto
@@ -24,4 +24,5 @@ message Config {
     bool use_virtual_addressing = 12;
     string ca_cert_path = 13;
     string ca_cert_dir = 14;
+    bool use_mock_storage_for_testing = 15;
 }