
Commit

Chunk class cleanup
* Match style guide

* Nest under data mgr
alexbaden authored and andrewseidl committed May 23, 2020
1 parent f1d0670 commit 07151c6
Showing 29 changed files with 227 additions and 214 deletions.
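The "Match style guide" half of the cleanup renames Chunk's data members to use a trailing underscore (buffer → buffer_, index_buf → index_buf_, column_desc → column_desc_) and its snake_case methods to camelCase (init_encoder → initEncoder, unpin_buffer → unpinBuffer, set_index_buf → setIndexBuffer). Below is a minimal sketch of that convention as applied here; it is not the real Chunk_NS::Chunk declaration, and the forward declarations stand in for the actual OmniSci types.

```cpp
// Illustrative sketch only: forward declarations stand in for the real
// OmniSci types and the class body is heavily simplified; this is not the
// actual Chunk_NS::Chunk declaration.
class AbstractBuffer;
struct ColumnDescriptor;

class Chunk {
 public:
  // Methods become camelCase (previously init_encoder / unpin_buffer).
  void initEncoder();
  void unpinBuffer();

 private:
  // Data members gain a trailing underscore
  // (previously buffer / index_buf / column_desc).
  AbstractBuffer* buffer_{nullptr};
  AbstractBuffer* index_buf_{nullptr};
  const ColumnDescriptor* column_desc_{nullptr};
};
```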
3 changes: 1 addition & 2 deletions CMakeLists.txt
@@ -490,7 +490,6 @@ add_subdirectory(CudaMgr)
add_subdirectory(LockMgr)
add_subdirectory(MigrationMgr)
add_subdirectory(Fragmenter)
add_subdirectory(Chunk)
add_subdirectory(Shared)
add_subdirectory(Utils)
add_subdirectory(QueryRunner)
@@ -509,7 +508,7 @@ if(ENABLE_ODBC)
add_subdirectory(ODBC)
endif()

set(MAPD_LIBRARIES Shared Catalog SqliteConnector MigrationMgr TableArchiver Parser Analyzer CsvImport QueryRunner QueryEngine QueryState LockMgr DataMgr Fragmenter Chunk)
set(MAPD_LIBRARIES Shared Catalog SqliteConnector MigrationMgr TableArchiver Parser Analyzer CsvImport QueryRunner QueryEngine QueryState LockMgr DataMgr Fragmenter)

if("${MAPD_EDITION_LOWER}" STREQUAL "ee")
list(APPEND MAPD_LIBRARIES Distributed)
2 changes: 1 addition & 1 deletion Catalog/CMakeLists.txt
@@ -22,7 +22,7 @@ if("${MAPD_EDITION_LOWER}" STREQUAL "ee")
add_subdirectory(ee)
endif()

target_link_libraries(Catalog SqliteConnector StringDictionary Fragmenter MigrationMgr Chunk ${AUTH_LIBRARIES} Calcite bcrypt)
target_link_libraries(Catalog SqliteConnector StringDictionary Fragmenter MigrationMgr ${AUTH_LIBRARIES} Calcite bcrypt)
if(ENABLE_KRB5)
target_link_libraries(Catalog krb5_gss)
endif()
4 changes: 0 additions & 4 deletions Chunk/CMakeLists.txt

This file was deleted.

4 changes: 2 additions & 2 deletions DataMgr/ArrayNoneEncoder.h
@@ -244,13 +244,13 @@ class ArrayNoneEncoder : public Encoder {
initialized = array_encoder->initialized;
}

AbstractBuffer* get_index_buf() const { return index_buf; }
AbstractBuffer* getIndexBuf() const { return index_buf; }

Datum elem_min;
Datum elem_max;
bool has_nulls;
bool initialized;
void set_index_buf(AbstractBuffer* buf) {
void setIndexBuffer(AbstractBuffer* buf) {
std::unique_lock<std::mutex> lock(EncoderMutex_);
index_buf = buf;
}
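The hunk above renames the index-buffer accessors on ArrayNoneEncoder (get_index_buf → getIndexBuf, set_index_buf → setIndexBuffer) while keeping the mutex-guarded setter. The following is a standalone sketch of that accessor pair, with placeholder types and a renamed mutex member; it is not the actual header, where the member keeps the name index_buf and the guard is EncoderMutex_.

```cpp
#include <mutex>

class AbstractBuffer;  // placeholder forward declaration, not the real header

// Standalone sketch of the renamed accessor pair from the diff above.
class IndexBufferHolderSketch {
 public:
  AbstractBuffer* getIndexBuf() const { return index_buf_; }

  // The setter stays mutex-guarded after the rename, so the index buffer can
  // be swapped while other threads hold a reference to the encoder.
  void setIndexBuffer(AbstractBuffer* buf) {
    std::unique_lock<std::mutex> lock(encoder_mutex_);
    index_buf_ = buf;
  }

 private:
  AbstractBuffer* index_buf_{nullptr};
  std::mutex encoder_mutex_;
};
```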
1 change: 1 addition & 0 deletions DataMgr/CMakeLists.txt
@@ -1,6 +1,7 @@
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result")

set(datamgr_source_files
Chunk/Chunk.cpp
DataMgr.cpp
Encoder.cpp
StringNoneEncoder.cpp
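The "Nest under data mgr" half of the change folds Chunk/Chunk.cpp into the DataMgr library sources instead of building a separate Chunk target, and the moved file switches to project-root-relative includes (see the Chunk.cpp diff below). A hedged usage sketch of the relocated header path follows; the include path comes from the diff, while the helper function is illustrative only.

```cpp
// After this commit the header lives at DataMgr/Chunk/Chunk.h and is included
// with a project-root-relative path instead of a "../" relative one.
#include "DataMgr/Chunk/Chunk.h"

// Illustrative consumer only: the namespace and the renamed initEncoder()
// come from the diff; this helper does not exist in the repository.
void initChunkEncoder(Chunk_NS::Chunk& chunk) {
  chunk.initEncoder();  // previously chunk.init_encoder()
}
```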
138 changes: 71 additions & 67 deletions Chunk/Chunk.cpp → DataMgr/Chunk/Chunk.cpp
@@ -1,5 +1,5 @@
/*
* Copyright 2017 MapD Technologies, Inc.
* Copyright 2020 OmniSci, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,10 +19,10 @@
* @author Wei Hong <[email protected]>
*/

#include "Chunk.h"
#include "../DataMgr/ArrayNoneEncoder.h"
#include "../DataMgr/FixedLengthArrayNoneEncoder.h"
#include "../DataMgr/StringNoneEncoder.h"
#include "DataMgr/Chunk/Chunk.h"
#include "DataMgr/ArrayNoneEncoder.h"
#include "DataMgr/FixedLengthArrayNoneEncoder.h"
#include "DataMgr/StringNoneEncoder.h"

namespace Chunk_NS {
std::shared_ptr<Chunk> Chunk::getChunk(const ColumnDescriptor* cd,
@@ -41,7 +41,8 @@ bool Chunk::isChunkOnDevice(DataMgr* data_mgr,
const ChunkKey& key,
const MemoryLevel mem_level,
const int device_id) {
if (column_desc->columnType.is_varlen() && !column_desc->columnType.is_fixlen_array()) {
if (column_desc_->columnType.is_varlen() &&
!column_desc_->columnType.is_fixlen_array()) {
ChunkKey subKey = key;
ChunkKey indexKey(subKey);
indexKey.push_back(1);
@@ -60,48 +61,49 @@ void Chunk::getChunkBuffer(DataMgr* data_mgr,
const int device_id,
const size_t num_bytes,
const size_t num_elems) {
if (column_desc->columnType.is_varlen() && !column_desc->columnType.is_fixlen_array()) {
if (column_desc_->columnType.is_varlen() &&
!column_desc_->columnType.is_fixlen_array()) {
ChunkKey subKey = key;
subKey.push_back(1); // 1 for the main buffer
buffer = data_mgr->getChunkBuffer(subKey, mem_level, device_id, num_bytes);
subKey.push_back(1); // 1 for the main buffer_
buffer_ = data_mgr->getChunkBuffer(subKey, mem_level, device_id, num_bytes);
subKey.pop_back();
subKey.push_back(2); // 2 for the index buffer
index_buf = data_mgr->getChunkBuffer(
subKey.push_back(2); // 2 for the index buffer_
index_buf_ = data_mgr->getChunkBuffer(
subKey,
mem_level,
device_id,
(num_elems + 1) * sizeof(StringOffsetT)); // always record n+1 offsets so string
// length can be calculated
switch (column_desc->columnType.get_type()) {
switch (column_desc_->columnType.get_type()) {
case kARRAY: {
ArrayNoneEncoder* array_encoder =
dynamic_cast<ArrayNoneEncoder*>(buffer->encoder.get());
array_encoder->set_index_buf(index_buf);
auto array_encoder = dynamic_cast<ArrayNoneEncoder*>(buffer_->encoder.get());
CHECK(array_encoder);
array_encoder->setIndexBuffer(index_buf_);
break;
}
case kTEXT:
case kVARCHAR:
case kCHAR: {
CHECK_EQ(kENCODING_NONE, column_desc->columnType.get_compression());
StringNoneEncoder* str_encoder =
dynamic_cast<StringNoneEncoder*>(buffer->encoder.get());
str_encoder->set_index_buf(index_buf);
CHECK_EQ(kENCODING_NONE, column_desc_->columnType.get_compression());
auto str_encoder = dynamic_cast<StringNoneEncoder*>(buffer_->encoder.get());
CHECK(str_encoder);
str_encoder->setIndexBuffer(index_buf_);
break;
}
case kPOINT:
case kLINESTRING:
case kPOLYGON:
case kMULTIPOLYGON: {
StringNoneEncoder* str_encoder =
dynamic_cast<StringNoneEncoder*>(buffer->encoder.get());
str_encoder->set_index_buf(index_buf);
auto str_encoder = dynamic_cast<StringNoneEncoder*>(buffer_->encoder.get());
CHECK(str_encoder);
str_encoder->setIndexBuffer(index_buf_);
break;
}
default:
CHECK(false);
UNREACHABLE();
}
} else {
buffer = data_mgr->getChunkBuffer(key, mem_level, device_id, num_bytes);
buffer_ = data_mgr->getChunkBuffer(key, mem_level, device_id, num_bytes);
}
}

@@ -110,15 +112,16 @@ void Chunk::createChunkBuffer(DataMgr* data_mgr,
const MemoryLevel mem_level,
const int device_id,
const size_t page_size) {
if (column_desc->columnType.is_varlen() && !column_desc->columnType.is_fixlen_array()) {
if (column_desc_->columnType.is_varlen() &&
!column_desc_->columnType.is_fixlen_array()) {
ChunkKey subKey = key;
subKey.push_back(1); // 1 for the main buffer
buffer = data_mgr->createChunkBuffer(subKey, mem_level, device_id, page_size);
subKey.push_back(1); // 1 for the main buffer_
buffer_ = data_mgr->createChunkBuffer(subKey, mem_level, device_id, page_size);
subKey.pop_back();
subKey.push_back(2); // 2 for the index buffer
index_buf = data_mgr->createChunkBuffer(subKey, mem_level, device_id, page_size);
subKey.push_back(2); // 2 for the index buffer_
index_buf_ = data_mgr->createChunkBuffer(subKey, mem_level, device_id, page_size);
} else {
buffer = data_mgr->createChunkBuffer(key, mem_level, device_id, page_size);
buffer_ = data_mgr->createChunkBuffer(key, mem_level, device_id, page_size);
}
}

@@ -127,26 +130,26 @@ size_t Chunk::getNumElemsForBytesInsertData(const DataBlockPtr& src_data,
const size_t start_idx,
const size_t byte_limit,
const bool replicating) {
CHECK(column_desc->columnType.is_varlen());
switch (column_desc->columnType.get_type()) {
CHECK(column_desc_->columnType.is_varlen());
switch (column_desc_->columnType.get_type()) {
case kARRAY: {
if (column_desc->columnType.get_size() > 0) {
if (column_desc_->columnType.get_size() > 0) {
FixedLengthArrayNoneEncoder* array_encoder =
dynamic_cast<FixedLengthArrayNoneEncoder*>(buffer->encoder.get());
dynamic_cast<FixedLengthArrayNoneEncoder*>(buffer_->encoder.get());
return array_encoder->getNumElemsForBytesInsertData(
src_data.arraysPtr, start_idx, num_elems, byte_limit, replicating);
}
ArrayNoneEncoder* array_encoder =
dynamic_cast<ArrayNoneEncoder*>(buffer->encoder.get());
dynamic_cast<ArrayNoneEncoder*>(buffer_->encoder.get());
return array_encoder->getNumElemsForBytesInsertData(
src_data.arraysPtr, start_idx, num_elems, byte_limit, replicating);
}
case kTEXT:
case kVARCHAR:
case kCHAR: {
CHECK_EQ(kENCODING_NONE, column_desc->columnType.get_compression());
CHECK_EQ(kENCODING_NONE, column_desc_->columnType.get_compression());
StringNoneEncoder* str_encoder =
dynamic_cast<StringNoneEncoder*>(buffer->encoder.get());
dynamic_cast<StringNoneEncoder*>(buffer_->encoder.get());
return str_encoder->getNumElemsForBytesInsertData(
src_data.stringsPtr, start_idx, num_elems, byte_limit, replicating);
}
Expand All @@ -155,7 +158,7 @@ size_t Chunk::getNumElemsForBytesInsertData(const DataBlockPtr& src_data,
case kPOLYGON:
case kMULTIPOLYGON: {
StringNoneEncoder* str_encoder =
dynamic_cast<StringNoneEncoder*>(buffer->encoder.get());
dynamic_cast<StringNoneEncoder*>(buffer_->encoder.get());
return str_encoder->getNumElemsForBytesInsertData(
src_data.stringsPtr, start_idx, num_elems, byte_limit, replicating);
}
@@ -169,18 +172,18 @@ std::shared_ptr<ChunkMetadata> Chunk::appendData(DataBlockPtr& src_data,
const size_t num_elems,
const size_t start_idx,
const bool replicating) {
const auto& ti = column_desc->columnType;
const auto& ti = column_desc_->columnType;
if (ti.is_varlen()) {
switch (ti.get_type()) {
case kARRAY: {
if (ti.get_size() > 0) {
FixedLengthArrayNoneEncoder* array_encoder =
dynamic_cast<FixedLengthArrayNoneEncoder*>(buffer->encoder.get());
dynamic_cast<FixedLengthArrayNoneEncoder*>(buffer_->encoder.get());
return array_encoder->appendData(
src_data.arraysPtr, start_idx, num_elems, replicating);
}
ArrayNoneEncoder* array_encoder =
dynamic_cast<ArrayNoneEncoder*>(buffer->encoder.get());
dynamic_cast<ArrayNoneEncoder*>(buffer_->encoder.get());
return array_encoder->appendData(
src_data.arraysPtr, start_idx, num_elems, replicating);
}
@@ -189,7 +192,7 @@ std::shared_ptr<ChunkMetadata> Chunk::appendData(DataBlockPtr& src_data,
case kCHAR: {
CHECK_EQ(kENCODING_NONE, ti.get_compression());
StringNoneEncoder* str_encoder =
dynamic_cast<StringNoneEncoder*>(buffer->encoder.get());
dynamic_cast<StringNoneEncoder*>(buffer_->encoder.get());
return str_encoder->appendData(
src_data.stringsPtr, start_idx, num_elems, replicating);
}
@@ -198,52 +201,53 @@ std::shared_ptr<ChunkMetadata> Chunk::appendData(DataBlockPtr& src_data,
case kPOLYGON:
case kMULTIPOLYGON: {
StringNoneEncoder* str_encoder =
dynamic_cast<StringNoneEncoder*>(buffer->encoder.get());
dynamic_cast<StringNoneEncoder*>(buffer_->encoder.get());
return str_encoder->appendData(
src_data.stringsPtr, start_idx, num_elems, replicating);
}
default:
CHECK(false);
}
}
return buffer->encoder->appendData(src_data.numbersPtr, num_elems, ti, replicating);
return buffer_->encoder->appendData(src_data.numbersPtr, num_elems, ti, replicating);
}

void Chunk::unpin_buffer() {
if (buffer != nullptr) {
buffer->unPin();
void Chunk::unpinBuffer() {
if (buffer_) {
buffer_->unPin();
}
if (index_buf != nullptr) {
index_buf->unPin();
if (index_buf_) {
index_buf_->unPin();
}
}

void Chunk::init_encoder() {
buffer->initEncoder(column_desc->columnType);
if (column_desc->columnType.is_varlen() && !column_desc->columnType.is_fixlen_array()) {
switch (column_desc->columnType.get_type()) {
void Chunk::initEncoder() {
buffer_->initEncoder(column_desc_->columnType);
if (column_desc_->columnType.is_varlen() &&
!column_desc_->columnType.is_fixlen_array()) {
switch (column_desc_->columnType.get_type()) {
case kARRAY: {
ArrayNoneEncoder* array_encoder =
dynamic_cast<ArrayNoneEncoder*>(buffer->encoder.get());
array_encoder->set_index_buf(index_buf);
dynamic_cast<ArrayNoneEncoder*>(buffer_->encoder.get());
array_encoder->setIndexBuffer(index_buf_);
break;
}
case kTEXT:
case kVARCHAR:
case kCHAR: {
CHECK_EQ(kENCODING_NONE, column_desc->columnType.get_compression());
CHECK_EQ(kENCODING_NONE, column_desc_->columnType.get_compression());
StringNoneEncoder* str_encoder =
dynamic_cast<StringNoneEncoder*>(buffer->encoder.get());
str_encoder->set_index_buf(index_buf);
dynamic_cast<StringNoneEncoder*>(buffer_->encoder.get());
str_encoder->setIndexBuffer(index_buf_);
break;
}
case kPOINT:
case kLINESTRING:
case kPOLYGON:
case kMULTIPOLYGON: {
StringNoneEncoder* str_encoder =
dynamic_cast<StringNoneEncoder*>(buffer->encoder.get());
str_encoder->set_index_buf(index_buf);
dynamic_cast<StringNoneEncoder*>(buffer_->encoder.get());
str_encoder->setIndexBuffer(index_buf_);
break;
}
default:
@@ -256,17 +260,17 @@ ChunkIter Chunk::begin_iterator(const std::shared_ptr<ChunkMetadata>& chunk_meta
int start_idx,
int skip) const {
ChunkIter it;
it.type_info = column_desc->columnType;
it.type_info = column_desc_->columnType;
it.skip = skip;
it.skip_size = column_desc->columnType.get_size();
it.skip_size = column_desc_->columnType.get_size();
if (it.skip_size < 0) { // if it's variable length
it.current_pos = it.start_pos =
index_buf->getMemoryPtr() + start_idx * sizeof(StringOffsetT);
it.end_pos = index_buf->getMemoryPtr() + index_buf->size() - sizeof(StringOffsetT);
it.second_buf = buffer->getMemoryPtr();
index_buf_->getMemoryPtr() + start_idx * sizeof(StringOffsetT);
it.end_pos = index_buf_->getMemoryPtr() + index_buf_->size() - sizeof(StringOffsetT);
it.second_buf = buffer_->getMemoryPtr();
} else {
it.current_pos = it.start_pos = buffer->getMemoryPtr() + start_idx * it.skip_size;
it.end_pos = buffer->getMemoryPtr() + buffer->size();
it.current_pos = it.start_pos = buffer_->getMemoryPtr() + start_idx * it.skip_size;
it.end_pos = buffer_->getMemoryPtr() + buffer_->size();
it.second_buf = nullptr;
}
it.num_elems = chunk_metadata->numElements;
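Beyond the renames, the getChunkBuffer() hunks above also tighten error handling: each dynamic_cast to the concrete encoder is now CHECKed before use, and the impossible default branch uses UNREACHABLE() instead of CHECK(false). Below is a standalone sketch of that pattern with a placeholder encoder hierarchy; CHECK and UNREACHABLE are stubbed with assert here rather than the project's logging macros.

```cpp
#include <cassert>

// Minimal stand-ins for the project's logging macros, for this sketch only.
#define CHECK(cond) assert(cond)
#define UNREACHABLE() assert(false && "unreachable branch")

// Placeholder encoder hierarchy; not the real OmniSci classes.
struct Encoder {
  virtual ~Encoder() = default;
};
struct ArrayEncoderSketch : Encoder {
  void setIndexBuffer(void* buf) { index_buf_ = buf; }
  void* index_buf_{nullptr};
};

enum class ColumnKind { kArray };

void wireIndexBuffer(Encoder* encoder, void* index_buf, ColumnKind kind) {
  switch (kind) {
    case ColumnKind::kArray: {
      // Verify the downcast before using it, as the new getChunkBuffer() does.
      auto array_encoder = dynamic_cast<ArrayEncoderSketch*>(encoder);
      CHECK(array_encoder);
      array_encoder->setIndexBuffer(index_buf);
      break;
    }
    default:
      UNREACHABLE();  // previously CHECK(false)
  }
}
```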