Skip to content

Commit

Permalink
feat: support encode and decode string for Spark UnsafeRow format (#119)
Browse files Browse the repository at this point in the history
* Support encode and decode string for Spark UnsafeRow format
  • Loading branch information
tobegit3hub authored Jun 21, 2021
1 parent e958670 commit 9367683
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 48 deletions.
4 changes: 2 additions & 2 deletions include/codec/list_iterator_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class StringColumnImpl : public ColumnImpl<StringRef> {
int32_t addr_space = v1::GetAddrSpace(row.size(row_idx_));
StringRef value;
const char *buffer;
v1::GetStrFieldUnsafe(row.buf(row_idx_), str_field_offset_,
v1::GetStrFieldUnsafe(row.buf(row_idx_), col_idx_, str_field_offset_,
next_str_field_offset_, str_start_offset_,
addr_space, &buffer, &(value.size_));
value.data_ = buffer;
Expand All @@ -142,7 +142,7 @@ class StringColumnImpl : public ColumnImpl<StringRef> {
int32_t addr_space = v1::GetAddrSpace(row.size(row_idx_));
StringRef value;
const char *buffer;
v1::GetStrFieldUnsafe(buf, str_field_offset_,
v1::GetStrFieldUnsafe(buf, col_idx_, str_field_offset_,
next_str_field_offset_, str_start_offset_,
addr_space, &buffer, &(value.size_));
value.data_ = buffer;
Expand Down
23 changes: 5 additions & 18 deletions include/codec/type_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "base/fe_hash.h"
#include "base/mem_pool.h"
#include "glog/logging.h"

namespace hybridse {
namespace codec {
static const uint32_t SEED = 0xe17a1465;
Expand Down Expand Up @@ -217,23 +218,8 @@ static constexpr uint8_t SIZE_LENGTH = 4;
static constexpr uint8_t HEADER_LENGTH = VERSION_LENGTH + SIZE_LENGTH;

// calc the total row size with primary_size, str field count and str_size
// Picks the narrowest string-offset width (1 to 4 bytes per string field)
// whose resulting total row size still fits under that width's limit.
//
// primary_size:   bytes taken by the non-string part of the row
// str_field_cnt:  number of string fields; each one needs an offset slot
// str_size:       total bytes of string payload
// str_addr_space: out-param, receives the chosen offset width in bytes
//
// Returns primary_size + str_size + str_field_cnt * (*str_addr_space).
inline uint32_t CalcTotalLength(uint32_t primary_size, uint32_t str_field_cnt,
                                uint32_t str_size, uint32_t* str_addr_space) {
    const uint32_t payload_size = primary_size + str_size;
    // Inclusive upper bounds on the total row size for 1-, 2- and 3-byte
    // offsets; anything larger falls through to 4-byte offsets.
    const uint32_t width_limits[] = {UINT8_MAX, UINT16_MAX, 1u << 24};

    uint32_t width = 1;
    for (const uint32_t limit : width_limits) {
        if (payload_size + str_field_cnt * width <= limit) {
            break;
        }
        ++width;
    }

    *str_addr_space = width;
    return payload_size + str_field_cnt * width;
}
uint32_t CalcTotalLength(uint32_t primary_size, uint32_t str_field_cnt,
uint32_t str_size, uint32_t* str_addr_space);

inline void AppendNullBit(int8_t* buf_ptr, uint32_t col_idx, int8_t is_null) {
int8_t* ptr = buf_ptr + HEADER_LENGTH + (col_idx >> 3);
Expand Down Expand Up @@ -432,7 +418,8 @@ inline double GetDoubleField(const int8_t* row, uint32_t idx, uint32_t offset,
}

// native get string field method
int32_t GetStrFieldUnsafe(const int8_t* row, uint32_t str_field_offset,
int32_t GetStrFieldUnsafe(const int8_t* row, uint32_t col_idx,
uint32_t str_field_offset,
uint32_t next_str_field_offset,
uint32_t str_start_offset, uint32_t addr_space,
const char** data, uint32_t* size);
Expand Down
22 changes: 15 additions & 7 deletions src/codec/fe_row_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ namespace codec {

// Returns the size in bytes of the null bitmap for a row with `size` columns.
//
// With FLAGS_enable_spark_unsaferow_format set, the bitmap is sized in whole
// 64-bit words (8 bytes per started group of 64 columns). Otherwise it uses
// one bit per column rounded up to whole bytes.
const uint32_t BitMapSize(uint32_t size) {
    if (FLAGS_enable_spark_unsaferow_format) {
        // size >> 6 counts the full 64-column words; the remainder test adds
        // one more word when the count is not a multiple of 64. The mask has
        // to be 0x3f (64 - 1): masking with 0x7f also catches bit 6 and
        // over-allocates a word for 64, 192, ... columns.
        return ((size >> 6) + !!(size & 0x3f)) * 8;
    }
    // Native format: ceil(size / 8) bytes.
    return (size >> 3) + !!(size & 0x07);
}

Expand Down Expand Up @@ -448,7 +449,7 @@ std::string RowView::GetStringUnsafe(uint32_t idx) {
}
const char* val;
uint32_t length;
v1::GetStrFieldUnsafe(row_, field_offset, next_str_field_offset,
v1::GetStrFieldUnsafe(row_, idx, field_offset, next_str_field_offset,
str_field_start_offset_, str_addr_length_, &val,
&length);
return std::string(val, length);
Expand Down Expand Up @@ -845,7 +846,7 @@ int32_t RowView::GetValue(const int8_t* row, uint32_t idx, const char** val,
if (offset_vec_.at(idx) < string_field_cnt_ - 1) {
next_str_field_offset = field_offset + 1;
}
return v1::GetStrFieldUnsafe(row, field_offset, next_str_field_offset,
return v1::GetStrFieldUnsafe(row, idx, field_offset, next_str_field_offset,
str_field_start_offset_, GetAddrLength(size),
val, length);
}
Expand All @@ -871,7 +872,7 @@ int32_t RowView::GetString(uint32_t idx, const char** val, uint32_t* length) {
if (offset_vec_.at(idx) < string_field_cnt_ - 1) {
next_str_field_offset = field_offset + 1;
}
return v1::GetStrFieldUnsafe(row_, field_offset, next_str_field_offset,
return v1::GetStrFieldUnsafe(row_, idx, field_offset, next_str_field_offset,
str_field_start_offset_, str_addr_length_, val,
length);
}
Expand Down Expand Up @@ -941,8 +942,15 @@ bool RowFormat::GetStringColumnInfo(size_t idx, StringColInfo* res) const {
DLOG(INFO) << "get string with offset " << offset << " next offset "
<< next_offset << " str_field_start_offset "
<< str_field_start_offset_ << " for col " << base_col_info.name;
*res = StringColInfo(base_col_info.name, ty, col_idx, offset, next_offset,
str_field_start_offset_);

if (FLAGS_enable_spark_unsaferow_format) {
// Notice that we pass the nullbitmap size as str_field_start_offset
*res = StringColInfo(base_col_info.name, ty, col_idx, offset, next_offset,
BitMapSize(schema_->size()));
} else {
*res = StringColInfo(base_col_info.name, ty, col_idx, offset, next_offset,
str_field_start_offset_);
}
return true;
}

Expand Down
57 changes: 57 additions & 0 deletions src/codec/fe_row_codec_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <vector>
#include "gtest/gtest.h"

DECLARE_bool(enable_spark_unsaferow_format);

namespace hybridse {
namespace codec {

Expand Down Expand Up @@ -649,6 +651,61 @@ TEST_F(CodecTest, RowFormatOffsetLongHeaderTest) {
ASSERT_EQ(50u, str_info.str_start_offset);
}
}
// Checks BitMapSize() in both the native layout and the Spark UnsafeRow
// layout, toggling the process-wide format flag between the two halves.
TEST_F(CodecTest, SparkUnsaferowBitMapSizeTest) {
    // The flag is global state shared with every other test in the binary.
    // Restore it on scope exit, including the early return taken by a failed
    // ASSERT_*, so this test cannot change the row format seen by others.
    struct FlagGuard {
        bool saved = FLAGS_enable_spark_unsaferow_format;
        ~FlagGuard() { FLAGS_enable_spark_unsaferow_format = saved; }
    } flag_guard;

    // Native layout: one bit per column, rounded up to whole bytes.
    FLAGS_enable_spark_unsaferow_format = false;
    ASSERT_EQ(1u, BitMapSize(3));
    ASSERT_EQ(1u, BitMapSize(8));
    ASSERT_EQ(2u, BitMapSize(9));
    ASSERT_EQ(3u, BitMapSize(20));
    ASSERT_EQ(9u, BitMapSize(65));

    // Spark UnsafeRow layout: bitmap grows in 8-byte steps.
    FLAGS_enable_spark_unsaferow_format = true;
    ASSERT_EQ(8u, BitMapSize(3));
    ASSERT_EQ(8u, BitMapSize(8));
    ASSERT_EQ(8u, BitMapSize(9));
    ASSERT_EQ(8u, BitMapSize(20));
    ASSERT_EQ(16u, BitMapSize(65));
}
// Builds schemas of several widths that cycle varchar / int64 / double and
// checks that RowFormat reports the declared type for every column, and that
// string column info can be resolved for every varchar column, while the
// Spark UnsafeRow format flag is enabled.
TEST_F(CodecTest, SparkUnsaferowRowFormatTest) {
    // The flag is global state shared with every other test in the binary.
    // Restore it on scope exit, including the early return taken by a failed
    // ASSERT_*, so this test cannot change the row format seen by others.
    struct FlagGuard {
        bool saved = FLAGS_enable_spark_unsaferow_format;
        ~FlagGuard() { FLAGS_enable_spark_unsaferow_format = saved; }
    } flag_guard;
    FLAGS_enable_spark_unsaferow_format = true;

    const std::vector<int> num_vec = {10, 20, 50, 100, 1000};
    for (const int col_num : num_vec) {
        ::hybridse::type::TableDef def;
        for (int i = 0; i < col_num; i++) {
            ::hybridse::type::ColumnDef* col = def.add_columns();
            col->set_name("col" + std::to_string(i));
            if (i % 3 == 0) {
                col->set_type(::hybridse::type::kVarchar);
            } else if (i % 3 == 1) {
                col->set_type(::hybridse::type::kInt64);
            } else {
                col->set_type(::hybridse::type::kDouble);
            }
        }

        RowFormat decoder(&def.columns());
        for (int i = 0; i < col_num; i++) {
            // Every column must be known to the decoder with the type that
            // was declared for it in the schema above.
            const codec::ColInfo* info = decoder.GetColumnInfo(i);
            ASSERT_TRUE(info != nullptr);
            ASSERT_EQ(def.columns(i).type(), info->type);

            // Varchar columns (every third one) must also resolve as strings.
            if (i % 3 == 0) {
                ASSERT_EQ(::hybridse::type::kVarchar, info->type);
                codec::StringColInfo str_info;
                ASSERT_TRUE(decoder.GetStringColumnInfo(i, &str_info));
            }
        }
    }
}

} // namespace codec
} // namespace hybridse

Expand Down
67 changes: 65 additions & 2 deletions src/codec/type_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,41 @@
#include "glog/logging.h"
#include "proto/fe_type.pb.h"

DECLARE_bool(enable_spark_unsaferow_format);

namespace hybridse {
namespace codec {
namespace v1 {

using hybridse::codec::ListV;
using hybridse::codec::Row;

// Computes the total row size and the per-string-field slot width.
//
// primary_size:   bytes taken by the non-string part of the row
// str_field_cnt:  number of string fields; each one needs a slot
// str_size:       total bytes of string payload
// str_addr_space: out-param, receives the slot width in bytes
//
// Returns primary_size + str_size + str_field_cnt * (*str_addr_space).
uint32_t CalcTotalLength(uint32_t primary_size, uint32_t str_field_cnt,
                         uint32_t str_size, uint32_t* str_addr_space) {
    const uint32_t payload_size = primary_size + str_size;

    if (FLAGS_enable_spark_unsaferow_format) {
        // Spark UnsafeRow format: every string column takes a fixed 8 bytes.
        const uint32_t spark_slot_width = 8;
        *str_addr_space = spark_slot_width;
        return payload_size + str_field_cnt * spark_slot_width;
    }

    // Native format: choose the narrowest offset width (1 to 4 bytes) whose
    // resulting total size stays within that width's inclusive limit.
    const uint32_t width_limits[] = {UINT8_MAX, UINT16_MAX, 1u << 24};
    uint32_t width = 1;
    for (const uint32_t limit : width_limits) {
        if (payload_size + str_field_cnt * width <= limit) {
            break;
        }
        ++width;
    }

    *str_addr_space = width;
    return payload_size + str_field_cnt * width;
}

int32_t GetStrField(const int8_t* row, uint32_t idx, uint32_t str_field_offset,
uint32_t next_str_field_offset, uint32_t str_start_offset,
uint32_t addr_space, const char** data, uint32_t* size,
Expand All @@ -42,16 +70,33 @@ int32_t GetStrField(const int8_t* row, uint32_t idx, uint32_t str_field_offset,
return 0;
} else {
*is_null = false;
return GetStrFieldUnsafe(row, str_field_offset, next_str_field_offset,
return GetStrFieldUnsafe(row, idx, str_field_offset, next_str_field_offset,
str_start_offset, addr_space, data, size);
}
}

int32_t GetStrFieldUnsafe(const int8_t* row, uint32_t field_offset,
int32_t GetStrFieldUnsafe(const int8_t* row, uint32_t col_idx,
uint32_t field_offset,
uint32_t next_str_field_offset,
uint32_t str_start_offset, uint32_t addr_space,
const char** data, uint32_t* size) {
if (row == NULL || data == NULL || size == NULL) return -1;

// Support Spark UnsafeRow format
if (FLAGS_enable_spark_unsaferow_format) {
// For UnsafeRow opt, str_start_offset is the nullbitmap size
const uint32_t bitmap_size = str_start_offset;
const int8_t* row_with_col_offset = row + HEADER_LENGTH + bitmap_size + col_idx * 8;

// For Spark UnsafeRow, the first 32 bits is for length and the last
// 32 bits is for offset.
*size = *(reinterpret_cast<const uint32_t*>(row_with_col_offset));
uint32_t str_value_offset = *(reinterpret_cast<const uint32_t*>(row_with_col_offset + 4)) + HEADER_LENGTH;
*data = reinterpret_cast<const char*>(row + str_value_offset);

return 0;
}

const int8_t* row_with_offset = row + str_start_offset;
uint32_t str_offset = 0;
uint32_t next_str_offset = 0;
Expand Down Expand Up @@ -143,6 +188,24 @@ int32_t AppendString(int8_t* buf_ptr, uint32_t buf_size, uint32_t col_idx,
int8_t* val, uint32_t size, int8_t is_null,
uint32_t str_start_offset, uint32_t str_field_offset,
uint32_t str_addr_space, uint32_t str_body_offset) {

if (FLAGS_enable_spark_unsaferow_format) {
// TODO(chenjing): Refactor to support multiple codec instead of reusing the variable
// For UnsafeRow opt, str_start_offset is the nullbitmap size
const uint32_t bitmap_size = str_start_offset;
const uint32_t str_col_offset = HEADER_LENGTH + bitmap_size + col_idx * 8;

*(reinterpret_cast<uint32_t*>(buf_ptr + str_col_offset)) = size; // set size
// Notice that the offset in UnsafeRow should start without HybridSE header
*(reinterpret_cast<uint32_t*>(buf_ptr + str_col_offset + 4)) = str_body_offset - HEADER_LENGTH; // set offset

if (size != 0) {
memcpy(reinterpret_cast<char*>(buf_ptr + str_body_offset), val, size);
}

return str_body_offset + size;
}

if (is_null) {
AppendNullBit(buf_ptr, col_idx, true);
size_t str_addr_length = GetAddrLength(buf_size);
Expand Down
62 changes: 43 additions & 19 deletions src/codegen/buf_ir_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "codegen/timestamp_ir_builder.h"
#include "glog/logging.h"

DECLARE_bool(enable_spark_unsaferow_format);

namespace hybridse {
namespace codegen {

Expand Down Expand Up @@ -258,22 +260,33 @@ BufNativeEncoderIRBuilder::BufNativeEncoderIRBuilder(
block_(block) {
str_field_start_offset_ = codec::GetStartOffset(schema_->size());
for (int32_t idx = 0; idx < schema_->size(); idx++) {
const ::hybridse::type::ColumnDef& column = schema_->Get(idx);
if (column.type() == ::hybridse::type::kVarchar) {
offset_vec_.push_back(str_field_cnt_);
str_field_cnt_++;
// Support Spark UnsafeRow format where all fields will take up 8 bytes
if (FLAGS_enable_spark_unsaferow_format) {
offset_vec_.push_back(str_field_start_offset_);
str_field_start_offset_ += 8;
const ::hybridse::type::ColumnDef& column = schema_->Get(idx);
if (column.type() == ::hybridse::type::kVarchar) {
str_field_cnt_++;
}
} else {
auto TYPE_SIZE_MAP = codec::GetTypeSizeMap();
auto it = TYPE_SIZE_MAP.find(column.type());
if (it == TYPE_SIZE_MAP.end()) {
LOG(WARNING) << ::hybridse::type::Type_Name(column.type())
<< " is not supported";
const ::hybridse::type::ColumnDef& column = schema_->Get(idx);
if (column.type() == ::hybridse::type::kVarchar) {
offset_vec_.push_back(str_field_cnt_);
str_field_cnt_++;
} else {
offset_vec_.push_back(str_field_start_offset_);
DLOG(INFO) << "idx " << idx << " offset "
<< str_field_start_offset_;
str_field_start_offset_ += it->second;
auto TYPE_SIZE_MAP = codec::GetTypeSizeMap();
auto it = TYPE_SIZE_MAP.find(column.type());
if (it == TYPE_SIZE_MAP.end()) {
LOG(WARNING) << ::hybridse::type::Type_Name(column.type())
<< " is not supported";
} else {
offset_vec_.push_back(str_field_start_offset_);
DLOG(INFO) << "idx " << idx << " offset "
<< str_field_start_offset_;
str_field_start_offset_ += it->second;
}
}

}
}
}
Expand Down Expand Up @@ -499,12 +512,23 @@ bool BufNativeEncoderIRBuilder::AppendString(
size_ty, // str_field_offset
size_ty, // str_addr_space
size_ty); // str_body_offset
*output = builder.CreateCall(
callee,
::llvm::ArrayRef<::llvm::Value*>{
i8_ptr, buf_size, val_field_idx, data_ptr, fe_str_size, is_null,
builder.getInt32(str_field_start_offset_),
builder.getInt32(str_field_idx), str_addr_space, str_body_offset});

if (FLAGS_enable_spark_unsaferow_format) {
*output = builder.CreateCall(
callee,
::llvm::ArrayRef<::llvm::Value*>{
i8_ptr, buf_size, val_field_idx, data_ptr, fe_str_size, is_null,
// Notice that we pass nullbitmap size as str_field_start_offset
builder.getInt32(codec::BitMapSize(schema_->size())),
builder.getInt32(str_field_idx), str_addr_space, str_body_offset});
} else {
*output = builder.CreateCall(
callee,
::llvm::ArrayRef<::llvm::Value*>{
i8_ptr, buf_size, val_field_idx, data_ptr, fe_str_size, is_null,
builder.getInt32(str_field_start_offset_),
builder.getInt32(str_field_idx), str_addr_space, str_body_offset});
}
return true;
}

Expand Down
Loading

0 comments on commit 9367683

Please sign in to comment.