Skip to content

Commit

Permalink
Make the arena allocator thread aware
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbaden authored and andrewseidl committed Mar 16, 2021
1 parent f7157f2 commit 21d3f6a
Show file tree
Hide file tree
Showing 26 changed files with 189 additions and 107 deletions.
47 changes: 38 additions & 9 deletions QueryEngine/ColumnFetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,26 @@

#include "QueryEngine/Execute.h"

namespace {

// Converts an in-memory ResultSet into a freshly allocated ColumnarResults.
// The caller takes ownership of the returned raw pointer (call sites wrap it
// in a std::shared_ptr<const ColumnarResults>).
//
// row_set_mem_owner: arena-backed memory owner used for the column buffers.
// result:            result set to columnarize.
// thread_idx:        kernel-thread index selecting the per-thread arena
//                    inside the RowSetMemoryOwner.
// frag_id:           fragment id; only fragment 0 is supported here (checked).
inline const ColumnarResults* columnarize_result(
    std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
    const ResultSetPtr& result,
    const size_t thread_idx,
    const int frag_id) {
  INJECT_TIMER(columnarize_result);
  CHECK_EQ(0, frag_id);

  const size_t num_columns = result->colCount();
  std::vector<SQLTypeInfo> col_types;
  col_types.reserve(num_columns);  // single allocation instead of growth-doubling
  for (size_t i = 0; i < num_columns; ++i) {
    col_types.push_back(get_logical_type_info(result->getColType(i)));
  }
  return new ColumnarResults(
      row_set_mem_owner, *result, num_columns, col_types, thread_idx);
}

}  // namespace

// Binds the fetcher to `executor` and copy-initializes the per-table
// columnarized-result cache from `column_cache`.
ColumnFetcher::ColumnFetcher(Executor* executor, const ColumnCacheMap& column_cache)
    : executor_(executor), columnarized_table_cache_(column_cache) {}

Expand All @@ -33,6 +53,7 @@ std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
const Data_Namespace::MemoryLevel effective_mem_lvl,
const int device_id,
DeviceAllocator* device_allocator,
const size_t thread_idx,
std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
ColumnCacheMap& column_cache) {
static std::mutex columnar_conversion_mutex;
Expand Down Expand Up @@ -81,6 +102,7 @@ std::pair<const int8_t*, size_t> ColumnFetcher::getOneColumnFragment(
std::shared_ptr<const ColumnarResults>(columnarize_result(
executor->row_set_mem_owner_,
get_temporary_table(executor->temporary_tables_, table_id),
thread_idx,
frag_id))));
}
col_frag = column_cache[table_id][frag_id].get();
Expand Down Expand Up @@ -112,6 +134,7 @@ JoinColumn ColumnFetcher::makeJoinColumn(
const Data_Namespace::MemoryLevel effective_mem_lvl,
const int device_id,
DeviceAllocator* device_allocator,
const size_t thread_idx,
std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
std::vector<std::shared_ptr<void>>& malloc_owner,
ColumnCacheMap& column_cache) {
Expand All @@ -133,6 +156,7 @@ JoinColumn ColumnFetcher::makeJoinColumn(
effective_mem_lvl,
effective_mem_lvl == Data_Namespace::CPU_LEVEL ? 0 : device_id,
device_allocator,
thread_idx,
chunks_owner,
column_cache);
if (col_buff != nullptr) {
Expand Down Expand Up @@ -237,7 +261,8 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
const std::map<int, const TableFragments*>& all_tables_fragments,
const Data_Namespace::MemoryLevel memory_level,
const int device_id,
DeviceAllocator* device_allocator) const {
DeviceAllocator* device_allocator,
const size_t thread_idx) const {
const auto fragments_it = all_tables_fragments.find(table_id);
CHECK(fragments_it != all_tables_fragments.end());
const auto fragments = fragments_it->second;
Expand Down Expand Up @@ -272,7 +297,8 @@ const int8_t* ColumnFetcher::getAllTableColumnFragments(
std::make_unique<ColumnarResults>(executor_->row_set_mem_owner_,
col_buffer,
fragment.getNumTuples(),
chunk_meta_it->second->sqlType));
chunk_meta_it->second->sqlType,
thread_idx));
}
auto merged_results =
ColumnarResults::mergeResults(executor_->row_set_mem_owner_, column_frags);
Expand All @@ -294,15 +320,17 @@ const int8_t* ColumnFetcher::getResultSetColumn(
const InputColDescriptor* col_desc,
const Data_Namespace::MemoryLevel memory_level,
const int device_id,
DeviceAllocator* device_allocator) const {
DeviceAllocator* device_allocator,
const size_t thread_idx) const {
CHECK(col_desc);
const auto table_id = col_desc->getScanDesc().getTableId();
return getResultSetColumn(get_temporary_table(executor_->temporary_tables_, table_id),
table_id,
col_desc->getColId(),
memory_level,
device_id,
device_allocator);
device_allocator,
thread_idx);
}

