From fb2f01cc2901573897cedf9b742964c8baacff6c Mon Sep 17 00:00:00 2001 From: willdealtry Date: Fri, 22 Nov 2024 17:01:44 +0000 Subject: [PATCH] More refactoring --- cpp/arcticdb/async/async_store.hpp | 8 +-- cpp/arcticdb/async/tasks.hpp | 8 +-- cpp/arcticdb/storage/async_storage.hpp | 1 - cpp/arcticdb/storage/azure/azure_storage.cpp | 63 +++++++++++++------ cpp/arcticdb/storage/azure/azure_storage.hpp | 2 +- .../storage/file/mapped_file_storage.cpp | 12 ++++ .../storage/file/mapped_file_storage.hpp | 2 +- cpp/arcticdb/storage/library.hpp | 4 +- cpp/arcticdb/storage/lmdb/lmdb_storage.cpp | 2 +- cpp/arcticdb/storage/lmdb/lmdb_storage.hpp | 2 +- .../storage/memory/memory_storage.cpp | 7 ++- .../storage/memory/memory_storage.hpp | 2 +- cpp/arcticdb/storage/mongo/mongo_storage.cpp | 45 ++++++------- cpp/arcticdb/storage/mongo/mongo_storage.hpp | 2 +- cpp/arcticdb/storage/s3/detail-inl.hpp | 37 +++++++---- .../storage/s3/nfs_backed_storage.cpp | 15 +++++ .../storage/s3/nfs_backed_storage.hpp | 2 +- cpp/arcticdb/storage/s3/s3_storage.cpp | 6 +- cpp/arcticdb/storage/s3/s3_storage.hpp | 4 +- cpp/arcticdb/storage/storage.hpp | 6 +- cpp/arcticdb/storage/storages.hpp | 11 ++-- cpp/arcticdb/stream/append_map.cpp | 2 +- cpp/arcticdb/stream/stream_source.hpp | 7 ++- .../stream/test/stream_test_common.hpp | 2 - cpp/arcticdb/toolbox/library_tool.cpp | 2 +- .../version/test/test_version_common.hpp | 1 - cpp/arcticdb/version/version_utils.hpp | 36 +++++------ 27 files changed, 175 insertions(+), 116 deletions(-) diff --git a/cpp/arcticdb/async/async_store.hpp b/cpp/arcticdb/async/async_store.hpp index 44a41502dea..54f7483ef9a 100644 --- a/cpp/arcticdb/async/async_store.hpp +++ b/cpp/arcticdb/async/async_store.hpp @@ -217,8 +217,8 @@ folly::Future> read( return read_and_continue(key, library_, opts, DecodeSegmentTask{}); } -std::pair read_sync(const entity::VariantKey& key) override { - return DecodeSegmentTask{}(read_sync_dispatch(key, library_)); +std::pair read_sync(const entity::VariantKey& key, storage::ReadKeyOpts opts) override { + return DecodeSegmentTask{}(read_sync_dispatch(key, library_, opts)); } folly::Future read_compressed( @@ -227,8 +227,8 @@ folly::Future read_compressed( return read_and_continue(key, library_, opts, PassThroughTask{}); } -storage::KeySegmentPair read_compressed_sync(const entity::VariantKey& key) override { - return read_sync_dispatch( key, library_); +storage::KeySegmentPair read_compressed_sync(const entity::VariantKey& key, storage::ReadKeyOpts opts) override { + return read_sync_dispatch(key, library_, opts); } folly::Future, std::optional>> read_metadata(const entity::VariantKey &key, storage::ReadKeyOpts opts) override { diff --git a/cpp/arcticdb/async/tasks.hpp b/cpp/arcticdb/async/tasks.hpp index 1cac8b77d3e..0a551934da5 100644 --- a/cpp/arcticdb/async/tasks.hpp +++ b/cpp/arcticdb/async/tasks.hpp @@ -215,9 +215,9 @@ inline folly::Future read_dispatch(entity::VariantKey&& }); } -inline storage::KeySegmentPair read_sync_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr& lib) { - return util::variant_match(variant_key, [&lib](const auto &key) { - return lib->read_sync(key); +inline storage::KeySegmentPair read_sync_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr& lib, storage::ReadKeyOpts opts) { + return util::variant_match(variant_key, [&lib, opts](const auto &key) { + return lib->read_sync(key, opts); }); } @@ -366,7 +366,7 @@ struct CopyCompressedInterStoreTask : async::BaseTask { if (!target_stores_.empty()) { storage::KeySegmentPair key_segment_pair; try { - key_segment_pair = source_store_->read_compressed_sync(key_to_read_, storage::ReadKeyOpts{}); + key_segment_pair = source_store_->read_compressed_sync(key_to_read_); } catch (const storage::KeyNotFoundException& e) { log::storage().debug("Key {} not found on the source: {}", variant_key_view(key_to_read_), e.what()); return failed_targets; diff --git a/cpp/arcticdb/storage/async_storage.hpp b/cpp/arcticdb/storage/async_storage.hpp index 0db6fbcc65c..e49da7f9b98 100644 --- a/cpp/arcticdb/storage/async_storage.hpp +++ b/cpp/arcticdb/storage/async_storage.hpp @@ -10,7 +10,6 @@ namespace arcticdb::storage { class AsyncStorage { - public: folly::Future async_read(entity::VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) { return do_async_read(std::move(variant_key), visitor, opts); diff --git a/cpp/arcticdb/storage/azure/azure_storage.cpp b/cpp/arcticdb/storage/azure/azure_storage.cpp index 5dc0a7262d2..3d6314c62b7 100644 --- a/cpp/arcticdb/storage/azure/azure_storage.cpp +++ b/cpp/arcticdb/storage/azure/azure_storage.cpp @@ -28,6 +28,8 @@ #include +#include + #undef GetMessage namespace arcticdb::storage { @@ -172,12 +174,13 @@ void do_read_impl( template KeySegmentPair do_read_impl( - VariantKey&& variant_key, - const std::string& root_folder, - AzureClientWrapper& azure_client, - KeyBucketizer&& bucketizer, - const Azure::Storage::Blobs::DownloadBlobToOptions& download_option, - unsigned int request_timeout) { + VariantKey&& variant_key, + const std::string& root_folder, + AzureClientWrapper& azure_client, + KeyBucketizer&& bucketizer, + ReadKeyOpts opts, + const Azure::Storage::Blobs::DownloadBlobToOptions& download_option, + unsigned int request_timeout) { ARCTICDB_SAMPLE(AzureStorageRead, 0) std::optional failed_read; @@ -189,6 +192,13 @@ KeySegmentPair do_read_impl( } catch (const Azure::Core::RequestFailedException& e) { raise_if_unexpected_error(e, blob_name); + if (!opts.dont_warn_about_missing_key) { + log::storage().warn("Failed to read azure segment with key '{}' {} {}: {}", + variant_key, + blob_name, + static_cast(e.StatusCode), + e.ReasonPhrase); + } throw KeyNotFoundException(variant_key, fmt::format("Failed to read azure segment with key '{}' {} {}: {}", variant_key, @@ -198,21 +208,22 @@ KeySegmentPair do_read_impl( } } +namespace fg = folly::gen; + template void do_remove_impl( - VariantKey&& variant_key, + std::span variant_keys, const std::string& root_folder, AzureClientWrapper& azure_client, KeyBucketizer&& bucketizer, - unsigned int request_timeout) { + unsigned int request_timeout) { ARCTICDB_SUBSAMPLE(AzureStorageDeleteBatch, 0) + auto fmt_db = [](auto&& k) { return variant_key_type(k); }; std::vector to_delete; static const size_t delete_object_limit = - std::min(BATCH_SUBREQUEST_LIMIT, - static_cast(ConfigsMap::instance()->get_int("AzureStorage.DeleteBatchSize", - BATCH_SUBREQUEST_LIMIT))); + std::min(BATCH_SUBREQUEST_LIMIT, static_cast(ConfigsMap::instance()->get_int("AzureStorage.DeleteBatchSize", BATCH_SUBREQUEST_LIMIT))); - auto submit_batch = [&azure_client, &request_timeout](auto& to_delete) { + auto submit_batch = [&azure_client, &request_timeout](auto &to_delete) { try { azure_client.delete_blobs(to_delete, request_timeout); } catch (const Azure::Core::RequestFailedException& e) { @@ -222,12 +233,18 @@ void do_remove_impl( to_delete.clear(); }; - auto key_type_dir = key_type_folder(root_folder, variant_key_type(variant_key)); - auto blob_name = object_path(bucketizer.bucketize(key_type_dir, variant_key), variant_key); - to_delete.emplace_back(std::move(blob_name)); - if (to_delete.size() == delete_object_limit) { - submit_batch(to_delete); - } + (fg::from(variant_keys) | fg::move | fg::groupBy(fmt_db)).foreach( + [&root_folder, b=std::move(bucketizer), delete_object_limit=delete_object_limit, &to_delete, &submit_batch] (auto&& group) {//bypass incorrect 'set but no used" error for delete_object_limit + auto key_type_dir = key_type_folder(root_folder, group.key()); + for (auto k : folly::enumerate(group.values())) { + auto blob_name = object_path(b.bucketize(key_type_dir, *k), *k); + to_delete.emplace_back(std::move(blob_name)); + if (to_delete.size() == delete_object_limit) { + submit_batch(to_delete); + } + } + } + ); if (!to_delete.empty()) { submit_batch(to_delete); } @@ -341,17 +358,23 @@ void AzureStorage::do_read(VariantKey&& variant_key, const ReadVisitor& visitor, request_timeout_); } -KeySegmentPair AzureStorage::do_read(VariantKey&& variant_key) { +KeySegmentPair AzureStorage::do_read(VariantKey&& variant_key, ReadKeyOpts opts) { return detail::do_read_impl(std::move(variant_key), root_folder_, *azure_client_, FlatBucketizer{}, + opts, download_option_, request_timeout_); } void AzureStorage::do_remove(VariantKey&& variant_key, RemoveOpts) { - detail::do_remove_impl(std::move(variant_key), root_folder_, *azure_client_, FlatBucketizer{}, request_timeout_); + std::array arr{std::move(variant_key)}; + detail::do_remove_impl(std::span(arr), root_folder_, *azure_client_, FlatBucketizer{}, request_timeout_); +} + +void AzureStorage::do_remove(std::span variant_keys, RemoveOpts) { + detail::do_remove_impl(std::move(variant_keys), root_folder_, *azure_client_, FlatBucketizer{}, request_timeout_); } bool AzureStorage::do_iterate_type_until_match(KeyType key_type, diff --git a/cpp/arcticdb/storage/azure/azure_storage.hpp b/cpp/arcticdb/storage/azure/azure_storage.hpp index 73574fa98c9..0c2c39572dd 100644 --- a/cpp/arcticdb/storage/azure/azure_storage.hpp +++ b/cpp/arcticdb/storage/azure/azure_storage.hpp @@ -41,7 +41,7 @@ class AzureStorage final : public Storage { void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) final; - KeySegmentPair do_read(VariantKey&& variant_key) final; + KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts opts) final; void do_remove(VariantKey&& variant_key, RemoveOpts opts) final; diff --git a/cpp/arcticdb/storage/file/mapped_file_storage.cpp b/cpp/arcticdb/storage/file/mapped_file_storage.cpp index 60251b75858..68f1accaaea 100644 --- a/cpp/arcticdb/storage/file/mapped_file_storage.cpp +++ b/cpp/arcticdb/storage/file/mapped_file_storage.cpp @@ -105,6 +105,14 @@ void MappedFileStorage::do_read(VariantKey&& variant_key, const ReadVisitor& vis visitor(variant_key, std::move(segment)); } +KeySegmentPair MappedFileStorage::do_read(VariantKey&& variant_key, storage::ReadKeyOpts) { + ARCTICDB_SAMPLE(MappedFileStorageRead, 0) + auto maybe_offset = multi_segment_header_.get_offset_for_key(to_atom(variant_key)); + util::check(maybe_offset.has_value(), "Failed to find key {} in file", variant_key); + auto [offset, bytes] = std::move(maybe_offset.value()); + return {std::move(variant_key), Segment::from_bytes(file_.data() + offset, bytes)}; +} + bool MappedFileStorage::do_key_exists(const VariantKey& key) { ARCTICDB_SAMPLE(MappedFileStorageKeyExists, 0) return multi_segment_header_.get_offset_for_key(to_atom(key)) != std::nullopt; @@ -114,6 +122,10 @@ void MappedFileStorage::do_remove(VariantKey&&, RemoveOpts) { util::raise_rte("Remove not implemented for file storages"); } +void MappedFileStorage::do_remove(std::span, RemoveOpts) { + util::raise_rte("Remove not implemented for file storages"); +} + bool MappedFileStorage::do_fast_delete() { util::raise_rte("Fast delete not implemented for file storage - just delete the file"); } diff --git a/cpp/arcticdb/storage/file/mapped_file_storage.hpp b/cpp/arcticdb/storage/file/mapped_file_storage.hpp index 7e7eacd08f5..024a9825318 100644 --- a/cpp/arcticdb/storage/file/mapped_file_storage.hpp +++ b/cpp/arcticdb/storage/file/mapped_file_storage.hpp @@ -40,7 +40,7 @@ class MappedFileStorage final : public SingleFileStorage { void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override; - KeySegmentPair do_read(VariantKey&& variant_key) final; + KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts) final; void do_remove(VariantKey&& variant_key, RemoveOpts opts) override; diff --git a/cpp/arcticdb/storage/library.hpp b/cpp/arcticdb/storage/library.hpp index 36d400d5c7c..dbf570b1fea 100644 --- a/cpp/arcticdb/storage/library.hpp +++ b/cpp/arcticdb/storage/library.hpp @@ -112,9 +112,9 @@ class Library { storages_->read_sync(variant_key, visitor, opts, !storage_fallthrough_); } - KeySegmentPair read_sync(const VariantKey& key) { + KeySegmentPair read_sync(const VariantKey& key, ReadKeyOpts opts = ReadKeyOpts{}) { util::check(!std::holds_alternative(variant_key_id(key)) || !std::get(variant_key_id(key)).empty(), "Unexpected empty id"); - return storages_->read_sync(key, !storage_fallthrough_); + return storages_->read_sync(key, opts, !storage_fallthrough_); } void remove(std::span variant_keys, storage::RemoveOpts opts) { diff --git a/cpp/arcticdb/storage/lmdb/lmdb_storage.cpp b/cpp/arcticdb/storage/lmdb/lmdb_storage.cpp index 11862646ebe..1c1fbd6364e 100644 --- a/cpp/arcticdb/storage/lmdb/lmdb_storage.cpp +++ b/cpp/arcticdb/storage/lmdb/lmdb_storage.cpp @@ -130,7 +130,7 @@ void LmdbStorage::do_update(KeySegmentPair&& key_seg, UpdateOpts opts) { txn.commit(); } -KeySegmentPair LmdbStorage::do_read(VariantKey&& variant_key) { +KeySegmentPair LmdbStorage::do_read(VariantKey&& variant_key, ReadKeyOpts) { ARCTICDB_SAMPLE(LmdbStorageReadReturn, 0) std::optional failed_read; auto db_name = fmt::format(FMT_COMPILE("{}"), variant_key_type(variant_key)); diff --git a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp index 5484c2f3ca2..054b839cec5 100644 --- a/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp +++ b/cpp/arcticdb/storage/lmdb/lmdb_storage.hpp @@ -42,7 +42,7 @@ class LmdbStorage final : public Storage { void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, storage::ReadKeyOpts opts) final; - KeySegmentPair do_read(VariantKey&& variant_key) final; + KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts) final; void do_remove(VariantKey&& variant_key, RemoveOpts opts) final; diff --git a/cpp/arcticdb/storage/memory/memory_storage.cpp b/cpp/arcticdb/storage/memory/memory_storage.cpp index 6c70ac5faf5..63c46385017 100644 --- a/cpp/arcticdb/storage/memory/memory_storage.cpp +++ b/cpp/arcticdb/storage/memory/memory_storage.cpp @@ -73,13 +73,18 @@ void MemoryStorage::do_update(KeySegmentPair &&key_seg, UpdateOpts opts) { } void MemoryStorage::do_read(VariantKey &&variant_key, const ReadVisitor &visitor, ReadKeyOpts) { + auto key_seg = do_read(std::move(variant_key), ReadKeyOpts{}); + visitor(key_seg.variant_key(), std::move(key_seg.segment())); +} + +KeySegmentPair MemoryStorage::do_read(VariantKey &&variant_key, ReadKeyOpts) { ARCTICDB_SAMPLE(MemoryStorageRead, 0) auto& key_vec = data_[variant_key_type(variant_key)]; auto it = key_vec.find(variant_key); if (it != key_vec.end()) { ARCTICDB_DEBUG(log::storage(), "Read key {}: {}", variant_key_type(variant_key), variant_key_view(variant_key)); - visitor(variant_key, it->second.clone()); + return {std::move(variant_key), it->second.clone()}; } else { throw KeyNotFoundException(variant_key); } diff --git a/cpp/arcticdb/storage/memory/memory_storage.hpp b/cpp/arcticdb/storage/memory/memory_storage.hpp index acc4e5b9d75..1645d0c1ec8 100644 --- a/cpp/arcticdb/storage/memory/memory_storage.hpp +++ b/cpp/arcticdb/storage/memory/memory_storage.hpp @@ -33,7 +33,7 @@ namespace arcticdb::storage::memory { void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) final; - KeySegmentPair do_read(VariantKey&& variant_key) final; + KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts) final; void do_remove(VariantKey&& variant_key, RemoveOpts opts) final; diff --git a/cpp/arcticdb/storage/mongo/mongo_storage.cpp b/cpp/arcticdb/storage/mongo/mongo_storage.cpp index 7fe9f8e496a..0ef1e3b6051 100644 --- a/cpp/arcticdb/storage/mongo/mongo_storage.cpp +++ b/cpp/arcticdb/storage/mongo/mongo_storage.cpp @@ -91,8 +91,6 @@ void MongoStorage::do_write(KeySegmentPair &&key_seg) { } void MongoStorage::do_update(KeySegmentPair &&key_seg, UpdateOpts opts) { - namespace fg = folly::gen; - ARCTICDB_SAMPLE(MongoStorageWrite, 0) auto collection = collection_name(key_seg.key_type()); @@ -112,45 +110,35 @@ void MongoStorage::do_update(KeySegmentPair &&key_seg, UpdateOpts opts) { } } -void MongoStorage::do_read(VariantKey &&variant_key, const ReadVisitor &visitor, ReadKeyOpts) { - namespace fg = folly::gen; - ARCTICDB_SAMPLE(MongoStorageRead, 0) - std::vector keys_not_found; - - auto collection = collection_name(variant_key_type(variant_key)); - try { - auto kv = client_->read_segment(db_, collection, variant_key); - // later we should add the key to failed_reads in this case - if (!kv.has_value()) { - keys_not_found.push_back(variant_key); - } else { - visitor(variant_key, std::move(kv->segment())); - } - - } catch (const mongocxx::operation_exception &ex) { - std::string object_name = std::string(variant_key_view(variant_key)); - raise_if_unexpected_error(ex, object_name); - } +void MongoStorage::do_read(VariantKey &&variant_key, const ReadVisitor &visitor, ReadKeyOpts opts) { + auto key_seg = do_read(std::move(variant_key), opts); + visitor(key_seg.variant_key(), std::move(key_seg.segment())); } -KeySegmentPair MongoStorage::do_read(VariantKey&& variant_key) { - namespace fg = folly::gen; +KeySegmentPair MongoStorage::do_read(VariantKey&& variant_key, ReadKeyOpts opts) { ARCTICDB_SAMPLE(MongoStorageRead, 0) - std::vector keys_not_found; + boost::container::small_vector keys_not_found; auto collection = collection_name(variant_key_type(variant_key)); try { auto kv = client_->read_segment(db_, collection, variant_key); // later we should add the key to failed_reads in this case if (!kv.has_value()) { - keys_not_found.push_back(variant_key); + throw KeyNotFoundException(variant_key); } else { return {VariantKey{variant_key}, std::move(kv->segment())}; } - } catch (const mongocxx::operation_exception &ex) { std::string object_name = std::string(variant_key_view(variant_key)); raise_if_unexpected_error(ex, object_name); + log::storage().log( + opts.dont_warn_about_missing_key ? spdlog::level::debug : spdlog::level::warn, + "Failed to find segment for key '{}' {}: {}", + variant_key_view(variant_key), + ex.code().value(), + ex.what()); + + throw KeyNotFoundException(keys_not_found); } } @@ -193,6 +181,11 @@ void MongoStorage::do_remove(std::span variant_keys, RemoveOpts opts } } +void MongoStorage::do_remove(VariantKey&& variant_key, RemoveOpts opts) { + std::array arr{std::move(variant_key)}; + do_remove(std::span(arr), opts); +} + bool MongoStorage::do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate &visitor, const std::string &prefix) { diff --git a/cpp/arcticdb/storage/mongo/mongo_storage.hpp b/cpp/arcticdb/storage/mongo/mongo_storage.hpp index 43d36994cc3..2ff3a046c53 100644 --- a/cpp/arcticdb/storage/mongo/mongo_storage.hpp +++ b/cpp/arcticdb/storage/mongo/mongo_storage.hpp @@ -35,7 +35,7 @@ class MongoStorage final : public Storage { void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) final; - KeySegmentPair do_read(VariantKey&& variant_key) final; + KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts) final; void do_remove(VariantKey&& variant_key, RemoveOpts opts) final; diff --git a/cpp/arcticdb/storage/s3/detail-inl.hpp b/cpp/arcticdb/storage/s3/detail-inl.hpp index 9e682255c80..9a5f204831c 100644 --- a/cpp/arcticdb/storage/s3/detail-inl.hpp +++ b/cpp/arcticdb/storage/s3/detail-inl.hpp @@ -88,6 +88,14 @@ inline void raise_s3_exception(const Aws::S3::S3Error& err, const std::string& o raise(error_message); } +folly::Future do_async_read_impl(entity::VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) { + +} + +folly::Future do_async_read_impl(entity::VariantKey&& variant_key, ReadKeyOpts opts) { + +} + inline bool is_expected_error_type(Aws::S3::S3Errors err) { return err == Aws::S3::S3Errors::NO_SUCH_KEY || err == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || err == Aws::S3::S3Errors::NO_SUCH_BUCKET; @@ -137,29 +145,22 @@ void do_update_impl( } template -void do_read_impl( +KeySegmentPair do_read_impl( VariantKey&& variant_key, - const ReadVisitor& visitor, const std::string& root_folder, const std::string& bucket_name, const S3ClientInterface& s3_client, KeyBucketizer&& bucketizer, ReadKeyOpts opts) { ARCTICDB_SAMPLE(S3StorageRead, 0) - std::vector keys_not_found; - auto key_type_dir = key_type_folder(root_folder, variant_key_type(variant_key)); auto s3_object_name = object_path(bucketizer.bucketize(key_type_dir, variant_key), variant_key); - auto get_object_result = s3_client.get_object( - s3_object_name, - bucket_name); + auto get_object_result = s3_client.get_object(s3_object_name, bucket_name); if (get_object_result.is_success()) { ARCTICDB_SUBSAMPLE(S3StorageVisitSegment, 0) - - visitor(variant_key, std::move(get_object_result.get_output())); - + return {VariantKey{variant_key}, std::move(get_object_result.get_output())}; ARCTICDB_DEBUG(log::storage(), "Read key {}: {}", variant_key_type(variant_key), variant_key_view(variant_key)); } else { auto& error = get_object_result.get_error(); @@ -172,11 +173,21 @@ void do_read_impl( error.GetExceptionName().c_str(), error.GetMessage().c_str()); - keys_not_found.push_back(variant_key); + throw KeyNotFoundException(variant_key); } +} - if (!keys_not_found.empty()) - throw KeyNotFoundException(std::move(keys_not_found)); +template +void do_read_impl( + VariantKey&& variant_key, + const ReadVisitor& visitor, + const std::string& root_folder, + const std::string& bucket_name, + const S3ClientInterface& s3_client, + KeyBucketizer&& bucketizer, + ReadKeyOpts opts) { + auto key_seg = do_read_impl(std::move(variant_key), root_folder, bucket_name, s3_client, bucketizer, opts); + visitor(key_seg.variant_key(), std::move(key_seg.segment())); } struct FailedDelete { diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp index 7204df153c4..5ddc4358986 100644 --- a/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp +++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.cpp @@ -174,11 +174,26 @@ void NfsBackedStorage::do_read(VariantKey&& variant_key, const ReadVisitor& visi s3::detail::do_read_impl(std::move(enc), func, root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}, opts); } +KeySegmentPair NfsBackedStorage::do_read(VariantKey&& variant_key, ReadKeyOpts opts) { + auto enc = encode_object_id(variant_key); + auto key_seg = s3::detail::do_read_impl(std::move(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}, opts); + return {unencode_object_id(key_seg.variant_key()), std::move(key_seg.segment())}; +} + void NfsBackedStorage::do_remove(VariantKey&& variant_key, RemoveOpts) { auto enc = encode_object_id(variant_key); s3::detail::do_remove_impl(std::move(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}); } +void NfsBackedStorage::do_remove(std::span variant_keys, RemoveOpts) { + std::vector enc; + enc.reserve(variant_keys.size()); + std::transform(std::begin(variant_keys), std::end(variant_keys), std::back_inserter(enc), [] (auto&& key) { + return encode_object_id(key); + }); + s3::detail::do_remove_impl(std::span(enc), root_folder_, bucket_name_, *s3_client_, NfsBucketizer{}); +} + bool NfsBackedStorage::do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string& prefix) { const IterateTypePredicate func = [&v = visitor, prefix=prefix] (VariantKey&& k) { auto key = unencode_object_id(k); diff --git a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp index ec803cf5ce6..2cd254b47e9 100644 --- a/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp +++ b/cpp/arcticdb/storage/s3/nfs_backed_storage.hpp @@ -39,7 +39,7 @@ class NfsBackedStorage final : public Storage { void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) final; - KeySegmentPair do_read(VariantKey&& variant_key) final; + KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts opts) final; void do_remove(VariantKey&& variant_key, RemoveOpts opts) final; diff --git a/cpp/arcticdb/storage/s3/s3_storage.cpp b/cpp/arcticdb/storage/s3/s3_storage.cpp index 50041e565b4..037f07343ee 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.cpp +++ b/cpp/arcticdb/storage/s3/s3_storage.cpp @@ -30,8 +30,6 @@ using namespace object_store_utils; namespace s3 { -namespace fg = folly::gen; - std::string S3Storage::name() const { return fmt::format("s3_storage-{}/{}/{}", region_, bucket_name_, root_folder_); } @@ -56,6 +54,10 @@ void S3Storage::do_read(VariantKey&& variant_key, const ReadVisitor& visitor, Re detail::do_read_impl(std::move(variant_key), visitor, root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}, opts); } +KeySegmentPair S3Storage::do_read(VariantKey&& variant_key, ReadKeyOpts opts) { + return detail::do_read_impl(std::move(variant_key), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}, opts); +} + void S3Storage::do_remove(VariantKey&& variant_key, RemoveOpts) { detail::do_remove_impl(std::move(variant_key), root_folder_, bucket_name_, *s3_client_, FlatBucketizer{}); } diff --git a/cpp/arcticdb/storage/s3/s3_storage.hpp b/cpp/arcticdb/storage/s3/s3_storage.hpp index 6778ed7970c..a135c494d8b 100644 --- a/cpp/arcticdb/storage/s3/s3_storage.hpp +++ b/cpp/arcticdb/storage/s3/s3_storage.hpp @@ -28,7 +28,7 @@ namespace arcticdb::storage::s3 { const std::string USE_AWS_CRED_PROVIDERS_TOKEN = "_RBAC_"; -class S3Storage final : public Storage { +class S3Storage final : public Storage, AsyncStorage { public: using Config = arcticdb::proto::s3_storage::Config; @@ -53,7 +53,7 @@ class S3Storage final : public Storage { void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) final; - KeySegmentPair do_read(VariantKey&& variant_key) final; + KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts opts) final; void do_remove(VariantKey&& variant_key, RemoveOpts opts) final; diff --git a/cpp/arcticdb/storage/storage.hpp b/cpp/arcticdb/storage/storage.hpp index 5c95bcac4d9..8ae270388fe 100644 --- a/cpp/arcticdb/storage/storage.hpp +++ b/cpp/arcticdb/storage/storage.hpp @@ -44,8 +44,8 @@ class Storage { return do_read(std::move(variant_key), visitor, opts); } - KeySegmentPair read(VariantKey&& variant_key) { - return do_read(std::move(variant_key)); + KeySegmentPair read(VariantKey&& variant_key, ReadKeyOpts opts) { + return do_read(std::move(variant_key), opts); } [[nodiscard]] virtual bool has_async_api() const { @@ -110,7 +110,7 @@ class Storage { virtual void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) = 0; - virtual KeySegmentPair do_read(VariantKey&& variant_key) = 0; + virtual KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts opts) = 0; virtual void do_remove(VariantKey&& variant_key, RemoveOpts opts) = 0; diff --git a/cpp/arcticdb/storage/storages.hpp b/cpp/arcticdb/storage/storages.hpp index f1a5ef05d39..863010aa405 100644 --- a/cpp/arcticdb/storage/storages.hpp +++ b/cpp/arcticdb/storage/storages.hpp @@ -85,13 +85,14 @@ class Storages { KeySegmentPair read_sync_fallthrough(const VariantKey& variant_key) { for(const auto& storage : storages_) { try { - return storage->read(VariantKey{variant_key}); + return storage->read(VariantKey{variant_key}, ReadKeyOpts{}); } catch (typename storage::KeyNotFoundException& ex) { ARCTICDB_DEBUG(log::version(), "Keys not found in storage, continuing to next storage"); } } throw storage::KeyNotFoundException(variant_key); } + void read_sync(const VariantKey& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts, bool primary_only=true) { ARCTICDB_RUNTIME_SAMPLE(StoragesRead, 0) if(primary_only || variant_key_type(variant_key) != KeyType::TABLE_DATA) @@ -100,10 +101,10 @@ class Storages { read_sync_fallthrough(variant_key, visitor, opts); } - KeySegmentPair read_sync(const VariantKey& variant_key, bool primary_only=true) { + KeySegmentPair read_sync(const VariantKey& variant_key, ReadKeyOpts opts, bool primary_only=true) { ARCTICDB_RUNTIME_SAMPLE(StoragesRead, 0) if(primary_only || variant_key_type(variant_key) != KeyType::TABLE_DATA) - return primary().read(VariantKey{variant_key}); + return primary().read(VariantKey{variant_key}, opts); return read_sync_fallthrough(variant_key); } @@ -121,7 +122,7 @@ class Storages { if(storage.has_async_api()) { return storage.async_api()->async_read(std::move(variant_key), opts); } else { - auto key_seg = storage.read(std::move(variant_key)); + auto key_seg = storage.read(std::move(variant_key), opts); return folly::makeFuture(std::move(key_seg)); } } @@ -198,7 +199,7 @@ class Storages { auto key = std::forward(vk); if (to_atom(key).creation_ts() < horizon) { try { - auto key_seg = source.read(VariantKey{key}); + auto key_seg = source.read(VariantKey{key}, ReadKeyOpts{}); target.write(std::move(key_seg)); source.remove(std::move(key), storage::RemoveOpts{}); } catch (const std::exception& ex) { diff --git a/cpp/arcticdb/stream/append_map.cpp b/cpp/arcticdb/stream/append_map.cpp index c61267b1223..aea3f261f74 100644 --- a/cpp/arcticdb/stream/append_map.cpp +++ b/cpp/arcticdb/stream/append_map.cpp @@ -378,7 +378,7 @@ std::pair> get_descriptor_a const std::shared_ptr& store, const AtomKey& k, bool load_data, - storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) { + storage::ReadKeyOpts opts) { if(load_data) { auto seg = store->read_sync(k, opts).second; return std::make_pair(seg.index_descriptor(), std::make_optional(seg)); diff --git a/cpp/arcticdb/stream/stream_source.hpp b/cpp/arcticdb/stream/stream_source.hpp index 9a04ab76cd1..759a2ab58c1 100644 --- a/cpp/arcticdb/stream/stream_source.hpp +++ b/cpp/arcticdb/stream/stream_source.hpp @@ -30,14 +30,17 @@ struct StreamSource { storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) = 0; virtual std::pair read_sync( - const entity::VariantKey &key) = 0; + const entity::VariantKey &key, + storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) + = 0; virtual folly::Future read_compressed( const entity::VariantKey &key, storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) = 0; virtual storage::KeySegmentPair read_compressed_sync( - const entity::VariantKey& key + const entity::VariantKey& key, + storage::ReadKeyOpts opts = storage::ReadKeyOpts{} ) = 0; virtual void iterate_type( diff --git a/cpp/arcticdb/stream/test/stream_test_common.hpp b/cpp/arcticdb/stream/test/stream_test_common.hpp index 5ed0b3b000e..1879322a83b 100644 --- a/cpp/arcticdb/stream/test/stream_test_common.hpp +++ b/cpp/arcticdb/stream/test/stream_test_common.hpp @@ -29,8 +29,6 @@ #include -namespace fg = folly::gen; - namespace arcticdb { template diff --git a/cpp/arcticdb/toolbox/library_tool.cpp b/cpp/arcticdb/toolbox/library_tool.cpp index ecd0dcc57bc..65a12bb2af2 100644 --- a/cpp/arcticdb/toolbox/library_tool.cpp +++ b/cpp/arcticdb/toolbox/library_tool.cpp @@ -49,7 +49,7 @@ ReadResult LibraryTool::read(const VariantKey& key) { } Segment LibraryTool::read_to_segment(const VariantKey& key) { - auto kv = store()->read_compressed_sync(key, storage::ReadKeyOpts{}); + auto kv = store()->read_compressed_sync(key); util::check(kv.has_segment(), "Failed to read key: {}", key); kv.segment().force_own_buffer(); return std::move(kv.segment()); diff --git a/cpp/arcticdb/version/test/test_version_common.hpp b/cpp/arcticdb/version/test/test_version_common.hpp index 1b6bc8ba3b0..7c08e19293e 100644 --- a/cpp/arcticdb/version/test/test_version_common.hpp +++ b/cpp/arcticdb/version/test/test_version_common.hpp @@ -12,7 +12,6 @@ using namespace arcticdb::storage; using namespace arcticdb::stream; using namespace arcticdb; -namespace fg = folly::gen; const uint64_t NumVersions = 10; const uint64_t NumValues = 10; diff --git a/cpp/arcticdb/version/version_utils.hpp b/cpp/arcticdb/version/version_utils.hpp index 22925db0c7e..73e1cc31535 100644 --- a/cpp/arcticdb/version/version_utils.hpp +++ b/cpp/arcticdb/version/version_utils.hpp @@ -131,21 +131,21 @@ std::shared_ptr build_version_map_entry_with_predicate_iteratio std::vector read_keys; for (auto key_type : key_types) { store->iterate_type(key_type, - [&predicate, &read_keys, &store, &output, &perform_read_segment_with_keys](VariantKey &&vk) { - const auto &key = to_atom(std::move(vk)); - if (!predicate(key)) - return; - - read_keys.push_back(key); - ARCTICDB_DEBUG(log::storage(), "Version map iterating key {}", key); - if (perform_read_segment_with_keys) { - auto [kv, seg] = store->read_sync(to_atom(key)); - LoadProgress load_progress; - (void)read_segment_with_keys(seg, output, load_progress); - } - }, - prefix); - } + [&predicate, &read_keys, &store, &output, &perform_read_segment_with_keys](VariantKey &&vk) { + const auto &key = to_atom(std::move(vk)); + if (!predicate(key)) + return; + + read_keys.push_back(key); + ARCTICDB_DEBUG(log::storage(), "Version map iterating key {}", key); + if (perform_read_segment_with_keys) { + auto [kv, seg] = store->read_sync(to_atom(key)); + LoadProgress load_progress; + (void)read_segment_with_keys(seg, output, load_progress); + } + }, + prefix); +} if (!perform_read_segment_with_keys) { output->keys_.insert(output->keys_.end(), std::move_iterator(read_keys.begin()), @@ -176,13 +176,11 @@ inline void read_symbol_ref(const std::shared_ptr& store, const St std::pair key_seg_pair; // Trying to read a missing ref key is expected e.g. when writing a previously missing symbol. // If the ref key is missing we keep the entry empty and should not raise warnings. - auto read_opts = storage::ReadKeyOpts{}; - read_opts.dont_warn_about_missing_key=true; try { - key_seg_pair = store->read_sync(RefKey{stream_id, KeyType::VERSION_REF}, read_opts); + key_seg_pair = store->read_sync(RefKey{stream_id, KeyType::VERSION_REF}); } catch (const storage::KeyNotFoundException&) { try { - key_seg_pair = store->read_sync(RefKey{stream_id, KeyType::VERSION, true}, read_opts); + key_seg_pair = store->read_sync(RefKey{stream_id, KeyType::VERSION, true}); } catch (const storage::KeyNotFoundException&) { return; }