Skip to content

Commit

Permalink
Created new CachingFileMgr to back the ForeignStorageCache
Browse files Browse the repository at this point in the history
  • Loading branch information
misiugodfrey authored and andrewseidl committed Mar 26, 2021
1 parent 88f1ed3 commit 0495c0c
Show file tree
Hide file tree
Showing 22 changed files with 574 additions and 223 deletions.
10 changes: 2 additions & 8 deletions Catalog/DdlCommandExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1490,14 +1490,8 @@ ExecutionResult ShowDiskCacheUsageCommand::execute() {
auto [td, td_with_lock] =
get_table_descriptor_with_lock<lockmgr::ReadLock>(*cat_ptr, table_name, false);

const auto mgr = dynamic_cast<File_Namespace::FileMgr*>(
disk_cache->getGlobalFileMgr()->findFileMgr(cat_ptr->getDatabaseId(),
td->tableId));

// NOTE: This size does not include datawrapper metadata that is on disk.
// If a mgr does not exist it means a cache is not enabled/created for the given
// table.
auto table_cache_size = mgr ? mgr->getTotalFileSize() : 0;
auto table_cache_size =
disk_cache->getSpaceReservedByTable(cat_ptr->getDatabaseId(), td->tableId);

// logical_values -> table data
logical_values.emplace_back(RelLogicalValues::RowValues{});
Expand Down
7 changes: 4 additions & 3 deletions DataMgr/AbstractBufferMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@
} \
}

DEFINE_ENUM_WITH_STRING_CONVERSIONS(MgrType,
(FILE_MGR)(CPU_MGR)(GPU_MGR)(GLOBAL_FILE_MGR)(
PERSISTENT_STORAGE_MGR)(FOREIGN_STORAGE_MGR))
DEFINE_ENUM_WITH_STRING_CONVERSIONS(
MgrType,
(CACHING_FILE_MGR)(FILE_MGR)(CPU_MGR)(GPU_MGR)(GLOBAL_FILE_MGR)(
PERSISTENT_STORAGE_MGR)(FOREIGN_STORAGE_MGR))

