From d2b86ac37efbd9d8b8ae30a6bf35904e11ef0315 Mon Sep 17 00:00:00 2001 From: Ivo Dilov Date: Wed, 1 May 2024 17:52:41 +0300 Subject: [PATCH] Removes `iterate_on_failure` argument and related functionality `iterate_on_failure` used to provide a back up way to load versions if the ref key is unreadable for some reason. The ref key should no longer be unreadable, thus `iterate_on_failure` should be removed. This commit removes the `iterate_on_failure` - argument in the `lib._nvs` functions - parameter of `VersionQuery` - parameter of `LoadParameter` and entirely removes the `LoadParameter` and replaces it with `LoadStrategy` - removes IterateOnFailure cpp test which used to test this behavior by artificially removing the ref key. --- cpp/arcticdb/pipeline/query.hpp | 5 - .../version/local_versioned_engine.cpp | 68 +++---- .../version/local_versioned_engine.hpp | 9 +- cpp/arcticdb/version/python_bindings.cpp | 6 +- .../version/test/rapidcheck_version_map.cpp | 8 +- .../version/test/test_stream_version_data.cpp | 74 +++---- .../version/test/test_version_map.cpp | 190 +++++++----------- .../version/test/test_version_map_batch.cpp | 20 +- .../version/test/test_version_store.cpp | 14 +- .../version/test/version_backwards_compat.hpp | 4 +- .../version/test/version_map_model.hpp | 4 +- cpp/arcticdb/version/version_functions.hpp | 78 +++---- cpp/arcticdb/version/version_map.hpp | 50 ++--- .../version/version_map_batch_methods.cpp | 10 +- .../version/version_map_batch_methods.hpp | 28 +-- cpp/arcticdb/version/version_map_entry.hpp | 17 +- cpp/arcticdb/version/version_store_api.cpp | 31 ++- cpp/arcticdb/version/version_store_api.hpp | 1 - cpp/arcticdb/version/version_tasks.hpp | 8 +- python/arcticdb/version_store/_store.py | 12 +- python/arcticdb/version_store/library.py | 1 - 21 files changed, 261 insertions(+), 377 deletions(-) diff --git a/cpp/arcticdb/pipeline/query.hpp b/cpp/arcticdb/pipeline/query.hpp index 5d20744500b..db5b1717a5d 100644 --- a/cpp/arcticdb/pipeline/query.hpp +++ b/cpp/arcticdb/pipeline/query.hpp @@ -96,7 +96,6 @@ using VersionQueryType = std::variant< struct VersionQuery { VersionQueryType content_; - std::optional iterate_on_failure_; void set_snap_name(const std::string& snap_name) { content_ = SnapshotVersionQuery{snap_name}; @@ -109,10 +108,6 @@ struct VersionQuery { void set_version(SignedVersionId version) { content_ = SpecificVersionQuery{version}; } - - void set_iterate_on_failure(const std::optional& iterate_on_failure) { - iterate_on_failure_ = iterate_on_failure; - } }; template diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 5c14a0c8231..86b5ea95b0b 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -208,9 +208,8 @@ std::string LocalVersionedEngine::dump_versions(const StreamId& stream_id) { } std::optional LocalVersionedEngine::get_latest_version( - const StreamId &stream_id, - const VersionQuery& version_query) { - auto key = get_latest_undeleted_version(store(), version_map(), stream_id, version_query); + const StreamId &stream_id) { + auto key = get_latest_undeleted_version(store(), version_map(), stream_id); if (!key) { ARCTICDB_DEBUG(log::version(), "get_latest_version didn't find version for stream_id: {}", stream_id); return std::nullopt; @@ -220,16 +219,15 @@ std::optional LocalVersionedEngine::get_latest_version( std::optional LocalVersionedEngine::get_specific_version( const StreamId &stream_id, - SignedVersionId signed_version_id, - const VersionQuery& version_query) { + SignedVersionId signed_version_id) { ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: get_specific_version"); - auto key = ::arcticdb::get_specific_version(store(), version_map(), stream_id, signed_version_id, version_query); + auto key = ::arcticdb::get_specific_version(store(), version_map(), stream_id, signed_version_id); if (!key) { VersionId version_id; if (signed_version_id >= 0) { version_id = static_cast(signed_version_id); } else { - auto [opt_latest_key, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, version_query); + auto [opt_latest_key, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id); if (opt_latest_key.has_value()) { auto opt_version_id = get_version_id_negative_index(opt_latest_key->version_id(), signed_version_id); if (opt_version_id.has_value()) { @@ -261,11 +259,10 @@ std::optional LocalVersionedEngine::get_specific_version( std::optional LocalVersionedEngine::get_version_at_time( const StreamId& stream_id, - timestamp as_of, - const VersionQuery& version_query + timestamp as_of ) { - auto index_key = load_index_key_from_time(store(), version_map(), stream_id, as_of, version_query); + auto index_key = load_index_key_from_time(store(), version_map(), stream_id, as_of); if (!index_key) { auto index_keys = get_index_keys_in_snapshots(store(), stream_id); auto vector_index_keys = std::vector(index_keys.begin(), index_keys.end()); @@ -309,17 +306,17 @@ std::optional LocalVersionedEngine::get_version_to_read( const VersionQuery &version_query ) { return util::variant_match(version_query.content_, - [&stream_id, &version_query, this](const SpecificVersionQuery &specific) { - return get_specific_version(stream_id, specific.version_id_, version_query); + [&stream_id, this](const SpecificVersionQuery &specific) { + return get_specific_version(stream_id, specific.version_id_); }, [&stream_id, this](const SnapshotVersionQuery &snapshot) { return get_version_from_snapshot(stream_id, snapshot.name_); }, - [&stream_id, &version_query, this](const TimestampVersionQuery ×tamp) { - return get_version_at_time(stream_id, timestamp.timestamp_, version_query); + [&stream_id, this](const TimestampVersionQuery ×tamp) { + return get_version_at_time(stream_id, timestamp.timestamp_); }, - [&stream_id, &version_query, this](const std::monostate &) { - return get_latest_version(stream_id, version_query); + [&stream_id, this](const std::monostate &) { + return get_latest_version(stream_id); } ); } @@ -486,7 +483,7 @@ std::shared_ptr LocalVersionedEngine::get_de_dup_map( ){ auto de_dup_map = std::make_shared(); if (write_options.de_duplication) { - auto maybe_undeleted_prev = get_latest_undeleted_version(store(), version_map(), stream_id, VersionQuery{}); + auto maybe_undeleted_prev = get_latest_undeleted_version(store(), version_map(), stream_id); if (maybe_undeleted_prev) { // maybe_undeleted_prev is index key auto data_keys = get_data_keys(store(), {maybe_undeleted_prev.value()}, storage::ReadKeyOpts{}); @@ -511,7 +508,7 @@ std::shared_ptr LocalVersionedEngine::get_de_dup_map( VersionedItem LocalVersionedEngine::sort_index(const StreamId& stream_id, bool dynamic_schema) { - auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id, VersionQuery{}); + auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id); util::check(maybe_prev.has_value(), "Cannot delete from non-existent symbol {}", stream_id); auto version_id = get_next_version_from_key(*maybe_prev); auto [index_segment_reader, slice_and_keys] = index::read_index_to_vector(store(), *maybe_prev); @@ -551,7 +548,7 @@ VersionedItem LocalVersionedEngine::delete_range_internal( const StreamId& stream_id, const UpdateQuery & query, bool dynamic_schema) { - auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id, VersionQuery{}); + auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id); util::check(maybe_prev.has_value(), "Cannot delete from non-existent symbol {}", stream_id); auto versioned_item = delete_range_impl(store(), *maybe_prev, @@ -572,8 +569,7 @@ VersionedItem LocalVersionedEngine::update_internal( ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: update"); auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), - stream_id, - VersionQuery{}); + stream_id); if (update_info.previous_index_key_.has_value()) { if (frame->empty()) { ARCTICDB_DEBUG(log::version(), "Updating existing data with an empty item has no effect. \n" @@ -621,8 +617,7 @@ VersionedItem LocalVersionedEngine::write_versioned_metadata_internal( ) { auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), - stream_id, - VersionQuery{}); + stream_id); if(update_info.previous_index_key_.has_value()) { ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {}", stream_id); auto index_key = UpdateMetadataTask{store(), update_info, std::move(user_meta)}(); @@ -709,7 +704,7 @@ VersionedItem LocalVersionedEngine::write_versioned_dataframe_internal( ARCTICDB_SAMPLE(WriteVersionedDataFrame, 0) py::gil_scoped_release release_gil; ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write_versioned_dataframe"); - auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{}); + auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id); auto version_id = get_next_version_from_key(maybe_prev); ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {} , version_id = {}", stream_id, version_id); auto write_options = get_write_options(); @@ -739,7 +734,7 @@ std::pair LocalVersionedEngine::restore_ver auto version_to_restore = get_version_to_read(stream_id, version_query); missing_data::check(static_cast(version_to_restore), "Unable to restore {}@{}: version not found", stream_id, version_query); - auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{}); + auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id); ARCTICDB_DEBUG(log::version(), "restore for stream_id: {} , version_id = {}", stream_id, version_to_restore->key_.version_id()); return AsyncRestoreVersionTask{store(), version_map(), stream_id, version_to_restore->key_, maybe_prev}().get(); } @@ -752,7 +747,7 @@ VersionedItem LocalVersionedEngine::write_individual_segment( ARCTICDB_SAMPLE(WriteVersionedDataFrame, 0) ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write_versioned_dataframe"); - auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{}); + auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id); auto version_id = get_next_version_from_key(maybe_prev); ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {} , version_id = {}", stream_id, version_id); auto index = index_type_from_descriptor(segment.descriptor()); @@ -870,10 +865,10 @@ folly::Future LocalVersionedEngine::delete_trees_responsibly( { auto min_versions = min_versions_for_each_stream(orig_keys_to_delete); for (const auto& min : min_versions) { - auto load_param = load_type == LoadType::LOAD_DOWNTO - ? LoadParameter{load_type, LoadObjective::ANY, static_cast(min.second)} - : LoadParameter{load_type, LoadObjective::ANY}; - const auto entry = version_map()->check_reload(store(), min.first, load_param, __FUNCTION__); + auto load_strategy = load_type == LoadType::LOAD_DOWNTO + ? LoadStrategy{load_type, LoadObjective::ANY, static_cast(min.second)} + : LoadStrategy{load_type, LoadObjective::ANY}; + const auto entry = version_map()->check_reload(store(), min.first, load_strategy, __FUNCTION__); entry_map.try_emplace(std::move(min.first), entry); } } @@ -1006,7 +1001,7 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic( bool prune_previous_versions) { log::version().debug("Compacting incomplete symbol {}", stream_id); - auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id, VersionQuery{}); + auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id); auto versioned_item = compact_incomplete_impl( store_, stream_id, user_meta, update_info, append, convert_int_to_float, via_iteration, sparsify, get_write_options()); @@ -1022,7 +1017,7 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic( bool LocalVersionedEngine::is_symbol_fragmented(const StreamId& stream_id, std::optional segment_size) { auto update_info = get_latest_undeleted_version_and_next_version_id( - store(), version_map(), stream_id, VersionQuery{}); + store(), version_map(), stream_id); auto options = get_write_options(); auto pre_defragmentation_info = get_pre_defragmentation_info( store(), stream_id, update_info, options, segment_size.value_or(options.segment_row_size)); @@ -1034,7 +1029,7 @@ VersionedItem LocalVersionedEngine::defragment_symbol_data(const StreamId& strea // Currently defragmentation only for latest version - is there a use-case to allow compaction for older data? auto update_info = get_latest_undeleted_version_and_next_version_id( - store(), version_map(), stream_id, VersionQuery{}); + store(), version_map(), stream_id); auto options = get_write_options(); auto versioned_item = defragment_symbol_data_impl( @@ -1384,8 +1379,7 @@ VersionedItem LocalVersionedEngine::append_internal( auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), - stream_id, - VersionQuery{}); + stream_id); if(update_info.previous_index_key_.has_value()) { if (frame->empty()) { @@ -1689,7 +1683,7 @@ VersionedItem LocalVersionedEngine::sort_merge_internal( bool via_iteration, bool sparsify ) { - auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id, VersionQuery{}); + auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id); auto versioned_item = sort_merge_impl(store_, stream_id, user_meta, update_info, append, convert_int_to_float, via_iteration, sparsify); version_map()->write_version(store(), versioned_item.key_, update_info.previous_index_key_); return versioned_item; @@ -1727,7 +1721,7 @@ timestamp LocalVersionedEngine::latest_timestamp(const std::string& symbol) { if(auto latest_incomplete = latest_incomplete_timestamp(store(), symbol); latest_incomplete) return *latest_incomplete; - if(auto latest_key = get_latest_version(symbol, VersionQuery{}); latest_key) + if(auto latest_key = get_latest_version(symbol); latest_key) return latest_key->key_.end_time(); return -1; diff --git a/cpp/arcticdb/version/local_versioned_engine.hpp b/cpp/arcticdb/version/local_versioned_engine.hpp index 0c9a1e6bc32..b0ce2def3db 100644 --- a/cpp/arcticdb/version/local_versioned_engine.hpp +++ b/cpp/arcticdb/version/local_versioned_engine.hpp @@ -101,18 +101,15 @@ class LocalVersionedEngine : public VersionedEngine { ) override; std::optional get_latest_version( - const StreamId &stream_id, - const VersionQuery& version_query); + const StreamId &stream_id); std::optional get_specific_version( const StreamId &stream_id, - SignedVersionId signed_version_id, - const VersionQuery& version_query); + SignedVersionId signed_version_id); std::optional get_version_at_time( const StreamId& stream_id, - timestamp as_of, - const VersionQuery& version_query); + timestamp as_of); std::optional get_version_from_snapshot( const StreamId& stream_id, diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index 5bf1420ee74..68ad26a6bc1 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -98,8 +98,7 @@ void register_bindings(py::module &version, py::exception(version, "PythonVersionStoreReadOptions") .def(py::init()) @@ -694,10 +693,9 @@ void register_bindings(py::module &version, py::exception & s_id, const std::optional & snap_id, const std::optional& latest, - const std::optional& iterate_on_failure, const std::optional& skip_snapshots ){ - return v.list_versions(s_id, snap_id, latest, iterate_on_failure, skip_snapshots); + return v.list_versions(s_id, snap_id, latest, skip_snapshots); }, py::call_guard(), "List all the version ids for this store.") .def("_compact_version_map", diff --git a/cpp/arcticdb/version/test/rapidcheck_version_map.cpp b/cpp/arcticdb/version/test/rapidcheck_version_map.cpp index 2bc9886bf52..26aa884aa06 100644 --- a/cpp/arcticdb/version/test/rapidcheck_version_map.cpp +++ b/cpp/arcticdb/version/test/rapidcheck_version_map.cpp @@ -23,7 +23,7 @@ template void check_latest_versions(const Model& s0, MapStorePair &sut, std::string symbol) { using namespace arcticdb; - auto prev = get_latest_version(sut.store_,sut.map_, symbol, pipelines::VersionQuery{}).first; + auto prev = get_latest_version(sut.store_,sut.map_, symbol).first; auto sut_version_id = prev ? prev->version_id() : 0; auto model_prev = s0.get_latest_version(symbol); auto model_version_id = model_prev ? model_prev.value() : 0; @@ -33,9 +33,7 @@ void check_latest_versions(const Model& s0, MapStorePair &sut, std::string symb template void check_latest_undeleted_versions(const Model& s0, MapStorePair &sut, std::string symbol) { using namespace arcticdb; - pipelines::VersionQuery version_query; - version_query.set_iterate_on_failure(true); - auto prev = get_latest_undeleted_version(sut.store_, sut.map_, symbol, version_query); + auto prev = get_latest_undeleted_version(sut.store_, sut.map_, symbol); auto sut_version_id = prev ? prev->version_id() : 0; auto model_prev = s0.get_latest_undeleted_version(symbol); auto model_version_id = model_prev ? model_prev.value() : 0; @@ -224,7 +222,7 @@ struct GetAllVersions : rc::state::Command { void run(const Model& s0, MapStorePair &sut) const override { auto model_versions = s0.get_all_versions(symbol_); using namespace arcticdb; - auto sut_version = get_all_versions(sut.store_, sut.map_, symbol_, pipelines::VersionQuery{}); + auto sut_version = get_all_versions(sut.store_, sut.map_, symbol_); RC_ASSERT(model_versions.size() == sut_version.size()); for(auto i = size_t{0}; i < model_versions.size(); ++i) diff --git a/cpp/arcticdb/version/test/test_stream_version_data.cpp b/cpp/arcticdb/version/test/test_stream_version_data.cpp index 6bb24dba743..b397f9c655b 100644 --- a/cpp/arcticdb/version/test/test_stream_version_data.cpp +++ b/cpp/arcticdb/version/test/test_stream_version_data.cpp @@ -7,27 +7,27 @@ TEST(StreamVersionData, SpecificVersion) { using namespace arcticdb::pipelines; StreamVersionData stream_version_data; - VersionQuery query_1{SpecificVersionQuery{VersionId(12)}, false}; + VersionQuery query_1{SpecificVersionQuery{VersionId(12)}}; stream_version_data.react(query_1); - VersionQuery query_2{SpecificVersionQuery{VersionId(4)}, false}; + VersionQuery query_2{SpecificVersionQuery{VersionId(4)}}; stream_version_data.react(query_2); ASSERT_EQ(stream_version_data.count_, 2); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_DOWNTO); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_objective_, LoadObjective::UNDELETED); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_, 4); + ASSERT_EQ(stream_version_data.load_strategy_.load_type_, LoadType::LOAD_DOWNTO); + ASSERT_EQ(stream_version_data.load_strategy_.load_objective_, LoadObjective::UNDELETED); + ASSERT_EQ(stream_version_data.load_strategy_.load_until_version_, 4); } TEST(StreamVersionData, SpecificVersionReversed) { using namespace arcticdb; using namespace arcticdb::pipelines; - StreamVersionData stream_version_data(VersionQuery{SpecificVersionQuery{VersionId(4)}, false}); - VersionQuery query_2{SpecificVersionQuery{VersionId(12)}, false}; + StreamVersionData stream_version_data(VersionQuery{SpecificVersionQuery{VersionId(4)}}); + VersionQuery query_2{SpecificVersionQuery{VersionId(12)}}; stream_version_data.react(query_2); ASSERT_EQ(stream_version_data.count_, 2); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_DOWNTO); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_objective_, LoadObjective::UNDELETED); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_, 4); + ASSERT_EQ(stream_version_data.load_strategy_.load_type_, LoadType::LOAD_DOWNTO); + ASSERT_EQ(stream_version_data.load_strategy_.load_objective_, LoadObjective::UNDELETED); + ASSERT_EQ(stream_version_data.load_strategy_.load_until_version_, 4); } TEST(StreamVersionData, Timestamp) { @@ -35,14 +35,14 @@ TEST(StreamVersionData, Timestamp) { using namespace arcticdb::pipelines; StreamVersionData stream_version_data; - VersionQuery query_1{TimestampVersionQuery{timestamp(12)}, false}; + VersionQuery query_1{TimestampVersionQuery{timestamp(12)}}; stream_version_data.react(query_1); - VersionQuery query_2{TimestampVersionQuery{timestamp(4)}, false}; + VersionQuery query_2{TimestampVersionQuery{timestamp(4)}}; stream_version_data.react(query_2); ASSERT_EQ(stream_version_data.count_, 2); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_FROM_TIME); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_objective_, LoadObjective::UNDELETED); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_, 4); + ASSERT_EQ(stream_version_data.load_strategy_.load_type_, LoadType::LOAD_FROM_TIME); + ASSERT_EQ(stream_version_data.load_strategy_.load_objective_, LoadObjective::UNDELETED); + ASSERT_EQ(stream_version_data.load_strategy_.load_from_time_, 4); } TEST(StreamVersionData, TimestampUnordered) { @@ -50,16 +50,16 @@ TEST(StreamVersionData, TimestampUnordered) { using namespace arcticdb::pipelines; StreamVersionData stream_version_data; - VersionQuery query_1{TimestampVersionQuery{timestamp(3)}, false}; + VersionQuery query_1{TimestampVersionQuery{timestamp(3)}}; stream_version_data.react(query_1); - VersionQuery query_2{TimestampVersionQuery{timestamp(7)}, false}; + VersionQuery query_2{TimestampVersionQuery{timestamp(7)}}; stream_version_data.react(query_2); - VersionQuery query_3{TimestampVersionQuery{timestamp(4)}, false}; + VersionQuery query_3{TimestampVersionQuery{timestamp(4)}}; stream_version_data.react(query_3); ASSERT_EQ(stream_version_data.count_, 3); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_FROM_TIME); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_objective_, LoadObjective::UNDELETED); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_, 3); + ASSERT_EQ(stream_version_data.load_strategy_.load_type_, LoadType::LOAD_FROM_TIME); + ASSERT_EQ(stream_version_data.load_strategy_.load_objective_, LoadObjective::UNDELETED); + ASSERT_EQ(stream_version_data.load_strategy_.load_from_time_, 3); } TEST(StreamVersionData, Latest) { @@ -67,12 +67,12 @@ TEST(StreamVersionData, Latest) { using namespace arcticdb::pipelines; StreamVersionData stream_version_data; - VersionQuery query_1{std::monostate{}, false}; + VersionQuery query_1{std::monostate{}}; stream_version_data.react(query_1); ASSERT_EQ(stream_version_data.count_, 1); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_LATEST); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_objective_, LoadObjective::UNDELETED); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_.has_value(), false); + ASSERT_EQ(stream_version_data.load_strategy_.load_type_, LoadType::LOAD_LATEST); + ASSERT_EQ(stream_version_data.load_strategy_.load_objective_, LoadObjective::UNDELETED); + ASSERT_EQ(stream_version_data.load_strategy_.load_until_version_.has_value(), false); } TEST(StreamVersionData, SpecificToTimestamp) { @@ -80,15 +80,15 @@ TEST(StreamVersionData, SpecificToTimestamp) { using namespace arcticdb::pipelines; StreamVersionData stream_version_data; - VersionQuery query_1{SpecificVersionQuery{VersionId(12)}, false}; + VersionQuery query_1{SpecificVersionQuery{VersionId(12)}}; stream_version_data.react(query_1); - VersionQuery query_2{TimestampVersionQuery{timestamp(3)}, false}; + VersionQuery query_2{TimestampVersionQuery{timestamp(3)}}; stream_version_data.react(query_2); ASSERT_EQ(stream_version_data.count_, 2); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_ALL); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_objective_, LoadObjective::UNDELETED); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_.has_value(), false); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_.has_value(), false); + ASSERT_EQ(stream_version_data.load_strategy_.load_type_, LoadType::LOAD_ALL); + ASSERT_EQ(stream_version_data.load_strategy_.load_objective_, LoadObjective::UNDELETED); + ASSERT_EQ(stream_version_data.load_strategy_.load_until_version_.has_value(), false); + ASSERT_EQ(stream_version_data.load_strategy_.load_from_time_.has_value(), false); } TEST(StreamVersionData, TimestampToSpecific) { @@ -96,13 +96,13 @@ TEST(StreamVersionData, TimestampToSpecific) { using namespace arcticdb::pipelines; StreamVersionData stream_version_data; - VersionQuery query_1{TimestampVersionQuery{timestamp(3)}, false}; + VersionQuery query_1{TimestampVersionQuery{timestamp(3)}}; stream_version_data.react(query_1); - VersionQuery query_2{SpecificVersionQuery{VersionId(12)}, false}; + VersionQuery query_2{SpecificVersionQuery{VersionId(12)}}; stream_version_data.react(query_2); ASSERT_EQ(stream_version_data.count_, 2); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_type_, LoadType::LOAD_ALL); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_objective_, LoadObjective::UNDELETED); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_until_version_.has_value(), false); - ASSERT_EQ(stream_version_data.load_param_.load_strategy_.load_from_time_.has_value(), false); + ASSERT_EQ(stream_version_data.load_strategy_.load_type_, LoadType::LOAD_ALL); + ASSERT_EQ(stream_version_data.load_strategy_.load_objective_, LoadObjective::UNDELETED); + ASSERT_EQ(stream_version_data.load_strategy_.load_until_version_.has_value(), false); + ASSERT_EQ(stream_version_data.load_strategy_.load_from_time_.has_value(), false); } \ No newline at end of file diff --git a/cpp/arcticdb/version/test/test_version_map.cpp b/cpp/arcticdb/version/test/test_version_map.cpp index 5c52af26d0e..560a18fd485 100644 --- a/cpp/arcticdb/version/test/test_version_map.cpp +++ b/cpp/arcticdb/version/test/test_version_map.cpp @@ -72,19 +72,17 @@ TEST(VersionMap, WithPredecessors) { auto version_map = std::make_shared(); version_map->set_validate(true); version_map->write_version(store, key1, std::nullopt); - pipelines::VersionQuery version_query; - version_query.set_iterate_on_failure(false); - auto latest = get_latest_undeleted_version(store, version_map, id, version_query); + auto latest = get_latest_undeleted_version(store, version_map, id); ASSERT_TRUE(latest); ASSERT_EQ(latest.value(), key1); version_map->write_version(store, key2, key1); - latest = get_latest_undeleted_version(store, version_map, id, version_query); + latest = get_latest_undeleted_version(store, version_map, id); ASSERT_EQ(latest.value(), key2); version_map->write_version(store, key3, key2); std::vector expected{ key3, key2, key1}; - auto result = get_all_versions(store, version_map, id, version_query); + auto result = get_all_versions(store, version_map, id); ASSERT_EQ(result, expected); } @@ -96,52 +94,49 @@ TEST(VersionMap, TombstoneDelete) { auto key4 = atom_key_builder().version_id(4).creation_ts(PilotedClock::nanos_since_epoch()).content_hash(6).start_index( 7).end_index(8).build(id, KeyType::TABLE_INDEX); - pipelines::VersionQuery version_query; - version_query.set_iterate_on_failure(false); - auto version_map = std::make_shared(); version_map->set_validate(true); version_map->write_version(store, key1, std::nullopt); - auto latest = get_latest_undeleted_version(store, version_map, id, version_query); + auto latest = get_latest_undeleted_version(store, version_map, id); ASSERT_TRUE(latest); ASSERT_EQ(latest.value(), key1); version_map->write_version(store, key2, key1); - latest = get_latest_undeleted_version(store, version_map, id, version_query); + latest = get_latest_undeleted_version(store, version_map, id); ASSERT_EQ(latest.value(), key2); version_map->write_version(store, key3, key2); - latest = get_latest_undeleted_version(store, version_map, id, version_query); + latest = get_latest_undeleted_version(store, version_map, id); ASSERT_EQ(latest.value(), key3); version_map->write_version(store, key4, key3); - auto del_res = tombstone_version(store, version_map, id, VersionId{2}, pipelines::VersionQuery{}); + auto del_res = tombstone_version(store, version_map, id, VersionId{2}); ASSERT_FALSE(del_res.no_undeleted_left); ASSERT_EQ(del_res.keys_to_delete.front(), key2); ASSERT_THAT(del_res.could_share_data, UnorderedElementsAre(key1, key3)); std::vector expected{key4, key3, key1}; - auto result = get_all_versions(store, version_map, id, version_query); + auto result = get_all_versions(store, version_map, id); ASSERT_EQ(result, expected); - del_res = tombstone_version(store, version_map, id, VersionId{3}, pipelines::VersionQuery{}); + del_res = tombstone_version(store, version_map, id, VersionId{3}); ASSERT_FALSE(del_res.no_undeleted_left); ASSERT_EQ(del_res.keys_to_delete.front(), key3); ASSERT_THAT(del_res.could_share_data, UnorderedElementsAre(key1, key4)); - latest = get_latest_undeleted_version(store, version_map, id, version_query); + latest = get_latest_undeleted_version(store, version_map, id); ASSERT_EQ(latest.value(), key4); - del_res = tombstone_version(store, version_map, id, VersionId{4}, pipelines::VersionQuery{}); + del_res = tombstone_version(store, version_map, id, VersionId{4}); ASSERT_FALSE(del_res.no_undeleted_left); ASSERT_EQ(del_res.keys_to_delete.front(), key4); ASSERT_EQ(*del_res.could_share_data.begin(), key1); - latest = get_latest_undeleted_version(store, version_map, id, version_query); + latest = get_latest_undeleted_version(store, version_map, id); ASSERT_EQ(latest.value(), key1); - del_res = tombstone_version(store, version_map, id, VersionId{1}, pipelines::VersionQuery{}); + del_res = tombstone_version(store, version_map, id, VersionId{1}); ASSERT_TRUE(del_res.no_undeleted_left); ASSERT_EQ(del_res.keys_to_delete.front(), key1); ASSERT_TRUE(del_res.could_share_data.empty()); @@ -161,7 +156,7 @@ TEST(VersionMap, PingPong) { 4).end_index(5).build(id, KeyType::TABLE_INDEX); left->write_version(store, key1, std::nullopt); - auto latest = get_latest_undeleted_version(store, right, id, pipelines::VersionQuery{}); + auto latest = get_latest_undeleted_version(store, right, id); ASSERT_EQ(latest.value(), key1); auto key2 = atom_key_builder().version_id(2).creation_ts(3).content_hash(4).start_index( @@ -175,9 +170,9 @@ TEST(VersionMap, PingPong) { left->write_version(store, key3, key2); std::vector expected{ key3, key2, key1}; - auto left_result = get_all_versions(store, left, id, pipelines::VersionQuery{}); + auto left_result = get_all_versions(store, left, id); ASSERT_EQ(left_result, expected); - auto right_result = get_all_versions(store, right, id, pipelines::VersionQuery{}); + auto right_result = get_all_versions(store, right, id); ASSERT_EQ(right_result, expected); } @@ -202,7 +197,7 @@ TEST(VersionMap, TestLoadsRefAndIteration) { ScopedConfig reload_interval("VersionMap.ReloadInterval", 0); // always reload std::vector expected{ key3, key2, key1}; - auto result = get_all_versions(store, version_map, id, pipelines::VersionQuery{}); + auto result = get_all_versions(store, version_map, id); ASSERT_EQ(result, expected); auto entry_iteration = std::make_shared(); @@ -240,7 +235,7 @@ TEST(VersionMap, TestCompact) { ASSERT_EQ(store->num_ref_keys(), 1); std::vector expected{ key3, key2, key1}; - auto result = get_all_versions(store, version_map, id, pipelines::VersionQuery{}); + auto result = get_all_versions(store, version_map, id); ASSERT_EQ(result, expected); } @@ -254,7 +249,7 @@ TEST(VersionMap, TestCompactWithDelete) { version_map->write_version(store, key1, std::nullopt); version_map->write_version(store, key2, key1); version_map->write_version(store, key3, key2); - tombstone_version(store, version_map, id, 2, pipelines::VersionQuery{}); + tombstone_version(store, version_map, id, 2); ScopedConfig max_blocks("VersionMap.MaxVersionBlocks", 1); ScopedConfig reload_interval("VersionMap.ReloadInterval", 0); // always reload @@ -264,7 +259,7 @@ TEST(VersionMap, TestCompactWithDelete) { ASSERT_EQ(store->num_ref_keys(), 1); std::vector expected{ key3, key1}; - auto result = get_all_versions(store, version_map, id, pipelines::VersionQuery{}); + auto result = get_all_versions(store, version_map, id); ASSERT_EQ(result, expected); } @@ -280,8 +275,8 @@ TEST(VersionMap, TestLatestVersionWithDeleteTombstones) { version_map->write_version(store, key1, std::nullopt); version_map->write_version(store, key2, key1); version_map->write_version(store, key3, key2); - tombstone_version(store, version_map, id, 2, pipelines::VersionQuery{}); - auto [maybe_prev, deleted] = get_latest_version(store, version_map, id, pipelines::VersionQuery{}); + tombstone_version(store, version_map, id, 2); + auto [maybe_prev, deleted] = get_latest_version(store, version_map, id); auto version_id = get_next_version_from_key(maybe_prev); ASSERT_EQ(version_id, 4); } @@ -296,14 +291,14 @@ TEST(VersionMap, TestCompactWithDeleteTombstones) { version_map->write_version(store, key1, std::nullopt); version_map->write_version(store, key2, key1); version_map->write_version(store, key3, key2); - tombstone_version(store, version_map, id, 2, pipelines::VersionQuery{}); + tombstone_version(store, version_map, id, 2); ScopedConfig max_blocks("VersionMap.MaxVersionBlocks", 1); ScopedConfig reload_interval("VersionMap.ReloadInterval", 0); // always reload version_map->compact(store, id); std::vector expected{ key3, key1}; - auto result = get_all_versions(store, version_map, id, pipelines::VersionQuery{}); + auto result = get_all_versions(store, version_map, id); ASSERT_EQ(result, expected); } @@ -335,35 +330,6 @@ void write_old_style_journal_entry(const AtomKey &key, std::shared_ptr(); - StreamId id{"test1"}; - - auto version_map = std::make_shared(); - version_map->set_validate(true); - auto key1 = - atom_key_builder().version_id(1).creation_ts(PilotedClock::nanos_since_epoch()).content_hash(3).start_index( - 4).end_index(5).build(id, KeyType::TABLE_INDEX); - version_map->write_version(store, key1, std::nullopt); - auto key2 = - atom_key_builder().version_id(2).creation_ts(PilotedClock::nanos_since_epoch()).content_hash(4).start_index( - 5).end_index(6).build(id, KeyType::TABLE_INDEX); - version_map->write_version(store, key2, key1); - auto key3 = - atom_key_builder().version_id(3).creation_ts(PilotedClock::nanos_since_epoch()).content_hash(5).start_index( - 6).end_index(7).build(id, KeyType::TABLE_INDEX); - version_map->write_version(store, key3, key2); - - RefKey ref_key{id, KeyType::VERSION_REF}; - store->remove_key_sync(ref_key, storage::RemoveOpts{}); - - std::vector expected{ key3, key2, key1}; - pipelines::VersionQuery version_query; - version_query.set_iterate_on_failure(true); - auto result = get_all_versions(store, version_map, id, version_query); - ASSERT_EQ(result, expected); -} - TEST(VersionMap, GetNextVersionInEntry) { using namespace arcticdb; @@ -426,7 +392,7 @@ TEST(VersionMap, FixRefKey) { ASSERT_TRUE(version_map->check_ref_key(store, id)); std::vector expected{key3, key2, key1}; - auto result = get_all_versions(store, version_map, id, pipelines::VersionQuery{}); + auto result = get_all_versions(store, version_map, id); ASSERT_EQ(result, expected); } @@ -452,7 +418,7 @@ TEST(VersionMap, FixRefKeyTombstones) { auto key5 = atom_key_with_version(id, 1, 1696590624590123209); version_map->write_version(store, key5, key4); auto key6 = atom_key_with_version(id, 0, 1696590624612743245); - auto entry = version_map->check_reload(store, id, LoadParameter{LoadType::LOAD_LATEST, LoadObjective::ANY}, __FUNCTION__); + auto entry = version_map->check_reload(store, id, LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::ANY}, __FUNCTION__); version_map->journal_single_key(store, key5, entry->head_.value()); auto valid = version_map->check_ref_key(store, id); @@ -485,11 +451,11 @@ TEST(VersionMap, RewriteVersionKeys) { ASSERT_TRUE(version_map->check_ref_key(store, id)); // This will just write tombstone key - arcticdb::tombstone_version(store, version_map, id, 2, pipelines::VersionQuery{}); - arcticdb::tombstone_version(store, version_map, id, 1, pipelines::VersionQuery{}); + arcticdb::tombstone_version(store, version_map, id, 2); + arcticdb::tombstone_version(store, version_map, id, 1); - auto index_key1 = arcticdb::get_specific_version(store, version_map, id, 1, pipelines::VersionQuery{}); - auto index_key2 = arcticdb::get_specific_version(store, version_map, id, 2, pipelines::VersionQuery{}); + auto index_key1 = arcticdb::get_specific_version(store, version_map, id, 1); + auto index_key2 = arcticdb::get_specific_version(store, version_map, id, 2); // will be null since they were tombstoned (but not actually deleted ASSERT_FALSE(index_key1.has_value()); @@ -497,15 +463,15 @@ TEST(VersionMap, RewriteVersionKeys) { version_map->remove_and_rewrite_version_keys(store, id); - auto final_index_key1 = arcticdb::get_specific_version(store, version_map, id, 1, pipelines::VersionQuery{}); - auto final_index_key2 = arcticdb::get_specific_version(store, version_map, id, 2, pipelines::VersionQuery{}); + auto final_index_key1 = arcticdb::get_specific_version(store, version_map, id, 1); + auto final_index_key2 = arcticdb::get_specific_version(store, version_map, id, 2); // will not be null anymore since the data actually existed ASSERT_TRUE(final_index_key1.has_value()); ASSERT_TRUE(final_index_key2.has_value()); std::vector expected{key3, key2, key1}; - auto result = get_all_versions(store, version_map, id, pipelines::VersionQuery{}); + auto result = get_all_versions(store, version_map, id); ASSERT_EQ(result, expected); } @@ -529,11 +495,11 @@ TEST(VersionMap, RecoverDeleted) { deleted = version_map->find_deleted_version_keys(store, id); ASSERT_EQ(deleted.size(), 3); - EXPECT_THROW({ get_all_versions(store, version_map, id, pipelines::VersionQuery{}); }, std::runtime_error); + EXPECT_THROW({ get_all_versions(store, version_map, id); }, std::runtime_error); version_map->recover_deleted(store, id); std::vector expected{ key3, key2, key1}; - auto result = get_all_versions(store, version_map, id, pipelines::VersionQuery{}); + auto result = get_all_versions(store, version_map, id); ASSERT_EQ(result, expected); } @@ -548,9 +514,9 @@ TEST(VersionMap, StorageLogging) { version_map->write_version(store, key2, key1); version_map->write_version(store, key3, key2); - tombstone_version(store, version_map, id, key1.version_id(), pipelines::VersionQuery{}); - tombstone_version(store, version_map, id, key3.version_id(), pipelines::VersionQuery{}); - tombstone_version(store, version_map, id, key2.version_id(), pipelines::VersionQuery{}); + tombstone_version(store, version_map, id, key1.version_id()); + tombstone_version(store, version_map, id, key3.version_id()); + tombstone_version(store, version_map, id, key2.version_id()); std::unordered_set log_keys; @@ -578,7 +544,7 @@ std::shared_ptr write_two_versions(std::shared_ptrcheck_reload( store, id, - LoadParameter{LoadType::NOT_LOADED, LoadObjective::ANY}, + LoadStrategy{LoadType::NOT_LOADED, LoadObjective::ANY}, __FUNCTION__); auto key1 = atom_key_with_version(id, 0, 0); @@ -597,7 +563,7 @@ void write_alternating_deleted_undeleted(std::shared_ptr store, s auto entry = version_map->check_reload( store, id, - LoadParameter{LoadType::NOT_LOADED, LoadObjective::ANY}, + LoadStrategy{LoadType::NOT_LOADED, LoadObjective::ANY}, __FUNCTION__); auto key1 = atom_key_with_version(id, 0, 0); @@ -665,46 +631,46 @@ TEST(VersionMap, FollowingVersionChainWithCaching){ // We create an empty version map after populating the versions version_map = std::make_shared(); - auto check_loads_versions = [&](LoadParameter load_param, uint32_t should_load_any, uint32_t should_load_undeleted){ - auto loaded = version_map->check_reload(store, id, load_param, __FUNCTION__); + auto check_loads_versions = [&](LoadStrategy load_strategy, uint32_t should_load_any, uint32_t should_load_undeleted){ + auto loaded = version_map->check_reload(store, id, load_strategy, __FUNCTION__); EXPECT_EQ(loaded->get_indexes(true).size(), should_load_any); EXPECT_EQ(loaded->get_indexes(false).size(), should_load_undeleted); }; - check_loads_versions(LoadParameter{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(-1)}, 1, 0); + check_loads_versions(LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(-1)}, 1, 0); // LOAD_FROM_TIME should not be cached by the LOAD_DOWNTO and should reload from storage up to the latest undeleted version, hence loading 2 versions, 1 of which is undeleted. - check_loads_versions(LoadParameter{LoadType::LOAD_FROM_TIME, LoadObjective::UNDELETED, static_cast(10)}, 2, 1); + check_loads_versions(LoadStrategy{LoadType::LOAD_FROM_TIME, LoadObjective::UNDELETED, static_cast(10)}, 2, 1); // LOAD_LATEST should be cached by the LOAD_FROM_TIME, so we still have the same 2 loaded versions - check_loads_versions(LoadParameter{LoadType::LOAD_LATEST, LoadObjective::ANY}, 2, 1); + check_loads_versions(LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::ANY}, 2, 1); // This LOAD_FROM_TIME should still use the cached 2 versions - check_loads_versions(LoadParameter{LoadType::LOAD_FROM_TIME, LoadObjective::ANY, static_cast(1)}, 2, 1); + check_loads_versions(LoadStrategy{LoadType::LOAD_FROM_TIME, LoadObjective::ANY, static_cast(1)}, 2, 1); // We just get the entry to use for the tombstone and the write auto entry = version_map->check_reload( store, id, - LoadParameter{LoadType::NOT_LOADED, LoadObjective::ANY}, + LoadStrategy{LoadType::NOT_LOADED, LoadObjective::ANY}, __FUNCTION__); // We delete the only undeleted key version_map->write_tombstone(store, VersionId{1}, id, entry, timestamp{4}); // LOAD_LATEST should still be cached, but the cached entry now needs to have no undeleted keys - check_loads_versions(LoadParameter{LoadType::LOAD_LATEST, LoadObjective::ANY}, 2, 0); + check_loads_versions(LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::ANY}, 2, 0); // LOAD_FROM_TIME UNDELETED should no longer be cached even though we used the same request before because the undeleted key it went to got deleted. So it will load the entire version chain - check_loads_versions(LoadParameter{LoadType::LOAD_FROM_TIME, LoadObjective::UNDELETED, static_cast(10)}, 3, 0); + check_loads_versions(LoadStrategy{LoadType::LOAD_FROM_TIME, LoadObjective::UNDELETED, static_cast(10)}, 3, 0); // We add a new undeleted key auto key4 = atom_key_with_version(id, 3, 5); version_map->do_write(store, key4, entry); // LOAD_LATEST should still be cached, but the cached entry now needs to have one more undeleted version - check_loads_versions(LoadParameter{LoadType::LOAD_LATEST, LoadObjective::ANY}, 4, 1); + check_loads_versions(LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::ANY}, 4, 1); // We delete everything with a tombstone_all version_map->delete_all_versions(store, id); // LOAD_LATEST should still be cached, but now have no undeleted versions - check_loads_versions(LoadParameter{LoadType::LOAD_LATEST, LoadObjective::ANY}, 4, 0); + check_loads_versions(LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::ANY}, 4, 0); } TEST(VersionMap, FollowingVersionChainEndEarlyOnTombstoneAll) { @@ -754,52 +720,52 @@ TEST(VersionMap, CacheInvalidation) { StreamId id{"test"}; write_alternating_deleted_undeleted(store, version_map, id); - auto check_caching = [&](LoadParameter to_load, LoadParameter to_check_if_cached, bool expected_outcome){ + auto check_caching = [&](LoadStrategy to_load, LoadStrategy to_check_if_cached, bool expected_outcome){ auto clean_version_map = std::make_shared(); // Load to_load inside the clean version map cache clean_version_map->check_reload(store, id, to_load, __FUNCTION__); // Check whether to_check_if_cached is being cached by to_load - EXPECT_EQ(clean_version_map->has_cached_entry(id, to_check_if_cached.load_strategy_), expected_outcome); + EXPECT_EQ(clean_version_map->has_cached_entry(id, to_check_if_cached), expected_outcome); }; - auto check_all_caching = [&](const std::vector& to_load, const std::vector& to_check_if_cached, bool expected_result){ - for (auto to_load_param : to_load) { + auto check_all_caching = [&](const std::vector& to_load, const std::vector& to_check_if_cached, bool expected_result){ + for (auto to_load_strategy : to_load) { for (auto to_check_if_cached_param : to_check_if_cached){ - check_caching(to_load_param, to_check_if_cached_param, expected_result); + check_caching(to_load_strategy, to_check_if_cached_param, expected_result); } } }; - auto load_all_param = LoadParameter{LoadType::LOAD_ALL, LoadObjective::ANY}; - auto load_all_undeleted_param = LoadParameter{LoadType::LOAD_ALL, LoadObjective::UNDELETED}; + auto load_all_param = LoadStrategy{LoadType::LOAD_ALL, LoadObjective::ANY}; + auto load_all_undeleted_param = LoadStrategy{LoadType::LOAD_ALL, LoadObjective::UNDELETED}; check_caching(load_all_param, load_all_undeleted_param, true); check_caching(load_all_undeleted_param, load_all_param, false); constexpr auto num_versions = 3u; - std::vector should_load_to_v[num_versions] = { + std::vector should_load_to_v[num_versions] = { // Different parameters which should all load to v0 - std::vector{ - LoadParameter{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(0)}, - LoadParameter{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(-3)}, - LoadParameter{LoadType::LOAD_FROM_TIME, LoadObjective::ANY, static_cast(0)}, + std::vector{ + LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(0)}, + LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(-3)}, + LoadStrategy{LoadType::LOAD_FROM_TIME, LoadObjective::ANY, static_cast(0)}, }, // Different parameters which should all load to v1 - std::vector{ - LoadParameter{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(1)}, - LoadParameter{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(-2)}, - LoadParameter{LoadType::LOAD_FROM_TIME, LoadObjective::ANY, static_cast(1)}, - LoadParameter{LoadType::LOAD_FROM_TIME, LoadObjective::UNDELETED, + std::vector{ + LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(1)}, + LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(-2)}, + LoadStrategy{LoadType::LOAD_FROM_TIME, LoadObjective::ANY, static_cast(1)}, + LoadStrategy{LoadType::LOAD_FROM_TIME, LoadObjective::UNDELETED, static_cast(2)}, // when include_deleted=false LOAD_FROM_TIME searches for an undeleted version - LoadParameter{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}, + LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}, }, // Different parameters which should all load to v2 - std::vector{ - LoadParameter{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(2)}, - LoadParameter{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(-1)}, - LoadParameter{LoadType::LOAD_FROM_TIME, LoadObjective::ANY, static_cast(2)}, - LoadParameter{LoadType::LOAD_LATEST, LoadObjective::ANY}, + std::vector{ + LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(2)}, + LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(-1)}, + LoadStrategy{LoadType::LOAD_FROM_TIME, LoadObjective::ANY, static_cast(2)}, + LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::ANY}, } }; @@ -833,7 +799,7 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAfterLoad) { auto entry = version_map->check_reload( store, id, - LoadParameter{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(1)}, + LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(1)}, __FUNCTION__); ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED})); @@ -852,8 +818,8 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAfterLoad) { ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_FROM_TIME, LoadObjective::ANY, static_cast(1)})); ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::UNDELETED, static_cast(-1)})); - LoadParameter load_param{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}; - const auto latest_undeleted_entry = version_map->check_reload(store, id, load_param, __FUNCTION__); + LoadStrategy load_strategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}; + const auto latest_undeleted_entry = version_map->check_reload(store, id, load_strategy, __FUNCTION__); // Then - version 0 should be returned ASSERT_TRUE(latest_undeleted_entry->get_first_index(false).first.has_value()); @@ -876,7 +842,7 @@ TEST(VersionMap, CacheInvalidationWithTombstoneAllAfterLoad) { auto entry = version_map->check_reload( store, id, - LoadParameter{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(0)}, + LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::ANY, static_cast(0)}, __FUNCTION__); ASSERT_TRUE(version_map->has_cached_entry(id, LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED})); diff --git a/cpp/arcticdb/version/test/test_version_map_batch.cpp b/cpp/arcticdb/version/test/test_version_map_batch.cpp index 761aa1c01cb..9713741824f 100644 --- a/cpp/arcticdb/version/test/test_version_map_batch.cpp +++ b/cpp/arcticdb/version/test/test_version_map_batch.cpp @@ -57,7 +57,7 @@ TEST_F(VersionMapBatchStore, SimpleVersionIdQueries) { auto stream = fmt::format("stream_{}", i); for(uint64_t j = 0; j < num_versions_per_stream; j++){ stream_ids.emplace_back(stream); - version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(j)}, false}); + version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(j)}}); } } @@ -99,7 +99,7 @@ TEST_F(VersionMapBatchStore, SimpleTimestampQueries) { auto stream = fmt::format("stream_{}", i); for(uint64_t j = 0; j < num_versions_per_stream; j++){ stream_ids.emplace_back(stream); - version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(j)}, false}); + version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(j)}}); } } @@ -111,7 +111,7 @@ TEST_F(VersionMapBatchStore, SimpleTimestampQueries) { for(uint64_t i = 0; i < num_streams; i++){ for(uint64_t j = 0; j < num_versions_per_stream; j++){ uint64_t idx = i * num_versions_per_stream + j; - version_queries.emplace_back(VersionQuery{TimestampVersionQuery{timestamp(versions[idx]->creation_ts())}, false}); + version_queries.emplace_back(VersionQuery{TimestampVersionQuery{timestamp(versions[idx]->creation_ts())}}); } } @@ -149,7 +149,7 @@ TEST_F(VersionMapBatchStore, MultipleVersionsSameSymbolVersionIdQueries) { // Add queries for(uint64_t i = 0; i < num_versions; i++){ stream_ids.emplace_back("stream_0"); - version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(i)}, false}); + version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(i)}}); } // Do query @@ -182,7 +182,7 @@ TEST_F(VersionMapBatchStore, MultipleVersionsSameSymbolTimestampQueries) { // Add queries for(uint64_t i = 0; i < num_versions; i++){ stream_ids.emplace_back("stream_0"); - version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(i)}, false}); + version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(i)}}); } // Do query @@ -191,7 +191,7 @@ TEST_F(VersionMapBatchStore, MultipleVersionsSameSymbolTimestampQueries) { //Secondly, once we have the timestamps in hand, we are going to query them version_queries.clear(); for(uint64_t i = 0; i < num_versions; i++){ - version_queries.emplace_back(VersionQuery{TimestampVersionQuery{timestamp(versions[i]->creation_ts())}, false}); + version_queries.emplace_back(VersionQuery{TimestampVersionQuery{timestamp(versions[i]->creation_ts())}}); } // Now we can perform the actual batch query per timestamps @@ -229,7 +229,7 @@ TEST_F(VersionMapBatchStore, CombinedQueries) { auto stream = fmt::format("stream_{}", i); for(uint64_t j = 0; j < num_versions_per_stream; j++){ stream_ids.emplace_back(stream); - version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(j)}, false}); + version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(j)}}); } } @@ -244,11 +244,11 @@ TEST_F(VersionMapBatchStore, CombinedQueries) { for(uint64_t j = 0; j < num_versions_per_stream; j++){ uint64_t idx = i * num_versions_per_stream + j; stream_ids.emplace_back(stream); - version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(j)}, false}); + version_queries.emplace_back(VersionQuery{SpecificVersionQuery{static_cast(j)}}); stream_ids.emplace_back(stream); - version_queries.emplace_back(VersionQuery{TimestampVersionQuery{timestamp(versions[idx]->creation_ts())}, false}); + version_queries.emplace_back(VersionQuery{TimestampVersionQuery{timestamp(versions[idx]->creation_ts())}}); stream_ids.emplace_back(stream); - version_queries.emplace_back(VersionQuery{std::monostate{}, false}); + version_queries.emplace_back(VersionQuery{std::monostate{}}); } } diff --git a/cpp/arcticdb/version/test/test_version_store.cpp b/cpp/arcticdb/version/test/test_version_store.cpp index bc7a47d976a..276bd955db6 100644 --- a/cpp/arcticdb/version/test/test_version_store.cpp +++ b/cpp/arcticdb/version/test/test_version_store.cpp @@ -381,21 +381,21 @@ TEST(VersionStore, TestReadTimestampAt) { auto version_map = version_store._test_get_version_map(); version_map->write_version(mock_store, key1, std::nullopt); - auto key = load_index_key_from_time(mock_store, version_map, id, timestamp(0), pipelines::VersionQuery{}); + auto key = load_index_key_from_time(mock_store, version_map, id, timestamp(0)); ASSERT_EQ(key.value().content_hash(), 3); version_map->write_version(mock_store, key2, key1); - key = load_index_key_from_time(mock_store, version_map, id, timestamp(0), pipelines::VersionQuery{}); + key = load_index_key_from_time(mock_store, version_map, id, timestamp(0)); ASSERT_EQ(key.value().content_hash(), 3); - key = load_index_key_from_time(mock_store, version_map, id, timestamp(1), pipelines::VersionQuery{}); + key = load_index_key_from_time(mock_store, version_map, id, timestamp(1)); ASSERT_EQ(key.value().content_hash(), 4); version_map->write_version(mock_store, key3, key2); - key = load_index_key_from_time(mock_store, version_map, id, timestamp(0), pipelines::VersionQuery{}); + key = load_index_key_from_time(mock_store, version_map, id, timestamp(0)); ASSERT_EQ(key.value().content_hash(), 3); - key = load_index_key_from_time(mock_store, version_map, id, timestamp(1), pipelines::VersionQuery{}); + key = load_index_key_from_time(mock_store, version_map, id, timestamp(1)); ASSERT_EQ(key.value().content_hash(), 4); - key = load_index_key_from_time(mock_store, version_map, id, timestamp(2), pipelines::VersionQuery{}); + key = load_index_key_from_time(mock_store, version_map, id, timestamp(2)); ASSERT_EQ(key.value().content_hash(), 5); } @@ -413,7 +413,7 @@ TEST(VersionStore, TestReadTimestampAtInequality) { auto version_map = version_store._test_get_version_map(); version_map->write_version(mock_store, key1, std::nullopt); - auto key = load_index_key_from_time(mock_store, version_map, id, timestamp(1), pipelines::VersionQuery{}); + auto key = load_index_key_from_time(mock_store, version_map, id, timestamp(1)); ASSERT_EQ(static_cast(key), true); ASSERT_EQ(key.value().content_hash(), 3); } diff --git a/cpp/arcticdb/version/test/version_backwards_compat.hpp b/cpp/arcticdb/version/test/version_backwards_compat.hpp index a236ab08af5..51f6c014baf 100644 --- a/cpp/arcticdb/version/test/version_backwards_compat.hpp +++ b/cpp/arcticdb/version/test/version_backwards_compat.hpp @@ -33,7 +33,7 @@ std::deque backwards_compat_delete_all_versions( const StreamId& stream_id ) { std::deque output; - auto entry = version_map->check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); + auto entry = version_map->check_reload(store, stream_id, LoadStrategy{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); auto indexes = entry->get_indexes(false); output.assign(std::begin(indexes), std::end(indexes)); @@ -49,7 +49,7 @@ std::vector backwards_compat_write_and_prune_previous(std::shared_ptr output; - auto entry = version_map->check_reload(store, key.id(), LoadParameter{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); + auto entry = version_map->check_reload(store, key.id(), LoadStrategy{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); auto old_entry = *entry; entry->clear(); diff --git a/cpp/arcticdb/version/test/version_map_model.hpp b/cpp/arcticdb/version/test/version_map_model.hpp index cf6be722d52..83aaf036d23 100644 --- a/cpp/arcticdb/version/test/version_map_model.hpp +++ b/cpp/arcticdb/version/test/version_map_model.hpp @@ -36,7 +36,7 @@ struct MapStorePair { void write_version(const std::string &id) { log::version().info("MapStorePair, write version {}", id); - auto prev = get_latest_version(store_, map_, id, pipelines::VersionQuery{}).first; + auto prev = get_latest_version(store_, map_, id).first; auto version_id = prev ? prev->version_id() + 1 : 0; map_->write_version(store_, make_test_index_key(id, version_id, KeyType::TABLE_INDEX), prev); } @@ -51,7 +51,7 @@ struct MapStorePair { void write_and_prune_previous(const std::string &id) { log::version().info("MapStorePair, write_and_prune_previous version {}", id); - auto prev = get_latest_version(store_, map_, id, pipelines::VersionQuery{}).first; + auto prev = get_latest_version(store_, map_, id).first; auto version_id = prev ? prev->version_id() + 1 : 0; if(tombstones_) diff --git a/cpp/arcticdb/version/version_functions.hpp b/cpp/arcticdb/version/version_functions.hpp index 871fb523143..5954b8e4859 100644 --- a/cpp/arcticdb/version/version_functions.hpp +++ b/cpp/arcticdb/version/version_functions.hpp @@ -14,31 +14,23 @@ namespace arcticdb { -inline void set_load_param_options(LoadParameter& load_param, const pipelines::VersionQuery& version_query) { - load_param.iterate_on_failure_ = version_query.iterate_on_failure_.value_or(false); -} - inline std::optional get_latest_undeleted_version( const std::shared_ptr &store, const std::shared_ptr &version_map, - const StreamId &stream_id, - const pipelines::VersionQuery& version_query) { + const StreamId &stream_id) { ARCTICDB_RUNTIME_SAMPLE(GetLatestUndeletedVersion, 0) - LoadParameter load_param{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}; - set_load_param_options(load_param, version_query); - const auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); + LoadStrategy load_strategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}; + const auto entry = version_map->check_reload(store, stream_id, load_strategy, __FUNCTION__); return entry->get_first_index(false).first; } inline std::pair, bool> get_latest_version( const std::shared_ptr &store, const std::shared_ptr &version_map, - const StreamId &stream_id, - const pipelines::VersionQuery& version_query) { + const StreamId &stream_id) { ARCTICDB_SAMPLE(GetLatestVersion, 0) - LoadParameter load_param{LoadType::LOAD_LATEST, LoadObjective::ANY}; - set_load_param_options(load_param, version_query); - auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); + LoadStrategy load_strategy{LoadType::LOAD_LATEST, LoadObjective::ANY}; + auto entry = version_map->check_reload(store, stream_id, load_strategy, __FUNCTION__); return entry->get_first_index(true); } @@ -46,12 +38,10 @@ inline std::pair, bool> get_latest_version( inline version_store::UpdateInfo get_latest_undeleted_version_and_next_version_id( const std::shared_ptr &store, const std::shared_ptr &version_map, - const StreamId &stream_id, - const pipelines::VersionQuery& version_query) { + const StreamId &stream_id) { ARCTICDB_SAMPLE(GetLatestUndeletedVersionAndHighestVersionId, 0) - LoadParameter load_param{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}; - set_load_param_options(load_param, version_query); - auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); + LoadStrategy load_strategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}; + auto entry = version_map->check_reload(store, stream_id, load_strategy, __FUNCTION__); auto latest_version = entry->get_first_index(true).first; auto latest_undeleted_version = entry->get_first_index(false).first; VersionId next_version_id = latest_version.has_value() ? latest_version->version_id() + 1 : 0; @@ -61,13 +51,11 @@ inline version_store::UpdateInfo get_latest_undeleted_version_and_next_version_i inline std::vector get_all_versions( const std::shared_ptr &store, const std::shared_ptr &version_map, - const StreamId &stream_id, - const pipelines::VersionQuery& version_query + const StreamId &stream_id ) { ARCTICDB_SAMPLE(GetAllVersions, 0) - LoadParameter load_param{LoadType::LOAD_ALL, LoadObjective::UNDELETED}; - set_load_param_options(load_param, version_query); - auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); + LoadStrategy load_strategy{LoadType::LOAD_ALL, LoadObjective::UNDELETED}; + auto entry = version_map->check_reload(store, stream_id, load_strategy, __FUNCTION__); return entry->get_indexes(false); } @@ -76,11 +64,9 @@ inline std::optional get_specific_version( const std::shared_ptr &version_map, const StreamId &stream_id, SignedVersionId signed_version_id, - const pipelines::VersionQuery& version_query, bool include_deleted = false) { - LoadParameter load_param{LoadType::LOAD_DOWNTO, LoadObjective::UNDELETED, signed_version_id}; - auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); - set_load_param_options(load_param, version_query); + LoadStrategy load_strategy{LoadType::LOAD_DOWNTO, LoadObjective::UNDELETED, signed_version_id}; + auto entry = version_map->check_reload(store, stream_id, load_strategy, __FUNCTION__); VersionId version_id; if (signed_version_id >= 0) { version_id = static_cast(signed_version_id); @@ -134,9 +120,7 @@ inline bool has_undeleted_version( const std::shared_ptr &store, const std::shared_ptr &version_map, const StreamId &id) { - pipelines::VersionQuery version_query; - version_query.set_iterate_on_failure(false); - auto maybe_undeleted = get_latest_undeleted_version(store, version_map, id, version_query); + auto maybe_undeleted = get_latest_undeleted_version(store, version_map, id); return static_cast(maybe_undeleted); } @@ -153,11 +137,9 @@ inline void insert_if_undeleted( inline std::unordered_map get_all_tombstoned_versions( const std::shared_ptr &store, const std::shared_ptr &version_map, - const StreamId &stream_id, - const pipelines::VersionQuery& version_query) { - LoadParameter load_param{LoadType::LOAD_ALL, LoadObjective::ANY}; - set_load_param_options(load_param, version_query); - auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); + const StreamId &stream_id) { + LoadStrategy load_strategy{LoadType::LOAD_ALL, LoadObjective::ANY}; + auto entry = version_map->check_reload(store, stream_id, load_strategy, __FUNCTION__); std::unordered_map result; for (auto key: entry->get_tombstoned_indexes()) result[key.version_id()] = store->key_exists(key).get(); @@ -170,13 +152,11 @@ inline version_store::TombstoneVersionResult tombstone_version( const std::shared_ptr &version_map, const StreamId &stream_id, VersionId version_id, - const pipelines::VersionQuery& version_query, bool allow_tombstoning_beyond_latest_version=false, const std::optional& creation_ts=std::nullopt) { ARCTICDB_DEBUG(log::version(), "Tombstoning version {} for stream {}", version_id, stream_id); - LoadParameter load_param{LoadType::LOAD_ALL, LoadObjective::UNDELETED}; - set_load_param_options(load_param, version_query); - auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); + LoadStrategy load_strategy{LoadType::LOAD_ALL, LoadObjective::UNDELETED}; + auto entry = version_map->check_reload(store, stream_id, load_strategy, __FUNCTION__); // Might as well do the previous/next version check while we find the required version_id. // But if entry is empty, it's possible the load failed (since iterate_on_failure=false above), so set the flag // to defer the check to delete_tree() (instead of reloading in case eager delete is disabled). @@ -195,7 +175,7 @@ inline version_store::TombstoneVersionResult tombstone_version( util::raise_rte("Version {} for symbol {} is already deleted", version_id, stream_id); } else { if (!allow_tombstoning_beyond_latest_version) { - auto latest_key = get_latest_version(store, version_map, stream_id, version_query).first; + auto latest_key = get_latest_version(store, version_map, stream_id).first; if (!latest_key || latest_key->version_id() < version_id) util::raise_rte("Can't delete version {} for symbol {} - it's higher than the latest version", stream_id, version_id); @@ -236,11 +216,9 @@ inline std::optional load_index_key_from_time( const std::shared_ptr &store, const std::shared_ptr &version_map, const StreamId &stream_id, - timestamp from_time, - const pipelines::VersionQuery& version_query) { - LoadParameter load_param{LoadType::LOAD_FROM_TIME, LoadObjective::UNDELETED, from_time}; - set_load_param_options(load_param, version_query); - auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); + timestamp from_time) { + LoadStrategy load_strategy{LoadType::LOAD_FROM_TIME, LoadObjective::UNDELETED, from_time}; + auto entry = version_map->check_reload(store, stream_id, load_strategy, __FUNCTION__); auto indexes = entry->get_indexes(false); return get_index_key_from_time(from_time, indexes); } @@ -248,11 +226,9 @@ inline std::optional load_index_key_from_time( inline std::vector get_index_and_tombstone_keys( const std::shared_ptr &store, const std::shared_ptr &version_map, - const StreamId &stream_id, - const pipelines::VersionQuery& version_query) { - LoadParameter load_param{LoadType::LOAD_ALL, LoadObjective::ANY}; - set_load_param_options(load_param, version_query); - const auto entry = version_map->check_reload(store, stream_id, load_param, __FUNCTION__); + const StreamId &stream_id) { + LoadStrategy load_strategy{LoadType::LOAD_ALL, LoadObjective::ANY}; + const auto entry = version_map->check_reload(store, stream_id, load_strategy, __FUNCTION__); std::vector res; std::copy_if(std::begin(entry->keys_), std::end(entry->keys_), std::back_inserter(res), [&](const auto &key) { return is_index_or_tombstone(key); }); diff --git a/cpp/arcticdb/version/version_map.hpp b/cpp/arcticdb/version/version_map.hpp index ae69afd4944..081a4525d26 100644 --- a/cpp/arcticdb/version/version_map.hpp +++ b/cpp/arcticdb/version/version_map.hpp @@ -234,7 +234,7 @@ class VersionMapImpl { } void write_version(std::shared_ptr store, const AtomKey &key, const std::optional& previous_key) { - LoadParameter load_param{LoadType::LOAD_LATEST, LoadObjective::ANY}; + LoadStrategy load_param{LoadType::LOAD_LATEST, LoadObjective::ANY}; auto entry = check_reload(store, key.id(), load_param, __FUNCTION__); do_write(store, key, entry); @@ -267,7 +267,7 @@ class VersionMapImpl { auto entry = check_reload( store, stream_id, - LoadParameter{LoadType::LOAD_ALL, LoadObjective::UNDELETED}, + LoadStrategy{LoadType::LOAD_ALL, LoadObjective::UNDELETED}, __FUNCTION__); auto output = tombstone_from_key_or_all_internal(store, stream_id, first_key_to_tombstone, entry); @@ -281,7 +281,7 @@ class VersionMapImpl { } std::string dump_entry(const std::shared_ptr& store, const StreamId& stream_id) { - const auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); + const auto entry = check_reload(store, stream_id, LoadStrategy{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); return entry->dump(); } @@ -293,7 +293,7 @@ class VersionMapImpl { auto entry = check_reload( store, key.id(), - LoadParameter{LoadType::LOAD_ALL, LoadObjective::UNDELETED}, + LoadStrategy{LoadType::LOAD_ALL, LoadObjective::UNDELETED}, __FUNCTION__); auto [_, result] = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry); @@ -332,7 +332,7 @@ class VersionMapImpl { // This method has no API, and is not tested in the rapidcheck tests, but could easily be enabled there. // It compacts the version map but skips any keys which have been deleted (to free up space). ARCTICDB_DEBUG(log::version(), "Version map compacting versions for stream {}", stream_id); - auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); + auto entry = check_reload(store, stream_id, LoadStrategy{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); if (!requires_compaction(entry)) return; @@ -448,7 +448,7 @@ class VersionMapImpl { void compact(std::shared_ptr store, const StreamId& stream_id) { ARCTICDB_DEBUG(log::version(), "Version map compacting versions for stream {}", stream_id); - auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); + auto entry = check_reload(store, stream_id, LoadStrategy{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); if (entry->empty()) { log::version().warn("Entry is empty in compact"); return; @@ -470,7 +470,7 @@ class VersionMapImpl { void overwrite_symbol_tree( std::shared_ptr store, const StreamId& stream_id, const std::vector& index_keys) { - auto entry = check_reload(store, stream_id, LoadParameter{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); + auto entry = check_reload(store, stream_id, LoadStrategy{LoadType::LOAD_ALL, LoadObjective::ANY}, __FUNCTION__); auto old_entry = *entry; if (!index_keys.empty()) { entry->keys_.assign(std::begin(index_keys), std::end(index_keys)); @@ -490,15 +490,15 @@ class VersionMapImpl { std::shared_ptr check_reload( std::shared_ptr store, const StreamId& stream_id, - const LoadParameter& load_param, + const LoadStrategy& load_strategy, const char* function ARCTICDB_UNUSED) { ARCTICDB_DEBUG(log::version(), "Check reload in function {} for id {}", function, stream_id); - if (has_cached_entry(stream_id, load_param.load_strategy_)) { + if (has_cached_entry(stream_id, load_strategy)) { return get_entry(stream_id); } - return storage_reload(store, stream_id, load_param.load_strategy_, load_param.iterate_on_failure_); + return storage_reload(store, stream_id, load_strategy); } void do_write( @@ -749,8 +749,7 @@ class VersionMapImpl { std::shared_ptr storage_reload( std::shared_ptr store, const StreamId& stream_id, - const LoadStrategy& load_strategy, - bool iterate_on_failure) { + const LoadStrategy& load_strategy) { /* * Goes to the storage for a given symbol, and recreates the VersionMapEntry from preferably the ref key * structure, and if that fails it then goes and builds that from iterating all keys from storage which can @@ -762,27 +761,12 @@ class VersionMapImpl { const auto clock_unsync_tolerance = ConfigsMap::instance()->get_int("VersionMap.UnsyncTolerance", DEFAULT_CLOCK_UNSYNC_TOLERANCE); entry->last_reload_time_ = Clock::nanos_since_epoch() - clock_unsync_tolerance; - entry->load_strategy_ = LoadStrategy{LoadType::NOT_LOADED}; // FUTURE: to make more thread-safe with #368 + entry->load_strategy_ = LoadStrategy{LoadType::NOT_LOADED, LoadObjective::ANY}; // FUTURE: to make more thread-safe with #368 - try { - auto temp = std::make_shared(*entry); - load_via_ref_key(store, stream_id, load_strategy, temp); - std::swap(*entry, *temp); - entry->load_strategy_ = load_strategy; - } - catch (const std::runtime_error &err) { - if (iterate_on_failure) { - log::version().info( - "Loading versions from storage via ref key failed with error: {}, will load via iteration", - err.what()); - } else { - throw; - } - } - if (iterate_on_failure && entry->empty()) { - (void) load_via_iteration(store, stream_id, entry); - entry->load_strategy_ = LoadStrategy{LoadType::LOAD_ALL, LoadObjective::ANY}; - } + auto temp = std::make_shared(*entry); + load_via_ref_key(store, stream_id, load_strategy, temp); + std::swap(*entry, *temp); + entry->load_strategy_ = load_strategy; util::check(entry->keys_.empty() || entry->head_, "Non-empty VersionMapEntry should set head"); if (validate_) @@ -953,7 +937,7 @@ class VersionMapImpl { entry = check_reload( store, stream_id, - LoadParameter{LoadType::LOAD_ALL, LoadObjective::UNDELETED}, + LoadStrategy{LoadType::LOAD_ALL, LoadObjective::UNDELETED}, __FUNCTION__); } diff --git a/cpp/arcticdb/version/version_map_batch_methods.cpp b/cpp/arcticdb/version/version_map_batch_methods.cpp index ccbef92a809..1221f2190e6 100644 --- a/cpp/arcticdb/version/version_map_batch_methods.cpp +++ b/cpp/arcticdb/version/version_map_batch_methods.cpp @@ -21,17 +21,17 @@ void StreamVersionData::react(const pipelines::VersionQuery &version_query) { void StreamVersionData::do_react(std::monostate) { ++count_; - load_param_.load_strategy_ = union_of_undeleted_strategies(load_param_.load_strategy_, LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}); + load_strategy_ = union_of_undeleted_strategies(load_strategy_, LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}); } void StreamVersionData::do_react(const pipelines::SpecificVersionQuery &specific_version) { ++count_; - load_param_.load_strategy_ = union_of_undeleted_strategies(load_param_.load_strategy_, LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::UNDELETED, specific_version.version_id_}); + load_strategy_ = union_of_undeleted_strategies(load_strategy_, LoadStrategy{LoadType::LOAD_DOWNTO, LoadObjective::UNDELETED, specific_version.version_id_}); } void StreamVersionData::do_react(const pipelines::TimestampVersionQuery ×tamp_query) { ++count_; - load_param_.load_strategy_ = union_of_undeleted_strategies(load_param_.load_strategy_, LoadStrategy{LoadType::LOAD_FROM_TIME, LoadObjective::UNDELETED, timestamp_query.timestamp_}); + load_strategy_ = union_of_undeleted_strategies(load_strategy_, LoadStrategy{LoadType::LOAD_FROM_TIME, LoadObjective::UNDELETED, timestamp_query.timestamp_}); } void StreamVersionData::do_react(const pipelines::SnapshotVersionQuery &snapshot_query) { @@ -181,7 +181,7 @@ folly::Future set_up_version_future( ) { if (version_data.count_ == 1) { return async::submit_io_task(CheckReloadTask{store, version_map, symbol, - version_data.load_param_}).thenValue( + version_data.load_strategy_}).thenValue( [](std::shared_ptr version_map_entry) { return VersionEntryOrSnapshot{std::move(version_map_entry)}; }); @@ -194,7 +194,7 @@ folly::Future set_up_version_future( CheckReloadTask{store, version_map, symbol, - version_data.load_param_}).thenValue( + version_data.load_strategy_}).thenValue( [](std::shared_ptr version_map_entry) { return VersionEntryOrSnapshot{ std::move(version_map_entry)}; diff --git a/cpp/arcticdb/version/version_map_batch_methods.hpp b/cpp/arcticdb/version/version_map_batch_methods.hpp index bdafdfb91d8..2c3977cfc4a 100644 --- a/cpp/arcticdb/version/version_map_batch_methods.hpp +++ b/cpp/arcticdb/version/version_map_batch_methods.hpp @@ -64,13 +64,13 @@ inline std::shared_ptr> batch_check_l const std::shared_ptr &version_map, const std::shared_ptr> &symbols) { ARCTICDB_SAMPLE(BatchGetLatestVersion, 0) - const LoadParameter load_param{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}; + const LoadStrategy load_strategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}; auto output = std::make_shared>(); auto mutex = std::make_shared(); submit_tasks_for_range(*symbols, - [store, version_map, &load_param](auto &symbol) { - return async::submit_io_task(CheckReloadTask{store, version_map, symbol, load_param}); + [store, version_map, &load_strategy](auto &symbol) { + return async::submit_io_task(CheckReloadTask{store, version_map, symbol, load_strategy}); }, [output, mutex](const auto& id, const std::shared_ptr &entry) { auto index_key = entry->get_first_index(false).first; @@ -100,13 +100,13 @@ inline std::shared_ptr> batch_get_latest_v const std::vector &stream_ids, bool include_deleted) { ARCTICDB_SAMPLE(BatchGetLatestVersion, 0) - const LoadParameter load_param{LoadType::LOAD_LATEST, include_deleted ? LoadObjective::ANY : LoadObjective::UNDELETED}; + const LoadStrategy load_strategy{LoadType::LOAD_LATEST, include_deleted ? LoadObjective::ANY : LoadObjective::UNDELETED}; auto output = std::make_shared>(); auto mutex = std::make_shared(); submit_tasks_for_range(stream_ids, - [store, version_map, &load_param](auto& stream_id) { - return async::submit_io_task(CheckReloadTask{store, version_map, stream_id, load_param}); + [store, version_map, &load_strategy](auto& stream_id) { + return async::submit_io_task(CheckReloadTask{store, version_map, stream_id, load_strategy}); }, [output, include_deleted, mutex](auto id, auto entry) { auto [index_key, deleted] = entry->get_first_index(include_deleted); @@ -129,7 +129,7 @@ inline std::vector, std::optional vector_fut.push_back(async::submit_io_task(CheckReloadTask{store, version_map, stream_id, - LoadParameter{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}}) + LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}}) .thenValue([](const std::shared_ptr& entry){ return std::make_pair(entry->get_first_index(false).first, entry->get_first_index(true).first); })); @@ -147,7 +147,7 @@ inline std::vector> batch_get_latest_un vector_fut.push_back(async::submit_io_task(CheckReloadTask{store, version_map, stream_id, - LoadParameter{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}}) + LoadStrategy{LoadType::LOAD_LATEST, LoadObjective::UNDELETED}}) .thenValue([](auto entry){ auto latest_version = entry->get_first_index(true).first; auto latest_undeleted_version = entry->get_first_index(false).first; @@ -194,8 +194,8 @@ inline std::shared_ptr> batch_get_specific MapRandomAccessWrapper wrapper{sym_versions}; submit_tasks_for_range(wrapper, [store, version_map](auto& sym_version) { - LoadParameter load_param{LoadType::LOAD_DOWNTO, LoadObjective::UNDELETED, static_cast(sym_version.second)}; - return async::submit_io_task(CheckReloadTask{store, version_map, sym_version.first, load_param}); + LoadStrategy load_strategy{LoadType::LOAD_DOWNTO, LoadObjective::UNDELETED, static_cast(sym_version.second)}; + return async::submit_io_task(CheckReloadTask{store, version_map, sym_version.first, load_strategy}); }, [output, option, output_mutex, store, tombstoned_vers, tombstoned_vers_mutex] (auto sym_version, const std::shared_ptr& entry) { @@ -247,8 +247,8 @@ inline std::shared_ptr, AtomKe submit_tasks_for_range(wrapper, [store, version_map](auto sym_version) { auto first_version = *std::min_element(std::begin(sym_version.second), std::end(sym_version.second)); - LoadParameter load_param{LoadType::LOAD_DOWNTO, LoadObjective::UNDELETED, static_cast(first_version)}; - return async::submit_io_task(CheckReloadTask{store, version_map, sym_version.first, load_param}); + LoadStrategy load_strategy{LoadType::LOAD_DOWNTO, LoadObjective::UNDELETED, static_cast(first_version)}; + return async::submit_io_task(CheckReloadTask{store, version_map, sym_version.first, load_strategy}); }, [output, &sym_versions, include_deleted, mutex](auto sym_version, const std::shared_ptr& entry) { @@ -268,11 +268,11 @@ inline std::shared_ptr, AtomKe } // [StreamVersionData] is used to combine different [VersionQuery]s for a stream_id into a list of needed snapshots and -// a single [LoadParameter] which will query the union of all version queries. +// a single [LoadStrategy] which will query the union of all version queries. // It only ever produces load parameters where to_load=UNDELETED. struct StreamVersionData { size_t count_ = 0; - LoadParameter load_param_ = LoadParameter{LoadType::NOT_LOADED, LoadObjective::UNDELETED}; + LoadStrategy load_strategy_ = LoadStrategy{LoadType::NOT_LOADED, LoadObjective::UNDELETED}; boost::container::small_vector snapshots_; explicit StreamVersionData(const pipelines::VersionQuery& version_query); diff --git a/cpp/arcticdb/version/version_map_entry.hpp b/cpp/arcticdb/version/version_map_entry.hpp index 82640931123..2f7e7c55a73 100644 --- a/cpp/arcticdb/version/version_map_entry.hpp +++ b/cpp/arcticdb/version/version_map_entry.hpp @@ -54,7 +54,7 @@ struct VersionDetails { // load_type: Describes up to which point in the chain we need to go. // load_objective: Whether to include tombstoned versions struct LoadStrategy { - explicit LoadStrategy(LoadType load_type, LoadObjective load_objective = LoadObjective::ANY) : + explicit LoadStrategy(LoadType load_type, LoadObjective load_objective) : load_type_(load_type), load_objective_(load_objective) { } @@ -149,17 +149,6 @@ inline LoadStrategy union_of_undeleted_strategies(const LoadStrategy& left, cons return LoadStrategy{LoadType::LOAD_ALL, LoadObjective::UNDELETED}; } -// LoadParameter is just a LoadStrategy and a boolean specified from VersionQuery.iterate_on_failure defaulting to false. -struct LoadParameter { - LoadParameter(const LoadStrategy& load_strategy) : load_strategy_(load_strategy) {} - LoadParameter(LoadType load_type, LoadObjective load_objective) : load_strategy_(load_type, load_objective) {} - LoadParameter(LoadType load_type, LoadObjective load_objective, int64_t load_from_time_or_until) : - load_strategy_(load_type, load_objective, load_from_time_or_until) {} - - LoadStrategy load_strategy_; - bool iterate_on_failure_ = false; -}; - template bool deque_is_unique(std::deque vec) { sort(vec.begin(), vec.end()); @@ -252,7 +241,7 @@ struct VersionMapEntry { tombstone_all_.reset(); keys_.clear(); loaded_with_progress_ = LoadProgress{}; - load_strategy_ = LoadStrategy{LoadType::NOT_LOADED}; + load_strategy_ = LoadStrategy{LoadType::NOT_LOADED, LoadObjective::ANY}; } bool empty() const { @@ -443,7 +432,7 @@ struct VersionMapEntry { } std::optional head_; - LoadStrategy load_strategy_ = LoadStrategy{LoadType::NOT_LOADED }; + LoadStrategy load_strategy_ = LoadStrategy{LoadType::NOT_LOADED, LoadObjective::ANY}; timestamp last_reload_time_ = 0; LoadProgress loaded_with_progress_; std::deque keys_; diff --git a/cpp/arcticdb/version/version_store_api.cpp b/cpp/arcticdb/version/version_store_api.cpp index f517d000ca9..74a4db53f24 100644 --- a/cpp/arcticdb/version/version_store_api.cpp +++ b/cpp/arcticdb/version/version_store_api.cpp @@ -52,7 +52,7 @@ VersionedItem PythonVersionStore::write_dataframe_specific_version( ARCTICDB_SAMPLE(WriteDataFrame, 0) ARCTICDB_DEBUG(log::version(), "write_dataframe_specific_version stream_id: {} , version_id: {}", stream_id, version_id); - if (auto version_key = ::arcticdb::get_specific_version(store(), version_map(), stream_id, version_id, VersionQuery{}); version_key) { + if (auto version_key = ::arcticdb::get_specific_version(store(), version_map(), stream_id, version_id); version_key) { log::version().warn("Symbol stream_id: {} already exists with version_id: {}", stream_id, version_id); return {std::move(*version_key)}; } @@ -190,12 +190,11 @@ VersionResultVector get_latest_versions_for_symbols( const std::shared_ptr& store, const std::shared_ptr& version_map, const std::set& stream_ids, - SymbolVersionToSnapshotMap& snapshots_for_symbol, - const VersionQuery& version_query + SymbolVersionToSnapshotMap& snapshots_for_symbol ) { VersionResultVector res; for (auto &s_id: stream_ids) { - const auto& opt_version_key = get_latest_undeleted_version(store, version_map, s_id, version_query); + const auto& opt_version_key = get_latest_undeleted_version(store, version_map, s_id); if (opt_version_key) { res.emplace_back( s_id, @@ -220,7 +219,7 @@ VersionResultVector get_all_versions_for_symbols( VersionResultVector res; std::unordered_set> unpruned_versions; for (auto &s_id: stream_ids) { - auto all_versions = get_all_versions(store, version_map, s_id, VersionQuery{}); + auto all_versions = get_all_versions(store, version_map, s_id); unpruned_versions = {}; for (const auto &entry: all_versions) { unpruned_versions.emplace(s_id, entry.version_id()); @@ -252,7 +251,6 @@ VersionResultVector PythonVersionStore::list_versions( const std::optional &stream_id, const std::optional &snap_name, const std::optional& latest_only, - const std::optional& iterate_on_failure, const std::optional& skip_snapshots) { ARCTICDB_SAMPLE(ListVersions, 0) ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: list_versions"); @@ -276,11 +274,8 @@ VersionResultVector PythonVersionStore::list_versions( return list_versions_for_snapshot(stream_ids, snap_name, *versions_for_snapshots, snapshots_for_symbol); } - VersionQuery version_query; - version_query.set_iterate_on_failure(iterate_on_failure); - if(opt_false(latest_only)) - return get_latest_versions_for_symbols(store(), version_map(), stream_ids, snapshots_for_symbol, version_query); + return get_latest_versions_for_symbols(store(), version_map(), stream_ids, snapshots_for_symbol); else return get_all_versions_for_symbols(store(), version_map(), stream_ids, snapshots_for_symbol, creation_ts_for_version_symbol); } @@ -511,7 +506,7 @@ VersionedItem PythonVersionStore::write_partitioned_dataframe( const std::vector& partition_value ) { ARCTICDB_SAMPLE(WritePartitionedDataFrame, 0) - auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{}); + auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id); auto version_id = get_next_version_from_key(maybe_prev); // TODO: We are not actually partitioning stuff atm, just assuming a single partition is passed for now. @@ -564,7 +559,7 @@ VersionedItem PythonVersionStore::write_versioned_composite_data( ARCTICDB_SAMPLE(WriteVersionedMultiKey, 0) ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write_versioned_composite_data"); - auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{}); + auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id); auto version_id = get_next_version_from_key(maybe_prev); ARCTICDB_DEBUG(log::version(), "write_versioned_composite_data for stream_id: {} , version_id = {}", stream_id, version_id); // TODO: Assuming each sub key is always going to have the same version attached to it. @@ -742,7 +737,7 @@ void PythonVersionStore::write_parallel( } std::unordered_map PythonVersionStore::get_all_tombstoned_versions(const StreamId &stream_id) { - return ::arcticdb::get_all_tombstoned_versions(store(), version_map(), stream_id, VersionQuery{}); + return ::arcticdb::get_all_tombstoned_versions(store(), version_map(), stream_id); } FrameAndDescriptor create_frame(const StreamId& target_id, SegmentInMemory seg, const ReadOptions& read_options) { @@ -909,7 +904,7 @@ void PythonVersionStore::delete_version( const StreamId& stream_id, VersionId version_id) { ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: delete_version"); - auto result = ::arcticdb::tombstone_version(store(), version_map(), stream_id, version_id, VersionQuery{}); + auto result = ::arcticdb::tombstone_version(store(), version_map(), stream_id, version_id); if (!result.keys_to_delete.empty() && !cfg().write_options().delayed_deletes()) { delete_tree(result.keys_to_delete, result); @@ -923,7 +918,7 @@ void PythonVersionStore::delete_version( void PythonVersionStore::fix_symbol_trees(const std::vector& symbols) { auto snaps = get_master_snapshots_map(store()); for (const auto& sym : symbols) { - auto index_keys_from_symbol_tree = get_all_versions(store(), version_map(), sym, VersionQuery{}); + auto index_keys_from_symbol_tree = get_all_versions(store(), version_map(), sym); for(const auto& [key, map] : snaps[sym]) { index_keys_from_symbol_tree.push_back(key); } @@ -941,7 +936,7 @@ void PythonVersionStore::prune_previous_versions(const StreamId& stream_id) { const std::shared_ptr& entry = version_map()->check_reload( store(), stream_id, - LoadParameter{LoadType::LOAD_ALL, LoadObjective::UNDELETED}, + LoadStrategy{LoadType::LOAD_ALL, LoadObjective::UNDELETED}, __FUNCTION__); storage::check(!entry->empty(), "Symbol {} is not found", stream_id); auto [latest, deleted] = entry->get_first_index(false); @@ -952,7 +947,7 @@ void PythonVersionStore::prune_previous_versions(const StreamId& stream_id) { return; } - auto previous = ::arcticdb::get_specific_version(store(), version_map(), stream_id, *prev_id, VersionQuery{}); + auto previous = ::arcticdb::get_specific_version(store(), version_map(), stream_id, *prev_id); auto [_, pruned_indexes] = version_map()->tombstone_from_key_or_all(store(), stream_id, previous); delete_unreferenced_pruned_indexes(pruned_indexes, *latest).get(); } @@ -1109,7 +1104,7 @@ ReadResult PythonVersionStore::read_index( } std::vector PythonVersionStore::get_version_history(const StreamId& stream_id) { - return get_index_and_tombstone_keys(store(), version_map(), stream_id, VersionQuery{}); + return get_index_and_tombstone_keys(store(), version_map(), stream_id); } void PythonVersionStore::_compact_version_map(const StreamId& id) { diff --git a/cpp/arcticdb/version/version_store_api.hpp b/cpp/arcticdb/version/version_store_api.hpp index 326d5d61240..69389d73243 100644 --- a/cpp/arcticdb/version/version_store_api.hpp +++ b/cpp/arcticdb/version/version_store_api.hpp @@ -260,7 +260,6 @@ class PythonVersionStore : public LocalVersionedEngine { const std::optional &stream_id, const std::optional& snap_name, const std::optional &latest_only, - const std::optional& iterate_on_failure, const std::optional& skip_snapshots); // Batch methods diff --git a/cpp/arcticdb/version/version_tasks.hpp b/cpp/arcticdb/version/version_tasks.hpp index b44c2dfed96..e383c91e9c0 100644 --- a/cpp/arcticdb/version/version_tasks.hpp +++ b/cpp/arcticdb/version/version_tasks.hpp @@ -108,21 +108,21 @@ struct CheckReloadTask : async::BaseTask { const std::shared_ptr store_; const std::shared_ptr version_map_; const StreamId stream_id_; - const LoadParameter load_param_; + const LoadStrategy load_strategy_; CheckReloadTask( std::shared_ptr store, std::shared_ptr version_map, StreamId stream_id, - LoadParameter load_param) : + LoadStrategy load_strategy) : store_(std::move(store)), version_map_(std::move(version_map)), stream_id_(std::move(stream_id)), - load_param_(load_param) { + load_strategy_(load_strategy) { } std::shared_ptr operator()() const { - return version_map_->check_reload(store_, stream_id_, load_param_, __FUNCTION__); + return version_map_->check_reload(store_, stream_id_, load_strategy_, __FUNCTION__); } }; diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index 96144cc407e..1a97831bb64 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -1484,7 +1484,6 @@ def batch_restore_version( def _get_version_query(self, as_of: VersionQueryInput, **kwargs): version_query = _PythonVersionStoreVersionQuery() - version_query.set_iterate_on_failure(_assume_false("iterate_on_failure", kwargs)) if isinstance(as_of, str): version_query.set_snap_name(as_of) @@ -1968,7 +1967,6 @@ def list_versions( symbol: Optional[str] = None, snapshot: Optional[str] = None, latest_only: Optional[bool] = False, - iterate_on_failure: Optional[bool] = False, skip_snapshots: Optional[bool] = False, ) -> List[Dict]: """ @@ -1983,8 +1981,6 @@ def list_versions( latest_only : `bool` Only include the latest version for each returned symbol. Has no effect if `snapshot` argument is also specified. - iterate_on_failure: `bool` - Iterate the type in the storage if the top-level key isn't present. skip_snapshots: `bool` Don't populate version list with snapshot information. Can improve performance significantly if there are many snapshots. @@ -2015,7 +2011,7 @@ def list_versions( log.warning("latest_only has no effect when snapshot is specified") NativeVersionStore._warned_about_list_version_latest_only_and_snapshot = True - result = self.version_store.list_versions(symbol, snapshot, latest_only, iterate_on_failure, skip_snapshots) + result = self.version_store.list_versions(symbol, snapshot, latest_only, skip_snapshots) return [ { "symbol": version_result[0], @@ -2267,7 +2263,7 @@ def _prune_previous_versions( log.info("Done deleting version: {}".format(str(v_info["version"]))) def has_symbol( - self, symbol: str, as_of: Optional[VersionQueryInput] = None, iterate_on_failure: Optional[bool] = False + self, symbol: str, as_of: Optional[VersionQueryInput] = None ) -> bool: """ Return True if the 'symbol' exists in this library AND the symbol isn't deleted in the specified as_of. @@ -2279,8 +2275,6 @@ def has_symbol( symbol name as_of : `Optional[VersionQueryInput]`, default=None See documentation of `read` method for more details. - iterate_on_failure: `Optional[bool]`, default=False - Iterate the type in the storage if the top-level key isn;t present Returns ------- @@ -2289,7 +2283,7 @@ def has_symbol( """ return ( - self._find_version(symbol, as_of=as_of, raise_on_missing=False, iterate_on_failure=iterate_on_failure) + self._find_version(symbol, as_of=as_of, raise_on_missing=False) is not None ) diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py index f3eb1a55f9f..f46b64d12dc 100644 --- a/python/arcticdb/version_store/library.py +++ b/python/arcticdb/version_store/library.py @@ -1509,7 +1509,6 @@ def list_versions( symbol=symbol, snapshot=snapshot, latest_only=latest_only, - iterate_on_failure=False, skip_snapshots=skip_snapshots, ) return {