Implement Opportunistic Vacuuming
* Conditionally vacuum fragments after deletes or updates of variable-length
columns, based on a configured minimum deleted-rows selectivity threshold
(see the sketch after this commit message).

* Skip automatic vacuuming for tables without an epoch cap.

* Fix bugs related to chunk metadata update when vacuuming.

* Remove FileBuffer pointer caching in MutableCachePersistentStorageMgr.

* Fix use of uninitialized FileInfo::isDirty variable.

* Update tests to reflect the additional checkpoint performed by automatic
vacuuming.
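The core decision in opportunistic vacuuming is a per-fragment selectivity
test: compact a fragment only when deleted rows make up a large enough
fraction of it. Below is a minimal sketch of that test; the names
(FragmentInfo, kMinVacuumSelectivity, should_vacuum_fragment) are
illustrative stand-ins, not this commit's actual API.

#include <cstddef>

// Hypothetical per-fragment bookkeeping.
struct FragmentInfo {
  size_t num_rows;          // physical rows currently in the fragment
  size_t num_deleted_rows;  // rows flagged in the fragment's $deleted column
};

// Hypothetical configured threshold: vacuum only when at least this
// fraction of a fragment's rows has been deleted.
constexpr double kMinVacuumSelectivity{0.1};

bool should_vacuum_fragment(const FragmentInfo& fragment) {
  if (fragment.num_rows == 0) {
    return false;  // nothing to compact
  }
  const double deleted_fraction =
      static_cast<double>(fragment.num_deleted_rows) / fragment.num_rows;
  return deleted_fraction >= kMinVacuumSelectivity;
}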
Paul Aiyedun authored and andrewseidl committed Apr 6, 2021
1 parent cdb0dff commit 65743aa
Showing 28 changed files with 1,027 additions and 213 deletions.
48 changes: 1 addition & 47 deletions Catalog/Catalog.cpp
@@ -3954,7 +3954,7 @@ void Catalog::checkpoint(const int logicalTableId) const {
   }
 }
 
-void Catalog::checkpointWithAutoRollback(const int logical_table_id) {
+void Catalog::checkpointWithAutoRollback(const int logical_table_id) const {
   auto table_epochs = getTableEpochs(getDatabaseId(), logical_table_id);
   try {
     checkpoint(logical_table_id);
@@ -4012,52 +4012,6 @@ std::string Catalog::generatePhysicalTableName(const std::string& logicalTableNa
   return (physicalTableName);
 }
 
-void Catalog::vacuumDeletedRows(const int logicalTableId) const {
-  // shard here to serve request from TableOptimizer and elsewhere
-  const auto td = getMetadataForTable(logicalTableId);
-  const auto shards = getPhysicalTablesDescriptors(td);
-  for (const auto shard : shards) {
-    vacuumDeletedRows(shard);
-  }
-}
-
-void Catalog::vacuumDeletedRows(const TableDescriptor* td) const {
-  // "if not a table that supports delete return nullptr, nothing more to do"
-  const ColumnDescriptor* cd = getDeletedColumn(td);
-  if (nullptr == cd) {
-    return;
-  }
-  // vacuum chunks which show sign of deleted rows in metadata
-  ChunkKey chunkKeyPrefix = {currentDB_.dbId, td->tableId, cd->columnId};
-  ChunkMetadataVector chunkMetadataVec;
-  dataMgr_->getChunkMetadataVecForKeyPrefix(chunkMetadataVec, chunkKeyPrefix);
-  for (auto cm : chunkMetadataVec) {
-    // "delete has occured"
-    if (cm.second->chunkStats.max.tinyintval == 1) {
-      UpdelRoll updel_roll;
-      updel_roll.catalog = this;
-      updel_roll.logicalTableId = getLogicalTableId(td->tableId);
-      updel_roll.memoryLevel = Data_Namespace::MemoryLevel::CPU_LEVEL;
-      updel_roll.table_descriptor = td;
-      const auto cd = getMetadataForColumn(td->tableId, cm.first[2]);
-      const auto chunk = Chunk_NS::Chunk::getChunk(cd,
-                                                   &getDataMgr(),
-                                                   cm.first,
-                                                   updel_roll.memoryLevel,
-                                                   0,
-                                                   cm.second->numBytes,
-                                                   cm.second->numElements);
-      td->fragmenter->compactRows(this,
-                                  td,
-                                  cm.first[3],
-                                  td->fragmenter->getVacuumOffsets(chunk),
-                                  updel_roll.memoryLevel,
-                                  updel_roll);
-      updel_roll.stageUpdate();
-    }
-  }
-}
-
 void Catalog::buildForeignServerMap() {
   sqliteConnector_.query(
       "SELECT id, name, data_wrapper_type, options, owner_user_id, creation_time FROM "
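For reference, the removed Catalog::vacuumDeletedRows found candidate
fragments through chunk metadata on the boolean $deleted column: the column
holds tinyint 0/1 values, so a chunk-stats max of 1 means the fragment
contains at least one deleted row. A minimal sketch of that check, with a
simplified stand-in for the real chunk-stats type:

#include <cstdint>

// Stand-in for the stats kept in a chunk's metadata.
struct ChunkStats {
  int8_t max_tinyintval;  // max value seen in the $deleted chunk: 0 or 1
};

// A max of 1 means at least one row in the fragment is marked deleted,
// making the fragment a candidate for compaction.
bool fragment_has_deleted_rows(const ChunkStats& stats) {
  return stats.max_tinyintval == 1;
}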
4 changes: 1 addition & 3 deletions Catalog/Catalog.h
@@ -288,12 +288,10 @@ class Catalog final {
   void setDeletedColumnUnlocked(const TableDescriptor* td, const ColumnDescriptor* cd);
   int getLogicalTableId(const int physicalTableId) const;
   void checkpoint(const int logicalTableId) const;
-  void checkpointWithAutoRollback(const int logical_table_id);
+  void checkpointWithAutoRollback(const int logical_table_id) const;
   std::string name() const { return getCurrentDB().dbName; }
   void eraseDBData();
   void eraseTablePhysicalData(const TableDescriptor* td);
-  void vacuumDeletedRows(const TableDescriptor* td) const;
-  void vacuumDeletedRows(const int logicalTableId) const;
   void setForReload(const int32_t tableId);
 
   std::vector<std::string> getTableDataDirectories(const TableDescriptor* td) const;
5 changes: 5 additions & 0 deletions DataMgr/ArrayNoneEncoder.h
@@ -272,6 +272,11 @@ class ArrayNoneEncoder : public Encoder {
     return true;
   }
 
+  void resetChunkStats() override {
+    has_nulls = false;
+    initialized = false;
+  }
+
   Datum elem_min;
   Datum elem_max;
   bool has_nulls;
12 changes: 6 additions & 6 deletions DataMgr/DateDaysEncoder.h
@@ -171,17 +171,17 @@ class DateDaysEncoder : public Encoder {
     return true;
   }
 
-  T dataMin;
-  T dataMax;
-  bool has_nulls;
-
- private:
-  void resetChunkStats() {
+  void resetChunkStats() override {
     dataMin = std::numeric_limits<T>::max();
    dataMax = std::numeric_limits<T>::lowest();
     has_nulls = false;
   }
 
+  T dataMin;
+  T dataMax;
+  bool has_nulls;
+
+ private:
   V encodeDataAndUpdateStats(const T& unencoded_data) {
     V encoded_data;
     if (unencoded_data == std::numeric_limits<V>::min()) {
5 changes: 5 additions & 0 deletions DataMgr/Encoder.h
@@ -225,6 +225,11 @@ class Encoder {
     return false;
   }
 
+  /**
+   * Resets chunk metadata stats to their default values.
+   */
+  virtual void resetChunkStats() = 0;
+
   size_t getNumElems() const { return num_elems_; }
   void setNumElems(const size_t num_elems) { num_elems_ = num_elems; }
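The new pure virtual pairs with the compactRows() change in
Fragmenter/UpdelStorage.cpp below: before the surviving rows are re-scanned,
the encoder's stats are reset so the rebuilt min/max/has-nulls reflect only
the rows that remain. A self-contained sketch of that reset-then-rebuild
pattern (MiniEncoder is a simplified stand-in, not the real Encoder
interface):

#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

// Stand-in for an integer-typed encoder's chunk stats.
struct MiniEncoder {
  int64_t data_min{std::numeric_limits<int64_t>::max()};
  int64_t data_max{std::numeric_limits<int64_t>::lowest()};
  bool has_nulls{false};

  // Mirrors the new Encoder::resetChunkStats() contract: back to defaults.
  void resetChunkStats() {
    data_min = std::numeric_limits<int64_t>::max();
    data_max = std::numeric_limits<int64_t>::lowest();
    has_nulls = false;
  }

  void updateStats(const int64_t value) {
    data_min = std::min(data_min, value);
    data_max = std::max(data_max, value);
  }
};

// Compaction pattern: discard stats that may still reflect deleted rows,
// then rebuild them from the rows that survive the vacuum.
void recompute_chunk_stats(MiniEncoder& encoder,
                           const std::vector<int64_t>& kept_rows) {
  encoder.resetChunkStats();
  for (const auto value : kept_rows) {
    encoder.updateStats(value);
  }
}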
2 changes: 1 addition & 1 deletion DataMgr/FileMgr/FileInfo.h
@@ -58,7 +58,7 @@ struct FileInfo {
   FILE* f;          /// file stream object for the represented file
   size_t pageSize;  /// the fixed size of each page in the file
   size_t numPages;  /// the number of pages in the file
-  bool isDirty;     // True if writes have occured since last sync
+  bool isDirty{false};  // True if writes have occured since last sync
   std::set<size_t> freePages;  /// set of page numbers of free pages
   std::mutex freePagesMutex_;
   std::mutex readWriteMutex_;
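The {false} initializer closes a real hole: a plain bool member of a
default-initialized struct holds an indeterminate value, and branching on it
(as sync logic does with isDirty) is undefined behavior. A short
illustration:

void example() {
  struct Flags {
    bool dirty;              // indeterminate after "Flags f;"
    bool dirty_safe{false};  // in-class initializer: always starts false
  };
  Flags f;                   // default-initialized
  if (f.dirty_safe) {        // fine: reliably false
  }
  // if (f.dirty) { ... }    // undefined behavior: reads an indeterminate value
}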
5 changes: 5 additions & 0 deletions DataMgr/FixedLengthArrayNoneEncoder.h
@@ -229,6 +229,11 @@ class FixedLengthArrayNoneEncoder : public Encoder {
     return true;
   }
 
+  void resetChunkStats() override {
+    has_nulls = false;
+    initialized = false;
+  }
+
   Datum elem_min;
   Datum elem_max;
   bool has_nulls;
12 changes: 6 additions & 6 deletions DataMgr/FixedLengthEncoder.h
@@ -206,17 +206,17 @@ class FixedLengthEncoder : public Encoder {
     return true;
   }
 
-  T dataMin;
-  T dataMax;
-  bool has_nulls;
-
- private:
-  void resetChunkStats() {
+  void resetChunkStats() override {
     dataMin = std::numeric_limits<T>::max();
     dataMax = std::numeric_limits<T>::lowest();
     has_nulls = false;
   }
 
+  T dataMin;
+  T dataMax;
+  bool has_nulls;
+
+ private:
   V encodeDataAndUpdateStats(const T& unencoded_data) {
     V encoded_data = static_cast<V>(unencoded_data);
     if (unencoded_data != encoded_data) {
12 changes: 6 additions & 6 deletions DataMgr/NoneEncoder.h
@@ -209,17 +209,17 @@ class NoneEncoder : public Encoder {
     has_nulls = castedEncoder->has_nulls;
   }
 
-  T dataMin;
-  T dataMax;
-  bool has_nulls;
-
- private:
-  void resetChunkStats() {
+  void resetChunkStats() override {
     dataMin = std::numeric_limits<T>::max();
     dataMax = std::numeric_limits<T>::lowest();
     has_nulls = false;
   }
 
+  T dataMin;
+  T dataMax;
+  bool has_nulls;
+
+ private:
   T validateDataAndUpdateStats(const T& unencoded_data) {
     if (unencoded_data == none_encoded_null_value<T>()) {
       has_nulls = true;
36 changes: 19 additions & 17 deletions DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.cpp
@@ -31,7 +31,7 @@ AbstractBuffer* MutableCachePersistentStorageMgr::createBuffer(
     const size_t initial_size) {
   auto buf = PersistentStorageMgr::createBuffer(chunk_key, page_size, initial_size);
   if (isChunkPrefixCacheable(chunk_key)) {
-    cached_buffer_map_.emplace(chunk_key, buf);
+    cached_chunk_keys_.emplace(chunk_key);
   }
   return buf;
 }
@@ -42,7 +42,7 @@ void MutableCachePersistentStorageMgr::deleteBuffer(const ChunkKey& chunk_key,
   // not be deleting buffers for them.
   CHECK(!isForeignStorage(chunk_key));
   disk_cache_->deleteBufferIfExists(chunk_key);
-  cached_buffer_map_.erase(chunk_key);
+  cached_chunk_keys_.erase(chunk_key);
   PersistentStorageMgr::deleteBuffer(chunk_key, purge);
 }
 
@@ -54,10 +54,10 @@ void MutableCachePersistentStorageMgr::deleteBuffersWithPrefix(
 
   ChunkKey upper_prefix(chunk_key_prefix);
   upper_prefix.push_back(std::numeric_limits<int>::max());
-  auto end_it = cached_buffer_map_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
-  for (auto&& chunk_it = cached_buffer_map_.lower_bound(chunk_key_prefix);
-       chunk_it != end_it;) {
-    chunk_it = cached_buffer_map_.erase(chunk_it);
+  auto end_it = cached_chunk_keys_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
+  for (auto&& chunk_key_it = cached_chunk_keys_.lower_bound(chunk_key_prefix);
+       chunk_key_it != end_it;) {
+    chunk_key_it = cached_chunk_keys_.erase(chunk_key_it);
   }
   PersistentStorageMgr::deleteBuffersWithPrefix(chunk_key_prefix, purge);
 }
@@ -71,8 +71,8 @@ AbstractBuffer* MutableCachePersistentStorageMgr::putBuffer(const ChunkKey& chun
 }
 
 void MutableCachePersistentStorageMgr::checkpoint() {
-  for (auto& [key, buf] : cached_buffer_map_) {
-    if (buf->isDirty()) {
+  for (auto& key : cached_chunk_keys_) {
+    if (global_file_mgr_->getBuffer(key)->isDirty()) {
       foreign_storage::ForeignStorageBuffer temp_buf;
       global_file_mgr_->fetchBuffer(key, &temp_buf, 0);
       disk_cache_->cacheChunk(key, &temp_buf);
@@ -85,13 +85,14 @@ void MutableCachePersistentStorageMgr::checkpoint(const int db_id, const int tb_
   ChunkKey chunk_prefix{db_id, tb_id};
   ChunkKey upper_prefix(chunk_prefix);
   upper_prefix.push_back(std::numeric_limits<int>::max());
-  auto end_it = cached_buffer_map_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
-  for (auto&& chunk_it = cached_buffer_map_.lower_bound(chunk_prefix); chunk_it != end_it;
-       ++chunk_it) {
-    if (chunk_it->second->isDirty()) {
+  auto end_it = cached_chunk_keys_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
+  for (auto&& chunk_key_it = cached_chunk_keys_.lower_bound(chunk_prefix);
+       chunk_key_it != end_it;
+       ++chunk_key_it) {
+    if (global_file_mgr_->getBuffer(*chunk_key_it)->isDirty()) {
       foreign_storage::ForeignStorageBuffer temp_buf;
-      global_file_mgr_->fetchBuffer(chunk_it->first, &temp_buf, 0);
-      disk_cache_->cacheChunk(chunk_it->first, &temp_buf);
+      global_file_mgr_->fetchBuffer(*chunk_key_it, &temp_buf, 0);
+      disk_cache_->cacheChunk(*chunk_key_it, &temp_buf);
     }
   }
   PersistentStorageMgr::global_file_mgr_->checkpoint(db_id, tb_id);
@@ -103,8 +104,9 @@ void MutableCachePersistentStorageMgr::removeTableRelatedDS(const int db_id,
   const ChunkKey table_key{db_id, table_id};
   ChunkKey upper_prefix(table_key);
   upper_prefix.push_back(std::numeric_limits<int>::max());
-  auto end_it = cached_buffer_map_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
-  for (auto&& chunk_it = cached_buffer_map_.lower_bound(table_key); chunk_it != end_it;) {
-    chunk_it = cached_buffer_map_.erase(chunk_it);
+  auto end_it = cached_chunk_keys_.upper_bound(static_cast<const ChunkKey>(upper_prefix));
+  for (auto&& chunk_key_it = cached_chunk_keys_.lower_bound(table_key);
+       chunk_key_it != end_it;) {
+    chunk_key_it = cached_chunk_keys_.erase(chunk_key_it);
   }
 }
2 changes: 1 addition & 1 deletion DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.h
@@ -40,5 +40,5 @@ class MutableCachePersistentStorageMgr : public PersistentStorageMgr {
   void removeTableRelatedDS(const int db_id, const int table_id) override;
 
  private:
-  std::map<const ChunkKey, AbstractBuffer*> cached_buffer_map_;
+  std::set<ChunkKey> cached_chunk_keys_;
 };
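Dropping the buffer-pointer map for a key set is the safer design: a raw
AbstractBuffer* cached at createBuffer() time could dangle once the buffer
was deleted and re-created, while a ChunkKey cannot go stale and the current
buffer is re-resolved through the global file manager, at the cost of one
lookup per checkpointed chunk. A simplified sketch of the pattern
(GlobalFileMgr and AbstractBuffer here are stand-ins for the real classes):

#include <map>
#include <set>
#include <vector>

using ChunkKey = std::vector<int>;

// Stand-in buffer exposing just the dirty flag the checkpoint path needs.
struct AbstractBuffer {
  bool dirty{false};
  bool isDirty() const { return dirty; }
};

// Stand-in for GlobalFileMgr: owns the buffers and always hands out the
// currently valid one for a key.
struct GlobalFileMgr {
  std::map<ChunkKey, AbstractBuffer> buffers;
  AbstractBuffer* getBuffer(const ChunkKey& key) { return &buffers[key]; }
};

void checkpoint_dirty_chunks(GlobalFileMgr& gfm,
                             const std::set<ChunkKey>& cached_chunk_keys) {
  for (const auto& key : cached_chunk_keys) {
    if (gfm.getBuffer(key)->isDirty()) {
      // write-through to the disk cache, as in the real checkpoint()
    }
  }
}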
2 changes: 2 additions & 0 deletions DataMgr/StringNoneEncoder.h
@@ -123,6 +123,8 @@ class StringNoneEncoder : public Encoder {
     return true;
   }
 
+  void resetChunkStats() override { has_nulls = false; }
+
 private:
   AbstractBuffer* index_buf;
   StringOffsetT last_offset;
16 changes: 14 additions & 2 deletions Fragmenter/UpdelStorage.cpp
@@ -1099,6 +1099,9 @@ static void set_chunk_metadata(const Catalog_Namespace::Catalog* catalog,
   chunkMetadata[cd->columnId]->numBytes = data_buffer->size();
   if (updel_roll.dirtyChunks.count(chunk.get()) == 0) {
     updel_roll.dirtyChunks.emplace(chunk.get(), chunk);
+    ChunkKey chunk_key{
+        catalog->getDatabaseId(), cd->tableId, cd->columnId, fragment.fragmentId};
+    updel_roll.dirtyChunkeys.emplace(chunk_key);
   }
 }
 
@@ -1347,6 +1350,7 @@ void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalo
         auto daddr = data_addr;
         auto element_size = col_type.is_fixlen_array() ? col_type.get_size()
                                                        : get_element_size(col_type);
+        data_buffer->getEncoder()->resetChunkStats();
         for (size_t irow = 0; irow < nrows_to_keep; ++irow, daddr += element_size) {
           if (col_type.is_fixlen_array()) {
             auto encoder =
@@ -1355,13 +1359,13 @@ void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalo
             encoder->updateMetadata((int8_t*)daddr);
           } else if (col_type.is_fp()) {
             set_chunk_stats(col_type,
-                            data_addr,
+                            daddr,
                             update_stats_per_thread[ci].new_values_stats.has_null,
                             update_stats_per_thread[ci].new_values_stats.min_double,
                             update_stats_per_thread[ci].new_values_stats.max_double);
           } else {
             set_chunk_stats(col_type,
-                            data_addr,
+                            daddr,
                             update_stats_per_thread[ci].new_values_stats.has_null,
                             update_stats_per_thread[ci].new_values_stats.min_int64t,
                             update_stats_per_thread[ci].new_values_stats.max_int64t);
@@ -1406,6 +1410,14 @@ void InsertOrderFragmenter::compactRows(const Catalog_Namespace::Catalo
     auto chunk = chunks[ci];
     auto cd = chunk->getColumnDesc();
     if (!cd->columnType.is_fixlen_array()) {
+      // For DATE_IN_DAYS encoded columns, data is stored in days but the metadata is
+      // stored in seconds. Do the metadata conversion here before updating the chunk
+      // stats.
+      if (cd->columnType.is_date_in_days()) {
+        auto& stats = update_stats_per_thread[ci].new_values_stats;
+        stats.min_int64t = DateConverters::get_epoch_seconds_from_days(stats.min_int64t);
+        stats.max_int64t = DateConverters::get_epoch_seconds_from_days(stats.max_int64t);
+      }
       updateColumnMetadata(cd,
                            fragment,
                            chunk,
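The DATE_IN_DAYS fix-up is needed because chunk data for such columns stores
days since epoch while chunk metadata stores epoch seconds, so the
recomputed min/max must be scaled before updateColumnMetadata() records
them. A sketch of the conversion; the real code calls
DateConverters::get_epoch_seconds_from_days, and kSecondsPerDay is an
assumed name for the scale factor:

#include <cstdint>

constexpr int64_t kSecondsPerDay{24 * 60 * 60};  // 86400

// Scale a days-since-epoch value into epoch seconds, as the DATE_IN_DAYS
// metadata path must do.
constexpr int64_t epoch_seconds_from_days(const int64_t days) {
  return days * kSecondsPerDay;
}

// Example: 2021-04-06 is 18723 days after 1970-01-01.
static_assert(epoch_seconds_from_days(18723) == 1617667200);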
27 changes: 16 additions & 11 deletions QueryEngine/Execute.h
@@ -305,7 +305,13 @@ class SringConstInResultSet : public std::runtime_error {
 class ExtensionFunction;
 
 using RowDataProvider = Fragmenter_Namespace::RowDataProvider;
-using ColumnToFragmentsMap = std::map<const ColumnDescriptor*, std::set<int>>;
+using ColumnToFragmentsMap = std::map<const ColumnDescriptor*, std::set<int32_t>>;
+using TableToFragmentIds = std::map<int32_t, std::set<int32_t>>;
+
+struct TableUpdateMetadata {
+  ColumnToFragmentsMap columns_for_metadata_update;
+  TableToFragmentIds fragments_with_deleted_rows;
+};
 
 class UpdateLogForFragment : public RowDataProvider {
  public:
@@ -334,8 +340,7 @@
 
   SQLTypeInfo getColumnType(const size_t col_idx) const;
 
-  using Callback =
-      std::function<void(const UpdateLogForFragment&, ColumnToFragmentsMap&)>;
+  using Callback = std::function<void(const UpdateLogForFragment&, TableUpdateMetadata&)>;
 
   auto getResultSet() const { return rs_; }
 
@@ -464,14 +469,14 @@ class Executor {
                                       const bool has_cardinality_estimation,
                                       ColumnCacheMap& column_cache);
 
-  void executeUpdate(const RelAlgExecutionUnit& ra_exe_unit,
-                     const std::vector<InputTableInfo>& table_infos,
-                     const CompilationOptions& co,
-                     const ExecutionOptions& eo,
-                     const Catalog_Namespace::Catalog& cat,
-                     std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
-                     const UpdateLogForFragment::Callback& cb,
-                     const bool is_agg);
+  TableUpdateMetadata executeUpdate(const RelAlgExecutionUnit& ra_exe_unit,
+                                    const std::vector<InputTableInfo>& table_infos,
+                                    const CompilationOptions& co,
+                                    const ExecutionOptions& eo,
+                                    const Catalog_Namespace::Catalog& cat,
+                                    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
+                                    const UpdateLogForFragment::Callback& cb,
+                                    const bool is_agg);
 
  private:
  void clearMetaInfoCache();
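Returning TableUpdateMetadata from executeUpdate() hands callers the
per-table fragment IDs that saw deletes, which is what makes vacuuming
opportunistic rather than a full-table pass. A hedged sketch of how a caller
might consume it; vacuum_fragment_if_needed and its body are hypothetical,
not this commit's API:

#include <cstdint>
#include <iostream>
#include <map>
#include <set>

using TableToFragmentIds = std::map<int32_t, std::set<int32_t>>;

struct TableUpdateMetadata {
  TableToFragmentIds fragments_with_deleted_rows;
};

// Hypothetical hook: the real path would run the selectivity check and,
// when it passes, call into the fragmenter's row compaction.
void vacuum_fragment_if_needed(const int32_t table_id, const int32_t fragment_id) {
  std::cout << "considering table " << table_id << ", fragment " << fragment_id
            << " for vacuuming\n";
}

// Only fragments that actually saw deletes are vacuum candidates.
void post_update_vacuum(const TableUpdateMetadata& metadata) {
  for (const auto& [table_id, fragment_ids] : metadata.fragments_with_deleted_rows) {
    for (const auto fragment_id : fragment_ids) {
      vacuum_fragment_if_needed(table_id, fragment_id);
    }
  }
}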
(Diffs for the remaining 14 changed files are not rendered here.)