Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1384 by refactoring VersionMap to include all combinations of LoadTypes and whether to include_deleted #1443

Merged
merged 11 commits into from
Jul 24, 2024
Merged
5 changes: 0 additions & 5 deletions cpp/arcticdb/pipeline/query.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ using VersionQueryType = std::variant<

struct VersionQuery {
VersionQueryType content_;
std::optional<bool> iterate_on_failure_;

void set_snap_name(const std::string& snap_name) {
content_ = SnapshotVersionQuery{snap_name};
Expand All @@ -109,10 +108,6 @@ struct VersionQuery {
void set_version(SignedVersionId version) {
content_ = SpecificVersionQuery{version};
}

void set_iterate_on_failure(const std::optional<bool>& iterate_on_failure) {
iterate_on_failure_ = iterate_on_failure;
}
};

template<typename ContainerType>
Expand Down
4 changes: 3 additions & 1 deletion cpp/arcticdb/util/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ constexpr auto string_nan = std::numeric_limits<position_t>::max() - 1;

constexpr auto NaT = std::numeric_limits<timestamp>::min();

static constexpr decltype(timestamp(0) - timestamp(0)) ONE_SECOND = 1'000'000'000;
static constexpr decltype(timestamp(0) - timestamp(0)) ONE_MILLISECOND = 1'000'000;

static constexpr decltype(timestamp(0) - timestamp(0)) ONE_SECOND = 1'000 * ONE_MILLISECOND;

static constexpr decltype(timestamp(0) - timestamp(0)) ONE_MINUTE = 60 * ONE_SECOND;

Expand Down
68 changes: 31 additions & 37 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,8 @@ std::string LocalVersionedEngine::dump_versions(const StreamId& stream_id) {
}

std::optional<VersionedItem> LocalVersionedEngine::get_latest_version(
const StreamId &stream_id,
const VersionQuery& version_query) {
auto key = get_latest_undeleted_version(store(), version_map(), stream_id, version_query);
const StreamId &stream_id) {
auto key = get_latest_undeleted_version(store(), version_map(), stream_id);
if (!key) {
ARCTICDB_DEBUG(log::version(), "get_latest_version didn't find version for stream_id: {}", stream_id);
return std::nullopt;
Expand All @@ -235,16 +234,15 @@ std::optional<VersionedItem> LocalVersionedEngine::get_latest_version(

std::optional<VersionedItem> LocalVersionedEngine::get_specific_version(
const StreamId &stream_id,
SignedVersionId signed_version_id,
const VersionQuery& version_query) {
SignedVersionId signed_version_id) {
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: get_specific_version");
auto key = ::arcticdb::get_specific_version(store(), version_map(), stream_id, signed_version_id, version_query);
auto key = ::arcticdb::get_specific_version(store(), version_map(), stream_id, signed_version_id);
if (!key) {
VersionId version_id;
if (signed_version_id >= 0) {
version_id = static_cast<VersionId>(signed_version_id);
} else {
auto [opt_latest_key, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, version_query);
auto [opt_latest_key, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id);
if (opt_latest_key.has_value()) {
auto opt_version_id = get_version_id_negative_index(opt_latest_key->version_id(), signed_version_id);
if (opt_version_id.has_value()) {
Expand Down Expand Up @@ -276,11 +274,10 @@ std::optional<VersionedItem> LocalVersionedEngine::get_specific_version(

std::optional<VersionedItem> LocalVersionedEngine::get_version_at_time(
const StreamId& stream_id,
timestamp as_of,
const VersionQuery& version_query
timestamp as_of
) {

auto index_key = load_index_key_from_time(store(), version_map(), stream_id, as_of, version_query);
auto index_key = load_index_key_from_time(store(), version_map(), stream_id, as_of);
if (!index_key) {
auto index_keys = get_index_keys_in_snapshots(store(), stream_id);
auto vector_index_keys = std::vector<AtomKey>(index_keys.begin(), index_keys.end());
Expand Down Expand Up @@ -324,17 +321,17 @@ std::optional<VersionedItem> LocalVersionedEngine::get_version_to_read(
const VersionQuery &version_query
) {
return util::variant_match(version_query.content_,
[&stream_id, &version_query, this](const SpecificVersionQuery &specific) {
return get_specific_version(stream_id, specific.version_id_, version_query);
[&stream_id, this](const SpecificVersionQuery &specific) {
return get_specific_version(stream_id, specific.version_id_);
},
[&stream_id, this](const SnapshotVersionQuery &snapshot) {
return get_version_from_snapshot(stream_id, snapshot.name_);
},
[&stream_id, &version_query, this](const TimestampVersionQuery &timestamp) {
return get_version_at_time(stream_id, timestamp.timestamp_, version_query);
[&stream_id, this](const TimestampVersionQuery &timestamp) {
return get_version_at_time(stream_id, timestamp.timestamp_);
},
[&stream_id, &version_query, this](const std::monostate &) {
return get_latest_version(stream_id, version_query);
[&stream_id, this](const std::monostate &) {
return get_latest_version(stream_id);
}
);
}
Expand Down Expand Up @@ -481,7 +478,7 @@ std::shared_ptr<DeDupMap> LocalVersionedEngine::get_de_dup_map(
){
auto de_dup_map = std::make_shared<DeDupMap>();
if (write_options.de_duplication) {
auto maybe_undeleted_prev = get_latest_undeleted_version(store(), version_map(), stream_id, VersionQuery{});
auto maybe_undeleted_prev = get_latest_undeleted_version(store(), version_map(), stream_id);
if (maybe_undeleted_prev) {
// maybe_undeleted_prev is index key
auto data_keys = get_data_keys(store(), {maybe_undeleted_prev.value()}, storage::ReadKeyOpts{});
Expand All @@ -506,7 +503,7 @@ std::shared_ptr<DeDupMap> LocalVersionedEngine::get_de_dup_map(


VersionedItem LocalVersionedEngine::sort_index(const StreamId& stream_id, bool dynamic_schema, bool prune_previous_versions) {
auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id, VersionQuery{});
auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id);
util::check(maybe_prev.has_value(), "Cannot delete from non-existent symbol {}", stream_id);
auto version_id = get_next_version_from_key(*maybe_prev);
auto [index_segment_reader, slice_and_keys] = index::read_index_to_vector(store(), *maybe_prev);
Expand Down Expand Up @@ -546,7 +543,7 @@ VersionedItem LocalVersionedEngine::delete_range_internal(
const StreamId& stream_id,
const UpdateQuery & query,
const DeleteRangeOptions& option) {
auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id, VersionQuery{});
auto maybe_prev = get_latest_undeleted_version(store(), version_map(), stream_id);
util::check(maybe_prev.has_value(), "Cannot delete from non-existent symbol {}", stream_id);
auto versioned_item = delete_range_impl(store(),
*maybe_prev,
Expand All @@ -568,8 +565,7 @@ VersionedItem LocalVersionedEngine::update_internal(
py::gil_scoped_release release_gil;
auto update_info = get_latest_undeleted_version_and_next_version_id(store(),
version_map(),
stream_id,
VersionQuery{});
stream_id);
if (update_info.previous_index_key_.has_value()) {
if (frame->empty()) {
ARCTICDB_DEBUG(log::version(), "Updating existing data with an empty item has no effect. \n"
Expand Down Expand Up @@ -618,8 +614,7 @@ VersionedItem LocalVersionedEngine::write_versioned_metadata_internal(
) {
auto update_info = get_latest_undeleted_version_and_next_version_id(store(),
version_map(),
stream_id,
VersionQuery{});
stream_id);
if(update_info.previous_index_key_.has_value()) {
ARCTICDB_DEBUG(log::version(), "write_versioned_metadata for stream_id: {}", stream_id);
auto index_key = UpdateMetadataTask{store(), update_info, std::move(user_meta)}();
Expand Down Expand Up @@ -706,7 +701,7 @@ VersionedItem LocalVersionedEngine::write_versioned_dataframe_internal(
ARCTICDB_SAMPLE(WriteVersionedDataFrame, 0)
py::gil_scoped_release release_gil;
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write_versioned_dataframe");
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{});
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id);
auto version_id = get_next_version_from_key(maybe_prev);
ARCTICDB_DEBUG(log::version(), "write_versioned_dataframe for stream_id: {} , version_id = {}", stream_id, version_id);
auto write_options = get_write_options();
Expand Down Expand Up @@ -736,7 +731,7 @@ std::pair<VersionedItem, TimeseriesDescriptor> LocalVersionedEngine::restore_ver
auto version_to_restore = get_version_to_read(stream_id, version_query);
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(static_cast<bool>(version_to_restore),
"Unable to restore {}@{}: version not found", stream_id, version_query);
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{});
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id);
ARCTICDB_DEBUG(log::version(), "restore for stream_id: {} , version_id = {}", stream_id, version_to_restore->key_.version_id());
return AsyncRestoreVersionTask{store(), version_map(), stream_id, version_to_restore->key_, maybe_prev}().get();
}
Expand All @@ -749,7 +744,7 @@ VersionedItem LocalVersionedEngine::write_individual_segment(
ARCTICDB_SAMPLE(WriteVersionedDataFrame, 0)

ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write individual segment");
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id, VersionQuery{});
auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id);
auto version_id = get_next_version_from_key(maybe_prev);
ARCTICDB_DEBUG(log::version(), "write individual segment for stream_id: {} , version_id = {}", stream_id, version_id);
auto index = index_type_from_descriptor(segment.descriptor());
Expand Down Expand Up @@ -869,10 +864,10 @@ folly::Future<folly::Unit> delete_trees_responsibly(
{
auto min_versions = min_versions_for_each_stream(orig_keys_to_delete);
for (const auto& min : min_versions) {
auto load_param = load_type == LoadType::LOAD_DOWNTO
? LoadParameter{load_type, static_cast<SignedVersionId>(min.second)}
: LoadParameter{load_type};
const auto entry = version_map->check_reload(store, min.first, load_param, __FUNCTION__);
auto load_strategy = load_type == LoadType::DOWNTO
? LoadStrategy{load_type, LoadObjective::INCLUDE_DELETED, static_cast<SignedVersionId>(min.second)}
: LoadStrategy{load_type, LoadObjective::INCLUDE_DELETED};
const auto entry = version_map->check_reload(store, min.first, load_strategy, __FUNCTION__);
entry_map.try_emplace(std::move(min.first), entry);
}
}
Expand Down Expand Up @@ -1002,7 +997,7 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(
const CompactIncompleteOptions& options) {
log::version().debug("Compacting incomplete symbol {}", stream_id);

auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id, VersionQuery{});
auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
auto versioned_item = compact_incomplete_impl(store_, stream_id, user_meta, update_info, options, get_write_options());

write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
Expand All @@ -1015,7 +1010,7 @@ VersionedItem LocalVersionedEngine::compact_incomplete_dynamic(

bool LocalVersionedEngine::is_symbol_fragmented(const StreamId& stream_id, std::optional<size_t> segment_size) {
auto update_info = get_latest_undeleted_version_and_next_version_id(
store(), version_map(), stream_id, VersionQuery{});
store(), version_map(), stream_id);
auto options = get_write_options();
auto pre_defragmentation_info = get_pre_defragmentation_info(
store(), stream_id, update_info, options, segment_size.value_or(options.segment_row_size));
Expand All @@ -1027,7 +1022,7 @@ VersionedItem LocalVersionedEngine::defragment_symbol_data(const StreamId& strea

// Currently defragmentation only for latest version - is there a use-case to allow compaction for older data?
auto update_info = get_latest_undeleted_version_and_next_version_id(
store(), version_map(), stream_id, VersionQuery{});
store(), version_map(), stream_id);

auto options = get_write_options();
auto versioned_item = defragment_symbol_data_impl(
Expand Down Expand Up @@ -1377,8 +1372,7 @@ VersionedItem LocalVersionedEngine::append_internal(
py::gil_scoped_release release_gil;
auto update_info = get_latest_undeleted_version_and_next_version_id(store(),
version_map(),
stream_id,
VersionQuery{});
stream_id);

if(update_info.previous_index_key_.has_value()) {
if (frame->empty()) {
Expand Down Expand Up @@ -1679,7 +1673,7 @@ VersionedItem LocalVersionedEngine::sort_merge_internal(
const StreamId& stream_id,
const std::optional<arcticdb::proto::descriptors::UserDefinedMetadata>& user_meta,
const CompactIncompleteOptions& options) {
auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id, VersionQuery{});
auto update_info = get_latest_undeleted_version_and_next_version_id(store(), version_map(), stream_id);
auto versioned_item = sort_merge_impl(store_, stream_id, user_meta, update_info, options);
write_version_and_prune_previous(options.prune_previous_versions_, versioned_item.key_, update_info.previous_index_key_);
return versioned_item;
Expand Down Expand Up @@ -1717,7 +1711,7 @@ timestamp LocalVersionedEngine::latest_timestamp(const std::string& symbol) {
if(auto latest_incomplete = latest_incomplete_timestamp(store(), symbol); latest_incomplete)
return *latest_incomplete;

if(auto latest_key = get_latest_version(symbol, VersionQuery{}); latest_key)
if(auto latest_key = get_latest_version(symbol); latest_key)
return latest_key->key_.end_time();

return -1;
Expand Down
9 changes: 3 additions & 6 deletions cpp/arcticdb/version/local_versioned_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,18 +112,15 @@ class LocalVersionedEngine : public VersionedEngine {
) override;

std::optional<VersionedItem> get_latest_version(
const StreamId &stream_id,
const VersionQuery& version_query);
const StreamId &stream_id);

std::optional<VersionedItem> get_specific_version(
const StreamId &stream_id,
SignedVersionId signed_version_id,
const VersionQuery& version_query);
SignedVersionId signed_version_id);

std::optional<VersionedItem> get_version_at_time(
const StreamId& stream_id,
timestamp as_of,
const VersionQuery& version_query);
timestamp as_of);

std::optional<VersionedItem> get_version_from_snapshot(
const StreamId& stream_id,
Expand Down
6 changes: 2 additions & 4 deletions cpp/arcticdb/version/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
.def(py::init())
.def("set_snap_name", &VersionQuery::set_snap_name)
.def("set_timestamp", &VersionQuery::set_timestamp)
.def("set_version", &VersionQuery::set_version)
.def("set_iterate_on_failure", &VersionQuery::set_iterate_on_failure);
.def("set_version", &VersionQuery::set_version);

py::class_<ReadOptions>(version, "PythonVersionStoreReadOptions")
.def(py::init())
Expand Down Expand Up @@ -715,10 +714,9 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
const std::optional<StreamId> & s_id,
const std::optional<SnapshotId> & snap_id,
const std::optional<bool>& latest,
const std::optional<bool>& iterate_on_failure,
const std::optional<bool>& skip_snapshots
){
return v.list_versions(s_id, snap_id, latest, iterate_on_failure, skip_snapshots);
return v.list_versions(s_id, snap_id, latest, skip_snapshots);
},
py::call_guard<SingleThreadMutexHolder>(), "List all the version ids for this store.")
.def("_compact_version_map",
Expand Down
8 changes: 3 additions & 5 deletions cpp/arcticdb/version/test/rapidcheck_version_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
template <typename Model>
void check_latest_versions(const Model& s0, MapStorePair &sut, std::string symbol) {
using namespace arcticdb;
auto prev = get_latest_version(sut.store_,sut.map_, symbol, pipelines::VersionQuery{}).first;
auto prev = get_latest_version(sut.store_,sut.map_, symbol).first;
auto sut_version_id = prev ? prev->version_id() : 0;
auto model_prev = s0.get_latest_version(symbol);
auto model_version_id = model_prev ? model_prev.value() : 0;
Expand All @@ -33,9 +33,7 @@ void check_latest_versions(const Model& s0, MapStorePair &sut, std::string symb
template <typename Model>
void check_latest_undeleted_versions(const Model& s0, MapStorePair &sut, std::string symbol) {
using namespace arcticdb;
pipelines::VersionQuery version_query;
version_query.set_iterate_on_failure(true);
auto prev = get_latest_undeleted_version(sut.store_, sut.map_, symbol, version_query);
auto prev = get_latest_undeleted_version(sut.store_, sut.map_, symbol);
auto sut_version_id = prev ? prev->version_id() : 0;
auto model_prev = s0.get_latest_undeleted_version(symbol);
auto model_version_id = model_prev ? model_prev.value() : 0;
Expand Down Expand Up @@ -224,7 +222,7 @@ struct GetAllVersions : rc::state::Command<Model, MapStorePair> {
void run(const Model& s0, MapStorePair &sut) const override {
auto model_versions = s0.get_all_versions(symbol_);
using namespace arcticdb;
auto sut_version = get_all_versions(sut.store_, sut.map_, symbol_, pipelines::VersionQuery{});
auto sut_version = get_all_versions(sut.store_, sut.map_, symbol_);
RC_ASSERT(model_versions.size() == sut_version.size());

for(auto i = size_t{0}; i < model_versions.size(); ++i)
Expand Down
Loading
Loading