add prioritized cache wrapper #142

Closed · wants to merge 1 commit
3 changes: 2 additions & 1 deletion CMakeLists.txt
@@ -100,7 +100,8 @@ if (WITH_TITAN_TESTS AND (NOT CMAKE_BUILD_TYPE STREQUAL "Release"))
titan_db_test
titan_options_test
util_test
version_test)
version_test
prioritized_cache_test)
set(TEST_LIBS
titan
rocksdb
117 changes: 117 additions & 0 deletions include/titan/prioritized_cache.h
@@ -0,0 +1,117 @@
#pragma once
Review comment (Collaborator): Please add the header to new files:

// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

#include "rocksdb/cache.h"

namespace rocksdb {
namespace titandb {

class PrioritizedCache : public Cache {
 public:
  // Constructs a PrioritizedCache, which is a wrapper
  // around a rocksdb Cache.
  PrioritizedCache(std::shared_ptr<Cache> cache, Cache::Priority priority);
  ~PrioritizedCache();

  // Returns the wrapped Cache.
  std::shared_ptr<Cache> GetCache();

  // Always inserts into the cache with the priority chosen at
  // construction time, regardless of the user-provided option.
  Status Insert(const Slice& key, void* value, size_t charge,
                void (*deleter)(const Slice& key, void* value),
                Cache::Handle** handle, Cache::Priority /*priority*/) override;

  // The name of the Cache.
  const char* Name() const override;

  // If the cache has no mapping for "key", returns nullptr.
  //
  // Else return a handle that corresponds to the mapping. The caller
  // must call this->Release(handle) when the returned mapping is no
  // longer needed.
  // If stats is not nullptr, relative tickers could be used inside the
  // function.
  Cache::Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override;

  // Increments the reference count for the handle if it refers to an entry
  // in the cache. Returns true if the refcount was incremented; otherwise,
  // returns false.
  // REQUIRES: handle must have been returned by a method on *this.
  bool Ref(Cache::Handle* handle) override;

  // Release a mapping returned by a previous Lookup(). A released entry
  // might still remain in the cache in case it is later looked up by
  // others. If force_erase is set, the entry is also erased from the cache
  // if there is no other reference to it. Erasing it calls the deleter
  // function that was provided when the entry was inserted.
  //
  // Returns true if the entry was also erased.
  // REQUIRES: handle must not have been released yet.
  // REQUIRES: handle must have been returned by a method on *this.
  bool Release(Cache::Handle* handle, bool force_erase = false) override;

  // Return the value encapsulated in a handle returned by a
  // successful Lookup().
  // REQUIRES: handle must not have been released yet.
  // REQUIRES: handle must have been returned by a method on *this.
  void* Value(Cache::Handle* handle) override;

  // If the cache contains an entry for key, erase it. Note that the
  // underlying entry will be kept around until all existing handles
  // to it have been released.
  void Erase(const Slice& key) override;

  // Return a new numeric id. May be used by multiple clients who are
  // sharding the same cache to partition the key space. Typically the
  // client will allocate a new id at startup and prepend the id to
  // its cache keys.
  uint64_t NewId() override;

  // Sets the maximum configured capacity of the cache. When the new
  // capacity is less than the old capacity and the existing usage is
  // greater than the new capacity, the implementation will do its best
  // to purge the released entries from the cache in order to lower the
  // usage.
  void SetCapacity(size_t capacity) override;

  // Set whether to return an error on insertion when the cache reaches
  // its full capacity.
  void SetStrictCapacityLimit(bool strict_capacity_limit) override;

  // Get the flag that controls whether to return an error on insertion
  // when the cache reaches its full capacity.
  bool HasStrictCapacityLimit() const override;

  // Returns the maximum configured capacity of the cache.
  size_t GetCapacity() const override;

  // Returns the memory size for the entries residing in the cache.
  size_t GetUsage() const override;

  // Returns the memory size for a specific entry in the cache.
  size_t GetUsage(Cache::Handle* handle) const override;

  // Returns the memory size for the entries in use by the system.
  size_t GetPinnedUsage() const override;

  // Returns the charge for the specific entry in the cache.
  size_t GetCharge(Cache::Handle* handle) const override;

  // Apply the callback to all entries in the cache.
  // If thread_safe is true, it will also lock the accesses. Otherwise, it
  // will access the cache without the lock held.
  void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
                              bool thread_safe) override;

  // Remove all entries.
  // Prerequisite: no entry is referenced.
  void EraseUnRefEntries() override;

 private:
  std::shared_ptr<Cache> cache_;
  Cache::Priority priority_;
};

} // namespace titandb
} // namespace rocksdb
78 changes: 78 additions & 0 deletions src/prioritized_cache.cc
@@ -0,0 +1,78 @@
#include "titan/prioritized_cache.h"