namespace Data_Namespace {

Expand Down
1 change: 1 addition & 0 deletions DataMgr/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(datamgr_source_files
DataMgr.cpp
Encoder.cpp
StringNoneEncoder.cpp
FileMgr/CachingFileMgr.cpp
FileMgr/GlobalFileMgr.cpp
FileMgr/FileMgr.cpp
FileMgr/FileBuffer.cpp
Expand Down
159 changes: 159 additions & 0 deletions DataMgr/FileMgr/CachingFileMgr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright 2021 Omnisci, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* @file CachingFileMgr.cpp
*/

#include "DataMgr/FileMgr/CachingFileMgr.h"
#include <boost/filesystem.hpp>
#include "Shared/File.h"

// Name that coreInit() passes to openAndReadEpochFile() when loading the cache's epoch.
constexpr char EPOCH_FILENAME[] = "epoch_metadata";

namespace File_Namespace {
namespace bf = boost::filesystem;

// Builds a CachingFileMgr whose data lives under base_path.
// The members below are assigned before init() is invoked; init() receives
// num_reader_threads and -1.
// NOTE(review): the meaning of the -1 argument is defined by init(), which is declared
// outside this file -- confirm before changing it.
CachingFileMgr::CachingFileMgr(const std::string& base_path,
const size_t num_reader_threads) {
fileMgrBasePath_ = base_path;
// The cache is configured with zero rollback epochs.
maxRollbackEpochs_ = 0;
defaultPageSize_ = DEFAULT_PAGE_SIZE;
nextFileId_ = 0;
init(num_reader_threads, -1);
}

/**
 * @brief Validates the cache directory and loads its on-disk state.
 *
 * Holds an exclusive lock on files_rw_mutex_ for the duration of the call. The base
 * path must already exist and be a directory; when it does, the on-disk layout is
 * migrated via migrateToLatestFileMgrVersion() and the epoch file is opened and read.
 *
 * @return true if the cache directory exists. A missing directory (or a path that is
 * not a directory) is reported through LOG(FATAL); the trailing `return false` is only
 * reached if that logging call returns.
 */
bool CachingFileMgr::coreInit() {
  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
  bf::path path(fileMgrBasePath_);
  if (bf::exists(path)) {
    if (!bf::is_directory(path)) {
      LOG(FATAL) << "Specified path '" << fileMgrBasePath_
                 << "' for disk cache is not a directory.";
    }
    migrateToLatestFileMgrVersion();
    openAndReadEpochFile(EPOCH_FILENAME);
    return true;
  }
  // Message fixed: it used to run the path into "does" and said "exit" for "exist".
  LOG(FATAL) << "Cache path: " << fileMgrBasePath_ << " does not exist.";
  return false;
}

/**
 * Drops every chunk-index entry that belongs to table (db_id, tb_id), removes the
 * table's sub-directory under the cache base path, and then checkpoints.
 */
void CachingFileMgr::clearForTable(int db_id, int tb_id) {
  {
    // The chunk index is mutated below, so take the exclusive lock for this scope.
    mapd_unique_lock<mapd_shared_mutex> write_lock(chunkIndexMutex_);
    auto chunk_it = chunkIndex_.begin();
    while (chunk_it != chunkIndex_.end()) {
      if (!in_same_table(chunk_it->first, {db_id, tb_id})) {
        ++chunk_it;
        continue;
      }
      // Release the buffer's pages and the buffer itself before dropping the entry.
      auto chunk_buffer = chunk_it->second;
      chunk_buffer->freePages();
      delete chunk_buffer;
      chunk_it = chunkIndex_.erase(chunk_it);
    }
    const auto table_dir =
        getFileMgrBasePath() + "/" + get_dir_name_for_table(db_id, tb_id);
    if (bf::exists(table_dir)) {
      bf::remove_all(table_dir);
    }
  }
  checkpoint();
  // TODO(Misiu): Implement background file removal.
  // The renameForDelete idiom currently only works inside the mapd/ directory because
  // the cleanup thread is targeted there. Supporting arbitrary directories requires
  // registering another directory with that thread, or starting a second thread.
  // File_Namespace::renameForDelete(get_dir_name_for_table(db_id, tb_id));
}

/**
 * Returns the path of the sub-directory used for table (db_id, tb_id), creating the
 * directory when it is not present yet. An existing path that is not a directory is
 * reported through LOG(FATAL).
 */
std::string CachingFileMgr::getOrAddTableDir(int db_id, int tb_id) {
  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
  std::string dir_path =
      getFileMgrBasePath() + "/" + get_dir_name_for_table(db_id, tb_id);
  if (bf::exists(dir_path)) {
    if (!bf::is_directory(dir_path)) {
      LOG(FATAL) << "Specified path '" << dir_path
                 << "' for cache table data is not a directory.";
    }
    return dir_path;
  }
  bf::create_directory(dir_path);
  return dir_path;
}

/**
 * Closes the manager's files via closePhysicalUnlocked() while holding the exclusive
 * files lock, then deletes the whole cache base directory if it exists.
 */
void CachingFileMgr::closeRemovePhysical() {
  mapd_unique_lock<mapd_shared_mutex> write_lock(files_rw_mutex_);
  closePhysicalUnlocked();
  if (const auto cache_dir = getFileMgrBasePath(); bf::exists(cache_dir)) {
    bf::remove_all(cache_dir);
  }

  // TODO(Misiu): Implement background file removal.
  // The renameForDelete idiom currently only works inside the mapd/ directory because
  // the cleanup thread is targeted there. Supporting arbitrary directories requires
  // registering another directory with that thread, or starting a second thread.
  // File_Namespace::renameForDelete(getFileMgrBasePath());
}

/**
 * Sums reservedSize() over every chunk-index entry whose key carries the given
 * database and table ids. Takes a shared lock on the chunk index while scanning.
 */
uint64_t CachingFileMgr::getChunkSpaceReservedByTable(int db_id, int tb_id) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
  uint64_t reserved_total = 0;
  for (const auto& [chunk_key, chunk_buffer] : chunkIndex_) {
    const bool belongs_to_table =
        chunk_key[CHUNK_KEY_DB_IDX] == db_id && chunk_key[CHUNK_KEY_TABLE_IDX] == tb_id;
    if (!belongs_to_table) {
      continue;
    }
    reserved_total += chunk_buffer->reservedSize();
  }
  return reserved_total;
}

/**
 * Sums numMetadataPages() * METADATA_PAGE_SIZE over every chunk-index entry whose key
 * carries the given database and table ids. Takes a shared lock on the chunk index
 * while scanning.
 */
uint64_t CachingFileMgr::getMetadataSpaceReservedByTable(int db_id, int tb_id) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(chunkIndexMutex_);
  uint64_t metadata_total = 0;
  for (const auto& [chunk_key, chunk_buffer] : chunkIndex_) {
    const bool belongs_to_table =
        chunk_key[CHUNK_KEY_DB_IDX] == db_id && chunk_key[CHUNK_KEY_TABLE_IDX] == tb_id;
    if (!belongs_to_table) {
      continue;
    }
    metadata_total += (chunk_buffer->numMetadataPages() * METADATA_PAGE_SIZE);
  }
  return metadata_total;
}

/**
 * Adds up the sizes of all regular files found (recursively) under the table's
 * sub-directory of the cache base path. Returns 0 when that directory does not exist.
 * Holds a shared lock on files_rw_mutex_ while walking the directory.
 */
uint64_t CachingFileMgr::getWrapperSpaceReservedByTable(int db_id, int tb_id) {
  mapd_shared_lock<mapd_shared_mutex> read_lock(files_rw_mutex_);
  uint64_t file_bytes = 0;
  const std::string dir_path =
      getFileMgrBasePath() + "/" + get_dir_name_for_table(db_id, tb_id);
  if (!bf::exists(dir_path)) {
    return file_bytes;
  }
  for (const auto& entry : bf::recursive_directory_iterator(dir_path)) {
    const auto& entry_path = entry.path();
    if (bf::is_regular_file(entry_path)) {
      file_bytes += bf::file_size(entry_path);
    }
  }
  return file_bytes;
}

/**
 * Total space attributed to table (db_id, tb_id): the chunk, metadata and wrapper
 * figures added together, queried in that order.
 */
uint64_t CachingFileMgr::getSpaceReservedByTable(int db_id, int tb_id) {
  uint64_t total = getChunkSpaceReservedByTable(db_id, tb_id);
  total += getMetadataSpaceReservedByTable(db_id, tb_id);
  total += getWrapperSpaceReservedByTable(db_id, tb_id);
  return total;
}

/// Short label identifying this manager; always the literal "cache".
std::string CachingFileMgr::describeSelf() {
  return std::string("cache");
}

} // namespace File_Namespace
108 changes: 108 additions & 0 deletions DataMgr/FileMgr/CachingFileMgr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2021 Omnisci, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* @file CachingFileMgr.h
*
* This file details an extension of the FileMgr that can contain pages from multiple
* tables (CachingFileMgr).
*/

