Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add time series pool #119

Closed
wants to merge 13 commits into from
110 changes: 110 additions & 0 deletions src/base/time_serise_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef SRC_BASE_TIME_SERISE_POOL_H_
#define SRC_BASE_TIME_SERISE_POOL_H_

#include <malloc.h>

#include <map>
#include <memory>
#include <utility>

namespace openmldb {
namespace base {

class TimeBucket {
public:
explicit TimeBucket(uint32_t block_size)
: block_size_(block_size), current_offset_(block_size + 1), object_num_(0) {
head_ = reinterpret_cast<Block*>(malloc(sizeof(Block*))); // empty block at end
head_->next = NULL;
}
void Clear() {
cc004 marked this conversation as resolved.
Show resolved Hide resolved
auto p = head_;
while (p) {
auto q = p->next;
free(p);
p = q;
}
}
void* Alloc(uint32_t size) {
object_num_++;
if (current_offset_ + size <= block_size_ - sizeof(Block)) {
void* addr = head_->data + current_offset_;
current_offset_ += size;
return addr;
} else {
auto block = reinterpret_cast<Block*>(malloc(block_size_));
current_offset_ = size;
block->next = head_->next;
head_ = block;
return head_->data;
}
}
bool Free() { // ret if fully freed
if (!--object_num_) {
Clear();
return true;
} else {
return false;
}
}

private:
uint32_t block_size_;
uint32_t current_offset_;
uint32_t object_num_;
struct Block {
Block* next;
char data[];
} * head_;
};

class TimeSerisePool {
public:
explicit TimeSerisePool(uint32_t block_size) : block_size_(block_size) {}
void* Alloc(uint32_t size, uint64_t time) {
auto key = ComputeTimeSlot(time);
auto pair = pool_.find(key);
if (pair == pool_.end()) {
auto bucket = new TimeBucket(block_size_);
pool_.insert(
std::pair<uint32_t, std::unique_ptr<TimeBucket>>(key, std::unique_ptr<TimeBucket>(bucket)));
return bucket->Alloc(size);
}

return pair->second->Alloc(size);
}
void Free(uint64_t time) {
auto pair = pool_.find(ComputeTimeSlot(time));
if (pair != pool_.end() && pair->second->Free()) {
pool_.erase(pair);
}
}
bool Empty() { return pool_.empty(); }

imotai marked this conversation as resolved.
Show resolved Hide resolved
private:
// key is the time / (60 * 60 * 1000)
uint32_t block_size_;
std::map<uint32_t, std::unique_ptr<TimeBucket>> pool_;
inline static uint32_t ComputeTimeSlot(uint64_t time) { return time / (60 * 60 * 1000); }
};

} // namespace base
} // namespace openmldb

#endif // SRC_BASE_TIME_SERISE_POOL_H_
56 changes: 56 additions & 0 deletions src/base/time_serise_pool_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2021 4Paradigm
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "base/time_serise_pool.h"

#include <vector>

#include "gtest/gtest.h"

namespace openmldb {
namespace base {

class TimeSerisePoolTest : public ::testing::Test {
public:
TimeSerisePoolTest() {}
~TimeSerisePoolTest() {}
};

TEST_F(TimeSerisePoolTest, FreeToEmpty) {
TimeSerisePool pool(1024);
std::vector<uint64_t> times;
const int datasize = 1024 / 2;
char *data = new char[datasize];
for (int i = 0; i < datasize; ++i) data[i] = i * i * i;
for (int i = 0; i < 1000; ++i) {
auto time = (i * i % 7) * (60 * 60 * 1000);
auto ptr = pool.Alloc(datasize, time);
memcpy(ptr, data, datasize);
times.push_back(time);
}

for (auto time : times) pool.Free(time);

ASSERT_TRUE(pool.Empty());
}

} // namespace base
} // namespace openmldb

int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}