Skip to content

Commit

Permalink
Mutex Works
Browse files Browse the repository at this point in the history
Replay Mutex state via Sync Op record and replay.
Using the naive solution for now because when CondVars
get replayed they will need locks to be held, so handling
that optimization will require some actual thought to get it done properly.

Uses `cur_count` in exec_env as an ID.
  • Loading branch information
Brian Zhao committed Jan 20, 2024
1 parent 92c1451 commit e0f4939
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 14 deletions.
2 changes: 1 addition & 1 deletion include/wamr_exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ struct WAMRExecEnv { // multiple
//
// /* Previous thread's exec env of a WASM module instance. */
// struct WASMExecEnv *prev;
uint8 cur_count{};
ssize_t cur_count{};

/* Note: field module_inst, argv_buf, native_stack_boundary,
susƒend_flags, aux_stack_boundary, aux_stack_bottom, and
Expand Down
4 changes: 2 additions & 2 deletions include/wamr_memory_instance.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct WAMRMemoryInstance {

void dump_impl(WASMMemoryInstance *env) {
module_type = env->module_type;
ref_count = env->ref_count+1;
ref_count = env->ref_count;
LOGV(ERROR)<< "ref_count:" << ref_count;
num_bytes_per_page = env->num_bytes_per_page;
cur_page_count = env->cur_page_count;
Expand All @@ -49,7 +49,7 @@ struct WAMRMemoryInstance {
};
void restore_impl(WASMMemoryInstance *env) {
env->module_type = module_type;
env->ref_count = ref_count;
env->ref_count = ref_count+1;
LOGV(ERROR)<< "ref_count:" << env->ref_count;
env->num_bytes_per_page = num_bytes_per_page;
env->cur_page_count = cur_page_count;
Expand Down
1 change: 1 addition & 0 deletions include/wamr_wasi_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct SocketMetaData {
struct WAMRWASIContext {
std::map<int, std::tuple<std::string, std::vector<std::tuple<int,int,fd_op>>>> fd_map;
std::map<int, SocketMetaData> socket_fd_map;
std::vector<struct sync_op_t> sync_ops;
std::vector<std::string> dir;
std::vector<std::string> map_dir;
WAMRArgvEnvironValues argv_environ;
Expand Down
3 changes: 0 additions & 3 deletions src/checkpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ void serialize_to_file(WASMExecEnv *instance) {
close(fd);
}
auto all_count = 1;
auto cur_count = 0;
// fill vector
#if !defined(_WIN32)
std::unique_lock as_ul(wamr->as_mtx);
Expand Down Expand Up @@ -184,9 +183,7 @@ void serialize_to_file(WASMExecEnv *instance) {
#endif // windows has no threads so only does it once
auto a = new WAMRExecEnv();
dump(a, instance);
a->cur_count = cur_count;
as.emplace_back(a);
cur_count++;
#if !defined(_WIN32)
elem = (WASMExecEnv *)bh_list_elem_next(elem);
}
Expand Down
17 changes: 14 additions & 3 deletions src/wamr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,16 @@ void WAMRInstance::invoke_init_c() {
if (!(func = wasm_runtime_lookup_function(module_inst, name, nullptr))) {
LOGV(ERROR) << "The wasi " << name << " function is not found.";
}
wasm_runtime_call_wasm(exec_env, func, 0, nullptr);
else{
wasm_runtime_call_wasm(exec_env, func, 0, nullptr);
}
auto name1 = "__wasm_call_ctors";
if (!(func = wasm_runtime_lookup_function(module_inst, name1, nullptr))) {
LOGV(ERROR) << "The wasi " << name1 << " function is not found.";
}
wasm_runtime_call_wasm(exec_env, func, 0, nullptr);
else{
wasm_runtime_call_wasm(exec_env, func, 0, nullptr);
}
return;
}
int WAMRInstance::invoke_fopen(std::string &path, uint32 option) {
Expand Down Expand Up @@ -509,6 +513,7 @@ WAMRInstance::register_tid_map(){

extern "C" int32 pthread_mutex_lock_wrapper(wasm_exec_env_t, uint32*);
extern "C" int32 pthread_mutex_unlock_wrapper(wasm_exec_env_t, uint32*);
extern "C" int32 pthread_mutex_init_wrapper(wasm_exec_env_t, uint32*, void*);
void
WAMRInstance::replay_sync_ops(bool main, wasm_exec_env_t exec_env){
if(main){
Expand All @@ -531,6 +536,7 @@ WAMRInstance::replay_sync_ops(bool main, wasm_exec_env_t exec_env){
if((*sync_iter).tid == mytid){

// do op
printf("replay %ld, op %d\n", sync_iter->tid, sync_iter->sync_op);
switch(sync_iter->sync_op){
case SYNC_OP_MUTEX_LOCK:
pthread_mutex_lock_wrapper(exec_env, &(sync_iter->ref));
Expand All @@ -539,6 +545,7 @@ WAMRInstance::replay_sync_ops(bool main, wasm_exec_env_t exec_env){
pthread_mutex_unlock_wrapper(exec_env, &(sync_iter->ref));
break;
}
++sync_iter;
// wakeup everyone
sync_op_cv.notify_all();
}
Expand Down Expand Up @@ -580,6 +587,9 @@ void WAMRInstance::recover(std::vector<std::unique_ptr<WAMRExecEnv>> *execEnv) {
restore(main_exec_env, cur_env);
auto main_env = cur_env;
auto main_saved_call_chain = main_env->restore_call_chain;
cur_thread = main_env->cur_count;
register_tid_map();

fprintf(stderr, "main_env created %p %p\n\n", main_env, main_saved_call_chain);

main_env->is_restore = true;
Expand Down Expand Up @@ -639,7 +649,7 @@ void WAMRInstance::recover(std::vector<std::unique_ptr<WAMRExecEnv>> *execEnv) {

fprintf(stderr, "invoke main %p %p\n", cur_env, cur_env->restore_call_chain);
// replay sync ops to get OS state matching
replay_sync_ops(true, cur_env);
replay_sync_ops(true, main_env);
invoke_main();
}
}
Expand Down Expand Up @@ -726,6 +736,7 @@ WASMExecEnv *restore_env() {
exec_env->restore_call_chain = s;
// */
wamr->cur_thread = exec_env->cur_count;
exec_env->is_restore = true;
fprintf(stderr, "restore_env: %p %p\n", exec_env, s);

Expand Down
2 changes: 2 additions & 0 deletions src/wamr_exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ void WAMRExecEnv::dump_impl(WASMExecEnv *env) {
this->frames.emplace_back(dumped_frame);
cur_frame = cur_frame->prev_frame;
}
this->cur_count = env->cur_count;
}
void WAMRExecEnv::restore_impl(WASMExecEnv *env) {
env->suspend_flags.flags = flags;
Expand Down Expand Up @@ -99,4 +100,5 @@ void WAMRExecEnv::restore_impl(WASMExecEnv *env) {
}
env->cur_frame = prev_frame;
}
env->cur_count = this->cur_count;
}
3 changes: 2 additions & 1 deletion src/wamr_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ int gettid() { return GetCurrentThreadId(); }
#endif

