Skip to content

Commit

Permalink
Fixes #1384 by checking for tombstone_all on LOAD_FROM_TIME and
Browse files Browse the repository at this point in the history
LOAD_DOWNTO WORK IN PROGRESS

- previously we did the check for tombstone_all only on LOAD_UNDELETED
  and LOAD_LATEST_UNDELETED
- Renamed the functions in version_utils which we use to check when to
  stop following the version chain to less confusing names
- Google test VersionMap.FollowingVersionChainEndEarly now passes
  • Loading branch information
IvoDD committed Mar 22, 2024
1 parent 9673fa4 commit ae70971
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 30 deletions.
8 changes: 4 additions & 4 deletions cpp/arcticdb/version/version_map.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ class VersionMapImpl {
next_key = read_segment_with_keys(seg, entry, load_progress);
set_latest_version(entry, latest_version);
} while (next_key
&& !loaded_until_version_id(load_strategy, load_progress, latest_version)
&& !loaded_until_timestamp(load_strategy, load_progress)
&& load_latest_ongoing(load_strategy, entry)
&& looking_for_undeleted(load_strategy, entry, load_progress));
&& continue_when_loading_version(load_strategy, load_progress, latest_version)
&& continue_when_loading_from_time(load_strategy, load_progress)
&& continue_when_loading_latest(load_strategy, entry)
&& continue_when_loading_undeleted(load_strategy, entry, load_progress));
}
entry->loaded_with_progress_ = load_progress;
}
Expand Down
1 change: 1 addition & 0 deletions cpp/arcticdb/version/version_map_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ inline constexpr bool is_partial_load_type(LoadType load_type) {
return load_type == LoadType::LOAD_DOWNTO || load_type == LoadType::LOAD_FROM_TIME;
}


enum class VersionStatus {
LIVE,
TOMBSTONED,
Expand Down
55 changes: 29 additions & 26 deletions cpp/arcticdb/version/version_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,22 +231,27 @@ inline bool is_positive_version_query(const LoadStrategy& load_strategy) {
return load_strategy.load_until_version_.value() >= 0;
}

inline bool loaded_until_version_id(const LoadStrategy &load_strategy, const LoadProgress& load_progress, const std::optional<VersionId>& latest_version) {
inline bool continue_when_loading_version(const LoadStrategy& load_strategy, const LoadProgress& load_progress, const std::optional<VersionId>& latest_version) {
if (!load_strategy.load_until_version_)
return false;
// Should continue when not loading down to a version
return true;

if (is_positive_version_query(load_strategy)) {
if (load_progress.oldest_loaded_index_version_ > static_cast<VersionId>(*load_strategy.load_until_version_)) {
return false;
// Should continue when version was not reached
return true;
}
} else {
if (latest_version.has_value()) {
if (auto opt_version_id = get_version_id_negative_index(*latest_version, *load_strategy.load_until_version_);
opt_version_id && load_progress.oldest_loaded_index_version_ > *opt_version_id) {
return false;
// Should continue when version was not reached
return true;
}
} else {
return false;
// TODO: Think about this in the context of slack discussion
// Should continue when latest undeleted version was not reached
return true;
}
}
ARCTICDB_DEBUG(log::version(),
Expand All @@ -255,7 +260,7 @@ inline bool loaded_until_version_id(const LoadStrategy &load_strategy, const Loa
*load_strategy.load_until_version_,
latest_version.value()
);
return true;
return false;
}

inline void set_latest_version(const std::shared_ptr<VersionMapEntry>& entry, std::optional<VersionId>& latest_version) {
Expand All @@ -270,51 +275,49 @@ static constexpr timestamp nanos_to_seconds(timestamp nanos) {
return nanos / timestamp(10000000000);
}

inline bool loaded_until_timestamp(const LoadStrategy &load_strategy, const LoadProgress& load_progress) {
inline bool continue_when_loading_from_time(const LoadStrategy &load_strategy, const LoadProgress& load_progress) {
if (!load_strategy.load_from_time_)
return false;
return true;

// TODO: Add tests for new behavior here!
auto loaded_deleted_or_undeleted_timestamp = load_strategy.include_deleted_ ? load_progress.earliest_loaded_timestamp_ : load_progress.earliest_loaded_undeleted_timestamp_;

if (loaded_deleted_or_undeleted_timestamp > *load_strategy.load_from_time_)
return false;
return true;

ARCTICDB_DEBUG(log::version(),
"Exiting load from timestamp because request {} <= {}",
*load_strategy.load_from_time_,
loaded_deleted_or_undeleted_timestamp);
return true;
loaded_deleted_or_undeleted_timestamp,
*load_strategy.load_from_time_);
return false;
}

inline bool load_latest_ongoing(const LoadStrategy &load_strategy, const std::shared_ptr<VersionMapEntry> &entry) {
inline bool continue_when_loading_latest(const LoadStrategy& load_strategy, const std::shared_ptr<VersionMapEntry> &entry) {
if (!(load_strategy.load_type_ == LoadType::LOAD_LATEST && entry->get_first_index(load_strategy.include_deleted_).first))
return true;

ARCTICDB_DEBUG(log::version(), "Exiting because we found a non-deleted index in load latest");
ARCTICDB_DEBUG(log::version(), "Exiting because we found the latest version with include_deleted: {}", load_strategy.include_deleted);

Check failure on line 299 in cpp/arcticdb/version/version_utils.hpp

View workflow job for this annotation

GitHub Actions / code_coverage

‘const struct arcticdb::LoadStrategy’ has no member named ‘include_deleted’; did you mean ‘include_deleted_’?
return false;
}

inline bool looking_for_undeleted(const LoadStrategy& load_strategy, const std::shared_ptr<VersionMapEntry>& entry, const LoadProgress& load_progress) {
if(load_strategy.include_deleted_) {
inline bool continue_when_loading_undeleted(const LoadStrategy& load_strategy, const std::shared_ptr<VersionMapEntry>& entry, const LoadProgress& load_progress) {
if (load_strategy.include_deleted_){
return true;
}

if(entry->tombstone_all_) {
const bool is_deleted_by_tombstone_all = entry->tombstone_all_->version_id() >= load_progress.oldest_loaded_index_version_;
if(is_deleted_by_tombstone_all) {
const bool is_deleted_by_tombstone_all =
entry->tombstone_all_->version_id() >= load_progress.oldest_loaded_index_version_;
if (is_deleted_by_tombstone_all) {
ARCTICDB_DEBUG(
log::version(),
"Exiting because tombstone all key deletes all versions beyond: {} and the oldest loaded index has version: {}",
entry->tombstone_all_->version_id(),
load_progress.oldest_loaded_index_version_);
log::version(),
"Exiting because tombstone all key deletes all versions beyond: {} and the oldest loaded index has version: {}",
entry->tombstone_all_->version_id(),
load_progress.oldest_loaded_index_version_);
return false;
} else {
return true;
}
} else {
return true;
}
return true;
}

inline bool penultimate_key_contains_required_version_id(const AtomKey& key, const LoadStrategy& load_strategy) {
Expand Down

0 comments on commit ae70971

Please sign in to comment.