From f41a7618519fea81abd9e8291ea51aac604fee9e Mon Sep 17 00:00:00 2001 From: Xiaoli Zhou Date: Thu, 2 Jan 2025 10:16:45 +0800 Subject: [PATCH 1/3] fix(interactive): Fix Aggregate Column Order Mismatch (#4364) ## What do these changes do? as titled. ## Related issue number Fixes #4360 --------- Co-authored-by: BingqingLyu --- .github/workflows/gaia.yml | 6 ++ .../compiler/ir_experimental_ci.sh | 37 +++---- .../cypher/antlr4/visitor/ColumnOrder.java | 96 +++++++++++++++++++ .../antlr4/visitor/GraphBuilderVisitor.java | 74 +++++++------- .../common/ir/planner/cbo/BITest.java | 4 +- .../common/ir/planner/cbo/LdbcTest.java | 22 +++-- .../graphscope/cypher/antlr4/MatchTest.java | 18 ++++ 7 files changed, 187 insertions(+), 70 deletions(-) create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/ColumnOrder.java diff --git a/.github/workflows/gaia.yml b/.github/workflows/gaia.yml index 6007d9e37a87..bc135601dfb6 100644 --- a/.github/workflows/gaia.yml +++ b/.github/workflows/gaia.yml @@ -52,6 +52,12 @@ jobs: ~/.cache/sccache key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: 1.81.0 + override: true + - name: Rust Format Check run: | cd ${GITHUB_WORKSPACE}/interactive_engine/executor && ./check_format.sh diff --git a/interactive_engine/compiler/ir_experimental_ci.sh b/interactive_engine/compiler/ir_experimental_ci.sh index 66e470afa0b6..510efe100654 100755 --- a/interactive_engine/compiler/ir_experimental_ci.sh +++ b/interactive_engine/compiler/ir_experimental_ci.sh @@ -57,26 +57,29 @@ if [ $exit_code -ne 0 ]; then fi unset DISTRIBUTED_ENV -# Test4: run cypher movie tests on experimental store via ir-core -cd ${base_dir}/../executor/ir/target/release && DATA_PATH=/tmp/gstest/movie_graph_exp_bin RUST_LOG=info ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config & -sleep 5s -# start compiler service -cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/movie_schema.json & -sleep 10s -export ENGINE_TYPE=pegasus -# run cypher movie tests -cd ${base_dir} && make cypher_test -exit_code=$? -# clean service -ps -ef | grep "com.alibaba.graphscope.GraphServer" | grep -v grep | awk '{print $2}' | xargs kill -9 || true -# report test result -if [ $exit_code -ne 0 ]; then - echo "ir cypher movie integration test on experimental store fail" - exit 1 -fi +## Test4: run cypher movie tests on experimental store via ir-core +#cd ${base_dir}/../executor/ir/target/release && DATA_PATH=/tmp/gstest/movie_graph_exp_bin RUST_LOG=info ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config & +#sleep 5s +## start compiler service +#cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/movie_schema.json & +#sleep 10s +#export ENGINE_TYPE=pegasus +## run cypher movie tests +#cd ${base_dir} && make cypher_test +#exit_code=$? +## clean service +#ps -ef | grep "com.alibaba.graphscope.GraphServer" | grep -v grep | awk '{print $2}' | xargs kill -9 || true +## report test result +#if [ $exit_code -ne 0 ]; then +# echo "ir cypher movie integration test on experimental store fail" +# exit 1 +#fi # Test5: run cypher movie tests on experimental store via calcite-based ir +# start engine service and load movie graph +cd ${base_dir}/../executor/ir/target/release && DATA_PATH=/tmp/gstest/movie_graph_exp_bin RUST_LOG=info ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config & +sleep 5s # restart compiler service cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/movie_schema.json graph.planner.opt=CBO graph.statistics:=./src/test/resources/statistics/movie_statistics.json graph.physical.opt=proto graph.planner.rules=FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule & sleep 10s diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/ColumnOrder.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/ColumnOrder.java new file mode 100644 index 000000000000..10ed8eebe337 --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/ColumnOrder.java @@ -0,0 +1,96 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.cypher.antlr4.visitor; + +import com.alibaba.graphscope.common.ir.tools.GraphBuilder; +import com.google.common.base.Objects; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * ColumnOrder keeps fields as the same order with RETURN clause + */ +public class ColumnOrder { + public static class Field { + private final RexNode expr; + private final String alias; + + public Field(RexNode expr, String alias) { + this.expr = expr; + this.alias = alias; + } + + public RexNode getExpr() { + return expr; + } + + public String getAlias() { + return alias; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Field field = (Field) o; + return Objects.equal(expr, field.expr) && Objects.equal(alias, field.alias); + } + + @Override + public int hashCode() { + return Objects.hashCode(expr, alias); + } + } + + public interface FieldSupplier { + Field get(RelDataType inputType); + + class Default implements FieldSupplier { + private final GraphBuilder builder; + private final Supplier ordinalSupplier; + + public Default(GraphBuilder builder, Supplier ordinalSupplier) { + this.builder = builder; + this.ordinalSupplier = ordinalSupplier; + } + + @Override + public Field get(RelDataType inputType) { + String aliasName = inputType.getFieldList().get(ordinalSupplier.get()).getName(); + return new Field(this.builder.variable(aliasName), aliasName); + } + } + } + + private final List fieldSuppliers; + + public ColumnOrder(List fieldSuppliers) { + this.fieldSuppliers = fieldSuppliers; + } + + public @Nullable List getFields(RelDataType inputType) { + return this.fieldSuppliers.stream().map(k -> k.get(inputType)).collect(Collectors.toList()); + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/GraphBuilderVisitor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/GraphBuilderVisitor.java index 0fefef6a9717..e7d7b191d9d9 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/GraphBuilderVisitor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/antlr4/visitor/GraphBuilderVisitor.java @@ -18,13 +18,11 @@ import com.alibaba.graphscope.common.antlr4.ExprUniqueAliasInfer; import com.alibaba.graphscope.common.antlr4.ExprVisitorResult; -import com.alibaba.graphscope.common.ir.rel.GraphLogicalAggregate; import com.alibaba.graphscope.common.ir.rel.GraphProcedureCall; import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalGetV; import com.alibaba.graphscope.common.ir.rel.graph.GraphLogicalPathExpand; import com.alibaba.graphscope.common.ir.rel.type.group.GraphAggCall; import com.alibaba.graphscope.common.ir.rex.RexTmpVariableConverter; -import com.alibaba.graphscope.common.ir.rex.RexVariableAliasCollector; import com.alibaba.graphscope.common.ir.tools.GraphBuilder; import com.alibaba.graphscope.common.ir.tools.config.GraphOpt; import com.alibaba.graphscope.grammar.CypherGSBaseVisitor; @@ -39,6 +37,7 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexSubQuery; @@ -49,6 +48,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; public class GraphBuilderVisitor extends CypherGSBaseVisitor { @@ -271,9 +271,8 @@ public GraphBuilder visitOC_ProjectionBody(CypherGSParser.OC_ProjectionBodyConte List keyExprs = new ArrayList<>(); List keyAliases = new ArrayList<>(); List aggCalls = new ArrayList<>(); - List extraExprs = new ArrayList<>(); - List extraAliases = new ArrayList<>(); - if (isGroupPattern(ctx, keyExprs, keyAliases, aggCalls, extraExprs, extraAliases)) { + AtomicReference columnManagerRef = new AtomicReference<>(); + if (isGroupPattern(ctx, keyExprs, keyAliases, aggCalls, columnManagerRef)) { RelBuilder.GroupKey groupKey; if (keyExprs.isEmpty()) { groupKey = builder.groupKey(); @@ -281,39 +280,25 @@ public GraphBuilder visitOC_ProjectionBody(CypherGSParser.OC_ProjectionBodyConte groupKey = builder.groupKey(keyExprs, keyAliases); } builder.aggregate(groupKey, aggCalls); - if (!extraExprs.isEmpty()) { + RelDataType inputType = builder.peek().getRowType(); + List originalFields = + inputType.getFieldList().stream() + .map( + k -> + new ColumnOrder.Field( + builder.variable(k.getName()), k.getName())) + .collect(Collectors.toList()); + List newFields = columnManagerRef.get().getFields(inputType); + if (!originalFields.equals(newFields)) { + List extraExprs = new ArrayList<>(); + List<@Nullable String> extraAliases = new ArrayList<>(); RexTmpVariableConverter converter = new RexTmpVariableConverter(true, builder); - extraExprs = - extraExprs.stream() - .map(k -> k.accept(converter)) - .collect(Collectors.toList()); - List projectExprs = Lists.newArrayList(); - List projectAliases = Lists.newArrayList(); - List extraVarNames = Lists.newArrayList(); - RexVariableAliasCollector varNameCollector = - new RexVariableAliasCollector<>( - true, - v -> { - String[] splits = v.getName().split("\\."); - return splits[0]; - }); - extraExprs.forEach(k -> extraVarNames.addAll(k.accept(varNameCollector))); - GraphLogicalAggregate aggregate = (GraphLogicalAggregate) builder.peek(); - aggregate - .getRowType() - .getFieldList() - .forEach( - field -> { - if (!extraVarNames.contains(field.getName())) { - projectExprs.add(builder.variable(field.getName())); - projectAliases.add(field.getName()); - } - }); - for (int i = 0; i < extraExprs.size(); ++i) { - projectExprs.add(extraExprs.get(i)); - projectAliases.add(extraAliases.get(i)); - } - builder.project(projectExprs, projectAliases, false); + newFields.forEach( + k -> { + extraExprs.add(k.getExpr().accept(converter)); + extraAliases.add(k.getAlias()); + }); + builder.project(extraExprs, extraAliases, false); } } else if (isDistinct) { builder.aggregate(builder.groupKey(keyExprs, keyAliases)); @@ -334,22 +319,28 @@ private boolean isGroupPattern( List keyExprs, List keyAliases, List aggCalls, - List extraExprs, - List extraAliases) { + AtomicReference columnManagerRef) { + List fieldSuppliers = Lists.newArrayList(); for (CypherGSParser.OC_ProjectionItemContext itemCtx : ctx.oC_ProjectionItems().oC_ProjectionItem()) { ExprVisitorResult item = expressionVisitor.visitOC_Expression(itemCtx.oC_Expression()); String alias = (itemCtx.AS() == null) ? null : Utils.getAliasName(itemCtx.oC_Variable()); if (item.getAggCalls().isEmpty()) { + int ordinal = keyExprs.size(); + fieldSuppliers.add(new ColumnOrder.FieldSupplier.Default(builder, () -> ordinal)); keyExprs.add(item.getExpr()); keyAliases.add(alias); } else { if (item.getExpr() instanceof RexCall) { - extraExprs.add(item.getExpr()); - extraAliases.add(alias); + fieldSuppliers.add( + (RelDataType type) -> new ColumnOrder.Field(item.getExpr(), alias)); aggCalls.addAll(item.getAggCalls()); } else if (item.getAggCalls().size() == 1) { // count(a.name) + int ordinal = aggCalls.size(); + fieldSuppliers.add( + new ColumnOrder.FieldSupplier.Default( + builder, () -> keyExprs.size() + ordinal)); GraphAggCall original = (GraphAggCall) item.getAggCalls().get(0); aggCalls.add( new GraphAggCall( @@ -363,6 +354,7 @@ private boolean isGroupPattern( } } } + columnManagerRef.set(new ColumnOrder(fieldSuppliers)); return !aggCalls.isEmpty(); } diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java index 788d5e19909f..563cebe096b6 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/BITest.java @@ -124,8 +124,8 @@ public void bi1_test() { + " totalMessageCount)], isAppend=[false])\n" + " GraphLogicalProject(totalMessageCount=[totalMessageCount], year=[year]," + " isComment=[isComment], lengthCategory=[lengthCategory]," - + " messageCount=[messageCount], sumMessageLength=[sumMessageLength]," - + " averageMessageLength=[/(EXPR$2, EXPR$3)], isAppend=[false])\n" + + " messageCount=[messageCount], averageMessageLength=[/(EXPR$2, EXPR$3)]," + + " sumMessageLength=[sumMessageLength], isAppend=[false])\n" + " GraphLogicalAggregate(keys=[{variables=[totalMessageCount, year, $f0," + " $f1], aliases=[totalMessageCount, year, isComment, lengthCategory]}]," + " values=[[{operands=[message], aggFunction=COUNT, alias='messageCount'," diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java index ea08d443639f..1402ebaa1aec 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/planner/cbo/LdbcTest.java @@ -511,30 +511,32 @@ public void ldbc7_test() { + " messageContent=[message.content], messageImageFile=[message.imageFile]," + " minutesLatency=[/(/(-(likeTime, message.creationDate), 1000), 60)]," + " isNew=[isNew], isAppend=[false])\n" - + " GraphLogicalAggregate(keys=[{variables=[liker, person, isNew]," + + " GraphLogicalProject(liker=[liker], person=[person], message=[message]," + + " likeTime=[likeTime], isNew=[isNew], isAppend=[false])\n" + + " GraphLogicalAggregate(keys=[{variables=[liker, person, isNew]," + " aliases=[liker, person, isNew]}], values=[[{operands=[message]," + " aggFunction=FIRST_VALUE, alias='message', distinct=false}," + " {operands=[likeTime], aggFunction=FIRST_VALUE, alias='likeTime'," + " distinct=false}]])\n" - + " GraphLogicalSort(sort0=[likeTime], sort1=[message.id], dir0=[DESC]," + + " GraphLogicalSort(sort0=[likeTime], sort1=[message.id], dir0=[DESC]," + " dir1=[ASC])\n" - + " GraphLogicalProject(liker=[liker], message=[message]," + + " GraphLogicalProject(liker=[liker], message=[message]," + " likeTime=[like.creationDate], person=[person], isNew=[IS NULL(k)]," + " isAppend=[false])\n" - + " MultiJoin(joinFilter=[=(liker, liker)], isFullOuterJoin=[false]," + + " MultiJoin(joinFilter=[=(liker, liker)], isFullOuterJoin=[false]," + " joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]]," + " projFields=[[ALL, ALL]])\n" - + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," + " alias=[liker], opt=[START])\n" - + " GraphLogicalExpand(tableConfig=[{isAll=false," + + " GraphLogicalExpand(tableConfig=[{isAll=false," + " tables=[LIKES]}], alias=[like], startAlias=[message], opt=[IN])\n" - + " CommonTableScan(table=[[common#378747223]])\n" - + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," + + " CommonTableScan(table=[[common#378747223]])\n" + + " GraphLogicalGetV(tableConfig=[{isAll=false, tables=[PERSON]}]," + " alias=[liker], opt=[OTHER])\n" - + " GraphLogicalExpand(tableConfig=[{isAll=false," + + " GraphLogicalExpand(tableConfig=[{isAll=false," + " tables=[KNOWS]}], alias=[k], startAlias=[person], opt=[BOTH]," + " optional=[true])\n" - + " CommonTableScan(table=[[common#378747223]])\n" + + " CommonTableScan(table=[[common#378747223]])\n" + "common#378747223:\n" + "GraphPhysicalExpand(tableConfig=[{isAll=false, tables=[HASCREATOR]}]," + " alias=[message], startAlias=[person], opt=[IN], physicalOpt=[VERTEX])\n" diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/antlr4/MatchTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/antlr4/MatchTest.java index 6d8f49b7be03..051754845055 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/antlr4/MatchTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/antlr4/MatchTest.java @@ -731,4 +731,22 @@ public void special_label_name_test() { + " alias=[n], opt=[VERTEX])", after.explain().trim()); } + + // the return column order should align with the query given + @Test + public void aggregate_column_order_test() { + GraphBuilder builder = + com.alibaba.graphscope.common.ir.Utils.mockGraphBuilder(optimizer, irMeta); + RelNode node = + Utils.eval("Match (n:person) Return count(n), n, sum(n.age)", builder).build(); + RelNode after = optimizer.optimize(node, new GraphIOProcessor(builder, irMeta)); + Assert.assertEquals( + "GraphLogicalProject($f1=[$f1], n=[n], $f2=[$f2], isAppend=[false])\n" + + " GraphLogicalAggregate(keys=[{variables=[n], aliases=[n]}]," + + " values=[[{operands=[n], aggFunction=COUNT, alias='$f1', distinct=false}," + + " {operands=[n.age], aggFunction=SUM, alias='$f2', distinct=false}]])\n" + + " GraphLogicalSource(tableConfig=[{isAll=false, tables=[person]}]," + + " alias=[n], opt=[VERTEX])", + after.explain().trim()); + } } From 06d254c6811cf0ce7c2ae125508016da80486cfc Mon Sep 17 00:00:00 2001 From: Zhang Lei Date: Thu, 2 Jan 2025 11:46:02 +0800 Subject: [PATCH 2/3] fix(interactive): Refactor EdgeColumn Implementation (#4391) Use In-memory EdgePropVector to store edge properties for EdgeColumn. Speed up EdgeColumn scanning. Fix #4392 Co-Authored-By: liulx20 liulexiao.llx@alibaba-inc.com --- .../runtime/adhoc/operators/procedure_call.cc | 9 +- .../graph_db/runtime/common/accessors.h | 36 +- .../runtime/common/columns/edge_columns.cc | 100 +++++- .../runtime/common/columns/edge_columns.h | 337 +++++++++++------- .../runtime/common/columns/i_context_column.h | 20 ++ .../runtime/common/columns/value_columns.h | 1 + .../graph_db/runtime/common/operators/get_v.h | 22 +- .../engines/graph_db/runtime/common/rt_any.cc | 100 +++++- flex/engines/graph_db/runtime/common/rt_any.h | 320 ++++++++++++++++- flex/engines/graph_db/runtime/common/types.h | 1 + 10 files changed, 760 insertions(+), 186 deletions(-) diff --git a/flex/engines/graph_db/runtime/adhoc/operators/procedure_call.cc b/flex/engines/graph_db/runtime/adhoc/operators/procedure_call.cc index 09b1fb17b0ee..8568d9dbdd89 100644 --- a/flex/engines/graph_db/runtime/adhoc/operators/procedure_call.cc +++ b/flex/engines/graph_db/runtime/adhoc/operators/procedure_call.cc @@ -143,12 +143,11 @@ RTAny edge_to_rt_any(const results::Edge& edge) { std::get<2>(edge_triplet_tuple)}; if (properties.size() == 0) { return RTAny::from_edge( - std::tuple{label_triplet, src_vid, dst_vid, Any(), Direction::kOut}); + {label_triplet, src_vid, dst_vid, Any(), Direction::kOut}); } else if (properties.size() == 1) { LOG(FATAL) << "Not implemented."; - return RTAny::from_edge(std::tuple{label_triplet, src_vid, dst_vid, - property_to_any(properties[0]), - Direction::kOut}); + return RTAny::from_edge({label_triplet, src_vid, dst_vid, + property_to_any(properties[0]), Direction::kOut}); } else { std::vector props; for (auto& prop : properties) { @@ -157,7 +156,7 @@ RTAny edge_to_rt_any(const results::Edge& edge) { Any any; any.set_record(props); return RTAny::from_edge( - std::tuple{label_triplet, src_vid, dst_vid, any, Direction::kOut}); + {label_triplet, src_vid, dst_vid, any, Direction::kOut}); } } // namespace runtime diff --git a/flex/engines/graph_db/runtime/common/accessors.h b/flex/engines/graph_db/runtime/common/accessors.h index 67f5e94534f4..aae6d6068252 100644 --- a/flex/engines/graph_db/runtime/common/accessors.h +++ b/flex/engines/graph_db/runtime/common/accessors.h @@ -369,7 +369,7 @@ class VertexPropertyVertexAccessor : public IAccessor { class EdgeIdPathAccessor : public IAccessor { public: - using elem_t = std::tuple; + using elem_t = EdgeRecord; EdgeIdPathAccessor(const Context& ctx, int tag) : edge_col_(*std::dynamic_pointer_cast(ctx.get(tag))) {} @@ -407,14 +407,12 @@ class EdgePropertyPathAccessor : public IAccessor { RTAny eval_path(size_t idx) const override { const auto& e = col_.get_edge(idx); - return RTAny(std::get<3>(e)); + return RTAny(e.prop_); } elem_t typed_eval_path(size_t idx) const { const auto& e = col_.get_edge(idx); - elem_t ret; - ConvertAny::to(std::get<3>(e), ret); - return ret; + return e.prop_.as(); } bool is_optional() const override { return col_.is_optional(); } @@ -464,13 +462,13 @@ class MultiPropsEdgePropertyPathAccessor : public IAccessor { RTAny eval_path(size_t idx) const override { const auto& e = col_.get_edge(idx); - auto val = std::get<3>(e); - auto id = get_index(std::get<0>(e)); - if (std::get<3>(e).type != PropertyType::RecordView()) { + auto val = e.prop_; + auto id = get_index(e.label_triplet_); + if (e.prop_.type.type_enum_ != RTAnyType::RTAnyTypeImpl::kRecordView) { CHECK(id == 0); return RTAny(val); } else { - auto rv = val.AsRecordView(); + auto rv = val.as(); CHECK(id != std::numeric_limits::max()); return RTAny(rv[id]); } @@ -478,16 +476,16 @@ class MultiPropsEdgePropertyPathAccessor : public IAccessor { elem_t typed_eval_path(size_t idx) const { const auto& e = col_.get_edge(idx); - auto val = std::get<3>(e); - auto id = get_index(std::get<0>(e)); - if (std::get<3>(e).type != PropertyType::RecordView()) { + auto val = e.prop_; + auto id = get_index(e.label_triplet_); + if (e.prop_.type.type_enum_ != RTAnyType::RTAnyTypeImpl::kRecordView) { CHECK(id == 0); elem_t ret; ConvertAny::to(val, ret); return ret; } else { - auto rv = val.AsRecordView(); + auto rv = val.as(); CHECK(id != std::numeric_limits::max()); auto tmp = rv[id]; elem_t ret; @@ -530,12 +528,12 @@ class EdgeLabelPathAccessor : public IAccessor { RTAny eval_path(size_t idx) const override { const auto& e = col_.get_edge(idx); - return RTAny(static_cast(std::get<0>(e).edge_label)); + return RTAny(static_cast(e.label_triplet_.edge_label)); } elem_t typed_eval_path(size_t idx) const { const auto& e = col_.get_edge(idx); - return static_cast(std::get<0>(e).edge_label); + return static_cast(e.label_triplet_.edge_label); } std::shared_ptr builder() const override { @@ -632,10 +630,10 @@ class EdgeGlobalIdPathAccessor : public IAccessor { elem_t typed_eval_path(size_t idx) const { const auto& e = edge_col_.get_edge(idx); - auto label_id = generate_edge_label_id(std::get<0>(e).src_label, - std::get<0>(e).dst_label, - std::get<0>(e).edge_label); - return encode_unique_edge_id(label_id, std::get<1>(e), std::get<2>(e)); + auto label_id = generate_edge_label_id(e.label_triplet_.src_label, + e.label_triplet_.dst_label, + e.label_triplet_.edge_label); + return encode_unique_edge_id(label_id, e.src_, e.dst_); } RTAny eval_path(size_t idx) const override { diff --git a/flex/engines/graph_db/runtime/common/columns/edge_columns.cc b/flex/engines/graph_db/runtime/common/columns/edge_columns.cc index 35d0acce0e84..4c0f7a7beeb9 100644 --- a/flex/engines/graph_db/runtime/common/columns/edge_columns.cc +++ b/flex/engines/graph_db/runtime/common/columns/edge_columns.cc @@ -29,7 +29,7 @@ std::shared_ptr SDSLEdgeColumn::dup() const { builder.reserve(edges_.size()); for (size_t i = 0; i < edges_.size(); ++i) { auto e = get_edge(i); - builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e)); + builder.push_back_opt(e.src_, e.dst_, e.prop_); } return builder.finish(); } @@ -58,7 +58,41 @@ std::shared_ptr SDSLEdgeColumn::shuffle( size_t off = offsets[idx]; const auto& e = edges_[off]; builder.push_back_endpoints(e.first, e.second); - ret_props.set_any(idx, prop_col_->get(off)); + ret_props.set_any(idx, prop_col_.get(), off); + } + } + + return builder.finish(); +} + +std::shared_ptr SDSLEdgeColumn::optional_shuffle( + const std::vector& offsets) const { + CHECK(prop_type_ != PropertyType::kRecordView); + OptionalSDSLEdgeColumnBuilder builder(dir_, label_, prop_type_); + size_t new_row_num = offsets.size(); + builder.reserve(new_row_num); + + if (prop_type_ == PropertyType::kEmpty) { + for (auto off : offsets) { + if (off == std::numeric_limits::max()) { + builder.push_back_null(); + } else { + const auto& e = edges_[off]; + builder.push_back_endpoints(e.first, e.second); + } + } + } else { + auto& ret_props = *builder.prop_col_; + ret_props.resize(new_row_num); + for (size_t idx = 0; idx < new_row_num; ++idx) { + size_t off = offsets[idx]; + if (off == std::numeric_limits::max()) { + builder.push_back_null(); + } else { + const auto& e = edges_[off]; + builder.push_back_endpoints(e.first, e.second); + ret_props.set_any(idx, prop_col_.get(), off); + } } } @@ -70,7 +104,7 @@ std::shared_ptr SDSLEdgeColumnBuilder::finish() { std::make_shared(dir_, label_, prop_type_, sub_types_); ret->edges_.swap(edges_); // shrink to fit - prop_col_->resize(edges_.size()); + prop_col_->resize(ret->edges_.size()); ret->prop_col_ = prop_col_; return ret; } @@ -80,8 +114,7 @@ std::shared_ptr BDSLEdgeColumn::dup() const { builder.reserve(size()); for (size_t i = 0; i < size(); ++i) { auto e = get_edge(i); - builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e), - std::get<4>(e)); + builder.push_back_opt(e.src_, e.dst_, e.prop_, e.dir_); } return builder.finish(); } @@ -98,7 +131,30 @@ std::shared_ptr BDSLEdgeColumn::shuffle( size_t off = offsets[idx]; const auto& e = edges_[off]; builder.push_back_endpoints(std::get<0>(e), std::get<1>(e), std::get<2>(e)); - ret_props.set_any(idx, prop_col_->get(off)); + ret_props.set_any(idx, prop_col_.get(), off); + } + + return builder.finish(); +} + +std::shared_ptr BDSLEdgeColumn::optional_shuffle( + const std::vector& offsets) const { + OptionalBDSLEdgeColumnBuilder builder(label_, prop_type_); + size_t new_row_num = offsets.size(); + builder.reserve(new_row_num); + + auto& ret_props = *builder.prop_col_; + ret_props.resize(new_row_num); + for (size_t idx = 0; idx < new_row_num; ++idx) { + size_t off = offsets[idx]; + if (off == std::numeric_limits::max()) { + builder.push_back_null(); + } else { + const auto& e = edges_[off]; + builder.push_back_endpoints(std::get<0>(e), std::get<1>(e), + std::get<2>(e)); + ret_props.set_any(idx, prop_col_.get(), off); + } } return builder.finish(); @@ -106,6 +162,7 @@ std::shared_ptr BDSLEdgeColumn::shuffle( std::shared_ptr BDSLEdgeColumnBuilder::finish() { auto ret = std::make_shared(label_, prop_type_); + prop_col_->resize(edges_.size()); ret->edges_.swap(edges_); ret->prop_col_ = prop_col_; return ret; @@ -189,8 +246,7 @@ std::shared_ptr OptionalBDSLEdgeColumn::shuffle( for (size_t idx = 0; idx < new_row_num; ++idx) { size_t off = offsets[idx]; const auto& e = get_edge(off); - builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e), - std::get<4>(e)); + builder.push_back_opt(e.src_, e.dst_, e.prop_, e.dir_); } return builder.finish(); } @@ -199,8 +255,24 @@ std::shared_ptr OptionalBDSLEdgeColumn::dup() const { builder.reserve(edges_.size()); for (size_t i = 0; i < edges_.size(); ++i) { auto e = get_edge(i); - builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e), - std::get<4>(e)); + builder.push_back_opt(e.src_, e.dst_, e.prop_, e.dir_); + } + return builder.finish(); +} + +std::shared_ptr OptionalBDSLEdgeColumn::optional_shuffle( + const std::vector& offsets) const { + OptionalBDSLEdgeColumnBuilder builder(label_, prop_type_); + size_t new_row_num = offsets.size(); + builder.reserve(new_row_num); + for (size_t idx = 0; idx < new_row_num; ++idx) { + size_t off = offsets[idx]; + if (off == std::numeric_limits::max()) { + builder.push_back_null(); + continue; + } + const auto& e = get_edge(off); + builder.push_back_opt(e.src_, e.dst_, e.prop_, e.dir_); } return builder.finish(); } @@ -209,6 +281,8 @@ std::shared_ptr OptionalBDSLEdgeColumnBuilder::finish() { auto ret = std::make_shared(label_, prop_type_); ret->edges_.swap(edges_); ret->prop_col_ = prop_col_; + // shrink to fit + ret->prop_col_->resize(ret->edges_.size()); return ret; } @@ -220,7 +294,7 @@ std::shared_ptr OptionalSDSLEdgeColumn::shuffle( for (size_t idx = 0; idx < new_row_num; ++idx) { size_t off = offsets[idx]; const auto& e = get_edge(off); - builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e)); + builder.push_back_opt(e.src_, e.dst_, e.prop_); } return builder.finish(); } @@ -230,7 +304,7 @@ std::shared_ptr OptionalSDSLEdgeColumn::dup() const { builder.reserve(edges_.size()); for (size_t i = 0; i < edges_.size(); ++i) { auto e = get_edge(i); - builder.push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e)); + builder.push_back_opt(e.src_, e.dst_, e.prop_); } return builder.finish(); } @@ -239,6 +313,8 @@ std::shared_ptr OptionalSDSLEdgeColumnBuilder::finish() { auto ret = std::make_shared(dir_, label_, prop_type_); ret->edges_.swap(edges_); ret->prop_col_ = prop_col_; + // shrink to fit + ret->prop_col_->resize(ret->edges_.size()); return ret; } diff --git a/flex/engines/graph_db/runtime/common/columns/edge_columns.h b/flex/engines/graph_db/runtime/common/columns/edge_columns.h index 474ffaf9d7b9..db4ac4c20958 100644 --- a/flex/engines/graph_db/runtime/common/columns/edge_columns.h +++ b/flex/engines/graph_db/runtime/common/columns/edge_columns.h @@ -23,6 +23,67 @@ namespace gs { namespace runtime { enum class EdgeColumnType { kSDSL, kSDML, kBDSL, kBDML, kUnKnown }; +static inline void get_edge_data(EdgePropVecBase* prop, size_t idx, + EdgeData& edge_data) { + if (prop->type() == PropertyType::kEmpty) { + edge_data.type = RTAnyType::kEmpty; + } else if (prop->type() == PropertyType::kInt64) { + edge_data.type = RTAnyType::kI64Value; + edge_data.value.i64_val = + dynamic_cast*>(prop)->get_view(idx); + } else if (prop->type() == PropertyType::kInt32) { + edge_data.type = RTAnyType::kI32Value; + edge_data.value.i32_val = + dynamic_cast*>(prop)->get_view(idx); + } else if (prop->type() == PropertyType::kDouble) { + edge_data.type = RTAnyType::kF64Value; + edge_data.value.f64_val = + dynamic_cast*>(prop)->get_view(idx); + } else if (prop->type() == PropertyType::kBool) { + edge_data.type = RTAnyType::kBoolValue; + edge_data.value.b_val = + dynamic_cast*>(prop)->get_view(idx); + } else if (prop->type() == PropertyType::kString) { + edge_data.type = RTAnyType::kStringValue; + edge_data.value.str_val = + dynamic_cast*>(prop)->get_view(idx); + + } else if (prop->type() == PropertyType::kDate) { + edge_data.type = RTAnyType::kDate32; + edge_data.value.i64_val = + dynamic_cast*>(prop)->get_view(idx).milli_second; + } else if (prop->type() == PropertyType::kRecordView) { + // edge_data.type = RTAnyType::kRecordView; + } else { + edge_data.type = RTAnyType::kUnknown; + } +} + +static inline void set_edge_data(EdgePropVecBase* col, size_t idx, + const EdgeData& edge_data) { + if (edge_data.type == RTAnyType::kEmpty) { + return; + } else if (edge_data.type == RTAnyType::kI64Value) { + dynamic_cast*>(col)->set(idx, edge_data.value.i64_val); + } else if (edge_data.type == RTAnyType::kI32Value) { + dynamic_cast*>(col)->set(idx, edge_data.value.i32_val); + } else if (edge_data.type == RTAnyType::kF64Value) { + dynamic_cast*>(col)->set(idx, edge_data.value.f64_val); + } else if (edge_data.type == RTAnyType::kBoolValue) { + dynamic_cast*>(col)->set(idx, edge_data.value.b_val); + } else if (edge_data.type == RTAnyType::kStringValue) { + dynamic_cast*>(col)->set( + idx, std::string_view(edge_data.value.str_val.data(), + edge_data.value.str_val.size())); + } else if (edge_data.type == RTAnyType::kDate32) { + dynamic_cast*>(col)->set(idx, + Date(edge_data.value.i64_val)); + } else { + LOG(FATAL) << "not support for " + << static_cast(edge_data.type.type_enum_); + } +} + class IEdgeColumn : public IContextColumn { public: IEdgeColumn() = default; @@ -32,8 +93,7 @@ class IEdgeColumn : public IContextColumn { return ContextColumnType::kEdge; } - virtual std::tuple get_edge( - size_t idx) const = 0; + virtual EdgeRecord get_edge(size_t idx) const = 0; RTAny get_elem(size_t idx) const override { return RTAny::from_edge(this->get_edge(idx)); @@ -46,6 +106,8 @@ class IEdgeColumn : public IContextColumn { class SDSLEdgeColumnBuilder; class OptionalSDSLEdgeColumnBuilder; +template +class SDSLEdgeColumnBuilderBeta; class SDSLEdgeColumn : public IEdgeColumn { public: @@ -55,14 +117,11 @@ class SDSLEdgeColumn : public IEdgeColumn { : dir_(dir), label_(label), prop_type_(prop_type), - prop_col_(CreateColumn(prop_type, StorageStrategy::kMem, sub_types)) { - prop_col_->open_in_memory(""); - } + prop_col_(EdgePropVecBase::make_edge_prop_vec(prop_type)) {} - std::tuple get_edge( - size_t idx) const override { - return std::make_tuple(label_, edges_[idx].first, edges_[idx].second, - prop_col_->get(idx), dir_); + EdgeRecord get_edge(size_t idx) const override { + return EdgeRecord(label_, edges_[idx].first, edges_[idx].second, + prop_col_->get(idx), dir_); } size_t size() const override { return edges_.size(); } @@ -129,12 +188,16 @@ class SDSLEdgeColumn : public IEdgeColumn { std::shared_ptr dup() const override; + std::shared_ptr optional_shuffle( + const std::vector& offsets) const override; + template void foreach_edge(const FUNC_T& func) const { if (prop_type_ == PropertyType::kEmpty) { size_t idx = 0; for (auto& e : edges_) { - func(idx++, label_, e.first, e.second, grape::EmptyType(), dir_); + func(idx++, label_, e.first, e.second, EdgeData(grape::EmptyType()), + dir_); } } else { size_t idx = 0; @@ -153,11 +216,13 @@ class SDSLEdgeColumn : public IEdgeColumn { private: friend class SDSLEdgeColumnBuilder; + template + friend class SDSLEdgeColumnBuilderBeta; Direction dir_; LabelTriplet label_; std::vector> edges_; PropertyType prop_type_; - std::shared_ptr prop_col_; + std::shared_ptr prop_col_; }; class OptionalSDSLEdgeColumn : public IEdgeColumn { @@ -167,14 +232,16 @@ class OptionalSDSLEdgeColumn : public IEdgeColumn { : dir_(dir), label_(label), prop_type_(prop_type), - prop_col_(CreateColumn(prop_type, StorageStrategy::kMem)) { - prop_col_->open_in_memory(""); - } + prop_col_(EdgePropVecBase::make_edge_prop_vec(prop_type)) {} - std::tuple get_edge( - size_t idx) const override { - return std::make_tuple(label_, edges_[idx].first, edges_[idx].second, - prop_col_->get(idx), dir_); + EdgeRecord get_edge(size_t idx) const override { + EdgeRecord ret; + ret.label_triplet_ = label_; + ret.src_ = edges_[idx].first; + ret.dst_ = edges_[idx].second; + get_edge_data(prop_col_.get(), idx, ret.prop_); + ret.dir_ = dir_; + return ret; } size_t size() const override { return edges_.size(); } @@ -236,12 +303,13 @@ class OptionalSDSLEdgeColumn : public IEdgeColumn { if (prop_type_ == PropertyType::kEmpty) { size_t idx = 0; for (auto& e : edges_) { - func(idx++, label_, e.first, e.second, grape::EmptyType(), dir_, 0); + func(idx++, label_, e.first, e.second, EdgeData(grape::EmptyType()), + dir_); } } else { size_t idx = 0; for (auto& e : edges_) { - func(idx, label_, e.first, e.second, prop_col_->get(idx), dir_, 0); + func(idx, label_, e.first, e.second, prop_col_->get(idx), dir_); ++idx; } } @@ -255,7 +323,7 @@ class OptionalSDSLEdgeColumn : public IEdgeColumn { } std::vector get_labels() const override { - LOG(INFO) << "get_labels: " << label_.to_string() << std::endl; + // LOG(INFO) << "get_labels: " << label_.to_string() << std::endl; return {label_}; } @@ -269,7 +337,7 @@ class OptionalSDSLEdgeColumn : public IEdgeColumn { LabelTriplet label_; std::vector> edges_; PropertyType prop_type_; - std::shared_ptr prop_col_; + std::shared_ptr prop_col_; }; class OptionalSDSLEdgeColumnBuilder : public IOptionalContextColumnBuilder { @@ -279,21 +347,18 @@ class OptionalSDSLEdgeColumnBuilder : public IOptionalContextColumnBuilder { : dir_(dir), label_(label), prop_type_(prop_type), - prop_col_(CreateColumn(prop_type, StorageStrategy::kMem)) { - prop_col_->open_in_memory(""); - } + prop_col_(EdgePropVecBase::make_edge_prop_vec(prop_type)) {} ~OptionalSDSLEdgeColumnBuilder() = default; void reserve(size_t size) override { edges_.reserve(size); } void push_back_elem(const RTAny& val) override { const auto& e = val.as_edge(); - push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e)); + push_back_opt(e.src_, e.dst_, e.prop_); } - void push_back_opt(vid_t src, vid_t dst, const Any& data) { + void push_back_opt(vid_t src, vid_t dst, const EdgeData& data) { edges_.emplace_back(src, dst); size_t len = edges_.size(); - prop_col_->resize(len); - prop_col_->set_any(len - 1, data); + set_edge_data(prop_col_.get(), len - 1, data); } void push_back_null() override { @@ -307,12 +372,13 @@ class OptionalSDSLEdgeColumnBuilder : public IOptionalContextColumnBuilder { std::shared_ptr finish() override; private: + friend class SDSLEdgeColumn; friend class OptionalSDSLEdgeColumn; Direction dir_; LabelTriplet label_; std::vector> edges_; PropertyType prop_type_; - std::shared_ptr prop_col_; + std::shared_ptr prop_col_; }; class BDSLEdgeColumnBuilder; @@ -323,17 +389,14 @@ class BDSLEdgeColumn : public IEdgeColumn { BDSLEdgeColumn(const LabelTriplet& label, PropertyType prop_type) : label_(label), prop_type_(prop_type), - prop_col_(CreateColumn(prop_type, StorageStrategy::kMem)) { - prop_col_->open_in_memory(""); - } + prop_col_(EdgePropVecBase::make_edge_prop_vec(prop_type)) {} - std::tuple get_edge( - size_t idx) const override { + EdgeRecord get_edge(size_t idx) const override { auto src = std::get<0>(edges_[idx]); auto dst = std::get<1>(edges_[idx]); auto dir = std::get<2>(edges_[idx]); - return std::make_tuple(label_, src, dst, prop_col_->get(idx), - (dir ? Direction::kOut : Direction::kIn)); + return EdgeRecord(label_, src, dst, prop_col_->get(idx), + (dir ? Direction::kOut : Direction::kIn)); } size_t size() const override { return edges_.size(); } @@ -358,6 +421,9 @@ class BDSLEdgeColumn : public IEdgeColumn { std::shared_ptr dup() const override; + std::shared_ptr optional_shuffle( + const std::vector& offsets) const override; + template void foreach_edge(const FUNC_T& func) const { size_t idx = 0; @@ -379,7 +445,7 @@ class BDSLEdgeColumn : public IEdgeColumn { LabelTriplet label_; std::vector> edges_; PropertyType prop_type_; - std::shared_ptr prop_col_; + std::shared_ptr prop_col_; }; class OptionalBDSLEdgeColumn : public IEdgeColumn { @@ -387,17 +453,14 @@ class OptionalBDSLEdgeColumn : public IEdgeColumn { OptionalBDSLEdgeColumn(const LabelTriplet& label, PropertyType prop_type) : label_(label), prop_type_(prop_type), - prop_col_(CreateColumn(prop_type, StorageStrategy::kMem)) { - prop_col_->open_in_memory(""); - } + prop_col_(EdgePropVecBase::make_edge_prop_vec(prop_type)) {} - std::tuple get_edge( - size_t idx) const override { + EdgeRecord get_edge(size_t idx) const override { auto src = std::get<0>(edges_[idx]); auto dst = std::get<1>(edges_[idx]); auto dir = std::get<2>(edges_[idx]); - return std::make_tuple(label_, src, dst, prop_col_->get(idx), - (dir ? Direction::kOut : Direction::kIn)); + return EdgeRecord(label_, src, dst, prop_col_->get(idx), + (dir ? Direction::kOut : Direction::kIn)); } size_t size() const override { return edges_.size(); } @@ -417,6 +480,9 @@ class OptionalBDSLEdgeColumn : public IEdgeColumn { std::shared_ptr dup() const override; + std::shared_ptr optional_shuffle( + const std::vector& offsets) const override; + template void foreach_edge(const FUNC_T& func) const { size_t idx = 0; @@ -434,10 +500,7 @@ class OptionalBDSLEdgeColumn : public IEdgeColumn { std::get<1>(edges_[idx]) != std::numeric_limits::max(); } - std::vector get_labels() const override { - LOG(INFO) << "get_labels: " << label_.to_string() << std::endl; - return {label_}; - } + std::vector get_labels() const override { return {label_}; } EdgeColumnType edge_column_type() const override { return EdgeColumnType::kBDSL; @@ -448,7 +511,7 @@ class OptionalBDSLEdgeColumn : public IEdgeColumn { LabelTriplet label_; std::vector> edges_; PropertyType prop_type_; - std::shared_ptr prop_col_; + std::shared_ptr prop_col_; }; class SDMLEdgeColumnBuilder; @@ -465,20 +528,22 @@ class SDMLEdgeColumn : public IEdgeColumn { edge_labels_.emplace_back(label); index_[label.first] = idx++; prop_cols_[index_[label.first]] = - CreateColumn(label.second, StorageStrategy::kMem); - prop_cols_[index_[label.first]]->open_in_memory(""); + EdgePropVecBase::make_edge_prop_vec(label.second); } } - std::tuple get_edge( - size_t idx) const override { + EdgeRecord get_edge(size_t idx) const override { auto& e = edges_[idx]; auto index = std::get<0>(e); auto label = edge_labels_[index].first; auto offset = std::get<3>(e); - return std::tuple( - label, std::get<1>(e), std::get<2>(e), prop_cols_[index]->get(offset), - dir_); + EdgeRecord ret; + ret.label_triplet_ = label; + ret.src_ = std::get<1>(e); + ret.dst_ = std::get<2>(e); + get_edge_data(prop_cols_[index].get(), offset, ret.prop_); + ret.dir_ = dir_; + return ret; } size_t size() const override { return edges_.size(); } @@ -540,7 +605,7 @@ class SDMLEdgeColumn : public IEdgeColumn { std::map index_; std::vector> edge_labels_; std::vector> edges_; - std::vector> prop_cols_; + std::vector> prop_cols_; }; class BDMLEdgeColumnBuilder; @@ -555,20 +620,18 @@ class BDMLEdgeColumn : public IEdgeColumn { for (const auto& label : labels) { index_[label.first] = idx++; prop_cols_[index_[label.first]] = - CreateColumn(label.second, StorageStrategy::kMem); - prop_cols_[index_[label.first]]->open_in_memory(""); + EdgePropVecBase::make_edge_prop_vec(label.second); } } - std::tuple get_edge( - size_t idx) const override { + EdgeRecord get_edge(size_t idx) const override { auto& e = edges_[idx]; auto index = std::get<0>(e); auto label = labels_[index].first; auto offset = std::get<3>(e); - return std::tuple( - label, std::get<1>(e), std::get<2>(e), prop_cols_[index]->get(offset), - (std::get<4>(e) ? Direction::kOut : Direction::kIn)); + return EdgeRecord(label, std::get<1>(e), std::get<2>(e), + prop_cols_[index]->get(offset), + (std::get<4>(e) ? Direction::kOut : Direction::kIn)); } size_t size() const override { return edges_.size(); } @@ -628,7 +691,7 @@ class BDMLEdgeColumn : public IEdgeColumn { std::map index_; std::vector> labels_; std::vector> edges_; - std::vector> prop_cols_; + std::vector> prop_cols_; }; class SDSLEdgeColumnBuilder : public IContextColumnBuilder { @@ -639,31 +702,21 @@ class SDSLEdgeColumnBuilder : public IContextColumnBuilder { : dir_(dir), label_(label), prop_type_(prop_type), - prop_col_(CreateColumn(prop_type, StorageStrategy::kMem, sub_types)), - sub_types_(sub_types), - cap_(0) { - prop_col_->open_in_memory(""); - } + prop_col_(EdgePropVecBase::make_edge_prop_vec(prop_type)), + sub_types_(sub_types) {} ~SDSLEdgeColumnBuilder() = default; void reserve(size_t size) override { edges_.reserve(size); } void push_back_elem(const RTAny& val) override { const auto& e = val.as_edge(); - push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e)); + push_back_opt(e.src_, e.dst_, e.prop_); } - void push_back_opt(vid_t src, vid_t dst, const Any& data) { + void push_back_opt(vid_t src, vid_t dst, const EdgeData& data) { edges_.emplace_back(src, dst); size_t len = edges_.size(); - if (cap_ == 0) { - prop_col_->resize(len); - cap_ = len; - } else if (len >= cap_) { - prop_col_->resize(len * 2); - cap_ = len * 2; - } - prop_col_->set_any(len - 1, data); + set_edge_data(prop_col_.get(), len - 1, data); } void push_back_endpoints(vid_t src, vid_t dst) { edges_.emplace_back(src, dst); @@ -677,9 +730,50 @@ class SDSLEdgeColumnBuilder : public IContextColumnBuilder { LabelTriplet label_; std::vector> edges_; PropertyType prop_type_; - std::shared_ptr prop_col_; + std::shared_ptr prop_col_; std::vector sub_types_; - size_t cap_; +}; + +template +class SDSLEdgeColumnBuilderBeta : public IContextColumnBuilder { + public: + SDSLEdgeColumnBuilderBeta(Direction dir, const LabelTriplet& label, + PropertyType prop_type) + : dir_(dir), + label_(label), + prop_type_(prop_type), + prop_col_(std::make_shared>()), + prop_col_ptr_(prop_col_.get()) {} + ~SDSLEdgeColumnBuilderBeta() = default; + + void reserve(size_t size) override { edges_.reserve(size); } + void push_back_elem(const RTAny& val) override { + const auto& e = val.as_edge(); + + push_back_opt(e.src_, e.dst_, e.prop_.as()); + } + void push_back_opt(vid_t src, vid_t dst, const T& data) { + size_t len = edges_.size(); + edges_.emplace_back(src, dst); + prop_col_ptr_->set(len, data); + } + + std::shared_ptr finish() override { + auto ret = std::make_shared(dir_, label_, prop_type_, + std::vector()); + ret->edges_.swap(edges_); + prop_col_->resize(edges_.size()); + ret->prop_col_ = prop_col_; + return ret; + } + + private: + Direction dir_; + LabelTriplet label_; + std::vector> edges_; + PropertyType prop_type_; + std::shared_ptr> prop_col_; + EdgePropVec* prop_col_ptr_; }; class BDSLEdgeColumnBuilder : public IContextColumnBuilder { @@ -687,22 +781,19 @@ class BDSLEdgeColumnBuilder : public IContextColumnBuilder { BDSLEdgeColumnBuilder(const LabelTriplet& label, PropertyType prop_type) : label_(label), prop_type_(prop_type), - prop_col_(CreateColumn(prop_type, StorageStrategy::kMem)) { - prop_col_->open_in_memory(""); - } + prop_col_(EdgePropVecBase::make_edge_prop_vec(prop_type)) {} ~BDSLEdgeColumnBuilder() = default; void reserve(size_t size) override { edges_.reserve(size); } void push_back_elem(const RTAny& val) override { const auto& e = val.as_edge(); - push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e), - std::get<4>(e)); + push_back_opt(e.src_, e.dst_, e.prop_, e.dir_); } - void push_back_opt(vid_t src, vid_t dst, const Any& data, Direction dir) { + void push_back_opt(vid_t src, vid_t dst, const EdgeData& data, + Direction dir) { edges_.emplace_back(src, dst, dir == Direction::kOut); size_t len = edges_.size(); - prop_col_->resize(len); - prop_col_->set_any(len - 1, data); + set_edge_data(prop_col_.get(), len - 1, data); } void push_back_endpoints(vid_t src, vid_t dst, Direction dir) { edges_.emplace_back(src, dst, dir == Direction::kOut); @@ -719,7 +810,7 @@ class BDSLEdgeColumnBuilder : public IContextColumnBuilder { LabelTriplet label_; std::vector> edges_; PropertyType prop_type_; - std::shared_ptr prop_col_; + std::shared_ptr prop_col_; }; class SDMLEdgeColumnBuilder : public IContextColumnBuilder { public: @@ -733,8 +824,7 @@ class SDMLEdgeColumnBuilder : public IContextColumnBuilder { edge_labels_.emplace_back(label); index_[label.first] = idx++; prop_cols_[index_[label.first]] = - CreateColumn(label.second, StorageStrategy::kMem); - prop_cols_[index_[label.first]]->open_in_memory(""); + EdgePropVecBase::make_edge_prop_vec(label.second); } } ~SDMLEdgeColumnBuilder() = default; @@ -742,24 +832,23 @@ class SDMLEdgeColumnBuilder : public IContextColumnBuilder { void reserve(size_t size) override { edges_.reserve(size); } void push_back_elem(const RTAny& val) override { const auto& e = val.as_edge(); - auto label = std::get<0>(e); + auto label = e.label_triplet_; auto index = index_[label]; - push_back_opt(index, std::get<1>(e), std::get<2>(e), std::get<3>(e)); + push_back_opt(index, e.src_, e.dst_, e.prop_); } - void push_back_opt(int8_t index, vid_t src, vid_t dst, const Any& data) { + void push_back_opt(int8_t index, vid_t src, vid_t dst, const EdgeData& data) { edges_.emplace_back(index, src, dst, prop_cols_[index]->size()); - prop_cols_[index]->resize(prop_cols_[index]->size() + 1); - prop_cols_[index]->set_any(prop_cols_[index]->size() - 1, data); + set_edge_data(prop_cols_[index].get(), prop_cols_[index]->size(), data); } void push_back_opt(LabelTriplet label, vid_t src, vid_t dst, - const Any& data) { + const EdgeData& data) { auto index = index_[label]; push_back_opt(index, src, dst, data); } void push_back_endpoints(int8_t index, vid_t src, vid_t dst) { - edges_.emplace_back(index, src, dst, prop_cols_[index]->size()); + LOG(FATAL) << "Not implemented"; } std::shared_ptr finish() override; @@ -770,7 +859,7 @@ class SDMLEdgeColumnBuilder : public IContextColumnBuilder { std::map index_; std::vector> edge_labels_; std::vector> edges_; - std::vector> prop_cols_; + std::vector> prop_cols_; }; class BDMLEdgeColumnBuilder : public IContextColumnBuilder { @@ -784,8 +873,7 @@ class BDMLEdgeColumnBuilder : public IContextColumnBuilder { for (const auto& label : labels) { index_[label.first] = idx++; prop_cols_[index_[label.first]] = - CreateColumn(label.second, StorageStrategy::kMem); - prop_cols_[index_[label.first]]->open_in_memory(""); + EdgePropVecBase::make_edge_prop_vec(label.second); } } ~BDMLEdgeColumnBuilder() = default; @@ -793,28 +881,27 @@ class BDMLEdgeColumnBuilder : public IContextColumnBuilder { void reserve(size_t size) override { edges_.reserve(size); } void push_back_elem(const RTAny& val) override { const auto& e = val.as_edge(); - auto label = std::get<0>(e); + auto label = e.label_triplet_; if (index_.find(label) == index_.end()) { index_[label] = labels_.size(); - auto data = std::get<3>(e); - labels_.emplace_back(label, data.type); - prop_cols_.emplace_back(CreateColumn(data.type, StorageStrategy::kMem)); - prop_cols_.back()->open_in_memory(""); + auto data = e.prop_; + auto type = rt_type_to_property_type(data.type); + labels_.emplace_back(label, type); + prop_cols_.emplace_back(EdgePropVecBase::make_edge_prop_vec(type)); } auto index = index_[label]; - push_back_opt(index, std::get<1>(e), std::get<2>(e), std::get<3>(e), - std::get<4>(e)); + push_back_opt(index, e.src_, e.dst_, e.prop_, e.dir_); } - void push_back_opt(int8_t index, vid_t src, vid_t dst, const Any& data, + void push_back_opt(int8_t index, vid_t src, vid_t dst, const EdgeData& data, Direction dir) { edges_.emplace_back(index, src, dst, prop_cols_[index]->size(), dir == Direction::kOut); - prop_cols_[index]->resize(prop_cols_[index]->size() + 1); - prop_cols_[index]->set_any(prop_cols_[index]->size() - 1, data); + // prop_cols_[index]->resize(prop_cols_[index]->size() + 1); + set_edge_data(prop_cols_[index].get(), prop_cols_[index]->size(), data); } - void push_back_opt(LabelTriplet label, vid_t src, vid_t dst, const Any& data, - Direction dir) { + void push_back_opt(LabelTriplet label, vid_t src, vid_t dst, + const EdgeData& data, Direction dir) { auto index = index_[label]; push_back_opt(index, src, dst, data, dir); } @@ -836,7 +923,7 @@ class BDMLEdgeColumnBuilder : public IContextColumnBuilder { std::map index_; std::vector> labels_; std::vector> edges_; - std::vector> prop_cols_; + std::vector> prop_cols_; }; class OptionalBDSLEdgeColumnBuilder : public IOptionalContextColumnBuilder { @@ -845,22 +932,19 @@ class OptionalBDSLEdgeColumnBuilder : public IOptionalContextColumnBuilder { PropertyType prop_type) : label_(label), prop_type_(prop_type), - prop_col_(CreateColumn(prop_type, StorageStrategy::kMem)) { - prop_col_->open_in_memory(""); - } + prop_col_(EdgePropVecBase::make_edge_prop_vec(prop_type)) {} ~OptionalBDSLEdgeColumnBuilder() = default; void reserve(size_t size) override { edges_.reserve(size); } void push_back_elem(const RTAny& val) override { const auto& e = val.as_edge(); - push_back_opt(std::get<1>(e), std::get<2>(e), std::get<3>(e), - std::get<4>(e)); + push_back_opt(e.src_, e.dst_, e.prop_, e.dir_); } - void push_back_opt(vid_t src, vid_t dst, const Any& data, Direction dir) { + void push_back_opt(vid_t src, vid_t dst, const EdgeData& data, + Direction dir) { edges_.emplace_back(src, dst, dir == Direction::kOut); size_t len = edges_.size(); - prop_col_->resize(len); - prop_col_->set_any(len - 1, data); + set_edge_data(prop_col_.get(), len - 1, data); } void push_back_endpoints(vid_t src, vid_t dst, Direction dir) { edges_.emplace_back(src, dst, dir == Direction::kOut); @@ -878,11 +962,12 @@ class OptionalBDSLEdgeColumnBuilder : public IOptionalContextColumnBuilder { std::shared_ptr finish() override; private: + friend class BDSLEdgeColumn; friend class OptionalBDSLEdgeColumn; LabelTriplet label_; std::vector> edges_; PropertyType prop_type_; - std::shared_ptr prop_col_; + std::shared_ptr prop_col_; }; } // namespace runtime diff --git a/flex/engines/graph_db/runtime/common/columns/i_context_column.h b/flex/engines/graph_db/runtime/common/columns/i_context_column.h index c35f97f19378..bea97334afb8 100644 --- a/flex/engines/graph_db/runtime/common/columns/i_context_column.h +++ b/flex/engines/graph_db/runtime/common/columns/i_context_column.h @@ -192,6 +192,12 @@ class IContextColumn { return nullptr; } + virtual std::shared_ptr optional_shuffle( + const std::vector& offsets) const { + LOG(FATAL) << "not implemented for " << this->column_info(); + return nullptr; + } + virtual std::shared_ptr union_col( std::shared_ptr other) const { LOG(FATAL) << "not implemented for " << this->column_info(); @@ -215,6 +221,20 @@ class IContextColumn { virtual void generate_dedup_offset(std::vector& offsets) const { LOG(FATAL) << "not implemented for " << this->column_info(); } + + virtual std::pair, + std::vector>> + generate_aggregate_offset() const { + LOG(INFO) << "not implemented for " << this->column_info(); + std::shared_ptr col(nullptr); + return std::make_pair(col, std::vector>()); + } + + virtual bool order_by_limit(bool asc, size_t limit, + std::vector& offsets) const { + LOG(INFO) << "order by limit not implemented for " << this->column_info(); + return false; + } }; class IContextColumnBuilder { diff --git a/flex/engines/graph_db/runtime/common/columns/value_columns.h b/flex/engines/graph_db/runtime/common/columns/value_columns.h index 1547ab7b91fb..9d9f964618d1 100644 --- a/flex/engines/graph_db/runtime/common/columns/value_columns.h +++ b/flex/engines/graph_db/runtime/common/columns/value_columns.h @@ -19,6 +19,7 @@ #include "flex/engines/graph_db/runtime/common/columns/i_context_column.h" #include "flex/engines/graph_db/runtime/common/rt_any.h" +#include #include namespace gs { diff --git a/flex/engines/graph_db/runtime/common/operators/get_v.h b/flex/engines/graph_db/runtime/common/operators/get_v.h index 6a509c825631..16160881cdc8 100644 --- a/flex/engines/graph_db/runtime/common/operators/get_v.h +++ b/flex/engines/graph_db/runtime/common/operators/get_v.h @@ -113,7 +113,7 @@ class GetV { if (opt == VOpt::kStart) { input_edge_list.foreach_edge( [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst, - const Any& edata, Direction dir) { + const EdgeData& edata, Direction dir) { if (pred(label.src_label, src, index)) { builder.push_back_opt(src); shuffle_offset.push_back(index); @@ -122,7 +122,7 @@ class GetV { } else if (opt == VOpt::kEnd) { input_edge_list.foreach_edge( [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst, - const Any& edata, Direction dir) { + const EdgeData& edata, Direction dir) { if (pred(label.dst_label, dst, index)) { builder.push_back_opt(dst); shuffle_offset.push_back(index); @@ -159,7 +159,7 @@ class GetV { if (opt == VOpt::kStart) { input_edge_list.foreach_edge([&](size_t index, const LabelTriplet& label, vid_t src, - vid_t dst, const Any& edata, + vid_t dst, const EdgeData& edata, Direction dir) { if (std::find(labels.begin(), labels.end(), label.src_label) != labels.end()) { @@ -170,7 +170,7 @@ class GetV { } else if (opt == VOpt::kEnd) { input_edge_list.foreach_edge([&](size_t index, const LabelTriplet& label, vid_t src, - vid_t dst, const Any& edata, + vid_t dst, const EdgeData& edata, Direction dir) { if (std::find(labels.begin(), labels.end(), label.dst_label) != labels.end()) { @@ -196,7 +196,7 @@ class GetV { CHECK(params.opt == VOpt::kOther); input_edge_list.foreach_edge([&](size_t index, const LabelTriplet& label, vid_t src, - vid_t dst, const Any& edata, + vid_t dst, const EdgeData& edata, Direction dir) { if (dir == Direction::kOut) { builder.push_back_vertex(std::make_pair(label.dst_label, dst)); @@ -212,7 +212,7 @@ class GetV { SLVertexColumnBuilder builder(type.src_label); input_edge_list.foreach_edge( [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst, - const Any& edata, Direction dir) { + const EdgeData& edata, Direction dir) { if (dir == Direction::kOut) { builder.push_back_opt(dst); shuffle_offset.push_back(index); @@ -237,7 +237,7 @@ class GetV { SLVertexColumnBuilder builder(labels[0]); input_edge_list.foreach_edge( [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst, - const Any& edata, Direction dir) { + const EdgeData& edata, Direction dir) { if (dir == Direction::kOut) { if (label.dst_label == labels[0]) { builder.push_back_opt(dst); @@ -257,7 +257,7 @@ class GetV { MLVertexColumnBuilder builder; input_edge_list.foreach_edge([&](size_t index, const LabelTriplet& label, vid_t src, - vid_t dst, const Any& edata, + vid_t dst, const EdgeData& edata, Direction dir) { if (dir == Direction::kOut) { if (std::find(labels.begin(), labels.end(), label.dst_label) != @@ -286,7 +286,7 @@ class GetV { CHECK(params.opt == VOpt::kOther); input_edge_list.foreach_edge( [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst, - const Any& edata, Direction dir) { + const EdgeData& edata, Direction dir) { if (dir == Direction::kOut) { builder.push_back_vertex(std::make_pair(label.dst_label, dst)); } else { @@ -303,7 +303,7 @@ class GetV { SLVertexColumnBuilder builder(vlabel); input_edge_list.foreach_edge( [&](size_t index, const LabelTriplet& label, vid_t src, vid_t dst, - const Any& edata, Direction dir) { + const EdgeData& edata, Direction dir) { if (dir == Direction::kOut) { if (label.dst_label == vlabel) { builder.push_back_opt(dst); @@ -327,7 +327,7 @@ class GetV { MLVertexColumnBuilder builder; input_edge_list.foreach_edge([&](size_t index, const LabelTriplet& label, vid_t src, - vid_t dst, const Any& edata, + vid_t dst, const EdgeData& edata, Direction dir) { if (dir == Direction::kOut) { if (labels[label.dst_label]) { diff --git a/flex/engines/graph_db/runtime/common/rt_any.cc b/flex/engines/graph_db/runtime/common/rt_any.cc index 3fb774bb1ffc..68d9eb79c616 100644 --- a/flex/engines/graph_db/runtime/common/rt_any.cc +++ b/flex/engines/graph_db/runtime/common/rt_any.cc @@ -48,6 +48,9 @@ const RTAnyType RTAnyType::kNull = RTAnyType(RTAnyType::RTAnyTypeImpl::kNull); const RTAnyType RTAnyType::kTuple = RTAnyType(RTAnyType::RTAnyTypeImpl::kTuple); const RTAnyType RTAnyType::kList = RTAnyType(RTAnyType::RTAnyTypeImpl::kList); const RTAnyType RTAnyType::kMap = RTAnyType(RTAnyType::RTAnyTypeImpl::kMap); +const RTAnyType RTAnyType::kEmpty = RTAnyType(RTAnyType::RTAnyTypeImpl::kEmpty); +const RTAnyType RTAnyType::kRecordView = + RTAnyType(RTAnyType::RTAnyTypeImpl::kRecordView); RTAny List::get(size_t idx) const { return impl_->get(idx); } RTAnyType parse_from_ir_data_type(const ::common::IrDataType& dt) { switch (dt.type_case()) { @@ -70,6 +73,8 @@ RTAnyType parse_from_ir_data_type(const ::common::IrDataType& dt) { return RTAnyType::kDate32; case ::common::DataType::DOUBLE: return RTAnyType::kF64Value; + case ::common::DataType::NONE: + return RTAnyType::kUnknown; default: LOG(FATAL) << "unrecoginized data type - " << ddt; break; @@ -97,6 +102,27 @@ RTAnyType parse_from_ir_data_type(const ::common::IrDataType& dt) { return RTAnyType::kUnknown; } +PropertyType rt_type_to_property_type(RTAnyType type) { + switch (type.type_enum_) { + case RTAnyType::RTAnyTypeImpl::kEmpty: + return PropertyType::kEmpty; + case RTAnyType::RTAnyTypeImpl::kI64Value: + return PropertyType::kInt64; + case RTAnyType::RTAnyTypeImpl::kI32Value: + return PropertyType::kInt32; + case RTAnyType::RTAnyTypeImpl::kF64Value: + return PropertyType::kDouble; + case RTAnyType::RTAnyTypeImpl::kBoolValue: + return PropertyType::kBool; + case RTAnyType::RTAnyTypeImpl::kStringValue: + return PropertyType::kString; + case RTAnyType::RTAnyTypeImpl::kDate32: + return PropertyType::kDate; + default: + LOG(FATAL) << "not support for " << static_cast(type.type_enum_); + } +} + RTAny::RTAny() : type_(RTAnyType::kUnknown), value_() {} RTAny::RTAny(RTAnyType type) : type_(type) {} @@ -127,6 +153,32 @@ RTAny::RTAny(const Any& val) { } } +RTAny::RTAny(const EdgeData& val) { + if (val.type == RTAnyType::kI64Value) { + type_ = RTAnyType::kI64Value; + value_.i64_val = val.value.i64_val; + } else if (val.type == RTAnyType::kStringValue) { + type_ = RTAnyType::kStringValue; + value_.str_val = + std::string_view(val.value.str_val.data(), val.value.str_val.size()); + } else if (val.type == RTAnyType::kI32Value) { + type_ = RTAnyType::kI32Value; + value_.i32_val = val.value.i32_val; + } else if (val.type == RTAnyType::kF64Value) { + type_ = RTAnyType::kF64Value; + value_.f64_val = val.value.f64_val; + } else if (val.type == RTAnyType::kBoolValue) { + type_ = RTAnyType::kBoolValue; + value_.b_val = val.value.b_val; + } else if (val.type == RTAnyType::kDate32) { + type_ = RTAnyType::kDate32; + value_.i64_val = val.value.i64_val; + } else { + LOG(FATAL) << "Any value: " << val.to_string() + << ", type = " << static_cast(val.type.type_enum_); + } +} + RTAny::RTAny(const Path& p) { type_ = RTAnyType::kPath; value_.p = p; @@ -202,8 +254,7 @@ RTAny RTAny::from_vertex(const std::pair& v) { return ret; } -RTAny RTAny::from_edge( - const std::tuple& v) { +RTAny RTAny::from_edge(const EdgeRecord& v) { RTAny ret; ret.type_ = RTAnyType::kEdge; ret.value_.edge = v; @@ -343,8 +394,7 @@ const std::pair& RTAny::as_vertex() const { CHECK(type_ == RTAnyType::kVertex); return value_.vertex; } -const std::tuple& RTAny::as_edge() - const { +const EdgeRecord& RTAny::as_edge() const { CHECK(type_ == RTAnyType::kEdge); return value_.edge; } @@ -618,6 +668,23 @@ void sink_vertex(const gs::ReadTransaction& txn, } } +static void sink_edge_data(const EdgeData& any, common::Value* value) { + if (any.type == RTAnyType::kI64Value) { + value->set_i64(any.value.i64_val); + } else if (any.type == RTAnyType::kStringValue) { + value->set_str(any.value.str_val.data(), any.value.str_val.size()); + } else if (any.type == RTAnyType::kI32Value) { + value->set_i32(any.value.i32_val); + } else if (any.type == RTAnyType::kF64Value) { + value->set_f64(any.value.f64_val); + } else if (any.type == RTAnyType::kBoolValue) { + value->set_boolean(any.value.b_val); + } else { + LOG(FATAL) << "Any value: " << any.to_string() + << ", type = " << static_cast(any.type.type_enum_); + } +} + void RTAny::sink(const gs::ReadTransaction& txn, int id, results::Column* col) const { col->mutable_name_or_id()->set_id(id); @@ -677,9 +744,9 @@ void RTAny::sink(const gs::ReadTransaction& txn, int id, if (prop_names.size() == 1) { auto props = e->add_properties(); props->mutable_key()->set_name(prop_names[0]); - sink_any(prop, e->mutable_properties(0)->mutable_value()); + sink_edge_data(prop, e->mutable_properties(0)->mutable_value()); } else if (prop_names.size() > 1) { - auto rv = prop.AsRecordView(); + auto rv = prop.as(); if (rv.size() != prop_names.size()) { LOG(ERROR) << "record view size not match with prop names"; } @@ -803,6 +870,27 @@ std::string RTAny::to_string() const { } } +std::shared_ptr EdgePropVecBase::make_edge_prop_vec( + PropertyType type) { + if (type == PropertyType::Int64()) { + return std::make_shared>(); + } else if (type == PropertyType::StringView()) { + return std::make_shared>(); + } else if (type == PropertyType::Date()) { + return std::make_shared>(); + } else if (type == PropertyType::Int32()) { + return std::make_shared>(); + } else if (type == PropertyType::Double()) { + return std::make_shared>(); + } else if (type == PropertyType::Bool()) { + return std::make_shared>(); + } else if (type == PropertyType::Empty()) { + return std::make_shared>(); + } else { + LOG(FATAL) << "not support for " << type; + return nullptr; + } +} } // namespace runtime } // namespace gs diff --git a/flex/engines/graph_db/runtime/common/rt_any.h b/flex/engines/graph_db/runtime/common/rt_any.h index 3b73f06e3998..a2b7d1f95c9c 100644 --- a/flex/engines/graph_db/runtime/common/rt_any.h +++ b/flex/engines/graph_db/runtime/common/rt_any.h @@ -34,6 +34,14 @@ class PathImpl { new_path->path_.push_back(std::make_pair(label, v)); return new_path; } + static std::shared_ptr make_path_impl( + label_t label, std::vector& path_ids) { + auto new_path = std::make_shared(); + for (auto id : path_ids) { + new_path->path_.push_back({label, id}); + } + return new_path; + } std::shared_ptr expand(label_t label, vid_t v) const { auto new_path = std::make_shared(); new_path->path_ = path_; @@ -61,6 +69,7 @@ class PathImpl { }; class Path { public: + Path() : impl_(nullptr) {} static Path make_path(const std::shared_ptr& impl) { Path new_path; new_path.impl_ = impl.get(); @@ -166,6 +175,8 @@ class RTAnyType { kTuple, kList, kMap, + kEmpty, + kRecordView, }; static const RTAnyType kVertex; static const RTAnyType kEdge; @@ -184,6 +195,8 @@ class RTAnyType { static const RTAnyType kTuple; static const RTAnyType kList; static const RTAnyType kMap; + static const RTAnyType kEmpty; + static const RTAnyType kRecordView; RTAnyType() : type_enum_(RTAnyTypeImpl::kUnknown) {} RTAnyType(const RTAnyType& other) @@ -198,6 +211,8 @@ class RTAnyType { bool null_able_; }; +PropertyType rt_type_to_property_type(RTAnyType type); + class Map { public: static Map make_map(MapImpl impl) { @@ -213,6 +228,224 @@ class Map { MapImpl map_; }; +struct pod_string_view { + const char* data_; + size_t size_; + pod_string_view() = default; + pod_string_view(const pod_string_view& other) = default; + pod_string_view(const char* data) : data_(data), size_(strlen(data_)) {} + pod_string_view(const char* data, size_t size) : data_(data), size_(size) {} + pod_string_view(const std::string& str) + : data_(str.data()), size_(str.size()) {} + pod_string_view(const std::string_view& str) + : data_(str.data()), size_(str.size()) {} + const char* data() const { return data_; } + size_t size() const { return size_; } + + std::string to_string() const { return std::string(data_, size_); } +}; +struct EdgeData { + // PropertyType type; + + template + T as() const { + if constexpr (std::is_same_v) { + return value.i32_val; + } else if constexpr (std::is_same_v) { + return value.i64_val; + } else if constexpr (std::is_same_v) { + return value.u64_val; + } else if constexpr (std::is_same_v) { + return value.f64_val; + } else if constexpr (std::is_same_v) { + return value.b_val; + } else if constexpr (std::is_same_v) { + return value.str_val; + } else if constexpr (std::is_same_v) { + return grape::EmptyType(); + } else if constexpr (std::is_same_v) { + return Date(value.i64_val); + } else { + LOG(FATAL) << "not support for " << typeid(T).name(); + } + } + + template + explicit EdgeData(T val) { + if constexpr (std::is_same_v) { + type = RTAnyType::kI32Value; + value.i32_val = val; + } else if constexpr (std::is_same_v) { + type = RTAnyType::kI64Value; + value.i64_val = val; + } else if constexpr (std::is_same_v) { + type = RTAnyType::kU64Value; + value.u64_val = val; + } else if constexpr (std::is_same_v) { + type = RTAnyType::kF64Value; + value.f64_val = val; + } else if constexpr (std::is_same_v) { + type = RTAnyType::kBoolValue; + value.b_val = val; + } else if constexpr (std::is_same_v) { + type = RTAnyType::kStringValue; + value.str_val = val; + } else if constexpr (std::is_same_v) { + type = RTAnyType::kEmpty; + } else if constexpr (std::is_same_v) { + type = RTAnyType::kDate32; + value.i64_val = val.milli_second; + } else { + LOG(FATAL) << "not support for " << typeid(T).name(); + } + } + + std::string to_string() const { + if (type == RTAnyType::kI32Value) { + return std::to_string(value.i32_val); + } else if (type == RTAnyType::kI64Value) { + return std::to_string(value.i64_val); + } else if (type == RTAnyType::kStringValue) { + return std::string(value.str_val.data(), value.str_val.size()); + return value.str_val.to_string(); + } else if (type == RTAnyType::kNull) { + return "NULL"; + } else if (type == RTAnyType::kF64Value) { + return std::to_string(value.f64_val); + } else if (type == RTAnyType::kBoolValue) { + return value.b_val ? "true" : "false"; + } else if (type == RTAnyType::kEmpty) { + return ""; + } else if (type == RTAnyType::kDate32) { + return std::to_string(value.i64_val); + } else if (type == RTAnyType::kEmpty) { + return ""; + } else { + LOG(FATAL) << "Unexpected property type: " + << static_cast(type.type_enum_); + return ""; + } + } + + EdgeData() = default; + + EdgeData(const Any& any) { + switch (any.type.type_enum) { + case impl::PropertyTypeImpl::kInt64: + type = RTAnyType::kI64Value; + value.i64_val = any.value.l; + break; + case impl::PropertyTypeImpl::kInt32: + type = RTAnyType::kI32Value; + value.i32_val = any.value.i; + break; + case impl::PropertyTypeImpl::kStringView: + type = RTAnyType::kStringValue; + value.str_val = any.value.s; + break; + case impl::PropertyTypeImpl::kDouble: + type = RTAnyType::kF64Value; + value.f64_val = any.value.db; + break; + case impl::PropertyTypeImpl::kBool: + type = RTAnyType::kBoolValue; + value.b_val = any.value.b; + break; + case impl::PropertyTypeImpl::kEmpty: + type = RTAnyType::kEmpty; + break; + case impl::PropertyTypeImpl::kDate: + type = RTAnyType::kDate32; + value.i64_val = any.value.d.milli_second; + break; + default: + LOG(FATAL) << "Unexpected property type: " + << static_cast(any.type.type_enum); + } + } + + bool operator<(const EdgeData& e) const { + if (type == RTAnyType::kI64Value) { + return value.i64_val < e.value.i64_val; + } else if (type == RTAnyType::kI32Value) { + return value.i32_val < e.value.i32_val; + } else if (type == RTAnyType::kF64Value) { + return value.f64_val < e.value.f64_val; + } else if (type == RTAnyType::kBoolValue) { + return value.b_val < e.value.b_val; + } else if (type == RTAnyType::kStringValue) { + return std::string_view(value.str_val.data(), value.str_val.size()) < + std::string_view(e.value.str_val.data(), e.value.str_val.size()); + } else if (type == RTAnyType::kDate32) { + return value.i64_val < e.value.i64_val; + } else { + return false; + } + } + + bool operator==(const EdgeData& e) const { + if (type == RTAnyType::kI64Value) { + return value.i64_val == e.value.i64_val; + } else if (type == RTAnyType::kI32Value) { + return value.i32_val == e.value.i32_val; + } else if (type == RTAnyType::kF64Value) { + return value.f64_val == e.value.f64_val; + } else if (type == RTAnyType::kBoolValue) { + return value.b_val == e.value.b_val; + } else if (type == RTAnyType::kStringValue) { + return std::string_view(value.str_val.data(), value.str_val.size()) == + std::string_view(e.value.str_val.data(), e.value.str_val.size()); + } else if (type == RTAnyType::kDate32) { + return value.i64_val == e.value.i64_val; + } else { + return false; + } + } + RTAnyType type; + + union { + int32_t i32_val; + int64_t i64_val; + uint64_t u64_val; + double f64_val; + bool b_val; + pod_string_view str_val; + Date date_val; + + // todo: make recordview as a pod type + // RecordView record; + } value; +}; +class EdgeRecord { + public: + EdgeRecord() = default; + EdgeRecord(LabelTriplet label_triplet, vid_t src, vid_t dst, EdgeData prop, + Direction dir) + : label_triplet_(label_triplet), + src_(src), + dst_(dst), + prop_(prop), + dir_(dir) {} + bool operator<(const EdgeRecord& e) const { + return std::tie(src_, dst_, label_triplet_, prop_, dir_) < + std::tie(e.src_, e.dst_, e.label_triplet_, prop_, dir_); + } + bool operator==(const EdgeRecord& e) const { + return std::tie(src_, dst_, label_triplet_, prop_, dir_) == + std::tie(e.src_, e.dst_, e.label_triplet_, prop_, dir_); + } + vid_t src() const { return src_; } + vid_t dst() const { return dst_; } + LabelTriplet label_triplet() const { return label_triplet_; } + EdgeData prop() const { return prop_; } + Direction dir() const { return dir_; } + + LabelTriplet label_triplet_; + vid_t src_, dst_; + EdgeData prop_; + Direction dir_; +}; + RTAnyType parse_from_ir_data_type(const ::common::IrDataType& dt); union RTAnyValue { @@ -220,7 +453,7 @@ union RTAnyValue { ~RTAnyValue() {} std::pair vertex; - std::tuple edge; + EdgeRecord edge; int64_t i64_val; uint64_t u64_val; int i32_val; @@ -240,6 +473,7 @@ class RTAny { RTAny(); RTAny(RTAnyType type); RTAny(const Any& val); + RTAny(const EdgeData& val); RTAny(const RTAny& rhs); RTAny(const Path& p); ~RTAny() = default; @@ -251,8 +485,7 @@ class RTAny { static RTAny from_vertex(label_t l, vid_t v); static RTAny from_vertex(const std::pair& v); - static RTAny from_edge( - const std::tuple& v); + static RTAny from_edge(const EdgeRecord& v); static RTAny from_bool(bool v); static RTAny from_int64(int64_t v); static RTAny from_uint64(uint64_t v); @@ -275,7 +508,7 @@ class RTAny { int64_t as_date32() const; double as_double() const; const std::pair& as_vertex() const; - const std::tuple& as_edge() const; + const EdgeRecord& as_edge() const; const std::set& as_string_set() const; std::string_view as_string() const; const std::vector& as_vertex_list() const; @@ -379,17 +612,17 @@ struct TypedConverter { template <> struct TypedConverter { - static RTAnyType type() { return RTAnyType::kI64Value; } + static RTAnyType type() { return RTAnyType::kF64Value; } static double to_typed(const RTAny& val) { return val.as_double(); } static RTAny from_typed(double val) { return RTAny::from_double(val); } - static const std::string name() { return "int64"; } + static const std::string name() { return "double"; } }; template <> struct TypedConverter { static RTAnyType type() { return RTAnyType::kDate32; } static Date to_typed(const RTAny& val) { return val.as_date32(); } static RTAny from_typed(Date val) { return RTAny::from_date32(val); } - static const std::string name() { return "int64"; } + static const std::string name() { return "date"; } static Date typed_from_string(const std::string& str) { int64_t val = std::stoll(str); return Date(val); @@ -504,6 +737,79 @@ class ListImpl : public ListImplBase { std::vector list_; std::vector is_valid_; }; +class EdgePropVecBase { + public: + static std::shared_ptr make_edge_prop_vec(PropertyType type); + virtual ~EdgePropVecBase() = default; + virtual size_t size() const = 0; + virtual void resize(size_t size) = 0; + virtual void reserve(size_t size) = 0; + virtual void clear() = 0; + virtual EdgeData get(size_t idx) const = 0; + + virtual PropertyType type() const = 0; + virtual void set_any(size_t idx, EdgePropVecBase* other, + size_t other_idx) = 0; +}; +template +class EdgePropVec : public EdgePropVecBase { + public: + ~EdgePropVec() {} + + void push_back(const T& val) { prop_data_.push_back(val); } + void emplace_back(T&& val) { prop_data_.emplace_back(std::move(val)); } + size_t size() const override { return prop_data_.size(); } + + EdgeData get(size_t idx) const override { return EdgeData(prop_data_[idx]); } + + T get_view(size_t idx) const { return prop_data_[idx]; } + void resize(size_t size) override { prop_data_.resize(size); } + void clear() override { prop_data_.clear(); } + void reserve(size_t size) override { prop_data_.reserve(size); } + T operator[](size_t idx) const { return prop_data_[idx]; } + void set(size_t idx, const T& val) { + if (prop_data_.size() <= idx) { + prop_data_.resize(idx + 1); + } + prop_data_[idx] = val; + } + + PropertyType type() const override { return AnyConverter::type(); } + + void set_any(size_t idx, EdgePropVecBase* other, size_t other_idx) override { + CHECK(dynamic_cast*>(other) != nullptr); + set(idx, dynamic_cast*>(other)->get_view(other_idx)); + } + + private: + std::vector prop_data_; +}; + +template <> +class EdgePropVec : public EdgePropVecBase { + public: + ~EdgePropVec() {} + void push_back(const grape::EmptyType& val) { size_++; } + void emplace_back(grape::EmptyType&& val) { size_++; } + size_t size() const override { return size_; } + + EdgeData get(size_t idx) const override { + return EdgeData(grape::EmptyType()); + } + + grape::EmptyType get_view(size_t idx) const { return grape::EmptyType(); } + void resize(size_t size) override { size_ = size; } + void clear() override {} + void reserve(size_t size) override {} + grape::EmptyType operator[](size_t idx) const { return grape::EmptyType(); } + void set(size_t idx, const grape::EmptyType& val) {} + + PropertyType type() const override { return PropertyType::kEmpty; } + + void set_any(size_t idx, EdgePropVecBase* other, size_t other_idx) override {} + size_t size_; +}; + } // namespace runtime } // namespace gs diff --git a/flex/engines/graph_db/runtime/common/types.h b/flex/engines/graph_db/runtime/common/types.h index af5cd4f80c93..65de6829fdad 100644 --- a/flex/engines/graph_db/runtime/common/types.h +++ b/flex/engines/graph_db/runtime/common/types.h @@ -56,6 +56,7 @@ enum class JoinKind { }; struct LabelTriplet { + LabelTriplet() = default; LabelTriplet(label_t src, label_t dst, label_t edge) : src_label(src), dst_label(dst), edge_label(edge) {} From 0c3d19eba9204ba55bb57e44282ca67579f9e754 Mon Sep 17 00:00:00 2001 From: Xiaoli Zhou Date: Thu, 2 Jan 2025 15:10:42 +0800 Subject: [PATCH 3/3] fix(interactive): Fix Bugs of Type Inference in `Collect(labels(n))` (#4398) ## What do these changes do? ## Related issue number Fixes #4388 --- .../common/ir/runtime/proto/Utils.java | 11 +- .../ir/runtime/GraphRelToProtoTest.java | 25 ++++- .../test/resources/proto/collect_labels.json | 106 ++++++++++++++++++ 3 files changed, 130 insertions(+), 12 deletions(-) create mode 100644 interactive_engine/compiler/src/test/resources/proto/collect_labels.json diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/Utils.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/Utils.java index a11f2eae2c59..8e595417fc29 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/Utils.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/ir/runtime/proto/Utils.java @@ -296,15 +296,14 @@ public static final Common.DataType protoBasicDataType(RelDataType basicType) { case MULTISET: case ARRAY: RelDataType elementType = basicType.getComponentType(); - switch (elementType.getSqlTypeName()) { - case INTEGER: + Common.DataType dataType = protoBasicDataType(elementType); + switch (dataType) { + case INT32: return Common.DataType.INT32_ARRAY; - case BIGINT: + case INT64: return Common.DataType.INT64_ARRAY; - case CHAR: + case STRING: return Common.DataType.STRING_ARRAY; - case DECIMAL: - case FLOAT: case DOUBLE: return Common.DataType.DOUBLE_ARRAY; default: diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/GraphRelToProtoTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/GraphRelToProtoTest.java index b59695ef1337..56ad0c2dda53 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/GraphRelToProtoTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/common/ir/runtime/GraphRelToProtoTest.java @@ -27,12 +27,7 @@ import com.alibaba.graphscope.common.ir.tools.GraphBuilder; import com.alibaba.graphscope.common.ir.tools.GraphStdOperatorTable; import com.alibaba.graphscope.common.ir.tools.LogicalPlan; -import com.alibaba.graphscope.common.ir.tools.config.ExpandConfig; -import com.alibaba.graphscope.common.ir.tools.config.GetVConfig; -import com.alibaba.graphscope.common.ir.tools.config.GraphOpt; -import com.alibaba.graphscope.common.ir.tools.config.LabelConfig; -import com.alibaba.graphscope.common.ir.tools.config.PathExpandConfig; -import com.alibaba.graphscope.common.ir.tools.config.SourceConfig; +import com.alibaba.graphscope.common.ir.tools.config.*; import com.alibaba.graphscope.common.utils.FileUtils; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -1237,6 +1232,24 @@ public void intersect_test_02() throws Exception { } } + @Test + public void collect_labels_test() { + Configs configs = getMockCBOConfig(); + GraphRelOptimizer optimizer = new GraphRelOptimizer(configs); + IrMeta irMeta = getMockCBOMeta(optimizer); + GraphBuilder builder = Utils.mockGraphBuilder(optimizer, irMeta); + RelNode before = + com.alibaba.graphscope.cypher.antlr4.Utils.eval( + "Match (n:PERSON) Return collect(labels(n));", builder) + .build(); + RelNode after = optimizer.optimize(before, new GraphIOProcessor(builder, irMeta)); + PhysicalBuilder protoBuilder = + new GraphRelProtoPhysicalBuilder(configs, irMeta, new LogicalPlan(after)); + Assert.assertEquals( + FileUtils.readJsonFromResource("proto/collect_labels.json"), + protoBuilder.build().explain().trim()); + } + private Configs getMockCBOConfig() { return new Configs( ImmutableMap.of( diff --git a/interactive_engine/compiler/src/test/resources/proto/collect_labels.json b/interactive_engine/compiler/src/test/resources/proto/collect_labels.json new file mode 100644 index 000000000000..51b91a48728b --- /dev/null +++ b/interactive_engine/compiler/src/test/resources/proto/collect_labels.json @@ -0,0 +1,106 @@ +{ + "plan": [{ + "opr": { + "scan": { + "alias": 0, + "params": { + "tables": [{ + "id": 1 + }], + "sampleRatio": 1.0 + } + } + }, + "metaData": [{ + "type": { + "graphType": { + "graphDataType": [{ + "label": { + "label": 1 + }, + "props": [{ + "propId": { + "name": "id" + }, + "type": "INT64" + }, { + "propId": { + "name": "firstName" + }, + "type": "STRING" + }, { + "propId": { + "name": "lastName" + }, + "type": "STRING" + }, { + "propId": { + "name": "gender" + }, + "type": "STRING" + }, { + "propId": { + "name": "birthday" + }, + "type": "INT64" + }, { + "propId": { + "name": "creationDate" + }, + "type": "DATE32" + }, { + "propId": { + "name": "locationIP" + }, + "type": "STRING" + }, { + "propId": { + "name": "browserUsed" + }, + "type": "STRING" + }] + }] + } + } + }] + }, { + "opr": { + "groupBy": { + "functions": [{ + "vars": [{ + "tag": { + "id": 0 + }, + "property": { + "label": { + } + }, + "nodeType": { + "dataType": "INT32" + } + }], + "aggregate": "TO_LIST", + "alias": 1 + }] + } + }, + "metaData": [{ + "type": { + "dataType": "INT32_ARRAY" + }, + "alias": 1 + }] + }, { + "opr": { + "sink": { + "tags": [{ + "tag": 1 + }], + "sinkTarget": { + "sinkDefault": { + } + } + } + } + }] +}