Skip to content

Commit

Permalink
Fix error for using vineyard-graph-loader with multi-workers. (#1603)
Browse files Browse the repository at this point in the history
Fixes #1602

Signed-off-by: SighingSnow <[email protected]>
  • Loading branch information
SighingSnow authored Nov 2, 2023
1 parent 0eefec8 commit 5cc5a2c
Show file tree
Hide file tree
Showing 4 changed files with 368 additions and 302 deletions.
110 changes: 81 additions & 29 deletions modules/graph/fragment/arrow_fragment_builder_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,10 @@ ArrowFragment<OID_T, VID_T, VERTEX_MAP_T, COMPACT>::AddEdgesToExistedLabel(

// get previous src_id, dst_id.
std::vector<vid_t> prev_src_ids, prev_dst_ids;
// we get sorted lids, we need to recover the original order
// one way is to rearrange table, the other is rearragne lids
// we choose rearrange lids
std::vector<eid_t> eids;
std::shared_ptr<vid_array_t> prev_src_id_list, prev_dst_id_list;
std::unordered_map<std::string, int> vis;
for (label_id_t v_label = 0; v_label < vertex_label_num_; v_label++) {
Expand All @@ -996,32 +1000,65 @@ ArrowFragment<OID_T, VID_T, VERTEX_MAP_T, COMPACT>::AddEdgesToExistedLabel(
for (auto v : iv) {
auto offset = vertex_offset(v);
vid_t src_id = vid_parser_.GenerateId(v_label, offset);
// if(!directed_ && vis.count(src_id)) continue;
auto oe = this->GetOutgoingAdjList(v, label_id);
// if(oe.Size()) prev_src_ids.push_back(src_id);
for (auto& e : oe) {
auto dst_label = vertex_label(e.neighbor());
auto dst_offset = vertex_offset(e.neighbor());
vid_t dst_id = vid_parser_.GenerateId(dst_label, dst_offset);
std::string key1 =
std::to_string(src_id) + "_" + std::to_string(dst_id);
std::string key2 =
std::to_string(dst_id) + "_" + std::to_string(src_id);
if (!directed_) {
if (vis.count(std::to_string(src_id) + std::to_string(dst_id)) ||
vis.count(std::to_string(dst_id) + std::to_string(src_id)))
if (vis.count(key1) || vis.count(key2)) {
continue;
vis[std::to_string(src_id) + std::to_string(dst_id)] = 1;
vis[std::to_string(dst_id) + std::to_string(src_id)] = 1;
}

vis[key1] = 1;
vis[key2] = 1;
eids.push_back(e.edge_id());
prev_src_ids.push_back(src_id);
prev_dst_ids.push_back(dst_id);
} else {
if (vis.count(std::to_string(src_id) + std::to_string(dst_id)))
if (vis.count(key1)) {
continue;
vis[std::to_string(src_id) + std::to_string(dst_id)] = 1;
}
vis[key1] = 1;
eids.push_back(e.edge_id());
prev_src_ids.push_back(src_id);
prev_dst_ids.push_back(dst_id);
}
}
if (directed_) {
auto ie = this->GetIncomingAdjList(v, label_id);
for (auto& e : ie) {
auto dst_label = vertex_label(e.neighbor());
auto dst_offset = vertex_offset(e.neighbor());
vid_t dst_id = vid_parser_.GenerateId(dst_label, dst_offset);
std::string key =
std::to_string(dst_id) + "_" + std::to_string(src_id);
if (vis.count(key)) {
continue;
}
vis[key] = 1;
eids.push_back(e.edge_id());
prev_src_ids.push_back(dst_id);
prev_dst_ids.push_back(src_id);
}
}
}
}

std::vector<vid_t> sorted_src_ids(prev_src_ids.size());
std::vector<vid_t> sorted_dst_ids(prev_dst_ids.size());

for (size_t i = 0; i < eids.size(); ++i) {
sorted_src_ids[eids[i]] = prev_src_ids[i];
sorted_dst_ids[eids[i]] = prev_dst_ids[i];
}
prev_src_ids.clear();
prev_dst_ids.clear();

auto gen_prev_fn =
[](const std::vector<VID_T>& vids, arrow::MemoryPool* pool,
std::shared_ptr<vid_array_t>& lid_list) -> boost::leaf::result<void> {
Expand All @@ -1030,19 +1067,16 @@ ArrowFragment<OID_T, VID_T, VERTEX_MAP_T, COMPACT>::AddEdgesToExistedLabel(
ARROW_OK_OR_RAISE(builder.Finish(&lid_list));
return {};
};
gen_prev_fn(prev_src_ids, pool, prev_src_id_list);
prev_src_ids.clear();
gen_prev_fn(prev_dst_ids, pool, prev_dst_id_list);
prev_dst_ids.clear();
gen_prev_fn(sorted_src_ids, pool, prev_src_id_list);
sorted_src_ids.clear();
gen_prev_fn(sorted_dst_ids, pool, prev_dst_id_list);
sorted_dst_ids.clear();
// check duplicates
auto src_id_array = edge_src[0];
auto dst_id_array = edge_dst[0];
edge_src.push_back(prev_src_id_list);
edge_dst.push_back(prev_dst_id_list);

// gurantee the id->table is in correct order
edge_src.push_back(edge_src[0]);
edge_dst.push_back(edge_dst[0]);
edge_src[0] = prev_src_id_list;
edge_dst[0] = prev_dst_id_list;
bool is_multigraph = is_multigraph_;
if (directed_) {
generate_directed_csr<vid_t, eid_t>(
Expand Down Expand Up @@ -1076,8 +1110,8 @@ ArrowFragment<OID_T, VID_T, VERTEX_MAP_T, COMPACT>::AddEdgesToExistedLabel(
ArrowFragmentBaseBuilder<OID_T, VID_T, VERTEX_MAP_T, COMPACT> builder(*this);
builder.set_edge_label_num_(edge_label_num_);
std::vector<std::shared_ptr<arrow::Table>> edge_tables;
edge_tables.push_back(edge_data_table(label_id)); // previous one
edge_tables.push_back(std::move(edge_table)); // current one
edge_tables.push_back(edge_data_table(label_id)); // previous one

builder.set_edge_tables_(label_id, std::make_shared<TableBuilder>(
client, std::move(edge_tables), true));
Expand Down Expand Up @@ -1134,17 +1168,35 @@ ArrowFragment<OID_T, VID_T, VERTEX_MAP_T, COMPACT>::AddEdgesToExistedLabel(
builder.oe_offsets_lists_.resize(vertex_label_num_);

for (label_id_t i = 0; i < vertex_label_num_; ++i) {
auto fn = [this, &builder, i, &label_id, &ie_list, &oe_list,
&ie_offsets_list, &oe_offsets_list](Client* client) -> Status {
if (directed_) {
builder.set_ie_lists_(i, label_id, ie_list[i]);
builder.set_ie_offsets_lists_(i, label_id, ie_offsets_list[i]);
}
builder.set_oe_lists_(i, label_id, oe_list[i]);
builder.set_oe_offsets_lists_(i, label_id, oe_offsets_list[i]);
return Status::OK();
};
tg.AddTask(fn, &client);
if (directed_) {
builder.ie_lists_[i].resize(edge_label_num_);
builder.ie_offsets_lists_[i].resize(edge_label_num_);
}
builder.oe_lists_[i].resize(edge_label_num_);
builder.oe_offsets_lists_[i].resize(edge_label_num_);
for (label_id_t j = 0; j < edge_label_num_; ++j) {
auto fn = [this, &builder, &label_id, &ie_list, &oe_list,
&ie_offsets_list, &oe_offsets_list, &ie_offsets_lists_expanded,
&oe_offsets_lists_expanded](Client* client, const label_id_t i,
const label_id_t j) -> Status {
if (j == label_id) {
if (directed_) {
builder.set_ie_lists_(i, j, ie_list[i]);
builder.set_ie_offsets_lists_(i, j, ie_offsets_list[i]);
}
builder.set_oe_lists_(i, j, oe_list[i]);
builder.set_oe_offsets_lists_(i, j, oe_offsets_list[i]);
} else {
if (directed_) {
builder.set_ie_offsets_lists_(i, j,
ie_offsets_lists_expanded[i][j]);
}
builder.set_oe_offsets_lists_(i, j, oe_offsets_lists_expanded[i][j]);
}
return Status::OK();
};
tg.AddTask(fn, &client, i, j);
}
}
tg.TakeResults();

Expand Down
Loading

0 comments on commit 5cc5a2c

Please sign in to comment.