diff --git a/src/base/skiplist.h b/src/base/skiplist.h index f7753f72d8d..e46eb72d1d9 100644 --- a/src/base/skiplist.h +++ b/src/base/skiplist.h @@ -21,9 +21,9 @@ #include #include -#include #include "base/random.h" +#include "base/time_series_pool.h" namespace openmldb { namespace base { @@ -43,6 +43,16 @@ struct DefaultComparator { template class Node { public: + // Set data reference and Node height + Node(const K& key, V& value, uint8_t height, std::atomic*>* preAlloc) // NOLINT + : height_(height), key_(key), value_(value) { + nexts_ = preAlloc; + } + + Node(uint8_t height, std::atomic*>* preAlloc) : height_(height), key_(), value_() { // NOLINT + nexts_ = preAlloc; + } + // Set data reference and Node height Node(const K& key, V& value, uint8_t height) // NOLINT : height_(height), key_(key), value_(value) { @@ -94,13 +104,7 @@ template class Skiplist { public: Skiplist(uint8_t max_height, uint8_t branch, const Comparator& compare) - : MaxHeight(max_height), - Branch(branch), - max_height_(0), - compare_(compare), - rand_(0xdeadbeef), - head_(NULL), - tail_(NULL) { + : MaxHeight(max_height), Branch(branch), max_height_(0), compare_(compare), rand_(0xdeadbeef), tail_(NULL) { head_ = new Node(MaxHeight); for (uint8_t i = 0; i < head_->Height(); i++) { head_->SetNext(i, NULL); @@ -131,6 +135,29 @@ class Skiplist { return height; } + // Insert need external synchronized + // use if skiplist is using a pool + uint8_t Insert(const K& key, V& value, uint64_t time, TimeSeriesPool& pool) { // NOLINT + uint8_t height = RandomHeight(); + Node* pre[MaxHeight]; + FindLessOrEqual(key, pre); + if (height > GetMaxHeight()) { + for (uint8_t i = GetMaxHeight(); i < height; i++) { + pre[i] = head_; + } + max_height_.store(height, std::memory_order_relaxed); + } + Node* node = NewNode(key, value, height, time, pool); + if (pre[0]->GetNext(0) == NULL) { + tail_.store(node, std::memory_order_release); + } + for (uint8_t i = 0; i < height; i++) { + node->SetNextNoBarrier(i, pre[i]->GetNextNoBarrier(i)); + pre[i]->SetNext(i, node); + } + return height; + } + bool IsEmpty() { if (head_->GetNextNoBarrier(0) == NULL) { return true; @@ -269,6 +296,7 @@ class Skiplist { } // Need external synchronized + // called if skiplist is using tcalloc uint64_t Clear() { uint64_t cnt = 0; Node* node = head_->GetNext(0); @@ -291,6 +319,34 @@ class Skiplist { return cnt; } + // Need external synchronized + // use if skiplist is using a pool + bool AddToFirst(const K& key, V& value, uint64_t time, TimeSeriesPool& pool) { // NOLINT + { + Node* node = head_->GetNext(0); + if (node != NULL && compare_(key, node->GetKey()) > 0) { + return false; + } + } + uint8_t height = RandomHeight(); + Node* pre[MaxHeight]; + for (uint8_t i = 0; i < height; i++) { + pre[i] = head_; + } + if (height > GetMaxHeight()) { + max_height_.store(height, std::memory_order_relaxed); + } + Node* node = NewNode(key, value, height, time, pool); + if (pre[0]->GetNext(0) == NULL) { + tail_.store(node, std::memory_order_release); + } + for (uint8_t i = 0; i < height; i++) { + node->SetNextNoBarrier(i, pre[i]->GetNextNoBarrier(i)); + pre[i]->SetNext(i, node); + } + return true; + } + // Need external synchronized bool AddToFirst(const K& key, V& value) { // NOLINT { @@ -363,6 +419,14 @@ class Skiplist { Iterator* NewIterator() { return new Iterator(this); } private: + Node* NewNode(const K& key, V& value, uint8_t height, uint64_t time, TimeSeriesPool& pool) { // NOLINT + auto arrmemvptr = pool.Alloc(sizeof(std::atomic*>) * height, time); + auto arrmem = reinterpret_cast*>*>(arrmemvptr); + auto nodemem = pool.Alloc(sizeof(Node), time); + Node* node = new (nodemem) Node(key, value, height, arrmem); + return node; + } + Node* NewNode(const K& key, V& value, uint8_t height) { // NOLINT Node* node = new Node(key, value, height); return node; diff --git a/src/base/skiplist_test.cc b/src/base/skiplist_test.cc index 4d816a53ba8..fb5c584f356 100644 --- a/src/base/skiplist_test.cc +++ b/src/base/skiplist_test.cc @@ -20,6 +20,7 @@ #include #include "base/slice.h" +#include "base/time_series_pool.h" #include "gtest/gtest.h" namespace openmldb { @@ -178,6 +179,45 @@ TEST_F(SkiplistTest, InsertAndIterator) { } } +TEST_F(SkiplistTest, InsertAndIteratorWithPool) { + Comparator cmp; + TimeSeriesPool pool(1024); + for (auto height : vec) { + Skiplist sl(height, 4, cmp); + uint32_t key1 = 1; + uint32_t value1 = 2; + sl.Insert(key1, value1, 1, pool); + uint32_t key2 = 2; + uint32_t value2 = 4; + sl.Insert(key2, value2, 2, pool); + uint32_t key3 = 2; + uint32_t value3 = 5; + sl.Insert(key3, value3, 1, pool); + uint32_t key4 = 3; + uint32_t value4 = 6; + sl.Insert(key4, value4, 1, pool); + Skiplist::Iterator* it = sl.NewIterator(); + it->Seek(0); + ASSERT_EQ(1, (signed)it->GetKey()); + ASSERT_EQ(2, (signed)it->GetValue()); + it->Next(); + ASSERT_EQ(2, (signed)it->GetKey()); + ASSERT_EQ(5, (signed)it->GetValue()); + it->Next(); + ASSERT_EQ(2, (signed)it->GetKey()); + ASSERT_EQ(4, (signed)it->GetValue()); + it->Next(); + ASSERT_EQ(3, (signed)it->GetKey()); + ASSERT_EQ(6, (signed)it->GetValue()); + it->Next(); + ASSERT_FALSE(it->Valid()); + it->Seek(2); + ASSERT_EQ(2, (signed)it->GetKey()); + ASSERT_EQ(5, (signed)it->GetValue()); + delete it; + } +} + TEST_F(SkiplistTest, GetSize) { Comparator cmp; Skiplist sl(12, 4, cmp); @@ -663,6 +703,17 @@ TEST_F(SkiplistTest, Duplicate) { ASSERT_FALSE(it->Valid()); } +TEST_F(SkiplistTest, DuplicateWithPool) { + TimeSeriesPool pool(1024); + DescComparator cmp; + Skiplist sl(12, 4, cmp); + uint32_t val = 1; + sl.Insert(1, val, 111, pool); + sl.Insert(2, val, 111, pool); + val = 2; + sl.Insert(1, val, 112, pool); +} + } // namespace base } // namespace openmldb diff --git a/src/base/time_series_pool.h b/src/base/time_series_pool.h new file mode 100644 index 00000000000..4c5e4330787 --- /dev/null +++ b/src/base/time_series_pool.h @@ -0,0 +1,105 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SRC_BASE_TIME_SERIES_POOL_H_ +#define SRC_BASE_TIME_SERIES_POOL_H_ + +#include + +#include +#include +#include + +namespace openmldb { +namespace base { + +class TimeBucket { + public: + explicit TimeBucket(uint32_t block_size) + : block_size_(block_size), current_offset_(block_size + 1), object_num_(0) { + head_ = reinterpret_cast(malloc(sizeof(Block*))); // empty block at end + head_->next = NULL; + } + ~TimeBucket() { + auto p = head_; + while (p) { + auto q = p->next; + free(p); + p = q; + } + } + void* Alloc(uint32_t size) { + // return new char[size]; + object_num_++; + if (current_offset_ + size <= block_size_ - sizeof(Block)) { + void* addr = head_->data + current_offset_; + current_offset_ += size; + return addr; + } else { + auto block = reinterpret_cast(malloc(block_size_)); + current_offset_ = size; + block->next = head_->next; + head_ = block; + return head_->data; + } + } + bool Free() { // ret if fully freed + return !--object_num_; + } + + private: + uint32_t block_size_; + uint32_t current_offset_; + uint32_t object_num_; + struct Block { + Block* next; + char data[]; + } * head_; +}; + +class TimeSeriesPool { + public: + explicit TimeSeriesPool(uint32_t block_size) : block_size_(block_size) {} + void* Alloc(uint32_t size, uint64_t time) { + auto key = ComputeTimeSlot(time); + auto pair = pool_.find(key); + if (pair == pool_.end()) { + auto bucket = new TimeBucket(block_size_); + pool_.insert(std::pair>(key, std::unique_ptr(bucket))); + return bucket->Alloc(size); + } + + return pair->second->Alloc(size); + } + void Free(uint64_t time) { + auto pair = pool_.find(ComputeTimeSlot(time)); + if (pair != pool_.end() && pair->second->Free()) { + pool_.erase(pair); + } + } + bool Empty() { return pool_.empty(); } + + private: + // key is the time / (60 * 60 * 1000) + uint32_t block_size_; + std::map> pool_; + inline static uint32_t ComputeTimeSlot(uint64_t time) { return time / (60 * 60 * 1000); } +}; + +} // namespace base +} // namespace openmldb + +#endif // SRC_BASE_TIME_SERIES_POOL_H_ diff --git a/src/base/time_series_pool_test.cc b/src/base/time_series_pool_test.cc new file mode 100644 index 00000000000..14090be16f7 --- /dev/null +++ b/src/base/time_series_pool_test.cc @@ -0,0 +1,56 @@ +/* + * Copyright 2021 4Paradigm + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "base/time_series_pool.h" + +#include + +#include "gtest/gtest.h" + +namespace openmldb { +namespace base { + +class TimeSeriesPoolTest : public ::testing::Test { + public: + TimeSeriesPoolTest() {} + ~TimeSeriesPoolTest() {} +}; + +TEST_F(TimeSeriesPoolTest, FreeToEmpty) { + TimeSeriesPool pool(1024); + std::vector times; + const int datasize = 1024 / 2; + char *data = new char[datasize]; + for (int i = 0; i < datasize; ++i) data[i] = i * i * i; + for (int i = 0; i < 1000; ++i) { + auto time = (i * i % 7) * (60 * 60 * 1000); + auto ptr = pool.Alloc(datasize, time); + memcpy(ptr, data, datasize); + times.push_back(time); + } + + for (auto time : times) pool.Free(time); + + ASSERT_TRUE(pool.Empty()); +} + +} // namespace base +} // namespace openmldb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/flags.cc b/src/flags.cc index 4c33b0fdc9c..982166e66d1 100644 --- a/src/flags.cc +++ b/src/flags.cc @@ -119,6 +119,7 @@ DEFINE_uint32(recycle_ttl, 0, "ttl of recycle in minute"); DEFINE_uint32(latest_ttl_max, 1000, "the max ttl of latest"); DEFINE_uint32(absolute_ttl_max, 60 * 24 * 365 * 30, "the max ttl of absolute time"); +DEFINE_uint32(time_series_pool_block_size, 4 * 1024, "the block size of time series pool"); DEFINE_uint32(skiplist_max_height, 12, "the max height of skiplist"); DEFINE_uint32(key_entry_max_height, 8, "the max height of key entry"); DEFINE_uint32(latest_default_skiplist_height, 1, "the default height of skiplist for latest table"); diff --git a/src/storage/segment.cc b/src/storage/segment.cc index c39e0b69779..6fd7f879d41 100644 --- a/src/storage/segment.cc +++ b/src/storage/segment.cc @@ -24,6 +24,7 @@ #include "storage/record.h" DECLARE_int32(gc_safe_offset); +DECLARE_uint32(time_series_pool_block_size); DECLARE_uint32(skiplist_max_height); DECLARE_uint32(gc_deleted_pk_version_delta); @@ -39,6 +40,8 @@ Segment::Segment() pk_cnt_(0), ts_cnt_(1), gc_version_(0), + pool_(FLAGS_time_series_pool_block_size), + boost_pool_(sizeof(KeyEntry)), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); key_entry_max_height_ = (uint8_t)FLAGS_skiplist_max_height; @@ -54,6 +57,8 @@ Segment::Segment(uint8_t height) key_entry_max_height_(height), ts_cnt_(1), gc_version_(0), + pool_(FLAGS_time_series_pool_block_size), + boost_pool_(sizeof(KeyEntry)), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); entry_free_list_ = new KeyEntryNodeList(4, 4, tcmp); @@ -68,6 +73,8 @@ Segment::Segment(uint8_t height, const std::vector& ts_idx_vec) key_entry_max_height_(height), ts_cnt_(ts_idx_vec.size()), gc_version_(0), + pool_(FLAGS_time_series_pool_block_size), + boost_pool_(sizeof(KeyEntry)), ttl_offset_(FLAGS_gc_safe_offset * 60 * 1000) { entries_ = new KeyEntries((uint8_t)FLAGS_skiplist_max_height, 4, scmp); entry_free_list_ = new KeyEntryNodeList(4, 4, tcmp); @@ -92,14 +99,14 @@ uint64_t Segment::Release() { if (ts_cnt_ > 1) { KeyEntry** entry_arr = (KeyEntry**)it->GetValue(); // NOLINT for (uint32_t i = 0; i < ts_cnt_; i++) { - cnt += entry_arr[i]->Release(); - delete entry_arr[i]; + cnt += entry_arr[i]->Release(pool_); + boost_pool_.free(entry_arr[i]); } delete[] entry_arr; } else { KeyEntry* entry = (KeyEntry*)it->GetValue(); // NOLINT - cnt += entry->Release(); - delete entry; + cnt += entry->Release(pool_); + boost_pool_.free(entry); } } it->Next(); @@ -115,14 +122,14 @@ uint64_t Segment::Release() { if (ts_cnt_ > 1) { KeyEntry** entry_arr = (KeyEntry**)node->GetValue(); // NOLINT for (uint32_t i = 0; i < ts_cnt_; i++) { - entry_arr[i]->Release(); - delete entry_arr[i]; + entry_arr[i]->Release(pool_); + boost_pool_.free(entry_arr[i]); } delete[] entry_arr; } else { KeyEntry* entry = (KeyEntry*)node->GetValue(); // NOLINT - entry->Release(); - delete entry; + entry->Release(pool_); + boost_pool_.free(entry); } delete node; f_it->Next(); @@ -176,14 +183,15 @@ void Segment::Put(const Slice& key, uint64_t time, DataBlock* row) { memcpy(pk, key.data(), key.size()); // need to delete memory when free node Slice skey(pk, key.size()); - entry = (void*)new KeyEntry(key_entry_max_height_); // NOLINT + entry = new (boost_pool_.malloc()) KeyEntry(key_entry_max_height_); + // Key entry do not use pool uint8_t height = entries_->Insert(skey, entry); byte_size += GetRecordPkIdxSize(height, key.size(), key_entry_max_height_); pk_cnt_.fetch_add(1, std::memory_order_relaxed); } idx_cnt_.fetch_add(1, std::memory_order_relaxed); - uint8_t height = ((KeyEntry*)entry)->entries.Insert(time, row); // NOLINT - ((KeyEntry*)entry) // NOLINT + uint8_t height = ((KeyEntry*)entry)->entries.Insert(time, row, time, pool_); // NOLINT + ((KeyEntry*)entry) // NOLINT ->count_.fetch_add(1, std::memory_order_relaxed); byte_size += GetRecordTsIdxSize(height); idx_byte_size_.fetch_add(byte_size, std::memory_order_relaxed); @@ -224,16 +232,17 @@ void Segment::Put(const Slice& key, const TSDimensions& ts_dimension, DataBlock* Slice skey(pk, key.size()); KeyEntry** entry_arr_tmp = new KeyEntry*[ts_cnt_]; for (uint32_t i = 0; i < ts_cnt_; i++) { - entry_arr_tmp[i] = new KeyEntry(key_entry_max_height_); + entry_arr_tmp[i] = new (boost_pool_.malloc()) KeyEntry(key_entry_max_height_); } entry_arr = (void*)entry_arr_tmp; // NOLINT + // key entry do not use pool uint8_t height = entries_->Insert(skey, entry_arr); byte_size += GetRecordPkMultiIdxSize(height, key.size(), key_entry_max_height_, ts_cnt_); pk_cnt_.fetch_add(1, std::memory_order_relaxed); } } uint8_t height = ((KeyEntry**)entry_arr)[pos->second]->entries.Insert( // NOLINT - cur_ts.ts(), row); + cur_ts.ts(), row, cur_ts.ts(), pool_); ((KeyEntry**)entry_arr)[pos->second]->count_.fetch_add( // NOLINT 1, std::memory_order_relaxed); byte_size += GetRecordTsIdxSize(height); @@ -284,6 +293,7 @@ bool Segment::Delete(const Slice& key) { } { std::lock_guard lock(gc_mu_); + // gc insert should not use timepool entry_free_list_->Insert(gc_version_.load(std::memory_order_relaxed), entry_node); } return true; @@ -305,7 +315,9 @@ void Segment::FreeList(::openmldb::base::Node* node, uint6 delete tmp->GetValue(); gc_record_cnt++; } - delete tmp; + // changed to pool free (time entry) + pool_.Free(tmp->GetKey()); + // delete tmp; } } @@ -329,7 +341,7 @@ void Segment::FreeEntry(::openmldb::base::Node* entry_node, uint64 FreeList(data_node, gc_idx_cnt, gc_record_cnt, gc_record_byte_size); } delete it; - delete entry; + boost_pool_.free(entry); idx_cnt_vec_[i]->fetch_sub(gc_idx_cnt - old, std::memory_order_relaxed); } delete[] entry_arr; @@ -347,7 +359,7 @@ void Segment::FreeEntry(::openmldb::base::Node* entry_node, uint64 FreeList(data_node, gc_idx_cnt, gc_record_cnt, gc_record_byte_size); } delete it; - delete entry; + boost_pool_.free(entry); uint64_t byte_size = GetRecordPkIdxSize(entry_node->Height(), entry_node->GetKey().size(), key_entry_max_height_); idx_byte_size_.fetch_sub(byte_size, std::memory_order_relaxed); @@ -365,9 +377,11 @@ void Segment::GcEntryFreeList(uint64_t version, uint64_t& gc_idx_cnt, uint64_t& while (node != NULL) { ::openmldb::base::Node* entry_node = node->GetValue(); FreeEntry(entry_node, gc_idx_cnt, gc_record_cnt, gc_record_byte_size); + // no change for key entry not using pool delete entry_node; ::openmldb::base::Node*>* tmp = node; node = node->GetNextNoBarrier(0); + // no change for gc not using pool delete tmp; pk_cnt_.fetch_sub(1, std::memory_order_relaxed); } @@ -584,6 +598,7 @@ void Segment::GcAllType(const std::map& ttl_st_map, uint64_t& g } if (entry_node != NULL) { std::lock_guard lock(gc_mu_); + // gc insert should not use timepool entry_free_list_->Insert(gc_version_.load(std::memory_order_relaxed), entry_node); } } @@ -632,6 +647,7 @@ void Segment::Gc4TTL(const uint64_t time, uint64_t& gc_idx_cnt, uint64_t& gc_rec } if (entry_node != NULL) { std::lock_guard lock(gc_mu_); + // gc insert should not use timepool entry_free_list_->Insert(gc_version_.load(std::memory_order_relaxed), entry_node); } uint64_t entry_gc_idx_cnt = 0; @@ -725,6 +741,7 @@ void Segment::Gc4TTLOrHead(const uint64_t time, const uint64_t keep_cnt, uint64_ } if (entry_node != NULL) { std::lock_guard lock(gc_mu_); + // gc insert should not use timepool entry_free_list_->Insert(gc_version_.load(std::memory_order_relaxed), entry_node); } uint64_t entry_gc_idx_cnt = 0; diff --git a/src/storage/segment.h b/src/storage/segment.h index 165a8aa9578..2cc4cd57cfc 100644 --- a/src/storage/segment.h +++ b/src/storage/segment.h @@ -25,6 +25,8 @@ #include "base/skiplist.h" #include "base/slice.h" +#include "base/time_series_pool.h" +#include "boost/pool/pool.hpp" #include "proto/tablet.pb.h" #include "storage/iterator.h" #include "storage/schema.h" @@ -95,7 +97,7 @@ class KeyEntry { ~KeyEntry() {} // just return the count of datablock - uint64_t Release() { + uint64_t Release(::openmldb::base::TimeSeriesPool& pool) { uint64_t cnt = 0; TimeEntries::Iterator* it = entries.NewIterator(); it->SeekToFirst(); @@ -108,9 +110,11 @@ class KeyEntry { } else { delete block; } + pool.Free(it->GetKey()); it->Next(); } - entries.Clear(); + // not clearing for using pool for time entry + // entries.Clear(); delete it; return cnt; } @@ -251,11 +255,13 @@ class Segment { std::atomic idx_byte_size_; std::atomic pk_cnt_; uint8_t key_entry_max_height_; - KeyEntryNodeList* entry_free_list_; + KeyEntryNodeList* entry_free_list_; // NOTE: entry free list DO NOT use pool uint32_t ts_cnt_; std::atomic gc_version_; std::map ts_idx_map_; std::vector>> idx_cnt_vec_; + ::openmldb::base::TimeSeriesPool pool_; + boost::pool<> boost_pool_; uint64_t ttl_offset_; };