const int8_t* ColumnFetcher::transferColumnIfNeeded(
Expand Down Expand Up @@ -334,7 +362,8 @@ const int8_t* ColumnFetcher::getResultSetColumn(
const int col_id,
const Data_Namespace::MemoryLevel memory_level,
const int device_id,
DeviceAllocator* device_allocator) const {
DeviceAllocator* device_allocator,
const size_t thread_idx) const {
const ColumnarResults* result{nullptr};
{
std::lock_guard<std::mutex> columnar_conversion_guard(columnar_conversion_mutex_);
Expand All @@ -345,10 +374,10 @@ const int8_t* ColumnFetcher::getResultSetColumn(
auto& frag_id_to_result = columnarized_table_cache_[table_id];
int frag_id = 0;
if (frag_id_to_result.empty() || !frag_id_to_result.count(frag_id)) {
frag_id_to_result.insert(
std::make_pair(frag_id,
std::shared_ptr<const ColumnarResults>(columnarize_result(
executor_->row_set_mem_owner_, buffer, frag_id))));
frag_id_to_result.insert(std::make_pair(
frag_id,
std::shared_ptr<const ColumnarResults>(columnarize_result(
executor_->row_set_mem_owner_, buffer, thread_idx, frag_id))));
}
CHECK_NE(size_t(0), columnarized_table_cache_.count(table_id));
result = columnarized_table_cache_[table_id][frag_id].get();
Expand Down
11 changes: 8 additions & 3 deletions QueryEngine/ColumnFetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class ColumnFetcher {
const Data_Namespace::MemoryLevel effective_mem_lvl,
const int device_id,
DeviceAllocator* device_allocator,
const size_t thread_idx,
std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
ColumnCacheMap& column_cache);

Expand All @@ -67,6 +68,7 @@ class ColumnFetcher {
const Data_Namespace::MemoryLevel effective_mem_lvl,
const int device_id,
DeviceAllocator* device_allocator,
const size_t thread_idx,
std::vector<std::shared_ptr<Chunk_NS::Chunk>>& chunks_owner,
std::vector<std::shared_ptr<void>>& malloc_owner,
ColumnCacheMap& column_cache);
Expand All @@ -88,12 +90,14 @@ class ColumnFetcher {
const std::map<int, const TableFragments*>& all_tables_fragments,
const Data_Namespace::MemoryLevel memory_level,
const int device_id,
DeviceAllocator* device_allocator) const;
DeviceAllocator* device_allocator,
const size_t thread_idx) const;

const int8_t* getResultSetColumn(const InputColDescriptor* col_desc,
const Data_Namespace::MemoryLevel memory_level,
const int device_id,
DeviceAllocator* device_allocator) const;
DeviceAllocator* device_allocator,
const size_t thread_idx) const;

private:
static const int8_t* transferColumnIfNeeded(
Expand All @@ -109,7 +113,8 @@ class ColumnFetcher {
const int col_id,
const Data_Namespace::MemoryLevel memory_level,
const int device_id,
DeviceAllocator* device_allocator) const;
DeviceAllocator* device_allocator,
const size_t thread_idx) const;

Executor* executor_;
using CacheKey = std::vector<int>;
Expand Down
17 changes: 11 additions & 6 deletions QueryEngine/ColumnarResults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
const ResultSet& rows,
const size_t num_columns,
const std::vector<SQLTypeInfo>& target_types,
const size_t thread_idx,
const bool is_parallel_execution_enforced)
: column_buffers_(num_columns)
, num_rows_(result_set::use_parallel_algorithms(rows) ||
Expand All @@ -54,7 +55,8 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
, parallel_conversion_(is_parallel_execution_enforced
? true
: result_set::use_parallel_algorithms(rows))
, direct_columnar_conversion_(rows.isDirectColumnarConversionPossible()) {
, direct_columnar_conversion_(rows.isDirectColumnarConversionPossible())
, thread_idx_(thread_idx) {
auto timer = DEBUG_TIMER(__func__);
column_buffers_.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i) {
Expand All @@ -67,8 +69,8 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
}
if (!isDirectColumnarConversionPossible() ||
!rows.isZeroCopyColumnarConversionPossible(i)) {
column_buffers_[i] =
row_set_mem_owner->allocate(num_rows_ * target_types[i].get_size());
column_buffers_[i] = row_set_mem_owner->allocate(
num_rows_ * target_types[i].get_size(), thread_idx_);
}
}

Expand All @@ -82,12 +84,14 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const int8_t* one_col_buffer,
const size_t num_rows,
const SQLTypeInfo& target_type)
const SQLTypeInfo& target_type,
const size_t thread_idx)
: column_buffers_(1)
, num_rows_(num_rows)
, target_types_{target_type}
, parallel_conversion_(false)
, direct_columnar_conversion_(false) {
, direct_columnar_conversion_(false)
, thread_idx_(thread_idx) {
auto timer = DEBUG_TIMER(__func__);
const bool is_varlen =
target_type.is_array() ||
Expand All @@ -97,7 +101,8 @@ ColumnarResults::ColumnarResults(std::shared_ptr<RowSetMemoryOwner> row_set_mem_
throw ColumnarConversionNotSupported();
}
const auto buf_size = num_rows * target_type.get_size();
column_buffers_[0] = reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size));
column_buffers_[0] =
reinterpret_cast<int8_t*>(row_set_mem_owner->allocate(buf_size, thread_idx_));
memcpy(((void*)column_buffers_[0]), one_col_buffer, buf_size);
}

