diff --git a/modules/graph/fragment/arrow_fragment_builder_impl.h b/modules/graph/fragment/arrow_fragment_builder_impl.h index 29e08184..6194b8f3 100644 --- a/modules/graph/fragment/arrow_fragment_builder_impl.h +++ b/modules/graph/fragment/arrow_fragment_builder_impl.h @@ -988,6 +988,10 @@ ArrowFragment::AddEdgesToExistedLabel( // get previous src_id, dst_id. std::vector prev_src_ids, prev_dst_ids; + // we get sorted lids, we need to recover the original order + // one way is to rearrange table, the other is rearragne lids + // we choose rearrange lids + std::vector eids; std::shared_ptr prev_src_id_list, prev_dst_id_list; std::unordered_map vis; for (label_id_t v_label = 0; v_label < vertex_label_num_; v_label++) { @@ -996,32 +1000,65 @@ ArrowFragment::AddEdgesToExistedLabel( for (auto v : iv) { auto offset = vertex_offset(v); vid_t src_id = vid_parser_.GenerateId(v_label, offset); - // if(!directed_ && vis.count(src_id)) continue; auto oe = this->GetOutgoingAdjList(v, label_id); - // if(oe.Size()) prev_src_ids.push_back(src_id); for (auto& e : oe) { auto dst_label = vertex_label(e.neighbor()); auto dst_offset = vertex_offset(e.neighbor()); vid_t dst_id = vid_parser_.GenerateId(dst_label, dst_offset); + std::string key1 = + std::to_string(src_id) + "_" + std::to_string(dst_id); + std::string key2 = + std::to_string(dst_id) + "_" + std::to_string(src_id); if (!directed_) { - if (vis.count(std::to_string(src_id) + std::to_string(dst_id)) || - vis.count(std::to_string(dst_id) + std::to_string(src_id))) + if (vis.count(key1) || vis.count(key2)) { continue; - vis[std::to_string(src_id) + std::to_string(dst_id)] = 1; - vis[std::to_string(dst_id) + std::to_string(src_id)] = 1; + } + + vis[key1] = 1; + vis[key2] = 1; + eids.push_back(e.edge_id()); prev_src_ids.push_back(src_id); prev_dst_ids.push_back(dst_id); } else { - if (vis.count(std::to_string(src_id) + std::to_string(dst_id))) + if (vis.count(key1)) { continue; - vis[std::to_string(src_id) + std::to_string(dst_id)] = 1; + } + vis[key1] = 1; + eids.push_back(e.edge_id()); prev_src_ids.push_back(src_id); prev_dst_ids.push_back(dst_id); } } + if (directed_) { + auto ie = this->GetIncomingAdjList(v, label_id); + for (auto& e : ie) { + auto dst_label = vertex_label(e.neighbor()); + auto dst_offset = vertex_offset(e.neighbor()); + vid_t dst_id = vid_parser_.GenerateId(dst_label, dst_offset); + std::string key = + std::to_string(dst_id) + "_" + std::to_string(src_id); + if (vis.count(key)) { + continue; + } + vis[key] = 1; + eids.push_back(e.edge_id()); + prev_src_ids.push_back(dst_id); + prev_dst_ids.push_back(src_id); + } + } } } + std::vector sorted_src_ids(prev_src_ids.size()); + std::vector sorted_dst_ids(prev_dst_ids.size()); + + for (size_t i = 0; i < eids.size(); ++i) { + sorted_src_ids[eids[i]] = prev_src_ids[i]; + sorted_dst_ids[eids[i]] = prev_dst_ids[i]; + } + prev_src_ids.clear(); + prev_dst_ids.clear(); + auto gen_prev_fn = [](const std::vector& vids, arrow::MemoryPool* pool, std::shared_ptr& lid_list) -> boost::leaf::result { @@ -1030,19 +1067,16 @@ ArrowFragment::AddEdgesToExistedLabel( ARROW_OK_OR_RAISE(builder.Finish(&lid_list)); return {}; }; - gen_prev_fn(prev_src_ids, pool, prev_src_id_list); - prev_src_ids.clear(); - gen_prev_fn(prev_dst_ids, pool, prev_dst_id_list); - prev_dst_ids.clear(); + gen_prev_fn(sorted_src_ids, pool, prev_src_id_list); + sorted_src_ids.clear(); + gen_prev_fn(sorted_dst_ids, pool, prev_dst_id_list); + sorted_dst_ids.clear(); // check duplicates auto src_id_array = edge_src[0]; auto dst_id_array = edge_dst[0]; + edge_src.push_back(prev_src_id_list); + edge_dst.push_back(prev_dst_id_list); - // gurantee the id->table is in correct order - edge_src.push_back(edge_src[0]); - edge_dst.push_back(edge_dst[0]); - edge_src[0] = prev_src_id_list; - edge_dst[0] = prev_dst_id_list; bool is_multigraph = is_multigraph_; if (directed_) { generate_directed_csr( @@ -1076,8 +1110,8 @@ ArrowFragment::AddEdgesToExistedLabel( ArrowFragmentBaseBuilder builder(*this); builder.set_edge_label_num_(edge_label_num_); std::vector> edge_tables; - edge_tables.push_back(edge_data_table(label_id)); // previous one edge_tables.push_back(std::move(edge_table)); // current one + edge_tables.push_back(edge_data_table(label_id)); // previous one builder.set_edge_tables_(label_id, std::make_shared( client, std::move(edge_tables), true)); @@ -1134,17 +1168,35 @@ ArrowFragment::AddEdgesToExistedLabel( builder.oe_offsets_lists_.resize(vertex_label_num_); for (label_id_t i = 0; i < vertex_label_num_; ++i) { - auto fn = [this, &builder, i, &label_id, &ie_list, &oe_list, - &ie_offsets_list, &oe_offsets_list](Client* client) -> Status { - if (directed_) { - builder.set_ie_lists_(i, label_id, ie_list[i]); - builder.set_ie_offsets_lists_(i, label_id, ie_offsets_list[i]); - } - builder.set_oe_lists_(i, label_id, oe_list[i]); - builder.set_oe_offsets_lists_(i, label_id, oe_offsets_list[i]); - return Status::OK(); - }; - tg.AddTask(fn, &client); + if (directed_) { + builder.ie_lists_[i].resize(edge_label_num_); + builder.ie_offsets_lists_[i].resize(edge_label_num_); + } + builder.oe_lists_[i].resize(edge_label_num_); + builder.oe_offsets_lists_[i].resize(edge_label_num_); + for (label_id_t j = 0; j < edge_label_num_; ++j) { + auto fn = [this, &builder, &label_id, &ie_list, &oe_list, + &ie_offsets_list, &oe_offsets_list, &ie_offsets_lists_expanded, + &oe_offsets_lists_expanded](Client* client, const label_id_t i, + const label_id_t j) -> Status { + if (j == label_id) { + if (directed_) { + builder.set_ie_lists_(i, j, ie_list[i]); + builder.set_ie_offsets_lists_(i, j, ie_offsets_list[i]); + } + builder.set_oe_lists_(i, j, oe_list[i]); + builder.set_oe_offsets_lists_(i, j, oe_offsets_list[i]); + } else { + if (directed_) { + builder.set_ie_offsets_lists_(i, j, + ie_offsets_lists_expanded[i][j]); + } + builder.set_oe_offsets_lists_(i, j, oe_offsets_lists_expanded[i][j]); + } + return Status::OK(); + }; + tg.AddTask(fn, &client, i, j); + } } tg.TakeResults(); diff --git a/modules/graph/test/arrow_fragment_extend_test.cc b/modules/graph/test/arrow_fragment_extend_test.cc new file mode 100644 index 00000000..f87395d9 --- /dev/null +++ b/modules/graph/test/arrow_fragment_extend_test.cc @@ -0,0 +1,259 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include + +#include +#include +#include + +#include "client/client.h" + +#include "common/util/functions.h" +#include "common/util/uuid.h" +#include "graph/loader/arrow_fragment_loader.h" +#include "graph/loader/fragment_loader_utils.h" + +using namespace vineyard; // NOLINT(build/namespaces) + +using GraphType = ArrowFragment; +using LabelType = typename GraphType::label_id_t; + +bool Validate(vineyard::Client& client, const grape::CommSpec& comm_spec, + vineyard::ObjectID true_frag_group, + vineyard::ObjectID test_frag_group) { + std::shared_ptr test_fg = + std::dynamic_pointer_cast( + client.GetObject(test_frag_group)); + std::shared_ptr true_fg = + std::dynamic_pointer_cast( + client.GetObject(true_frag_group)); + auto edges = [](const std::shared_ptr& frag, + std::vector>& oe, + std::vector>& ie) { + for (LabelType elabel = 0; elabel < frag->edge_label_num(); ++elabel) { + for (LabelType vlabel = 0; vlabel < frag->vertex_label_num(); ++vlabel) { + auto iv = frag->InnerVertices(vlabel); + for (auto v : iv) { + std::string src_id = frag->GetId(v); + auto oes = frag->GetOutgoingAdjList(v, elabel); + for (auto e : oes) { + auto dst_id = frag->GetId(e.neighbor()); + int data = e.get_data(0); + oe.emplace_back(src_id, dst_id, data); + } + if (frag->directed()) { + auto ies = frag->GetIncomingAdjList(v, elabel); + for (auto& e : ies) { + auto dst_id = frag->GetId(e.neighbor()); + int data = e.get_int(0); + ie.emplace_back(dst_id, src_id, data); + } + } + } + } + } + sort(oe.begin(), oe.end()); + sort(ie.begin(), ie.end()); + }; + + auto inner_vertices = [](const std::shared_ptr& frag, + std::vector& verts) { + for (LabelType vlabel = 0; vlabel < frag->vertex_label_num(); ++vlabel) { + auto iv = frag->InnerVertices(vlabel); + for (auto v : iv) { + std::string t = frag->GetId(v); + verts.push_back(t); + } + } + sort(verts.begin(), verts.end()); + }; + + // NB: only retrieve local fragments. + int f1_num = true_fg->total_frag_num(), f2_num = test_fg->total_frag_num(); + int idx1 = 0, idx2 = 0; + std::vector> true_ivs(f1_num), test_ivs(f2_num); + std::vector>> true_ies( + f1_num), + true_oes(f1_num), test_ies(f2_num), test_oes(f2_num); + auto const& true_fragments = true_fg->Fragments(); + for (const auto& pair : true_fg->FragmentLocations()) { + if (pair.second == client.instance_id()) { + auto frag = std::dynamic_pointer_cast( + client.GetObject(true_fragments.at(pair.first))); + edges(frag, true_ies[idx1], true_oes[idx1]); + inner_vertices(frag, true_ivs[idx1]); + idx1++; + } + } + + auto const& test_fragments = test_fg->Fragments(); + for (const auto& pair : test_fg->FragmentLocations()) { + if (pair.second == client.instance_id()) { + auto frag = std::dynamic_pointer_cast( + client.GetObject(test_fragments.at(pair.first))); + inner_vertices(frag, test_ivs[idx2]); + edges(frag, test_ies[idx2], test_oes[idx2]); + idx2++; + } + } + + if (idx1 != idx2) { + LOG(ERROR) << "fragment number is different"; + return false; + } + + int len = idx1; + for (int i = 0; i < len; ++i) { + if (true_ivs[i].size() != test_ivs[i].size()) { + LOG(ERROR) << "different inner vertices number"; + return false; + } + if (true_ies[i].size() != test_ies[i].size()) { + LOG(ERROR) << "different inner edges number"; + return false; + } + if (true_oes[i].size() != test_oes[i].size()) { + LOG(ERROR) << "different inner edges number"; + return false; + } + for (int j = 0; j < true_ivs[i].size(); ++j) { + if (true_ivs[i][j] != test_ivs[i][j]) { + LOG(ERROR) << "ground-truth v is " << true_ivs[i][j] + << "and program is " << test_ivs[i][j]; + return false; + } + } + for (int j = 0; j < true_ies[i].size(); ++j) { + if (true_ies[i][j] != test_ies[i][j]) { + LOG(ERROR) << "different inner edge"; + return false; + } + } + for (int j = 0; j < true_oes[i].size(); ++j) { + if (true_oes[i][j] != test_oes[i][j]) { + LOG(ERROR) << "different outgoing edge"; + return false; + } + } + } + return true; +} + +int main(int argc, char** argv) { + if (argc < 2) { + printf( + "usage: ./arrow_fragment_label_data_extend [vdata_path] " + "[edata_path]\n"); + return 1; + } + int index = 1; + std::string ipc_socket = std::string(argv[index++]); + std::string v_file_path = vineyard::ExpandEnvironmentVariables(argv[index++]); + std::string e_file_path = vineyard::ExpandEnvironmentVariables(argv[index++]); + + std::string v_file_suffix = ".csv#header_row=true&label=person"; + std::string e_file_suffix = + ".csv#header_row=true&label=knows&src_label=person&dst_label=person"; + + vineyard::Client client; + VINEYARD_CHECK_OK(client.Connect(ipc_socket)); + + LOG(INFO) << "Connected to IPCServer: " << ipc_socket; + + using loader_t = + ArrowFragmentLoader; + + grape::InitMPIComm(); + { + grape::CommSpec comm_spec; + comm_spec.Init(MPI_COMM_WORLD); + vineyard::ObjectID true_frag_group; + + // first construct a basic graph + { + MPI_Barrier(comm_spec.comm()); + std::string vfile = v_file_path + v_file_suffix; + std::string efile = e_file_path + e_file_suffix; + auto loader = std::make_unique( + client, comm_spec, std::vector{efile}, + std::vector{vfile}, /* directed */ 1, + /*generate_eid*/ false); + true_frag_group = loader->LoadFragmentAsFragmentGroup().value(); + } + + vineyard::ObjectID test_frag_group; + LOG(INFO) << "start test extending"; + { + MPI_Barrier(comm_spec.comm()); + // vertex_extending + { + std::string vfile = v_file_path + "_1" + v_file_suffix; + auto loader = std::make_unique( + client, comm_spec, std::vector{}, + std::vector{vfile}, /* directed */ 1, + /*generate_eid*/ false, /* retain_oid */ true); + test_frag_group = loader->LoadFragment().value(); + for (int i = 2; i < 4; ++i) { + LOG(INFO) << "start extending " << i << "th vertex table"; + vfile = v_file_path + "_" + std::to_string(i) + v_file_suffix; + auto loader = std::make_unique( + client, comm_spec, std::vector{}, + std::vector{vfile}, /* directed */ 1, + /*generate_eid*/ false, /* retain_oid */ true); + test_frag_group = + loader->AddDataToExistedVLabel(test_frag_group, 0).value(); + LOG(INFO) << "end extending " << i << "th vertex table"; + } + } + // edge_extending + LOG(INFO) << "start edge extending"; + { + std::string efile = e_file_path + "_1" + e_file_suffix; + // first load one part e_file + auto loader = std::make_unique( + client, comm_spec, std::vector{efile}, + std::vector{}, /* directed */ 1, + /*generate_eid*/ false, /* retain_oid */ true); + test_frag_group = loader->AddLabelsToFragment(test_frag_group).value(); + for (int i = 2; i < 4; ++i) { + LOG(INFO) << "start extending " << i << "th edge table"; + efile = e_file_path + "_" + std::to_string(i) + e_file_suffix; + auto loader = std::make_unique( + client, comm_spec, std::vector{efile}, + std::vector{}, /* directed */ 1, + /*generate_eid*/ false, /* retain_oid */ true); + test_frag_group = + loader->AddDataToExistedELabel(test_frag_group, 0).value(); + LOG(INFO) << "end extending " << i << "th edge table"; + } + test_frag_group = + ConstructFragmentGroup(client, test_frag_group, comm_spec).value(); + } + + LOG(INFO) << "end edge extending"; + MPI_Barrier(comm_spec.comm()); + } + bool valid = Validate(client, comm_spec, true_frag_group, test_frag_group); + if (valid) { + LOG(INFO) << "Passed arrow fragment data extend test..."; + } else { + LOG(ERROR) << "Failed arrow fragment data extend test..."; + } + } + grape::FinalizeMPIComm(); + + return 0; +} diff --git a/modules/graph/test/arrow_fragment_label_data_extend.cc b/modules/graph/test/arrow_fragment_label_data_extend.cc deleted file mode 100644 index 47a97b18..00000000 --- a/modules/graph/test/arrow_fragment_label_data_extend.cc +++ /dev/null @@ -1,273 +0,0 @@ -/** Copyright 2020-2023 Alibaba Group Holding Limited. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#include - -#include -#include - -#include "client/client.h" - -#include "common/util/logging.h" -#include "common/util/uuid.h" -#include "graph/fragment/graph_schema.h" -#include "graph/loader/arrow_fragment_loader.h" -#include "graph/loader/fragment_loader_utils.h" - -using namespace vineyard; // NOLINT(build/namespaces) - -using GraphType = ArrowFragment; -using LabelType = typename GraphType::label_id_t; - -void WriteOut(vineyard::Client& client, const grape::CommSpec& comm_spec, - vineyard::ObjectID fragment_group_id) { - LOG(INFO) << "Loaded graph to vineyard: " << fragment_group_id; - std::shared_ptr fg = - std::dynamic_pointer_cast( - client.GetObject(fragment_group_id)); - - for (const auto& pair : fg->Fragments()) { - LOG(INFO) << "[frag-" << pair.first << "]: " << pair.second; - } - - // NB: only retrieve local fragments. - auto locations = fg->FragmentLocations(); - for (const auto& pair : fg->Fragments()) { - if (locations.at(pair.first) != client.instance_id()) { - continue; - } - auto frag_id = pair.second; - auto frag = std::dynamic_pointer_cast(client.GetObject(frag_id)); - auto schema = frag->schema(); - auto mg_schema = vineyard::MaxGraphSchema(schema); - mg_schema.DumpToFile("/tmp/" + std::to_string(fragment_group_id) + ".json"); - - LOG(INFO) << "graph total node number: " << frag->GetTotalNodesNum(); - LOG(INFO) << "fragment edge number: " << frag->GetEdgeNum(); - LOG(INFO) << "fragment in edge number: " << frag->GetInEdgeNum(); - LOG(INFO) << "fragment out edge number: " << frag->GetOutEdgeNum(); - - LOG(INFO) << "[worker-" << comm_spec.worker_id() - << "] loaded graph to vineyard: " << ObjectIDToString(frag_id) - << " ..."; - - LOG(INFO) << "--------------- relabel vertex table check ..."; - for (LabelType vlabel = 0; vlabel < frag->vertex_label_num(); ++vlabel) { - LOG(INFO) << "--------------- start dump vertex label" << vlabel - << "---------------"; - auto iv = frag->InnerVertices(vlabel); - for (auto v : iv) { - LOG(INFO) << frag->GetId(v); - } - } - - LOG(INFO) << "--------------- relabel edge table check ..."; - - for (LabelType elabel = 0; elabel < frag->edge_label_num(); ++elabel) { - LOG(INFO) << "--------------- start dump edge label " << elabel - << "---------------"; - for (LabelType vlabel = 0; vlabel < frag->vertex_label_num(); ++vlabel) { - auto ie = frag->InnerVertices(vlabel); - for (auto v : ie) { - auto oe = frag->GetOutgoingAdjList(v, elabel); - for (auto e : oe) { - LOG(INFO) << frag->GetId(v) << " -> " << frag->GetId(e.neighbor()) - << " " << static_cast(e.get_data(0)) - << " " << static_cast(e.get_data(1)) - << " " << static_cast(e.get_data(2)) - << " " << static_cast(e.get_data(3)); - } - if (frag->directed()) { - auto ie = frag->GetIncomingAdjList(v, elabel); - for (auto& e : ie) { - LOG(INFO) << frag->GetId(e.neighbor()) << " -> " << frag->GetId(v) - << " " << static_cast(e.get_data(0)) - << " " << static_cast(e.get_data(1)) - << " " << static_cast(e.get_data(2)) - << " " << static_cast(e.get_data(3)); - } - } - } - } - } - } -} - -namespace detail { - -std::shared_ptr makeInt64Array(int idx) { - std::vector data(4); - arrow::Int64Builder builder; - int begin = idx * 4; - for (int i = begin; i < begin + 4; i++) { - data[i - begin] = i; - } - CHECK_ARROW_ERROR(builder.AppendValues(data)); - std::shared_ptr out; - CHECK_ARROW_ERROR(builder.Finish(&out)); - return arrow::ChunkedArray::Make({out}).ValueOrDie(); -} - -std::shared_ptr attachMetadata( - std::shared_ptr schema, std::string const& key, - std::string const& value) { - std::shared_ptr metadata; - if (schema->HasMetadata()) { - metadata = schema->metadata()->Copy(); - } else { - metadata = std::make_shared(); - } - metadata->Append(key, value); - return schema->WithMetadata(metadata); -} - -std::vector> makeVTables( - int n, std::string label_name = "person") { - std::vector> tables; - auto schema = std::make_shared( - std::vector>{ - arrow::field("id", arrow::int64()), - arrow::field("value1", arrow::int64()), - arrow::field("value2", arrow::int64()), - arrow::field("value3", arrow::int64()), - arrow::field("value4", arrow::int64()), - }); - schema = attachMetadata(schema, "label", "person"); - auto table = arrow::Table::Make( - schema, {makeInt64Array(n), makeInt64Array(n), makeInt64Array(n), - makeInt64Array(n), makeInt64Array(n)}); - tables.push_back(table); - return {tables}; -} - -std::vector>> makeETables( - int n, std::string label_name = "knows") { - std::vector> tables; - auto schema = std::make_shared( - std::vector>{ - arrow::field("src_id", arrow::int64()), - arrow::field("dst_id", arrow::int64()), - arrow::field("value1", arrow::int64()), - arrow::field("value2", arrow::int64()), - arrow::field("value3", arrow::int64()), - arrow::field("value4", arrow::int64()), - }); - schema = attachMetadata(schema, "label", "knows"); - schema = attachMetadata(schema, "src_label", "person"); - schema = attachMetadata(schema, "dst_label", "person"); - auto table = arrow::Table::Make( - schema, {makeInt64Array(n), makeInt64Array(n), makeInt64Array(n), - makeInt64Array(n), makeInt64Array(n), makeInt64Array(n)}); - tables.push_back(table); - return {tables}; -} -} // namespace detail - -int main(int argc, char** argv) { - if (argc < 2) { - printf( - "usage: ./arrow_fragment_label_data_extend [directed]\n"); - return 1; - } - int index = 1; - std::string ipc_socket = std::string(argv[index++]); - - int directed = 1; - if (argc > index) { - directed = atoi(argv[index]); - } - - vineyard::Client client; - VINEYARD_CHECK_OK(client.Connect(ipc_socket)); - - LOG(INFO) << "Connected to IPCServer: " << ipc_socket; - - using loader_t = ArrowFragmentLoader; - - grape::InitMPIComm(); - - { - grape::CommSpec comm_spec; - comm_spec.Init(MPI_COMM_WORLD); - vineyard::ObjectID frag_group_id = vineyard::InvalidObjectID(); - - // first construct a basic graph - { - auto vtables = ::detail::makeVTables(0); - auto etables = ::detail::makeETables(0); - auto loader = std::make_unique( - client, comm_spec, vtables, etables, directed != 0, - /*generate_eid*/ false, /*retain_oid*/ true); - frag_group_id = loader->LoadFragmentAsFragmentGroup().value(); - WriteOut(client, comm_spec, frag_group_id); - } - - for (int i = 1; i < 3; ++i) { - auto vtables = ::detail::makeVTables(i); - auto etables = std::vector>>(); - auto loader = std::make_unique( - client, comm_spec, vtables, etables, directed != 0, - /*generate_eid*/ false, /*retain_oid*/ true); - std::shared_ptr fg = - std::dynamic_pointer_cast( - client.GetObject(frag_group_id)); - auto locations = fg->FragmentLocations(); - for (const auto& pair : fg->Fragments()) { - if (locations.at(pair.first) != client.instance_id()) { - continue; - } - auto frag_id = pair.second; - frag_id = loader->AddDataToExistedVLabel(frag_id, 0).value(); - frag_group_id = - ConstructFragmentGroup(client, frag_id, comm_spec).value(); - } - } - LOG(INFO) << "[START DUMP FRAGMENT GROUP AFTER INCREMENTAL ADD VERTICES]"; - WriteOut(client, comm_spec, frag_group_id); - LOG(INFO) << "[END FRAGMENT GROUP AFTER INCREMENTAL ADD VERTICES]"; - - for (int i = 1; i < 3; ++i) { - auto vtables = std::vector>(); - auto etables = ::detail::makeETables(i); - auto loader = std::make_unique( - client, comm_spec, vtables, etables, directed != 0, - /*generate_eid*/ false, /*retain_oid*/ true); - std::shared_ptr fg = - std::dynamic_pointer_cast( - client.GetObject(frag_group_id)); - auto locations = fg->FragmentLocations(); - for (const auto& pair : fg->Fragments()) { - if (locations.at(pair.first) != client.instance_id()) { - continue; - } - auto frag_id = pair.second; - frag_id = loader->AddDataToExistedELabel(frag_id, 0).value(); - frag_group_id = - ConstructFragmentGroup(client, frag_id, comm_spec).value(); - } - WriteOut(client, comm_spec, frag_group_id); - } - LOG(INFO) << "[START DUMP FRAGMENT GROUP AFTER INCREMENTAL ADD EDGES]"; - WriteOut(client, comm_spec, frag_group_id); - LOG(INFO) << "[END DUMP FRAGMENT GROUP AFTER INCREMENTAL ADD EDGES]"; - } - grape::FinalizeMPIComm(); - - LOG(INFO) << "Passed arrow fragment relabel test..."; - - return 0; -} diff --git a/test/runner.py b/test/runner.py index 3a4c2ecc..73aca9a4 100755 --- a/test/runner.py +++ b/test/runner.py @@ -13,6 +13,8 @@ from argparse import ArgumentParser from typing import Union +import pandas as pd + if 'NON_RANDOM_IPC_SOCKET' in os.environ: VINEYARD_CI_IPC_SOCKET = '/tmp/vineyard.ci.sock' else: @@ -486,6 +488,31 @@ def run_vineyard_spill_tests(meta, allocator, endpoints, tests): run_test(tests, 'spill_test') +def run_graph_extend_test(tests): + data_dir = os.getenv('VINEYARD_DATA_DIR') + vdata = pd.read_csv(data_dir + '/p2p_v.csv') + edata = pd.read_csv(data_dir + '/p2p_e.csv') + n1, n2 = vdata.shape[0] // 3, edata.shape[0] // 3 + for i in range(1, 4): + if i == 3: + m1 = vdata.iloc[2 * n1 :, :] + m2 = edata.iloc[2 * n2 :, :] + else: + m1 = vdata.iloc[(i - 1) * n1 : i * n1, :] + m2 = edata.iloc[(i - 1) * n2 : i * n2, :] + m1.to_csv(data_dir + '/p2p_v_%d.csv' % i, index=False) + m2.to_csv(data_dir + '/p2p_e_%d.csv' % i, index=False) + run_test( + tests, + 'arrow_fragment_extend_test', + '$VINEYARD_DATA_DIR/p2p_v', + '$VINEYARD_DATA_DIR/p2p_e', + ) + for i in range(1, 4): + os.remove(data_dir + '/p2p_v_%d.csv' % i) + os.remove(data_dir + '/p2p_e_%d.csv' % i) + + def run_graph_tests(meta, allocator, endpoints, tests): meta_prefix = 'vineyard_test_%s' % time.time() metadata_settings = make_metadata_settings(meta, endpoints, meta_prefix) @@ -495,6 +522,7 @@ def run_graph_tests(meta, allocator, endpoints, tests): default_ipc_socket=VINEYARD_CI_IPC_SOCKET, ) as (_, rpc_socket_port): run_test(tests, 'arrow_fragment_test') + run_graph_extend_test(tests) run_test( tests, 'arrow_fragment_gar_test',