Skip to content

Commit

Permalink
Merge branch 'alibaba:main' into groot_rwlock
Browse files Browse the repository at this point in the history
  • Loading branch information
longbinlai authored Dec 29, 2023
2 parents a0f6515 + 4a23938 commit b91033f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
7 changes: 7 additions & 0 deletions flex/storages/rt_mutable_graph/file_names.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ inline void copy_file(const std::string& src, const std::string& dst) {
LOG(ERROR) << "Failed to set read/write permission for file: " << dst
<< " " << errorCode.message() << std::endl;
}

// For a newly created file, you may need to close and then reopen it,
// otherwise you may encounter a copy_file_range "Invalid cross-device link"
// error, one possible cause of the error could be that the
// file's metadata has not yet been flushed to the file system.
close(dst_fd);
dst_fd = open(dst.c_str(), O_WRONLY);
}
ssize_t ret;
do {
Expand Down
2 changes: 2 additions & 0 deletions flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ void ODPSFragmentLoader::addVertices(label_t v_label_id,
return std::dynamic_pointer_cast<IRecordBatchSupplier>(res);
}
};
return AbstractArrowFragmentLoader::AddVerticesRecordBatch(
v_label_id, v_files, record_batch_supplier_creator);
}

void ODPSFragmentLoader::loadVertices() {
Expand Down
7 changes: 5 additions & 2 deletions interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,11 @@ where
{
init_env();
let cancel_hook = sink.get_cancel_hook().clone();
let mut lock = JOB_CANCEL_MAP.write().expect("lock poisoned");
lock.insert(conf.job_id, cancel_hook);
if let Ok(mut lock) = JOB_CANCEL_MAP.write() {
lock.insert(conf.job_id, cancel_hook);
} else {
return Err(BuildJobError::from("JOB_CANCEL_MAP is poisoned;"))?;
}
let peer_guard = Arc::new(AtomicUsize::new(0));
let conf = Arc::new(conf);
let workers = allocate_local_worker(&conf)?;
Expand Down

0 comments on commit b91033f

Please sign in to comment.