Skip to content

Commit

Permalink
GraphAr: Upgrade GraphAr to v0.11.2 and revise the gar loader and wri…
Browse files Browse the repository at this point in the history
…ter (#1765)

- Upgrade GraphAr to v0.11.2 and adapt the `GARFragmentLoader` and
  `ArrowFragmentWriter` to the latest GraphAr API.
- Support generating the GraphInfo from the schema of a property graph.
- Support dumping and loading an `ArrowFragment` in GraphAr format from
  local storage on different workers. (Previously we only supported writing
  to and loading from a shared location such as NFS or OSS.)

Signed-off-by: acezen <[email protected]>
  • Loading branch information
acezen authored Feb 23, 2024
1 parent 9a15b37 commit 91f000e
Show file tree
Hide file tree
Showing 15 changed files with 629 additions and 310 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build-test-graph.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ jobs:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib:/usr/local/lib64:/usr/local/lib/x86_64-linux-gnu
export VINEYARD_DATA_DIR=`pwd`/gstest
export GAR_DATA_DIR=`pwd`/gar-test
export TMPDIR="${TMPDIR:-$(dirname $(mktemp))}"
rm -rf default.etcd
Expand Down
2 changes: 1 addition & 1 deletion modules/graph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ install(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/powturbo/include"

if(BUILD_VINEYARD_GRAPH_WITH_GAR)
target_compile_definitions(vineyard_graph PUBLIC -DENABLE_GAR)
find_package(gar QUIET)
find_package(gar 0.11.2 QUIET)
if (gar_FOUND)
message(STATUS "-- Found GraphAr: ${GAR_LIBRARIES}")
target_include_directories(vineyard_graph PRIVATE ${GAR_INCLUDE_DIRS})
Expand Down
20 changes: 12 additions & 8 deletions modules/graph/fragment/gar_fragment_builder_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,27 +140,31 @@ boost::leaf::result<void> generate_csr(
auto tvnum = tvnums[v_label];
// build the arrow's offset array
std::shared_ptr<arrow::Buffer> offsets_buffer;
int64_t buffer_length = static_cast<int64_t>(tvnum + 1);
ARROW_OK_ASSIGN_OR_RAISE(
offsets_buffer, arrow::AllocateBuffer((tvnum + 1) * sizeof(int64_t)));
offsets_buffer, arrow::AllocateBuffer(buffer_length * sizeof(int64_t)));
if (v_label == vertex_label) {
int64_t array_length = offset_array->length();
VINEYARD_ASSERT(array_length <= buffer_length,
"Invalid offset array: the offset array length is larger "
"than the tvnum + 1.");
memcpy(offsets_buffer->mutable_data(),
reinterpret_cast<const uint8_t*>(offset_array->raw_values()),
offset_array->length() * sizeof(int64_t));
array_length * sizeof(int64_t));
// we do not store the edge offset of outer vertices, so fill edge_num
// to the outer vertices offset
std::fill_n(
reinterpret_cast<int64_t*>(offsets_buffer->mutable_data() +
offset_array->length() * sizeof(int64_t)),
(tvnum + 1) - offset_array->length(), edge_num);
std::fill_n(reinterpret_cast<int64_t*>(offsets_buffer->mutable_data() +
array_length * sizeof(int64_t)),
buffer_length - array_length, edge_num);
edges[v_label] =
std::make_shared<PodArrayBuilder<nbr_unit_t>>(client, edge_num);
} else {
std::fill_n(reinterpret_cast<int64_t*>(offsets_buffer->mutable_data()),
tvnum + 1, 0);
buffer_length, 0);
edges[v_label] = std::make_shared<PodArrayBuilder<nbr_unit_t>>(client, 0);
}
edge_offsets[v_label] = std::make_shared<arrow::Int64Array>(
arrow::int64(), tvnum + 1, offsets_buffer, nullptr, 0, 0);
arrow::int64(), buffer_length, offsets_buffer, nullptr, 0, 0);
}

std::vector<int64_t> chunk_offsets(num_chunks + 1, 0);
Expand Down
32 changes: 0 additions & 32 deletions modules/graph/fragment/gar_fragment_builder_string.cc

This file was deleted.

6 changes: 4 additions & 2 deletions modules/graph/loader/gar_fragment_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ limitations under the License.

#include "graph/loader/gar_fragment_loader.h"

#include "gar/util/data_type.h"

#include "arrow/api.h"
#include "gar/graph_info.h"

