diff --git a/DataMgr/FileMgr/CachingFileMgr.cpp b/DataMgr/FileMgr/CachingFileMgr.cpp index 47d638b26e..6a400cd298 100644 --- a/DataMgr/FileMgr/CachingFileMgr.cpp +++ b/DataMgr/FileMgr/CachingFileMgr.cpp @@ -280,7 +280,12 @@ FileBuffer* CachingFileMgr::createBufferFromHeaders( FileBuffer* CachingFileMgr::putBuffer(const ChunkKey& key, AbstractBuffer* src_buffer, const size_t num_bytes) { + CHECK(!src_buffer->isDirty()) << "Cannot cache dirty buffers."; deleteBufferIfExists(key); + // Since the buffer is not dirty we mark it as dirty if we are only writing metadata and + // appended if we are writing chunk data. We delete + append rather than write to make + // sure we don't write multiple page versions. + (src_buffer->size() == 0) ? src_buffer->setDirty() : src_buffer->setAppended(); return FileMgr::putBuffer(key, src_buffer, num_bytes); } diff --git a/DataMgr/FileMgr/FileMgr.h b/DataMgr/FileMgr/FileMgr.h index e79df1100f..d948ac20e4 100644 --- a/DataMgr/FileMgr/FileMgr.h +++ b/DataMgr/FileMgr/FileMgr.h @@ -357,6 +357,9 @@ class FileMgr : public AbstractBufferMgr { // implements **/ inline virtual bool failOnReadError() const { return true; } + // Used to describe the manager in logging and error messages. + virtual std::string describeSelf() const; + static constexpr size_t DEFAULT_NUM_PAGES_PER_DATA_FILE{256}; static constexpr size_t DEFAULT_NUM_PAGES_PER_METADATA_FILE{4096}; @@ -471,9 +474,6 @@ class FileMgr : public AbstractBufferMgr { // implements // For testing purposes only FileMgr(const int epoch); - // Used to describe the manager in logging and error messages. - virtual std::string describeSelf() const; - void closePhysicalUnlocked(); void syncFilesToDisk(); void freePages(); diff --git a/DataMgr/ForeignStorage/ForeignStorageCache.cpp b/DataMgr/ForeignStorage/ForeignStorageCache.cpp index b2b9e78077..b7d0ea3bb9 100644 --- a/DataMgr/ForeignStorage/ForeignStorageCache.cpp +++ b/DataMgr/ForeignStorage/ForeignStorageCache.cpp @@ -60,20 +60,11 @@ void ForeignStorageCache::deleteBufferIfExists(const ChunkKey& chunk_key) { caching_file_mgr_->deleteBufferIfExists(chunk_key); } -void ForeignStorageCache::cacheChunk(const ChunkKey& chunk_key, AbstractBuffer* buffer) { - // We should only be caching buffers that are in sync with storage. - CHECK(!buffer->isDirty()); - if (buffer->size() == 0) { - // If we are writing an empty buffer, just delete it from the cache entirely. - deleteBufferIfExists(chunk_key); - } else { - // Replace the existing chunk with a new version. - buffer->setAppended(); - caching_file_mgr_->putBuffer(chunk_key, buffer); - CHECK(!buffer->isDirty()); - } - caching_file_mgr_->checkpoint(chunk_key[CHUNK_KEY_DB_IDX], - chunk_key[CHUNK_KEY_TABLE_IDX]); +void ForeignStorageCache::putBuffer(const ChunkKey& key, + AbstractBuffer* buf, + const size_t num_bytes) { + caching_file_mgr_->putBuffer(key, buf, num_bytes); + CHECK(!buf->isDirty()); } void ForeignStorageCache::checkpoint(const int32_t db_id, const int32_t tb_id) { diff --git a/DataMgr/ForeignStorage/ForeignStorageCache.h b/DataMgr/ForeignStorage/ForeignStorageCache.h index 0c9a0d16fc..13f27df28c 100644 --- a/DataMgr/ForeignStorage/ForeignStorageCache.h +++ b/DataMgr/ForeignStorage/ForeignStorageCache.h @@ -44,7 +44,7 @@ class ForeignStorageCache { ForeignStorageCache(const File_Namespace::DiskCacheConfig& config); void checkpoint(const int32_t db_id, const int32_t tb_id); - void cacheChunk(const ChunkKey&, AbstractBuffer*); + void putBuffer(const ChunkKey&, AbstractBuffer*, const size_t numBytes = 0); File_Namespace::FileBuffer* getCachedChunkIfExists(const ChunkKey&); bool isMetadataCached(const ChunkKey&) const; void cacheMetadataVec(const ChunkMetadataVector&); diff --git a/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.cpp b/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.cpp index ed84239472..dd4df992ca 100644 --- a/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.cpp +++ b/DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.cpp @@ -66,22 +66,28 @@ AbstractBuffer* MutableCachePersistentStorageMgr::putBuffer(const ChunkKey& chun AbstractBuffer* source_buffer, const size_t num_bytes) { auto buf = PersistentStorageMgr::putBuffer(chunk_key, source_buffer, num_bytes); - disk_cache_->cacheChunk(chunk_key, source_buffer); + disk_cache_->putBuffer(chunk_key, source_buffer, num_bytes); return buf; } void MutableCachePersistentStorageMgr::checkpoint() { + std::set tables_to_checkpoint; for (auto& key : cached_chunk_keys_) { if (global_file_mgr_->getBuffer(key)->isDirty()) { + tables_to_checkpoint.emplace(get_table_prefix(key)); foreign_storage::ForeignStorageBuffer temp_buf; global_file_mgr_->fetchBuffer(key, &temp_buf, 0); - disk_cache_->cacheChunk(key, &temp_buf); + disk_cache_->putBuffer(key, &temp_buf); } } + for (auto [db, tb] : tables_to_checkpoint) { + disk_cache_->checkpoint(db, tb); + } PersistentStorageMgr::global_file_mgr_->checkpoint(); } void MutableCachePersistentStorageMgr::checkpoint(const int db_id, const int tb_id) { + bool need_checkpoint{false}; ChunkKey chunk_prefix{db_id, tb_id}; ChunkKey upper_prefix(chunk_prefix); upper_prefix.push_back(std::numeric_limits::max()); @@ -90,11 +96,15 @@ void MutableCachePersistentStorageMgr::checkpoint(const int db_id, const int tb_ chunk_key_it != end_it; ++chunk_key_it) { if (global_file_mgr_->getBuffer(*chunk_key_it)->isDirty()) { + need_checkpoint = true; foreign_storage::ForeignStorageBuffer temp_buf; global_file_mgr_->fetchBuffer(*chunk_key_it, &temp_buf, 0); - disk_cache_->cacheChunk(*chunk_key_it, &temp_buf); + disk_cache_->putBuffer(*chunk_key_it, &temp_buf); } } + if (need_checkpoint) { + disk_cache_->checkpoint(db_id, tb_id); + } PersistentStorageMgr::global_file_mgr_->checkpoint(db_id, tb_id); } diff --git a/DataMgr/PersistentStorageMgr/PersistentStorageMgr.cpp b/DataMgr/PersistentStorageMgr/PersistentStorageMgr.cpp index 4c9ec46153..3635fdd95b 100644 --- a/DataMgr/PersistentStorageMgr/PersistentStorageMgr.cpp +++ b/DataMgr/PersistentStorageMgr/PersistentStorageMgr.cpp @@ -89,7 +89,7 @@ void PersistentStorageMgr::fetchBuffer(const ChunkKey& chunk_key, if (!isForeignStorage(chunk_key)) { // Foreign storage will read into cache buffers directly if enabled, so we do // not want to cache foreign table chunks here as they will already be cached. - disk_cache_->cacheChunk(chunk_key, destination_buffer); + disk_cache_->putBuffer(chunk_key, destination_buffer, num_bytes); } return; } diff --git a/QueryEngine/ColumnFetcher.cpp b/QueryEngine/ColumnFetcher.cpp index 009f7a96d3..3a90055129 100644 --- a/QueryEngine/ColumnFetcher.cpp +++ b/QueryEngine/ColumnFetcher.cpp @@ -259,8 +259,6 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment( chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second)); auto& chunk_iter = chunk_iter_holder.back(); if (memory_level == Data_Namespace::CPU_LEVEL) { - ChunkKey chunk_key{ - cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId}; return reinterpret_cast(&chunk_iter); } else { auto ab = chunk->getBuffer(); diff --git a/Tests/CachingFileMgrTest.cpp b/Tests/CachingFileMgrTest.cpp index 21ad31138a..26d972a0c0 100644 --- a/Tests/CachingFileMgrTest.cpp +++ b/Tests/CachingFileMgrTest.cpp @@ -108,6 +108,7 @@ class CachingFileMgrTest : public testing::Test { auto [db, tb] = get_table_prefix(key); TestHelpers::TestBuffer test_buf{ std::vector(page_data_size_ * num_pages, value)}; + test_buf.clearDirtyBits(); cfm.putBuffer(key, &test_buf); cfm.checkpoint(db, tb); } @@ -119,6 +120,7 @@ class CachingFileMgrTest : public testing::Test { for (int32_t i = 0; i < num_pages; ++i) { ChunkKey key{db, tb, 1, i}; TestHelpers::TestBuffer test_buf{small_buffer_}; + test_buf.clearDirtyBits(); cfm.putBuffer(key, &test_buf); } cfm.checkpoint(db, tb); diff --git a/Tests/DBHandlerTestHelpers.h b/Tests/DBHandlerTestHelpers.h index cd18f22b9b..c46829cf34 100644 --- a/Tests/DBHandlerTestHelpers.h +++ b/Tests/DBHandlerTestHelpers.h @@ -186,12 +186,12 @@ class DBHandlerTestFixture : public testing::Test { desc.add_options()("cluster", po::value(&cluster_config_file_path_), "Path to data leaves list JSON file."); - desc.add_options()("use-disk-cache", - po::value(&use_disk_cache_), - "Enable disk cache for all tables."); + desc.add_options()("use-disk-cache", "Enable disk cache for all tables."); po::variables_map vm; po::store(po::command_line_parser(argc, argv).options(desc).run(), vm); po::notify(vm); + + use_disk_cache_ = (vm.count("use-disk-cache")); } static void initTestArgs(const std::vector& string_servers,