Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): Align Data Type in GIE Physical Pb with Flex #4367

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,9 @@ jobs:
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d ${TMP_INTERACTIVE_WORKSPACE}/data/modern_graph/indices/
cd ${GITHUB_WORKSPACE}/flex/tests/hqps
sed -i 's/interactive_workspace/temp_workspace/g' ./interactive_config_test.yaml
# set thread_num_per_worker to 4
sed -i 's/thread_num_per_worker: 1/thread_num_per_worker: 4/g' ./interactive_config_test.yaml
bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./interactive_config_test.yaml java
bash hqps_sdk_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./interactive_config_test.yaml python
sed -i 's/temp_workspace/interactive_workspace/g' ./interactive_config_test.yaml
sed -i 's/thread_num_per_worker: 4/thread_num_per_worker: 1/g' ./interactive_config_test.yaml

- name: Robustness test
env:
Expand Down
19 changes: 16 additions & 3 deletions flex/codegen/src/codegen_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,23 @@ std::string generate_output_list(std::string input_name, int32_t input_size,
// check type consistent
bool data_type_consistent(const common::DataType& left,
const common::DataType& right) {
if (left == common::DataType::NONE || right == common::DataType::NONE) {
return true;
if (left.item_case() == common::DataType::ITEM_NOT_SET) {
return false;
}
if (left.item_case() != right.item_case()) {
return false;
}
if (left.item_case() == common::DataType::kPrimitiveType) {
return left.primitive_type() == right.primitive_type();
} else if (left.item_case() == common::DataType::kArray ||
left.item_case() == common::DataType::kMap) {
LOG(FATAL) << "Not support list or map type";
} else if (left.item_case() == common::DataType::kString) {
return true; // string type is always consistent
} else {
LOG(FATAL) << "Unexpected data type";
return false;
}
return left == right;
}

std::tuple<std::string, std::string> decode_param_from_decoder(
Expand Down
117 changes: 75 additions & 42 deletions flex/codegen/src/graph_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
#include <type_traits>

#include "flex/codegen/src/string_utils.h"
#include "flex/proto_generated_gie/basic_type.pb.h"
#include "flex/proto_generated_gie/common.pb.h"
#include "glog/logging.h"
#include "google/protobuf/any.h"
Expand Down Expand Up @@ -63,62 +64,93 @@ inline bool operator==(const ParamConst& lhs, const ParamConst& rhs) {

} // namespace codegen

static codegen::DataType common_data_type_pb_2_data_type(
const common::DataType& data_type) {
switch (data_type) {
case common::DataType::INT32:
static codegen::DataType primitive_type_to_data_type(
const common::PrimitiveType& type) {
switch (type) {
case common::PrimitiveType::DT_SIGNED_INT32:
return codegen::DataType::kInt32;
case common::DataType::INT64:
case common::PrimitiveType::DT_SIGNED_INT64:
return codegen::DataType::kInt64;
case common::DataType::DOUBLE:
case common::PrimitiveType::DT_FLOAT:
return codegen::DataType::kFloat;
case common::PrimitiveType::DT_DOUBLE:
return codegen::DataType::kDouble;
case common::DataType::STRING:
return codegen::DataType::kString;
case common::DataType::INT64_ARRAY:
return codegen::DataType::kInt64Array;
case common::DataType::INT32_ARRAY:
return codegen::DataType::kInt32Array;
case common::DataType::BOOLEAN:
case common::PrimitiveType::DT_BOOL:
return codegen::DataType::kBoolean;
case common::DataType::DATE32:
return codegen::DataType::kDate;
case common::DataType::TIME32:
return codegen::DataType::kTime;
case common::DataType::TIMESTAMP:
return codegen::DataType::kTimeStamp;
default:
// LOG(FATAL) << "unknown primitive type";
throw std::runtime_error(
"unknown primitive type when converting primitive type to data type:" +
std::to_string(static_cast<int>(type)));
}
}

static codegen::DataType common_data_type_pb_2_data_type(
const common::DataType& data_type) {
switch (data_type.item_case()) {
case common::DataType::ItemCase::kPrimitiveType:
return primitive_type_to_data_type(data_type.primitive_type());
case common::DataType::ItemCase::kDecimal:
LOG(FATAL) << "Not support decimal type";
case common::DataType::ItemCase::kString:
return codegen::DataType::kString;
case common::DataType::ItemCase::kTemporal:
LOG(FATAL) << "Not support temporal type";
case common::DataType::ItemCase::kArray:
case common::DataType::ItemCase::kMap:
LOG(FATAL) << "Not support array or map type";
default:
// LOG(FATAL) << "unknown data type";
throw std::runtime_error(
"unknown data type when converting common_data_type to inner data "
"type:" +
std::to_string(static_cast<int>(data_type)));
data_type.DebugString());
}
}

static std::string single_common_data_type_pb_2_str(
const common::DataType& data_type) {
switch (data_type) {
case common::DataType::BOOLEAN:
return "bool";
case common::DataType::INT32:
static std::string primitive_type_to_str(const common::PrimitiveType& type) {
switch (type) {
case common::PrimitiveType::DT_SIGNED_INT32:
return "int32_t";
case common::DataType::INT64:
case common::PrimitiveType::DT_UNSIGNED_INT32:
return "uint32_t";
case common::PrimitiveType::DT_SIGNED_INT64:
return "int64_t";
case common::DataType::DOUBLE:
case common::PrimitiveType::DT_UNSIGNED_INT64:
return "uint64_t";
case common::PrimitiveType::DT_FLOAT:
return "float";
case common::PrimitiveType::DT_DOUBLE:
return "double";
case common::DataType::STRING:
case common::PrimitiveType::DT_BOOL:
return "bool";
default:
// LOG(FATAL) << "unknown primitive type";
throw std::runtime_error(
"unknown primitive type when converting primitive type to string:" +
std::to_string(static_cast<int>(type)));
}
}

static std::string single_common_data_type_pb_2_str(
const common::DataType& data_type) {
switch (data_type.item_case()) {
case common::DataType::ItemCase::kPrimitiveType:
return primitive_type_to_str(data_type.primitive_type());
case common::DataType::ItemCase::kDecimal:
LOG(FATAL) << "Not support decimal type";
case common::DataType::ItemCase::kString:
return "std::string_view";
case common::DataType::INT64_ARRAY:
return "std::vector<int64_t>";
case common::DataType::INT32_ARRAY:
return "std::vector<int32_t>";
case common::DataType::DATE32:
return "Date";
case common::DataType::ItemCase::kTemporal:
LOG(FATAL) << "Not support temporal type";
case common::DataType::ItemCase::kArray:
case common::DataType::ItemCase::kMap:
LOG(FATAL) << "Not support array or map type";
// TODO: support time32 and timestamp
default:
throw std::runtime_error(
"unknown data type when convert common data type to string:" +
std::to_string(static_cast<int>(data_type)));
data_type.DebugString());
}
}

Expand Down Expand Up @@ -266,21 +298,22 @@ static std::string data_type_2_rust_string(const codegen::DataType& data_type) {
}

static common::DataType common_value_2_data_type(const common::Value& value) {
common::DataType ret;
switch (value.item_case()) {
case common::Value::kI32:
return common::DataType::INT32;
ret.set_primitive_type(common::PrimitiveType::DT_SIGNED_INT32);
case common::Value::kI64:
return common::DataType::INT64;
ret.set_primitive_type(common::PrimitiveType::DT_SIGNED_INT64);
case common::Value::kBoolean:
return common::DataType::BOOLEAN;
ret.set_primitive_type(common::PrimitiveType::DT_BOOL);
case common::Value::kF64:
return common::DataType::DOUBLE;
ret.set_primitive_type(common::PrimitiveType::DT_DOUBLE);
case common::Value::kStr:
return common::DataType::STRING;
ret.mutable_string()->mutable_long_text();
default:
LOG(FATAL) << "unknown value" << value.DebugString();
}
return common::DataType::NONE;
return ret;
}

static void parse_param_const_from_pb(
Expand Down
5 changes: 2 additions & 3 deletions flex/codegen/src/hqps/hqps_case_when_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ class CaseWhenBuilder : public ExprBuilder {

std::string str = formater.str();

return std::make_tuple(
class_name_, construct_params_, tag_selectors_, str,
std::vector{common::DataType::DataType_INT_MIN_SENTINEL_DO_NOT_USE_});
return std::make_tuple(class_name_, construct_params_, tag_selectors_, str,
std::vector{common::DataType()});
}

protected:
Expand Down
4 changes: 3 additions & 1 deletion flex/codegen/src/hqps/hqps_edge_expand_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ static void BuildExprFromPredicate(BuildingContext& ctx,
std::string& func_construct_params_str,
std::string& property_selectors_str) {
auto expr_builder = ExprBuilder(ctx);
expr_builder.set_return_type(common::DataType::BOOLEAN);
common::DataType type;
type.set_primitive_type(common::PrimitiveType::DT_BOOL);
expr_builder.set_return_type(type);
expr_builder.AddAllExprOpr(expr.operators());
std::string expr_code;
std::vector<codegen::ParamConst> func_call_param_const;
Expand Down
4 changes: 3 additions & 1 deletion flex/codegen/src/hqps/hqps_get_v_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ class GetVOpBuilder {

auto& expr_oprs = expr.operators();
expr_builder.AddAllExprOpr(expr_oprs);
expr_builder.set_return_type(common::DataType::BOOLEAN);
common::DataType data_type;
data_type.set_primitive_type(common::PrimitiveType::DT_BOOL);
expr_builder.set_return_type(data_type);
std::vector<common::DataType> unused_expr_ret_type;
if (!expr_builder.empty()) {
std::tie(expr_name_, expr_call_param_, tag_properties_, expr_code_,
Expand Down
4 changes: 3 additions & 1 deletion flex/codegen/src/hqps/hqps_scan_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ class ScanOpBuilder {

// TODO: make expr_builder a member of ScanOpBuilder
// auto expr_builder = ExprBuilder(ctx_);
expr_builder_.set_return_type(common::DataType::BOOLEAN);
common::DataType type;
type.set_primitive_type(common::PrimitiveType::DT_BOOL);
expr_builder_.set_return_type(common::DataType(type));
// Add extra (, ) to wrap the code, since we may append index_predicate
// afterwards.
common::ExprOpr left_brace, right_brace;
Expand Down
4 changes: 3 additions & 1 deletion flex/codegen/src/hqps/hqps_select_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ class SelectOpBuilder {

SelectOpBuilder& expr(const common::Expression expr) {
ExprBuilder expr_builder(ctx_);
expr_builder.set_return_type(common::DataType::BOOLEAN);
common::DataType data_type;
data_type.set_primitive_type(common::PrimitiveType::DT_BOOL);
expr_builder.set_return_type(data_type);
expr_builder.AddAllExprOpr(expr.operators());

std::string func_code;
Expand Down
29 changes: 19 additions & 10 deletions flex/codegen/src/pegasus/pegasus_order_by_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,29 @@ class OrderByOpBuilder {
ss << ".then(";
}
std::string cmp_type;
switch (data_type) {
case common::DataType::BOOLEAN:
case common::DataType::INT32:
case common::DataType::INT64:
case common::DataType::STRING: {
cmp_type = "cmp";
break;
switch (data_type.item_case()) {
case common::DataType::kPrimitiveType: {
switch (data_type.primitive_type()) {
case common::PrimitiveType::DT_BOOL:
case common::PrimitiveType::DT_SIGNED_INT32:
case common::PrimitiveType::DT_SIGNED_INT64:
cmp_type = "cmp";
break;
case common::PrimitiveType::DT_DOUBLE: {
cmp_type = "partial_cmp";
break;
}
default:
LOG(FATAL) << "Unsupported type "
<< static_cast<int32_t>(data_type.primitive_type());
}
}
case common::DataType::DOUBLE: {
cmp_type = "partial_cmp";
case common::DataType::kString: {
cmp_type = "cmp";
break;
}
default:
LOG(FATAL) << "Unsupported type " << data_type;
LOG(FATAL) << "Unsupported type " << data_type.DebugString();
}
std::string reverse_str;
if (ordering_pair_[i].order() == algebra::OrderBy_OrderingPair_Order::
Expand Down
24 changes: 17 additions & 7 deletions flex/codegen/src/pegasus/pegasus_project_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,24 @@ class ProjectOpBuilder {
ctx_.SetOutput(i, data_types);
} else if (column_meta.type().type_case() ==
common::IrDataType::kDataType) {
switch (column_meta.type().data_type()) {
case common::DataType::INT64: {
std::vector<codegen::DataType> data_types;
data_types.push_back(codegen::DataType::kInt64);
ctx_.SetOutput(i, data_types);
break;
switch (column_meta.type().data_type().item_case()) {
case common::DataType::kPrimitiveType: {
auto data_type = column_meta.type().data_type().primitive_type();
switch (data_type) {
case common::PrimitiveType::DT_SIGNED_INT64: {
std::vector<codegen::DataType> data_types;
data_types.push_back(codegen::DataType::kInt64);
ctx_.SetOutput(i, data_types);
break;
}
default: {
std::vector<codegen::DataType> data_types;
data_types.push_back(codegen::DataType::kString);
ctx_.SetOutput(i, data_types);
}
}
}
case common::DataType::STRING: {
case common::DataType::kString: {
std::vector<codegen::DataType> data_types;
data_types.push_back(codegen::DataType::kString);
ctx_.SetOutput(i, data_types);
Expand Down
40 changes: 28 additions & 12 deletions flex/engines/graph_db/runtime/adhoc/expr_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "flex/engines/graph_db/runtime/adhoc/expr_impl.h"
#include <regex>
#include <stack>
#include "flex/proto_generated_gie/basic_type.pb.h"

namespace gs {

Expand Down Expand Up @@ -444,21 +445,36 @@ static RTAny parse_param(const common::DynamicParam& param,
common::IrDataType::TypeCase::kDataType) {
common::DataType dt = param.data_type().data_type();
const std::string& name = param.name();
if (dt == common::DataType::DATE32) {
int64_t val = std::stoll(input.at(name));
return RTAny::from_int64(val);
} else if (dt == common::DataType::STRING) {
if (dt.item_case() == common::DataType::ItemCase::kPrimitiveType) {
switch (dt.primitive_type()) {
case common::PrimitiveType::DT_SIGNED_INT32: {
int val = std::stoi(input.at(name));
return RTAny::from_int32(val);
}
case common::PrimitiveType::DT_SIGNED_INT64: {
int64_t val = std::stoll(input.at(name));
return RTAny::from_int64(val);
}
case common::PrimitiveType::DT_DOUBLE:
return RTAny::from_double(std::stod(input.at(name)));
case common::PrimitiveType::DT_BOOL:
return RTAny::from_bool(input.at(name) == "true");
default:
LOG(FATAL) << "not support type: " << dt.DebugString();
}
} else if (dt.item_case() == common::DataType::ItemCase::kTemporal) {
if (dt.temporal().item_case() == common::Temporal::kDate32) {
int64_t val = std::stoll(input.at(name));
return RTAny::from_int64(val);
} else {
LOG(FATAL) << "not support type: " << dt.temporal().DebugString();
}
} else if (dt.item_case() == common::DataType::ItemCase::kString) {
const std::string& val = input.at(name);
return RTAny::from_string(val);
} else if (dt == common::DataType::INT32) {
int val = std::stoi(input.at(name));
return RTAny::from_int32(val);
} else if (dt == common::DataType::INT64) {
int64_t val = std::stoll(input.at(name));
return RTAny::from_int64(val);
} else {
LOG(FATAL) << "not support type: " << dt.DebugString();
}

LOG(FATAL) << "not support type: " << common::DataType_Name(dt);
}
LOG(FATAL) << "graph data type not expected....";
return RTAny();
Expand Down
Loading
Loading