Skip to content

Commit

Permalink
Clean up data keys that were written during a failed compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
poodlewars committed Nov 22, 2024
1 parent 358f81c commit 1132b96
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 55 deletions.
63 changes: 47 additions & 16 deletions cpp/arcticdb/version/version_core-inl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,40 +100,63 @@ void merge_frames_for_keys(

}

using StaticSchemaCompactionChecks = folly::Function<void(const SegmentInMemory&, const pipelines::PipelineContext* const)>;
struct Error {
ErrorCode code;
std::string message;
};

using CheckOutcome = std::variant<Error, std::monostate>;
using StaticSchemaCompactionChecks = folly::Function<CheckOutcome(const SegmentInMemory&, const pipelines::PipelineContext* const)>;
using CompactionWrittenKeys = std::vector<VariantKey>;
using CompactionResult = std::variant<CompactionWrittenKeys, Error>;

namespace compaction_details {

void remove_written_keys(
Store* const store,
CompactionWrittenKeys&& written_keys
);

}

template <typename IndexType, typename SchemaType, typename SegmentationPolicy, typename DensityPolicy, typename IteratorType>
void do_compact(
IteratorType target_start,
IteratorType target_end,
[[nodiscard]] CompactionResult do_compact(
IteratorType to_compact_start,
IteratorType to_compact_end,
const std::shared_ptr<pipelines::PipelineContext>& pipeline_context,
std::vector<folly::Future<VariantKey>>& fut_vec,
std::vector<pipelines::FrameSlice>& slices,
const std::shared_ptr<Store>& store,
bool convert_int_to_float,
std::optional<size_t> segment_size,
bool validate_index,
StaticSchemaCompactionChecks&& checks) {
using namespace compaction_details;
CompactionResult result;
auto index = stream::index_type_from_descriptor(pipeline_context->descriptor());

std::vector<folly::Future<VariantKey>> write_futures;

stream::SegmentAggregator<IndexType, SchemaType, SegmentationPolicy, DensityPolicy>
aggregator{
[&slices](pipelines::FrameSlice &&slice) {
slices.emplace_back(std::move(slice));
},
SchemaType{pipeline_context->descriptor(), index},
[&fut_vec, &store, &pipeline_context](SegmentInMemory &&segment) {
[&write_futures, &store, &pipeline_context](SegmentInMemory &&segment) {
auto local_index_start = IndexType::start_value_for_segment(segment);
auto local_index_end = pipelines::end_index_generator(IndexType::end_value_for_segment(segment));
stream::StreamSink::PartialKey
pk{KeyType::TABLE_DATA, pipeline_context->version_id_, pipeline_context->stream_id_, local_index_start, local_index_end};
fut_vec.emplace_back(store->write(pk, std::move(segment)));

// TODO We should apply back pressure to the work we are generating here, to bound memory use
write_futures.emplace_back(store->write(pk, std::move(segment)));
},
segment_size.has_value() ? SegmentationPolicy{*segment_size} : SegmentationPolicy{}
};

for(auto it = target_start; it != target_end; ++it) {
auto sk = [&it](){
if constexpr(std::is_same_v<IteratorType, pipelines::PipelineContext::iterator>)
for (auto it = to_compact_start; it != to_compact_end; ++it) {
auto sk = [&it]() {
if constexpr (std::is_same_v<IteratorType, pipelines::PipelineContext::iterator>)
return it->slice_and_key();
else
return *it;
Expand All @@ -143,14 +166,20 @@ void do_compact(
}

const auto& segment = sk.segment(store);
sorting::check<ErrorCode::E_UNSORTED_DATA>(
!validate_index || segment.descriptor().sorted() == SortedValue::ASCENDING ||
segment.descriptor().sorted() == SortedValue::UNKNOWN,
"Cannot compact unordered segment."
);

if(validate_index && segment.descriptor().sorted() != SortedValue::ASCENDING && segment.descriptor().sorted() != SortedValue::UNKNOWN) {
auto written_keys = folly::collect(write_futures).get();
remove_written_keys(store.get(), std::move(written_keys));
return Error{ErrorCode::E_UNSORTED_DATA, "Cannot compact unordered segment"};
}

if constexpr (std::is_same_v<SchemaType, FixedSchema>) {
checks(segment, pipeline_context.get());
CheckOutcome outcome = checks(segment, pipeline_context.get());
if (std::holds_alternative<Error>(outcome)) {
auto written_keys = folly::collect(write_futures).get();
remove_written_keys(store.get(), std::move(written_keys));
return std::get<Error>(outcome);
}
}

aggregator.add_segment(
Expand All @@ -160,7 +189,9 @@ void do_compact(
);
sk.unset_segment();
}

aggregator.commit();
return folly::collect(std::move(write_futures)).get();
}

[[nodiscard]] inline ReadOptions defragmentation_read_options_generator(const WriteOptions &options){
Expand Down
103 changes: 64 additions & 39 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1593,7 +1593,6 @@ VersionedItem compact_incomplete_impl(
}
const auto& first_seg = pipeline_context->slice_and_keys_.begin()->segment(store);

std::vector<folly::Future<VariantKey>> fut_vec;
std::vector<FrameSlice> slices;
bool dynamic_schema = write_options.dynamic_schema;
const auto index = index_type_from_descriptor(first_seg.descriptor());
Expand All @@ -1603,29 +1602,30 @@ VersionedItem compact_incomplete_impl(
options.sparsify_ ? VariantColumnPolicy{SparseColumnPolicy{}} : VariantColumnPolicy{DenseColumnPolicy{}}
);

util::variant_match(std::move(policies), [
&fut_vec, &slices, pipeline_context=pipeline_context, &store, &options, &previous_sorted_value, &write_options] (auto &&idx, auto &&schema, auto &&column_policy) {
CompactionResult result = util::variant_match(std::move(policies), [
&slices, pipeline_context=pipeline_context, &store, &options, &previous_sorted_value, &write_options] (auto &&idx, auto &&schema, auto &&column_policy) {
using IndexType = std::remove_reference_t<decltype(idx)>;
using SchemaType = std::remove_reference_t<decltype(schema)>;
using ColumnPolicyType = std::remove_reference_t<decltype(column_policy)>;
constexpr bool validate_index_sorted = IndexType::type() == IndexDescriptorImpl::Type::TIMESTAMP;

StaticSchemaCompactionChecks checks = [](const SegmentInMemory& cb_segment, const pipelines::PipelineContext* const cb_pipeline_context) {
StaticSchemaCompactionChecks checks = [](const SegmentInMemory& cb_segment, const pipelines::PipelineContext* const cb_pipeline_context) -> CheckOutcome {
if (!columns_match(cb_segment.descriptor(), cb_pipeline_context->descriptor())) {
schema::raise<ErrorCode::E_DESCRIPTOR_MISMATCH>(
"When static schema is used all staged segments must have the same column and column types."
return Error{
ErrorCode::E_DESCRIPTOR_MISMATCH,
fmt::format("When static schema is used all staged segments must have the same column and column types."
"{} is different than {}",
cb_segment.descriptor(),
cb_pipeline_context->descriptor()
);
cb_pipeline_context->descriptor())
};
}
return std::monostate{};
};

do_compact<IndexType, SchemaType, RowCountSegmentPolicy, ColumnPolicyType>(
CompactionResult result = do_compact<IndexType, SchemaType, RowCountSegmentPolicy, ColumnPolicyType>(
pipeline_context->incompletes_begin(),
pipeline_context->end(),
pipeline_context,
fut_vec,
slices,
store,
options.convert_int_to_float_,
Expand All @@ -1635,20 +1635,29 @@ VersionedItem compact_incomplete_impl(
if constexpr(std::is_same_v<IndexType, TimeseriesIndex>) {
pipeline_context->desc_->set_sorted(deduce_sorted(previous_sorted_value.value_or(SortedValue::ASCENDING), SortedValue::ASCENDING));
}

return result;
});

auto keys = folly::collect(fut_vec).get();
auto vit = collate_and_write(
store,
pipeline_context,
slices,
keys,
pipeline_context->incompletes_after(),
user_meta);
if (!options.delete_staged_data_on_failure_) {
delete_incomplete_keys(pipeline_context.get(), store.get());
}
return vit;
return util::variant_match(std::move(result),
[&slices, &pipeline_context, &store, &options, &user_meta](CompactionWrittenKeys& written_keys) -> VersionedItem {
auto vit = collate_and_write(
store,
pipeline_context,
slices,
std::move(written_keys),
pipeline_context->incompletes_after(),
user_meta);
if (!options.delete_staged_data_on_failure_) {
delete_incomplete_keys(pipeline_context.get(), store.get());
}
return vit;
},
[](Error& error) -> VersionedItem {
// TODO tidy up my dynamic error raising stuff, put it in error_code.hpp
throw ArcticException(fmt::format("{} {}", get_error_code_data(error.code).name_, error.message));
}
);
}

PredefragmentationInfo get_pre_defragmentation_info(
Expand All @@ -1672,8 +1681,8 @@ PredefragmentationInfo get_pre_defragmentation_info(
first_col_segment_idx.reserve(slice_and_keys.size());
std::optional<CompactionStartInfo> compaction_start_info;
size_t segment_idx = 0, num_to_segments_after_compact = 0, new_segment_row_size = 0;
for(auto it = slice_and_keys.begin(); it != slice_and_keys.end(); ++it) {
auto &slice = it->slice();
for(const auto & slice_and_key : slice_and_keys) {
auto &slice = slice_and_key.slice();

if (slice.row_range.diff() < segment_size && !compaction_start_info)
compaction_start_info = {slice.row_range.start(), segment_idx};
Expand Down Expand Up @@ -1714,30 +1723,29 @@ VersionedItem defragment_symbol_data_impl(
util::check(is_symbol_fragmented_impl(pre_defragmentation_info.segments_need_compaction) && pre_defragmentation_info.append_after.has_value(), "Nothing to compact in defragment_symbol_data");

// in the new index segment, we will start appending after this value
std::vector<folly::Future<VariantKey>> fut_vec;
std::vector<FrameSlice> slices;
const auto index = index_type_from_descriptor(pre_defragmentation_info.pipeline_context->descriptor());
auto policies = std::make_tuple(
index,
options.dynamic_schema ? VariantSchema{DynamicSchema::default_schema(index, stream_id)} : VariantSchema{FixedSchema::default_schema(index, stream_id)}
);

util::variant_match(std::move(policies), [
&fut_vec, &slices, &store, &options, &pre_defragmentation_info, segment_size=segment_size] (auto &&idx, auto &&schema) {
CompactionResult result = util::variant_match(std::move(policies), [
&slices, &store, &options, &pre_defragmentation_info, segment_size=segment_size] (auto &&idx, auto &&schema) {
pre_defragmentation_info.read_query->clauses_.emplace_back(std::make_shared<Clause>(RemoveColumnPartitioningClause{pre_defragmentation_info.append_after.value()}));
auto segments = read_and_process(store, pre_defragmentation_info.pipeline_context, pre_defragmentation_info.read_query, defragmentation_read_options_generator(options)).get();
using IndexType = std::remove_reference_t<decltype(idx)>;
using SchemaType = std::remove_reference_t<decltype(schema)>;

StaticSchemaCompactionChecks checks = [](const SegmentInMemory&, const pipelines::PipelineContext* const) {
// No defrag specific checks yet
return std::monostate{};
};

do_compact<IndexType, SchemaType, RowCountSegmentPolicy, DenseColumnPolicy>(
return do_compact<IndexType, SchemaType, RowCountSegmentPolicy, DenseColumnPolicy>(
segments.begin(),
segments.end(),
pre_defragmentation_info.pipeline_context,
fut_vec,
slices,
store,
false,
Expand All @@ -1746,16 +1754,21 @@ VersionedItem defragment_symbol_data_impl(
std::move(checks));
});

auto keys = folly::collect(fut_vec).get();
auto vit = collate_and_write(
store,
pre_defragmentation_info.pipeline_context,
slices,
keys,
pre_defragmentation_info.append_after.value(),
std::nullopt);

return vit;
return util::variant_match(std::move(result),
[&slices, &pre_defragmentation_info, &store](CompactionWrittenKeys& written_keys) -> VersionedItem {
return collate_and_write(
store,
pre_defragmentation_info.pipeline_context,
slices,
std::move(written_keys),
pre_defragmentation_info.append_after.value(),
std::nullopt);
},
[](Error& error) -> VersionedItem {
// TODO tidy up my dynamic error raising stuff, put it in error_code.hpp
throw ArcticException(fmt::format("{} {}", get_error_code_data(error.code).name_, error.message));
}
);
}

void set_row_id_if_index_only(
Expand Down Expand Up @@ -1838,3 +1851,15 @@ folly::Future<ReadVersionOutput> read_frame_for_version(
}

} //namespace arcticdb::version_store

namespace arcticdb::compaction_details {

void remove_written_keys(
Store* const store,
CompactionWrittenKeys&& written_keys
) {
log::version().debug("Error during compaction, removing {} keys written before failure", written_keys.size());
store->remove_keys_sync(std::move(written_keys));
}

} // namespace arcticdb::compaction_details

0 comments on commit 1132b96

Please sign in to comment.