Skip to content

Commit

Permalink
add prioritized cache wrapper
Browse files Browse the repository at this point in the history
Signed-off-by: haoxiang47 <[email protected]>
  • Loading branch information
haoxiang47 committed Mar 24, 2020
1 parent 4dc4ba8 commit 199513e
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release"))
titan_db_test
titan_options_test
util_test
version_test)
version_test
prioritized_cache_test)
set(TEST_LIBS
titan
rocksdb
Expand Down
117 changes: 117 additions & 0 deletions include/titan/prioritized_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#pragma once
#include "rocksdb/cache.h"

namespace rocksdb {
namespace titandb {

// PrioritizedCache wraps a rocksdb Cache and pins every insertion to a
// single Cache::Priority chosen at construction time, ignoring the
// priority argument the caller passes to Insert(). All other operations
// are forwarded unchanged to the wrapped cache.
class PrioritizedCache : public Cache {
 public:
  // Constructs a PrioritizedCache, which is a wrapper
  // of a rocksdb Cache. Entries inserted through this object always use
  // `priority`, and the wrapper shares ownership of `cache`.
  PrioritizedCache(std::shared_ptr<Cache> cache, Cache::Priority priority);
  ~PrioritizedCache();

  // Returns the wrapped Cache ptr.
  std::shared_ptr<Cache> GetCache();

  // Always inserts into the cache with the priority fixed at
  // construction time, regardless of the user-provided priority argument.
  Status Insert(const Slice& key, void* value, size_t charge,
                void (*deleter)(const Slice& key, void* value),
                Cache::Handle** handle, Cache::Priority /*priority*/) override;

  // The name of this Cache implementation.
  const char* Name() const override;

  // If the cache has no mapping for "key", returns nullptr.
  //
  // Else return a handle that corresponds to the mapping. The caller
  // must call this->Release(handle) when the returned mapping is no
  // longer needed.
  // If stats is not nullptr, relative tickers could be used inside the
  // function.
  Cache::Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override;

  // Increments the reference count for the handle if it refers to an entry in
  // the cache. Returns true if refcount was incremented; otherwise, returns
  // false.
  // REQUIRES: handle must have been returned by a method on *this.
  bool Ref(Cache::Handle* handle) override;

  // Releases a mapping returned by a previous Lookup(). A released entry
  // might still remain in cache in case it is later looked up by others.
  // If force_erase is set then it also erases the entry from the cache if
  // there is no other reference to it. Erasing the entry invokes the
  // deleter function that was provided when the entry was inserted.
  //
  // Returns true if the entry was also erased.
  // REQUIRES: handle must not have been released yet.
  // REQUIRES: handle must have been returned by a method on *this.
  bool Release(Cache::Handle* handle, bool force_erase = false) override;

  // Return the value encapsulated in a handle returned by a
  // successful Lookup().
  // REQUIRES: handle must not have been released yet.
  // REQUIRES: handle must have been returned by a method on *this.
  void* Value(Cache::Handle* handle) override;

  // If the cache contains entry for key, erase it. Note that the
  // underlying entry will be kept around until all existing handles
  // to it have been released.
  void Erase(const Slice& key) override;

  // Return a new numeric id. May be used by multiple clients who are
  // sharding the same cache to partition the key space. Typically the
  // client will allocate a new id at startup and prepend the id to
  // its cache keys.
  uint64_t NewId() override;

  // Sets the maximum configured capacity of the cache. When the new
  // capacity is less than the old capacity and the existing usage is
  // greater than new capacity, the implementation will do its best job to
  // purge the released entries from the cache in order to lower the usage.
  void SetCapacity(size_t capacity) override;

  // Set whether to return error on insertion when cache reaches its full
  // capacity.
  void SetStrictCapacityLimit(bool strict_capacity_limit) override;

  // Get the flag whether to return error on insertion when cache reaches its
  // full capacity.
  bool HasStrictCapacityLimit() const override;

  // Returns the maximum configured capacity of the cache.
  size_t GetCapacity() const override;

  // Returns the memory size for the entries residing in the cache.
  size_t GetUsage() const override;

  // Returns the memory size for a specific entry in the cache.
  size_t GetUsage(Cache::Handle* handle) const override;

  // Returns the memory size for the entries in use by the system.
  size_t GetPinnedUsage() const override;

  // Returns the charge for the specific entry in the cache.
  size_t GetCharge(Cache::Handle* handle) const override;

  // Apply callback to all entries in the cache.
  // If thread_safe is true, it will also lock the accesses. Otherwise, it will
  // access the cache without the lock held.
  void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
                              bool thread_safe) override;

  // Remove all entries.
  // Prerequisite: no entry is referenced.
  void EraseUnRefEntries() override;

 private:
  // The wrapped cache every call is forwarded to.
  std::shared_ptr<Cache> cache_;
  // Priority applied to every Insert(), overriding the caller's argument.
  Cache::Priority priority_;
};

} // namespace titandb
} // namespace rocksdb
78 changes: 78 additions & 0 deletions src/prioritized_cache.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#include "titan/prioritized_cache.h"