#pragma once

#include "FileMgr.h"

namespace File_Namespace {

/// Builds the per-table directory name "table_<db_id>_<tb_id>/" (trailing slash
/// included).
inline std::string get_dir_name_for_table(int db_id, int tb_id) {
  std::ostringstream dir_name;
  dir_name << "table_" << db_id << '_' << tb_id << '/';
  return dir_name.str();
}

/**
 * @class CachingFileMgr
 * @brief A FileMgr capable of limiting its size and storing data from multiple tables
 * in a shared directory. For any table that supports DiskCaching, the CachingFileMgr
 * must contain either metadata for all table chunks, or for none (the cache either has
 * no knowledge of that table, or has complete knowledge of that table). Any data chunk
 * within a table may or may not be contained within the cache.
 */
class CachingFileMgr : public FileMgr {
 public:
  /**
   * @param base_path directory that holds the cache data.
   * @param num_reader_threads forwarded to init() by the constructor.
   */
  CachingFileMgr(const std::string& base_path, const size_t num_reader_threads = 0);
  ~CachingFileMgr() {}
  /**
   * @brief Determines file path, and if exists, runs file migration and opens and reads
   * epoch file
   * @return a boolean representing whether the directory path existed
   */
  bool coreInit() override;

  // Simple getters.
  inline MgrType getMgrType() override { return CACHING_FILE_MGR; }
  inline std::string getStringMgrType() override { return ToString(CACHING_FILE_MGR); }
  inline size_t getDefaultPageSize() { return defaultPageSize_; }

