Skip to content

Commit

Permalink
Fixes subgraph errors
Browse files Browse the repository at this point in the history
Signed-off-by: Tao He <[email protected]>
  • Loading branch information
sighingnow committed Jan 21, 2024
1 parent 08b52c9 commit 835d44e
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ public static DataType parseFromDataType(
case STRING:

case DATE:
return DataType.STRING;
return DataType.DATE;

case TIME:
return DataType.STRING;
return DataType.TIME32;

case TIMESTAMP:
return DataType.STRING;
return DataType.TIMESTAMP;

case BYTES:
return DataType.BYTES;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ void v6d_free_string(char *s) {
Schema v6d_create_schema_builder() { return new vineyard::htap::MGPropertyGraphSchema(); }

VertexTypeBuilder v6d_build_vertex_type(Schema schema, LabelId label,
const char *name) {
const char *name) {
#ifndef NDEBUG
LOG(INFO) << "add vertex type: " << label << " -> " << name;
#endif
Expand All @@ -337,7 +337,7 @@ VertexTypeBuilder v6d_build_vertex_type(Schema schema, LabelId label,
}

EdgeTypeBuilder v6d_build_edge_type(Schema schema, LabelId label,
const char *name) {
const char *name) {
#ifndef NDEBUG
LOG(INFO) << "add edge type: " << label << " -> " << name;
#endif
Expand All @@ -355,7 +355,7 @@ static bool v6d_entry_has_property(vineyard::Entry *entry, std::string const &na
}

int v6d_build_vertex_property(VertexTypeBuilder vertex, PropertyId id,
const char *name, ::PropertyType prop_type) {
const char *name, ::PropertyType prop_type) {
#ifndef NDEBUG
LOG(INFO) << "add vertex property: " << id << " -> " << name << ": "
<< prop_type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ std::shared_ptr<arrow::DataType> PropertyTypeToDataType(
{SHORT, arrow::int16()}, {INT, arrow::int32()},
{LONG, arrow::int64()}, {FLOAT, arrow::float32()},
{DOUBLE, arrow::float64()}, {STRING, arrow::large_utf8()},
{DATE32, arrow::date32()}, {DATE64, arrow::date64()},
{TIME32_S, arrow::time32(arrow::TimeUnit::SECOND)},
{TIME32_MS, arrow::time32(arrow::TimeUnit::MILLI)},
// arrow reject this
// {TIME32_US, arrow::time32(arrow::TimeUnit::MICRO)},
// {TIME32_NS, arrow::time32(arrow::TimeUnit::NANO)},
// arrow reject this
// {TIME64_S, arrow::time64(arrow::TimeUnit::SECOND)},
// {TIME64_MS, arrow::time64(arrow::TimeUnit::MILLI)},
{TIME64_US, arrow::time64(arrow::TimeUnit::MICRO)},
{TIME64_NS, arrow::time64(arrow::TimeUnit::NANO)},
{TIMESTAMP_S, arrow::timestamp(arrow::TimeUnit::SECOND)},
{TIMESTAMP_MS, arrow::timestamp(arrow::TimeUnit::MILLI)},
{TIMESTAMP_US, arrow::timestamp(arrow::TimeUnit::MICRO)},
{TIMESTAMP_NS, arrow::timestamp(arrow::TimeUnit::NANO)},
};
auto iter = parse_type_dict.find(type);
if (iter != parse_type_dict.end()) {
Expand Down Expand Up @@ -66,6 +81,43 @@ ::PropertyType PropertyTypeFromDataType(std::shared_ptr<arrow::DataType> const &
return STRING;
case arrow::Type::BINARY:
return BYTES;
case arrow::Type::DATE32:
return DATE32;
case arrow::Type::DATE64:
return DATE64;
case arrow::Type::TIME32:
switch (static_cast<arrow::Time32Type const *>(type.get())->unit()) {
case arrow::TimeUnit::SECOND:
return TIME32_S;
case arrow::TimeUnit::MILLI:
return TIME32_MS;
case arrow::TimeUnit::MICRO:
return TIME32_US;
case arrow::TimeUnit::NANO:
return TIME32_NS;
}
case arrow::Type::TIME64:
switch (static_cast<arrow::Time64Type const *>(type.get())->unit()) {
case arrow::TimeUnit::SECOND:
return TIME64_S;
case arrow::TimeUnit::MILLI:
return TIME64_MS;
case arrow::TimeUnit::MICRO:
return TIME64_US;
case arrow::TimeUnit::NANO:
return TIME64_NS;
}
case arrow::Type::TIMESTAMP:
switch (static_cast<arrow::TimestampType const *>(type.get())->unit()) {
case arrow::TimeUnit::SECOND:
return TIMESTAMP_S;
case arrow::TimeUnit::MILLI:
return TIMESTAMP_MS;
case arrow::TimeUnit::MICRO:
return TIMESTAMP_US;
case arrow::TimeUnit::NANO:
return TIMESTAMP_NS;
}
default:
LOG(ERROR) << "Unknown arrow data type: " << type->ToString();
return INVALID;
Expand Down Expand Up @@ -93,7 +145,7 @@ static std::shared_ptr<arrow::Schema> ToArrowSchema(
kv->Append("label_index", std::to_string(entry.id));
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto const& prop : entry.props_) {
LOG(INFO) << "prop.id = " << prop.id << ", " << prop.name << " -> " << prop.type;
LOG(INFO) << "prop.id = " << prop.id << ", " << prop.name << " -> " << prop.type->ToString();
fields.emplace_back(PropertyToField(prop));
}
return std::make_shared<arrow::Schema>(fields, kv);
Expand All @@ -103,26 +155,34 @@ PropertyTableAppender::PropertyTableAppender(
std::shared_ptr<arrow::Schema> schema) {
for (const auto& field : schema->fields()) {
std::shared_ptr<arrow::DataType> type = field->type();
if (type == arrow::boolean()) {
if (arrow::null()->Equals(type)) {
funcs_.push_back(AppendProperty<void>::append);
} else if (arrow::boolean()->Equals(type)) {
funcs_.push_back(AppendProperty<bool>::append);
} else if (type == arrow::int8()) {
} else if (arrow::int8()->Equals(type)) {
funcs_.push_back(AppendProperty<char>::append);
} else if (type == arrow::int16()) {
} else if (arrow::int16()->Equals(type)) {
funcs_.push_back(AppendProperty<int16_t>::append);
} else if (type == arrow::int32()) {
} else if (arrow::int32()->Equals(type)) {
funcs_.push_back(AppendProperty<int32_t>::append);
} else if (type == arrow::int64()) {
} else if (arrow::int64()->Equals(type)) {
funcs_.push_back(AppendProperty<int64_t>::append);
} else if (type == arrow::float32()) {
} else if (arrow::float32()->Equals(type)) {
funcs_.push_back(AppendProperty<float>::append);
} else if (type == arrow::float64()) {
} else if (arrow::float64()->Equals(type)) {
funcs_.push_back(AppendProperty<double>::append);
} else if (type == arrow::large_utf8()) {
} else if (arrow::large_utf8()->Equals(type)) {
funcs_.push_back(AppendProperty<std::string>::append);
} else if (arrow::date32()->Equals(type)) {
funcs_.push_back(AppendProperty<arrow::Date32Type>::append);
} else if (arrow::date64()->Equals(type)) {
funcs_.push_back(AppendProperty<arrow::Date64Type>::append);
} else if (type->id() == arrow::Type::TIME32) {
funcs_.push_back(AppendProperty<arrow::Time32Type>::append);
} else if (type->id() == arrow::Type::TIME64) {
funcs_.push_back(AppendProperty<arrow::Time64Type>::append);
} else if (type->id() == arrow::Type::TIMESTAMP) {
funcs_.push_back(AppendProperty<arrow::TimestampType>::append);
} else if (type == arrow::null()) {
funcs_.push_back(AppendProperty<void>::append);
} else {
LOG(FATAL) << "Datatype [" << type->ToString() << "] not implemented...";
}
Expand Down Expand Up @@ -207,7 +267,8 @@ void PropertyTableAppender::Apply(
<< properties[i].type
<< ", len = "
<< properties[i].len
<< ", prop_id = " << properties[i].id;
<< ", prop_id = " << properties[i].id
<< ", builder type = " << builder->GetField(index)->type()->ToString();
#endif
funcs_[index](builder->GetField(index), properties + i);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,51 @@ struct AppendProperty<std::string> {
}
};

template <>
struct AppendProperty<arrow::Date32Type> {
static void append(arrow::ArrayBuilder* builder, Property const* prop) {
vineyard::htap::htap_types::PodProperties pp;
pp.long_value = prop->len;
CHECK_ARROW_ERROR(dynamic_cast<arrow::Date32Builder*>(builder)->Append(pp.int_value));
}
};

template <>
struct AppendProperty<arrow::Date64Type> {
static void append(arrow::ArrayBuilder* builder, Property const* prop) {
vineyard::htap::htap_types::PodProperties pp;
pp.long_value = prop->len;
CHECK_ARROW_ERROR(dynamic_cast<arrow::Date64Builder*>(builder)->Append(pp.long_value));
}
};

template <>
struct AppendProperty<arrow::Time32Type> {
static void append(arrow::ArrayBuilder* builder, Property const* prop) {
vineyard::htap::htap_types::PodProperties pp;
pp.long_value = prop->len;
CHECK_ARROW_ERROR(dynamic_cast<arrow::Time32Builder*>(builder)->Append(pp.int_value));
}
};

template <>
struct AppendProperty<arrow::Time64Type> {
static void append(arrow::ArrayBuilder* builder, Property const* prop) {
vineyard::htap::htap_types::PodProperties pp;
pp.long_value = prop->len;
CHECK_ARROW_ERROR(dynamic_cast<arrow::Time64Builder*>(builder)->Append(pp.long_value));
}
};

template <>
struct AppendProperty<arrow::TimestampType> {
static void append(arrow::ArrayBuilder* builder, Property const* prop) {
vineyard::htap::htap_types::PodProperties pp;
pp.long_value = prop->len;
CHECK_ARROW_ERROR(dynamic_cast<arrow::TimestampBuilder*>(builder)->Append(pp.long_value));
}
};

template <>
struct AppendProperty<void> {
static void append(arrow::ArrayBuilder* builder, Property const* prop) {
Expand Down Expand Up @@ -210,7 +255,7 @@ class PropertyGraphOutStream : public Registered<PropertyGraphOutStream> {
auto stream_id = StreamBuilder<RecordBatchStream>::Make(client, params);
s->vertex_stream_ = client.GetObject<RecordBatchStream>(stream_id);

client.Persist(s->vertex_stream_->id());
VINEYARD_DISCARD(client.Persist(s->vertex_stream_->id()));
// Don't "OpenWriter" when creating, it will be "Get and Construct" again
// VINEYARD_CHECK_OK(s->vertex_stream_->OpenWriter(client, s->vertex_writer_));
}
Expand All @@ -221,7 +266,7 @@ class PropertyGraphOutStream : public Registered<PropertyGraphOutStream> {
auto stream_id = StreamBuilder<RecordBatchStream>::Make(client, params);
s->edge_stream_ = client.GetObject<RecordBatchStream>(stream_id);

client.Persist(s->edge_stream_->id());
VINEYARD_DISCARD(client.Persist(s->edge_stream_->id()));
// Don't "OpenWriter" when creating, it will be "Get and Construct" again
// VINEYARD_CHECK_OK(s->edge_stream_->OpenWriter(client, s->edge_writer_));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,17 +856,17 @@ impl WriteNativeProperty {
Object::DateFormat(DateTimeFormats::Time(v)) => {
let u = PropertyUnion {
i: (v.num_seconds_from_midnight() * 1000) as i32
+ (v.nanosecond() / 1000000) as i32,
+ (v.nanosecond() / 1000_000) as i32,
};
(PropertyType::Time32MS, vec![], unsafe { u.l })
}
Object::DateFormat(DateTimeFormats::DateTime(v)) => {
let u = PropertyUnion { l: v.timestamp_millis() };
(PropertyType::TimestampMS, vec![], unsafe { u.l })
let u = PropertyUnion { l: v.timestamp_nanos() };
(PropertyType::TimestampNS, vec![], unsafe { u.l })
}
Object::DateFormat(DateTimeFormats::DateTimeWithTz(v)) => {
let u = PropertyUnion { l: v.timestamp_millis() };
(PropertyType::TimestampMS, vec![], unsafe { u.l })
let u = PropertyUnion { l: v.timestamp_nanos() };
(PropertyType::TimestampNS, vec![], unsafe { u.l })
}
_ => {
panic!("Unsupported object type: {:?}", property)
Expand Down Expand Up @@ -932,8 +932,8 @@ impl PropertyType {
RawType::String => PropertyType::String,
RawType::Date => PropertyType::Date32,
RawType::Time => PropertyType::Time32MS,
RawType::DateTime => PropertyType::TimestampMS,
RawType::DateTimeWithTz => PropertyType::TimestampMS,
RawType::DateTime => PropertyType::TimestampNS,
RawType::DateTimeWithTz => PropertyType::TimestampNS,
RawType::Blob(_) => PropertyType::Bytes,
_ => {
unimplemented!("Unsupported data type {:?}", raw_type)
Expand Down Expand Up @@ -984,7 +984,7 @@ impl PropertyType {
common_pb::DataType::StringArray => PropertyType::StringList,
common_pb::DataType::Date32 => PropertyType::Date32,
common_pb::DataType::Time32 => PropertyType::Time32MS,
common_pb::DataType::Timestamp => PropertyType::TimestampMS,
common_pb::DataType::Timestamp => PropertyType::TimestampNS,
_ => {
unimplemented!("Unsupported data type {:?}", raw_type)
}
Expand Down Expand Up @@ -1038,7 +1038,7 @@ impl PropertyType {
DataType::ListFloat => PropertyType::FloatList,
DataType::ListDouble => PropertyType::DoubleList,
DataType::ListString => PropertyType::StringList,
DataType::Date => PropertyType::TimestampMS,
DataType::Date => PropertyType::TimestampNS,
_ => {
unimplemented!("Unsupported data type {:?}", data_type)
}
Expand Down
20 changes: 10 additions & 10 deletions python/graphscope/framework/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,25 +502,25 @@ def _unify_str_type(t): # noqa: C901
return graph_def_pb2.DataTypePb.STRING
elif t == "bytes":
return graph_def_pb2.DataTypePb.BYTES
elif t == "date32[day]":
elif t == "date32[day]" or t == "date[32][day]" or t == "date32" or t == "date[32]":
return graph_def_pb2.DataTypePb.DATE32
elif t == "date64[ms]":
elif t == "date64[ms]" or t == "date[64][ms]" or t == "date64" or t == "date[64]":
return graph_def_pb2.DataTypePb.DATE64
elif t == "time32[s]":
elif t == "time32[s]" or t == "time[32][s]":
return graph_def_pb2.DataTypePb.TIME32_S
elif t == "time32[ms]":
elif t == "time32[ms]" or t == "time[32][ms]":
return graph_def_pb2.DataTypePb.TIME32_MS
elif t == "time32[us]":
elif t == "time32[us]" or t == "time[32][us]":
return graph_def_pb2.DataTypePb.TIME32_US
elif t == "time32[ns]":
elif t == "time32[ns]" or t == "time[32][ns]":
return graph_def_pb2.DataTypePb.TIME32_NS
elif t == "time64[s]":
elif t == "time64[s]" or t == "time[64][s]":
return graph_def_pb2.DataTypePb.TIME64_S
elif t == "time64[ms]":
elif t == "time64[ms]" or t == "time[64][ms]":
return graph_def_pb2.DataTypePb.TIME64_MS
elif t == "time64[us]":
elif t == "time64[us]" or t == "time[64][us]":
return graph_def_pb2.DataTypePb.TIME64_US
elif t == "time64[ns]":
elif t == "time64[ns]" or t == "time[64][ns]":
return graph_def_pb2.DataTypePb.TIME64_NS
elif t.startswith("timestamp[s]"):
return graph_def_pb2.DataTypePb.TIMESTAMP_S
Expand Down
4 changes: 4 additions & 0 deletions python/graphscope/tests/minitest/test_min.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,4 +447,8 @@ def check_edge_values(edges):
check_node_values(nodes)
check_edge_values(edges)

# check subgraph
g1 = interactive.subgraph("g.E()")
logger.info("subgraph = %s", g1.schema)

session.close()

0 comments on commit 835d44e

Please sign in to comment.