Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add time series pool #119

Closed
wants to merge 13 commits into from
80 changes: 72 additions & 8 deletions src/base/skiplist.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
#include <stdint.h>

#include <atomic>
#include <iostream>

#include "base/random.h"
#include "base/time_series_pool.h"

namespace openmldb {
namespace base {
Expand All @@ -43,6 +43,16 @@ struct DefaultComparator {
template <class K, class V>
class Node {
public:
// Set data reference and Node height
Node(const K& key, V& value, uint8_t height, std::atomic<Node<K, V>*>* preAlloc) // NOLINT
: height_(height), key_(key), value_(value) {
nexts_ = preAlloc;
}

Node(uint8_t height, std::atomic<Node<K, V>*>* preAlloc) : height_(height), key_(), value_() { // NOLINT
nexts_ = preAlloc;
}

// Set data reference and Node height
Node(const K& key, V& value, uint8_t height) // NOLINT
: height_(height), key_(key), value_(value) {
Expand Down Expand Up @@ -94,13 +104,7 @@ template <class K, class V, class Comparator>
class Skiplist {
public:
Skiplist(uint8_t max_height, uint8_t branch, const Comparator& compare)
: MaxHeight(max_height),
Branch(branch),
max_height_(0),
compare_(compare),
rand_(0xdeadbeef),
head_(NULL),
tail_(NULL) {
: MaxHeight(max_height), Branch(branch), max_height_(0), compare_(compare), rand_(0xdeadbeef), tail_(NULL) {
head_ = new Node<K, V>(MaxHeight);
for (uint8_t i = 0; i < head_->Height(); i++) {
head_->SetNext(i, NULL);
Expand Down Expand Up @@ -131,6 +135,29 @@ class Skiplist {
return height;
}

// Insert need external synchronized
// use iif skiplist is using a pool
uint8_t Insert(const K& key, V& value, uint64_t time, TimeSeriesPool& pool) { // NOLINT
uint8_t height = RandomHeight();
Node<K, V>* pre[MaxHeight];
FindLessOrEqual(key, pre);
if (height > GetMaxHeight()) {
for (uint8_t i = GetMaxHeight(); i < height; i++) {
pre[i] = head_;
}
max_height_.store(height, std::memory_order_relaxed);
}
Node<K, V>* node = NewNode(key, value, height, time, pool);
if (pre[0]->GetNext(0) == NULL) {
tail_.store(node, std::memory_order_release);
}
for (uint8_t i = 0; i < height; i++) {
node->SetNextNoBarrier(i, pre[i]->GetNextNoBarrier(i));
pre[i]->SetNext(i, node);
}
return height;
}

bool IsEmpty() {
if (head_->GetNextNoBarrier(0) == NULL) {
return true;
Expand Down Expand Up @@ -269,6 +296,7 @@ class Skiplist {
}

// Need external synchronized
// called iif skiplist is using tcalloc
cc004 marked this conversation as resolved.
Show resolved Hide resolved
uint64_t Clear() {
uint64_t cnt = 0;
Node<K, V>* node = head_->GetNext(0);
Expand All @@ -291,6 +319,34 @@ class Skiplist {
return cnt;
}

// Need external synchronized
// use iif skiplist is using a pool
bool AddToFirst(const K& key, V& value, uint64_t time, TimeSeriesPool& pool) { // NOLINT
{
Node<K, V>* node = head_->GetNext(0);
if (node != NULL && compare_(key, node->GetKey()) > 0) {
return false;
}
}
uint8_t height = RandomHeight();
Node<K, V>* pre[MaxHeight];
for (uint8_t i = 0; i < height; i++) {
pre[i] = head_;
}
if (height > GetMaxHeight()) {
max_height_.store(height, std::memory_order_relaxed);
}
Node<K, V>* node = NewNode(key, value, height, time, pool);
if (pre[0]->GetNext(0) == NULL) {
tail_.store(node, std::memory_order_release);
}
for (uint8_t i = 0; i < height; i++) {
node->SetNextNoBarrier(i, pre[i]->GetNextNoBarrier(i));
pre[i]->SetNext(i, node);
}
return true;
}

// Need external synchronized
bool AddToFirst(const K& key, V& value) { // NOLINT
{
Expand Down Expand Up @@ -363,6 +419,14 @@ class Skiplist {
Iterator* NewIterator() { return new Iterator(this); }

private:
Node<K, V>* NewNode(const K& key, V& value, uint8_t height, uint64_t time, TimeSeriesPool& pool) { // NOLINT
auto arrmemvptr = pool.Alloc(sizeof(std::atomic<Node<K, V>*>) * height, time);
auto arrmem = reinterpret_cast<std::atomic<Node<K, V>*>*>(arrmemvptr);
auto nodemem = pool.Alloc(sizeof(Node<K, V>), time);
Node<K, V>* node = new (nodemem) Node<K, V>(key, value, height, arrmem);
return node;
}

Node<K, V>* NewNode(const K& key, V& value, uint8_t height) { // NOLINT
Node<K, V>* node = new Node<K, V>(key, value, height);
return node;
Expand Down
51 changes: 51 additions & 0 deletions src/base/skiplist_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <vector>

#include "base/slice.h"
#include "base/time_series_pool.h"
#include "gtest/gtest.h"

namespace openmldb {
Expand Down Expand Up @@ -178,6 +179,45 @@ TEST_F(SkiplistTest, InsertAndIterator) {
}
}

TEST_F(SkiplistTest, InsertAndIteratorWithPool) {
Comparator cmp;
TimeSeriesPool pool(1024);
for (auto height : vec) {
Skiplist<uint32_t, uint32_t, Comparator> sl(height, 4, cmp);
uint32_t key1 = 1;
uint32_t value1 = 2;
sl.Insert(key1, value1, 1, pool);
uint32_t key2 = 2;
uint32_t value2 = 4;
sl.Insert(key2, value2, 2, pool);
uint32_t key3 = 2;
uint32_t value3 = 5;
sl.Insert(key3, value3, 1, pool);
uint32_t key4 = 3;
uint32_t value4 = 6;
sl.Insert(key4, value4, 1, pool);
Skiplist<uint32_t, uint32_t, Comparator>::Iterator* it = sl.NewIterator();
it->Seek(0);
ASSERT_EQ(1, (signed)it->GetKey());
ASSERT_EQ(2, (signed)it->GetValue());
it->Next();
ASSERT_EQ(2, (signed)it->GetKey());
ASSERT_EQ(5, (signed)it->GetValue());
it->Next();
ASSERT_EQ(2, (signed)it->GetKey());
ASSERT_EQ(4, (signed)it->GetValue());
it->Next();
ASSERT_EQ(3, (signed)it->GetKey());
ASSERT_EQ(6, (signed)it->GetValue());
it->Next();
ASSERT_FALSE(it->Valid());
it->Seek(2);
ASSERT_EQ(2, (signed)it->GetKey());
ASSERT_EQ(5, (signed)it->GetValue());
delete it;
}
}

TEST_F(SkiplistTest, GetSize) {
Comparator cmp;
Skiplist<uint32_t, uint32_t, Comparator> sl(12, 4, cmp);
Expand Down Expand Up @@ -663,6 +703,17 @@ TEST_F(SkiplistTest, Duplicate) {
ASSERT_FALSE(it->Valid());
}

TEST_F(SkiplistTest, DuplicateWithPool) {
TimeSeriesPool pool(1024);
DescComparator cmp;
Skiplist<uint32_t, uint32_t, DescComparator> sl(12, 4, cmp);
uint32_t val = 1;
sl.Insert(1, val, 111, pool);
sl.Insert(2, val, 111, pool);
val = 2;
sl.Insert(1, val, 112, pool);
}

} // namespace base
} // namespace openmldb

Expand Down
105 changes: 105 additions & 0 deletions src/base/time_series_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef SRC_BASE_TIME_Series_POOL_H_
#define SRC_BASE_TIME_Series_POOL_H_

#include <malloc.h>

#include <map>
#include <memory>
#include <utility>

namespace openmldb {
namespace base {

class TimeBucket {
public:
explicit TimeBucket(uint32_t block_size)
: block_size_(block_size), current_offset_(block_size + 1), object_num_(0) {
head_ = reinterpret_cast<Block*>(malloc(sizeof(Block*))); // empty block at end
head_->next = NULL;
}
~TimeBucket() {
auto p = head_;
while (p) {
auto q = p->next;
free(p);
p = q;
}
}
void* Alloc(uint32_t size) {
// return new char[size];
object_num_++;
if (current_offset_ + size <= block_size_ - sizeof(Block)) {
void* addr = head_->data + current_offset_;
current_offset_ += size;
return addr;
} else {
auto block = reinterpret_cast<Block*>(malloc(block_size_));
current_offset_ = size;
block->next = head_->next;
head_ = block;
return head_->data;
}
}
bool Free() { // ret if fully freed
return !--object_num_;
}

private:
uint32_t block_size_;
uint32_t current_offset_;
uint32_t object_num_;
struct Block {
Block* next;
char data[];
} * head_;
};

class TimeSeriesPool {
public:
explicit TimeSeriesPool(uint32_t block_size) : block_size_(block_size) {}
void* Alloc(uint32_t size, uint64_t time) {
auto key = ComputeTimeSlot(time);
auto pair = pool_.find(key);
if (pair == pool_.end()) {
auto bucket = new TimeBucket(block_size_);
pool_.insert(std::pair<uint32_t, std::unique_ptr<TimeBucket>>(key, std::unique_ptr<TimeBucket>(bucket)));
return bucket->Alloc(size);
}

return pair->second->Alloc(size);
}
void Free(uint64_t time) {
auto pair = pool_.find(ComputeTimeSlot(time));
if (pair != pool_.end() && pair->second->Free()) {
pool_.erase(pair);
}
}
bool Empty() { return pool_.empty(); }

private:
// key is the time / (60 * 60 * 1000)
uint32_t block_size_;
std::map<uint32_t, std::unique_ptr<TimeBucket>> pool_;
inline static uint32_t ComputeTimeSlot(uint64_t time) { return time / (60 * 60 * 1000); }
};

} // namespace base
} // namespace openmldb

#endif // SRC_BASE_TIME_Series_POOL_H_
56 changes: 56 additions & 0 deletions src/base/time_series_pool_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "base/time_series_pool.h"

#include <vector>

#include "gtest/gtest.h"

namespace openmldb {
namespace base {

class TimeSeriesPoolTest : public ::testing::Test {
public:
TimeSeriesPoolTest() {}
~TimeSeriesPoolTest() {}
};

TEST_F(TimeSeriesPoolTest, FreeToEmpty) {
TimeSeriesPool pool(1024);
std::vector<uint64_t> times;
const int datasize = 1024 / 2;
char *data = new char[datasize];
for (int i = 0; i < datasize; ++i) data[i] = i * i * i;
for (int i = 0; i < 1000; ++i) {
auto time = (i * i % 7) * (60 * 60 * 1000);
auto ptr = pool.Alloc(datasize, time);
memcpy(ptr, data, datasize);
times.push_back(time);
}

for (auto time : times) pool.Free(time);

ASSERT_TRUE(pool.Empty());
}

} // namespace base
} // namespace openmldb

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions src/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ DEFINE_uint32(recycle_ttl, 0, "ttl of recycle in minute");

DEFINE_uint32(latest_ttl_max, 1000, "the max ttl of latest");
DEFINE_uint32(absolute_ttl_max, 60 * 24 * 365 * 30, "the max ttl of absolute time");
DEFINE_uint32(time_series_pool_block_size, 4 * 1024, "the block size of time series pool");
DEFINE_uint32(skiplist_max_height, 12, "the max height of skiplist");
DEFINE_uint32(key_entry_max_height, 8, "the max height of key entry");
DEFINE_uint32(latest_default_skiplist_height, 1, "the default height of skiplist for latest table");
Expand Down
Loading