  // TODO(Misiu): These are unimplemented for now, but will become necessary when we want
  // to limit the size. Each one currently hits UNREACHABLE().
  inline size_t getMaxSize() override {
    UNREACHABLE() << "Unimplemented";
    return 0;
  }
  inline size_t getInUseSize() override {
    UNREACHABLE() << "Unimplemented";
    return 0;
  }
  inline size_t getAllocated() override {
    UNREACHABLE() << "Unimplemented";
    return 0;
  }
  inline bool isAllocationCapped() override { return false; }

  /**
   * @brief Removes all data related to the given table (pages and subdirectories).
   */
  void clearForTable(int db_id, int tb_id);

  /**
   * @brief Returns (and optionally creates) a subdirectory for table-specific persistent
   * data (e.g. serialized foreign data wrappers).
   */
  std::string getOrAddTableDir(int db_id, int tb_id);

  /**
   * @brief Query to determine if the contained pages will have their database and table
   * ids overridden by the filemgr key (FileMgr does this). Always false for this class.
   */
  inline bool hasFileMgrKey() const override { return false; }

  /**
   * @brief Closes files and removes the caching directory.
   */
  void closeRemovePhysical() override;

  /**
   * Set of functions to determine how much space is reserved in a table by type.
   * getSpaceReservedByTable() is the sum of the chunk, metadata and wrapper values.
   */
  uint64_t getChunkSpaceReservedByTable(int db_id, int tb_id);
  uint64_t getMetadataSpaceReservedByTable(int db_id, int tb_id);
  uint64_t getWrapperSpaceReservedByTable(int db_id, int tb_id);
  uint64_t getSpaceReservedByTable(int db_id, int tb_id);

  /// @return a short label describing this manager.
  std::string describeSelf() override;
};

} // namespace File_Namespace
8 changes: 8 additions & 0 deletions DataMgr/FileMgr/FileBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -622,4 +622,12 @@ void FileBuffer::write(int8_t* src,
CHECK(bytesLeft == 0);
}

/// Renders the buffer's chunk key, encoder presence and size_ as three
/// newline-terminated "name = value" lines.
std::string FileBuffer::dump() const {
  std::stringstream out;
  out << "chunk_key = " << show_chunk(chunkKey_) << "\n"
      << "has_encoder = " << (hasEncoder() ? "true\n" : "false\n")
      << "size_ = " << size_ << "\n";
  return out.str();
}

} // namespace File_Namespace
5 changes: 4 additions & 1 deletion DataMgr/FileMgr/FileBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ using namespace Data_Namespace;

namespace File_Namespace {

class FileMgr; // forward declaration
// forward declarations
class FileMgr;

/**
* @class FileBuffer
Expand Down Expand Up @@ -148,6 +149,8 @@ class FileBuffer : public AbstractBuffer {

inline size_t numMetadataPages() const { return metadataPages_.pageVersions.size(); };

std::string dump() const;

private:
// FileBuffer(const FileBuffer&); // private copy constructor
// FileBuffer& operator=(const FileBuffer&); // private overloaded assignment operator
Expand Down
10 changes: 7 additions & 3 deletions DataMgr/FileMgr/FileInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,14 @@ void FileInfo::openExistingFile(std::vector<HeaderInfo>& headerVec,
// We don't want to read headerSize in our header - so start
// reading 4 bytes past it

// always derive dbid/tbid from FileMgr
// Derive dbid/tbid from the FileMgr only if it has a FileMgr key
ChunkKey chunkKey(&ints[1], &ints[1 + numHeaderElems - 2]);
chunkKey[0] = fileMgr->get_fileMgrKey().first;
chunkKey[1] = fileMgr->get_fileMgrKey().second;
if (fileMgr->hasFileMgrKey()) {
// A regular FileMgr is locked to one table, but a CachingFileMgr can contain
// chunks from different tables
chunkKey[0] = fileMgr->get_fileMgrKey().first;
chunkKey[1] = fileMgr->get_fileMgrKey().second;
}
// recover page in case a crash failed deletion of this page
if (!g_read_only) {
if (ints[1] == DELETE_CONTINGENT || ints[1] == ROLLOFF_CONTINGENT) {
Expand Down
Loading

0 comments on commit 0495c0c

Please sign in to comment.