diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/wrapper/DataType.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/wrapper/DataType.java index 3b90f8eebfc0..3dbfe3920375 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/wrapper/DataType.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/schema/wrapper/DataType.java @@ -143,15 +143,16 @@ public static DataType parseFromDataType( return DataType.DOUBLE; case STRING: + return DataType.STRING; case DATE: - return DataType.STRING; + return DataType.DATE; case TIME: - return DataType.STRING; + return DataType.TIME32; case TIMESTAMP: - return DataType.STRING; + return DataType.TIMESTAMP; case BYTES: return DataType.BYTES; diff --git a/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/graph_builder_ffi.cc b/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/graph_builder_ffi.cc index 45001fb7c4df..edee0601f64e 100644 --- a/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/graph_builder_ffi.cc +++ b/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/graph_builder_ffi.cc @@ -328,7 +328,7 @@ void v6d_free_string(char *s) { Schema v6d_create_schema_builder() { return new vineyard::htap::MGPropertyGraphSchema(); } VertexTypeBuilder v6d_build_vertex_type(Schema schema, LabelId label, - const char *name) { + const char *name) { #ifndef NDEBUG LOG(INFO) << "add vertex type: " << label << " -> " << name; #endif @@ -337,7 +337,7 @@ VertexTypeBuilder v6d_build_vertex_type(Schema schema, LabelId label, } EdgeTypeBuilder v6d_build_edge_type(Schema schema, LabelId label, - const char *name) { + const char *name) { #ifndef NDEBUG LOG(INFO) << "add edge type: " << label << " -> " << name; #endif @@ -355,7 +355,7 @@ static bool v6d_entry_has_property(vineyard::Entry *entry, std::string const &na } int v6d_build_vertex_property(VertexTypeBuilder vertex, PropertyId id, - const char *name, ::PropertyType prop_type) { + const char *name, ::PropertyType prop_type) { #ifndef NDEBUG LOG(INFO) << "add vertex property: " << id << " -> " << name << ": " << prop_type; diff --git a/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/property_graph_stream.cc b/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/property_graph_stream.cc index efc9cc1b8c6c..fa25ac368274 100644 --- a/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/property_graph_stream.cc +++ b/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/property_graph_stream.cc @@ -35,6 +35,21 @@ std::shared_ptr PropertyTypeToDataType( {SHORT, arrow::int16()}, {INT, arrow::int32()}, {LONG, arrow::int64()}, {FLOAT, arrow::float32()}, {DOUBLE, arrow::float64()}, {STRING, arrow::large_utf8()}, + {DATE32, arrow::date32()}, {DATE64, arrow::date64()}, + {TIME32_S, arrow::time32(arrow::TimeUnit::SECOND)}, + {TIME32_MS, arrow::time32(arrow::TimeUnit::MILLI)}, + // arrow reject this + // {TIME32_US, arrow::time32(arrow::TimeUnit::MICRO)}, + // {TIME32_NS, arrow::time32(arrow::TimeUnit::NANO)}, + // arrow reject this + // {TIME64_S, arrow::time64(arrow::TimeUnit::SECOND)}, + // {TIME64_MS, arrow::time64(arrow::TimeUnit::MILLI)}, + {TIME64_US, arrow::time64(arrow::TimeUnit::MICRO)}, + {TIME64_NS, arrow::time64(arrow::TimeUnit::NANO)}, + {TIMESTAMP_S, arrow::timestamp(arrow::TimeUnit::SECOND)}, + {TIMESTAMP_MS, arrow::timestamp(arrow::TimeUnit::MILLI)}, + {TIMESTAMP_US, arrow::timestamp(arrow::TimeUnit::MICRO)}, + {TIMESTAMP_NS, arrow::timestamp(arrow::TimeUnit::NANO)}, }; auto iter = parse_type_dict.find(type); if (iter != parse_type_dict.end()) { @@ -66,6 +81,43 @@ ::PropertyType PropertyTypeFromDataType(std::shared_ptr const & return STRING; case arrow::Type::BINARY: return BYTES; + case arrow::Type::DATE32: + return DATE32; + case arrow::Type::DATE64: + return DATE64; + case arrow::Type::TIME32: + switch (static_cast(type.get())->unit()) { + case arrow::TimeUnit::SECOND: + return TIME32_S; + case arrow::TimeUnit::MILLI: + return TIME32_MS; + case arrow::TimeUnit::MICRO: + return TIME32_US; + case arrow::TimeUnit::NANO: + return TIME32_NS; + } + case arrow::Type::TIME64: + switch (static_cast(type.get())->unit()) { + case arrow::TimeUnit::SECOND: + return TIME64_S; + case arrow::TimeUnit::MILLI: + return TIME64_MS; + case arrow::TimeUnit::MICRO: + return TIME64_US; + case arrow::TimeUnit::NANO: + return TIME64_NS; + } + case arrow::Type::TIMESTAMP: + switch (static_cast(type.get())->unit()) { + case arrow::TimeUnit::SECOND: + return TIMESTAMP_S; + case arrow::TimeUnit::MILLI: + return TIMESTAMP_MS; + case arrow::TimeUnit::MICRO: + return TIMESTAMP_US; + case arrow::TimeUnit::NANO: + return TIMESTAMP_NS; + } default: LOG(ERROR) << "Unknown arrow data type: " << type->ToString(); return INVALID; @@ -93,7 +145,7 @@ static std::shared_ptr ToArrowSchema( kv->Append("label_index", std::to_string(entry.id)); std::vector> fields; for (auto const& prop : entry.props_) { - LOG(INFO) << "prop.id = " << prop.id << ", " << prop.name << " -> " << prop.type; + LOG(INFO) << "prop.id = " << prop.id << ", " << prop.name << " -> " << prop.type->ToString(); fields.emplace_back(PropertyToField(prop)); } return std::make_shared(fields, kv); @@ -103,26 +155,34 @@ PropertyTableAppender::PropertyTableAppender( std::shared_ptr schema) { for (const auto& field : schema->fields()) { std::shared_ptr type = field->type(); - if (type == arrow::boolean()) { + if (arrow::null()->Equals(type)) { + funcs_.push_back(AppendProperty::append); + } else if (arrow::boolean()->Equals(type)) { funcs_.push_back(AppendProperty::append); - } else if (type == arrow::int8()) { + } else if (arrow::int8()->Equals(type)) { funcs_.push_back(AppendProperty::append); - } else if (type == arrow::int16()) { + } else if (arrow::int16()->Equals(type)) { funcs_.push_back(AppendProperty::append); - } else if (type == arrow::int32()) { + } else if (arrow::int32()->Equals(type)) { funcs_.push_back(AppendProperty::append); - } else if (type == arrow::int64()) { + } else if (arrow::int64()->Equals(type)) { funcs_.push_back(AppendProperty::append); - } else if (type == arrow::float32()) { + } else if (arrow::float32()->Equals(type)) { funcs_.push_back(AppendProperty::append); - } else if (type == arrow::float64()) { + } else if (arrow::float64()->Equals(type)) { funcs_.push_back(AppendProperty::append); - } else if (type == arrow::large_utf8()) { + } else if (arrow::large_utf8()->Equals(type)) { funcs_.push_back(AppendProperty::append); + } else if (arrow::date32()->Equals(type)) { + funcs_.push_back(AppendProperty::append); + } else if (arrow::date64()->Equals(type)) { + funcs_.push_back(AppendProperty::append); + } else if (type->id() == arrow::Type::TIME32) { + funcs_.push_back(AppendProperty::append); + } else if (type->id() == arrow::Type::TIME64) { + funcs_.push_back(AppendProperty::append); } else if (type->id() == arrow::Type::TIMESTAMP) { funcs_.push_back(AppendProperty::append); - } else if (type == arrow::null()) { - funcs_.push_back(AppendProperty::append); } else { LOG(FATAL) << "Datatype [" << type->ToString() << "] not implemented..."; } @@ -207,7 +267,8 @@ void PropertyTableAppender::Apply( << properties[i].type << ", len = " << properties[i].len - << ", prop_id = " << properties[i].id; + << ", prop_id = " << properties[i].id + << ", builder type = " << builder->GetField(index)->type()->ToString(); #endif funcs_[index](builder->GetField(index), properties + i); } diff --git a/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/property_graph_stream.h b/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/property_graph_stream.h index d5ccfd0d0826..fd4ee4a29aa6 100644 --- a/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/property_graph_stream.h +++ b/interactive_engine/executor/store/global_query/src/store_impl/v6d/native/property_graph_stream.h @@ -139,6 +139,51 @@ struct AppendProperty { } }; +template <> +struct AppendProperty { + static void append(arrow::ArrayBuilder* builder, Property const* prop) { + vineyard::htap::htap_types::PodProperties pp; + pp.long_value = prop->len; + CHECK_ARROW_ERROR(dynamic_cast(builder)->Append(pp.int_value)); + } +}; + +template <> +struct AppendProperty { + static void append(arrow::ArrayBuilder* builder, Property const* prop) { + vineyard::htap::htap_types::PodProperties pp; + pp.long_value = prop->len; + CHECK_ARROW_ERROR(dynamic_cast(builder)->Append(pp.long_value)); + } +}; + +template <> +struct AppendProperty { + static void append(arrow::ArrayBuilder* builder, Property const* prop) { + vineyard::htap::htap_types::PodProperties pp; + pp.long_value = prop->len; + CHECK_ARROW_ERROR(dynamic_cast(builder)->Append(pp.int_value)); + } +}; + +template <> +struct AppendProperty { + static void append(arrow::ArrayBuilder* builder, Property const* prop) { + vineyard::htap::htap_types::PodProperties pp; + pp.long_value = prop->len; + CHECK_ARROW_ERROR(dynamic_cast(builder)->Append(pp.long_value)); + } +}; + +template <> +struct AppendProperty { + static void append(arrow::ArrayBuilder* builder, Property const* prop) { + vineyard::htap::htap_types::PodProperties pp; + pp.long_value = prop->len; + CHECK_ARROW_ERROR(dynamic_cast(builder)->Append(pp.long_value)); + } +}; + template <> struct AppendProperty { static void append(arrow::ArrayBuilder* builder, Property const* prop) { @@ -210,7 +255,7 @@ class PropertyGraphOutStream : public Registered { auto stream_id = StreamBuilder::Make(client, params); s->vertex_stream_ = client.GetObject(stream_id); - client.Persist(s->vertex_stream_->id()); + VINEYARD_DISCARD(client.Persist(s->vertex_stream_->id())); // Don't "OpenWriter" when creating, it will be "Get and Construct" again // VINEYARD_CHECK_OK(s->vertex_stream_->OpenWriter(client, s->vertex_writer_)); } @@ -221,7 +266,7 @@ class PropertyGraphOutStream : public Registered { auto stream_id = StreamBuilder::Make(client, params); s->edge_stream_ = client.GetObject(stream_id); - client.Persist(s->edge_stream_->id()); + VINEYARD_DISCARD(client.Persist(s->edge_stream_->id())); // Don't "OpenWriter" when creating, it will be "Get and Construct" again // VINEYARD_CHECK_OK(s->edge_stream_->OpenWriter(client, s->edge_writer_)); } diff --git a/interactive_engine/executor/store/global_query/src/store_impl/v6d/read_ffi.rs b/interactive_engine/executor/store/global_query/src/store_impl/v6d/read_ffi.rs index 8edb08b5d8a7..0f48a90b2f85 100644 --- a/interactive_engine/executor/store/global_query/src/store_impl/v6d/read_ffi.rs +++ b/interactive_engine/executor/store/global_query/src/store_impl/v6d/read_ffi.rs @@ -856,17 +856,17 @@ impl WriteNativeProperty { Object::DateFormat(DateTimeFormats::Time(v)) => { let u = PropertyUnion { i: (v.num_seconds_from_midnight() * 1000) as i32 - + (v.nanosecond() / 1000000) as i32, + + (v.nanosecond() / 1000_000) as i32, }; (PropertyType::Time32MS, vec![], unsafe { u.l }) } Object::DateFormat(DateTimeFormats::DateTime(v)) => { - let u = PropertyUnion { l: v.timestamp_millis() }; - (PropertyType::TimestampMS, vec![], unsafe { u.l }) + let u = PropertyUnion { l: v.timestamp_nanos() }; + (PropertyType::TimestampNS, vec![], unsafe { u.l }) } Object::DateFormat(DateTimeFormats::DateTimeWithTz(v)) => { - let u = PropertyUnion { l: v.timestamp_millis() }; - (PropertyType::TimestampMS, vec![], unsafe { u.l }) + let u = PropertyUnion { l: v.timestamp_nanos() }; + (PropertyType::TimestampNS, vec![], unsafe { u.l }) } _ => { panic!("Unsupported object type: {:?}", property) @@ -932,8 +932,8 @@ impl PropertyType { RawType::String => PropertyType::String, RawType::Date => PropertyType::Date32, RawType::Time => PropertyType::Time32MS, - RawType::DateTime => PropertyType::TimestampMS, - RawType::DateTimeWithTz => PropertyType::TimestampMS, + RawType::DateTime => PropertyType::TimestampNS, + RawType::DateTimeWithTz => PropertyType::TimestampNS, RawType::Blob(_) => PropertyType::Bytes, _ => { unimplemented!("Unsupported data type {:?}", raw_type) @@ -984,7 +984,7 @@ impl PropertyType { common_pb::DataType::StringArray => PropertyType::StringList, common_pb::DataType::Date32 => PropertyType::Date32, common_pb::DataType::Time32 => PropertyType::Time32MS, - common_pb::DataType::Timestamp => PropertyType::TimestampMS, + common_pb::DataType::Timestamp => PropertyType::TimestampNS, _ => { unimplemented!("Unsupported data type {:?}", raw_type) } @@ -1038,7 +1038,7 @@ impl PropertyType { DataType::ListFloat => PropertyType::FloatList, DataType::ListDouble => PropertyType::DoubleList, DataType::ListString => PropertyType::StringList, - DataType::Date => PropertyType::TimestampMS, + DataType::Date => PropertyType::TimestampNS, _ => { unimplemented!("Unsupported data type {:?}", data_type) } diff --git a/python/graphscope/framework/utils.py b/python/graphscope/framework/utils.py index 1911d41bcbc7..f5e914aa6c65 100644 --- a/python/graphscope/framework/utils.py +++ b/python/graphscope/framework/utils.py @@ -502,25 +502,25 @@ def _unify_str_type(t): # noqa: C901 return graph_def_pb2.DataTypePb.STRING elif t == "bytes": return graph_def_pb2.DataTypePb.BYTES - elif t == "date32[day]": + elif t == "date32[day]" or t == "date[32][day]" or t == "date32" or t == "date[32]": return graph_def_pb2.DataTypePb.DATE32 - elif t == "date64[ms]": + elif t == "date64[ms]" or t == "date[64][ms]" or t == "date64" or t == "date[64]": return graph_def_pb2.DataTypePb.DATE64 - elif t == "time32[s]": + elif t == "time32[s]" or t == "time[32][s]": return graph_def_pb2.DataTypePb.TIME32_S - elif t == "time32[ms]": + elif t == "time32[ms]" or t == "time[32][ms]": return graph_def_pb2.DataTypePb.TIME32_MS - elif t == "time32[us]": + elif t == "time32[us]" or t == "time[32][us]": return graph_def_pb2.DataTypePb.TIME32_US - elif t == "time32[ns]": + elif t == "time32[ns]" or t == "time[32][ns]": return graph_def_pb2.DataTypePb.TIME32_NS - elif t == "time64[s]": + elif t == "time64[s]" or t == "time[64][s]": return graph_def_pb2.DataTypePb.TIME64_S - elif t == "time64[ms]": + elif t == "time64[ms]" or t == "time[64][ms]": return graph_def_pb2.DataTypePb.TIME64_MS - elif t == "time64[us]": + elif t == "time64[us]" or t == "time[64][us]": return graph_def_pb2.DataTypePb.TIME64_US - elif t == "time64[ns]": + elif t == "time64[ns]" or t == "time[64][ns]": return graph_def_pb2.DataTypePb.TIME64_NS elif t.startswith("timestamp[s]"): return graph_def_pb2.DataTypePb.TIMESTAMP_S diff --git a/python/graphscope/tests/minitest/test_min.py b/python/graphscope/tests/minitest/test_min.py index 964e3e66d747..76bb56392bd5 100644 --- a/python/graphscope/tests/minitest/test_min.py +++ b/python/graphscope/tests/minitest/test_min.py @@ -447,4 +447,8 @@ def check_edge_values(edges): check_node_values(nodes) check_edge_values(edges) + # check subgraph + g1 = interactive.subgraph("g.E()") + logger.info("subgraph = %s", g1.schema) + session.close()