From e0f493941ec7fe3217a2e8de0a3c02c6f0358a5f Mon Sep 17 00:00:00 2001 From: Brian Zhao Date: Fri, 19 Jan 2024 23:28:09 -0800 Subject: [PATCH] Mutex Works Replay Mutex state via Sync Op record and replay. Using the naive solution for now because when CondVars get replayed they will need locks to be held, so handling that optimization will require some actual thought to get it done properly. Uses `cur_count` in exec_env as an ID. --- include/wamr_exec_env.h | 2 +- include/wamr_memory_instance.h | 4 ++-- include/wamr_wasi_context.h | 1 + lib/wasm-micro-runtime | 2 +- src/checkpoint.cpp | 3 --- src/wamr.cpp | 17 ++++++++++++++--- src/wamr_exec_env.cpp | 2 ++ src/wamr_export.cpp | 3 ++- src/wamr_wasi_context.cpp | 4 +++- test/mutex.c | 1 + test/openmp.c | 4 ++-- 11 files changed, 29 insertions(+), 14 deletions(-) diff --git a/include/wamr_exec_env.h b/include/wamr_exec_env.h index 637bd9f..8faf444 100644 --- a/include/wamr_exec_env.h +++ b/include/wamr_exec_env.h @@ -18,7 +18,7 @@ struct WAMRExecEnv { // multiple // // /* Previous thread's exec env of a WASM module instance. */ // struct WASMExecEnv *prev; - uint8 cur_count{}; + ssize_t cur_count{}; /* Note: field module_inst, argv_buf, native_stack_boundary, susÆ’end_flags, aux_stack_boundary, aux_stack_bottom, and diff --git a/include/wamr_memory_instance.h b/include/wamr_memory_instance.h index feb6119..38549d4 100644 --- a/include/wamr_memory_instance.h +++ b/include/wamr_memory_instance.h @@ -33,7 +33,7 @@ struct WAMRMemoryInstance { void dump_impl(WASMMemoryInstance *env) { module_type = env->module_type; - ref_count = env->ref_count+1; + ref_count = env->ref_count; LOGV(ERROR)<< "ref_count:" << ref_count; num_bytes_per_page = env->num_bytes_per_page; cur_page_count = env->cur_page_count; @@ -49,7 +49,7 @@ struct WAMRMemoryInstance { }; void restore_impl(WASMMemoryInstance *env) { env->module_type = module_type; - env->ref_count = ref_count; + env->ref_count = ref_count+1; LOGV(ERROR)<< "ref_count:" << env->ref_count; env->num_bytes_per_page = num_bytes_per_page; env->cur_page_count = cur_page_count; diff --git a/include/wamr_wasi_context.h b/include/wamr_wasi_context.h index d915ebc..6ae1129 100644 --- a/include/wamr_wasi_context.h +++ b/include/wamr_wasi_context.h @@ -72,6 +72,7 @@ struct SocketMetaData { struct WAMRWASIContext { std::map>>> fd_map; std::map socket_fd_map; + std::vector sync_ops; std::vector dir; std::vector map_dir; WAMRArgvEnvironValues argv_environ; diff --git a/lib/wasm-micro-runtime b/lib/wasm-micro-runtime index 6e96e9c..3fbfcb1 160000 --- a/lib/wasm-micro-runtime +++ b/lib/wasm-micro-runtime @@ -1 +1 @@ -Subproject commit 6e96e9c016ca47fa56f050100480a9730651428b +Subproject commit 3fbfcb1f960f827e8612df50de3caa4c60c391f1 diff --git a/src/checkpoint.cpp b/src/checkpoint.cpp index 4187cfc..dce76d9 100644 --- a/src/checkpoint.cpp +++ b/src/checkpoint.cpp @@ -145,7 +145,6 @@ void serialize_to_file(WASMExecEnv *instance) { close(fd); } auto all_count = 1; - auto cur_count = 0; // fill vector #if !defined(_WIN32) std::unique_lock as_ul(wamr->as_mtx); @@ -184,9 +183,7 @@ void serialize_to_file(WASMExecEnv *instance) { #endif // windows has no threads so only does it once auto a = new WAMRExecEnv(); dump(a, instance); - a->cur_count = cur_count; as.emplace_back(a); - cur_count++; #if !defined(_WIN32) elem = (WASMExecEnv *)bh_list_elem_next(elem); } diff --git a/src/wamr.cpp b/src/wamr.cpp index ce7104c..602ba03 100644 --- a/src/wamr.cpp +++ b/src/wamr.cpp @@ -122,12 +122,16 @@ void WAMRInstance::invoke_init_c() { if (!(func = wasm_runtime_lookup_function(module_inst, name, nullptr))) { LOGV(ERROR) << "The wasi " << name << " function is not found."; } - wasm_runtime_call_wasm(exec_env, func, 0, nullptr); + else{ + wasm_runtime_call_wasm(exec_env, func, 0, nullptr); + } auto name1 = "__wasm_call_ctors"; if (!(func = wasm_runtime_lookup_function(module_inst, name1, nullptr))) { LOGV(ERROR) << "The wasi " << name1 << " function is not found."; } - wasm_runtime_call_wasm(exec_env, func, 0, nullptr); + else{ + wasm_runtime_call_wasm(exec_env, func, 0, nullptr); + } return; } int WAMRInstance::invoke_fopen(std::string &path, uint32 option) { @@ -509,6 +513,7 @@ WAMRInstance::register_tid_map(){ extern "C" int32 pthread_mutex_lock_wrapper(wasm_exec_env_t, uint32*); extern "C" int32 pthread_mutex_unlock_wrapper(wasm_exec_env_t, uint32*); +extern "C" int32 pthread_mutex_init_wrapper(wasm_exec_env_t, uint32*, void*); void WAMRInstance::replay_sync_ops(bool main, wasm_exec_env_t exec_env){ if(main){ @@ -531,6 +536,7 @@ WAMRInstance::replay_sync_ops(bool main, wasm_exec_env_t exec_env){ if((*sync_iter).tid == mytid){ // do op + printf("replay %ld, op %d\n", sync_iter->tid, sync_iter->sync_op); switch(sync_iter->sync_op){ case SYNC_OP_MUTEX_LOCK: pthread_mutex_lock_wrapper(exec_env, &(sync_iter->ref)); @@ -539,6 +545,7 @@ WAMRInstance::replay_sync_ops(bool main, wasm_exec_env_t exec_env){ pthread_mutex_unlock_wrapper(exec_env, &(sync_iter->ref)); break; } + ++sync_iter; // wakeup everyone sync_op_cv.notify_all(); } @@ -580,6 +587,9 @@ void WAMRInstance::recover(std::vector> *execEnv) { restore(main_exec_env, cur_env); auto main_env = cur_env; auto main_saved_call_chain = main_env->restore_call_chain; + cur_thread = main_env->cur_count; + register_tid_map(); + fprintf(stderr, "main_env created %p %p\n\n", main_env, main_saved_call_chain); main_env->is_restore = true; @@ -639,7 +649,7 @@ void WAMRInstance::recover(std::vector> *execEnv) { fprintf(stderr, "invoke main %p %p\n", cur_env, cur_env->restore_call_chain); // replay sync ops to get OS state matching - replay_sync_ops(true, cur_env); + replay_sync_ops(true, main_env); invoke_main(); } } @@ -726,6 +736,7 @@ WASMExecEnv *restore_env() { exec_env->restore_call_chain = s; // */ + wamr->cur_thread = exec_env->cur_count; exec_env->is_restore = true; fprintf(stderr, "restore_env: %p %p\n", exec_env, s); diff --git a/src/wamr_exec_env.cpp b/src/wamr_exec_env.cpp index 8f8b9de..3fe4388 100644 --- a/src/wamr_exec_env.cpp +++ b/src/wamr_exec_env.cpp @@ -18,6 +18,7 @@ void WAMRExecEnv::dump_impl(WASMExecEnv *env) { this->frames.emplace_back(dumped_frame); cur_frame = cur_frame->prev_frame; } + this->cur_count = env->cur_count; } void WAMRExecEnv::restore_impl(WASMExecEnv *env) { env->suspend_flags.flags = flags; @@ -99,4 +100,5 @@ void WAMRExecEnv::restore_impl(WASMExecEnv *env) { } env->cur_frame = prev_frame; } + env->cur_count = this->cur_count; } diff --git a/src/wamr_export.cpp b/src/wamr_export.cpp index 026876c..a39ea97 100644 --- a/src/wamr_export.cpp +++ b/src/wamr_export.cpp @@ -216,7 +216,8 @@ int gettid() { return GetCurrentThreadId(); } #endif void insert_sync_op(wasm_exec_env_t exec_env, uint32* mutex, enum sync_op locking){ - struct sync_op_t sync_op = {.tid = gettid(), .ref = *mutex, .sync_op = locking}; + printf("insert sync on offset %d, as op: %d\n", *mutex, locking); + struct sync_op_t sync_op = {.tid = exec_env->cur_count, .ref = *mutex, .sync_op = locking}; wamr->sync_ops.push_back(sync_op); } diff --git a/src/wamr_wasi_context.cpp b/src/wamr_wasi_context.cpp index 3fb55b4..9b54908 100644 --- a/src/wamr_wasi_context.cpp +++ b/src/wamr_wasi_context.cpp @@ -22,6 +22,7 @@ void WAMRWASIContext::dump_impl(WASIArguments *env) { SocketMetaData socketMetaDataCopy = socketMetaData; this->socket_fd_map[fd] = socketMetaDataCopy; } + this->sync_ops.assign(wamr->sync_ops.begin(), wamr->sync_ops.end()); } void WAMRWASIContext::restore_impl(WASIArguments *env) { int r; @@ -79,4 +80,5 @@ void WAMRWASIContext::restore_impl(WASIArguments *env) { wamr->invoke_frenumber(tmp_sock_fd, fd); } #endif -}; \ No newline at end of file + wamr->sync_ops.assign(this->sync_ops.begin(), this->sync_ops.end()); +}; diff --git a/test/mutex.c b/test/mutex.c index a9d775b..43e6a48 100644 --- a/test/mutex.c +++ b/test/mutex.c @@ -24,6 +24,7 @@ static void *thread(void *arg) { } int main(int argc, char **argv) { + pthread_mutex_init(&m,NULL); pthread_t tids[MAX_NUM_THREADS]; for (int i = 0; i < MAX_NUM_THREADS; i++) { diff --git a/test/openmp.c b/test/openmp.c index 8d1e69d..b5cb197 100644 --- a/test/openmp.c +++ b/test/openmp.c @@ -4,10 +4,10 @@ int g_count = 0; int main(int argc, char **argv) { -#pragma omp parallel num_threads(8) +#pragma omp parallel { printf("Hello World... from thread = %d\n", omp_get_thread_num()); - for (int i = 0; i < 1000000; i++) { + for (int i = 0; i < 100000; i++) { __atomic_fetch_add(&g_count, 1, __ATOMIC_SEQ_CST); printf("print!!!%d\n", i); }