Skip to content

Commit

Permalink
fix disk cache empty checkpoint
Browse files Browse the repository at this point in the history
Enable writes of empty buffers in the disk cache for mutable tables.  Also reduce frequency of checkpoints for the disk cache in this scenario.
  • Loading branch information
misiugodfrey authored and andrewseidl committed Jul 6, 2021
1 parent 6d37f5d commit 7487ce1
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 27 deletions.
5 changes: 5 additions & 0 deletions DataMgr/FileMgr/CachingFileMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,12 @@ FileBuffer* CachingFileMgr::createBufferFromHeaders(
FileBuffer* CachingFileMgr::putBuffer(const ChunkKey& key,
AbstractBuffer* src_buffer,
const size_t num_bytes) {
CHECK(!src_buffer->isDirty()) << "Cannot cache dirty buffers.";
deleteBufferIfExists(key);
// Since the buffer is not dirty we mark it as dirty if we are only writing metadata and
// appended if we are writing chunk data. We delete + append rather than write to make
// sure we don't write multiple page versions.
(src_buffer->size() == 0) ? src_buffer->setDirty() : src_buffer->setAppended();
return FileMgr::putBuffer(key, src_buffer, num_bytes);
}

Expand Down
6 changes: 3 additions & 3 deletions DataMgr/FileMgr/FileMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ class FileMgr : public AbstractBufferMgr { // implements
**/
inline virtual bool failOnReadError() const { return true; }

// Used to describe the manager in logging and error messages.
virtual std::string describeSelf() const;

static constexpr size_t DEFAULT_NUM_PAGES_PER_DATA_FILE{256};
static constexpr size_t DEFAULT_NUM_PAGES_PER_METADATA_FILE{4096};

Expand Down Expand Up @@ -471,9 +474,6 @@ class FileMgr : public AbstractBufferMgr { // implements
// For testing purposes only
FileMgr(const int epoch);

// Used to describe the manager in logging and error messages.
virtual std::string describeSelf() const;

void closePhysicalUnlocked();
void syncFilesToDisk();
void freePages();
Expand Down
19 changes: 5 additions & 14 deletions DataMgr/ForeignStorage/ForeignStorageCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,11 @@ void ForeignStorageCache::deleteBufferIfExists(const ChunkKey& chunk_key) {
caching_file_mgr_->deleteBufferIfExists(chunk_key);
}

void ForeignStorageCache::cacheChunk(const ChunkKey& chunk_key, AbstractBuffer* buffer) {
// We should only be caching buffers that are in sync with storage.
CHECK(!buffer->isDirty());
if (buffer->size() == 0) {
// If we are writing an empty buffer, just delete it from the cache entirely.
deleteBufferIfExists(chunk_key);
} else {
// Replace the existing chunk with a new version.
buffer->setAppended();
caching_file_mgr_->putBuffer(chunk_key, buffer);
CHECK(!buffer->isDirty());
}
caching_file_mgr_->checkpoint(chunk_key[CHUNK_KEY_DB_IDX],
chunk_key[CHUNK_KEY_TABLE_IDX]);
void ForeignStorageCache::putBuffer(const ChunkKey& key,
AbstractBuffer* buf,
const size_t num_bytes) {
caching_file_mgr_->putBuffer(key, buf, num_bytes);
CHECK(!buf->isDirty());
}

void ForeignStorageCache::checkpoint(const int32_t db_id, const int32_t tb_id) {
Expand Down
2 changes: 1 addition & 1 deletion DataMgr/ForeignStorage/ForeignStorageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ForeignStorageCache {
ForeignStorageCache(const File_Namespace::DiskCacheConfig& config);

void checkpoint(const int32_t db_id, const int32_t tb_id);
void cacheChunk(const ChunkKey&, AbstractBuffer*);
void putBuffer(const ChunkKey&, AbstractBuffer*, const size_t numBytes = 0);
File_Namespace::FileBuffer* getCachedChunkIfExists(const ChunkKey&);
bool isMetadataCached(const ChunkKey&) const;
void cacheMetadataVec(const ChunkMetadataVector&);
Expand Down
16 changes: 13 additions & 3 deletions DataMgr/PersistentStorageMgr/MutableCachePersistentStorageMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,28 @@ AbstractBuffer* MutableCachePersistentStorageMgr::putBuffer(const ChunkKey& chun
AbstractBuffer* source_buffer,
const size_t num_bytes) {
auto buf = PersistentStorageMgr::putBuffer(chunk_key, source_buffer, num_bytes);
disk_cache_->cacheChunk(chunk_key, source_buffer);
disk_cache_->putBuffer(chunk_key, source_buffer, num_bytes);
return buf;
}

void MutableCachePersistentStorageMgr::checkpoint() {
std::set<File_Namespace::TablePair> tables_to_checkpoint;
for (auto& key : cached_chunk_keys_) {
if (global_file_mgr_->getBuffer(key)->isDirty()) {
tables_to_checkpoint.emplace(get_table_prefix(key));
foreign_storage::ForeignStorageBuffer temp_buf;
global_file_mgr_->fetchBuffer(key, &temp_buf, 0);
disk_cache_->cacheChunk(key, &temp_buf);
disk_cache_->putBuffer(key, &temp_buf);
}
}
for (auto [db, tb] : tables_to_checkpoint) {
disk_cache_->checkpoint(db, tb);
}
PersistentStorageMgr::global_file_mgr_->checkpoint();
}

void MutableCachePersistentStorageMgr::checkpoint(const int db_id, const int tb_id) {
bool need_checkpoint{false};
ChunkKey chunk_prefix{db_id, tb_id};
ChunkKey upper_prefix(chunk_prefix);
upper_prefix.push_back(std::numeric_limits<int>::max());
Expand All @@ -90,11 +96,15 @@ void MutableCachePersistentStorageMgr::checkpoint(const int db_id, const int tb_
chunk_key_it != end_it;
++chunk_key_it) {
if (global_file_mgr_->getBuffer(*chunk_key_it)->isDirty()) {
need_checkpoint = true;
foreign_storage::ForeignStorageBuffer temp_buf;
global_file_mgr_->fetchBuffer(*chunk_key_it, &temp_buf, 0);
disk_cache_->cacheChunk(*chunk_key_it, &temp_buf);
disk_cache_->putBuffer(*chunk_key_it, &temp_buf);
}
}
if (need_checkpoint) {
disk_cache_->checkpoint(db_id, tb_id);
}
PersistentStorageMgr::global_file_mgr_->checkpoint(db_id, tb_id);
}

Expand Down
2 changes: 1 addition & 1 deletion DataMgr/PersistentStorageMgr/PersistentStorageMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void PersistentStorageMgr::fetchBuffer(const ChunkKey& chunk_key,
if (!isForeignStorage(chunk_key)) {
// Foreign storage will read into cache buffers directly if enabled, so we do
// not want to cache foreign table chunks here as they will already be cached.
disk_cache_->cacheChunk(chunk_key, destination_buffer);
disk_cache_->putBuffer(chunk_key, destination_buffer, num_bytes);
}
return;
}
Expand Down
2 changes: 0 additions & 2 deletions QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,6 @@ const int8_t* ColumnFetcher::getOneTableColumnFragment(
chunk_iter_holder.push_back(chunk->begin_iterator(chunk_meta_it->second));
auto& chunk_iter = chunk_iter_holder.back();
if (memory_level == Data_Namespace::CPU_LEVEL) {
ChunkKey chunk_key{
cat.getCurrentDB().dbId, fragment.physicalTableId, col_id, fragment.fragmentId};
return reinterpret_cast<int8_t*>(&chunk_iter);
} else {
auto ab = chunk->getBuffer();
Expand Down
2 changes: 2 additions & 0 deletions Tests/CachingFileMgrTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class CachingFileMgrTest : public testing::Test {
auto [db, tb] = get_table_prefix(key);
TestHelpers::TestBuffer test_buf{
std::vector<int8_t>(page_data_size_ * num_pages, value)};
test_buf.clearDirtyBits();
cfm.putBuffer(key, &test_buf);
cfm.checkpoint(db, tb);
}
Expand All @@ -119,6 +120,7 @@ class CachingFileMgrTest : public testing::Test {
for (int32_t i = 0; i < num_pages; ++i) {
ChunkKey key{db, tb, 1, i};
TestHelpers::TestBuffer test_buf{small_buffer_};
test_buf.clearDirtyBits();
cfm.putBuffer(key, &test_buf);
}
cfm.checkpoint(db, tb);
Expand Down
6 changes: 3 additions & 3 deletions Tests/DBHandlerTestHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,12 @@ class DBHandlerTestFixture : public testing::Test {
desc.add_options()("cluster",
po::value<std::string>(&cluster_config_file_path_),
"Path to data leaves list JSON file.");
desc.add_options()("use-disk-cache",
po::value<bool>(&use_disk_cache_),
"Enable disk cache for all tables.");
desc.add_options()("use-disk-cache", "Enable disk cache for all tables.");
po::variables_map vm;
po::store(po::command_line_parser(argc, argv).options(desc).run(), vm);
po::notify(vm);

use_disk_cache_ = (vm.count("use-disk-cache"));
}

static void initTestArgs(const std::vector<LeafHostInfo>& string_servers,
Expand Down

0 comments on commit 7487ce1

Please sign in to comment.