Storage mover wip3 - source compiles
vasil-pashov committed Nov 22, 2024
1 parent 41e69ee commit 373df5f
Showing 7 changed files with 76 additions and 60 deletions.
Binary file removed cpp/.CMakeUserPresets.json.swp
4 changes: 2 additions & 2 deletions cpp/arcticdb/async/task_scheduler.hpp
@@ -300,13 +300,13 @@ inline auto& io_executor() {
 }
 
 template <typename Task>
-inline auto submit_cpu_task(Task&& task) {
+auto submit_cpu_task(Task&& task) {
     return TaskScheduler::instance()->submit_cpu_task(std::forward<decltype(task)>(task));
 }
 
 
 template <typename Task>
-inline auto submit_io_task(Task&& task) {
+auto submit_io_task(Task&& task) {
     return TaskScheduler::instance()->submit_io_task(std::forward<decltype(task)>(task));
 }
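A side note on the inline removals above: function templates are already exempt from the one-definition rule, so the inline keyword adds nothing when the definition lives in a header. A minimal standalone sketch (hypothetical submit(), not the ArcticDB scheduler API):

    // A function template defined in a header may be included into many
    // translation units without the inline keyword; templates already get
    // one-definition-rule leeway, so dropping inline changes nothing.
    #include <utility>

    template <typename Task>
    auto submit(Task&& task) {              // links fine without inline
        return std::forward<Task>(task)();  // invoke the task, return its result
    }

    int main() {
        return submit([] { return 0; });
    }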
61 changes: 52 additions & 9 deletions cpp/arcticdb/storage/storage_utils.cpp
@@ -3,7 +3,40 @@
 #include <arcticdb/stream/index_aggregator.hpp>
 #include <arcticdb/async/tasks.hpp>
 
-namespace arcticdb::storage {
+namespace arcticdb {
+
+std::vector<VariantKey> filter_keys_on_existence(
+        const std::vector<VariantKey>& keys,
+        const std::shared_ptr<Store>& store,
+        bool pred
+){
+    auto key_existence = folly::collect(store->batch_key_exists(keys)).get();
+    std::vector<VariantKey> res;
+    for (size_t i = 0; i != keys.size(); i++) {
+        if (key_existence[i] == pred) {
+            res.push_back(keys[i]);
+        }
+    }
+    return res;
+}
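The helper above fans out one existence check per key via batch_key_exists and then blocks on all of them at once. A minimal sketch of that folly pattern, assuming folly futures are available (collect turns a vector of futures into a single future of a vector):

    // Sketch only: folly::collect gathers N futures into one, and .get()
    // blocks until every result is ready, instead of waiting one by one.
    #include <folly/futures/Future.h>
    #include <vector>

    int main() {
        std::vector<folly::Future<bool>> futs;    // stand-ins for batch_key_exists()
        futs.push_back(folly::makeFuture(true));
        futs.push_back(folly::makeFuture(false));
        std::vector<bool> exists = folly::collect(std::move(futs)).get();
        return exists[0] && !exists[1] ? 0 : 1;   // expect {true, false}
    }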

+void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shared_ptr<Store>& store, bool pred) {
+    std::vector<VariantKey> var_vector;
+    var_vector.reserve(keys.size());
+    rng::copy(keys, std::back_inserter(var_vector));
+
+    auto key_existence = store->batch_key_exists(var_vector);
+
+    auto keys_itr = keys.begin();
+    for (size_t i = 0; i != var_vector.size(); i++) {
+        bool resolved = key_existence[i].wait().value();
+        if (resolved == pred) {
+            *keys_itr = std::move(std::get<AtomKey>(var_vector[i]));
+            ++keys_itr;
+        }
+    }
+    keys.erase(keys_itr, keys.end());
+}
 
 AtomKey write_table_index_tree_from_source_to_target(
     const std::shared_ptr<Store>& source_store,
@@ -19,7 +52,7 @@ AtomKey write_table_index_tree_from_source_to_target(
     index::IndexWriter<stream::RowCountIndex> writer(target_store,
         {index_key.id(), new_version_id.value_or(index_key.version_id())},
         std::move(index_segment_reader.mutable_tsd()));
-    std::vector<folly::Future<bool>> futures;
+    std::vector<folly::Future<async::CopyCompressedInterStoreTask::ProcessingResult>> futures;
     // Process
     for (auto iter = index_segment_reader.begin(); iter != index_segment_reader.end(); ++iter) {
         auto& sk = *iter;
@@ -33,14 +66,23 @@ AtomKey write_table_index_tree_from_source_to_target(
             .build(key.id(), key.type());
 
         writer.add(*key_to_write, sk.slice()); // Both const ref
-        futures.emplace_back(async::submit_io_task(async::CopyCompressedInterStoreTask{sk.key(),
-            std::move(key_to_write),
-            false,
-            false,
-            source_store,
-            {target_store}}));
+        futures.emplace_back(submit_io_task(async::CopyCompressedInterStoreTask{
+            sk.key(),
+            std::move(key_to_write),
+            false,
+            false,
+            source_store,
+            {target_store}}));
     }
-    collect(futures).get();
+    const std::vector<async::CopyCompressedInterStoreTask::ProcessingResult> store_results = collect(futures).get();
+    for (const async::CopyCompressedInterStoreTask::ProcessingResult& res: store_results) {
+        util::variant_match(
+            res,
+            [&](const async::CopyCompressedInterStoreTask::FailedTargets& failed) {
+                log::storage().error("Failed to move targets: {} from {} to {}", failed, source_store->name(), target_store->name());
+            },
+            [](const auto&){});
+    }
     // FUTURE: clean up already written keys if exception
     return to_atom(writer.commit().get());
 }
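The rewritten loop above collects one ProcessingResult per copy task and logs only the FailedTargets alternative. util::variant_match is ArcticDB's matching helper; in standard C++17 the same shape is std::visit over an overload set. A sketch with illustrative names:

    // Sketch of match-on-variant: handle one alternative, ignore the rest.
    #include <iostream>
    #include <string>
    #include <variant>
    #include <vector>

    template <class... Fs> struct overloaded : Fs... { using Fs::operator()...; };
    template <class... Fs> overloaded(Fs...) -> overloaded<Fs...>;

    struct CopiedOk {};
    struct FailedTargets { std::vector<std::string> keys; };
    using ProcessingResult = std::variant<CopiedOk, FailedTargets>;

    int main() {
        ProcessingResult res = FailedTargets{{"key-1", "key-2"}};
        std::visit(overloaded{
            [](const FailedTargets& f) {
                for (const auto& k : f.keys) std::cerr << "failed: " << k << '\n';
            },
            [](const auto&) {}   // every other alternative: nothing to do
        }, res);
    }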
@@ -93,6 +135,7 @@ AtomKey copy_index_key_recursively(
     } else if (index_key.type() == KeyType::MULTI_KEY) {
         return copy_multi_key_from_source_to_target(source_store, target_store, index_key, new_version_id);
     }
+    internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Cannot copy index recursively. Unsupported index key type {}", index_key.type());
 }
 
 }
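The trailing internal::raise added above matters because copy_index_key_recursively returns an AtomKey: falling off the end of a non-void function is undefined behaviour, so the unsupported-key-type path needs a branch that never returns. A sketch of the shape in standard C++ (illustrative names):

    // Every path out of a non-void function must return or not return at
    // all; a throwing terminal branch satisfies the compiler and fails loudly.
    #include <stdexcept>
    #include <string>

    enum class KeyType { TABLE_INDEX, MULTI_KEY, OTHER };

    std::string copy_for(KeyType t) {
        if (t == KeyType::TABLE_INDEX) return "table";
        if (t == KeyType::MULTI_KEY)   return "multi";
        throw std::invalid_argument("unsupported key type");  // never falls off the end
    }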
38 changes: 2 additions & 36 deletions cpp/arcticdb/storage/storage_utils.hpp
@@ -10,10 +10,6 @@
 #include <arcticdb/entity/types.hpp>
 #include <arcticdb/storage/store.hpp>
 
-#include <ranges>
-
-namespace rng = std::ranges;
-
 namespace arcticdb {
 
 inline auto stream_id_prefix_matcher(const std::string &prefix) {
@@ -22,38 +18,8 @@ inline auto stream_id_prefix_matcher(const std::string &prefix) {
         std::get<std::string>(id).compare(0u, prefix.size(), prefix) == 0); };
 }
 
-inline std::vector<VariantKey> filter_keys_on_existence(
-        const std::vector<VariantKey>& keys,
-        const std::shared_ptr<Store>& store,
-        bool pred
-){
-    auto key_existence = folly::collect(store->batch_key_exists(keys)).get();
-    std::vector<VariantKey> res;
-    for (size_t i = 0; i != keys.size(); i++) {
-        if (key_existence[i] == pred) {
-            res.push_back(keys[i]);
-        }
-    }
-    return res;
-}
-
-inline void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shared_ptr<Store>& store, bool pred) {
-    std::vector<VariantKey> var_vector;
-    var_vector.reserve(keys.size());
-    rng::copy(keys, std::back_inserter(var_vector));
-
-    auto key_existence = store->batch_key_exists(var_vector);
-
-    auto keys_itr = keys.begin();
-    for (size_t i = 0; i != var_vector.size(); i++) {
-        bool resolved = key_existence[i].wait().value();
-        if (resolved == pred) {
-            *keys_itr = std::move(std::get<AtomKey>(var_vector[i]));
-            ++keys_itr;
-        }
-    }
-    keys.erase(keys_itr, keys.end());
-}
+std::vector<VariantKey> filter_keys_on_existence(const std::vector<VariantKey>& keys, const std::shared_ptr<Store>& store, bool pred);
+void filter_keys_on_existence(std::vector<AtomKey>& keys, const std::shared_ptr<Store>& store, bool pred);
 
 AtomKey copy_index_key_recursively(
     const std::shared_ptr<Store>& source_store,
8 changes: 8 additions & 0 deletions cpp/arcticdb/stream/index_aggregator.hpp
@@ -60,6 +60,10 @@ class FlatIndexingPolicy {
         segment_.set_timeseries_descriptor(timeseries_descriptor);
     }
 
+    void set_metadata(google::protobuf::Any&& metadata) {
+        segment_.set_metadata(std::move(metadata));
+    }
+
 private:
     Callback callback_;
     FixedSchema schema_;
@@ -89,6 +93,10 @@ class IndexAggregator {
         indexing_policy_.set_timeseries_descriptor(timeseries_descriptor);
     }
 
+    void set_metadata(google::protobuf::Any&& metadata) {
+        indexing_policy_.set_metadata(std::move(metadata));
+    }
+
 private:
     IndexingPolicy indexing_policy_;
 };
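Both new set_metadata overloads take a google::protobuf::Any by rvalue reference. A minimal sketch of how a caller might build one, assuming protobuf is linked (Duration is just a convenient stock message; the real metadata type lives elsewhere in ArcticDB):

    // Sketch: pack a concrete message into Any, which stores the payload
    // plus its type URL; the Any can then be moved into an API like
    // set_metadata above.
    #include <google/protobuf/any.pb.h>
    #include <google/protobuf/duration.pb.h>
    #include <iostream>

    int main() {
        google::protobuf::Duration d;
        d.set_seconds(42);

        google::protobuf::Any any;
        any.PackFrom(d);                         // serialize + record type URL

        google::protobuf::Duration out;
        if (any.UnpackTo(&out))                  // type-checked unpack
            std::cout << out.seconds() << '\n';  // prints 42
        return 0;
    }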
24 changes: 12 additions & 12 deletions cpp/arcticdb/toolbox/storage_mover.hpp
@@ -24,6 +24,7 @@
 #include <arcticdb/stream/stream_source.hpp>
 #include <arcticdb/entity/metrics.hpp>
 #include <ranges>
+#include <functional>
 
 #include "version/version_functions.hpp"
 
@@ -255,7 +256,7 @@ inline MetricsConfig::Model get_model_from_proto_config(const proto::utils::Prom
     case proto::utils::PrometheusConfig_PrometheusModel_NO_INIT: return MetricsConfig::Model::NO_INIT;
     case proto::utils::PrometheusConfig_PrometheusModel_PUSH: return MetricsConfig::Model::PUSH;
     case proto::utils::PrometheusConfig_PrometheusModel_WEB: return MetricsConfig::Model::PULL;
-    default: internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unknown Prometheus model {}", cfg.prometheus_model());
+    default: internal::raise<ErrorCode::E_ASSERTION_FAILURE>("Unknown Prometheus proto model {}", int{cfg.prometheus_model()});
     }
 }
 
@@ -589,36 +590,35 @@ class ARCTICDB_VISIBILITY_HIDDEN StorageMover {
                }
            );
        }
-       std::sort(std::begin(index_keys), std::end(index_keys),
-           [&](const auto& k1, const auto& k2) {return k1.version_id() < k2.version_id();});
-       // Remove duplicate keys
-       index_keys.erase(std::unique(std::begin(index_keys), std::end(index_keys),
-           [&](const auto& k1, const auto& k2) {return k1.version_id()==k2.version_id();}), index_keys.end());
+       rng::sort(index_keys, [&](const auto& k1, const auto& k2) {return k1.version_id() < k2.version_id();});
+       auto to_erase = rng::unique(index_keys, rng::equal_to<VersionId>{}, [](const auto& k){ return k.version_id();});
+       index_keys.erase(to_erase.begin(), to_erase.end());
 
        for(const auto& index_key: index_keys) {
            VersionId v_id = index_key.version_id();
            try {
                std::optional<VersionId> new_version_id;
+               std::optional<AtomKey> previous_key;
                if (append_versions) {
                    auto [maybe_prev, _] = get_latest_version(target_store_, target_map, sym);
                    if (maybe_prev){
                        new_version_id = std::make_optional(maybe_prev.value().version_id() + 1);
+                       previous_key = std::move(maybe_prev);
                    }
                } else {
                    if (auto target_index_key = get_specific_version(target_store_, target_map, sym, v_id)) {
                        throw storage::DuplicateKeyException(target_index_key.value());
                    }
                }
-               const auto new_index_key = storage::copy_index_key_recursively(source_store_, target_store_,
-                   index_key, new_version_id);
-               target_map->write_version(target_store_, new_index_key);
+               const auto new_index_key = copy_index_key_recursively(source_store_, target_store_, index_key, new_version_id);
+               target_map->write_version(target_store_, new_index_key, previous_key);
                if(symbol_list)
-                   symbol_list->add_symbol(target_store_, new_index_key.id());
+                   symbol_list->add_symbol(target_store_, new_index_key.id(), new_version_id.value_or(0));
 
                // Change the version in the result map
                sym_data[py::int_(v_id)] = new_version_id ? new_version_id.value() : v_id;
                // Give the new version id to the snapshots
-               if (version_to_snapshot_map.count(v_id)) {
+               if (version_to_snapshot_map.contains(v_id)) {
                    for(const auto& snap_name: version_to_snapshot_map[v_id]) {
                        sym_data[py::str(snap_name)] = sym_data[py::int_(v_id)];
                    }
@@ -629,7 +629,7 @@ class ARCTICDB_VISIBILITY_HIDDEN StorageMover {
                auto error = fmt::format("Sym:{},Version:{},Ex:{}", sym, v_id, e.what());
                sym_data[key] = error;
                // Give the error to snapshots which also had the same version_id
-               if (version_to_snapshot_map.count(v_id)) {
+               if (version_to_snapshot_map.contains(v_id)) {
                    for(const auto& snap_name: version_to_snapshot_map[v_id]) {
                        sym_data[py::str(snap_name)] = error;
                    }
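The sort-then-unique rewrite above uses C++20 ranges with a projection to dedupe index keys by version id. A minimal sketch of the pattern on plain data (note that in standard C++20, std::ranges::equal_to takes no template argument and is already the default comparator, so it can be spelled with empty braces or omitted):

    // Sketch: sort on a projected field, then erase the subrange that
    // ranges::unique returns to drop adjacent duplicates.
    #include <algorithm>
    #include <cassert>
    #include <functional>
    #include <vector>

    struct Key { unsigned version; };

    int main() {
        std::vector<Key> keys{{2}, {1}, {2}, {3}, {1}};
        std::ranges::sort(keys, std::ranges::less{}, &Key::version);
        auto dup = std::ranges::unique(keys, std::ranges::equal_to{}, &Key::version);
        keys.erase(dup.begin(), dup.end());
        assert(keys.size() == 3);  // one key each for versions 1, 2, 3
        return 0;
    }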
1 change: 0 additions & 1 deletion cpp/arcticdb/version/version_store_api.cpp
@@ -11,7 +11,6 @@
 #include <arcticdb/async/async_store.hpp>
 #include <arcticdb/version/version_map.hpp>
 #include <arcticdb/entity/protobufs.hpp>
-#include <arcticdb/util/timer.hpp>
 #include <arcticdb/storage/storage.hpp>
 #include <arcticdb/storage/storage_utils.hpp>
 #include <arcticdb/util/ranges_from_future.hpp>
