diff --git a/flex/storages/rt_mutable_graph/file_names.h b/flex/storages/rt_mutable_graph/file_names.h index 4a2763bfbce0..7ffa6bbb365f 100644 --- a/flex/storages/rt_mutable_graph/file_names.h +++ b/flex/storages/rt_mutable_graph/file_names.h @@ -98,6 +98,13 @@ inline void copy_file(const std::string& src, const std::string& dst) { LOG(ERROR) << "Failed to set read/write permission for file: " << dst << " " << errorCode.message() << std::endl; } + + // For a newly created file, you may need to close and then reopen it, + // otherwise you may encounter a copy_file_range "Invalid cross-device link" + // error, one possible cause of the error could be that the + // file's metadata has not yet been flushed to the file system. + close(dst_fd); + dst_fd = open(dst.c_str(), O_WRONLY); } ssize_t ret; do { diff --git a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc index 89ef8f88f848..4336f3280b91 100644 --- a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc @@ -411,6 +411,8 @@ void ODPSFragmentLoader::addVertices(label_t v_label_id, return std::dynamic_pointer_cast(res); } }; + return AbstractArrowFragmentLoader::AddVerticesRecordBatch( + v_label_id, v_files, record_batch_supplier_creator); } void ODPSFragmentLoader::loadVertices() { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs index bc3a7dc1465d..9c6e9ec08db4 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/lib.rs @@ -264,8 +264,11 @@ where { init_env(); let cancel_hook = sink.get_cancel_hook().clone(); - let mut lock = JOB_CANCEL_MAP.write().expect("lock poisoned"); - lock.insert(conf.job_id, cancel_hook); + if let Ok(mut lock) = JOB_CANCEL_MAP.write() { + lock.insert(conf.job_id, cancel_hook); + } else { + return Err(BuildJobError::from("JOB_CANCEL_MAP is poisoned;"))?; + } let peer_guard = Arc::new(AtomicUsize::new(0)); let conf = Arc::new(conf); let workers = allocate_local_worker(&conf)?;