namespace rocksdb {
namespace titandb {

// Takes shared ownership of `cache`; every Insert() through this wrapper
// uses `priority` instead of the caller-supplied one.
// Move the by-value shared_ptr into the member instead of copying it,
// avoiding an unnecessary atomic refcount increment/decrement
// (clang-tidy: performance-unnecessary-value-param).
PrioritizedCache::PrioritizedCache(std::shared_ptr<Cache> cache,
                                   Cache::Priority priority)
    : cache_(std::move(cache)), priority_(priority) {}

PrioritizedCache::~PrioritizedCache() {}

// Returns the wrapped cache.
std::shared_ptr<Cache> PrioritizedCache::GetCache() { return cache_; }

// Inserts into the wrapped cache, deliberately ignoring the caller's
// priority argument and substituting the priority fixed at construction —
// this is the whole point of the wrapper.
Status PrioritizedCache::Insert(const Slice& key, void* value, size_t charge,
                                void (*deleter)(const Slice& key, void* value),
                                Cache::Handle** handle,
                                Cache::Priority /*priority*/) {
  return cache_->Insert(key, value, charge, deleter, handle, priority_);
}

const char* PrioritizedCache::Name() const { return "PrioritizedCache"; }

Cache::Handle* PrioritizedCache::Lookup(const Slice& key,
Statistics* stats) {
return cache_->Lookup(key, stats);
}

bool PrioritizedCache::Ref(Cache::Handle* handle) { return cache_->Ref(handle); }

bool PrioritizedCache::Release(Cache::Handle* handle, bool force_erase) {
return cache_->Release(handle, force_erase);
}

void* PrioritizedCache::Value(Cache::Handle* handle) { return cache_->Value(handle); }

void PrioritizedCache::Erase(const Slice& key) { cache_->Erase(key); }

uint64_t PrioritizedCache::NewId() { return cache_->NewId(); }

// Forwards the new capacity to the wrapped cache.
void PrioritizedCache::SetCapacity(size_t new_capacity) {
  cache_->SetCapacity(new_capacity);
}

// Forwards the strict-capacity flag to the wrapped cache.
void PrioritizedCache::SetStrictCapacityLimit(bool strict) {
  cache_->SetStrictCapacityLimit(strict);
}

// Reports whether the wrapped cache rejects inserts when full.
bool PrioritizedCache::HasStrictCapacityLimit() const {
  return cache_->HasStrictCapacityLimit();
}

// Reports the configured capacity of the wrapped cache.
size_t PrioritizedCache::GetCapacity() const {
  return cache_->GetCapacity();
}

// Reports the total memory usage of the wrapped cache.
size_t PrioritizedCache::GetUsage() const {
  return cache_->GetUsage();
}

// Returns the memory usage of the single entry referenced by `handle`.
// Bug fix: the original dropped `handle` and called the whole-cache
// GetUsage() overload, so per-entry usage reported the total cache usage.
size_t PrioritizedCache::GetUsage(Cache::Handle* handle) const {
  return cache_->GetUsage(handle);
}

// Reports the memory pinned by in-use entries of the wrapped cache.
size_t PrioritizedCache::GetPinnedUsage() const {
  return cache_->GetPinnedUsage();
}

// Reports the charge recorded for the entry referenced by `h`.
size_t PrioritizedCache::GetCharge(Cache::Handle* h) const {
  return cache_->GetCharge(h);
}

// Applies `callback` to every entry of the wrapped cache; locking is
// controlled by `thread_safe`.
void PrioritizedCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t),
                                              bool thread_safe) {
  cache_->ApplyToAllCacheEntries(callback, thread_safe);
}

