Skip to content

Commit

Permalink
Refactor storages to async
Browse files Browse the repository at this point in the history
  • Loading branch information
willdealtry committed Nov 20, 2024
1 parent 74cbf40 commit b0081b6
Show file tree
Hide file tree
Showing 30 changed files with 423 additions and 303 deletions.
2 changes: 1 addition & 1 deletion cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ set(arcticdb_srcs
version/symbol_list.cpp
version/version_map_batch_methods.cpp
storage/s3/ec2_utils.cpp
util/buffer_holder.cpp)
util/buffer_holder.cpp storage/async_storage.hpp)

add_library(arcticdb_core_object OBJECT ${arcticdb_srcs})

Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ folly::Future<std::pair<entity::VariantKey, SegmentInMemory>> read(

std::pair<entity::VariantKey, SegmentInMemory> read_sync(const entity::VariantKey &key,
storage::ReadKeyOpts opts) override {
return DecodeSegmentTask{}(read_dispatch(key, library_, opts));
return DecodeSegmentTask{}(read_sync_dispatch(key, library_, opts));
}

folly::Future<storage::KeySegmentPair> read_compressed(
Expand All @@ -232,7 +232,7 @@ storage::KeySegmentPair read_compressed_sync(
const entity::VariantKey& key,
storage::ReadKeyOpts opts
) override {
return read_dispatch( key, library_, opts );
return read_sync_dispatch( key, library_, opts );
}

folly::Future<std::pair<std::optional<VariantKey>, std::optional<google::protobuf::Any>>> read_metadata(const entity::VariantKey &key, storage::ReadKeyOpts opts) override {
Expand Down
10 changes: 8 additions & 2 deletions cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,18 @@ struct KeySegmentContinuation {
Callable continuation_;
};

inline storage::KeySegmentPair read_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib, const storage::ReadKeyOpts& opts) {
inline folly::Future<storage::KeySegmentPair> read_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib, const storage::ReadKeyOpts& opts) {
return util::variant_match(variant_key, [&lib, &opts](const auto &key) {
return lib->read(key, opts);
});
}

inline storage::KeySegmentPair read_sync_dispatch(const entity::VariantKey& variant_key, const std::shared_ptr<storage::Library>& lib, const storage::ReadKeyOpts& opts) {
return util::variant_match(variant_key, [&lib, &opts](const auto &key) {
return lib->read_sync(key, opts);
});
}

template <typename Callable>
struct ReadCompressedTask : BaseTask {
entity::VariantKey key_;
Expand Down Expand Up @@ -571,7 +577,7 @@ struct WriteCompressedTask : BaseTask {
storage::KeySegmentPair kv_;
std::shared_ptr<storage::Library> lib_;

WriteCompressedTask(storage::KeySegmentPair &&kv, std::shared_ptr<storage::Library> lib) :
WriteCompressedTask(storage::KeySegmentPair&& key_seg, std::shared_ptr<storage::Library> lib) :
kv_(std::move(kv)), lib_(std::move(lib)) {
ARCTICDB_DEBUG(log::storage(), "Creating write compressed task");
}
Expand Down
28 changes: 28 additions & 0 deletions cpp/arcticdb/storage/async_storage.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include <arcticdb/entity/variant_key.hpp>
#include <arcticdb/storage/common.hpp>
#include <arcticdb/storage/storage_options.hpp>
#include <arcticdb/storage/key_segment_pair.hpp>

#include <folly/futures/Future.h>


namespace arcticdb::storage {
class AsyncStorage {

public:
folly::Future<folly::Unit> async_read(entity::VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) {
return do_async_read(std::move(variant_key), visitor, opts);
}

folly::Future<KeySegmentPair> async_read(entity::VariantKey&& variant_key, ReadKeyOpts opts) {
return do_async_read(std::move(variant_key), opts);
}

private:
virtual folly::Future<folly::Unit> do_async_read(entity::VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) = 0;

virtual folly::Future<KeySegmentPair> do_async_read(entity::VariantKey&& variant_key, ReadKeyOpts opts) = 0;
};
} // namespace arcticdb
24 changes: 14 additions & 10 deletions cpp/arcticdb/storage/azure/azure_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@
#include <arcticdb/util/exponential_backoff.hpp>
#include <arcticdb/util/configs_map.hpp>
#include <arcticdb/util/composite.hpp>
#include <arcticdb/storage/azure/azure_client_interface.hpp>
#include <arcticdb/storage/azure/azure_client_impl.hpp>
#include <arcticdb/storage/mock/azure_mock_client.hpp>
#include <arcticdb/storage/azure/azure_client_interface.hpp>
#include <arcticdb/storage/azure/azure_client_impl.hpp>
#include <arcticdb/storage/storage_exceptions.hpp>

#include <azure/core.hpp>
#include <azure/storage/blobs.hpp>

#include <boost/interprocess/streams/bufferstream.hpp>

#include <arcticdb/storage/azure/azure_client_interface.hpp>
#include <arcticdb/storage/azure/azure_client_impl.hpp>
#include "arcticdb/storage/mock/azure_mock_client.hpp"


#undef GetMessage

Expand Down Expand Up @@ -106,7 +110,7 @@ void raise_if_unexpected_error(const Azure::Core::RequestFailedException& e, con

template<class KeyBucketizer>
void do_write_impl(
Composite<KeySegmentPair>&& kvs,
KeySegmentPair&& key_seg,
const std::string& root_folder,
AzureClientWrapper& azure_client,
KeyBucketizer&& bucketizer,
Expand Down Expand Up @@ -140,7 +144,7 @@ void do_write_impl(

template<class KeyBucketizer>
void do_update_impl(
Composite<KeySegmentPair>&& kvs,
KeySegmentPair&& key_seg,
const std::string& root_folder,
AzureClientWrapper& azure_client,
KeyBucketizer&& bucketizer,
Expand Down Expand Up @@ -194,7 +198,7 @@ void do_read_impl(Composite<VariantKey> && ks,
}

template<class KeyBucketizer>
void do_remove_impl(Composite<VariantKey>&& ks,
void do_remove_impl(VariantKey&& variant_key,
const std::string& root_folder,
AzureClientWrapper& azure_client,
KeyBucketizer&& bucketizer,
Expand Down Expand Up @@ -306,19 +310,19 @@ std::string AzureStorage::name() const {
return fmt::format("azure_storage-{}/{}", container_name_, root_folder_);
}

void AzureStorage::do_write(Composite<KeySegmentPair>&& kvs) {
void AzureStorage::do_write(KeySegmentPair&& key_seg) {
detail::do_write_impl(std::move(kvs), root_folder_, *azure_client_, FlatBucketizer{}, upload_option_, request_timeout_);
}

void AzureStorage::do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts) {
void AzureStorage::do_update(KeySegmentPair&& key_seg, UpdateOpts) {
detail::do_update_impl(std::move(kvs), root_folder_, *azure_client_, FlatBucketizer{}, upload_option_, request_timeout_);
}

void AzureStorage::do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) {
void AzureStorage::do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) {
detail::do_read_impl(std::move(ks), visitor, root_folder_, *azure_client_, FlatBucketizer{}, opts, download_option_, request_timeout_);
}

void AzureStorage::do_remove(Composite<VariantKey>&& ks, RemoveOpts) {
void AzureStorage::do_remove(VariantKey&& variant_key, RemoveOpts) {
detail::do_remove_impl(std::move(ks), root_folder_, *azure_client_, FlatBucketizer{}, request_timeout_);
}

Expand Down
16 changes: 12 additions & 4 deletions cpp/arcticdb/storage/azure/azure_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ class AzureStorage final : public Storage {
std::string name() const final;

protected:
void do_write(Composite<KeySegmentPair>&& kvs) final;
void do_write(KeySegmentPair&& key_seg) final;

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) final;
void do_update(KeySegmentPair&& key_seg, UpdateOpts opts) final;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) final;
void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) final;

void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) final;
KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts opts) final;

void do_remove(VariantKey&& variant_key, RemoveOpts opts) final;

void do_remove(std::span<VariantKey> variant_keys, RemoveOpts opts) final;

bool do_iterate_type_until_match(KeyType key_type, const IterateTypePredicate& visitor, const std::string &prefix) final;

Expand All @@ -51,6 +55,10 @@ class AzureStorage final : public Storage {
return true;
}

bool do_has_async_methods() const final {
return false;
}

bool do_fast_delete() final {
return false;
}
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/storage/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace arcticdb::storage {

using ReadVisitor = std::function<void(const entity::VariantKey&, Segment &&)>;

struct EnvironmentNameTag{};
using EnvironmentName = util::StringWrappingValue<EnvironmentNameTag>;
Expand Down
8 changes: 4 additions & 4 deletions cpp/arcticdb/storage/file/mapped_file_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ uint64_t MappedFileStorage::write_segment(Segment&& seg) {
return offset;
}

void MappedFileStorage::do_write(Composite<KeySegmentPair>&& kvs) {
void MappedFileStorage::do_write(KeySegmentPair&& key_seg) {
ARCTICDB_SAMPLE(MappedFileStorageWriteValues, 0)
auto key_values = std::move(kvs);
key_values.broadcast([this] (auto key_seg) {
Expand All @@ -95,11 +95,11 @@ void MappedFileStorage::do_write(Composite<KeySegmentPair>&& kvs) {
});
}

void MappedFileStorage::do_update(Composite<KeySegmentPair>&&, UpdateOpts) {
void MappedFileStorage::do_update(VariantKey&&, UpdateOpts) {
util::raise_rte("Update not implemented for file storages");
}

void MappedFileStorage::do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts) {
void MappedFileStorage::do_read(VariantKey&& variant_key, const ReadVisitor& visitor, storage::ReadKeyOpts) {
ARCTICDB_SAMPLE(MappedFileStorageRead, 0)
auto keys = std::move(ks);
keys.broadcast([&visitor, this] (const auto& key) {
Expand All @@ -116,7 +116,7 @@ bool MappedFileStorage::do_key_exists(const VariantKey& key) {
return multi_segment_header_.get_offset_for_key(to_atom(key)) != std::nullopt;
}

void MappedFileStorage::do_remove(Composite<VariantKey>&&, RemoveOpts) {
void MappedFileStorage::do_remove(VariantKey&&, RemoveOpts) {
util::raise_rte("Remove not implemented for file storages");
}

Expand Down
12 changes: 8 additions & 4 deletions cpp/arcticdb/storage/file/mapped_file_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,17 @@ class MappedFileStorage final : public SingleFileStorage {
private:
void do_write_raw(const uint8_t* data, size_t bytes) override;

void do_write(Composite<KeySegmentPair>&& kvs) override;
void do_write(KeySegmentPair&& key_seg) override;

void do_update(Composite<KeySegmentPair>&& kvs, UpdateOpts opts) override;
void do_update(KeySegmentPair&& key_seg, UpdateOpts opts) override;

void do_read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override;
void do_read(VariantKey&& variant_key, const ReadVisitor& visitor, storage::ReadKeyOpts opts) override;

void do_remove(Composite<VariantKey>&& ks, RemoveOpts opts) override;
KeySegmentPair do_read(VariantKey&& variant_key, ReadKeyOpts opts) final;

void do_remove(VariantKey&& variant_key, RemoveOpts opts) override;

void do_remove(std::span<VariantKey> variant_keys, RemoveOpts opts) final;

bool do_supports_prefix_matching() const override {
return false;
Expand Down
42 changes: 22 additions & 20 deletions cpp/arcticdb/storage/library.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,35 +81,49 @@ class Library {
return storages_->scan_for_matching_key(key_type, predicate);
}

void write(Composite<KeySegmentPair>&& kvs) {
void write(KeySegmentPair&& key_seg) {
ARCTICDB_SAMPLE(LibraryWrite, 0)
if (open_mode() < OpenMode::WRITE) {
throw LibraryPermissionException(library_path_, open_mode(), "write");
}

storages_->write(std::move(kvs));
storages_->write(std::move(key_seg));
}

void update(Composite<KeySegmentPair>&& kvs, storage::UpdateOpts opts) {
void update(KeySegmentPair&& key_seg, storage::UpdateOpts opts) {
ARCTICDB_SAMPLE(LibraryUpdate, 0)
if (open_mode() < OpenMode::WRITE)
throw LibraryPermissionException(library_path_, open_mode(), "update");

storages_->update(std::move(kvs), opts);
storages_->update(std::move(key_seg), opts);
}

void read(Composite<VariantKey>&& ks, const ReadVisitor& visitor, ReadKeyOpts opts) {
folly::Future<folly::Unit> read(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) {
ARCTICDB_SAMPLE(LibraryRead, 0)
storages_->read(std::move(ks), visitor, opts, !storage_fallthrough_);
return storages_->read(std::move(variant_key), visitor, opts, !storage_fallthrough_);
}

void remove(Composite<VariantKey>&& ks, storage::RemoveOpts opts) {
folly::Future<KeySegmentPair> read(VariantKey variant_key, ReadKeyOpts opts = ReadKeyOpts{}) {
return storages_->read(std::move(variant_key), opts);
}

void read_sync(VariantKey&& variant_key, const ReadVisitor& visitor, ReadKeyOpts opts) {
ARCTICDB_SAMPLE(LibraryRead, 0)
storages_->read_sync(std::move(variant_key), visitor, opts, !storage_fallthrough_);
}

KeySegmentPair read_sync(VariantKey&& key, ReadKeyOpts opts = ReadKeyOpts{}) {
util::check(!std::holds_alternative<StringId>(variant_key_id(key)) || !std::get<StringId>(variant_key_id(key)).empty(), "Unexpected empty id");
return storages_->read_sync(std::move(key), opts, !storage_fallthrough_);
}

void remove(std::span<VariantKey> variant_keys, storage::RemoveOpts opts) {
if (open_mode() < arcticdb::storage::OpenMode::DELETE) {
throw LibraryPermissionException(library_path_, open_mode(), "delete");
}

ARCTICDB_SAMPLE(LibraryRemove, 0)
storages_->remove(std::move(ks), opts);
storages_->remove(std::move(variant_keys), opts);
}

[[nodiscard]] std::optional<std::shared_ptr<SingleFileStorage>> get_single_file_storage() const {
Expand All @@ -132,18 +146,6 @@ class Library {
return storages_->is_path_valid(path);
}

KeySegmentPair read(VariantKey key, ReadKeyOpts opts = ReadKeyOpts{}) {
KeySegmentPair res{VariantKey{key}};
util::check(!std::holds_alternative<StringId>(variant_key_id(key)) || !std::get<StringId>(variant_key_id(key)).empty(), "Unexpected empty id");
const ReadVisitor& visitor = [&res](const VariantKey&, Segment&& value) {
res.segment() = std::move(value);
};

read(Composite<VariantKey>(std::move(key)), visitor, opts);

return res;
}

/** Calls VariantStorage::do_key_path on the primary storage */
[[nodiscard]] std::string key_path(const VariantKey& key) const {
return storages_->key_path(key);
Expand Down
Loading

0 comments on commit b0081b6

Please sign in to comment.