Expand Down
12 changes: 7 additions & 5 deletions QueryEngine/ColumnarResults.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ class ColumnarResults {
const ResultSet& rows,
const size_t num_columns,
const std::vector<SQLTypeInfo>& target_types,
const size_t thread_idx,
const bool is_parallel_execution_enforced = false);

ColumnarResults(const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const int8_t* one_col_buffer,
const size_t num_rows,
const SQLTypeInfo& target_type);
const SQLTypeInfo& target_type,
const size_t thread_idx);

static std::unique_ptr<ColumnarResults> mergeResults(
const std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
Expand Down Expand Up @@ -188,10 +190,10 @@ class ColumnarResults {
const std::vector<bool>& targets_to_skip = {});

const std::vector<SQLTypeInfo> target_types_;
bool parallel_conversion_; // multi-threaded execution of columnar conversion
bool
direct_columnar_conversion_; // whether columnar conversion might happen directly
// with minimal usage of result set's iterator access
bool parallel_conversion_; // multi-threaded execution of columnar conversion
bool direct_columnar_conversion_; // whether columnar conversion might happen directly
// with minimal ussage of result set's iterator access
size_t thread_idx_;
};

using ColumnCacheMap =
Expand Down
2 changes: 2 additions & 0 deletions QueryEngine/Descriptors/QueryMemoryDescriptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ std::unique_ptr<QueryExecutionContext> QueryMemoryDescriptor::getQueryExecutionC
std::shared_ptr<RowSetMemoryOwner> row_set_mem_owner,
const bool output_columnar,
const bool sort_on_gpu,
const size_t thread_idx,
RenderInfo* render_info) const {
auto timer = DEBUG_TIMER(__func__);
if (frag_offsets.empty()) {
Expand All @@ -645,6 +646,7 @@ std::unique_ptr<QueryExecutionContext> QueryMemoryDescriptor::getQueryExecutionC
row_set_mem_owner,
output_columnar,
sort_on_gpu,
thread_idx,
render_info));
}

