Skip to content

Commit

Permalink
Removes iterate_on_failure argument and related functionality
Browse files Browse the repository at this point in the history
`iterate_on_failure` used to provide a back up way to load versions if
the ref key is unreadable for some reason. The ref key should no longer
be unreadable, thus `iterate_on_failure` should be removed.

This commit removes the `iterate_on_failure`
- argument in the `lib._nvs` functions
- parameter of `VersionQuery`
- parameter of `LoadParameter` and entirely removes the `LoadParameter`
  and replaces it with `LoadStrategy`
- removes IterateOnFailure cpp test which used to test this behavior by
  artificially removing the ref key.
  • Loading branch information
IvoDD committed May 3, 2024
1 parent 6e66dc0 commit d2b86ac
Show file tree
Hide file tree
Showing 21 changed files with 261 additions and 377 deletions.
5 changes: 0 additions & 5 deletions cpp/arcticdb/pipeline/query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ using VersionQueryType = std::variant<

struct VersionQuery {
VersionQueryType content_;
std::optional<bool> iterate_on_failure_;

void set_snap_name(const std::string& snap_name) {
content_ = SnapshotVersionQuery{snap_name};
Expand All @@ -109,10 +108,6 @@ struct VersionQuery {
void set_version(SignedVersionId version) {
content_ = SpecificVersionQuery{version};
}

void set_iterate_on_failure(const std::optional<bool>& iterate_on_failure) {
iterate_on_failure_ = iterate_on_failure;
}
};

template<typename ContainerType>
Expand Down
68 changes: 31 additions & 37 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,8 @@ std::string LocalVersionedEngine::dump_versions(const StreamId& stream_id) {
}

std::optional<VersionedItem> LocalVersionedEngine::get_latest_version(
const StreamId &stream_id,
const VersionQuery& version_query) {
auto key = get_latest_undeleted_version(store(), version_map(), stream_id, version_query);
const StreamId &stream_id) {
auto key = get_latest_undeleted_version(store(), version_map(), stream_id);
if (!key) {
ARCTICDB_DEBUG(log::version(), "get_latest_version didn't find version for stream_id: {}", stream_id);
return std::nullopt;
Expand All @@ -220,16 +219,15 @@ std::optional<VersionedItem> LocalVersionedEngine::get_latest_version(

std::optional<VersionedItem> LocalVersionedEngine::get_specific_version(
const StreamId &stream_id,
SignedVersionId signed_version_id,
const VersionQuery& version_query) {
SignedVersionId signed_version_id) {
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: get_specific_version");
auto key = ::arcticdb::get_specific_version(store(), version_map(), stream_id, signed_version_id, version_query);
auto key = ::arcticdb::get_specific_version(store(), version_map(), stream_id, signed_version_id);
if (!key) {
VersionId version_id;
if (signed_version_id >= 0) {
version_id = static_cast<VersionId>(signed_version_id);
} else {
auto [opt_latest_key, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, version_query);
auto [opt_latest_key, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id);
if (opt_latest_key.has_value()) {
auto opt_version_id = get_version_id_negative_index(opt_latest_key->version_id(), signed_version_id);
if (opt_version_id.has_value()) {
Expand Down Expand Up @@ -261,11 +259,10 @@ std::optional<VersionedItem> LocalVersionedEngine::get_specific_version(

std::optional<VersionedItem> LocalVersionedEngine::get_version_at_time(
const StreamId& stream_id,
timestamp as_of,
const VersionQuery& version_query
timestamp as_of
) {

auto index_key = load_index_key_from_time(store(), version_map(), stream_id, as_of, version_query);
auto index_key = load_index_key_from_time(store(), version_map(), stream_id, as_of);
if (!index_key) {
auto index_keys = get_index_keys_in_snapshots(store(), stream_id);
auto vector_index_keys = std::vector<AtomKey>(index_keys.begin(), index_keys.end());
Expand Down Expand Up @@ -309,17 +306,17 @@ std::optional<VersionedItem> LocalVersionedEngine::get_version_to_read(
const VersionQuery &version_query
) {
return util::variant_match(version_query.content_,
[&stream_id, &version_query, this](const SpecificVersionQuery &specific) {
return get_specific_version(stream_id, specific.version_id_, version_query);
[&stream_id, this](const SpecificVersionQuery &specific) {
return get_specific_version(stream_id, specific.version_id_);
},
[&stream_id, this](const SnapshotVersionQuery &snapshot) {
return get_version_from_snapshot(stream_id, snapshot.name_);
},
[&stream_id, &version_query, this](const TimestampVersionQuery &timestamp) {
return get_version_at_time(stream_id, timestamp.timestamp_, version_query);
[&stream_id, this](const TimestampVersionQuery &timestamp) {
return get_version_at_time(stream_id, timestamp.timestamp_);
},
[&stream_id, &version_query, this](const std::monostate &) {
return get_latest_version(stream_id, version_query);
[&stream_id, this](const std::monostate &) {
return get_latest_version(stream_id);
}
);
}
Expand Down Expand Up @@ -486,7 +483,7 @@ std::shared_ptr<DeDupMap> LocalVersionedEngine::get_de_dup_map(
){
auto de_dup_map = std::make_shared<DeDupMap>();
if (write_options.de_duplication) {
auto maybe_undeleted_prev = get_latest_undeleted_version(store(), version_map(), stream_id, VersionQuery{});
auto maybe_undeleted_prev = get_latest_undeleted_version(store(), version_map(), stream_id);
if (maybe_undeleted_prev) {
// maybe_undeleted_prev is index key
auto data_keys = get_data_keys(store(), {maybe_undeleted_prev.value()}, storage::ReadKeyOpts{});
Expand All @@ -511,7 +508,7 @@ std::shared_ptr<DeDupMap> LocalVersionedEngine::get_de_dup_map(


VersionedItem LocalVersionedEngine::sort_index(const StreamId& stream_id, bool dynamic_schema) {
auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id, VersionQuery{});
auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id);
util::check(maybe_prev.has_value(), "Cannot delete from non-existent symbol {}", stream_id);
auto version_id = get_next_version_from_key(*maybe_prev);
auto [index_segment_reader, slice_and_keys] = index::read_index_to_vector(store(), *maybe_prev);
Expand Down Expand Up @@ -551,7 +548,7 @@ VersionedItem LocalVersionedEngine::delete_range_internal(
const StreamId& stream_id,
const UpdateQuery & query,
bool dynamic_schema) {
auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id, VersionQuery{});
auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id);
util::check(maybe_prev.has_value(), "Cannot delete from non-existent symbol {}", stream_id);
auto versioned_item = delete_range_impl(store(),
*maybe_prev,
Expand All @@ -572,8 +569,7 @@ VersionedItem LocalVersionedEngine::update_internal(
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: update");
auto update_info = get_latest_undeleted_version_and_next_version_id(store(),
version_map(),
stream_id,
VersionQuery{});
stream_id);
if (update_info.previous_index_key_.has_value()) {
if (frame->empty()) {
ARCTICDB_DEBUG(log::version(), "Updating existing data with an empty item has no effect. \n"
Expand Down Expand Up @@ -621,8 +617,7 @@ VersionedItem LocalVersionedEngine::write_versioned_metadata_internal(
) {
auto update_info = get_latest_undeleted_version_and_next_version_id(store(),
version_map(),
stream_id,
VersionQuery{});
stream_id);
if(update_info.previous_index_key_.has_value()) {
ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {}", stream_id);
auto index_key = UpdateMetadataTask{store(), update_info, std::move(user_meta)}();
Expand Down Expand Up @@ -709,7 +704,7 @@ VersionedItem LocalVersionedEngine::write_versioned_dataframe_internal(
ARCTICDB_SAMPLE(WriteVersionedDataFrame, 0)
py::gil_scoped_release release_gil;
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write_versioned_dataframe");
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{});
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id);
auto version_id = get_next_version_from_key(maybe_prev);
ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {} , version_id = {}", stream_id, version_id);
auto write_options = get_write_options();
Expand Down Expand Up @@ -739,7 +734,7 @@ std::pair<VersionedItem, TimeseriesDescriptor> LocalVersionedEngine::restore_ver
auto version_to_restore = get_version_to_read(stream_id, version_query);
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(static_cast<bool>(version_to_restore),
"Unable to restore {}@{}: version not found", stream_id, version_query);
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{});
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id);
ARCTICDB_DEBUG(log::version(), "restore for stream_id: {} , version_id = {}", stream_id, version_to_restore->key_.version_id());
return AsyncRestoreVersionTask{store(), version_map(), stream_id, version_to_restore->key_, maybe_prev}().get();
}
Expand All @@ -752,7 +747,7 @@ VersionedItem LocalVersionedEngine::write_individual_segment(
ARCTICDB_SAMPLE(WriteVersionedDataFrame, 0)

ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write_versioned_dataframe");
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{});
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id);
auto version_id = get_next_version_from_key(maybe_prev);
ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {} , version_id = {}", stream_id, version_id);
auto index = index_type_from_descriptor(segment.descriptor());
Expand Down Expand Up @@ -870,10 +865,10 @@ folly::Future<folly::Unit> LocalVersionedEngine::delete_trees_responsibly(
{
auto min_versions = min_versions_for_each_stream(orig_keys_to_delete);
for (const auto& min : min_versions) {
auto load_param = load_type == LoadType::LOAD_DOWNTO
? LoadParameter{load_type, LoadObjective::ANY, static_cast<SignedVersionId>(min.second)}
: LoadParameter{load_type, LoadObjective::ANY};
const auto entry = version_map()->check_reload(store(), min.first, load_param, __FUNCTION__);
auto load_strategy = load_type == LoadType::LOAD_DOWNTO
? LoadStrategy{load_type, LoadObjective::ANY, static_cast<SignedVersionId>(min.second)}
: LoadStrategy{load_type, LoadObjective::ANY};
const auto entry = version_map()->check_reload(store(), min.first, load_strategy, __FUNCTION__);
entry_map.try_emplace(std::move(min.first), entry);
}
}
Expand Down Expand Up @@ -1006,7 +1001,7 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(
bool prune_previous_versions) {
log::version().debug("Compacting incomplete symbol {}", stream_id);

auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id, VersionQuery{});
auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
auto versioned_item = compact_incomplete_impl(
store_, stream_id, user_meta, update_info,
append, convert_int_to_float, via_iteration, sparsify, get_write_options());
Expand All @@ -1022,7 +1017,7 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(

bool LocalVersionedEngine::is_symbol_fragmented(const StreamId& stream_id, std::optional<size_t> segment_size) {
auto update_info = get_latest_undeleted_version_and_next_version_id(
store(), version_map(), stream_id, VersionQuery{});
store(), version_map(), stream_id);
auto options = get_write_options();
auto pre_defragmentation_info = get_pre_defragmentation_info(
store(), stream_id, update_info, options, segment_size.value_or(options.segment_row_size));
Expand All @@ -1034,7 +1029,7 @@ VersionedItem LocalVersionedEngine::defragment_symbol_data(const StreamId& strea

// Currently defragmentation only for latest version - is there a use-case to allow compaction for older data?
auto update_info = get_latest_undeleted_version_and_next_version_id(
store(), version_map(), stream_id, VersionQuery{});
store(), version_map(), stream_id);

auto options = get_write_options();
auto versioned_item = defragment_symbol_data_impl(
Expand Down Expand Up @@ -1384,8 +1379,7 @@ VersionedItem LocalVersionedEngine::append_internal(

auto update_info = get_latest_undeleted_version_and_next_version_id(store(),
version_map(),
stream_id,
VersionQuery{});
stream_id);

if(update_info.previous_index_key_.has_value()) {
if (frame->empty()) {
Expand Down Expand Up @@ -1689,7 +1683,7 @@ VersionedItem LocalVersionedEngine::sort_merge_internal(
bool via_iteration,
bool sparsify
) {
auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id, VersionQuery{});
auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
auto versioned_item = sort_merge_impl(store_, stream_id, user_meta, update_info, append, convert_int_to_float, via_iteration, sparsify);
version_map()->write_version(store(), versioned_item.key_, update_info.previous_index_key_);
return versioned_item;
Expand Down Expand Up @@ -1727,7 +1721,7 @@ timestamp LocalVersionedEngine::latest_timestamp(const std::string& symbol) {
if(auto latest_incomplete = latest_incomplete_timestamp(store(), symbol); latest_incomplete)
return *latest_incomplete;

if(auto latest_key = get_latest_version(symbol, VersionQuery{}); latest_key)
if(auto latest_key = get_latest_version(symbol); latest_key)
return latest_key->key_.end_time();

return -1;
Expand Down
9 changes: 3 additions & 6 deletions cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,15 @@ class LocalVersionedEngine : public VersionedEngine {
) override;

std::optional<VersionedItem> get_latest_version(
const StreamId &stream_id,
const VersionQuery& version_query);
const StreamId &stream_id);

std::optional<VersionedItem> get_specific_version(
const StreamId &stream_id,
SignedVersionId signed_version_id,
const VersionQuery& version_query);
SignedVersionId signed_version_id);

std::optional<VersionedItem> get_version_at_time(
const StreamId& stream_id,
timestamp as_of,
const VersionQuery& version_query);
timestamp as_of);

std::optional<VersionedItem> get_version_from_snapshot(
const StreamId& stream_id,
Expand Down
6 changes: 2 additions & 4 deletions cpp/arcticdb/version/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
.def(py::init())
.def("set_snap_name", &VersionQuery::set_snap_name)
.def("set_timestamp", &VersionQuery::set_timestamp)
.def("set_version", &VersionQuery::set_version)
.def("set_iterate_on_failure", &VersionQuery::set_iterate_on_failure);
.def("set_version", &VersionQuery::set_version);

py::class_<ReadOptions>(version, "PythonVersionStoreReadOptions")
.def(py::init())
Expand Down Expand Up @@ -694,10 +693,9 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
const std::optional<StreamId> & s_id,
const std::optional<SnapshotId> & snap_id,
const std::optional<bool>& latest,
const std::optional<bool>& iterate_on_failure,
const std::optional<bool>& skip_snapshots
){
return v.list_versions(s_id, snap_id, latest, iterate_on_failure, skip_snapshots);
return v.list_versions(s_id, snap_id, latest, skip_snapshots);
},
py::call_guard<SingleThreadMutexHolder>(), "List all the version ids for this store.")
.def("_compact_version_map",
Expand Down
8 changes: 3 additions & 5 deletions cpp/arcticdb/version/test/rapidcheck_version_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
template <typename Model>
void check_latest_versions(const Model& s0, MapStorePair &sut, std::string symbol) {
using namespace arcticdb;
auto prev = get_latest_version(sut.store_,sut.map_, symbol, pipelines::VersionQuery{}).first;
auto prev = get_latest_version(sut.store_,sut.map_, symbol).first;
auto sut_version_id = prev ? prev->version_id() : 0;
auto model_prev = s0.get_latest_version(symbol);
auto model_version_id = model_prev ? model_prev.value() : 0;
Expand All @@ -33,9 +33,7 @@ void check_latest_versions(const Model& s0, MapStorePair &sut, std::string symb
template <typename Model>
void check_latest_undeleted_versions(const Model& s0, MapStorePair &sut, std::string symbol) {
using namespace arcticdb;
pipelines::VersionQuery version_query;
version_query.set_iterate_on_failure(true);
auto prev = get_latest_undeleted_version(sut.store_, sut.map_, symbol, version_query);
auto prev = get_latest_undeleted_version(sut.store_, sut.map_, symbol);
auto sut_version_id = prev ? prev->version_id() : 0;
auto model_prev = s0.get_latest_undeleted_version(symbol);
auto model_version_id = model_prev ? model_prev.value() : 0;
Expand Down Expand Up @@ -224,7 +222,7 @@ struct GetAllVersions : rc::state::Command<Model, MapStorePair> {
void run(const Model& s0, MapStorePair &sut) const override {
auto model_versions = s0.get_all_versions(symbol_);
using namespace arcticdb;
auto sut_version = get_all_versions(sut.store_, sut.map_, symbol_, pipelines::VersionQuery{});
auto sut_version = get_all_versions(sut.store_, sut.map_, symbol_);
RC_ASSERT(model_versions.size() == sut_version.size());

for(auto i = size_t{0}; i < model_versions.size(); ++i)
Expand Down
Loading

0 comments on commit d2b86ac

Please sign in to comment.