Skip to content

Commit

Permalink
Storage mover wip 2
Browse files Browse the repository at this point in the history
  • Loading branch information
vasil-pashov committed Nov 21, 2024
1 parent c601987 commit 41e69ee
Show file tree
Hide file tree
Showing 12 changed files with 204 additions and 66 deletions.
Binary file added cpp/.CMakeUserPresets.json.swp
Binary file not shown.
2 changes: 2 additions & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ set(arcticdb_srcs
storage/storage.hpp
storage/storage_override.hpp
storage/store.hpp
storage/storage_utils.hpp
stream/aggregator.hpp
stream/aggregator-inl.hpp
stream/append_map.hpp
Expand Down Expand Up @@ -482,6 +483,7 @@ set(arcticdb_srcs
storage/s3/s3_storage.cpp
storage/s3/s3_storage_tool.cpp
storage/storage_factory.cpp
storage/storage_utils.cpp
stream/aggregator.cpp
stream/append_map.cpp
stream/index.cpp
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ class AsyncStore : public Store {
public:
AsyncStore(
std::shared_ptr<storage::Library> library,
const arcticdb::proto::encoding::VariantCodec &codec,
const proto::encoding::VariantCodec &codec,
EncodingVersion encoding_version
) :
library_(std::move(library)),
codec_(std::make_shared<arcticdb::proto::encoding::VariantCodec>(codec)),
codec_(std::make_shared<proto::encoding::VariantCodec>(codec)),
encoding_version_(encoding_version) {
}

Expand Down
4 changes: 4 additions & 0 deletions cpp/arcticdb/entity/key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ KeyClass key_class_from_key_type(KeyType key_type) {
return get_key_data(key_type).key_class_;
}

const char* key_type_description(KeyType key_type) {
return get_key_data(key_type).description_;
}

bool is_string_key_type(KeyType key_type){
return variant_type_from_key_type(key_type) == VariantType::STRING_TYPE;
}
Expand Down
9 changes: 5 additions & 4 deletions cpp/arcticdb/entity/key.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ consteval auto key_types_write_precedence() {

consteval auto key_types_read_precedence() {
auto output = key_types_write_precedence();
std::reverse(std::begin(output), std::end(output));
rng::reverse(output);
return output;
}

Expand Down Expand Up @@ -243,7 +243,7 @@ enum class VariantType : char {

VariantType variant_type_from_key_type(KeyType key_type);

inline bool is_index_key_type(KeyType key_type) {
constexpr bool is_index_key_type(KeyType key_type) {
// TODO: Change name probably.
return (key_type == KeyType::TABLE_INDEX) || (key_type == KeyType::MULTI_KEY);
}
Expand All @@ -252,14 +252,15 @@ bool is_string_key_type(KeyType k);

bool is_ref_key_class(KeyType k);

inline constexpr KeyType get_key_type_for_data_stream(const StreamId &) {
constexpr KeyType get_key_type_for_data_stream(const StreamId &) {
return KeyType::TABLE_DATA;
}

inline constexpr KeyType get_key_type_for_index_stream(const StreamId &) {
constexpr KeyType get_key_type_for_index_stream(const StreamId &) {
return KeyType::TABLE_INDEX;
}

const char* get_key_description(KeyType type);

template <typename Function>
constexpr auto foreach_key_type_read_precedence(Function&& func) {
Expand Down
4 changes: 2 additions & 2 deletions cpp/arcticdb/entity/metrics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ namespace arcticdb {

const std::string MONGO_INSTANCE_LABEL = "mongo_instance";
const std::string PROMETHEUS_ENV_LABEL = "env";
const int SUMMARY_MAX_AGE = 30;
const int SUMMARY_AGE_BUCKETS = 5;
constexpr int SUMMARY_MAX_AGE = 30;
constexpr int SUMMARY_AGE_BUCKETS = 5;

class MetricsConfig {
public:
Expand Down
98 changes: 98 additions & 0 deletions cpp/arcticdb/storage/storage_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#include <arcticdb/storage/storage_utils.hpp>
#include <arcticdb/pipeline/index_writer.hpp>
#include <arcticdb/stream/index_aggregator.hpp>
#include <arcticdb/async/tasks.hpp>

namespace arcticdb::storage {

AtomKey write_table_index_tree_from_source_to_target(
const std::shared_ptr<Store>& source_store,
const std::shared_ptr<Store>& target_store,
const AtomKey& index_key,
std::optional<VersionId> new_version_id
) {
ARCTICDB_SAMPLE(WriteIndexSourceToTarget, 0)
// In
auto [_, index_seg] = source_store->read_sync(index_key);
index::IndexSegmentReader index_segment_reader{std::move(index_seg)};
// Out
index::IndexWriter<stream::RowCountIndex> writer(target_store,
{index_key.id(), new_version_id.value_or(index_key.version_id())},
std::move(index_segment_reader.mutable_tsd()));
std::vector<folly::Future<bool>> futures;
// Process
for (auto iter = index_segment_reader.begin(); iter != index_segment_reader.end(); ++iter) {
auto& sk = *iter;
auto& key = sk.key();
std::optional<entity::AtomKey> key_to_write = atom_key_builder()
.version_id(new_version_id.value_or(key.version_id()))
.creation_ts(util::SysClock::nanos_since_epoch())
.start_index(key.start_index())
.end_index(key.end_index())
.content_hash(key.content_hash())
.build(key.id(), key.type());

writer.add(*key_to_write, sk.slice()); // Both const ref
futures.emplace_back(async::submit_io_task(async::CopyCompressedInterStoreTask{sk.key(),
std::move(key_to_write),
false,
false,
source_store,
{target_store}}));
}
collect(futures).get();
// FUTURE: clean up already written keys if exception
return to_atom(writer.commit().get());
}

AtomKey copy_multi_key_from_source_to_target(
const std::shared_ptr<Store>& source_store,
const std::shared_ptr<Store>& target_store,
const AtomKey& index_key,
std::optional<VersionId> new_version_id) {
using namespace arcticdb::stream;
auto fut_index = source_store->read(index_key);
auto [_, index_seg] = std::move(fut_index).get();
std::vector<AtomKey> keys;
for (size_t idx = 0; idx < index_seg.row_count(); idx++) {
keys.push_back(stream::read_key_row(index_seg, static_cast<ssize_t>(idx)));
}
// Recurse on the index keys inside MULTI_KEY
std::vector<VariantKey> new_data_keys;
for (const auto &k: keys) {
auto new_key = copy_index_key_recursively(source_store, target_store, k, new_version_id);
new_data_keys.emplace_back(std::move(new_key));
}
// Write new MULTI_KEY
google::protobuf::Any metadata = *index_seg.metadata();
folly::Future<VariantKey> multi_key_fut = folly::Future<VariantKey>::makeEmpty();
IndexAggregator<RowCountIndex> multi_index_agg(index_key.id(), [&new_version_id, &index_key, &multi_key_fut, &target_store](auto &&segment) {
multi_key_fut = target_store->write(KeyType::MULTI_KEY,
new_version_id.value_or(index_key.version_id()), // version_id
index_key.id(),
0, // start_index
0, // end_index
std::forward<SegmentInMemory>(segment)).wait();
});
for (auto &key: new_data_keys) {
multi_index_agg.add_key(to_atom(key));
}
multi_index_agg.set_metadata(std::move(metadata));
multi_index_agg.commit();
return to_atom(multi_key_fut.value());
}

AtomKey copy_index_key_recursively(
const std::shared_ptr<Store>& source_store,
const std::shared_ptr<Store>& target_store,
const AtomKey& index_key,
std::optional<VersionId> new_version_id) {
ARCTICDB_SAMPLE(RecurseIndexKey, 0)
if (index_key.type() == KeyType::TABLE_INDEX) {
return write_table_index_tree_from_source_to_target(source_store, target_store, index_key, new_version_id);
} else if (index_key.type() == KeyType::MULTI_KEY) {
return copy_multi_key_from_source_to_target(source_store, target_store, index_key, new_version_id);
}
}

}
13 changes: 11 additions & 2 deletions cpp/arcticdb/storage/storage_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
#include <arcticdb/entity/types.hpp>
#include <arcticdb/storage/store.hpp>

#include <ranges>

namespace rng = std::ranges;

namespace arcticdb {

inline auto stream_id_prefix_matcher(const std::string &prefix) {
Expand All @@ -36,8 +40,7 @@ inline std::vector<VariantKey> filter_keys_on_existence(
inline void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shared_ptr<Store>& store, bool pred) {
std::vector<VariantKey> var_vector;
var_vector.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(var_vector),
[](auto&& k) { return VariantKey(std::move(k)); });
rng::copy(keys, std::back_inserter(var_vector));

auto key_existence = store->batch_key_exists(var_vector);

Expand All @@ -52,4 +55,10 @@ inline void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shar
keys.erase(keys_itr, keys.end());
}

AtomKey copy_index_key_recursively(
const std::shared_ptr<Store>& source_store,
const std::shared_ptr<Store>& target_store,
const AtomKey& index_key,
std::optional<VersionId> new_version_id);

} //namespace arcticdb
3 changes: 1 addition & 2 deletions cpp/arcticdb/toolbox/library_tool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <arcticdb/entity/key.hpp>
#include <arcticdb/entity/types.hpp>
#include <arcticdb/entity/variant_key.hpp>
#include <arcticdb/storage/storage.hpp>
#include <arcticdb/storage/library_manager.hpp>
#include <arcticdb/async/async_store.hpp>
#include <arcticdb/entity/read_result.hpp>
Expand Down Expand Up @@ -53,7 +52,7 @@ class LibraryTool {

void remove(VariantKey key);

std::vector<VariantKey> find_keys(arcticdb::entity::KeyType);
std::vector<VariantKey> find_keys(entity::KeyType);

bool key_exists(const VariantKey& key);

Expand Down
7 changes: 0 additions & 7 deletions cpp/arcticdb/toolbox/python_bindings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@
#pragma once

#include <pybind11/pybind11.h>
#include <pybind11/stl_bind.h>

#include <arcticdb/entity/key.hpp>
#include <arcticdb/storage/open_mode.hpp>

#include <string>
#include <memory>

namespace arcticdb::toolbox::apy {

Expand Down
Loading

0 comments on commit 41e69ee

Please sign in to comment.