feat(python): Unify the graph level load_from and save to API & Bump up vineyard to v0.22.0 (#3610)

## What do these changes do?

Unify the `load_from` and `save_to` APIs to support different kinds of
data sources.
Related discussion issues: #2836, #2920

### `save_to`
```python
def save_to(self, path, format="serialization", **kwargs)
```

`save_to` dumps a graph to data in a given format. It currently supports
`graphar` and `serialization`, and new formats can be added.
- `path`
    output directory; supports local, oss, s3, and hdfs
- `format`
    format to save in; default is 'serialization'
- related configurations
Writing each format may take format-specific configurations. These are
named with the pattern "format_config1", "format_config2". For example,
graphar provides:
  - graphar_graph_name
  - graphar_vertex_chunk_size
  - graphar_edge_chunk_size
  - graphar_file_type
  - graphar_store_in_local
 
- `selector` to dump a subgraph with selected vertices and edges
Users can define a `selector` to dump only a certain set of vertices and
edges, for example:
```python
selector = {
    "vertices": {
           "person": ["id", "firstName", "secondName"],
           "comment": None,  # select all properties
     },
    "edges": {
           "knows": ["CreationDate"],
           "replyOf": None,
    },
}
```
- return format
```python
{"type": format, "URI": "format+file:///tmp/graph/xxx"}
```
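
The naming strategy above suggests that format-specific keyword arguments can be told apart by their prefix. The following is a minimal sketch, assuming a hypothetical helper `collect_format_options` (it is not part of GraphScope) that gathers the `graphar_*` arguments and an optional `selector` into one JSON string, similar in shape to the options the engine parses:

```python
import json


def collect_format_options(fmt, **kwargs):
    """Hypothetical helper: keep kwargs prefixed with '<fmt>_', plus 'selector'."""
    prefix = fmt + "_"
    options = {k: v for k, v in kwargs.items() if k.startswith(prefix)}
    if kwargs.get("selector") is not None:
        options["selector"] = kwargs["selector"]
    # sort_keys keeps the output deterministic for this example
    return json.dumps(options, sort_keys=True)


write_options = collect_format_options(
    "graphar",
    graphar_graph_name="ldbc",
    graphar_vertex_chunk_size=1024,
    unrelated_option=True,  # dropped: it does not carry the 'graphar_' prefix
)
print(write_options)
```

Filtering by prefix keeps each format's options separate, so a new format would only need its own prefix.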

Example:
```python
# graphar
g.save_to("/tmp/graphar/", format="graphar", graphar_graph_name="ldbc", graphar_vertex_chunk_size=1024,
          graphar_edge_chunk_size=4096, graphar_file_type="parquet", graphar_store_in_local=False)
{'type': 'graphar', 'URI': 'graphar+file:///tmp/graphar/ldbc.graph.yml'}

# graphar with selector
selector = {
    "vertices": {
           "person": ["id", "firstName", "secondName"],
           "comment": None,  # select all properties
     },
    "edges": {
           "knows": ["CreationDate"],
           "replyOf": None,
    },
}

g.save_to("/tmp/graphar/", format="graphar", selector=selector, graphar_graph_name="ldbc")
{'type': 'graphar', 'URI': 'graphar+file:///tmp/graphar/ldbc.graph.yml'}

# serialization
g.save_to("/tmp/serialization/")
{'type':'serialization', 'URI': '/tmp/serialization/'}
```
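
The selector semantics described above (a list selects only the named properties, `None` selects all of them) can be sketched in plain Python. `split_selector` is a hypothetical helper used only for illustration. It treats a missing `vertices` or `edges` key as empty, which is an assumption and may differ from what the engine does.

```python
def split_selector(selector):
    """Hypothetical helper: split a selector into labels and property lists."""
    labels = {}
    properties = {}
    for kind in ("vertices", "edges"):
        entries = selector.get(kind, {})
        labels[kind] = list(entries)
        # None means "all properties", so only explicit lists are recorded
        properties[kind] = {
            label: list(props)
            for label, props in entries.items()
            if props is not None
        }
    return labels, properties


selector = {
    "vertices": {"person": ["id", "firstName", "secondName"], "comment": None},
    "edges": {"knows": ["CreationDate"], "replyOf": None},
}
labels, properties = split_selector(selector)
print(labels["vertices"])   # ['person', 'comment']
print(properties["edges"])  # {'knows': ['CreationDate']}
```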

### `load_from`
```python
def load_from(uri, **kwargs)
```

`load_from` loads a graph from a data source in a given format. It
currently supports `graphar` and `serialization`, and new data sources
can be added.
- uri examples
```
graphar+file:///tmp/graphar/ldbc.graph.yml
graphar+oss://bucket/graphar/ldbc.graph.yml
/tmp/serialization
```
- related configurations
Loading each format may take format-specific configurations, named with
the same "format_config1", "format_config2" pattern and passed as
`storage_options`. For example, graphar accepts:
  - graphar_store_in_local: the graphar files are stored in the local file system of the workers; only supported for local file systems

- `selector` to load a subgraph with selected vertices and edges
Users can define a `selector` to load only a certain set of vertices and
edges, for example:
```python
selector = {
    "vertices": {
           "person": None,  # select all properties
           "comment": None,
     },
    "edges": {
           "knows": None,
           "replyOf": None,
    },
}
```
- return 
A new Graph

Example:
```python
# graphar
Graph.load_from("graphar+file:///tmp/graphar/ldbc.graph.yml",
                graphar_store_in_local=True)

# graphar with selector
selector = {
    "vertices": {
        "person": None,  # select all properties
        "comment": None,
    },
    "edges": {
        "knows": None,
        "replyOf": None,
    },
}

Graph.load_from("graphar+file:///tmp/graphar/ldbc.graph.yml",
                selector=selector)

# serialization
g.load_from("/tmp/serialization/")
```
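
The URI examples above combine a format and a storage scheme as `format+scheme://...`, while a plain path is used for serialization. The following is a minimal sketch of how such a URI could be split, assuming a hypothetical helper `split_graph_uri`. How GraphScope actually dispatches on the URI is not shown in this description.

```python
def split_graph_uri(uri):
    """Hypothetical helper: return (format, location) for a graph URI."""
    head, sep, _ = uri.partition("://")
    if sep and "+" in head:
        fmt, storage = head.split("+", 1)
        # drop the 'format+' prefix and keep the storage URI intact
        return fmt, storage + uri[len(head):]
    # assumption: a path without a 'format+' prefix means serialization
    return "serialization", uri


print(split_graph_uri("graphar+file:///tmp/graphar/ldbc.graph.yml"))
print(split_graph_uri("/tmp/serialization"))
```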


## Related issue number

Fixes #2836 
Fixes #2920

---------

Signed-off-by: acezen <[email protected]>
acezen authored Apr 19, 2024
1 parent 2e893d1 commit 1dd3916
Showing 24 changed files with 725 additions and 214 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-graphscope-wheels-macos.yml
@@ -185,7 +185,7 @@ jobs:
run: |
. ~/.graphscope_env
python3 -m pip install libclang
- git clone --single-branch -b v0.21.3 --depth=1 https://github.com/v6d-io/v6d.git /tmp/v6d
+ git clone --single-branch -b v0.22.0 --depth=1 https://github.com/v6d-io/v6d.git /tmp/v6d
cd /tmp/v6d
git submodule update --init
cmake . -DCMAKE_INSTALL_PREFIX=/usr/local \
2 changes: 1 addition & 1 deletion .github/workflows/gae.yml
@@ -30,7 +30,7 @@ jobs:
runs-on: ubuntu-20.04
if: ${{ github.repository == 'alibaba/GraphScope' }}
container:
- image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.21.3
+ image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.22.0
steps:
- uses: actions/checkout@v3

3 changes: 3 additions & 0 deletions .github/workflows/local-ci.yml
@@ -85,6 +85,7 @@ jobs:
- 'analytical_engine/frame/**'
- 'analytical_engine/core/**'
- 'python/graphscope/nx/**'
+ - 'python/requirements.txt'
gie-function-test:
- 'interactive_engine/**'
- 'python/graphscope/interactive/**'
@@ -379,6 +380,7 @@ jobs:
run: |
# download dataset
git clone -b master --single-branch --depth=1 https://github.com/7br/gstest.git ${GS_TEST_DIR}
+ export TMPDIR="${TMPDIR:-$(dirname $(mktemp))}"
python3 -m pytest -d --tx popen//python=python3 \
-s -v \
@@ -421,6 +423,7 @@ jobs:
- 'python/graphscope/nx/classes/**'
- 'python/graphscope/nx/!(tests)'
- 'python/graphscope/nx/tests/!(convert)'
+ - 'python/requirements.txt'
convert:
- 'python/graphscope/nx/convert.py'
- 'python/graphscope/nx/convert_matrix.py'
2 changes: 1 addition & 1 deletion .github/workflows/networkx-forward-algo-nightly.yml
@@ -17,7 +17,7 @@ jobs:
run:
shell: bash --noprofile --norc -eo pipefail {0}
container:
- image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.21.3
+ image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-dev:v0.22.0
options:
--shm-size 4096m

2 changes: 1 addition & 1 deletion .github/workflows/nightly.yml
@@ -44,7 +44,7 @@ jobs:
sudo mkdir /opt/graphscope
sudo chown -R $(id -u):$(id -g) /opt/graphscope
python3 -m pip install click
- python3 gsctl.py install-deps dev --v6d-version v0.21.3
+ python3 gsctl.py install-deps dev --v6d-version v0.22.0
- name: Setup tmate session
if: false
7 changes: 6 additions & 1 deletion analytical_engine/core/grape_instance.cc
@@ -220,7 +220,12 @@ bl::result<void> GrapeInstance::archiveGraph(const rpc::GSParams& params) {
bool exists = false;
VY_OK_OR_RAISE(client_->Exists(frag_group_id, exists));
if (exists) {
- graph_utils->ArchiveGraph(frag_group_id, comm_spec_, *client_, params);
+ BOOST_LEAF_CHECK(graph_utils->ArchiveGraph(frag_group_id, comm_spec_,
+ *client_, params));
+ } else {
+ RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
+ "Fragment group " + std::to_string(frag_group_id) +
+ " does not exist");
}
}
return {};
1 change: 0 additions & 1 deletion analytical_engine/core/object/graph_utils.h
@@ -131,7 +131,6 @@ class PropertyGraphUtils : public GSObject {
vineyard::Client& client,
const rpc::GSParams& params) {
bl::result<void> out;

archive_graph_(frag_id, comm_spec, client, params, out);
return out;
}
141 changes: 131 additions & 10 deletions analytical_engine/frame/property_graph_frame.cc
@@ -15,6 +15,10 @@

#include <memory>

+ #include "boost/property_tree/exceptions.hpp"
+ #include "boost/property_tree/json_parser.hpp"
+ #include "boost/property_tree/ptree.hpp"
+
#include "vineyard/client/client.h"
#include "vineyard/common/util/macros.h"
#include "vineyard/graph/fragment/arrow_fragment.h"
@@ -48,6 +52,64 @@ static constexpr bool compact_v = _GRAPH_TYPE::compact_v;
namespace bl = boost::leaf;
namespace detail {

+ bool isTerminalValue(const boost::property_tree::ptree& pt) {
+ // node is terminal and has no children
+ return pt.empty() && !pt.data().empty();
+ }
+ /**
+ * Parse the selectors from the property tree and store the vertex and edge
+ * labels selector example:
+ *
+ * selector = {
+ * "vertices": {
+ * "person": ["id", "firstName", "secondName"],
+ * "comment": None # select all properties
+ * },
+ * "edges": {
+ * "knows": ["CreationDate"],
+ * "replyOf": None
+ * }
+ * }
+ *
+ */
+ void parse_selectors(const boost::property_tree::ptree& selector,
+ std::vector<std::string>& selected_vertices,
+ std::vector<std::string>& selected_edges,
+ std::unordered_map<std::string, std::vector<std::string>>&
+ selected_vertex_properties,
+ std::unordered_map<std::string, std::vector<std::string>>&
+ selected_edge_properties) {
+ const auto& vertices_node = selector.get_child("vertices");
+ const auto& edges_node = selector.get_child("edges");
+ for (const auto& item : vertices_node) {
+ const std::string& label = item.first;
+ const boost::property_tree::ptree& properties_node = item.second;
+ selected_vertices.push_back(label);
+ if (!isTerminalValue(properties_node)) {
+ // is a list of properties, select only these properties
+ selected_vertex_properties[label] = std::vector<std::string>();
+ for (const auto& sub_item : properties_node) {
+ selected_vertex_properties[label].push_back(
+ sub_item.second.get_value<std::string>());
+ }
+ }
+ }
+
+ for (const auto& item : edges_node) {
+ const auto& edge_label = item.first;
+ const auto& properties_node = item.second;
+ selected_edges.push_back(edge_label);
+ if (!isTerminalValue(properties_node)) {
+ // is a list of properties, select only these properties
+ selected_edge_properties[edge_label] = std::vector<std::string>();
+ for (const auto& sub_item : properties_node) {
+ selected_edge_properties[edge_label].push_back(
+ sub_item.second.get_value<std::string>());
+ }
+ }
+ }
+ }
+
__attribute__((visibility(
"hidden"))) static bl::result<std::shared_ptr<gs::IFragmentWrapper>>
LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client,
@@ -112,13 +174,37 @@ LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client,
#ifdef ENABLE_GAR
BOOST_LEAF_AUTO(graph_info_path,
params.Get<std::string>(gs::rpc::GRAPH_INFO_PATH));
- BOOST_LEAF_ASSIGN(generate_eid, params.Get<bool>(gs::rpc::GENERATE_EID));
- BOOST_LEAF_ASSIGN(retain_oid, params.Get<bool>(gs::rpc::RETAIN_OID));
+ BOOST_LEAF_AUTO(storage_option,
+ params.Get<std::string>(gs::rpc::STORAGE_OPTIONS));
+ boost::property_tree::ptree pt;
+ bool store_in_local;
+ std::stringstream ss(storage_option);
+ std::vector<std::string> selected_vertices;
+ std::vector<std::string> selected_edges;
+ std::unordered_map<std::string, std::vector<std::string>>
+ dummy_selected_vertex_properties;
+ std::unordered_map<std::string, std::vector<std::string>>
+ dummy_selected_edge_properties;
+ try {
+ boost::property_tree::read_json(ss, pt);
+ store_in_local = pt.get<bool>("graphar_store_in_local", false);
+ if (pt.find("selector") != pt.not_found()) {
+ const auto& selector_node = pt.get_child("selector");
+ parse_selectors(selector_node, selected_vertices, selected_edges,
+ dummy_selected_vertex_properties,
+ dummy_selected_edge_properties);
+ }
+ } catch (boost::property_tree::ptree_error const& e) {
+ RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
+ "Invalid write_option: " + std::string(e.what()));
+ }
using loader_t =
vineyard::gar_fragment_loader_t<oid_t, vid_t, vertex_map_t>;
- loader_t loader(client, comm_spec, graph_info_path);
- MPI_Barrier(comm_spec.comm());
- BOOST_LEAF_ASSIGN(frag_group_id, loader.LoadFragmentAsFragmentGroup());
+ auto loader = std::make_unique<loader_t>(client, comm_spec);
+ BOOST_LEAF_CHECK(loader->Init(graph_info_path, selected_vertices,
+ selected_edges, true, false,
+ store_in_local));
+ BOOST_LEAF_ASSIGN(frag_group_id, loader->LoadFragmentAsFragmentGroup());
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
"The vineyard is not compiled with GAR support");
@@ -187,18 +273,53 @@ __attribute__((visibility("hidden"))) static bl::result<void> ArchiveGraph(
vineyard::ObjectID frag_group_id, const grape::CommSpec& comm_spec,
vineyard::Client& client, const gs::rpc::GSParams& params) {
#ifdef ENABLE_GAR
- BOOST_LEAF_AUTO(graph_info_path,
+ BOOST_LEAF_AUTO(output_path,
params.Get<std::string>(gs::rpc::GRAPH_INFO_PATH));

+ BOOST_LEAF_AUTO(write_option,
+ params.Get<std::string>(gs::rpc::WRITE_OPTIONS));
+ boost::property_tree::ptree pt;
+ std::string graph_name, file_type;
+ int64_t vertex_chunk_size, edge_chunk_size;
+ bool store_in_local;
+ std::stringstream ss(write_option);
+ std::vector<std::string> selected_vertices;
+ std::vector<std::string> selected_edges;
+ std::unordered_map<std::string, std::vector<std::string>>
+ selected_vertex_properties;
+ std::unordered_map<std::string, std::vector<std::string>>
+ selected_edge_properties;
+ try {
+ boost::property_tree::read_json(ss, pt);
+ graph_name = pt.get<std::string>("graphar_graph_name");
+ file_type = pt.get<std::string>("graphar_file_type", "parquet");
+ vertex_chunk_size =
+ pt.get<int64_t>("graphar_vertex_chunk_size", 262144); // default 2^18
+ edge_chunk_size =
+ pt.get<int64_t>("graphar_edge_chunk_size", 4194304); // default 2^22
+ store_in_local = pt.get<bool>("graphar_store_in_local", false);
+ if (pt.find("selector") != pt.not_found()) {
+ const auto& selector_node = pt.get_child("selector");
+ parse_selectors(selector_node, selected_vertices, selected_edges,
+ selected_vertex_properties, selected_edge_properties);
+ }
+ } catch (boost::property_tree::ptree_error const& e) {
+ RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidValueError,
+ "Invalid write_option: " + std::string(e.what()));
+ }
auto fg = std::dynamic_pointer_cast<vineyard::ArrowFragmentGroup>(
client.GetObject(frag_group_id));
auto fid = comm_spec.WorkerToFrag(comm_spec.worker_id());
auto frag_id = fg->Fragments().at(fid);
auto frag = std::static_pointer_cast<_GRAPH_TYPE>(client.GetObject(frag_id));

- using archive_t = vineyard::ArrowFragmentWriter<_GRAPH_TYPE>;
- archive_t archive(frag, comm_spec, graph_info_path);
- archive.WriteFragment();
+ using writer_t = vineyard::ArrowFragmentWriter<_GRAPH_TYPE>;
+ auto writer = std::make_unique<writer_t>();
+ BOOST_LEAF_CHECK(writer->Init(
+ frag, comm_spec, graph_name, output_path, vertex_chunk_size,
+ edge_chunk_size, file_type, selected_vertices, selected_edges,
+ selected_vertex_properties, selected_edge_properties, store_in_local));
+ BOOST_LEAF_CHECK(writer->WriteGraphInfo(output_path));
+ BOOST_LEAF_CHECK(writer->WriteFragment());

return {};
#else
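
The option handling in `ArchiveGraph` above reads a JSON string, requires `graphar_graph_name`, and falls back to defaults for the other keys. A rough Python paraphrase may make those defaults easier to see. `read_write_options` and `DEFAULTS` are hypothetical names used only for this sketch; the default values are copied from the diff, and the rest is an approximation of the C++ behaviour rather than a port of it.

```python
import json

# Defaults as they appear in the ArchiveGraph diff
DEFAULTS = {
    "graphar_file_type": "parquet",
    "graphar_vertex_chunk_size": 262144,  # 2**18
    "graphar_edge_chunk_size": 4194304,   # 2**22
    "graphar_store_in_local": False,
}


def read_write_options(raw):
    """Hypothetical Python paraphrase of the option parsing in ArchiveGraph."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        raise ValueError("Invalid write_option: " + str(e)) from e
    if "graphar_graph_name" not in parsed:
        # the C++ code reads this key without a default, so it is required
        raise ValueError("Invalid write_option: graphar_graph_name is missing")
    # explicit values override the defaults
    return {**DEFAULTS, **parsed}


opts = read_write_options(
    '{"graphar_graph_name": "ldbc", "graphar_edge_chunk_size": 4096}'
)
print(opts["graphar_vertex_chunk_size"], opts["graphar_edge_chunk_size"])
```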
1 change: 0 additions & 1 deletion docs/reference/session.rst
@@ -20,7 +20,6 @@ Session Functions
graphscope.get_default_session
graphscope.has_default_session
graphscope.set_option
graphscope.get_option
graphscope.g
graphscope.gremlin
graphscope.graphlearn