From 373df5f3f92925f6e7f30fd931fb743d36aa33ec Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Fri, 22 Nov 2024 14:13:37 +0200 Subject: [PATCH] Storage mover wip3 - source compiles --- cpp/.CMakeUserPresets.json.swp | Bin 16384 -> 0 bytes cpp/arcticdb/async/task_scheduler.hpp | 4 +- cpp/arcticdb/storage/storage_utils.cpp | 61 ++++++++++++++++++--- cpp/arcticdb/storage/storage_utils.hpp | 38 +------------ cpp/arcticdb/stream/index_aggregator.hpp | 8 +++ cpp/arcticdb/toolbox/storage_mover.hpp | 24 ++++---- cpp/arcticdb/version/version_store_api.cpp | 1 - 7 files changed, 76 insertions(+), 60 deletions(-) delete mode 100644 cpp/.CMakeUserPresets.json.swp diff --git a/cpp/.CMakeUserPresets.json.swp b/cpp/.CMakeUserPresets.json.swp deleted file mode 100644 index 4d71ac8f9ce6b771ed13eeac0a6978d6f5c6213e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 16384 zcmeHOUx*|}8SjgU<}T(;)F%m{ZiJ;LJ3G6{`f%ynTiuTF_p7g}zWRE0d+us&jXhtU({MeZX?H&L@%D4k!>8WiSM6{KdTev@ooN*GmtZoGmtZoGmtZoGmtZoGmtZo zGmtZoGmtaz05V`THEj+3`nEjq|IZ-BRe?*d;20^k}j51awsIjd>6 zfgNBOV8A<%YuZnNuL4g4PXhmVzovZ!_#*HH;Pb#IfpfsSk7?Skfj5BH0RcF`H1Nwa zn)YL$3p@q<=`{KRybde_I&cQK^Qfl144elZ1^)0p^b2?e7y<_HyZ1sb@EGu&M>Opc z@W+QW?YF=WfX@Oi0$V@>m;(NW#K(_-*8m4t0iFPUkA%thfmZ+>C;)#2hd%)%mtO*g zcxC|?fO%jBkg^{J&I0d3<`=;CfHzNREWWHL^qI$Z%`P9Xi>y$hl9nGZ-S#~ZG7^~T z8VmxW$c9ud!lq&Da69yag{xNJSYEiw2aBYrX@!>5CN7ZZ>@3^hb`%KCe2;}4&K$4F z_edSS3a820+E%?$GgqtEsm4D2Ww@l4az7FsbKA1Jx(EYq^$Y`lo3aHn@OO36W(wXe zF%eq$X@_ZBB3ux{d6}ExqV6>Jip8Sp3z?+~7@ou4xeN4VL3K*1DMP7g`;iywMJk-0 zKFsQ+(C?ASsinl!4YQQ$x``Hus-C~Y^;Q&`)Z1dY&82p-lWL8MWCi-2iYdS?pIMZ) z#x|m*xK1P-ugyHG$3@?=IkV_DEw*7OhLmDSAT{pyL&x_lci}1z1isapYM^%1t?4uq z17#I8>alrKu`Os?IYuf$szt{Nc+>2M!NR8O@8T2-tpL`k?w_>IxXC3gZW}%656oWV zh7JvOEAV?}EE)QyYDd>Jd6FWwk_xdg-rdqZvG4SNvjJDy6 zrXyWfboKaY1x{Y^dwsZ*;LVx`ucevPfXq;I%VELzOZQip7I9 zxYtz69f_xRhsk2)4&ud;q{r_MleaNj<)UVWLXSenHH^)HsJOn}H4M-4d|_gR1pmD6 z+Z`}+uy^U=r_0?hy)ExpZp2MLppK-^p(Rz5U01=26Albxd`}p82Uc{sBGD2?l=DDO z70K0qN>VQu{&=+0)F^YM%#vbhO*HXD1Yc^%7qL|-!^YM9&KS#=-d%f z7#1lrJuOuw+W@?0J@9RKB=JVZ#+!Z1cU@Te0xf_G+rIBEF0$C`(|$8t>0CLqd%Y;M zwq5Md0v(uK(}t=_>YGOmQ7V@isC_jfN84iG>2ufdxG)T=VHhzZ6pgTnP?4L*hYabC9S0953OF(y$VNM3!4gHnsZYj{2d0Hw znkT|t#<3&Dfwh!~dvi}V{U^P0S}b**cne?`FS1oWXkb4ujKC-H<}IYkBSI=|cn$ij9N{wqZ?BG74cgIMSwq z7e@U$GVaXAmNKYKg2$s`%OKXRLQfUI#>e@TnIrnOLNxq0dNs)*x-yE5xwqp4z9-N5 z7?EMY?%{t*Wj+sCaK6_^z8D$ujxUfCABd3m>$R>>580nDwQt%au#& zU?6M?@)PRQ3Zrwuq2eWNKX#9aJo#99JqlIoOC}JBLsh0uH&(knN~RxPD$VWj(2}oz z@TLauhv=n8@+eW7*v#0ZdgL@^l1xuh`1qbwqW3nG4Vl7IBm0^uIXLc*9kSU8F6tvR z?^;1~Ax_lPyfxnQgTbPXH$ieli^cM;CEp+@qfelgMCM9#&>tl|wUbtqa&hIc9Oyk& zA003K`1rB-+d(Wj6HHB)u?7ho~t z%T2h=Smr{dv2RRejVQ6H5hM7X=^5_Fdv;S-J~ovMK2z&YHt9M3zB8i@b{;8-*??7{ z+Uti%=PO@o`+nO+cBLQqp}!rqkU#bYESWEq@|8(SdMBAYP3j_vL|HsJ6DHG1qd|~7 nj9$iOUrD6%QwojEG=_^vh@Yby$K*INIg`vF<8?ymU=Hlx;Wohr diff --git a/cpp/arcticdb/async/task_scheduler.hpp b/cpp/arcticdb/async/task_scheduler.hpp index 08e4ba419c9..4efd408cdde 100644 --- a/cpp/arcticdb/async/task_scheduler.hpp +++ b/cpp/arcticdb/async/task_scheduler.hpp @@ -300,13 +300,13 @@ inline auto& io_executor() { } template -inline auto submit_cpu_task(Task&& task) { +auto submit_cpu_task(Task&& task) { return TaskScheduler::instance()->submit_cpu_task(std::forward(task)); } template -inline auto submit_io_task(Task&& task) { +auto submit_io_task(Task&& task) { return TaskScheduler::instance()->submit_io_task(std::forward(task)); } diff --git a/cpp/arcticdb/storage/storage_utils.cpp b/cpp/arcticdb/storage/storage_utils.cpp index b435db6090d..cef07caf257 100644 --- a/cpp/arcticdb/storage/storage_utils.cpp +++ b/cpp/arcticdb/storage/storage_utils.cpp @@ -3,7 +3,40 @@ #include #include -namespace arcticdb::storage { +namespace arcticdb { + +std::vector filter_keys_on_existence( + const std::vector& keys, + const std::shared_ptr& store, + bool pred + ){ + auto key_existence = folly::collect(store->batch_key_exists(keys)).get(); + std::vector res; + for (size_t i = 0; i != keys.size(); i++) { + if (key_existence[i] == pred) { + res.push_back(keys[i]); + } + } + return res; +} + +void filter_keys_on_existence(std::vector& keys, const std::shared_ptr& store, bool pred) { + std::vector var_vector; + var_vector.reserve(keys.size()); + rng::copy(keys, std::back_inserter(var_vector)); + + auto key_existence = store->batch_key_exists(var_vector); + + auto keys_itr = keys.begin(); + for (size_t i = 0; i != var_vector.size(); i++) { + bool resolved = key_existence[i].wait().value(); + if (resolved == pred) { + *keys_itr = std::move(std::get(var_vector[i])); + ++keys_itr; + } + } + keys.erase(keys_itr, keys.end()); +} AtomKey write_table_index_tree_from_source_to_target( const std::shared_ptr& source_store, @@ -19,7 +52,7 @@ AtomKey write_table_index_tree_from_source_to_target( index::IndexWriter writer(target_store, {index_key.id(), new_version_id.value_or(index_key.version_id())}, std::move(index_segment_reader.mutable_tsd())); - std::vector> futures; + std::vector> futures; // Process for (auto iter = index_segment_reader.begin(); iter != index_segment_reader.end(); ++iter) { auto& sk = *iter; @@ -33,14 +66,23 @@ AtomKey write_table_index_tree_from_source_to_target( .build(key.id(), key.type()); writer.add(*key_to_write, sk.slice()); // Both const ref - futures.emplace_back(async::submit_io_task(async::CopyCompressedInterStoreTask{sk.key(), - std::move(key_to_write), - false, - false, - source_store, - {target_store}})); + futures.emplace_back(submit_io_task(async::CopyCompressedInterStoreTask{ + sk.key(), + std::move(key_to_write), + false, + false, + source_store, + {target_store}})); + } + const std::vector store_results = collect(futures).get(); + for (const async::CopyCompressedInterStoreTask::ProcessingResult& res: store_results) { + util::variant_match( + res, + [&](const async::CopyCompressedInterStoreTask::FailedTargets& failed) { + log::storage().error("Failed to move targets: {} from {} to {}", failed, source_store->name(), target_store->name()); + }, + [](const auto&){}); } - collect(futures).get(); // FUTURE: clean up already written keys if exception return to_atom(writer.commit().get()); } @@ -93,6 +135,7 @@ AtomKey copy_index_key_recursively( } else if (index_key.type() == KeyType::MULTI_KEY) { return copy_multi_key_from_source_to_target(source_store, target_store, index_key, new_version_id); } + internal::raise("Cannot copy index recursively. Unsupported index key type {}", index_key.type()); } } \ No newline at end of file diff --git a/cpp/arcticdb/storage/storage_utils.hpp b/cpp/arcticdb/storage/storage_utils.hpp index 4649491b599..d825d370d79 100644 --- a/cpp/arcticdb/storage/storage_utils.hpp +++ b/cpp/arcticdb/storage/storage_utils.hpp @@ -10,10 +10,6 @@ #include #include -#include - -namespace rng = std::ranges; - namespace arcticdb { inline auto stream_id_prefix_matcher(const std::string &prefix) { @@ -22,38 +18,8 @@ inline auto stream_id_prefix_matcher(const std::string &prefix) { std::get(id).compare(0u, prefix.size(), prefix) == 0); }; } -inline std::vector filter_keys_on_existence( - const std::vector& keys, - const std::shared_ptr& store, - bool pred - ){ - auto key_existence = folly::collect(store->batch_key_exists(keys)).get(); - std::vector res; - for (size_t i = 0; i != keys.size(); i++) { - if (key_existence[i] == pred) { - res.push_back(keys[i]); - } - } - return res; -} - -inline void filter_keys_on_existence(std::vector& keys, const std::shared_ptr& store, bool pred) { - std::vector var_vector; - var_vector.reserve(keys.size()); - rng::copy(keys, std::back_inserter(var_vector)); - - auto key_existence = store->batch_key_exists(var_vector); - - auto keys_itr = keys.begin(); - for (size_t i = 0; i != var_vector.size(); i++) { - bool resolved = key_existence[i].wait().value(); - if (resolved == pred) { - *keys_itr = std::move(std::get(var_vector[i])); - ++keys_itr; - } - } - keys.erase(keys_itr, keys.end()); -} +std::vector filter_keys_on_existence(const std::vector& keys, const std::shared_ptr& store, bool pred); +void filter_keys_on_existence(std::vector& keys, const std::shared_ptr& store, bool pred); AtomKey copy_index_key_recursively( const std::shared_ptr& source_store, diff --git a/cpp/arcticdb/stream/index_aggregator.hpp b/cpp/arcticdb/stream/index_aggregator.hpp index 1af2294fcd0..22cf2c9123b 100644 --- a/cpp/arcticdb/stream/index_aggregator.hpp +++ b/cpp/arcticdb/stream/index_aggregator.hpp @@ -60,6 +60,10 @@ class FlatIndexingPolicy { segment_.set_timeseries_descriptor(timeseries_descriptor); } + void set_metadata(google::protobuf::Any&& metadata) { + segment_.set_metadata(std::move(metadata)); + } + private: Callback callback_; FixedSchema schema_; @@ -89,6 +93,10 @@ class IndexAggregator { indexing_policy_.set_timeseries_descriptor(timeseries_descriptor); } + void set_metadata(google::protobuf::Any&& metadata) { + indexing_policy_.set_metadata(std::move(metadata)); + } + private: IndexingPolicy indexing_policy_; }; diff --git a/cpp/arcticdb/toolbox/storage_mover.hpp b/cpp/arcticdb/toolbox/storage_mover.hpp index 72b51d280b5..e1d27af93b0 100644 --- a/cpp/arcticdb/toolbox/storage_mover.hpp +++ b/cpp/arcticdb/toolbox/storage_mover.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include "version/version_functions.hpp" @@ -255,7 +256,7 @@ inline MetricsConfig::Model get_model_from_proto_config(const proto::utils::Prom case proto::utils::PrometheusConfig_PrometheusModel_NO_INIT: return MetricsConfig::Model::NO_INIT; case proto::utils::PrometheusConfig_PrometheusModel_PUSH: return MetricsConfig::Model::PUSH; case proto::utils::PrometheusConfig_PrometheusModel_WEB: return MetricsConfig::Model::PULL; - default: internal::raise("Unknown Prometheus model {}", cfg.prometheus_model()); + default: internal::raise("Unknown Prometheus proto model {}", int{cfg.prometheus_model()}); } } @@ -589,36 +590,35 @@ class ARCTICDB_VISIBILITY_HIDDEN StorageMover { } ); } - std::sort(std::begin(index_keys), std::end(index_keys), - [&](const auto& k1, const auto& k2) {return k1.version_id() < k2.version_id();}); // Remove duplicate keys - index_keys.erase(std::unique(std::begin(index_keys), std::end(index_keys), - [&](const auto& k1, const auto& k2) {return k1.version_id()==k2.version_id();}), index_keys.end()); - + rng::sort(index_keys, [&](const auto& k1, const auto& k2) {return k1.version_id() < k2.version_id();}); + auto to_erase = rng::unique(index_keys, rng::equal_to{}, [](const auto& k){ return k.version_id();}); + index_keys.erase(to_erase.begin(), to_erase.end()); for(const auto& index_key: index_keys) { VersionId v_id = index_key.version_id(); try { std::optional new_version_id; + std::optional previous_key; if (append_versions) { auto [maybe_prev, _] = get_latest_version(target_store_, target_map, sym); if (maybe_prev){ new_version_id = std::make_optional(maybe_prev.value().version_id() + 1); + previous_key = std::move(maybe_prev); } } else { if (auto target_index_key = get_specific_version(target_store_, target_map, sym, v_id)) { throw storage::DuplicateKeyException(target_index_key.value()); } } - const auto new_index_key = storage::copy_index_key_recursively(source_store_, target_store_, - index_key, new_version_id); - target_map->write_version(target_store_, new_index_key); + const auto new_index_key = copy_index_key_recursively(source_store_, target_store_, index_key, new_version_id); + target_map->write_version(target_store_, new_index_key, previous_key); if(symbol_list) - symbol_list->add_symbol(target_store_, new_index_key.id()); + symbol_list->add_symbol(target_store_, new_index_key.id(), new_version_id.value_or(0)); // Change the version in the result map sym_data[py::int_(v_id)] = new_version_id ? new_version_id.value() : v_id; // Give the new version id to the snapshots - if (version_to_snapshot_map.count(v_id)) { + if (version_to_snapshot_map.contains(v_id)) { for(const auto& snap_name: version_to_snapshot_map[v_id]) { sym_data[py::str(snap_name)] = sym_data[py::int_(v_id)]; } @@ -629,7 +629,7 @@ class ARCTICDB_VISIBILITY_HIDDEN StorageMover { auto error = fmt::format("Sym:{},Version:{},Ex:{}", sym, v_id, e.what()); sym_data[key] = error; // Give the error to snapshots which also had the same version_id - if (version_to_snapshot_map.count(v_id)) { + if (version_to_snapshot_map.contains(v_id)) { for(const auto& snap_name: version_to_snapshot_map[v_id]) { sym_data[py::str(snap_name)] = error; } diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index bbb1d10b5e6..1f217ac6632 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include #include