Expand Down
1 change: 1 addition & 0 deletions QueryEngine/Descriptors/QueryMemoryDescriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class QueryMemoryDescriptor {
std::shared_ptr<RowSetMemoryOwner>,
const bool output_columnar,
const bool sort_on_gpu,
const size_t thread_idx,
RenderInfo*) const;

static bool many_entries(const int64_t max_val,
Expand Down
29 changes: 18 additions & 11 deletions QueryEngine/Descriptors/RowSetMemoryOwner.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,27 @@ class ResultSet;
*/
class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
public:
RowSetMemoryOwner(const size_t arena_block_size)
: arena_block_size_(arena_block_size)
, allocator_(std::make_unique<Arena>(arena_block_size)) {}
// Builds one arena per kernel thread plus one extra, so that thread_idx 0
// always resolves to a valid allocator even when num_kernel_threads == 0.
RowSetMemoryOwner(const size_t arena_block_size, const size_t num_kernel_threads = 0)
    : arena_block_size_(arena_block_size) {
  const size_t num_arenas = num_kernel_threads + 1;
  allocators_.reserve(num_arenas);
  for (size_t arena_idx = 0; arena_idx < num_arenas; ++arena_idx) {
    allocators_.emplace_back(std::make_unique<Arena>(arena_block_size));
  }
  CHECK(!allocators_.empty());
}

int8_t* allocate(const size_t num_bytes) override {
CHECK(allocator_);
// Carves `num_bytes` out of the arena belonging to kernel thread
// `thread_idx` (default 0 selects the main-thread arena). Allocation is
// serialized by state_mutex_, so concurrent callers are safe.
int8_t* allocate(const size_t num_bytes, const size_t thread_idx = 0) override {
  CHECK_LT(thread_idx, allocators_.size());
  std::lock_guard<std::mutex> guard(state_mutex_);
  return reinterpret_cast<int8_t*>(allocators_[thread_idx]->allocate(num_bytes));
}

int8_t* allocateCountDistinctBuffer(const size_t num_bytes) {
CHECK(allocator_);
int8_t* allocateCountDistinctBuffer(const size_t num_bytes,
const size_t thread_idx = 0) {
CHECK_LT(thread_idx, allocators_.size());
auto allocator = allocators_[thread_idx].get();
std::lock_guard<std::mutex> lock(state_mutex_);
auto ret = reinterpret_cast<int8_t*>(allocator_->allocateAndZero(num_bytes));
auto ret = reinterpret_cast<int8_t*>(allocator->allocateAndZero(num_bytes));
count_distinct_bitmaps_.emplace_back(
CountDistinctBitmapBuffer{ret, num_bytes, /*physical_buffer=*/true});
return ret;
Expand Down Expand Up @@ -172,7 +179,7 @@ class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
}

std::shared_ptr<RowSetMemoryOwner> cloneStrDictDataOnly() {
auto rtn = std::make_shared<RowSetMemoryOwner>(arena_block_size_);
auto rtn = std::make_shared<RowSetMemoryOwner>(arena_block_size_, /*num_kernels=*/1);
rtn->str_dict_proxy_owned_ = str_dict_proxy_owned_;
rtn->lit_str_dict_proxy_ = lit_str_dict_proxy_;
return rtn;
Expand Down Expand Up @@ -209,7 +216,7 @@ class RowSetMemoryOwner final : public SimpleAllocator, boost::noncopyable {
std::vector<std::unique_ptr<quantile::TDigest>> t_digests_;

size_t arena_block_size_; // for cloning
std::unique_ptr<Arena> allocator_;
std::vector<std::unique_ptr<Arena>> allocators_;

mutable std::mutex state_mutex_;

Expand Down
Loading

0 comments on commit 21d3f6a

Please sign in to comment.