namespace rocksdb {
namespace titandb {

PrioritizedCache::PrioritizedCache(std::shared_ptr<Cache> cache,
                                   Cache::Priority priority)
    : cache_(cache), priority_(priority) {}

PrioritizedCache::~PrioritizedCache() {}

std::shared_ptr<Cache> PrioritizedCache::GetCache() { return cache_; }

Status PrioritizedCache::Insert(const Slice& key, void* value, size_t charge,
                                void (*deleter)(const Slice& key, void* value),
                                Cache::Handle** handle,
                                Cache::Priority /*priority*/) {
  // Ignore the caller-supplied priority and always use the priority
  // fixed at construction time.
  return cache_->Insert(key, value, charge, deleter, handle, priority_);
}

const char* PrioritizedCache::Name() const { return "PrioritizedCache"; }

Cache::Handle* PrioritizedCache::Lookup(const Slice& key, Statistics* stats) {
  return cache_->Lookup(key, stats);
}

bool PrioritizedCache::Ref(Cache::Handle* handle) {
  return cache_->Ref(handle);
}

bool PrioritizedCache::Release(Cache::Handle* handle, bool force_erase) {
  return cache_->Release(handle, force_erase);
}

void* PrioritizedCache::Value(Cache::Handle* handle) {
  return cache_->Value(handle);
}

void PrioritizedCache::Erase(const Slice& key) { cache_->Erase(key); }

uint64_t PrioritizedCache::NewId() { return cache_->NewId(); }

void PrioritizedCache::SetCapacity(size_t capacity) {
  cache_->SetCapacity(capacity);
}

void PrioritizedCache::SetStrictCapacityLimit(bool strict_capacity_limit) {
  cache_->SetStrictCapacityLimit(strict_capacity_limit);
}

bool PrioritizedCache::HasStrictCapacityLimit() const {
  return cache_->HasStrictCapacityLimit();
}

size_t PrioritizedCache::GetCapacity() const { return cache_->GetCapacity(); }

size_t PrioritizedCache::GetUsage() const { return cache_->GetUsage(); }

size_t PrioritizedCache::GetUsage(Cache::Handle* handle) const {
  // Forward to the per-entry overload on the wrapped cache.
  return cache_->GetUsage(handle);
}

size_t PrioritizedCache::GetPinnedUsage() const {
  return cache_->GetPinnedUsage();
}

size_t PrioritizedCache::GetCharge(Cache::Handle* handle) const {
  return cache_->GetCharge(handle);
}

void PrioritizedCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t),
                                              bool thread_safe) {
  cache_->ApplyToAllCacheEntries(callback, thread_safe);
}

void PrioritizedCache::EraseUnRefEntries() { cache_->EraseUnRefEntries(); }

} // namespace titandb
} // namespace rocksdb
61 changes: 61 additions & 0 deletions src/prioritized_cache_test.cc
@@ -0,0 +1,61 @@
#include "titan/prioritized_cache.h"
#include "test_util/testharness.h"
#include "util.h"

namespace rocksdb {
namespace titandb {

class PrioritizedCacheTest : public testing::Test {
 public:
  PrioritizedCacheTest() {}
  ~PrioritizedCacheTest() {}
};

TEST_F(PrioritizedCacheTest, PriorityTest) {
  Slice high_cache_key = "high_test_key";
  Slice cache_value = "test_value";
  Slice low_cache_key = "low_test_key";

  LRUCacheOptions options;
  options.capacity = high_cache_key.size() + cache_value.size();

Review comment (Collaborator): Let's set capacity=1 for this test. See the other comments.

  options.high_pri_pool_ratio = 1;

  std::shared_ptr<Cache> cache = NewLRUCache(options);
  cache->SetCapacity(high_cache_key.size() + cache_value.size());
  PrioritizedCache high_cache(cache, Cache::Priority::HIGH);
  PrioritizedCache low_cache(cache, Cache::Priority::LOW);
  Cache::Handle* cache_handle = cache->Lookup(high_cache_key);

  // Pass Priority::HIGH to the low-priority wrapper to check that it
  // actually inserts with LOW, regardless of the argument.
  auto lo_ok = low_cache.Insert(low_cache_key, (void*)&cache_value,
                                cache->GetCapacity(), &DeleteCacheValue<Slice>,
                                &cache_handle, Cache::Priority::HIGH);

Review comment (Collaborator): You can just set charge=1 instead of cache->GetCapacity(). Same below.

  ASSERT_TRUE(lo_ok.ok());

  // Pass Priority::LOW to the high-priority wrapper to check that it
  // actually inserts with HIGH, regardless of the argument.
  auto hi_ok = high_cache.Insert(high_cache_key, (void*)&cache_value,
                                 cache->GetCapacity(), &DeleteCacheValue<Slice>,
                                 &cache_handle, Cache::Priority::LOW);
  ASSERT_TRUE(hi_ok.ok());

  auto high_handle = cache->Lookup(high_cache_key);

Review comment (Collaborator): Look up through high_cache instead of cache.

  if (high_handle) {

Review comment (Collaborator): Assert high_handle != nullptr.

    auto v = reinterpret_cast<Slice*>(cache->Value(high_handle));
    ASSERT_EQ(cache_value.data(), v->data());

Review comment (Collaborator): Should release the handle here, otherwise there's a memory leak: high_cache.Release(high_handle). This should fix CI.

  }

  auto low_handle = cache->Lookup(low_cache_key);

Review comment (Collaborator): Assert low_handle == nullptr.

Review comment (Collaborator): Look up through low_cache instead of cache.

  if (low_handle) {
    auto v = reinterpret_cast<Slice*>(cache->Value(low_handle));
    ASSERT_EQ(cache_value.data(), v->data());
  }

Review comment (Collaborator): We should verify that the low-priority item gets evicted after a high-priority item is inserted.

}

} // namespace titandb
} // namespace rocksdb

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}