diff --git a/analytical_engine/core/io/property_parser.h b/analytical_engine/core/io/property_parser.h index acf3e1da25ca..35bb8cef6c45 100644 --- a/analytical_engine/core/io/property_parser.h +++ b/analytical_engine/core/io/property_parser.h @@ -133,6 +133,13 @@ struct Graph { bool retain_oid = true; bool compact_edges = false; bool use_perfect_hash = false; + // Controls whether loaded data extends a label + // that already exists when the user adds data to it. + // The available options are 0/1/2: + // 0 stands for no extension, + // 1 stands for extending vertex label data, + // 2 stands for extending edge label data. + int extend_type = 0; std::string SerializeToString() const { std::stringstream ss; @@ -289,6 +296,8 @@ inline bl::result> ParseCreatePropertyGraph( BOOST_LEAF_AUTO(compact_edges, params.Get(rpc::COMPACT_EDGES, false)); BOOST_LEAF_AUTO(use_perfect_hash, params.Get(rpc::USE_PERFECT_HASH, false)); + // default to 0 (no extension) when the request does not carry the key + BOOST_LEAF_AUTO(extend_type, params.Get(rpc::EXTEND_LABEL_DATA, 0)); auto graph = std::make_shared(); graph->directed = directed; @@ -296,6 +305,7 @@ inline bl::result> ParseCreatePropertyGraph( graph->retain_oid = retain_oid; graph->compact_edges = compact_edges; graph->use_perfect_hash = use_perfect_hash; + graph->extend_type = extend_type; const auto& large_attr = params.GetLargeAttr(); for (const auto& item : large_attr.chunk_list().items()) { diff --git a/analytical_engine/core/loader/arrow_fragment_loader.h b/analytical_engine/core/loader/arrow_fragment_loader.h index 1a8c369f8755..6db66e16535a 100644 --- a/analytical_engine/core/loader/arrow_fragment_loader.h +++ b/analytical_engine/core/loader/arrow_fragment_loader.h @@ -272,6 +272,26 @@ class ArrowFragmentLoader : public vineyard::ArrowFragmentLoader { return Base::addVerticesAndEdges(frag_id, std::move(raw_v_e_tables)); } + // Initializes the partitioner, loads the raw vertex/edge tables and passes + // them, together with label_id, to Base::addDataToExistedVLabel. + bl::result AddDataToExistedVLable( + vineyard::ObjectID frag_id, label_id_t label_id) { + BOOST_LEAF_CHECK(initPartitioner()); + BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables()); + return 
Base::addDataToExistedVLabel(frag_id, label_id, + std::move(raw_v_e_tables)); + } + + // Initializes the partitioner, loads the raw vertex/edge tables and passes + // them, together with label_id, to Base::addDataToExistedELabel. + bl::result AddDataToExistedELable( + vineyard::ObjectID frag_id, label_id_t label_id) { + BOOST_LEAF_CHECK(initPartitioner()); + BOOST_LEAF_AUTO(raw_v_e_tables, LoadVertexEdgeTables()); + return Base::addDataToExistedELabel(frag_id, label_id, + std::move(raw_v_e_tables)); + } + boost::leaf::result AddLabelsToFragmentAsFragmentGroup( vineyard::ObjectID frag_id) { BOOST_LEAF_AUTO(new_frag_id, AddLabelsToFragment(frag_id)); @@ -279,6 +299,68 @@ class ArrowFragmentLoader : public vineyard::ArrowFragmentLoader { return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_); } + // Extends a label that already exists in fragment frag_id with loaded data. + // extend_type 1 targets vertex labels and 2 targets edge labels; the first + // label of graph_info_ found in the fragment's schema is the one extended. + bl::result ExtendLabelData(vineyard::ObjectID frag_id, + int extend_type) { + // find the incoming label that already exists in the fragment's schema + assert(extend_type); + auto frag = std::dynamic_pointer_cast( + client_.GetObject(frag_id)); + if (frag == nullptr) { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, + "failed to get the fragment to extend"); + } + vineyard::PropertyGraphSchema schema = frag->schema(); + std::vector labels; + label_id_t target_label_id = -1; + if (extend_type == 1) + labels = schema.GetVertexLabels(); + else if (extend_type == 2) + labels = schema.GetEdgeLabels(); + + // NOTE(review): a label's position in this list is used as its label id; + // confirm that it always matches the id stored in the schema. + std::map label_set; + for (size_t i = 0; i < labels.size(); ++i) + label_set[labels[i]] = i; + + if (extend_type == 1) { + for (size_t i = 0; i < graph_info_->vertices.size(); ++i) { + auto it = label_set.find(graph_info_->vertices[i]->label); + if (it != label_set.end()) { + target_label_id = it->second; + break; + } + } + } else if (extend_type == 2) { + for (size_t i = 0; i < graph_info_->edges.size(); ++i) { + auto it = label_set.find(graph_info_->edges[i]->label); + if (it != label_set.end()) { + target_label_id = it->second; + break; + } + } + } else { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, + "extend type is invalid"); + } + + if (target_label_id == -1) + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, + "label not found"); + vineyard::ObjectID new_frag_id = vineyard::InvalidObjectID(); + if (extend_type == 1) { + BOOST_LEAF_ASSIGN(new_frag_id, 
AddDataToExistedVLable(frag_id, target_label_id)); + } else if (extend_type == 2) { + BOOST_LEAF_ASSIGN(new_frag_id, + AddDataToExistedELable(frag_id, target_label_id)); + } + return vineyard::ConstructFragmentGroup(client_, new_frag_id, comm_spec_); + } + bl::result initPartitioner() { #ifdef HASH_PARTITION Base::partitioner_.Init(comm_spec_.fnum()); diff --git a/analytical_engine/frame/property_graph_frame.cc b/analytical_engine/frame/property_graph_frame.cc index 6ca408c963df..0b840dad4bf9 100644 --- a/analytical_engine/frame/property_graph_frame.cc +++ b/analytical_engine/frame/property_graph_frame.cc @@ -346,9 +346,16 @@ AddLabelsToGraph(vineyard::ObjectID origin_frag_id, BOOST_LEAF_AUTO(graph_info, gs::ParseCreatePropertyGraph(params)); using loader_t = gs::arrow_fragment_loader_t; loader_t loader(client, comm_spec, graph_info); + vineyard::ObjectID frag_group_id = vineyard::InvalidObjectID(); - BOOST_LEAF_AUTO(frag_group_id, - loader.AddLabelsToFragmentAsFragmentGroup(origin_frag_id)); + if (graph_info->extend_type) { + BOOST_LEAF_ASSIGN( + frag_group_id, + loader.ExtendLabelData(origin_frag_id, graph_info->extend_type)); + } else { + BOOST_LEAF_ASSIGN(frag_group_id, loader.AddLabelsToFragmentAsFragmentGroup( + origin_frag_id)); + } MPI_Barrier(comm_spec.comm()); LOG_IF(INFO, comm_spec.worker_id() == 0) diff --git a/proto/types.proto b/proto/types.proto index 82d9f69e1811..2155e7e63d02 100644 --- a/proto/types.proto +++ b/proto/types.proto @@ -201,6 +201,9 @@ enum ParamKey { IS_FROM_GAR = 70; GRAPH_INFO_PATH = 71; + // Extend label data + EXTEND_LABEL_DATA = 80; + APP_NAME = 100; APP_ALGO = 101; APP_LIBRARY_PATH = 102; diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py index f8fe467a516e..2e977f0bef10 100644 --- a/python/graphscope/framework/dag_utils.py +++ b/python/graphscope/framework/dag_utils.py @@ -202,6 +202,7 @@ def add_labels_to_graph(graph, loader_op): config = { types_pb2.GRAPH_TYPE: 
utils.graph_type_to_attr(graph._graph_type), types_pb2.DIRECTED: utils.b_to_attr(graph._directed), + types_pb2.EXTEND_LABEL_DATA: utils.i_to_attr(graph._extend_label_data), types_pb2.OID_TYPE: utils.s_to_attr(graph._oid_type), types_pb2.VID_TYPE: utils.s_to_attr(graph._vid_type), types_pb2.GENERATE_EID: utils.b_to_attr(graph._generate_eid), diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py index 281ca3ddf54d..1d5822f2e658 100644 --- a/python/graphscope/framework/graph.py +++ b/python/graphscope/framework/graph.py @@ -65,6 +65,7 @@ def __init__(self): self._vertex_map = graph_def_pb2.GLOBAL_VERTEX_MAP self._compact_edges = False self._use_perfect_hash = False + self._extend_label_data = 0 @property def session_id(self): @@ -215,6 +216,7 @@ def _construct_op_of_empty_graph(self): config[types_pb2.VERTEX_MAP_TYPE] = utils.i_to_attr(self._vertex_map) config[types_pb2.COMPACT_EDGES] = utils.b_to_attr(self._compact_edges) config[types_pb2.USE_PERFECT_HASH] = utils.b_to_attr(self._use_perfect_hash) + config[types_pb2.EXTEND_LABEL_DATA] = utils.i_to_attr(self._extend_label_data) return dag_utils.create_graph( self.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=None, attrs=config ) @@ -304,6 +306,11 @@ def __init__( self._vertex_map = utils.vertex_map_type_to_enum(vertex_map) self._compact_edges = compact_edges self._use_perfect_hash = use_perfect_hash + # set when add_vertices/add_edges needs to extend a label in 'eager mode' + # 0 - not extending label + # 1 - extend vertex label + # 2 - extend edge label + self._extend_label_data = 0 # list of pair self._unsealed_vertices_and_edges = list() @@ -505,10 +512,15 @@ def add_vertices( "Cannot incrementally add vertices to graphs with compacted edges, " "please use `graphscope.load_from()` instead." 
) - if label in self._v_labels: - raise ValueError(f"Label {label} already existed in graph.") if not self._v_labels and self._e_labels: raise ValueError("Cannot manually add vertices after inferred vertices.") + # NOTE: local_vertex_map is currently not supported + if label in self._v_labels: + # NOTE(review): the flag is stored on self and no reset is visible here; + # confirm it cannot leak into later add_vertices/add_edges calls. + self._extend_label_data = 1 + warnings.warn(f"Label {label} already existed in graph" + ", origin label data will be extend.") unsealed_vertices_and_edges = deepcopy(self._unsealed_vertices_and_edges) vertex_label = VertexLabel( label=label, @@ -520,7 +532,8 @@ def add_vertices( ) unsealed_vertices_and_edges.append((self.op.key, vertex_label)) v_labels = deepcopy(self._v_labels) - v_labels.append(label) + if self._extend_label_data == 0: + v_labels.append(label) # generate and add a loader op to dag loader_op = dag_utils.create_loader(vertex_label) self._session.dag.add_op(loader_op) @@ -616,7 +629,7 @@ def add_edges( if self.evaluated: if label in self._e_labels: - raise ValueError(f"Label {label} already existed in graph") + self._extend_label_data = 2 unsealed_vertices = list() unsealed_edges = list() @@ -634,7 +647,7 @@ def add_edges( v_labels.append(dst_label) parent = self - if label in self.e_labels: + if not self.evaluated and label in self.e_labels: # aggregate op with the same edge label fork = False unsealed_vertices_and_edges = list()