Skip to content

Commit

Permalink
More refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Nov 22, 2024
1 parent ce315e2 commit fb2f01c
Show file tree
Hide file tree
Showing 27 changed files with 175 additions and 116 deletions.
8 changes: 4 additions & 4 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ folly::Future<std::pair<entity::VariantKey, SegmentInMemory>> read(
return read_and_continue(key, library_, opts, DecodeSegmentTask{});
}

std::pair<entity::VariantKey, SegmentInMemory> read_sync(const entity::VariantKey& key) override {
return DecodeSegmentTask{}(read_sync_dispatch(key, library_));
std::pair<entity::VariantKey, SegmentInMemory> read_sync(const entity::VariantKey& key, storage::ReadKeyOpts opts) override {
return DecodeSegmentTask{}(read_sync_dispatch(key, library_, opts));
}

folly::Future<storage::KeySegmentPair> read_compressed(
Expand All @@ -227,8 +227,8 @@ folly::Future<storage::KeySegmentPair> read_compressed(
return read_and_continue(key, library_, opts, PassThroughTask{});
}

storage::KeySegmentPair read_compressed_sync(const entity::VariantKey& key) override {
return read_sync_dispatch( key, library_);
storage::KeySegmentPair read_compressed_sync(const entity::VariantKey& key, storage::ReadKeyOpts opts) override {
return read_sync_dispatch(key, library_, opts);
}

folly::Future<std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>>> read_metadata(const entity::VariantKey &key, storage::ReadKeyOpts opts) override {
Expand Down
8 changes: 4 additions & 4 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ inline folly::Future<storage::KeySegmentPair> read_dispatch(entity::VariantKey&&
});
}

inline storage::KeySegmentPair read_sync_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib) {
return util::variant_match(variant_key, [&lib](const auto &key) {
return lib->read_sync(key);
inline storage::KeySegmentPair read_sync_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib, storage::ReadKeyOpts opts) {
return util::variant_match(variant_key, [&lib, opts](const auto &key) {
return lib->read_sync(key, opts);
});
}

Expand Down Expand Up @@ -366,7 +366,7 @@ struct CopyCompressedInterStoreTask : async::BaseTask {
if (!target_stores_.empty()) {
storage::KeySegmentPair key_segment_pair;
try {
key_segment_pair = source_store_->read_compressed_sync(key_to_read_, storage::ReadKeyOpts{});
key_segment_pair = source_store_->read_compressed_sync(key_to_read_);
} catch (const storage::KeyNotFoundException& e) {
log::storage().debug("Key {} not found on the source: {}", variant_key_view(key_to_read_), e.what());
return failed_targets;
Expand Down
1 change: 0 additions & 1 deletion cpp/arcticdb/storage/async_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

namespace arcticdb::storage {
class AsyncStorage {

public:
folly::Future<folly::Unit> async_read(entity::VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) {
return do_async_read(std::move(variant_key), visitor, opts);
Expand Down
63 changes: 43 additions & 20 deletions cpp/arcticdb/storage/azure/azure_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

#include <boost/interprocess/streams/bufferstream.hpp>

#include <folly/gen/Base.h>

#undef GetMessage

namespace arcticdb::storage {
Expand Down Expand Up @@ -172,12 +174,13 @@ void do_read_impl(

template<class KeyBucketizer>
KeySegmentPair do_read_impl(
VariantKey&& variant_key,
const std::string& root_folder,
AzureClientWrapper& azure_client,
KeyBucketizer&& bucketizer,
const Azure::Storage::Blobs::DownloadBlobToOptions& download_option,
unsigned int request_timeout) {
VariantKey&& variant_key,
const std::string& root_folder,
AzureClientWrapper& azure_client,
KeyBucketizer&& bucketizer,
ReadKeyOpts opts,
const Azure::Storage::Blobs::DownloadBlobToOptions& download_option,
unsigned int request_timeout) {
ARCTICDB_SAMPLE(AzureStorageRead, 0)
std::optional<VariantKey> failed_read;

Expand All @@ -189,6 +192,13 @@ KeySegmentPair do_read_impl(
}
catch (const Azure::Core::RequestFailedException& e) {
raise_if_unexpected_error(e, blob_name);
if (!opts.dont_warn_about_missing_key) {
log::storage().warn("Failed to read azure segment with key '{}' {} {}: {}",
variant_key,
blob_name,
static_cast<int>(e.StatusCode),
e.ReasonPhrase);
}
throw KeyNotFoundException(variant_key,
fmt::format("Failed to read azure segment with key '{}' {} {}: {}",
variant_key,
Expand All @@ -198,21 +208,22 @@ KeySegmentPair do_read_impl(
}
}

namespace fg = folly::gen;

template<class KeyBucketizer>
void do_remove_impl(
VariantKey&& variant_key,
std::span<VariantKey> variant_keys,
const std::string& root_folder,
AzureClientWrapper& azure_client,
KeyBucketizer&& bucketizer,
unsigned int request_timeout) {
unsigned int request_timeout) {
ARCTICDB_SUBSAMPLE(AzureStorageDeleteBatch, 0)
auto fmt_db = [](auto&& k) { return variant_key_type(k); };
std::vector<std::string> to_delete;
static const size_t delete_object_limit =
std::min(BATCH_SUBREQUEST_LIMIT,
static_cast<size_t>(ConfigsMap::instance()->get_int("AzureStorage.DeleteBatchSize",
BATCH_SUBREQUEST_LIMIT)));
std::min(BATCH_SUBREQUEST_LIMIT, static_cast<size_t>(ConfigsMap::instance()->get_int("AzureStorage.DeleteBatchSize", BATCH_SUBREQUEST_LIMIT)));

auto submit_batch = [&azure_client, &request_timeout](auto& to_delete) {
auto submit_batch = [&azure_client, &request_timeout](auto &to_delete) {
try {
azure_client.delete_blobs(to_delete, request_timeout);
} catch (const Azure::Core::RequestFailedException& e) {
Expand All @@ -222,12 +233,18 @@ void do_remove_impl(
to_delete.clear();
};

auto key_type_dir = key_type_folder(root_folder, variant_key_type(variant_key));
auto blob_name = object_path(bucketizer.bucketize(key_type_dir, variant_key), variant_key);
to_delete.emplace_back(std::move(blob_name));
if (to_delete.size() == delete_object_limit) {
submit_batch(to_delete);
}
// Group the keys by key type (fmt_db maps a key to variant_key_type(k)) so the
// key-type folder is computed once per group; each key's blob name is queued in
// to_delete, and the queue is submitted whenever it reaches delete_object_limit.
(fg::from(variant_keys) | fg::move | fg::groupBy(fmt_db)).foreach(
[&root_folder, b=std::move(bucketizer), delete_object_limit=delete_object_limit, &to_delete, &submit_batch] (auto&& group) {// delete_object_limit is init-captured explicitly to bypass an incorrect "set but not used" error
auto key_type_dir = key_type_folder(root_folder, group.key());
for (auto k : folly::enumerate(group.values())) {
auto blob_name = object_path(b.bucketize(key_type_dir, *k), *k);
to_delete.emplace_back(std::move(blob_name));
if (to_delete.size() == delete_object_limit) {
submit_batch(to_delete);
}
}
}
);
if (!to_delete.empty()) {
submit_batch(to_delete);
}
Expand Down Expand Up @@ -341,17 +358,23 @@ void AzureStorage::do_read(VariantKey&& variant_key, const ReadVisitor& visitor,
request_timeout_);
}

KeySegmentPair AzureStorage::do_read(VariantKey&& variant_key) {
KeySegmentPair AzureStorage::do_read(VariantKey&& variant_key, ReadKeyOpts opts) {
return detail::do_read_impl(std::move(variant_key),
root_folder_,
*azure_client_,
FlatBucketizer{},
opts,
download_option_,
request_timeout_);
}

void AzureStorage::do_remove(VariantKey&& variant_key, RemoveOpts) {
detail::do_remove_impl(std::move(variant_key), root_folder_, *azure_client_, FlatBucketizer{}, request_timeout_);
std::array<VariantKey, 1> arr{std::move(variant_key)};
detail::do_remove_impl(std::span(arr), root_folder_, *azure_client_, FlatBucketizer{}, request_timeout_);
}

// Removes every key in `variant_keys` by delegating to detail::do_remove_impl
// with this storage's root folder, client, a FlatBucketizer and the request timeout.
// The RemoveOpts argument is accepted for interface compatibility and is unused here.
void AzureStorage::do_remove(std::span<VariantKey> variant_keys, RemoveOpts) {
    // std::span is a cheap non-owning view passed by value; wrapping it in
    // std::move was a no-op that only suggested an ownership transfer.
    detail::do_remove_impl(variant_keys, root_folder_, *azure_client_, FlatBucketizer{}, request_timeout_);
}

bool AzureStorage::do_iterate_type_until_match(KeyType key_type,
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/azure/azure_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class AzureStorage final : public Storage {

void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) final;

KeySegmentPair do_read(VariantKey&& variant_key) final;
KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts opts) final;

void do_remove(VariantKey&& variant_key, RemoveOpts opts) final;

Expand Down
12 changes: 12 additions & 0 deletions cpp/arcticdb/storage/file/mapped_file_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ void MappedFileStorage::do_read(VariantKey&& variant_key, const ReadVisitor& vis
visitor(variant_key, std::move(segment));
}

// Synchronous read returning the key together with its segment.
// Looks up the key's (offset, size) entry in the multi-segment header, fails the
// check if the key is absent, and builds the segment from the file bytes at that offset.
// The ReadKeyOpts argument is unused.
KeySegmentPair MappedFileStorage::do_read(VariantKey&& variant_key, storage::ReadKeyOpts) {
    ARCTICDB_SAMPLE(MappedFileStorageRead, 0)
    auto location = multi_segment_header_.get_offset_for_key(to_atom(variant_key));
    util::check(location.has_value(), "Failed to find key {} in file", variant_key);
    auto [offset, bytes] = std::move(location.value());
    auto segment = Segment::from_bytes(file_.data() + offset, bytes);
    return {std::move(variant_key), std::move(segment)};
}

bool MappedFileStorage::do_key_exists(const VariantKey& key) {
ARCTICDB_SAMPLE(MappedFileStorageKeyExists, 0)
return multi_segment_header_.get_offset_for_key(to_atom(key)) != std::nullopt;
Expand All @@ -114,6 +122,10 @@ void MappedFileStorage::do_remove(VariantKey&&, RemoveOpts) {
util::raise_rte("Remove not implemented for file storages");
}

// Batch removal is not supported for file storages: this overload always raises.
void MappedFileStorage::do_remove(std::span<VariantKey> /*variant_keys*/, RemoveOpts /*opts*/) {
    util::raise_rte("Remove not implemented for file storages");
}

bool MappedFileStorage::do_fast_delete() {
util::raise_rte("Fast delete not implemented for file storage - just delete the file");
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/file/mapped_file_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class MappedFileStorage final : public SingleFileStorage {

void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override;

KeySegmentPair do_read(VariantKey&& variant_key) final;
KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts) final;

void do_remove(VariantKey&& variant_key, RemoveOpts opts) override;

Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/storage/library.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ class Library {
storages_->read_sync(variant_key, visitor, opts, !storage_fallthrough_);
}

KeySegmentPair read_sync(const VariantKey& key) {
KeySegmentPair read_sync(const VariantKey& key, ReadKeyOpts opts = ReadKeyOpts{}) {
util::check(!std::holds_alternative<StringId>(variant_key_id(key)) || !std::get<StringId>(variant_key_id(key)).empty(), "Unexpected empty id");
return storages_->read_sync(key, !storage_fallthrough_);
return storages_->read_sync(key, opts, !storage_fallthrough_);
}

void remove(std::span<VariantKey> variant_keys, storage::RemoveOpts opts) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/lmdb/lmdb_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void LmdbStorage::do_update(KeySegmentPair&& key_seg, UpdateOpts opts) {
txn.commit();
}

KeySegmentPair LmdbStorage::do_read(VariantKey&& variant_key) {
KeySegmentPair LmdbStorage::do_read(VariantKey&& variant_key, ReadKeyOpts) {
ARCTICDB_SAMPLE(LmdbStorageReadReturn, 0)
std::optional<VariantKey> failed_read;
auto db_name = fmt::format(FMT_COMPILE("{}"), variant_key_type(variant_key));
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/lmdb/lmdb_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class LmdbStorage final : public Storage {

void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, storage::ReadKeyOpts opts) final;

KeySegmentPair do_read(VariantKey&& variant_key) final;
KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts) final;

void do_remove(VariantKey&& variant_key, RemoveOpts opts) final;

Expand Down
7 changes: 6 additions & 1 deletion cpp/arcticdb/storage/memory/memory_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,18 @@ void MemoryStorage::do_update(KeySegmentPair &&key_seg, UpdateOpts opts) {
}

// Visitor-style read: performs the value-returning do_read and hands the resulting
// key and segment to `visitor`. Any exception raised by the value-returning overload
// propagates to the caller.
void MemoryStorage::do_read(VariantKey &&variant_key, const ReadVisitor &visitor, ReadKeyOpts opts) {
    // Forward the caller's options rather than a default-constructed ReadKeyOpts,
    // matching how the other storages delegate between their do_read overloads.
    auto key_seg = do_read(std::move(variant_key), opts);
    visitor(key_seg.variant_key(), std::move(key_seg.segment()));
}

KeySegmentPair MemoryStorage::do_read(VariantKey &&variant_key, ReadKeyOpts) {
ARCTICDB_SAMPLE(MemoryStorageRead, 0)
auto& key_vec = data_[variant_key_type(variant_key)];
auto it = key_vec.find(variant_key);

if (it != key_vec.end()) {
ARCTICDB_DEBUG(log::storage(), "Read key {}: {}", variant_key_type(variant_key), variant_key_view(variant_key));
visitor(variant_key, it->second.clone());
return {std::move(variant_key), it->second.clone()};
} else {
throw KeyNotFoundException(variant_key);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/memory/memory_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace arcticdb::storage::memory {

void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) final;

KeySegmentPair do_read(VariantKey&& variant_key) final;
KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts) final;

void do_remove(VariantKey&& variant_key, RemoveOpts opts) final;

Expand Down
45 changes: 19 additions & 26 deletions cpp/arcticdb/storage/mongo/mongo_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ void MongoStorage::do_write(KeySegmentPair &&key_seg) {
}

void MongoStorage::do_update(KeySegmentPair &&key_seg, UpdateOpts opts) {
namespace fg = folly::gen;

ARCTICDB_SAMPLE(MongoStorageWrite, 0)

auto collection = collection_name(key_seg.key_type());
Expand All @@ -112,45 +110,35 @@ void MongoStorage::do_update(KeySegmentPair &&key_seg, UpdateOpts opts) {
}
}

void MongoStorage::do_read(VariantKey &&variant_key, const ReadVisitor &visitor, ReadKeyOpts) {
namespace fg = folly::gen;
ARCTICDB_SAMPLE(MongoStorageRead, 0)
std::vector<VariantKey> keys_not_found;

auto collection = collection_name(variant_key_type(variant_key));
try {
auto kv = client_->read_segment(db_, collection, variant_key);
// later we should add the key to failed_reads in this case
if (!kv.has_value()) {
keys_not_found.push_back(variant_key);
} else {
visitor(variant_key, std::move(kv->segment()));
}

} catch (const mongocxx::operation_exception &ex) {
std::string object_name = std::string(variant_key_view(variant_key));
raise_if_unexpected_error(ex, object_name);
}
// Visitor-style read: delegates to the value-returning do_read with the same
// options, then passes the resulting key and segment to `visitor`.
void MongoStorage::do_read(VariantKey &&variant_key, const ReadVisitor &visitor, ReadKeyOpts opts) {
    KeySegmentPair result = do_read(std::move(variant_key), opts);
    visitor(result.variant_key(), std::move(result.segment()));
}

KeySegmentPair MongoStorage::do_read(VariantKey&& variant_key) {
namespace fg = folly::gen;
KeySegmentPair MongoStorage::do_read(VariantKey&& variant_key, ReadKeyOpts opts) {
ARCTICDB_SAMPLE(MongoStorageRead, 0)
std::vector<VariantKey> keys_not_found;
boost::container::small_vector<VariantKey, 1> keys_not_found;

auto collection = collection_name(variant_key_type(variant_key));
try {
auto kv = client_->read_segment(db_, collection, variant_key);
// later we should add the key to failed_reads in this case
if (!kv.has_value()) {
keys_not_found.push_back(variant_key);
throw KeyNotFoundException(variant_key);
} else {
return {VariantKey{variant_key}, std::move(kv->segment())};
}

} catch (const mongocxx::operation_exception &ex) {
std::string object_name = std::string(variant_key_view(variant_key));
raise_if_unexpected_error(ex, object_name);
log::storage().log(
opts.dont_warn_about_missing_key ? spdlog::level::debug : spdlog::level::warn,
"Failed to find segment for key '{}' {}: {}",
variant_key_view(variant_key),
ex.code().value(),
ex.what());

throw KeyNotFoundException(keys_not_found);
}
}

Expand Down Expand Up @@ -193,6 +181,11 @@ void MongoStorage::do_remove(std::span<VariantKey> variant_keys, RemoveOpts opts
}
}

// Single-key removal: moves the key into a one-element array and forwards a span
// over it to the batch do_remove overload.
void MongoStorage::do_remove(VariantKey&& variant_key, RemoveOpts opts) {
    std::array<VariantKey, 1> single_key{std::move(variant_key)};
    do_remove(std::span<VariantKey>{single_key}, opts);
}

bool MongoStorage::do_iterate_type_until_match(KeyType key_type,
const IterateTypePredicate &visitor,
const std::string &prefix) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/mongo/mongo_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class MongoStorage final : public Storage {

void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) final;

KeySegmentPair do_read(VariantKey&& variant_key) final;
KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts) final;

void do_remove(VariantKey&& variant_key, RemoveOpts opts) final;

Expand Down
Loading

0 comments on commit fb2f01c

Please sign in to comment.