From 1dd3916df637988af06679fab27140d4150a9f35 Mon Sep 17 00:00:00 2001 From: Weibin Zeng Date: Fri, 19 Apr 2024 10:03:41 +0800 Subject: [PATCH] feat(python): Unify the graph level load_from and save to API & Bump up vineyard to v0.22.0 (#3610) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What do these changes do? Unify the `load_from` and `save_to` API to support different kinds of data sources. Related discussion issues: #2836 #2920 ### `save_to` ```python def save_to(self, path, format="serialization", **kwargs) ``` We use `save_to` to dump a graph to data in a certain format; it currently supports `graphar` and `serialization`, and it is extensible to new formats. - `path` output dir, supports local, oss, s3, hdfs - `format` format to save, default is 'serialization' - related configurations: the writing of each format may attach some related configurations. We set these configurations with the naming strategy "format_config1", "format_config2". For example, graphar provides the configurations: - graphar_graph_name - graphar_vertex_chunk_size - graphar_edge_chunk_size - graphar_file_type - graphar_store_in_local - `selector` to dump a subgraph with selected vertices and edges. Users can define a `selector` to dump only a certain set of vertices and edges, example: ```python selector = { "vertices": { "person": ["id", "firstName", "secondName"], "comment": None, # select all properties }, "edges": { "knows": ["CreationDate"], "replyOf": None, }, } ``` - return format ```python {"type": format, "URI": "format+file:///tmp/graph/xxx"} ``` Example: ```python # graphar g.save_to("/tmp/graphar/", format="graphar", graphar_graph_name="ldbc", graphar_vertex_chunk_size=1024, graphar_edge_chunk_size=4096, graphar_file_type="parquet", graphar_store_in_local=False) {'type': 'graphar', 'URI': 'graphar+file:///tmp/graphar/ldbc.graph.yml'} # graphar with selector selector = { "vertices": { "person": ["id", "firstName", "secondName"], "comment": None, # select all properties
}, "edges": { "knows": ["CreationDate"], "replyOf": None, }, } g.save_to("/tmp/graphar/", format="graphar", selector=selector, graphar_graph_name="ldbc") {'type': 'graphar', 'URI': 'graphar+file:///tmp/graphar/ldbc.graph.yml'} # serialization g.save_to("/tmp/serialization/") {'type':'serialization', 'URI': '/tmp/serialization/'} ``` ### `load_from` ```python def load_from(uri, sess=None, **kwargs) ``` We use `load_from` to load a graph from a data source in a certain format; it currently supports `graphar` and `serialization`, and it is extensible to new data sources. - uri examples ```python graphar+file:///tmp/graphar/ldbc.graph.yaml graphar+oss://bucket/graphar/ldbc.graphar.yaml /tmp/serialization ``` - related configurations: the loading of each format may attach some related configurations. We set these configurations with the naming strategy "format_config1", "format_config2". For example, graphar provides the configuration `storage_options`, and graphar can also set the following configuration: ```python - graphar_store_in_local # the graphar files are stored in the local file system of the workers; only supported for the local file system. ``` - `selector` to load a subgraph with selected vertices and edges. Users can define a `selector` to load only a certain set of vertices and edges.
example: ```python selector = { "vertices": { "person": None, # select all properties "comment": None, }, "edges": { "knows": None, "replyOf": None, }, } ``` - return A new Graph Example: ```python # graphar Graph.load_from("graphar+file:///tmp/graphar/ldbc.graph.yml", graphar_store_in_local=True) # graphar with selector selector = { "vertices": { "person": None, # select all properties "comment": None, }, "edges": { "knows": None, "replyOf": None, }, } Graph.load_from("graphar+file:///tmp/graphar/ldbc.graph.yml", selector=selector) # serialization Graph.load_from("/tmp/serialization/") ``` ## Related issue number Fixes #2836 Fixes #2920 --------- Signed-off-by: acezen --- .../build-graphscope-wheels-macos.yml | 2 +- .github/workflows/gae.yml | 2 +- .github/workflows/local-ci.yml | 3 + .../networkx-forward-algo-nightly.yml | 2 +- .github/workflows/nightly.yml | 2 +- analytical_engine/core/grape_instance.cc | 7 +- analytical_engine/core/object/graph_utils.h | 1 - .../frame/property_graph_frame.cc | 141 ++++++++++- docs/reference/session.rst | 1 - docs/storage_engine/graphar.md | 167 +++++++++++-- k8s/Makefile | 2 +- .../manylinux/Makefile | 2 +- k8s/dockerfiles/vineyard-runtime.Dockerfile | 2 +- k8s/internal/Makefile | 2 +- python/graphscope/__init__.py | 1 - python/graphscope/client/session.py | 7 - python/graphscope/framework/dag_utils.py | 34 ++- python/graphscope/framework/graph.py | 224 ++++++++++++----- python/graphscope/framework/graph_builder.py | 55 ----- python/graphscope/gsctl/commands/dev.py | 5 +- .../gsctl/scripts/install_deps_command.sh | 2 +- python/graphscope/tests/unittest/test_gar.py | 40 --- .../graphscope/tests/unittest/test_graphar.py | 230 ++++++++++++++++++ python/requirements.txt | 5 +- 24 files changed, 725 insertions(+), 214 deletions(-) delete mode 100644 python/graphscope/tests/unittest/test_gar.py create mode 100644 python/graphscope/tests/unittest/test_graphar.py diff --git a/.github/workflows/build-graphscope-wheels-macos.yml
b/.github/workflows/build-graphscope-wheels-macos.yml index 13387b2b28e5..55e625d92b01 100644 --- a/.github/workflows/build-graphscope-wheels-macos.yml +++ b/.github/workflows/build-graphscope-wheels-macos.yml @@ -185,7 +185,7 @@ jobs: run: | . ~/.graphscope_env python3 -m pip install libclang - git clone --single-branch -b v0.21.3 --depth=1 https://github.com/v6d-io/v6d.git /tmp/v6d + git clone --single-branch -b v0.22.0 --depth=1 https://github.com/v6d-io/v6d.git /tmp/v6d cd /tmp/v6d git submodule update --init cmake . -DCMAKE_INSTALL_PREFIX=/usr/local \ diff --git a/.github/workflows/gae.yml b/.github/workflows/gae.yml index eeaef7a85db3..8145b477dc73 100644 --- a/.github/workflows/gae.yml +++ b/.github/workflows/gae.yml @@ -30,7 +30,7 @@ jobs: runs-on: ubuntu-20.04 if: ${{ github.repository == 'alibaba/GraphScope' }} container: - image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.21.3 + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.22.0 steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/local-ci.yml b/.github/workflows/local-ci.yml index 2b195ec9096c..48f3530e8844 100644 --- a/.github/workflows/local-ci.yml +++ b/.github/workflows/local-ci.yml @@ -85,6 +85,7 @@ jobs: - 'analytical_engine/frame/**' - 'analytical_engine/core/**' - 'python/graphscope/nx/**' + - 'python/requirements.txt' gie-function-test: - 'interactive_engine/**' - 'python/graphscope/interactive/**' @@ -379,6 +380,7 @@ jobs: run: | # download dataset git clone -b master --single-branch --depth=1 https://github.com/7br/gstest.git ${GS_TEST_DIR} + export TMPDIR="${TMPDIR:-$(dirname $(mktemp))}" python3 -m pytest -d --tx popen//python=python3 \ -s -v \ @@ -421,6 +423,7 @@ jobs: - 'python/graphscope/nx/classes/**' - 'python/graphscope/nx/!(tests)' - 'python/graphscope/nx/tests/!(convert)' + - 'python/requirements.txt' convert: - 'python/graphscope/nx/convert.py' - 'python/graphscope/nx/convert_matrix.py' diff --git 
a/.github/workflows/networkx-forward-algo-nightly.yml b/.github/workflows/networkx-forward-algo-nightly.yml index 4f9a068fd773..d745472b43cf 100644 --- a/.github/workflows/networkx-forward-algo-nightly.yml +++ b/.github/workflows/networkx-forward-algo-nightly.yml @@ -17,7 +17,7 @@ jobs: run: shell: bash --noprofile --norc -eo pipefail {0} container: - image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.21.3 + image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.22.0 options: --shm-size 4096m diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index fbb03ad347d6..ecd8d8333559 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -44,7 +44,7 @@ jobs: sudo mkdir /opt/graphscope sudo chown -R $(id -u):$(id -g) /opt/graphscope python3 -m pip install click - python3 gsctl.py install-deps dev --v6d-version v0.21.3 + python3 gsctl.py install-deps dev --v6d-version v0.22.0 - name: Setup tmate session if: false diff --git a/analytical_engine/core/grape_instance.cc b/analytical_engine/core/grape_instance.cc index d830c6f0494e..7cc45d1a7a4e 100644 --- a/analytical_engine/core/grape_instance.cc +++ b/analytical_engine/core/grape_instance.cc @@ -220,7 +220,12 @@ bl::result GrapeInstance::archiveGraph(const rpc::GSParams& params) { bool exists = false; VY_OK_OR_RAISE(client_->Exists(frag_group_id, exists)); if (exists) { - graph_utils->ArchiveGraph(frag_group_id, comm_spec_, *client_, params); + BOOST_LEAF_CHECK(graph_utils->ArchiveGraph(frag_group_id, comm_spec_, + *client_, params)); + } else { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, + "Fragment group " + std::to_string(frag_group_id) + + " does not exist"); } } return {}; diff --git a/analytical_engine/core/object/graph_utils.h b/analytical_engine/core/object/graph_utils.h index bc6caa9ddfc3..0762bfc88248 100644 --- a/analytical_engine/core/object/graph_utils.h +++ b/analytical_engine/core/object/graph_utils.h @@ -131,7 
+131,6 @@ class PropertyGraphUtils : public GSObject { vineyard::Client& client, const rpc::GSParams& params) { bl::result out; - archive_graph_(frag_id, comm_spec, client, params, out); return out; } diff --git a/analytical_engine/frame/property_graph_frame.cc b/analytical_engine/frame/property_graph_frame.cc index 0b840dad4bf9..14ea7ae15da2 100644 --- a/analytical_engine/frame/property_graph_frame.cc +++ b/analytical_engine/frame/property_graph_frame.cc @@ -15,6 +15,10 @@ #include +#include "boost/property_tree/exceptions.hpp" +#include "boost/property_tree/json_parser.hpp" +#include "boost/property_tree/ptree.hpp" + #include "vineyard/client/client.h" #include "vineyard/common/util/macros.h" #include "vineyard/graph/fragment/arrow_fragment.h" @@ -48,6 +52,64 @@ static constexpr bool compact_v = _GRAPH_TYPE::compact_v; namespace bl = boost::leaf; namespace detail { +bool isTerminalValue(const boost::property_tree::ptree& pt) { + // node is terminal and has no children + return pt.empty() && !pt.data().empty(); +} +/** + * Parse the selectors from the property tree and store the vertex and edge + * labels selector example: + * + * selector = { + * "vertices": { + * "person": ["id", "firstName", "secondName"], + * "comment": None # select all properties + * }, + * "edges": { + * "knows": ["CreationDate"], + * "replyOf": None + * } + * } + * + */ +void parse_selectors(const boost::property_tree::ptree& selector, + std::vector& selected_vertices, + std::vector& selected_edges, + std::unordered_map>& + selected_vertex_properties, + std::unordered_map>& + selected_edge_properties) { + const auto& vertices_node = selector.get_child("vertices"); + const auto& edges_node = selector.get_child("edges"); + for (const auto& item : vertices_node) { + const std::string& label = item.first; + const boost::property_tree::ptree& properties_node = item.second; + selected_vertices.push_back(label); + if (!isTerminalValue(properties_node)) { + // is a list of properties, select only 
these properties + selected_vertex_properties[label] = std::vector(); + for (const auto& sub_item : properties_node) { + selected_vertex_properties[label].push_back( + sub_item.second.get_value()); + } + } + } + + for (const auto& item : edges_node) { + const auto& edge_label = item.first; + const auto& properties_node = item.second; + selected_edges.push_back(edge_label); + if (!isTerminalValue(properties_node)) { + // is a list of properties, select only these properties + selected_edge_properties[edge_label] = std::vector(); + for (const auto& sub_item : properties_node) { + selected_edge_properties[edge_label].push_back( + sub_item.second.get_value()); + } + } + } +} + __attribute__((visibility( "hidden"))) static bl::result> LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client, @@ -112,13 +174,37 @@ LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client, #ifdef ENABLE_GAR BOOST_LEAF_AUTO(graph_info_path, params.Get(gs::rpc::GRAPH_INFO_PATH)); - BOOST_LEAF_ASSIGN(generate_eid, params.Get(gs::rpc::GENERATE_EID)); - BOOST_LEAF_ASSIGN(retain_oid, params.Get(gs::rpc::RETAIN_OID)); + BOOST_LEAF_AUTO(storage_option, + params.Get(gs::rpc::STORAGE_OPTIONS)); + boost::property_tree::ptree pt; + bool store_in_local; + std::stringstream ss(storage_option); + std::vector selected_vertices; + std::vector selected_edges; + std::unordered_map> + dummy_selected_vertex_properties; + std::unordered_map> + dummy_selected_edge_properties; + try { + boost::property_tree::read_json(ss, pt); + store_in_local = pt.get("graphar_store_in_local", false); + if (pt.find("selector") != pt.not_found()) { + const auto& selector_node = pt.get_child("selector"); + parse_selectors(selector_node, selected_vertices, selected_edges, + dummy_selected_vertex_properties, + dummy_selected_edge_properties); + } + } catch (boost::property_tree::ptree_error const& e) { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, + "Invalid write_option: " + 
std::string(e.what())); + } using loader_t = vineyard::gar_fragment_loader_t; - loader_t loader(client, comm_spec, graph_info_path); - MPI_Barrier(comm_spec.comm()); - BOOST_LEAF_ASSIGN(frag_group_id, loader.LoadFragmentAsFragmentGroup()); + auto loader = std::make_unique(client, comm_spec); + BOOST_LEAF_CHECK(loader->Init(graph_info_path, selected_vertices, + selected_edges, true, false, + store_in_local)); + BOOST_LEAF_ASSIGN(frag_group_id, loader->LoadFragmentAsFragmentGroup()); #else RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, "The vineyard is not compiled with GAR support"); @@ -187,18 +273,53 @@ __attribute__((visibility("hidden"))) static bl::result ArchiveGraph( vineyard::ObjectID frag_group_id, const grape::CommSpec& comm_spec, vineyard::Client& client, const gs::rpc::GSParams& params) { #ifdef ENABLE_GAR - BOOST_LEAF_AUTO(graph_info_path, + BOOST_LEAF_AUTO(output_path, params.Get(gs::rpc::GRAPH_INFO_PATH)); - + BOOST_LEAF_AUTO(write_option, + params.Get(gs::rpc::WRITE_OPTIONS)); + boost::property_tree::ptree pt; + std::string graph_name, file_type; + int64_t vertex_chunk_size, edge_chunk_size; + bool store_in_local; + std::stringstream ss(write_option); + std::vector selected_vertices; + std::vector selected_edges; + std::unordered_map> + selected_vertex_properties; + std::unordered_map> + selected_edge_properties; + try { + boost::property_tree::read_json(ss, pt); + graph_name = pt.get("graphar_graph_name"); + file_type = pt.get("graphar_file_type", "parquet"); + vertex_chunk_size = + pt.get("graphar_vertex_chunk_size", 262144); // default 2^18 + edge_chunk_size = + pt.get("graphar_edge_chunk_size", 4194304); // default 2^22 + store_in_local = pt.get("graphar_store_in_local", false); + if (pt.find("selector") != pt.not_found()) { + const auto& selector_node = pt.get_child("selector"); + parse_selectors(selector_node, selected_vertices, selected_edges, + selected_vertex_properties, selected_edge_properties); + } + } catch 
(boost::property_tree::ptree_error const& e) { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError, + "Invalid write_option: " + std::string(e.what())); + } auto fg = std::dynamic_pointer_cast( client.GetObject(frag_group_id)); auto fid = comm_spec.WorkerToFrag(comm_spec.worker_id()); auto frag_id = fg->Fragments().at(fid); auto frag = std::static_pointer_cast<_GRAPH_TYPE>(client.GetObject(frag_id)); - using archive_t = vineyard::ArrowFragmentWriter<_GRAPH_TYPE>; - archive_t archive(frag, comm_spec, graph_info_path); - archive.WriteFragment(); + using writer_t = vineyard::ArrowFragmentWriter<_GRAPH_TYPE>; + auto writer = std::make_unique(); + BOOST_LEAF_CHECK(writer->Init( + frag, comm_spec, graph_name, output_path, vertex_chunk_size, + edge_chunk_size, file_type, selected_vertices, selected_edges, + selected_vertex_properties, selected_edge_properties, store_in_local)); + BOOST_LEAF_CHECK(writer->WriteGraphInfo(output_path)); + BOOST_LEAF_CHECK(writer->WriteFragment()); return {}; #else diff --git a/docs/reference/session.rst b/docs/reference/session.rst index ad34220fa6fc..bd11d1ad52c1 100644 --- a/docs/reference/session.rst +++ b/docs/reference/session.rst @@ -20,7 +20,6 @@ Session Functions graphscope.get_default_session graphscope.has_default_session graphscope.set_option - graphscope.get_option graphscope.g graphscope.gremlin graphscope.graphlearn diff --git a/docs/storage_engine/graphar.md b/docs/storage_engine/graphar.md index 61db675038d2..a170aaaa4a20 100644 --- a/docs/storage_engine/graphar.md +++ b/docs/storage_engine/graphar.md @@ -39,42 +39,175 @@ As previously mentioned, each logical vertex/edge table is divided into multiple - [Apache Parquet](https://parquet.apache.org/) - CSV -See [Gar Information Files](https://alibaba.github.io/GraphAr/user-guide/getting-started.html#gar-information-files) and [Gar Data Files](https://alibaba.github.io/GraphAr/user-guide/getting-started.html#gar-data-files) for an example. 
+See [Information Files](https://graphar.apache.org/docs/specification/format#information-files) and [Data Files](https://graphar.apache.org/docs/specification/format#data-files) for an example. -More details about the GraphAr file format can be found in the [GraphAr File Format](https://alibaba.github.io/GraphAr/user-guide/file-format.html). +More details about the GraphAr file format can be found in the [GraphAr File Format](https://graphar.apache.org/docs/specification/format). + +#### Data Types + +Property Data Types +------------------- +GraphAr support a set of built-in property data types that are common in real use cases and supported by most file types (CSV, ORC, Parquet), includes: + +``` +- Boolean +- Int32: Integer with 32 bits +- Int64: Integer with 64 bits +- Float: 32-bit floating point values +- Double: 64-bit floating point values +- String: Textual data +- Date: days since the Unix epoch +- Timestamp: milliseconds since the Unix epoch +- List: A list of values of the same type +``` ## GraphAr in GraphScope -GraphScope can read, store GraphAr formatted graph data. +GraphScope provides a set of APIs to load and archive graph data in GraphAr format. The GraphScope client (Python) can be used to load and archive graph data in GraphAr format through the `save_to` and `load_from` functions. -### Loading GraphAr Data into GraphScope +### Saving Graph Data in GraphAr -To load GraphAr formatted data into GraphScope: +You can save a graph in GraphAr format using the `save_to` function. -1. Define the graph meta files with YAML. The meta files describe the properties of vertices and edges, and where the data files are stored. +`save_to` supports the following GraphAr related parameters: -2. Load the graph data using the GraphScope client (Python) with graphscope.load_from_gar(graph_yaml_path) function. Here's an example: +- **graphar_graph_name**: The name of the graph, default is "graph". 
+- **graphar_file_type**: The file type of the graph data, including "csv", "orc", "parquet". Default is "parquet". +- **graphar_vertex_chunk_size**: The chunk size of the vertex data in graphar format, default is 2^18. +- **graphar_edge_chunk_size**: The chunk size of the edge data in graphar format, default is 2^22. +- **graphar_store_in_local**: Whether to make each worker store its part of the graph data in the local file system, default is False. +- **selector**: The selector to select the subgraph to save, if not specified, the whole graph will be saved. + +Here's an example: ```python import graphscope +from graphscope.dataset import load_ldbc + +# initialize a session +sess = graphscope.session(cluster_type="hosts") +# load ldbc graph +g = load_ldbc(sess) + +# save the ldbc graph to GraphAr format +r = g.save_to( + "/tmp/ldbc_graphar/", + format="graphar", + graphar_graph_name="ldbc", # the name of the graph + graphar_file_type="parquet", # the file type of the graph data + graphar_vertex_chunk_size=1024, # the chunk size of the vertex data + graphar_edge_chunk_size=4096, # the chunk size of the edge data +) +# the result is a dictionary that contains the format and the URI path of the saved graph +print(r) +{ "format": "graphar", "uri": "graphar+file:///tmp/ldbc_graphar/ldbc.graph.yaml"} +``` + +You can also save a subgraph in GraphAr format using the `save_to` function with the `selector` parameter.
Here's an example: -graph_yaml_path = "file:///path-yaml/demo.graph.yml" -g = graphscope.load_from_gar(graph_yaml_path) -g.schema() +```python +import graphscope +from graphscope.dataset import load_ldbc + +# initialize a session +sess = graphscope.session(cluster_type="hosts") +# load ldbc graph +g = load_ldbc(sess) + +# define the selector +# we only want to save the "person" and "comment" vertices and the "knows" and "likes" edges +# with the specified properties +selector = { + "vertices": { + "person": ["id", "firstName", "lastName"], + "comment": None, # None means all properties + }, + "edges": { + "knows": ["creationDate"], + "likes": ["creationDate"], + }, +} + +# save the subgraph to GraphAr format +r = g.save_to( + "/tmp/ldbc_subgraph_graphar/", + format="graphar", + selector=selector, + graphar_graph_name="ldbc_subgraph", # the name of the graph + graphar_file_type="parquet", # the file type of the graph data + graphar_vertex_chunk_size=1024, # the chunk size of the vertex data + graphar_edge_chunk_size=4096, # the chunk size of the edge data +) +# the result is a dictionary that contains the format and the URI path of the saved graph +print(r) +{ "format": "graphar", "uri": "graphar+file:///tmp/ldbc_subgraph_graphar/ldbc_subgraph.graph.yaml"} ``` -### Archiving the Graph Data in GraphAr +### Loading GraphAr Data into GraphScope -To archive the graph data in GraphAr format: +You can load a graph from GraphAr format data using the `load_from` function. -1. Define the graph meta files with YAML. +`load_from` supports the following GraphAr related parameters: +- **graphar_store_in_local**: Whether the graph data is stored in the local file system of each worker, default is False. +- **selector**: The selector to select the subgraph to load, if not specified, the whole graph will be loaded. + +Here's an example: -2. Call the g.archive(graph_yaml_path) function, where g is the GraphScope graph object, and graph_yaml_path is the graph info file path.
Here's an example: ```python import graphscope +from graphscope import pagerank +from graphscope.framework.graph import Graph + +# initialize a session +sess = graphscope.session(cluster_type="hosts") + +# assume the graph data is saved in the "/tmp/ldbc_graphar/" directory and its graph information file is "ldbc.graph.yaml", so the URI is "graphar+file:///tmp/ldbc_graphar/ldbc.graph.yaml" +uri = "graphar+file:///tmp/ldbc_graphar/ldbc.graph.yaml" + +# load the graph from GraphAr format +g = Graph.load_from(uri, sess) +print(g.schema) + +# do some graph processing +pg = g.project(vertices={"person": ["id"]}, edges={"knows": []}) +ctx = pagerank(pg, max_round=10) +df = ctx.to_dataframe(selector={"id": "v.data", "r": "r"}) +print(df) +``` + +You can also load a subgraph from the whole ldbc dataset with GraphAr format data using the `load_from` function with the `selector` parameter. Here's an example: + +```python +import graphscope +from graphscope.framework.graph import Graph + +# initialize a session +sess = graphscope.session(cluster_type="hosts") + +# assume the ldbc data is saved in the "/tmp/ldbc_graphar/" directory and its graph information file is "ldbc.graph.yaml", so the URI is "graphar+file:///tmp/ldbc_graphar/ldbc.graph.yaml" +uri = "graphar+file:///tmp/ldbc_graphar/ldbc.graph.yaml" + +# define the selector, you want to only load the "person" and "comment" vertices and the "knows" and "likes" edges +selector = { + "vertices": { + "person": None, + "comment": None, # None means all properties + }, + "edges": { + "knows": None, + "likes": None, + }, +} +g = Graph.load_from(uri, sess, selector=selector) +print(g.schema) + +# do some graph processing +pg = g.project(vertices={"person": ["id"]}, edges={"knows": []}) +ctx = pagerank(pg, max_round=10) +df = ctx.to_dataframe(selector={"id": "v.data", "r": "r"}) +print(df) +``` -graph_yaml_path = "file:///path-yaml/demo.graph.yml" -g.archive(graph_yaml_path) -``` \ No newline at end of file +More examples
about how to use GraphAr in GraphScope can be found in the [test_graphar](https://github.com/alibaba/GraphScope/blob/main/python/graphscope/tests/unittest/test_graphar.py). diff --git a/k8s/Makefile b/k8s/Makefile index 8fc3240b9271..f05288b88eba 100644 --- a/k8s/Makefile +++ b/k8s/Makefile @@ -10,7 +10,7 @@ endif ARCH := $(shell uname -m) VERSION ?= latest -VINEYARD_VERSION ?= v0.21.3 +VINEYARD_VERSION ?= v0.22.0 # This is the version of builder base image in most cases, except for graphscope-dev BUILDER_VERSION ?= $(VINEYARD_VERSION) # This is the version of runtime base image diff --git a/k8s/actions-runner-controller/manylinux/Makefile b/k8s/actions-runner-controller/manylinux/Makefile index 65b6472e004f..db92e8c8888c 100644 --- a/k8s/actions-runner-controller/manylinux/Makefile +++ b/k8s/actions-runner-controller/manylinux/Makefile @@ -12,7 +12,7 @@ TARGETPLATFORM ?= $(shell arch) RUNNER_VERSION ?= 2.287.1 DOCKER_VERSION ?= 20.10.12 -VINEYARD_VERSION ?= v0.21.3 +VINEYARD_VERSION ?= v0.22.0 BUILDER_VERSION ?= $(VINEYARD_VERSION) # default list of platforms for which multiarch image is built diff --git a/k8s/dockerfiles/vineyard-runtime.Dockerfile b/k8s/dockerfiles/vineyard-runtime.Dockerfile index ec03d49ecd79..2c3d0c0cb598 100644 --- a/k8s/dockerfiles/vineyard-runtime.Dockerfile +++ b/k8s/dockerfiles/vineyard-runtime.Dockerfile @@ -32,7 +32,7 @@ RUN apt-get update -y && \ -P /tmp/ --no-check-certificate && \ sudo apt-get install -y -V /tmp/apache-arrow-apt-source-latest-jammy.deb && \ sudo apt-get update -y && \ - sudo apt-get install -y libarrow-dev && \ + sudo apt-get install -y libarrow-dev libarrow-dataset-dev libarrow-acero-dev libparquet-dev && \ rm /tmp/apache-arrow-apt-source-latest-*.deb && \ apt-get clean -y && \ rm -rf /var/lib/apt/lists/* diff --git a/k8s/internal/Makefile b/k8s/internal/Makefile index e942bfd94370..0dc08b7ce4e2 100644 --- a/k8s/internal/Makefile +++ b/k8s/internal/Makefile @@ -42,7 +42,7 @@ GRAPHSCOPE_HOME ?= /usr/local 
INSTALL_PREFIX ?= /opt/graphscope VERSION ?= latest -VINEYARD_VERSION ?= v0.21.3 +VINEYARD_VERSION ?= v0.22.0 PROFILE ?= release CI ?= false diff --git a/python/graphscope/__init__.py b/python/graphscope/__init__.py index 079751face65..5d0c3ed3d11d 100644 --- a/python/graphscope/__init__.py +++ b/python/graphscope/__init__.py @@ -50,7 +50,6 @@ from graphscope.framework.errors import * from graphscope.framework.graph import Graph from graphscope.framework.graph_builder import load_from -from graphscope.framework.graph_builder import load_from_gar from graphscope.version import __version__ __doc__ = """ diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py index 2abdb2d380cc..6f4a42e60cca 100755 --- a/python/graphscope/client/session.py +++ b/python/graphscope/client/session.py @@ -1162,13 +1162,6 @@ def load_from(self, *args, **kwargs): with default_session(self): return graphscope.load_from(*args, **kwargs) - def load_from_gar(self, *args, **kwargs): - """Load a graph from gar format files within the session. - See more information in :meth:`graphscope.load_from_gar`. - """ - with default_session(self): - return graphscope.load_from_gar(*args, **kwargs) - @deprecated("Please use `sess.interactive` instead.") def gremlin(self, graph, params=None): """This method is going to be deprecated. diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py index 2e977f0bef10..ed6a696e187b 100644 --- a/python/graphscope/framework/dag_utils.py +++ b/python/graphscope/framework/dag_utils.py @@ -1082,7 +1082,7 @@ def gremlin_to_subgraph( return op -def archive_graph(graph, path): +def save_to_graphar(graph, path: str, **kwargs): """Archive a graph to gar format with a path. 
Args: @@ -1099,8 +1099,9 @@ def archive_graph(graph, path): types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(graph._vertex_map), types_pb2.COMPACT_EDGES: utils.b_to_attr(graph._compact_edges), types_pb2.USE_PERFECT_HASH: utils.b_to_attr(graph._use_perfect_hash), + types_pb2.WRITE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)), + types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(path), } - config[types_pb2.GRAPH_INFO_PATH] = utils.s_to_attr(path) op = Operation( graph.session_id, types_pb2.ARCHIVE_GRAPH, @@ -1111,24 +1112,24 @@ def archive_graph(graph, path): return op -def save_graph_to( - graph, - path: str, - vineyard_id, - **kwargs, -): +def serialize_graph(graph, path: str, **kwargs): """Serialize graph to the specified location + The meta and data of graph is dumped to specified location, + and can be restored by `Graph.load_from` in other sessions. + Each worker will write a `path_{worker_id}.meta` file and + a `path_{worker_id}` file to storage. Args: graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph. - path (str): The path to serialize the graph, on each worker. + path (str): The path to serialize the graph, on each worker, supported + storages are local, hdfs, oss, s3 Returns: An op to serialize the graph to a path. """ config = { types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path), - types_pb2.VINEYARD_ID: utils.i_to_attr(vineyard_id), + types_pb2.VINEYARD_ID: utils.i_to_attr(graph._vineyard_id), types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)), } op = Operation( @@ -1141,7 +1142,18 @@ def save_graph_to( return op -def load_graph_from(path: str, sess, **kwargs): +def deserialize_graph(path: str, sess, **kwargs): + """Deserialize graph from the specified location. + + Args: + path (str): The path contains the serialization files. + sess (`graphscope.Session`): The target session + that the graph will be construct in. + + Returns: + `Graph`: A new graph object. 
Schema and data is supposed to be + identical with the one that called serialized method. + """ config = { types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path), types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)), diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py index d49270278da7..b582557124a9 100644 --- a/python/graphscope/framework/graph.py +++ b/python/graphscope/framework/graph.py @@ -28,6 +28,7 @@ from typing import Mapping from typing import Tuple from typing import Union +from urllib.parse import urlparse try: import vineyard @@ -117,9 +118,6 @@ def save_to(self, path, **kwargs): def load_from(cls, path, sess, **kwargs): raise NotImplementedError - def archive(self, path, **kwargs): - raise NotImplementedError - @abstractmethod def project(self, vertices, edges): raise NotImplementedError @@ -466,16 +464,6 @@ def to_dataframe(self, selector, vertex_range=None): op = dag_utils.graph_to_dataframe(self, selector, vertex_range) return ResultDAGNode(self, op) - def archive(self, path): - """Archive the graph to gar format with graph yaml file path. - - Args: - path (str): The graph yaml file path describe how to archive the graph. - """ - check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY) - op = dag_utils.archive_graph(self, path) - return ArchivedGraph(self._session, op) - def to_directed(self): op = dag_utils.to_directed(self) graph_dag_node = GraphDAGNode(self._session, op) @@ -1179,49 +1167,182 @@ def _check_unmodified(self): self.signature == self._saved_signature, "Graph has been modified!" ) - def save_to(self, path, **kwargs): - """Serialize graph to a location. - The meta and data of graph is dumped to specified location, - and can be restored by `Graph.load_from` in other sessions. - - Each worker will write a `path_{worker_id}.meta` file and - a `path_{worker_id}` file to storage. 
- Args: - path (str): supported storages are local, hdfs, oss, s3 - """ - - op = dag_utils.save_graph_to(self, path, self._vineyard_id, **kwargs) - self._session.dag.add_op(op) - return self._session._wrapper(op) + @staticmethod + def _load_from_graphar(path, sess, **kwargs): + # GraphAr currently only supports the global vertex map. + vertex_map = utils.vertex_map_type_to_enum("global") + # oid_type = utils.get_oid_type_from_graph_info(path) + config = { + types_pb2.OID_TYPE: utils.s_to_attr( + "int64_t" + ), # GraphAr uses the vertex index as the oid, so it is always int64_t + types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"), + types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False), + types_pb2.IS_FROM_GAR: utils.b_to_attr(True), + types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(vertex_map), + types_pb2.COMPACT_EDGES: utils.b_to_attr(False), + types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(path), + types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)), + } + op = dag_utils.create_graph( + sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[], attrs=config + ) + return sess._wrapper(GraphDAGNode(sess, op)) @classmethod - def load_from(cls, path, sess, **kwargs): - """Construct a `Graph` by deserialize from `path`. - It will read all serialization files, which is dumped by - `Graph.serialize`. - If any serialize file doesn't exists or broken, will error out. + def load_from(cls, uri, sess=None, **kwargs): + """Load an ArrowProperty graph from a certain data source. The data source + can be vineyard serialized files, graphar serialized files, or other data + sources supported by graphscope. Args: - path (str): Path contains the serialization files.
- sess (`graphscope.Session`): The target session - that the graph will be construct in - + uri (str): URI containing the description of the data source or + path containing the serialization files, + example: "graphar+file:///tmp/graphar/xxx" + sess (`graphscope.Session`): The target session that the graph + will be constructed in; if None, use the default session. + selector (dict, optional): the selector to select the data to read. + graphar_store_in_local (bool, optional): whether store graphar format in local, default is False. Returns: - `Graph`: A new graph object. Schema and data is supposed to be - identical with the one that called serialized method. + `Graph`: A new graph object. """ - op = dag_utils.load_graph_from(path, sess, **kwargs) - return sess._wrapper(GraphDAGNode(sess, op)) + from graphscope.client.session import get_default_session + + def _check_load_options(load_options): + for k, v in load_options.items(): + if k == "selector": + if not isinstance(v, dict): + raise ValueError( + "selector should be a dict, but got {}".format(type(v)) + ) + elif k == "graphar_store_in_local": + if not isinstance(v, bool): + raise ValueError( + "graphar_store_in_local should be a bool, but got {}".format( + v + ) + ) + + if sess is None: + sess = get_default_session() + uri_str = uri + uri = urlparse(uri) + if uri.scheme and "+" in uri.scheme: + source = uri.scheme.split("+")[0] + if uri.scheme.split("+")[-1] not in ["file", "s3", "oss", "hdfs"]: + raise ValueError( + "Unknown file system %s, currently only support file, s3, oss and hdfs" + % uri.scheme.split("+")[-1] + ) + path = uri.scheme.split("+")[-1] + "://" + uri.netloc + uri.path + if source == "graphar": + _check_load_options(kwargs) + return cls._load_from_graphar(path, sess, **kwargs) + else: + raise ValueError("Unknown source %s with uri: %s" % (source, uri_str)) + else: + # not a uri string, assume it is a path for deserialization + op = dag_utils.deserialize_graph(uri_str, sess, **kwargs) + return
sess._wrapper(GraphDAGNode(sess, op)) - def archive(self, path): - """Archive graph gar format files base on the graph info. - The meta and data of graph is dumped to specified location, - and can be restored by `Graph.deserialize` in other sessions. + def save_to( + self, + path, + format="serialization", + **kwargs, + ): + """Save graph to specified location with specified format. Args: - path (str): the graph info file path. + path (str): the directory path to write graph. + format (str): the format to write graph, default is "serialization". + selector (dict, optional): the selector to select the data to write. + graphar_graph_name (str, optional): the name of graph in graphar format. + graphar_file_type (str, optional): the file type of graphar format, + support "parquet", "orc", "csv", default is "parquet". + graphar_vertex_chunk_size (int, optional): the chunk size of vertex in graphar format, default is 2^18. + graphar_edge_chunk_size (int, optional): the chunk size of edge in graphar format, default is 2^22. + graphar_store_in_local (bool, optional): whether store graphar format in local, default is False. + + Return (dict): A dict contains the type and uri string of output data. 
""" - return self._session._wrapper(self._graph_node.archive(path)) + + def _check_write_options(write_options): + for k, v in write_options.items(): + if k == "graphar_graph_name" and not isinstance(v, str): + raise ValueError( + "graphar_graph_name should be a string, but got {}".format( + type(v) + ) + ) + elif k == "graphar_file_type" and v not in ["parquet", "orc", "csv"]: + raise ValueError( + "graphar_file_type should be one of ['parquet', 'orc', 'csv'], but got {}".format( + v + ) + ) + elif k == "graphar_vertex_chunk_size": + if not isinstance(v, int) or v <= 0: + raise ValueError( + "graphar_vertex_chunk_size should be a positive integer, but got {}".format( + v + ) + ) + elif k == "graphar_edge_chunk_size": + if not isinstance(v, int) or v <= 0: + raise ValueError( + "graphar_edge_chunk_size should be a positive integer, but got {}".format( + v + ) + ) + elif k == "graphar_store_in_local": + if not isinstance(v, bool): + raise ValueError( + "graphar_store_in_local should be a bool, but got {}".format( + v + ) + ) + elif k == "selector": + if not isinstance(v, dict): + raise ValueError( + "selector should be a dict, but got {}".format(type(v)) + ) + + if format == "graphar": + if "graphar_graph_name" not in kwargs: + kwargs["graphar_graph_name"] = "graph" # default graph name + _check_write_options(kwargs) + graph_name = kwargs["graphar_graph_name"] + + maybe_uri = urlparse(path) + if maybe_uri.scheme and maybe_uri.scheme not in [ + "file", + "s3", + "oss", + "hdfs", + ]: + raise ValueError( + "Unknown file system %s, currently only support file, s3, oss and hdfs" + % maybe_uri.scheme + ) + if not maybe_uri.scheme: + maybe_uri = maybe_uri._replace(scheme="file") + + op = dag_utils.save_to_graphar(self, path, **kwargs) + self._session.dag.add_op(op) + self._session._wrapper(op) + return { + "type": format, + "URI": "graphar+" + maybe_uri.geturl() + graph_name + ".graph.yaml", + } + elif format == "serialization": + # serialize graph + op = 
dag_utils.serialize_graph(self, path, **kwargs) + self._session.dag.add_op(op) + self._session._wrapper(op) + return {"type": format, "URI": path} + else: + raise ValueError("Unknown format: %s" % format) @apply_docstring(GraphDAGNode.add_vertices) def add_vertices( @@ -1314,14 +1435,3 @@ def __init__(self, session, op): self._op = op # add op to dag self._session.dag.add_op(self._op) - - -class ArchivedGraph(DAGNode): - """Archived graph node in a DAG""" - - def __init__(self, session, op): - super().__init__() - self._session = session - self._op = op - # add op to dag - self._session.dag.add_op(self._op) diff --git a/python/graphscope/framework/graph_builder.py b/python/graphscope/framework/graph_builder.py index 3ebf8a4ec2c3..17530a7e26a8 100644 --- a/python/graphscope/framework/graph_builder.py +++ b/python/graphscope/framework/graph_builder.py @@ -217,58 +217,3 @@ def load_from( use_perfect_hash=use_perfect_hash, ) return graph - - -def load_from_gar( - graph_info_path: str, - directed=True, - oid_type="int64_t", - vid_type="uint64_t", - vertex_map="global", - compact_edges=False, - use_perfect_hash=False, -) -> Graph: - sess = get_default_session() - oid_type = utils.normalize_data_type_str(oid_type) - if oid_type not in ("int32_t", "int64_t", "std::string"): - raise ValueError("The 'oid_type' can only be int32_t, int64_t or string.") - vid_type = utils.normalize_data_type_str(vid_type) - if vid_type not in ("uint32_t", "uint64_t"): - raise ValueError("The 'vid_type' can only be uint32_t or uint64_t.") - if compact_edges: - raise ValueError( - "Loading from gar with 'compact_edges' hasn't been supported yet." - ) - if use_perfect_hash: - raise ValueError( - "Loading from gar with 'use_perfect_hash' hasn't been supported yet." 
- ) - # generate and add a loader op to dag - vertex_map = utils.vertex_map_type_to_enum(vertex_map) - # construct create graph op - config = { - types_pb2.DIRECTED: utils.b_to_attr(directed), - types_pb2.OID_TYPE: utils.s_to_attr(oid_type), - types_pb2.VID_TYPE: utils.s_to_attr(vid_type), - types_pb2.GENERATE_EID: utils.b_to_attr(False), - types_pb2.RETAIN_OID: utils.b_to_attr(False), - types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False), - types_pb2.IS_FROM_GAR: utils.b_to_attr(True), - types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(vertex_map), - types_pb2.COMPACT_EDGES: utils.b_to_attr(compact_edges), - types_pb2.USE_PERFECT_HASH: utils.b_to_attr(use_perfect_hash), - types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(graph_info_path), - } - op = dag_utils.create_graph( - sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[], attrs=config - ) - graph = sess.g( - op, - oid_type=oid_type, - vid_type=vid_type, - directed=directed, - vertex_map=vertex_map, - compact_edges=compact_edges, - use_perfect_hash=use_perfect_hash, - ) - return graph diff --git a/python/graphscope/gsctl/commands/dev.py b/python/graphscope/gsctl/commands/dev.py index 7f058a587b48..c29162f058cb 100644 --- a/python/graphscope/gsctl/commands/dev.py +++ b/python/graphscope/gsctl/commands/dev.py @@ -195,6 +195,7 @@ def install_deps( """Install dependencies for building GraphScope.""" cmd = [ "bash", + "-e", install_deps_script, "-t", type, @@ -278,6 +279,7 @@ def make(component, graphscope_repo, install_prefix, storage_type): cmd = [ "bash", + "-e", make_script, "-c", component, @@ -339,7 +341,7 @@ def make_image(component, graphscope_repo, registry, tag): if component is None: component = "all" - cmd = ["bash", make_image_script, "-c", component, "-r", registry, "-t", tag] + cmd = ["bash", "-e", make_image_script, "-c", component, "-r", registry, "-t", tag] run_shell_cmd(cmd, graphscope_repo) @@ -411,6 +413,7 @@ def test(type, graphscope_repo, testdata, local, storage_type, k8s, nx): type = "" cmd = [ 
"bash", + "-e", test_script, "-t", type, diff --git a/python/graphscope/gsctl/scripts/install_deps_command.sh b/python/graphscope/gsctl/scripts/install_deps_command.sh index e1e2173139cd..02952c769910 100644 --- a/python/graphscope/gsctl/scripts/install_deps_command.sh +++ b/python/graphscope/gsctl/scripts/install_deps_command.sh @@ -141,7 +141,7 @@ _install_apache_arrow_ubuntu() { -P /tmp/ ${SUDO} apt-get install -y -V /tmp/apache-arrow-apt-source-latest-"$(lsb_release --codename --short)".deb ${SUDO} apt-get update -y - ${SUDO} apt-get install -y libarrow-dev + ${SUDO} apt-get install -y libarrow-dev libarrow-dataset-dev libarrow-acero-dev libparquet-dev rm /tmp/apache-arrow-apt-source-latest-*.deb else log "apache-arrow (libarrow-dev) already installed, skip." diff --git a/python/graphscope/tests/unittest/test_gar.py b/python/graphscope/tests/unittest/test_gar.py deleted file mode 100644 index 7a2281bd76da..000000000000 --- a/python/graphscope/tests/unittest/test_gar.py +++ /dev/null @@ -1,40 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# -# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -import os - -import pytest - -gar_test_repo_dir = os.path.expandvars("${GS_TEST_DIR}") - - -@pytest.mark.skip(reason="Issue 3162") -def test_load_from_gar(graphscope_session): - graph_yaml = os.path.join( - gar_test_repo_dir, "graphar/ldbc_sample/parquet/ldbc_sample.graph.yml" - ) - print(graph_yaml) - graph = graphscope_session.load_from_gar(graph_yaml) - assert graph.schema is not None - del graph - - -@pytest.mark.skip(reason="Issue 3162") -def test_archive_to_gar(ldbc_graph): - graph_yaml = os.path.join(gar_test_repo_dir, "graphar/ldbc/ldbc.graph.yml") - ldbc_graph.archive(graph_yaml) diff --git a/python/graphscope/tests/unittest/test_graphar.py b/python/graphscope/tests/unittest/test_graphar.py new file mode 100644 index 000000000000..db834edd38f8 --- /dev/null +++ b/python/graphscope/tests/unittest/test_graphar.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import logging +import os + +import pytest + +from graphscope import pagerank +from graphscope.framework.graph import Graph + +logger = logging.getLogger("graphscope") +graphar_temp_dir = ( + os.path.expandvars("${TMPDIR}") + if os.path.expandvars("${TMPDIR}").endswith(os.sep) + else os.path.expandvars("${TMPDIR}") + os.sep +) + + +def test_save_full_ldbc_to_graphar_and_load_back(graphscope_session, ldbc_graph): + output_dir = graphar_temp_dir + "graphar" + os.sep + r = ldbc_graph.save_to( + output_dir, + format="graphar", + graphar_graph_name="ldbc_sample", + graphar_file_type="parquet", + graphar_vertex_chunk_size=1000000, + graphar_edge_chunk_size=1000000, + ) + assert r == { + "type": "graphar", + "URI": "graphar+file://{}ldbc_sample.graph.yaml".format(output_dir), + } + g = Graph.load_from(r["URI"], sess=graphscope_session) + assert g.schema.to_dict() == ldbc_graph.schema.to_dict() + + # do some graph processing + pg = g.project(vertices={"person": ["id"]}, edges={"knows": []}) + ctx = pagerank(pg, max_round=10) + df = ctx.to_dataframe(selector={"id": "v.data", "r": "r"}) + del g + + +def test_save_to_graphar_with_selector_and_load_back_1(graphscope_session, ldbc_graph): + output_dir = graphar_temp_dir + "graphar_subgraph" + os.sep + selector = { + "vertices": { + "person": ["id", "firstName", "lastName"], + "comment": ["content", "creationDate"], + }, + "edges": { + "knows": ["creationDate"], + "likes": ["creationDate"], + }, + } + + r = ldbc_graph.save_to( + "file://" + output_dir, + format="graphar", + selector=selector, + graphar_graph_name="ldbc_sample", + graphar_file_type="parquet", + graphar_vertex_chunk_size=256, + graphar_edge_chunk_size=1024, + ) + assert r == { + "type": "graphar", + "URI": "graphar+file://{}ldbc_sample.graph.yaml".format(output_dir), + } + g = Graph.load_from(r["URI"], sess=graphscope_session) + assert g.schema.vertex_label_num == 2 and g.schema.edge_label_num == 2 + assert "person" in g.schema.vertex_labels and "comment" in 
g.schema.vertex_labels + assert "knows" in g.schema.edge_labels and "likes" in g.schema.edge_labels + assert ( + g.schema.vertex_properties_num("person") == 3 + and g.schema.vertex_properties_num("comment") == 2 + ) + assert ( + g.schema.edge_properties_num("knows") == 1 + and g.schema.edge_properties_num("likes") == 1 + ) + + # do some graph processing + pg = g.project(vertices={"person": ["id"]}, edges={"knows": []}) + ctx = pagerank(pg, max_round=10) + df = ctx.to_dataframe(selector={"id": "v.data", "r": "r"}) + del g + + +def test_save_to_graphar_with_selector_and_load_back_2(graphscope_session, ldbc_graph): + output_dir = graphar_temp_dir + "graphar_subgraph2" + os.sep + selector = { + "vertices": { + "person": ["id", "firstName", "lastName"], + "comment": ["content", "creationDate"], + }, + "edges": { + "knows": [], + "likes": [], + }, + } + + r = ldbc_graph.save_to( + output_dir, + format="graphar", + selector=selector, + graphar_graph_name="ldbc_sample", + graphar_file_type="parquet", + graphar_vertex_chunk_size=256, + graphar_edge_chunk_size=1024, + ) + assert r == { + "type": "graphar", + "URI": "graphar+file://{}ldbc_sample.graph.yaml".format(output_dir), + } + g = Graph.load_from(r["URI"], sess=graphscope_session) + assert g.schema.vertex_label_num == 2 and g.schema.edge_label_num == 2 + assert "person" in g.schema.vertex_labels and "comment" in g.schema.vertex_labels + assert "knows" in g.schema.edge_labels and "likes" in g.schema.edge_labels + assert ( + g.schema.vertex_properties_num("person") == 3 + and g.schema.vertex_properties_num("comment") == 2 + ) + assert ( + g.schema.edge_properties_num("knows") == 0 + and g.schema.edge_properties_num("likes") == 0 + ) + + # do some graph processing + pg = g.project(vertices={"person": ["id"]}, edges={"knows": []}) + ctx = pagerank(pg, max_round=10) + df = ctx.to_dataframe(selector={"id": "v.data", "r": "r"}) + + +def test_save_to_graphar_with_selector_and_load_back_3(graphscope_session, ldbc_graph): + 
output_dir = graphar_temp_dir + "graphar_subgraph3" + os.sep + selector = { + "vertices": { + "person": ["id", "firstName", "lastName"], + "comment": None, + }, + "edges": { + "knows": None, + "likes": None, + }, + } + + r = ldbc_graph.save_to( + output_dir, + format="graphar", + selector=selector, + graphar_graph_name="ldbc_sample", + graphar_file_type="parquet", + graphar_vertex_chunk_size=256, + graphar_edge_chunk_size=1024, + ) + assert r == { + "type": "graphar", + "URI": "graphar+file://{}ldbc_sample.graph.yaml".format(output_dir), + } + g = Graph.load_from(r["URI"], sess=graphscope_session) + print(g.schema) + assert g.schema.vertex_label_num == 2 and g.schema.edge_label_num == 2 + assert "person" in g.schema.vertex_labels and "comment" in g.schema.vertex_labels + assert "knows" in g.schema.edge_labels and "likes" in g.schema.edge_labels + assert ( + g.schema.vertex_properties_num("person") == 3 + and g.schema.vertex_properties_num("comment") == 6 + ) + assert ( + g.schema.edge_properties_num("knows") == 2 + and g.schema.edge_properties_num("likes") == 2 + ) + + # do some graph processing + pg = g.project(vertices={"person": ["id"]}, edges={"knows": []}) + ctx = pagerank(pg, max_round=10) + df = ctx.to_dataframe(selector={"id": "v.data", "r": "r"}) + del g + + +@pytest.mark.dependency(depends=["test_save_full_ldbc_to_graphar_and_load_back"]) +def test_load_from_graphar_with_selector(graphscope_session): + graph_uri = "graphar+file://{}graphar{}ldbc_sample.graph.yaml".format( + graphar_temp_dir, + os.sep, + ) + selector = { + "vertices": { + "person": None, + "comment": None, + }, + "edges": { + "knows": None, + "likes": None, + }, + } + g = Graph.load_from(graph_uri, sess=graphscope_session, selector=selector) + assert g.schema.vertex_label_num == 2 and g.schema.edge_label_num == 2 + assert "person" in g.schema.vertex_labels and "comment" in g.schema.vertex_labels + assert "knows" in g.schema.edge_labels and "likes" in g.schema.edge_labels + assert ( + 
g.schema.vertex_properties_num("person") == 8 + and g.schema.vertex_properties_num("comment") == 6 + ) + assert ( + g.schema.edge_properties_num("knows") == 2 + and g.schema.edge_properties_num("likes") == 2 + ) + + # do some graph processing + pg = g.project(vertices={"person": ["id"]}, edges={"knows": []}) + ctx = pagerank(pg, max_round=10) + df = ctx.to_dataframe(selector={"id": "v.data", "r": "r"}) + del g diff --git a/python/requirements.txt b/python/requirements.txt index 49d5effcf71b..c898a22c2a79 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -7,9 +7,8 @@ msgpack>=1.0.5 mypy-protobuf>=3.4.0 neo4j==5.10.0 nest_asyncio -networkx==3.2.1;python_version>="3.12" -networkx==2.6.0;python_version=="3.7" -networkx==2.8.8;python_version<="3.11" +networkx==2.8.0;python_version>="3.8" +networkx==2.6.0;python_version<"3.8" numpy orjson packaging