diff --git a/Catalog/Catalog.cpp b/Catalog/Catalog.cpp index 479296e892..d964d10de9 100644 --- a/Catalog/Catalog.cpp +++ b/Catalog/Catalog.cpp @@ -3954,7 +3954,7 @@ void Catalog::checkpoint(const int logicalTableId) const { } } -void Catalog::checkpointWithAutoRollback(const int logical_table_id) { +void Catalog::checkpointWithAutoRollback(const int logical_table_id) const { auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id); try { checkpoint(logical_table_id); @@ -4012,52 +4012,6 @@ std::string Catalog::generatePhysicalTableName(const std::string& logicalTableNa return (physicalTableName); } -void Catalog::vacuumDeletedRows(const int logicalTableId) const { - // shard here to serve request from TableOptimizer and elsewhere - const auto td = getMetadataForTable(logicalTableId); - const auto shards = getPhysicalTablesDescriptors(td); - for (const auto shard : shards) { - vacuumDeletedRows(shard); - } -} - -void Catalog::vacuumDeletedRows(const TableDescriptor* td) const { - // "if not a table that supports delete return nullptr, nothing more to do" - const ColumnDescriptor* cd = getDeletedColumn(td); - if (nullptr == cd) { - return; - } - // vacuum chunks which show sign of deleted rows in metadata - ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId, cd->columnId}; - ChunkMetadataVector chunkMetadataVec; - dataMgr_->getChunkMetadataVecForKeyPrefix(chunkMetadataVec, chunkKeyPrefix); - for (auto cm : chunkMetadataVec) { - // "delete has occured" - if (cm.second->chunkStats.max.tinyintval == 1) { - UpdelRoll updel_roll; - updel_roll.catalog = this; - updel_roll.logicalTableId = getLogicalTableId(td->tableId); - updel_roll.memoryLevel = Data_Namespace::MemoryLevel::CPU_LEVEL; - updel_roll.table_descriptor = td; - const auto cd = getMetadataForColumn(td->tableId, cm.first[2]); - const auto chunk = Chunk_NS::Chunk::getChunk(cd, - &getDataMgr(), - cm.first, - updel_roll.memoryLevel, - 0, - cm.second->numBytes, - cm.second->numElements); - td->fragmenter->compactRows(this, - td, - cm.first[3], - td->fragmenter->getVacuumOffsets(chunk), - updel_roll.memoryLevel, - updel_roll); - updel_roll.stageUpdate(); - } - } -} - void Catalog::buildForeignServerMap() { sqliteConnector_.query( "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM " diff --git a/Catalog/Catalog.h b/Catalog/Catalog.h index 484e33b420..54cbb47f44 100644 --- a/Catalog/Catalog.h +++ b/Catalog/Catalog.h @@ -288,12 +288,10 @@ class Catalog final { void setDeletedColumnUnlocked(const TableDescriptor* td, const ColumnDescriptor* cd); int getLogicalTableId(const int physicalTableId) const; void checkpoint(const int logicalTableId) const; - void checkpointWithAutoRollback(const int logical_table_id); + void checkpointWithAutoRollback(const int logical_table_id) const; std::string name() const { return getCurrentDB().dbName; } void eraseDBData(); void eraseTablePhysicalData(const TableDescriptor* td); - void vacuumDeletedRows(const TableDescriptor* td) const; - void vacuumDeletedRows(const int logicalTableId) const; void setForReload(const int32_t tableId); std::vector getTableDataDirectories(const TableDescriptor* td) const; diff --git a/DataMgr/ArrayNoneEncoder.h b/DataMgr/ArrayNoneEncoder.h index 4afa4e2a20..c2dc50e241 100644 --- a/DataMgr/ArrayNoneEncoder.h +++ b/DataMgr/ArrayNoneEncoder.h @@ -272,6 +272,11 @@ class ArrayNoneEncoder : public Encoder { return true; } + void resetChunkStats() override { + has_nulls = false; + initialized = false; + } + Datum elem_min; Datum elem_max; bool has_nulls; diff --git a/DataMgr/DateDaysEncoder.h b/DataMgr/DateDaysEncoder.h index 7f73d559c2..a248a0f4f2 100644 --- a/DataMgr/DateDaysEncoder.h +++ b/DataMgr/DateDaysEncoder.h @@ -171,17 +171,17 @@ class DateDaysEncoder : public Encoder { return true; } - T dataMin; - T dataMax; - bool has_nulls; - - private: - void resetChunkStats() { + void resetChunkStats() override { dataMin = std::numeric_limits::max(); dataMax = std::numeric_limits::lowest(); has_nulls = false; } + T dataMin; + T dataMax; + bool has_nulls; + + private: V encodeDataAndUpdateStats(const T& unencoded_data) { V encoded_data; if (unencoded_data == std::numeric_limits::min()) { diff --git a/DataMgr/Encoder.h b/DataMgr/Encoder.h index eec2dc5bd7..21455c4ba4 100644 --- a/DataMgr/Encoder.h +++ b/DataMgr/Encoder.h @@ -225,6 +225,11 @@ class Encoder { return false; } + /** + * Resets chunk metadata stats to their default values. + */ + virtual void resetChunkStats() = 0; + size_t getNumElems() const { return num_elems_; } void setNumElems(const size_t num_elems) { num_elems_ = num_elems; } diff --git a/DataMgr/FileMgr/FileInfo.h b/DataMgr/FileMgr/FileInfo.h index a24fcfd13b..4a4a68e7d1 100644 --- a/DataMgr/FileMgr/FileInfo.h +++ b/DataMgr/FileMgr/FileInfo.h @@ -58,7 +58,7 @@ struct FileInfo { FILE* f; /// file stream object for the represented file size_t pageSize; /// the fixed size of each page in the file size_t numPages; /// the number of pages in the file - bool isDirty; // True if writes have occured since last sync + bool isDirty{false}; // True if writes have occured since last sync std::set freePages; /// set of page numbers of free pages std::mutex freePagesMutex_; std::mutex readWriteMutex_; diff --git a/DataMgr/FixedLengthArrayNoneEncoder.h b/DataMgr/FixedLengthArrayNoneEncoder.h index 36bd77c23b..dfa3245d52 100644 --- a/DataMgr/FixedLengthArrayNoneEncoder.h +++ b/DataMgr/FixedLengthArrayNoneEncoder.h @@ -229,6 +229,11 @@ class FixedLengthArrayNoneEncoder : public Encoder { return true; } + void resetChunkStats() override { + has_nulls = false; + initialized = false; + } + Datum elem_min; Datum elem_max; bool has_nulls; diff --git a/DataMgr/FixedLengthEncoder.h b/DataMgr/FixedLengthEncoder.h index dda1f1508d..bb1da56fdb 100644 --- a/DataMgr/FixedLengthEncoder.h +++ b/DataMgr/FixedLengthEncoder.h @@ -206,17 +206,17 @@ class FixedLengthEncoder : public Encoder { return true; } - T dataMin; - T dataMax; - bool has_nulls; - - private: - void resetChunkStats() { + void resetChunkStats() override { dataMin = std::numeric_limits::max(); dataMax = std::numeric_limits::lowest(); has_nulls = false; } + T dataMin; + T dataMax; + bool has_nulls; + + private: V encodeDataAndUpdateStats(const T& unencoded_data) { V encoded_data = static_cast(unencoded_data); if (unencoded_data != encoded_data) { diff --git a/DataMgr/NoneEncoder.h b/DataMgr/NoneEncoder.h index 93f8f23cb9..a179965ec1 100644 --- a/DataMgr/NoneEncoder.h +++ b/DataMgr/NoneEncoder.h @@ -209,17 +209,17 @@ class NoneEncoder : public Encoder { has_nulls = castedEncoder->has_nulls; } - T dataMin; - T dataMax; - bool has_nulls; - - private: - void resetChunkStats() { + void resetChunkStats() override { dataMin = std::numeric_limits::max(); dataMax = std::numeric_limits::lowest(); has_nulls = false; } + T dataMin; + T dataMax; + bool has_nulls; + + private: T validateDataAndUpdateStats(const T& unencoded_data) { if (unencoded_data == none_encoded_null_value()) { has_nulls = true; diff --git a/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.cpp b/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.cpp index 6addc0c32b..551dc92afe 100644 --- a/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.cpp +++ b/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.cpp @@ -31,7 +31,7 @@ AbstractBuffer* MutableCachePersistentStorageMgr::createBuffer( const size_t initial_size) { auto buf = PersistentStorageMgr::createBuffer(chunk_key, page_size, initial_size); if (isChunkPrefixCacheable(chunk_key)) { - cached_buffer_map_.emplace(chunk_key, buf); + cached_chunk_keys_.emplace(chunk_key); } return buf; } @@ -42,7 +42,7 @@ void MutableCachePersistentStorageMgr::deleteBuffer(const ChunkKey& chunk_key, // not be deleting buffers for them. CHECK(!isForeignStorage(chunk_key)); disk_cache_->deleteBufferIfExists(chunk_key); - cached_buffer_map_.erase(chunk_key); + cached_chunk_keys_.erase(chunk_key); PersistentStorageMgr::deleteBuffer(chunk_key, purge); } @@ -54,10 +54,10 @@ void MutableCachePersistentStorageMgr::deleteBuffersWithPrefix( ChunkKey upper_prefix(chunk_key_prefix); upper_prefix.push_back(std::numeric_limits::max()); - auto end_it = cached_buffer_map_.upper_bound(static_cast(upper_prefix)); - for (auto&& chunk_it = cached_buffer_map_.lower_bound(chunk_key_prefix); - chunk_it != end_it;) { - chunk_it = cached_buffer_map_.erase(chunk_it); + auto end_it = cached_chunk_keys_.upper_bound(static_cast(upper_prefix)); + for (auto&& chunk_key_it = cached_chunk_keys_.lower_bound(chunk_key_prefix); + chunk_key_it != end_it;) { + chunk_key_it = cached_chunk_keys_.erase(chunk_key_it); } PersistentStorageMgr::deleteBuffersWithPrefix(chunk_key_prefix, purge); } @@ -71,8 +71,8 @@ AbstractBuffer* MutableCachePersistentStorageMgr::putBuffer(const ChunkKey& chun } void MutableCachePersistentStorageMgr::checkpoint() { - for (auto& [key, buf] : cached_buffer_map_) { - if (buf->isDirty()) { + for (auto& key : cached_chunk_keys_) { + if (global_file_mgr_->getBuffer(key)->isDirty()) { foreign_storage::ForeignStorageBuffer temp_buf; global_file_mgr_->fetchBuffer(key, &temp_buf, 0); disk_cache_->cacheChunk(key, &temp_buf); @@ -85,13 +85,14 @@ void MutableCachePersistentStorageMgr::checkpoint(const int db_id, const int tb_ ChunkKey chunk_prefix{db_id, tb_id}; ChunkKey upper_prefix(chunk_prefix); upper_prefix.push_back(std::numeric_limits::max()); - auto end_it = cached_buffer_map_.upper_bound(static_cast(upper_prefix)); - for (auto&& chunk_it = cached_buffer_map_.lower_bound(chunk_prefix); chunk_it != end_it; - ++chunk_it) { - if (chunk_it->second->isDirty()) { + auto end_it = cached_chunk_keys_.upper_bound(static_cast(upper_prefix)); + for (auto&& chunk_key_it = cached_chunk_keys_.lower_bound(chunk_prefix); + chunk_key_it != end_it; + ++chunk_key_it) { + if (global_file_mgr_->getBuffer(*chunk_key_it)->isDirty()) { foreign_storage::ForeignStorageBuffer temp_buf; - global_file_mgr_->fetchBuffer(chunk_it->first, &temp_buf, 0); - disk_cache_->cacheChunk(chunk_it->first, &temp_buf); + global_file_mgr_->fetchBuffer(*chunk_key_it, &temp_buf, 0); + disk_cache_->cacheChunk(*chunk_key_it, &temp_buf); } } PersistentStorageMgr::global_file_mgr_->checkpoint(db_id, tb_id); @@ -103,8 +104,9 @@ void MutableCachePersistentStorageMgr::removeTableRelatedDS(const int db_id, const ChunkKey table_key{db_id, table_id}; ChunkKey upper_prefix(table_key); upper_prefix.push_back(std::numeric_limits::max()); - auto end_it = cached_buffer_map_.upper_bound(static_cast(upper_prefix)); - for (auto&& chunk_it = cached_buffer_map_.lower_bound(table_key); chunk_it != end_it;) { - chunk_it = cached_buffer_map_.erase(chunk_it); + auto end_it = cached_chunk_keys_.upper_bound(static_cast(upper_prefix)); + for (auto&& chunk_key_it = cached_chunk_keys_.lower_bound(table_key); + chunk_key_it != end_it;) { + chunk_key_it = cached_chunk_keys_.erase(chunk_key_it); } } diff --git a/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.h b/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.h index 4829a2a62a..16d5d153e7 100644 --- a/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.h +++ b/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.h @@ -40,5 +40,5 @@ class MutableCachePersistentStorageMgr : public PersistentStorageMgr { void removeTableRelatedDS(const int db_id, const int table_id) override; private: - std::map cached_buffer_map_; + std::set cached_chunk_keys_; }; diff --git a/DataMgr/StringNoneEncoder.h b/DataMgr/StringNoneEncoder.h index 9cb4f4efa5..154630073b 100644 --- a/DataMgr/StringNoneEncoder.h +++ b/DataMgr/StringNoneEncoder.h @@ -123,6 +123,8 @@ class StringNoneEncoder : public Encoder { return true; } + void resetChunkStats() override { has_nulls = false; } + private: AbstractBuffer* index_buf; StringOffsetT last_offset; diff --git a/Fragmenter/UpdelStorage.cpp b/Fragmenter/UpdelStorage.cpp index c21ec51c8d..c9f7e95084 100644 --- a/Fragmenter/UpdelStorage.cpp +++ b/Fragmenter/UpdelStorage.cpp @@ -1099,6 +1099,9 @@ static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog, chunkMetadata[cd->columnId]->numBytes = data_buffer->size(); if (updel_roll.dirtyChunks.count(chunk.get()) == 0) { updel_roll.dirtyChunks.emplace(chunk.get(), chunk); + ChunkKey chunk_key{ + catalog->getDatabaseId(), cd->tableId, cd->columnId, fragment.fragmentId}; + updel_roll.dirtyChunkeys.emplace(chunk_key); } } @@ -1347,6 +1350,7 @@ void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalo auto daddr = data_addr; auto element_size = col_type.is_fixlen_array() ? col_type.get_size() : get_element_size(col_type); + data_buffer->getEncoder()->resetChunkStats(); for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) { if (col_type.is_fixlen_array()) { auto encoder = @@ -1355,13 +1359,13 @@ void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalo encoder->updateMetadata((int8_t*)daddr); } else if (col_type.is_fp()) { set_chunk_stats(col_type, - data_addr, + daddr, update_stats_per_thread[ci].new_values_stats.has_null, update_stats_per_thread[ci].new_values_stats.min_double, update_stats_per_thread[ci].new_values_stats.max_double); } else { set_chunk_stats(col_type, - data_addr, + daddr, update_stats_per_thread[ci].new_values_stats.has_null, update_stats_per_thread[ci].new_values_stats.min_int64t, update_stats_per_thread[ci].new_values_stats.max_int64t); @@ -1406,6 +1410,14 @@ void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalog* catalo auto chunk = chunks[ci]; auto cd = chunk->getColumnDesc(); if (!cd->columnType.is_fixlen_array()) { + // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is + // stored in seconds. Do the metadata conversion here before updating the chunk + // stats. + if (cd->columnType.is_date_in_days()) { + auto& stats = update_stats_per_thread[ci].new_values_stats; + stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t); + stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t); + } updateColumnMetadata(cd, fragment, chunk, diff --git a/QueryEngine/Execute.h b/QueryEngine/Execute.h index 66881d2491..5d5c743622 100644 --- a/QueryEngine/Execute.h +++ b/QueryEngine/Execute.h @@ -305,7 +305,13 @@ class SringConstInResultSet : public std::runtime_error { class ExtensionFunction; using RowDataProvider = Fragmenter_Namespace::RowDataProvider; -using ColumnToFragmentsMap = std::map>; +using ColumnToFragmentsMap = std::map>; +using TableToFragmentIds = std::map>; + +struct TableUpdateMetadata { + ColumnToFragmentsMap columns_for_metadata_update; + TableToFragmentIds fragments_with_deleted_rows; +}; class UpdateLogForFragment : public RowDataProvider { public: @@ -334,8 +340,7 @@ class UpdateLogForFragment : public RowDataProvider { SQLTypeInfo getColumnType(const size_t col_idx) const; - using Callback = - std::function; + using Callback = std::function; auto getResultSet() const { return rs_; } @@ -464,14 +469,14 @@ class Executor { const bool has_cardinality_estimation, ColumnCacheMap& column_cache); - void executeUpdate(const RelAlgExecutionUnit& ra_exe_unit, - const std::vector& table_infos, - const CompilationOptions& co, - const ExecutionOptions& eo, - const Catalog_Namespace::Catalog& cat, - std::shared_ptr row_set_mem_owner, - const UpdateLogForFragment::Callback& cb, - const bool is_agg); + TableUpdateMetadata executeUpdate(const RelAlgExecutionUnit& ra_exe_unit, + const std::vector& table_infos, + const CompilationOptions& co, + const ExecutionOptions& eo, + const Catalog_Namespace::Catalog& cat, + std::shared_ptr row_set_mem_owner, + const UpdateLogForFragment::Callback& cb, + const bool is_agg); private: void clearMetaInfoCache(); diff --git a/QueryEngine/ExecuteUpdate.cpp b/QueryEngine/ExecuteUpdate.cpp index 59b6bc22f8..e54712a459 100644 --- a/QueryEngine/ExecuteUpdate.cpp +++ b/QueryEngine/ExecuteUpdate.cpp @@ -62,14 +62,15 @@ SQLTypeInfo UpdateLogForFragment::getColumnType(const size_t col_idx) const { return rs_->getColType(col_idx); } -void Executor::executeUpdate(const RelAlgExecutionUnit& ra_exe_unit_in, - const std::vector& table_infos, - const CompilationOptions& co, - const ExecutionOptions& eo, - const Catalog_Namespace::Catalog& cat, - std::shared_ptr row_set_mem_owner, - const UpdateLogForFragment::Callback& cb, - const bool is_agg) { +TableUpdateMetadata Executor::executeUpdate( + const RelAlgExecutionUnit& ra_exe_unit_in, + const std::vector& table_infos, + const CompilationOptions& co, + const ExecutionOptions& eo, + const Catalog_Namespace::Catalog& cat, + std::shared_ptr row_set_mem_owner, + const UpdateLogForFragment::Callback& cb, + const bool is_agg) { CHECK(cb); VLOG(1) << "Executor " << executor_id_ << " is executing update/delete work unit:" << ra_exe_unit_in; @@ -95,7 +96,7 @@ void Executor::executeUpdate(const RelAlgExecutionUnit& ra_exe_unit_in, } if (outer_fragments.empty()) { - return; + return {}; } const auto max_tuple_count_fragment_it = std::max_element( @@ -131,7 +132,7 @@ void Executor::executeUpdate(const RelAlgExecutionUnit& ra_exe_unit_in, } CHECK(query_mem_desc); - ColumnToFragmentsMap optimize_candidates; + TableUpdateMetadata table_update_metadata; for (size_t fragment_index = 0; fragment_index < outer_fragments.size(); ++fragment_index) { const int64_t crt_fragment_tuple_count = @@ -182,12 +183,13 @@ void Executor::executeUpdate(const RelAlgExecutionUnit& ra_exe_unit_in, const auto proj_result_set = proj_fragment_result.first; CHECK(proj_result_set); cb({outer_fragments[fragment_index], fragment_index, proj_result_set}, - optimize_candidates); + table_update_metadata); } if (g_enable_auto_metadata_update) { auto td = cat.getMetadataForTable(table_id); TableOptimizer table_optimizer{td, this, cat}; - table_optimizer.recomputeMetadataUnlocked(optimize_candidates); + table_optimizer.recomputeMetadataUnlocked(table_update_metadata); } + return table_update_metadata; } diff --git a/QueryEngine/RelAlgExecutor.cpp b/QueryEngine/RelAlgExecutor.cpp index 8fa4905583..d668c7aa88 100644 --- a/QueryEngine/RelAlgExecutor.cpp +++ b/QueryEngine/RelAlgExecutor.cpp @@ -33,6 +33,7 @@ #include "QueryEngine/RelAlgTranslator.h" #include "QueryEngine/ResultSetBuilder.h" #include "QueryEngine/RexVisitor.h" +#include "QueryEngine/TableOptimizer.h" #include "QueryEngine/WindowContext.h" #include "Shared/TypedDataAccessors.h" #include "Shared/measure.h" @@ -1518,16 +1519,19 @@ void RelAlgExecutor::executeUpdate(const RelAlgNode* node, dynamic_cast(dml_transaction_parameters_.get()); CHECK(update_transaction_parameters); auto update_callback = yieldUpdateCallback(*update_transaction_parameters); - executor_->executeUpdate(ra_exe_unit, - table_infos, - co_project, - eo, - cat_, - executor_->row_set_mem_owner_, - update_callback, - is_aggregate); - post_execution_callback_ = [this]() { + auto table_update_metadata = executor_->executeUpdate(ra_exe_unit, + table_infos, + co_project, + eo, + cat_, + executor_->row_set_mem_owner_, + update_callback, + is_aggregate); + post_execution_callback_ = [table_update_metadata, this]() { dml_transaction_parameters_->finalizeTransaction(cat_); + TableOptimizer table_optimizer{ + dml_transaction_parameters_->getTableDescriptor(), executor_, cat_}; + table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata); }; }; @@ -1631,16 +1635,20 @@ void RelAlgExecutor::executeDelete(const RelAlgNode* node, CHECK_EQ(exe_unit.target_exprs.size(), size_t(1)); } - executor_->executeUpdate(exe_unit, - table_infos, - co_delete, - eo, - cat_, - executor_->row_set_mem_owner_, - delete_callback, - is_aggregate); - post_execution_callback_ = [this]() { + auto table_update_metadata = + executor_->executeUpdate(exe_unit, + table_infos, + co_delete, + eo, + cat_, + executor_->row_set_mem_owner_, + delete_callback, + is_aggregate); + post_execution_callback_ = [table_update_metadata, this]() { dml_transaction_parameters_->finalizeTransaction(cat_); + TableOptimizer table_optimizer{ + dml_transaction_parameters_->getTableDescriptor(), executor_, cat_}; + table_optimizer.vacuumFragmentsAboveMinSelectivity(table_update_metadata); }; }; diff --git a/QueryEngine/StorageIOFacility.h b/QueryEngine/StorageIOFacility.h index 2a23b12f06..dc053e25f7 100644 --- a/QueryEngine/StorageIOFacility.h +++ b/QueryEngine/StorageIOFacility.h @@ -135,7 +135,7 @@ class StorageIOFacility { Data_Namespace::MemoryLevel::DISK_LEVEL) { // If commitUpdate() did not checkpoint, then we need to checkpoint here in order // to ensure that epochs are uniformly incremented in distributed mode. - catalog.checkpoint(table_descriptor_->tableId); + catalog.checkpointWithAutoRollback(table_descriptor_->tableId); } } @@ -197,9 +197,9 @@ class StorageIOFacility { using RowProcessingFuturesVector = std::vector>; if (update_parameters.isVarlenUpdateRequired()) { - // TODO: Add support for opportunistic vacuuming - auto callback = [this, &update_parameters](UpdateLogForFragment const& update_log, - ColumnToFragmentsMap&) -> void { + auto callback = [this, &update_parameters]( + UpdateLogForFragment const& update_log, + TableUpdateMetadata& table_update_metadata) -> void { std::vector columnDescriptors; std::vector sourceMetaInfos; @@ -227,11 +227,13 @@ class StorageIOFacility { Data_Namespace::MemoryLevel::CPU_LEVEL, update_parameters.getTransactionTracker(), executor_); + table_update_metadata.fragments_with_deleted_rows[td->tableId].emplace( + update_log.getFragmentId()); }; return callback; } else if (update_parameters.tableIsTemporary()) { auto callback = [this, &update_parameters](UpdateLogForFragment const& update_log, - ColumnToFragmentsMap&) -> void { + TableUpdateMetadata&) -> void { auto rs = update_log.getResultSet(); CHECK(rs->didOutputColumnar()); CHECK(rs->isDirectColumnarConversionPossible()); @@ -306,7 +308,7 @@ class StorageIOFacility { } else { auto callback = [this, &update_parameters]( UpdateLogForFragment const& update_log, - ColumnToFragmentsMap& optimize_candidates) -> void { + TableUpdateMetadata& table_update_metadata) -> void { auto entries_per_column = update_log.getEntryCount(); auto rows_per_column = update_log.getRowCount(); if (rows_per_column == 0) { @@ -430,7 +432,8 @@ class StorageIOFacility { Data_Namespace::MemoryLevel::CPU_LEVEL, update_parameters.getTransactionTracker()); if (should_recompute_metadata(update_stats)) { - optimize_candidates[target_column].emplace(fragment_id); + table_update_metadata.columns_for_metadata_update[target_column].emplace( + fragment_id); } } }; @@ -444,7 +447,7 @@ class StorageIOFacility { if (delete_parameters.tableIsTemporary()) { auto callback = [this](UpdateLogForFragment const& update_log, - ColumnToFragmentsMap&) -> void { + TableUpdateMetadata&) -> void { auto rs = update_log.getResultSet(); CHECK(rs->didOutputColumnar()); CHECK(rs->isDirectColumnarConversionPossible()); @@ -515,9 +518,9 @@ class StorageIOFacility { }; return callback; } else { - // TODO: Add support for opportunistic vacuuming - auto callback = [this, &delete_parameters](UpdateLogForFragment const& update_log, - ColumnToFragmentsMap&) -> void { + auto callback = [this, &delete_parameters]( + UpdateLogForFragment const& update_log, + TableUpdateMetadata& table_update_metadata) -> void { auto entries_per_column = update_log.getEntryCount(); auto rows_per_column = update_log.getRowCount(); if (rows_per_column == 0) { @@ -610,6 +613,8 @@ class StorageIOFacility { update_log.getColumnType(0), Data_Namespace::MemoryLevel::CPU_LEVEL, delete_parameters.getTransactionTracker()); + table_update_metadata.fragments_with_deleted_rows[table_descriptor->tableId] + .emplace(update_log.getFragmentId()); }; return callback; } diff --git a/QueryEngine/TableOptimizer.cpp b/QueryEngine/TableOptimizer.cpp index 5c2c94f42e..c0eb98f2b1 100644 --- a/QueryEngine/TableOptimizer.cpp +++ b/QueryEngine/TableOptimizer.cpp @@ -23,6 +23,9 @@ #include "Shared/misc.h" #include "Shared/scope.h" +// By default, when rows are deleted, vacuum fragments with a least 10% deleted rows +float g_vacuum_min_selectivity{0.1}; + TableOptimizer::TableOptimizer(const TableDescriptor* td, Executor* executor, const Catalog_Namespace::Catalog& cat) @@ -115,7 +118,7 @@ inline ExecutionOptions get_execution_options() { } // namespace void TableOptimizer::recomputeMetadata() const { - INJECT_TIMER(optimizeMetadata); + auto timer = DEBUG_TIMER(__func__); mapd_unique_lock lock(executor_->execute_mutex_); LOG(INFO) << "Recomputing metadata for " << td_->tableName; @@ -138,19 +141,16 @@ void TableOptimizer::recomputeMetadata() const { for (const auto td : table_descriptors) { ScopeGuard row_set_holder = [this] { executor_->row_set_mem_owner_ = nullptr; }; - // We can use a smaller block size here, since we won't be running projection queries executor_->row_set_mem_owner_ = - std::make_shared(1000000000, /*num_threads=*/1); + std::make_shared(ROW_SET_SIZE, /*num_threads=*/1); executor_->catalog_ = &cat_; const auto table_id = td->tableId; - - std::unordered_map tuple_count_map; - recomputeDeletedColumnMetadata(td, tuple_count_map); + auto stats = recomputeDeletedColumnMetadata(td); // TODO(adb): Support geo auto col_descs = cat_.getAllColumnMetadataForTable(table_id, false, false, false); for (const auto& cd : col_descs) { - recomputeColumnMetadata(td, cd, tuple_count_map, {}, {}); + recomputeColumnMetadata(td, cd, stats.visible_row_count_per_fragment, {}, {}); } data_mgr.checkpoint(cat_.getCurrentDB().dbId, table_id); executor_->clearMetaInfoCache(); @@ -163,24 +163,24 @@ void TableOptimizer::recomputeMetadata() const { } void TableOptimizer::recomputeMetadataUnlocked( - const ColumnToFragmentsMap& optimize_candidates) const { + const TableUpdateMetadata& table_update_metadata) const { + auto timer = DEBUG_TIMER(__func__); std::map> columns_by_table_id; - for (const auto& entry : optimize_candidates) { + auto& columns_for_update = table_update_metadata.columns_for_metadata_update; + for (const auto& entry : columns_for_update) { auto column_descriptor = entry.first; columns_by_table_id[column_descriptor->tableId].emplace_back(column_descriptor); } for (const auto& [table_id, columns] : columns_by_table_id) { auto td = cat_.getMetadataForTable(table_id); - std::unordered_map tuple_count_map; - recomputeDeletedColumnMetadata(td, tuple_count_map); + auto stats = recomputeDeletedColumnMetadata(td); for (const auto cd : columns) { - CHECK(optimize_candidates.find(cd) != optimize_candidates.end()); - auto fragment_indexes = - getFragmentIndexes(td, optimize_candidates.find(cd)->second); + CHECK(columns_for_update.find(cd) != columns_for_update.end()); + auto fragment_indexes = getFragmentIndexes(td, columns_for_update.find(cd)->second); recomputeColumnMetadata(td, cd, - tuple_count_map, + stats.visible_row_count_per_fragment, Data_Namespace::MemoryLevel::CPU_LEVEL, fragment_indexes); } @@ -190,11 +190,27 @@ void TableOptimizer::recomputeMetadataUnlocked( // Special case handle $deleted column if it exists // whilst handling the delete column also capture // the number of non deleted rows per fragment -void TableOptimizer::recomputeDeletedColumnMetadata( +DeletedColumnStats TableOptimizer::recomputeDeletedColumnMetadata( const TableDescriptor* td, - std::unordered_map& tuple_count_map) const { + const std::set& fragment_indexes) const { if (!td->hasDeletedCol) { - return; + return {}; + } + + auto stats = getDeletedColumnStats(td, fragment_indexes); + auto* fragmenter = td->fragmenter.get(); + CHECK(fragmenter); + auto cd = cat_.getDeletedColumn(td); + fragmenter->updateChunkStats(cd, stats.chunk_stats_per_fragment, {}); + fragmenter->setNumRows(stats.total_row_count); + return stats; +} + +DeletedColumnStats TableOptimizer::getDeletedColumnStats( + const TableDescriptor* td, + const std::set& fragment_indexes) const { + if (!td->hasDeletedCol) { + return {}; } auto cd = cat_.getDeletedColumn(td); @@ -214,15 +230,13 @@ void TableOptimizer::recomputeDeletedColumnMetadata( const auto co = get_compilation_options(ExecutorDeviceType::CPU); const auto eo = get_execution_options(); - std::unordered_map stats_map; - - size_t total_num_tuples = 0; + DeletedColumnStats deleted_column_stats; Executor::PerFragmentCallBack compute_deleted_callback = - [&stats_map, &tuple_count_map, &total_num_tuples, cd]( + [&deleted_column_stats, cd]( ResultSetPtr results, const Fragmenter_Namespace::FragmentInfo& fragment_info) { // count number of tuples in $deleted as total number of tuples in table. if (cd->isDeletedCol) { - total_num_tuples += fragment_info.getPhysicalNumTuples(); + deleted_column_stats.total_row_count += fragment_info.getPhysicalNumTuples(); } if (fragment_info.getPhysicalNumTuples() == 0) { // TODO(adb): Should not happen, but just to be safe... @@ -276,18 +290,20 @@ void TableOptimizer::recomputeDeletedColumnMetadata( return; } - stats_map.emplace( + deleted_column_stats.chunk_stats_per_fragment.emplace( std::make_pair(fragment_info.fragmentId, chunk_metadata->chunkStats)); - tuple_count_map.emplace(std::make_pair(fragment_info.fragmentId, num_tuples)); + deleted_column_stats.visible_row_count_per_fragment.emplace( + std::make_pair(fragment_info.fragmentId, num_tuples)); }; - executor_->executeWorkUnitPerFragment( - ra_exe_unit, table_infos[0], co, eo, cat_, compute_deleted_callback, {}); - - auto* fragmenter = td->fragmenter.get(); - CHECK(fragmenter); - fragmenter->updateChunkStats(cd, stats_map, {}); - fragmenter->setNumRows(total_num_tuples); + executor_->executeWorkUnitPerFragment(ra_exe_unit, + table_infos[0], + co, + eo, + cat_, + compute_deleted_callback, + fragment_indexes); + return deleted_column_stats; } void TableOptimizer::recomputeColumnMetadata( @@ -404,21 +420,131 @@ std::set TableOptimizer::getFragmentIndexes( } void TableOptimizer::vacuumDeletedRows() const { + auto timer = DEBUG_TIMER(__func__); const auto table_id = td_->tableId; const auto db_id = cat_.getDatabaseId(); const auto table_epochs = cat_.getTableEpochs(db_id, table_id); + const auto shards = cat_.getPhysicalTablesDescriptors(td_); try { - cat_.vacuumDeletedRows(table_id); + for (const auto shard : shards) { + vacuumFragments(shard); + } cat_.checkpoint(table_id); } catch (...) { cat_.setTableEpochsLogExceptions(db_id, table_epochs); throw; } - auto shards = cat_.getPhysicalTablesDescriptors(td_); for (auto shard : shards) { cat_.removeFragmenterForTable(shard->tableId); cat_.getDataMgr().getGlobalFileMgr()->compactDataFiles(cat_.getDatabaseId(), shard->tableId); } } + +void TableOptimizer::vacuumFragments(const TableDescriptor* td, + const std::set& fragment_ids) const { + // "if not a table that supports delete return, nothing more to do" + const ColumnDescriptor* cd = cat_.getDeletedColumn(td); + if (nullptr == cd) { + return; + } + // vacuum chunks which show sign of deleted rows in metadata + ChunkKey chunk_key_prefix = {cat_.getDatabaseId(), td->tableId, cd->columnId}; + ChunkMetadataVector chunk_metadata_vec; + cat_.getDataMgr().getChunkMetadataVecForKeyPrefix(chunk_metadata_vec, chunk_key_prefix); + for (auto& [chunk_key, chunk_metadata] : chunk_metadata_vec) { + auto fragment_id = chunk_key[CHUNK_KEY_FRAGMENT_IDX]; + // If delete has occurred, only vacuum fragments that are in the fragment_ids set. + // Empty fragment_ids set implies all fragments. + if (chunk_metadata->chunkStats.max.tinyintval == 1 && + (fragment_ids.empty() || shared::contains(fragment_ids, fragment_id))) { + UpdelRoll updel_roll; + updel_roll.catalog = &cat_; + updel_roll.logicalTableId = cat_.getLogicalTableId(td->tableId); + updel_roll.memoryLevel = Data_Namespace::MemoryLevel::CPU_LEVEL; + updel_roll.table_descriptor = td; + CHECK_EQ(cd->columnId, chunk_key[CHUNK_KEY_COLUMN_IDX]); + const auto chunk = Chunk_NS::Chunk::getChunk(cd, + &cat_.getDataMgr(), + chunk_key, + updel_roll.memoryLevel, + 0, + chunk_metadata->numBytes, + chunk_metadata->numElements); + td->fragmenter->compactRows(&cat_, + td, + fragment_id, + td->fragmenter->getVacuumOffsets(chunk), + updel_roll.memoryLevel, + updel_roll); + updel_roll.stageUpdate(); + } + } +} + +void TableOptimizer::vacuumFragmentsAboveMinSelectivity( + const TableUpdateMetadata& table_update_metadata) const { + if (td_->persistenceLevel != Data_Namespace::MemoryLevel::DISK_LEVEL) { + return; + } + auto timer = DEBUG_TIMER(__func__); + const auto db_id = cat_.getDatabaseId(); + const auto table_epochs = cat_.getTableEpochs(db_id, td_->tableId); + std::set vacuumed_tables; + try { + for (const auto& [table_id, fragment_ids] : + table_update_metadata.fragments_with_deleted_rows) { + auto td = cat_.getMetadataForTable(table_id); + // Skip automatic vacuuming for tables with uncapped epoch + if (td->maxRollbackEpochs == -1) { + continue; + } + + DeletedColumnStats deleted_column_stats; + { + mapd_unique_lock executor_lock(executor_->execute_mutex_); + ScopeGuard row_set_holder = [this] { executor_->row_set_mem_owner_ = nullptr; }; + executor_->row_set_mem_owner_ = + std::make_shared(ROW_SET_SIZE, /*num_threads=*/1); + deleted_column_stats = + getDeletedColumnStats(td, getFragmentIndexes(td, fragment_ids)); + executor_->clearMetaInfoCache(); + } + + std::set filtered_fragment_ids; + for (const auto [fragment_id, visible_row_count] : + deleted_column_stats.visible_row_count_per_fragment) { + auto total_row_count = + td->fragmenter->getFragmentInfo(fragment_id)->getPhysicalNumTuples(); + float deleted_row_count = total_row_count - visible_row_count; + if ((deleted_row_count / total_row_count) >= g_vacuum_min_selectivity) { + filtered_fragment_ids.emplace(fragment_id); + } + } + + if (!filtered_fragment_ids.empty()) { + vacuumFragments(td, filtered_fragment_ids); + vacuumed_tables.emplace(td); + VLOG(1) << "Auto-vacuumed fragments: " + << shared::printContainer(filtered_fragment_ids) + << ", table id: " << td->tableId; + } + } + + // Always checkpoint in order to ensure that epochs are uniformly incremented in + // distributed mode. + cat_.checkpoint(td_->tableId); + } catch (...) { + cat_.setTableEpochsLogExceptions(db_id, table_epochs); + throw; + } + + // Reset fragmenters for vacuumed tables in order to ensure that their metadata is in + // sync + for (auto table : vacuumed_tables) { + cat_.removeFragmenterForTable(table->tableId); + cat_.getMetadataForTable(table->tableId); + CHECK(table->fragmenter); + } +} diff --git a/QueryEngine/TableOptimizer.h b/QueryEngine/TableOptimizer.h index fff1943daf..f5fbaab6da 100644 --- a/QueryEngine/TableOptimizer.h +++ b/QueryEngine/TableOptimizer.h @@ -19,6 +19,13 @@ #include "Catalog/Catalog.h" class Executor; +struct TableUpdateMetadata; + +struct DeletedColumnStats { + size_t total_row_count{0}; + std::unordered_map visible_row_count_per_fragment; + std::unordered_map chunk_stats_per_fragment; +}; /** * @brief Driver for running cleanup processes on a table. @@ -47,7 +54,7 @@ class TableOptimizer { * The caller of this method is expected to have already acquired the * executor lock. */ - void recomputeMetadataUnlocked(const ColumnToFragmentsMap& optimize_candidates) const; + void recomputeMetadataUnlocked(const TableUpdateMetadata& table_update_metadata) const; /** * @brief Compacts fragments to remove deleted rows. @@ -58,10 +65,17 @@ class TableOptimizer { */ void vacuumDeletedRows() const; + /** + * Vacuums fragments with a deleted rows percentage that exceeds the configured minimum + * vacuum selectivity threshold. + */ + void vacuumFragmentsAboveMinSelectivity( + const TableUpdateMetadata& table_update_metadata) const; + private: - void recomputeDeletedColumnMetadata( + DeletedColumnStats recomputeDeletedColumnMetadata( const TableDescriptor* td, - std::unordered_map& tuple_count_map) const; + const std::set& fragment_indexes = {}) const; void recomputeColumnMetadata(const TableDescriptor* td, const ColumnDescriptor* cd, @@ -72,7 +86,17 @@ class TableOptimizer { std::set getFragmentIndexes(const TableDescriptor* td, const std::set& fragment_ids) const; + void vacuumFragments(const TableDescriptor* td, + const std::set& fragment_ids = {}) const; + + DeletedColumnStats getDeletedColumnStats( + const TableDescriptor* td, + const std::set& fragment_indexes) const; + const TableDescriptor* td_; Executor* executor_; const Catalog_Namespace::Catalog& cat_; + + // We can use a smaller block size here, since we won't be running projection queries + static constexpr size_t ROW_SET_SIZE{1000000000}; }; diff --git a/Tests/ComputeMetadataTest.cpp b/Tests/ComputeMetadataTest.cpp index 719a76abeb..e828af1cb6 100644 --- a/Tests/ComputeMetadataTest.cpp +++ b/Tests/ComputeMetadataTest.cpp @@ -28,6 +28,8 @@ #define BASE_PATH "./tmp" #endif +extern float g_vacuum_min_selectivity; + namespace { #define ASSERT_METADATA(type, tag) \ @@ -149,6 +151,7 @@ static const std::string g_table_name{"metadata_test"}; } // namespace class MultiFragMetadataUpdate : public DBHandlerTestFixture { + protected: void SetUp() override { DBHandlerTestFixture::SetUp(); EXPECT_NO_THROW(sql("DROP TABLE IF EXISTS " + g_table_name + ";")); @@ -257,6 +260,12 @@ TEST_F(MultiFragMetadataUpdate, NoChanges) { class MetadataUpdate : public DBHandlerTestFixture, public testing::WithParamInterface { + protected: + static void SetUpTestSuite() { + g_enable_auto_metadata_update = false; + g_vacuum_min_selectivity = 1.1; + } + void SetUp() override { DBHandlerTestFixture::SetUp(); auto is_sharded = GetParam(); @@ -495,17 +504,22 @@ TEST_P(MetadataUpdate, AlterAfterEmptied) { check_fragment_metadata(12, std::numeric_limits::max(), std::numeric_limits::lowest(), - true)); + false)); // test ADD multiple columns EXPECT_NO_THROW( sql("ALTER TABLE " + g_table_name + " ADD (c88 int default 88, cnn int);")); - run_op_per_fragment(cat, td, check_fragment_metadata(13, 88, 88, false)); + run_op_per_fragment(cat, + td, + check_fragment_metadata(13, + std::numeric_limits::max(), + std::numeric_limits::lowest(), + false)); run_op_per_fragment(cat, td, check_fragment_metadata(14, std::numeric_limits::max(), std::numeric_limits::lowest(), - true)); + false)); } INSTANTIATE_TEST_SUITE_P(ShardedAndNonShardedTable, @@ -546,6 +560,8 @@ TEST_F(DeletedRowsMetadataUpdateTest, ComputeMetadataAfterDelete) { class OptimizeTableVacuumTest : public DBHandlerTestFixture { protected: + static void SetUpTestSuite() { g_vacuum_min_selectivity = 1.1; } + void SetUp() override { DBHandlerTestFixture::SetUp(); sql("drop table if exists test_table;"); @@ -667,7 +683,7 @@ TEST_F(OptimizeTableVacuumTest, UpdateAndCompactTableData) { File_Namespace::FileMgr::setNumPagesPerMetadataFile(1); File_Namespace::FileMgr::setNumPagesPerDataFile(1); - sql("create table test_table (i int);"); + sql("create table test_table (i int) with (max_rollback_epochs = 25);"); sql("insert into test_table values (10);"); // 2 chunk page writes and 2 metadata page writes. One for // the "i" column and a second for the "$deleted" column @@ -700,7 +716,8 @@ TEST_F(OptimizeTableVacuumTest, InsertAndCompactTableData) { File_Namespace::FileMgr::setNumPagesPerMetadataFile(1); File_Namespace::FileMgr::setNumPagesPerDataFile(1); - sql("create table test_table (i int) with (fragment_size = 2);"); + sql("create table test_table (i int) with (fragment_size = 2, max_rollback_epochs = " + "25);"); insertRange(1, 3, 1); // 4 chunk page writes. 2 for the "i" column and "$deleted" column each. // 6 metadata page writes for each insert (3 inserts for 2 columns). @@ -733,7 +750,8 @@ TEST_F(OptimizeTableVacuumTest, UpdateAndCompactShardedTableData) { File_Namespace::FileMgr::setNumPagesPerMetadataFile(1); File_Namespace::FileMgr::setNumPagesPerDataFile(1); - sql("create table test_table (i int, f float, shard key(i)) with (shard_count = 4);"); + sql("create table test_table (i int, f float, shard key(i)) with (shard_count = 4, " + "max_rollback_epochs = 25);"); insertRange(1, 4, 2); // 12 chunk page writes and 12 metadata page writes. Each shard with // 3 metadata/data page writes for columns "i", "f", and "$deleted". @@ -804,7 +822,7 @@ TEST_F(OptimizeTableVacuumTest, MultiplePagesPerFile) { File_Namespace::FileMgr::setNumPagesPerMetadataFile(4); File_Namespace::FileMgr::setNumPagesPerDataFile(2); - sql("create table test_table (i int);"); + sql("create table test_table (i int) with (max_rollback_epochs = 25);"); sql("insert into test_table values (10);"); // 2 chunk page writes and 2 metadata page writes. One for // the "i" column and a second for the "$deleted" column @@ -1099,6 +1117,625 @@ TEST_F(VarLenColumnUpdateTest, ChunkUpdateReusesDataPage) { sqlAndCompareResult("select * from test_table;", {{i(3), "b"}, {i(2), "e"}}); } +class OpportunisticVacuumingTest : public OptimizeTableVacuumTest { + protected: + void SetUp() override { + OptimizeTableVacuumTest::SetUp(); + sql("drop table if exists test_table;"); + g_vacuum_min_selectivity = 0.1; + } + + void TearDown() override { + sql("drop table if exists test_table;"); + DBHandlerTestFixture::TearDown(); + } + + void insertRange(int start, int end, const std::string& str_value) { + for (int value = start; value <= end; value++) { + auto number_str = std::to_string(value); + sql("insert into test_table values (" + number_str + ", '" + str_value + + number_str + "');"); + } + } + + void assertChunkContentAndMetadata(int32_t fragment_id, + const std::vector& values, + bool has_nulls = false, + const std::string& table_name = "test_table") { + auto [chunk, chunk_metadata] = getChunkAndMetadata("i", fragment_id, table_name); + auto buffer = chunk.getBuffer(); + ASSERT_EQ(values.size() * sizeof(int32_t), buffer->size()); + assertCommonChunkMetadata(buffer, values.size(), has_nulls); + + if (!values.empty()) { + auto min = *std::min_element(values.begin(), values.end()); + auto max = *std::max_element(values.begin(), values.end()); + assertMinAndMax(min, max, chunk_metadata); + + std::vector actual_values(values.size()); + buffer->read(reinterpret_cast(actual_values.data()), buffer->size()); + EXPECT_EQ(values, actual_values); + } + } + + void assertTextChunkContentAndMetadata(int32_t fragment_id, + const std::vector& values, + bool has_nulls = false, + const std::string& table_name = "test_table") { + auto [chunk, chunk_metadata] = getChunkAndMetadata("t", fragment_id, table_name); + auto data_buffer = chunk.getBuffer(); + assertCommonChunkMetadata(data_buffer, values.size(), has_nulls); + + auto index_buffer = chunk.getIndexBuf(); + if (values.empty()) { + EXPECT_EQ(static_cast(0), data_buffer->size()); + EXPECT_EQ(static_cast(0), index_buffer->size()); + } else { + ASSERT_EQ(index_buffer->size() % sizeof(int32_t), static_cast(0)); + std::vector index_values(index_buffer->size() / sizeof(int32_t)); + ASSERT_EQ(values.size() + 1, index_values.size()); + index_buffer->read(reinterpret_cast(index_values.data()), + index_buffer->size()); + + std::string data_values(data_buffer->size(), '\0'); + data_buffer->read(reinterpret_cast(data_values.data()), + data_buffer->size()); + + int32_t cumulative_length{0}; + for (size_t i = 0; i < values.size(); i++) { + EXPECT_EQ(cumulative_length, index_values[i]); + cumulative_length += values[i].size(); + EXPECT_EQ( + values[i], + data_values.substr(index_values[i], index_values[i + 1] - index_values[i])); + } + EXPECT_EQ(cumulative_length, index_values[values.size()]); + } + } + + void assertCommonChunkMetadata(AbstractBuffer* buffer, + size_t num_elements, + bool has_nulls) { + ASSERT_TRUE(buffer->hasEncoder()); + std::shared_ptr chunk_metadata = std::make_shared(); + buffer->getEncoder()->getMetadata(chunk_metadata); + EXPECT_EQ(buffer->size(), chunk_metadata->numBytes); + EXPECT_EQ(num_elements, chunk_metadata->numElements); + EXPECT_EQ(has_nulls, chunk_metadata->chunkStats.has_nulls); + } + + void assertShardChunkContentAndMetadata(int32_t shard_index, + int32_t fragment_id, + const std::vector& values) { + assertChunkContentAndMetadata( + fragment_id, values, false, getShardTableName(shard_index)); + } + + void assertShardTextChunkContent(int32_t shard_index, + int32_t fragment_id, + const std::vector& values) { + assertTextChunkContentAndMetadata( + fragment_id, values, false, getShardTableName(shard_index)); + } + + void assertFragmentRowCount(size_t row_count) { + auto td = getCatalog().getMetadataForTable("test_table"); + ASSERT_TRUE(td->fragmenter != nullptr); + ASSERT_EQ(row_count, td->fragmenter->getNumRows()); + } + + void assertShardFragmentRowCount(int32_t shard_index, size_t row_count) { + auto td = getCatalog().getMetadataForTable(getShardTableName(shard_index)); + ASSERT_TRUE(td->fragmenter != nullptr); + ASSERT_EQ(row_count, td->fragmenter->getNumRows()); + } + + void assertMinAndMax(int32_t min, + int32_t max, + std::shared_ptr chunk_metadata) { + EXPECT_EQ(min, chunk_metadata->chunkStats.min.intval); + EXPECT_EQ(max, chunk_metadata->chunkStats.max.intval); + } + + void assertMinAndMax(int64_t min, + int64_t max, + std::shared_ptr chunk_metadata) { + EXPECT_EQ(min, chunk_metadata->chunkStats.min.bigintval); + EXPECT_EQ(max, chunk_metadata->chunkStats.max.bigintval); + } + + void assertMinAndMax(float min, + float max, + std::shared_ptr chunk_metadata) { + EXPECT_EQ(min, chunk_metadata->chunkStats.min.floatval); + EXPECT_EQ(max, chunk_metadata->chunkStats.max.floatval); + } + + void assertMinAndMax(double min, + double max, + std::shared_ptr chunk_metadata) { + EXPECT_EQ(min, chunk_metadata->chunkStats.min.doubleval); + EXPECT_EQ(max, chunk_metadata->chunkStats.max.doubleval); + } + + std::pair> getChunkAndMetadata( + const std::string& column_name, + int32_t fragment_id, + const std::string& table_name = "test_table") { + auto& catalog = getCatalog(); + auto& data_mgr = catalog.getDataMgr(); + auto td = catalog.getMetadataForTable(table_name); + auto cd = catalog.getMetadataForColumn(td->tableId, column_name); + Chunk_NS::Chunk chunk; + ChunkKey chunk_key{catalog.getDatabaseId(), td->tableId, cd->columnId, fragment_id}; + if (cd->columnType.is_varlen_indeed()) { + chunk_key.emplace_back(2); + chunk.setIndexBuffer(data_mgr.getChunkBuffer(chunk_key, MemoryLevel::DISK_LEVEL)); + chunk_key.back() = 1; + } + chunk.setBuffer(data_mgr.getChunkBuffer(chunk_key, MemoryLevel::DISK_LEVEL)); + CHECK(chunk.getBuffer()->hasEncoder()); + std::shared_ptr chunk_metadata = std::make_shared(); + chunk.getBuffer()->getEncoder()->getMetadata(chunk_metadata); + return {chunk, chunk_metadata}; + } + + std::shared_ptr getStringDictionary(const std::string& column_name) { + auto& catalog = getCatalog(); + auto td = catalog.getMetadataForTable("test_table"); + auto cd = catalog.getMetadataForColumn(td->tableId, column_name); + CHECK(cd->columnType.is_dict_encoded_string()); + auto dict_metadata = catalog.getMetadataForDict(cd->columnType.get_comp_param()); + CHECK(dict_metadata); + return dict_metadata->stringDict; + } + + using DataTypesTestRow = std:: + tuple; + + void assertFragmentMetadataForDataTypesTest(const std::vector& rows, + int32_t fragment_id, + bool has_nulls) { + // Assert metadata for "i" chunk + assertCommonChunkMetadata( + rows, "i", fragment_id, has_nulls, sizeof(int16_t) * rows.size()); + assertMinMaxMetadata(rows, "i", fragment_id, NULL_SMALLINT); + + // Assert metadata for "t" chunk + assertCommonChunkMetadata( + rows, "t", fragment_id, has_nulls, sizeof(int32_t) * rows.size()); + auto string_dictionary = getStringDictionary("t"); + assertMinMaxMetadata( + rows, + "t", + fragment_id, + NULL_INT, + [string_dictionary](const std::string& str_value) { + return string_dictionary->getIdOfString(str_value); + }); + + // Assert metadata for "t_none" chunk + size_t chunk_size{0}; + for (const auto& row : rows) { + chunk_size += std::get<2>(row).size(); + } + assertCommonChunkMetadata(rows, "t_none", fragment_id, has_nulls, chunk_size); + // Skip min/max metadata check for none encoded string column, since this metadata is + // not updated + + // Assert metadata for "f" chunk + assertCommonChunkMetadata( + rows, "f", fragment_id, has_nulls, sizeof(float) * rows.size()); + assertMinMaxMetadata(rows, "f", fragment_id, NULL_FLOAT); + + // Assert metadata for "d_arr" chunk + assertCommonChunkMetadata( + rows, "d_arr", fragment_id, has_nulls, sizeof(double) * rows.size()); + assertMinMaxMetadata(rows, "d_arr", fragment_id, NULL_DOUBLE); + + // Assert metadata for "tm_arr" chunk + assertCommonChunkMetadata( + rows, "tm_arr", fragment_id, has_nulls, sizeof(int64_t) * rows.size()); + // Skip min/max metadata check for variable length array column, since this metadata + // is not updated + + // Assert metadata for "dt" chunk + assertCommonChunkMetadata( + rows, "dt", fragment_id, has_nulls, sizeof(int32_t) * rows.size()); + assertMinMaxMetadata( + rows, "dt", fragment_id, NULL_INT, [](const std::string& str_value) { + SQLTypeInfo type{kDATE, false}; + return StringToDatum(str_value, type).bigintval; + }); + } + + void assertCommonChunkMetadata(const std::vector& rows, + const std::string& column_name, + int32_t fragment_id, + bool has_nulls, + size_t expected_chunk_size) { + auto [chunk, chunk_metadata] = getChunkAndMetadata(column_name, fragment_id); + ASSERT_EQ(expected_chunk_size, chunk.getBuffer()->size()); + assertCommonChunkMetadata(chunk.getBuffer(), rows.size(), has_nulls); + } + + template + void assertMinMaxMetadata( + const std::vector& rows, + const std::string& column_name, + int32_t fragment_id, + EncodedType null_sentinel, + std::function type_converter = nullptr) { + auto chunk_metadata = getChunkAndMetadata(column_name, fragment_id).second; + auto [min, max] = getMinAndMax( + rows, null_sentinel, type_converter); + assertMinAndMax(min, max, chunk_metadata); + } + + template + std::pair getMinAndMax( + const std::vector& rows, + EncodedType null_sentinel, + std::function type_converter) { + EncodedType min{std::numeric_limits::max()}, + max{std::numeric_limits::lowest()}; + for (const auto& row : rows) { + if constexpr (convert_input) { + auto str_value = std::get(row); + if (!str_value.empty()) { + auto value = type_converter(str_value); + min = std::min(min, value); + max = std::max(max, value); + } + } else { + auto value = std::get(row); + if (value != null_sentinel) { + min = std::min(min, value); + max = std::max(max, value); + } + } + } + return {min, max}; + } + + std::string getShardTableName(int32_t shard_index) { + auto& catalog = getCatalog(); + auto td = catalog.getMetadataForTable("test_table"); + CHECK_GT(td->nShards, 0); + CHECK_LT(shard_index, td->nShards); + + auto shards = catalog.getPhysicalTablesDescriptors(td); + CHECK_EQ(static_cast(td->nShards), shards.size()); + return shards[shard_index]->tableName; + } +}; + +TEST_F(OpportunisticVacuumingTest, DeletedFragment) { + sql("create table test_table (i int) with (fragment_size = 3, " + "max_rollback_epochs = 25);"); + OptimizeTableVacuumTest::insertRange(1, 5); + + assertChunkContentAndMetadata(0, {1, 2, 3}); + assertChunkContentAndMetadata(1, {4, 5}); + + sql("delete from test_table where i <= 3;"); + + assertChunkContentAndMetadata(0, {}); + assertChunkContentAndMetadata(1, {4, 5}); + assertFragmentRowCount(2); + sqlAndCompareResult("select * from test_table;", {{i(4)}, {i(5)}}); +} + +TEST_F(OpportunisticVacuumingTest, + DeleteQueryAndPercentDeletedRowsBelowSelectivityThreshold) { + sql("create table test_table (i int) with (fragment_size = 5, " + "max_rollback_epochs = 25);"); + OptimizeTableVacuumTest::insertRange(1, 10); + + assertChunkContentAndMetadata(0, {1, 2, 3, 4, 5}); + assertChunkContentAndMetadata(1, {6, 7, 8, 9, 10}); + + g_vacuum_min_selectivity = 0.45; + sql("delete from test_table where i <= 2 or i >= 9;"); + + assertChunkContentAndMetadata(0, {1, 2, 3, 4, 5}); + assertChunkContentAndMetadata(1, {6, 7, 8, 9, 10}); + assertFragmentRowCount(10); + sqlAndCompareResult("select * from test_table;", + {{i(3)}, {i(4)}, {i(5)}, {i(6)}, {i(7)}, {i(8)}}); +} + +TEST_F(OpportunisticVacuumingTest, + DeleteQueryAndPercentDeletedRowsAboveSelectivityThreshold) { + sql("create table test_table (i int) with (fragment_size = 5, " + "max_rollback_epochs = 25);"); + OptimizeTableVacuumTest::insertRange(1, 10); + + assertChunkContentAndMetadata(0, {1, 2, 3, 4, 5}); + assertChunkContentAndMetadata(1, {6, 7, 8, 9, 10}); + + g_vacuum_min_selectivity = 0.35; + sql("delete from test_table where i <= 2 or i >= 9;"); + + assertChunkContentAndMetadata(0, {3, 4, 5}); + assertChunkContentAndMetadata(1, {6, 7, 8}); + assertFragmentRowCount(6); + sqlAndCompareResult("select * from test_table;", + {{i(3)}, {i(4)}, {i(5)}, {i(6)}, {i(7)}, {i(8)}}); +} + +TEST_F(OpportunisticVacuumingTest, + DeleteQueryAndPercentDeletedRowsAboveSelectivityThresholdAndUncappedEpoch) { + sql("create table test_table (i int) with (fragment_size = 5);"); + getCatalog().setUncappedTableEpoch("test_table"); + OptimizeTableVacuumTest::insertRange(1, 10); + + assertChunkContentAndMetadata(0, {1, 2, 3, 4, 5}); + assertChunkContentAndMetadata(1, {6, 7, 8, 9, 10}); + + g_vacuum_min_selectivity = 0.35; + sql("delete from test_table where i <= 2 or i >= 9;"); + + assertChunkContentAndMetadata(0, {1, 2, 3, 4, 5}); + assertChunkContentAndMetadata(1, {6, 7, 8, 9, 10}); + assertFragmentRowCount(10); + sqlAndCompareResult("select * from test_table;", + {{i(3)}, {i(4)}, {i(5)}, {i(6)}, {i(7)}, {i(8)}}); +} + +TEST_F(OpportunisticVacuumingTest, DeleteOnShardedTable) { + sql("create table test_table (i int, shard key(i)) with (fragment_size = 2, " + "shard_count = 2, max_rollback_epochs = 25);"); + OptimizeTableVacuumTest::insertRange(1, 6); + + assertShardChunkContentAndMetadata(0, 0, {2, 4}); + assertShardChunkContentAndMetadata(0, 1, {6}); + + assertShardChunkContentAndMetadata(1, 0, {1, 3}); + assertShardChunkContentAndMetadata(1, 1, {5}); + + sql("delete from test_table where i <= 4;"); + + assertShardChunkContentAndMetadata(0, 0, {}); + assertShardChunkContentAndMetadata(0, 1, {6}); + assertShardFragmentRowCount(0, 1); + + assertShardChunkContentAndMetadata(1, 0, {}); + assertShardChunkContentAndMetadata(1, 1, {5}); + assertShardFragmentRowCount(1, 1); + + sqlAndCompareResult("select * from test_table;", {{i(6)}, {i(5)}}); +} + +TEST_F(OpportunisticVacuumingTest, VarLenColumnUpdateAndEntireFragmentUpdated) { + sql("create table test_table (i int, t text encoding none) with (fragment_size = 3, " + "max_rollback_epochs = 25);"); + insertRange(1, 5, "abc"); + + assertChunkContentAndMetadata(0, {1, 2, 3}); + assertChunkContentAndMetadata(1, {4, 5}); + + assertTextChunkContentAndMetadata(0, {"abc1", "abc2", "abc3"}); + assertTextChunkContentAndMetadata(1, {"abc4", "abc5"}); + + sql("update test_table set t = 'test_val' where i <= 3;"); + + // When a variable length column is updated, the entire row is marked as deleted and a + // new row with updated values is appended to the end of the table. + assertChunkContentAndMetadata(0, {}); + assertChunkContentAndMetadata(1, {4, 5, 1}); + assertChunkContentAndMetadata(2, {2, 3}); + + assertTextChunkContentAndMetadata(0, {}); + assertTextChunkContentAndMetadata(1, {"abc4", "abc5", "test_val"}); + assertTextChunkContentAndMetadata(2, {"test_val", "test_val"}); + + assertFragmentRowCount(5); + sqlAndCompareResult("select * from test_table;", + {{i(4), "abc4"}, + {i(5), "abc5"}, + {i(1), "test_val"}, + {i(2), "test_val"}, + {i(3), "test_val"}}); +} + +TEST_F(OpportunisticVacuumingTest, + VarLenColumnUpdateAndPercentDeletedRowsBelowSelectivityThreshold) { + sql("create table test_table (i int, t text encoding none) with (fragment_size = 5, " + "max_rollback_epochs = 25);"); + insertRange(1, 10, "abc"); + + assertChunkContentAndMetadata(0, {1, 2, 3, 4, 5}); + assertChunkContentAndMetadata(1, {6, 7, 8, 9, 10}); + + assertTextChunkContentAndMetadata(0, {"abc1", "abc2", "abc3", "abc4", "abc5"}); + assertTextChunkContentAndMetadata(1, {"abc6", "abc7", "abc8", "abc9", "abc10"}); + + g_vacuum_min_selectivity = 0.45; + sql("update test_table set t = 'test_val' where i <= 2 or i >= 9;"); + + // When a variable length column is updated, the entire row is marked as deleted and a + // new row with updated values is appended to the end of the table. + assertChunkContentAndMetadata(0, {1, 2, 3, 4, 5}); + assertChunkContentAndMetadata(1, {6, 7, 8, 9, 10}); + assertChunkContentAndMetadata(2, {1, 2, 9, 10}); + + assertTextChunkContentAndMetadata(0, {"abc1", "abc2", "abc3", "abc4", "abc5"}); + assertTextChunkContentAndMetadata(1, {"abc6", "abc7", "abc8", "abc9", "abc10"}); + assertTextChunkContentAndMetadata(2, {"test_val", "test_val", "test_val", "test_val"}); + + assertFragmentRowCount(14); + sqlAndCompareResult("select * from test_table;", + {{i(3), "abc3"}, + {i(4), "abc4"}, + {i(5), "abc5"}, + {i(6), "abc6"}, + {i(7), "abc7"}, + {i(8), "abc8"}, + {i(1), "test_val"}, + {i(2), "test_val"}, + {i(9), "test_val"}, + {i(10), "test_val"}}); +} + +TEST_F(OpportunisticVacuumingTest, + VarLenColumnUpdateAndPercentDeletedRowsAboveSelectivityThreshold) { + sql("create table test_table (i int, t text encoding none) with (fragment_size = 5, " + "max_rollback_epochs = 25);"); + insertRange(1, 10, "abc"); + + assertChunkContentAndMetadata(0, {1, 2, 3, 4, 5}); + assertChunkContentAndMetadata(1, {6, 7, 8, 9, 10}); + + assertTextChunkContentAndMetadata(0, {"abc1", "abc2", "abc3", "abc4", "abc5"}); + assertTextChunkContentAndMetadata(1, {"abc6", "abc7", "abc8", "abc9", "abc10"}); + + g_vacuum_min_selectivity = 0.35; + sql("update test_table set t = 'test_val' where i <= 2 or i >= 9;"); + + // When a variable length column is updated, the entire row is marked as deleted and a + // new row with updated values is appended to the end of the table. + assertChunkContentAndMetadata(0, {3, 4, 5}); + assertChunkContentAndMetadata(1, {6, 7, 8}); + assertChunkContentAndMetadata(2, {1, 2, 9, 10}); + + assertTextChunkContentAndMetadata(0, {"abc3", "abc4", "abc5"}); + assertTextChunkContentAndMetadata(1, {"abc6", "abc7", "abc8"}); + assertTextChunkContentAndMetadata(2, {"test_val", "test_val", "test_val", "test_val"}); + + assertFragmentRowCount(10); + sqlAndCompareResult("select * from test_table;", + {{i(3), "abc3"}, + {i(4), "abc4"}, + {i(5), "abc5"}, + {i(6), "abc6"}, + {i(7), "abc7"}, + {i(8), "abc8"}, + {i(1), "test_val"}, + {i(2), "test_val"}, + {i(9), "test_val"}, + {i(10), "test_val"}}); +} + +TEST_F(OpportunisticVacuumingTest, VarlenColumnUpdateOnShardedTable) { + sql("create table test_table (i int, t text encoding none, shard key(i)) with " + "(fragment_size = 2, shard_count = 2, max_rollback_epochs = 25);"); + insertRange(1, 6, "abc"); + + assertShardChunkContentAndMetadata(0, 0, {2, 4}); + assertShardChunkContentAndMetadata(0, 1, {6}); + assertShardTextChunkContent(0, 0, {"abc2", "abc4"}); + assertShardTextChunkContent(0, 1, {"abc6"}); + + assertShardChunkContentAndMetadata(1, 0, {1, 3}); + assertShardChunkContentAndMetadata(1, 1, {5}); + assertShardTextChunkContent(1, 0, {"abc1", "abc3"}); + assertShardTextChunkContent(1, 1, {"abc5"}); + + sql("update test_table set t = 'test_val' where i <= 4;"); + + // When a variable length column is updated, the entire row is marked as deleted and a + // new row with updated values is appended to the end of the table. + assertShardChunkContentAndMetadata(0, 0, {}); + assertShardChunkContentAndMetadata(0, 1, {6, 2}); + assertShardChunkContentAndMetadata(0, 2, {4}); + assertShardTextChunkContent(0, 0, {}); + assertShardTextChunkContent(0, 1, {"abc6", "test_val"}); + assertShardTextChunkContent(0, 2, {"test_val"}); + assertShardFragmentRowCount(0, 3); + + assertShardChunkContentAndMetadata(1, 0, {}); + assertShardChunkContentAndMetadata(1, 1, {5, 1}); + assertShardChunkContentAndMetadata(1, 2, {3}); + assertShardTextChunkContent(1, 0, {}); + assertShardTextChunkContent(1, 1, {"abc5", "test_val"}); + assertShardTextChunkContent(1, 2, {"test_val"}); + assertShardFragmentRowCount(1, 3); + + sqlAndCompareResult("select * from test_table;", + {{i(6), "abc6"}, + {i(2), "test_val"}, + {i(4), "test_val"}, + {i(5), "abc5"}, + {i(1), "test_val"}, + {i(3), "test_val"}}); +} + +TEST_F(OpportunisticVacuumingTest, DifferentDataTypesMetadataUpdate) { + sql("create table test_table (i integer encoding fixed(16), t text, " + "t_none text encoding none, f float, d_arr double[1], tm_arr timestamp[], " + "dt date) with (fragment_size = 2, max_rollback_epochs = 25);"); + sql("insert into test_table values (1, 'test_1', 'test_1', 1.5, {10.5}, " + "{'2021-01-01 00:10:00'}, '2021-01-01');"); + sql("insert into test_table values (2, 'test_2', 'test_2', 2.5, {20.5}, " + "{'2021-02-01 00:10:00'}, '2021-02-01');"); + sql("insert into test_table values (3, 'test_3', 'test_3', 3.5, {30.5}, " + "{'2021-03-01 00:10:00'}, '2021-03-01');"); + sql("insert into test_table values (4, 'test_4', 'test_4', 4.5, {40.5}, " + "{'2021-04-01 00:10:00'}, '2021-04-01');"); + sql("insert into test_table values (5, 'test_5', 'test_5', 5.5, {50.5}, " + "{'2021-05-01 00:10:00'}, '2021-05-01');"); + + assertFragmentMetadataForDataTypesTest( + {{1, "test_1", "test_1", 1.5f, 10.5, "2021-01-01 00:10:00", "2021-01-01"}, + {2, "test_2", "test_2", 2.5f, 20.5, "2021-02-01 00:10:00", "2021-02-01"}}, + 0, + false); + assertFragmentMetadataForDataTypesTest( + {{3, "test_3", "test_3", 3.5f, 30.5, "2021-03-01 00:10:00", "2021-03-01"}, + {4, "test_4", "test_4", 4.5f, 40.5, "2021-04-01 00:10:00", "2021-04-01"}}, + 1, + false); + assertFragmentMetadataForDataTypesTest( + {{5, "test_5", "test_5", 5.5f, 50.5, "2021-05-01 00:10:00", "2021-05-01"}}, + 2, + false); + + // Increase values + sql("update test_table set i = 10, t = 'test_10', t_none = 'test_10', f = 100.5, " + "d_arr = ARRAY[1000.5], tm_arr = ARRAY['2021-10-10 00:10:00'], " + "dt = '2021-10-10' where i = 2;"); + + // Set values to null + sql("update test_table set i = null, t = null, t_none = null, f = null, " + "d_arr = ARRAY[null], tm_arr = null, dt = null where i = 3;"); + + // Decrease values + sql("update test_table set i = 0, t = 'test', t_none = 'test', f = 0.5, " + "d_arr = ARRAY[1.5], tm_arr = ARRAY['2020-01-01 00:10:00'], " + "dt = '2020-01-01' where i = 5;"); + + // When a variable length column is updated, the entire row is marked as deleted and a + // new row with updated values is appended to the end of the table. + assertFragmentMetadataForDataTypesTest( + {{1, "test_1", "test_1", 1.5f, 10.5, "2021-01-01 00:10:00", "2021-01-01"}}, + 0, + false); + assertFragmentMetadataForDataTypesTest( + {{4, "test_4", "test_4", 4.5f, 40.5, "2021-04-01 00:10:00", "2021-04-01"}}, + 1, + false); + assertFragmentMetadataForDataTypesTest( + {{10, "test_10", "test_10", 100.5f, 1000.5, "2021-10-10 00:10:00", "2021-10-10"}}, + 2, + false); + assertFragmentMetadataForDataTypesTest( + {{NULL_SMALLINT, "", "", NULL_FLOAT, NULL_DOUBLE, "", ""}, + {0, "test", "test", 0.5f, 1.5, "2020-01-01 00:10:00", "2020-01-01"}}, + 3, + true); + + // clang-format off + sqlAndCompareResult( + "select * from test_table order by i;", + {{i(0), "test", "test", 0.5f, array({1.5}), array({"2020-01-01 00:10:00"}), "2020-01-01"}, + {i(1), "test_1", "test_1", 1.5f, array({10.5}), array({"2021-01-01 00:10:00"}), "2021-01-01"}, + {i(4), "test_4", "test_4", 4.5f, array({40.5}), array({"2021-04-01 00:10:00"}), "2021-04-01"}, + {i(10), "test_10", "test_10", 100.5f, array({1000.5}), array({"2021-10-10 00:10:00"}), "2021-10-10"}, + {Null_i, Null, Null, NULL_FLOAT, array({NULL_DOUBLE}), array({}), Null}}); + // clang-format on +} + int main(int argc, char** argv) { TestHelpers::init_logger_stderr_only(argc, argv); testing::InitGoogleTest(&argc, argv); diff --git a/Tests/CreateAndDropTableDdlTest.cpp b/Tests/CreateAndDropTableDdlTest.cpp index 69dba47a43..00dd97231f 100644 --- a/Tests/CreateAndDropTableDdlTest.cpp +++ b/Tests/CreateAndDropTableDdlTest.cpp @@ -1722,13 +1722,12 @@ TEST_F(MaxRollbackEpochsTest, CreateTableDefaultValue) { ASSERT_EQ(table->maxRollbackEpochs, DEFAULT_MAX_ROLLBACK_EPOCHS); // Sanity test to ensure that default max_rollback_epoch is in effect - sql("INSERT INTO test_table VALUES (10);"); - for (int i = 1; i < DEFAULT_MAX_ROLLBACK_EPOCHS; i++) { - sql("UPDATE test_table SET col1 = col1 + 10;"); + for (int i = 0; i < DEFAULT_MAX_ROLLBACK_EPOCHS; i++) { + sql("INSERT INTO test_table VALUES (10);"); } assertEpochCeilingAndFloor(DEFAULT_MAX_ROLLBACK_EPOCHS, 0); - sql("UPDATE test_table SET col1 = col1 + 10;"); + sql("INSERT INTO test_table VALUES (10);"); assertEpochCeilingAndFloor(DEFAULT_MAX_ROLLBACK_EPOCHS + 1, 1); } diff --git a/Tests/ExecuteTest.cpp b/Tests/ExecuteTest.cpp index b20f6b88dd..2c36684fd7 100644 --- a/Tests/ExecuteTest.cpp +++ b/Tests/ExecuteTest.cpp @@ -15022,19 +15022,7 @@ TEST(Update, BasicVarlenUpdate) { } // Test RelCompound-driven update - - auto y_pre_rowid_1 = - v(run_simple_agg("select rowid from smartswitch where y[1]=1;", dt)); run_multiple_agg("update smartswitch set y=ARRAY[9,10,11,12] where y[1]=1;", dt); - auto y_post_rowid_1 = - v(run_simple_agg("select rowid from smartswitch where y[1]=9 and " - "y[2]=10 and y[3]=11 and y[4]=12 and z='Flake';", - dt)); - - // Internal insert-delete cycle should create a new rowid - // This test validates that the CTAS varlen update path was used; the rowid change is - // evidence - ASSERT_NE(y_pre_rowid_1, y_post_rowid_1); ASSERT_EQ(int64_t(2), v(run_simple_agg("select count(y) from smartswitch where y[1]=9 " diff --git a/Tests/ShardedTableEpochConsistencyTest.cpp b/Tests/ShardedTableEpochConsistencyTest.cpp index fa631ed520..c1dd3d0b75 100644 --- a/Tests/ShardedTableEpochConsistencyTest.cpp +++ b/Tests/ShardedTableEpochConsistencyTest.cpp @@ -86,7 +86,7 @@ class EpochConsistencyTest : public DBHandlerTestFixture { void setUpTestTableWithInconsistentEpochs(const std::string& db_name = {}) { sql("create table test_table(a int, b tinyint, c text encoding none, shard key(a)) " - "with (shard_count = 2);"); + "with (shard_count = 2, max_rollback_epochs = 25);"); sql("copy test_table from '" + getGoodFilePath() + "';"); assertTableEpochs({1, 1}); assertInitialImportResultSet(); @@ -430,7 +430,8 @@ TEST_P(EpochRollbackTest, Update) { // Ensure that a subsequent update query still works as expected sql("update test_table set b = b + 1 where b = 10 or b = 20;"); - assertTableEpochs({2, 3}); + // 1 checkpoint for update and 1 checkpoint for automatic vacuum + assertTableEpochs({3, 4}); // clang-format off sqlAndCompareResult("select * from test_table order by a, b;", @@ -458,7 +459,8 @@ TEST_P(EpochRollbackTest, VarlenUpdate) { // Ensure that a subsequent update query still works as expected sql("update test_table set b = 110, c = 'test_110' where b = 10;"); - assertTableEpochs({2, 3}); + // 1 checkpoint for update and 1 checkpoint for automatic vacuum + assertTableEpochs({3, 4}); // clang-format off sqlAndCompareResult("select * from test_table order by a, b;", @@ -485,7 +487,8 @@ TEST_P(EpochRollbackTest, Delete) { // Ensure that a delete query still works as expected sql("delete from test_table where b = 10 or b = 20;"); - assertTableEpochs({2, 3}); + // 1 checkpoint for update and 1 checkpoint for automatic vacuum + assertTableEpochs({3, 4}); // clang-format off sqlAndCompareResult("select * from test_table order by a, b;", @@ -559,7 +562,8 @@ TEST_F(EpochLevelingTest, Update) { assertTableEpochs({1, 1, 1, 1}); sql("update test_table set b = b + 1 where b = 1;"); - assertTableEpochs({2, 2, 2, 2}); + // 1 checkpoint for update and 1 checkpoint for automatic vacuum + assertTableEpochs({3, 3, 3, 3}); // clang-format off sqlAndCompareResult("select * from test_table order by a, b;", @@ -577,7 +581,8 @@ TEST_F(EpochLevelingTest, VarlenUpdate) { assertTableEpochs({1, 1, 1, 1}); sql("update test_table set b = 110, c = 'test_110' where b = 10;"); - assertTableEpochs({2, 2, 2, 2}); + // 1 checkpoint for update and 1 checkpoint for automatic vacuum + assertTableEpochs({3, 3, 3, 3}); // clang-format off sqlAndCompareResult("select * from test_table order by a, b;", @@ -593,7 +598,8 @@ TEST_F(EpochLevelingTest, UpdateQueryButDataNotChanged) { assertTableEpochs({1, 1, 1, 1}); sql("update test_table set b = b + 1 where b = 1000;"); - assertTableEpochs({2, 2, 2, 2}); + // 1 checkpoint for update and 1 checkpoint for automatic vacuum + assertTableEpochs({3, 3, 3, 3}); assertInitialImportResultSet(); } @@ -603,7 +609,8 @@ TEST_F(EpochLevelingTest, Delete) { assertTableEpochs({1, 1, 1, 1}); sql("delete from test_table where b = 10 or b = 20;"); - assertTableEpochs({2, 2, 2, 2}); + // 1 checkpoint for delete and 1 checkpoint for automatic vacuum + assertTableEpochs({3, 3, 3, 3}); // clang-format off sqlAndCompareResult("select * from test_table order by a, b;", @@ -612,6 +619,16 @@ TEST_F(EpochLevelingTest, Delete) { // clang-format on } +TEST_F(EpochLevelingTest, DeleteQueryButNoDataDeleted) { + sql("copy test_table from '" + getGoodFilePath() + "';"); + assertTableEpochs({1, 1, 1, 1}); + + sql("delete from test_table where b = 1000;"); + // 1 checkpoint for update and 1 checkpoint for automatic vacuum + assertTableEpochs({3, 3, 3, 3}); + assertInitialImportResultSet(); +} + TEST_F(EpochLevelingTest, InsertTableAsSelect) { // Import data to be selected by the following query sql("copy test_table from '" + getGoodFilePath() + "';"); @@ -640,11 +657,12 @@ TEST_F(EpochLevelingTest, Optimize) { assertTableEpochs({1, 1, 1, 1}); sql("delete from test_table where mod(b, 2) = 0;"); - assertTableEpochs({2, 2, 2, 2}); + // 1 checkpoint for update and 1 checkpoint for automatic vacuum + assertTableEpochs({3, 3, 3, 3}); sql("optimize table test_table with (vacuum = 'true');"); // Vacuum does a checkpoint and metadata re-computation does another checkpoint - assertTableEpochs({4, 4, 4, 4}); + assertTableEpochs({5, 5, 5, 5}); // clang-format off // Assert subsequent query returns expected result diff --git a/Tests/ShowCommandsDdlTest.cpp b/Tests/ShowCommandsDdlTest.cpp index 192bd1fe04..794e2ffcc6 100644 --- a/Tests/ShowCommandsDdlTest.cpp +++ b/Tests/ShowCommandsDdlTest.cpp @@ -1648,10 +1648,10 @@ TEST_F(ShowTableDetailsTest, MaxRollbackEpochsUpdates) { for (int i = 0; i < 2; i++) { sql("update test_table_1 set c1 = c1 + 1 where c1 >= 10;"); } - assertMaxRollbackUpdateResult(15, 8, 5, 4, 0); + assertMaxRollbackUpdateResult(15, 8, 5, 6, 0); sql("alter table test_table_1 set max_rollback_epochs = 1;"); - assertMaxRollbackUpdateResult(1, 4, 4, 5, 3); + assertMaxRollbackUpdateResult(1, 3, 3, 7, 5); } TEST_F(ShowTableDetailsTest, CommandWithTableNames) { diff --git a/Tests/UpdateMetadataTest.cpp b/Tests/UpdateMetadataTest.cpp index 1554d8cf91..f20ea95129 100644 --- a/Tests/UpdateMetadataTest.cpp +++ b/Tests/UpdateMetadataTest.cpp @@ -1834,6 +1834,8 @@ int main(int argc, char** argv) { // that metadata is not automatically updated for other // tests that do and assert metadata updates. g_enable_auto_metadata_update = false; + g_vacuum_min_selectivity = 1.1; + QR::init(BASE_PATH); int err{0}; diff --git a/Tests/UpdelStorageTest.cpp b/Tests/UpdelStorageTest.cpp index 856557b2b9..dda18b04fb 100644 --- a/Tests/UpdelStorageTest.cpp +++ b/Tests/UpdelStorageTest.cpp @@ -1065,6 +1065,8 @@ TEST_F(UpdateStorageTest, Half_boolean_deleted_rollback) { class VarLenColumnUpdateTest : public ::testing::Test { protected: + static void SetUpTestSuite() { g_vacuum_min_selectivity = 1.1; } + void SetUp() override { run_ddl_statement("drop table if exists test_table;"); run_ddl_statement( diff --git a/ThriftHandler/CommandLineOptions.cpp b/ThriftHandler/CommandLineOptions.cpp index 87d25301e8..7fc4dae22c 100644 --- a/ThriftHandler/CommandLineOptions.cpp +++ b/ThriftHandler/CommandLineOptions.cpp @@ -721,6 +721,15 @@ void CommandLineOptions::fillAdvancedOptions() { po::value(&g_parallel_top_max)->default_value(g_parallel_top_max), "For ResultSets requiring a heap sort, the maximum number of rows allowed by " "watchdog."); + developer_desc.add_options()("vacuum-min-selectivity", + po::value(&g_vacuum_min_selectivity) + ->default_value(g_vacuum_min_selectivity), + "Minimum selectivity for automatic vacuuming. " + "This specifies the percentage (with a value of 0 " + "implying 0% and a value of 1 implying 100%) of " + "deleted rows in a fragment at which to perform " + "automatic vacuuming. A number greater than 1 can " + "be used to disable automatic vacuuming."); } namespace { @@ -901,6 +910,11 @@ void CommandLineOptions::validate() { addOptionalFileToBlacklist(system_parameters.ssl_key_file); addOptionalFileToBlacklist(system_parameters.ssl_trust_ca_file); addOptionalFileToBlacklist(cluster_file); + + if (g_vacuum_min_selectivity < 0) { + throw std::runtime_error{"vacuum-min-selectivity cannot be less than 0."}; + } + LOG(INFO) << "Vacuum Min Selectivity: " << g_vacuum_min_selectivity; } boost::optional CommandLineOptions::parse_command_line( diff --git a/ThriftHandler/CommandLineOptions.h b/ThriftHandler/CommandLineOptions.h index 6f529f0e10..718cd55a88 100644 --- a/ThriftHandler/CommandLineOptions.h +++ b/ThriftHandler/CommandLineOptions.h @@ -197,4 +197,5 @@ extern bool g_use_tbb_pool; extern bool g_enable_filter_function; extern size_t g_max_import_threads; extern bool g_enable_auto_metadata_update; +extern float g_vacuum_min_selectivity; extern bool g_read_only;