namespace vineyard {

std::shared_ptr<arrow::Schema> ConstructSchemaFromPropertyGroup(
const GraphArchive::PropertyGroup& property_group) {
const std::shared_ptr<GraphArchive::PropertyGroup>& property_group) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (const auto& prop : property_group.GetProperties()) {
for (const auto& prop : property_group->GetProperties()) {
fields.emplace_back(arrow::field(
prop.name, GraphArchive::DataType::DataTypeToArrowDataType(prop.type)));
}
Expand Down
58 changes: 28 additions & 30 deletions modules/graph/loader/gar_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ limitations under the License.

namespace GraphArchive {
class GraphInfo;
class VertexInfo;
class EdgeInfo;
class PropertyGroup;
enum class AdjListType : std::uint8_t;
Expand All @@ -51,7 +52,7 @@ enum class AdjListType : std::uint8_t;
namespace vineyard {

std::shared_ptr<arrow::Schema> ConstructSchemaFromPropertyGroup(
const GraphArchive::PropertyGroup& property_group);
const std::shared_ptr<GraphArchive::PropertyGroup>& property_group);

template <typename OID_T = property_graph_types::OID_TYPE,
typename VID_T = property_graph_types::VID_TYPE,
Expand Down Expand Up @@ -80,15 +81,26 @@ class GARFragmentLoader {

public:
/**
* @brief Initialize the GARFragmentLoader.
* Note that if vertex_labels or edge_labels is empty, the loader will
* load all vertices and edges.
*
* @param client
* @param comm_spec
* @param graph_info The graph info of the GAR.
* @param directed
* @param client vineyard client.
* @param comm_spec communication spec.
* @param graph_info_yaml graph info yaml path.
* @param vertex_labels vertex labels to project subgraph.
* @param edge_labels edge labels to project subgraph.
* @param directed whether the graph is directed.
* @param generate_eid whether to generate edge id.
* @param store_in_local whether the gar data files are stored in local.
*/
GARFragmentLoader(Client& client, const grape::CommSpec& comm_spec,
const std::string& graph_info_yaml, bool directed = true,
bool generate_eid = false);
GARFragmentLoader(
Client& client, const grape::CommSpec& comm_spec,
const std::string& graph_info_yaml,
const std::vector<std::string>& vertex_labels = {},
const std::vector<std::vector<std::string>>& edge_labels = {},
bool directed = true, bool generate_eid = false,
bool store_in_local = false);

~GARFragmentLoader() = default;

Expand All @@ -111,7 +123,7 @@ class GARFragmentLoader {
const std::string& vertex_label);

boost::leaf::result<void> loadEdgeTableOfLabel(
const GraphArchive::EdgeInfo& edge_info,
const std::shared_ptr<GraphArchive::EdgeInfo>& edge_info,
GraphArchive::AdjListType adj_list_type);

boost::leaf::result<void> initSchema(PropertyGraphSchema& schema);
Expand All @@ -131,34 +143,16 @@ class GARFragmentLoader {
label_id_t label_id, const std::shared_ptr<arrow::Array> id_array_in,
bool all_be_local_vertex, std::shared_ptr<arrow::Array>& out);

/**
 * @brief Find the fragment whose vertex-chunk range contains a GAR vertex id.
 *
 * The vertex id is first converted to a chunk index by dividing it by the
 * chunk size of the label. The result is the index `i` that satisfies
 * `begins[i] <= chunk_index < begins[i + 1]`, where `begins` is the
 * per-label chunk boundary vector stored in `vertex_chunk_begin_of_frag_`.
 *
 * NOTE(review): assumes `begins` holds `fnum + 1` non-decreasing entries
 * (one boundary per fragment plus a trailing end marker); the previous
 * implementation indexed `mid + 1` under the same assumption. Confirm
 * against the code that fills `vertex_chunk_begin_of_frag_`.
 *
 * @param oid the GAR vertex id to locate.
 * @param label_id the vertex label the id belongs to.
 * @return the matching fragment index; `comm_spec_.fnum()` when the chunk
 *         index lies at or beyond the last boundary, and 0 when it lies
 *         before the first one.
 */
fid_t getPartitionId(gar_id_t oid, label_id_t label_id) {
  auto chunk_index = oid / vertex_chunk_sizes_[label_id];
  auto& vertex_chunk_begins =
      vertex_chunk_begin_of_frag_[vertex_labels_[label_id]];
  // Half-open binary search over [low, high): look for the first fragment
  // whose upper boundary is greater than chunk_index. Only indices
  // 1..fnum are read, so the lookup never steps past the trailing boundary,
  // and `high` is never decremented, so it cannot wrap around at zero if
  // fid_t is an unsigned type.
  fid_t low = 0, high = comm_spec_.fnum();
  while (low < high) {
    fid_t mid = low + (high - low) / 2;
    if (vertex_chunk_begins[mid + 1] > chunk_index) {
      // Fragment `mid` ends after the chunk: the answer is `mid` or earlier.
      high = mid;
    } else {
      // Fragment `mid` ends at or before the chunk: the answer is later.
      low = mid + 1;
    }
  }
  return low;
}
boost::leaf::result<void> initializeVertexChunkBeginAndNum(
int vertex_label_index,
const std::shared_ptr<GraphArchive::VertexInfo>& vertex_info);

private:
Client& client_;
grape::CommSpec comm_spec_;
std::shared_ptr<vertex_map_t> vm_ptr_;
std::shared_ptr<GraphArchive::GraphInfo> graph_info_;

std::map<std::string, std::vector<int64_t>> vertex_chunk_begin_of_frag_;

bool directed_;

label_id_t vertex_label_num_;
Expand All @@ -177,6 +171,10 @@ class GARFragmentLoader {

bool generate_eid_;
IdParser<vid_t> vid_parser_;

bool store_in_local_;
std::vector<int64_t> vertex_chunk_begins_;
std::vector<int64_t> vertex_chunk_nums_;
};

namespace detail {
Expand Down
Loading

0 comments on commit 91f000e

Please sign in to comment.