// Drops every unreferenced entry from the wrapped cache.
void PrioritizedCache::EraseUnRefEntries() {
  cache_->EraseUnRefEntries();
}

} // namespace titandb
} // namespace rocksdb
61 changes: 61 additions & 0 deletions src/prioritized_cache_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include "titan/prioritized_cache.h"
#include "test_util/testharness.h"
#include "util.h"

namespace rocksdb {
namespace titandb {

class PrioritizedCacheTest : public testing::Test {
public:
PrioritizedCacheTest() {}
~PrioritizedCacheTest() {}
};

// Verifies that PrioritizedCache overrides the caller-supplied priority
// with the one fixed at construction, for both HIGH and LOW wrappers
// sharing a single underlying LRU cache.
//
// Fixes over the original test:
//  - both inserts used charge == full cache capacity, so the second insert
//    could evict the first and the lookups became dead code;
//  - value assertions were wrapped in `if (handle)`, silently passing when
//    an entry was missing — now asserted unconditionally;
//  - stack addresses were inserted with a heap deleter (DeleteCacheValue
//    would free a stack object) — values are now heap-allocated;
//  - handles obtained from Insert/Lookup were never Released, pinning the
//    entries and leaking the lookup handle.
TEST_F(PrioritizedCacheTest, PriorityTest) {
  Slice high_cache_key = "high_test_key";
  Slice cache_value = "test_value";
  Slice low_cache_key = "low_test_key";

  LRUCacheOptions options;
  // Size the cache so both entries fit and neither insert evicts the other.
  options.capacity =
      high_cache_key.size() + low_cache_key.size() + 2 * cache_value.size();
  options.high_pri_pool_ratio = 1;

  std::shared_ptr<Cache> cache = NewLRUCache(options);
  PrioritizedCache high_cache(cache, Cache::Priority::HIGH);
  PrioritizedCache low_cache(cache, Cache::Priority::LOW);

  // Insert through the LOW wrapper while requesting HIGH, to check that
  // the wrapper forces LOW in fact. Values are heap-allocated copies so
  // the deleter may legally free them; the copies share cache_value's
  // data pointer, which the assertions below rely on.
  auto lo_ok = low_cache.Insert(low_cache_key, new Slice(cache_value),
                                cache_value.size(), &DeleteCacheValue<Slice>,
                                nullptr, Cache::Priority::HIGH);
  ASSERT_TRUE(lo_ok.ok());

  // Insert through the HIGH wrapper while requesting LOW, to check that
  // the wrapper forces HIGH in fact.
  auto hi_ok = high_cache.Insert(high_cache_key, new Slice(cache_value),
                                 cache_value.size(), &DeleteCacheValue<Slice>,
                                 nullptr, Cache::Priority::LOW);
  ASSERT_TRUE(hi_ok.ok());

  // Both entries must be present; a missing entry is a failure, not a skip.
  auto high_handle = cache->Lookup(high_cache_key);
  ASSERT_NE(high_handle, nullptr);
  auto high_value = reinterpret_cast<Slice*>(cache->Value(high_handle));
  ASSERT_EQ(cache_value.data(), high_value->data());
  cache->Release(high_handle);

  auto low_handle = cache->Lookup(low_cache_key);
  ASSERT_NE(low_handle, nullptr);
  auto low_value = reinterpret_cast<Slice*>(cache->Value(low_handle));
  ASSERT_EQ(cache_value.data(), low_value->data());
  cache->Release(low_handle);
}

} // namespace titandb
} // namespace rocksdb

// gtest entry point: initializes the framework and runs every registered test.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

0 comments on commit 199513e

Please sign in to comment.