void insert_sync_op(wasm_exec_env_t exec_env, uint32* mutex, enum sync_op locking){
struct sync_op_t sync_op = {.tid = gettid(), .ref = *mutex, .sync_op = locking};
printf("insert sync on offset %d, as op: %d\n", *mutex, locking);
struct sync_op_t sync_op = {.tid = exec_env->cur_count, .ref = *mutex, .sync_op = locking};
wamr->sync_ops.push_back(sync_op);
}

Expand Down
4 changes: 3 additions & 1 deletion src/wamr_wasi_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ void WAMRWASIContext::dump_impl(WASIArguments *env) {
SocketMetaData socketMetaDataCopy = socketMetaData;
this->socket_fd_map[fd] = socketMetaDataCopy;
}
this->sync_ops.assign(wamr->sync_ops.begin(), wamr->sync_ops.end());
}
void WAMRWASIContext::restore_impl(WASIArguments *env) {
int r;
Expand Down Expand Up @@ -79,4 +80,5 @@ void WAMRWASIContext::restore_impl(WASIArguments *env) {
wamr->invoke_frenumber(tmp_sock_fd, fd);
}
#endif
};
wamr->sync_ops.assign(this->sync_ops.begin(), this->sync_ops.end());
};
1 change: 1 addition & 0 deletions test/mutex.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ static void *thread(void *arg) {
}

int main(int argc, char **argv) {
pthread_mutex_init(&m,NULL);
pthread_t tids[MAX_NUM_THREADS];

for (int i = 0; i < MAX_NUM_THREADS; i++) {
Expand Down
4 changes: 2 additions & 2 deletions test/openmp.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
int g_count = 0;

int main(int argc, char **argv) {
#pragma omp parallel num_threads(8)
#pragma omp parallel
{
printf("Hello World... from thread = %d\n", omp_get_thread_num());
for (int i = 0; i < 1000000; i++) {
for (int i = 0; i < 100000; i++) {
__atomic_fetch_add(&g_count, 1, __ATOMIC_SEQ_CST);
printf("print!!!%d\n", i);
}
Expand Down

0 comments on commit e0f4939

Please sign in to comment.