From f8c00e8ff9fbaf7f8a8dc7d9ddf217305bf786b3 Mon Sep 17 00:00:00 2001 From: victoryang00 Date: Wed, 6 Mar 2024 23:23:53 +0000 Subject: [PATCH] push the serializer for socket and benchmark result --- doc/burst_computing.md | 0 doc/optimistic_computing.md | 1 + include/wamr.h | 1 + include/wamr_module_instance_extra.h | 1 + include/wamr_read_write.h | 148 ++- include/wamr_serializer.h | 16 + include/wamr_type.h | 2 + src/checkpoint.cpp | 203 +--- src/profile.cpp | 15 +- src/restore.cpp | 46 +- src/wamr.cpp | 1475 +++++++++++++++----------- 11 files changed, 1037 insertions(+), 871 deletions(-) create mode 100644 doc/burst_computing.md create mode 100644 doc/optimistic_computing.md diff --git a/doc/burst_computing.md b/doc/burst_computing.md new file mode 100644 index 0000000..e69de29 diff --git a/doc/optimistic_computing.md b/doc/optimistic_computing.md new file mode 100644 index 0000000..8168fb7 --- /dev/null +++ b/doc/optimistic_computing.md @@ -0,0 +1 @@ +# Optimistic Computing diff --git a/include/wamr.h b/include/wamr.h index b275b7a..d4c4086 100644 --- a/include/wamr.h +++ b/include/wamr.h @@ -88,6 +88,7 @@ struct mvvm_op_data { int size; SocketAddrPool addr[MVVM_MAX_ADDR][2]; }; +std::string removeExtension(std::string &); bool is_ip_in_cidr(const char *base_ip, int subnet_mask_len, uint32_t ip); bool is_ipv6_in_cidr(const char *base_ip_str, int subnet_mask_len, struct in6_addr *ip); long get_rss(); diff --git a/include/wamr_module_instance_extra.h b/include/wamr_module_instance_extra.h index 7f3b020..7535aac 100644 --- a/include/wamr_module_instance_extra.h +++ b/include/wamr_module_instance_extra.h @@ -16,6 +16,7 @@ #if WASM_ENABLE_WASI_NN != 0 #include "wamr_wasi_nn_context.h" #endif +#include "wamr_serializer.h" #include "wasm_runtime.h" #include #include diff --git a/include/wamr_read_write.h b/include/wamr_read_write.h index d328d40..894b603 100644 --- a/include/wamr_read_write.h +++ b/include/wamr_read_write.h @@ -13,25 +13,165 @@ #ifndef MVVM_WAMR_READ_WRITE_H #define MVVM_WAMR_READ_WRITE_H #include "struct_pack/struct_pack.hpp" +#include #ifndef _WIN32 +#include +#include +#include #include #endif -struct FwriteStream { +struct WriteStream { + virtual bool write(const char *data, std::size_t sz) const = 0; +}; +struct ReadStream { + virtual bool read(char *data, std::size_t sz) const = 0; + virtual bool ignore(std::size_t sz) const = 0; + virtual std::size_t tellg() const = 0; +}; +struct FwriteStream : public WriteStream { FILE *file; bool write(const char *data, std::size_t sz) const { return fwrite(data, sz, 1, file) == 1; } explicit FwriteStream(const char *file_name) : file(fopen(file_name, "wb")) {} ~FwriteStream() { fclose(file); } }; -struct FreadStream { +struct FreadStream : public ReadStream { FILE *file; bool read(char *data, std::size_t sz) const { return fread(data, sz, 1, file) == 1; } - [[nodiscard]] bool ignore(std::size_t sz) const { return fseek(file, sz, SEEK_CUR) == 0; } - [[nodiscard]] std::size_t tellg() const { + bool ignore(std::size_t sz) const { return fseek(file, sz, SEEK_CUR) == 0; } + std::size_t tellg() const { // if you worry about ftell performance, just use an variable to record it. return ftell(file); } explicit FreadStream(const char *file_name) : file(fopen(file_name, "rb")) {} ~FreadStream() { fclose(file); } }; +static_assert(ReaderStreamTrait, "Reader must conform to ReaderStreamTrait"); +static_assert(WriterStreamTrait, "Writer must conform to WriterStreamTrait"); + +struct SocketWriteStream : public WriteStream { + int sock_fd; // Socket file descriptor + + bool write(const char *data, std::size_t sz) const { + std::size_t totalSent = 0; + while (totalSent < sz) { + ssize_t sent = send(sock_fd, data + totalSent, sz - totalSent, 0); + if (sent == -1) { + // Handle error. For simplicity, just returning false here. + return false; + } + totalSent += sent; + } + return true; + } + explicit SocketWriteStream(const char *address, int port) { + // Example: Initialize a client socket and connect to the given address and port + sock_fd = socket(AF_INET, SOCK_STREAM, 0); + if (sock_fd == -1) { + SPDLOG_ERROR("Socket creation failed\n"); + return; + } + + sockaddr_in server_addr; + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(port); + // Convert address from text to binary form + inet_pton(AF_INET, address, &server_addr.sin_addr); + + if (connect(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) { + SPDLOG_ERROR("Connection failed\n"); + close(sock_fd); + exit(EXIT_FAILURE); + } + } + + ~SocketWriteStream() { + close(sock_fd); // Close the socket descriptor + } +}; +struct SocketReadStream : public ReadStream { + int sock_fd; // Socket file descriptor + int client_fd; + mutable std::size_t position = 0; // Track the amount of data read + + // bool read(char *data, std::size_t sz) const { + // ssize_t bytes_read = recv(client_fd, data, sz, 0); + // SPDLOG_DEBUG("{}, {}",data,sz); + // if (bytes_read > 0) { + // position += bytes_read; + // return static_cast(bytes_read) == sz; + // } + // return false; + // } + bool read(char *data, std::size_t sz) const { + std::size_t totalReceived = 0; + while (totalReceived < sz) { + ssize_t received = recv(client_fd, data + totalReceived, sz - totalReceived, 0); + if (received == -1) { + // Handle error. For simplicity, just returning false here. + return false; + } else if (received == 0) { + // Connection closed + return false; + } + totalReceived += received; + } + position += totalReceived; + return true; + } + + explicit SocketReadStream(const char *address, int port) { + // Example: Initialize a client socket and connect to the given address and port + sock_fd = socket(AF_INET, SOCK_STREAM, 0); + if (sock_fd == -1) { + SPDLOG_ERROR("Socket creation failed\n"); + return; + } + + sockaddr_in server_addr; + server_addr.sin_family = AF_INET; + server_addr.sin_port = htons(port); + // Convert address from text to binary form + inet_pton(AF_INET, address, &server_addr.sin_addr); + auto addr_len = sizeof(server_addr); + SPDLOG_INFO("[Server] Bind socket {} {}\n", address, port); + if (bind(sock_fd, (struct sockaddr *)&server_addr, addr_len) < 0) { + SPDLOG_ERROR("Bind failed"); + exit(EXIT_FAILURE); + } + + SPDLOG_INFO("[Server] Listening on socket\n"); + if (listen(sock_fd, 3) < 0) { + SPDLOG_ERROR("Listen failed"); + exit(EXIT_FAILURE); + } + client_fd = accept(sock_fd, (struct sockaddr *)&server_addr, (socklen_t *)&addr_len); + } + // "Ignore" sz bytes of data + bool ignore(std::size_t sz) const { + char buffer[1024]; // Temporary buffer to discard data + std::size_t total_ignored = 0; + + while (total_ignored < sz) { + std::size_t to_ignore = std::min(sz - total_ignored, sizeof(buffer)); + ssize_t ignored = recv(client_fd, buffer, to_ignore, 0); + if (ignored <= 0) { // Check for error or close + return false; + } + total_ignored += ignored; + position += ignored; // Update position + } + + return true; + } + + // Report the current position + std::size_t tellg() const { return position; } + ~SocketReadStream() { + close(sock_fd); // Close the socket descriptor + } +}; +static_assert(ReaderStreamTrait, "Reader must conform to ReaderStreamTrait"); +static_assert(WriterStreamTrait, "Writer must conform to WriterStreamTrait"); + #endif /* MVVM_WAMR_READ_WRITE_H */ diff --git a/include/wamr_serializer.h b/include/wamr_serializer.h index 20dffd1..05dc0dc 100644 --- a/include/wamr_serializer.h +++ b/include/wamr_serializer.h @@ -27,4 +27,20 @@ concept CheckerTrait = requires(T &t, K k) { { t->equal_impl(k) } -> std::convertible_to; }; +template +concept WriterStreamTrait = requires(T &t, const WriteDataType *data, std::size_t size) { + // Requires a write method that accepts WriteDataType and returns void or a boolean. + { t.write(data, size) } -> std::same_as; +}; + +template +concept ReaderStreamTrait = requires(T &t, ReadDataType *data, std::size_t size) { + // Requires a read method that accepts a pointer to ReadDataType and size, returns bool. + { t.read(data, size) } -> std::same_as; + // Requires an ignore method that accepts size and returns bool. + { t.ignore(size) } -> std::same_as; + // Requires a tellg method that returns std::size_t. + { t.tellg() } -> std::same_as; +}; + #endif // MVVM_WAMR_SERIALIZER_H diff --git a/include/wamr_type.h b/include/wamr_type.h index 33ba5a3..b6f8ba8 100644 --- a/include/wamr_type.h +++ b/include/wamr_type.h @@ -13,6 +13,8 @@ #ifndef MVVM_WAMR_TYPE_H #define MVVM_WAMR_TYPE_H #include "wasm_runtime.h" +#include "wamr_serializer.h" + struct WAMRType { uint16 param_count; uint16 result_count; diff --git a/src/checkpoint.cpp b/src/checkpoint.cpp index fd23170..51123d3 100644 --- a/src/checkpoint.cpp +++ b/src/checkpoint.cpp @@ -26,184 +26,10 @@ WAMRInstance *wamr = nullptr; std::ostringstream re{}; -FwriteStream *writer; +WriteStream *writer; std::vector> as; std::mutex as_mtx; long snapshot_memory = 0; -void serialize_to_file(WASMExecEnv *instance) { - // gateway - auto start = std::chrono::high_resolution_clock::now(); - if (snapshot_memory == 0) - snapshot_memory = get_rss(); -#if WASM_ENABLE_LIB_PTHREAD != 0 - auto cluster = wasm_exec_env_get_cluster(instance); - auto all_count = bh_list_length(&cluster->exec_env_list); - // fill vector - - std::unique_lock as_ul(wamr->as_mtx); - SPDLOG_DEBUG("get lock"); - wamr->ready++; - wamr->lwcp_list[((uint64_t)instance->handle)]++; - if (wamr->ready == all_count) { - wamr->should_snapshot = true; - } - // If we're not all ready - SPDLOG_DEBUG("thread {}, with {} ready out of {} total", ((uint64_t)instance->handle), wamr->ready, all_count); -#endif -#if !defined(_WIN32) - if (!wamr->socket_fd_map_.empty() && wamr->should_snapshot) { - // tell gateway to keep alive the server - struct sockaddr_in addr {}; - int fd = 0; - ssize_t rc; - SocketAddrPool src_addr{}; - bool is_server = false; - for (auto [tmp_fd, sock_data] : wamr->socket_fd_map_) { - if (sock_data.is_server) { - is_server = true; - break; - } - } - wamr->op_data.op = is_server ? MVVM_SOCK_SUSPEND_TCP_SERVER : MVVM_SOCK_SUSPEND; - - for (auto [tmp_fd, sock_data] : wamr->socket_fd_map_) { - int idx = wamr->op_data.size; - src_addr = sock_data.socketAddress; - auto tmp_ip4 = - fmt::format("{}.{}.{}.{}", src_addr.ip4[0], src_addr.ip4[1], src_addr.ip4[2], src_addr.ip4[3]); - auto tmp_ip6 = - fmt::format("{}:{}:{}:{}:{}:{}:{}:{}", src_addr.ip6[0], src_addr.ip6[1], src_addr.ip6[2], - src_addr.ip6[3], src_addr.ip6[4], src_addr.ip6[5], src_addr.ip6[6], src_addr.ip6[7]); - if (src_addr.is_4 && tmp_ip4 == "0.0.0.0" || !src_addr.is_4 && tmp_ip6 == "0:0:0:0:0:0:0:0") { - src_addr = wamr->local_addr; - src_addr.port = sock_data.socketAddress.port; - } - SPDLOG_INFO("addr: {} {}.{}.{}.{} port: {}", tmp_fd, src_addr.ip4[0], src_addr.ip4[1], src_addr.ip4[2], - src_addr.ip4[3], src_addr.port); - // make the rest coroutine? - tmp_ip4 = fmt::format("{}.{}.{}.{}", sock_data.socketSentToData.dest_addr.ip.ip4[0], - sock_data.socketSentToData.dest_addr.ip.ip4[1], - sock_data.socketSentToData.dest_addr.ip.ip4[2], - sock_data.socketSentToData.dest_addr.ip.ip4[3]); - tmp_ip6 = fmt::format( - "{}:{}:{}:{}:{}:{}:{}:{}", sock_data.socketSentToData.dest_addr.ip.ip6[0], - sock_data.socketSentToData.dest_addr.ip.ip6[1], sock_data.socketSentToData.dest_addr.ip.ip6[2], - sock_data.socketSentToData.dest_addr.ip.ip6[3], sock_data.socketSentToData.dest_addr.ip.ip6[4], - sock_data.socketSentToData.dest_addr.ip.ip6[5], sock_data.socketSentToData.dest_addr.ip.ip6[6], - sock_data.socketSentToData.dest_addr.ip.ip6[7]); - if (tmp_ip4 == "0.0.0.0" || tmp_ip6 == "0:0:0:0:0:0:0:0") { - if (!wamr->op_data.is_tcp) { - if (sock_data.socketSentToData.dest_addr.ip.is_4 && tmp_ip4 == "0.0.0.0" || - !sock_data.socketSentToData.dest_addr.ip.is_4 && tmp_ip6 == "0:0:0:0:0:0:0:0") { - wamr->op_data.addr[idx][1].is_4 = sock_data.socketRecvFromDatas[0].src_addr.ip.is_4; - std::memcpy(wamr->op_data.addr[idx][1].ip4, sock_data.socketRecvFromDatas[0].src_addr.ip.ip4, - sizeof(sock_data.socketRecvFromDatas[0].src_addr.ip.ip4)); - std::memcpy(wamr->op_data.addr[idx][1].ip6, sock_data.socketRecvFromDatas[0].src_addr.ip.ip6, - sizeof(sock_data.socketRecvFromDatas[0].src_addr.ip.ip6)); - wamr->op_data.addr[idx][1].port = sock_data.socketRecvFromDatas[0].src_addr.port; - } else { - wamr->op_data.addr[idx][1].is_4 = sock_data.socketSentToData.dest_addr.ip.is_4; - std::memcpy(wamr->op_data.addr[idx][1].ip4, sock_data.socketSentToData.dest_addr.ip.ip4, - sizeof(sock_data.socketSentToData.dest_addr.ip.ip4)); - std::memcpy(wamr->op_data.addr[idx][1].ip6, sock_data.socketSentToData.dest_addr.ip.ip6, - sizeof(sock_data.socketSentToData.dest_addr.ip.ip6)); - wamr->op_data.addr[idx][1].port = sock_data.socketSentToData.dest_addr.port; - } - } else { - // if it's not socket - if (!is_server) { - int tmp_fd = 0; - unsigned int size_ = sizeof(sockaddr_in); - sockaddr_in *ss = (sockaddr_in *)malloc(size_); - wamr->invoke_sock_getsockname(tmp_fd, (sockaddr **)&ss, &size_); - if (ss->sin_family == AF_INET) { - auto *ipv4 = (struct sockaddr_in *)ss; - uint32_t ip = ntohl(ipv4->sin_addr.s_addr); - wamr->op_data.addr[idx][1].is_4 = true; - wamr->op_data.addr[idx][1].ip4[0] = (ip >> 24) & 0xFF; - wamr->op_data.addr[idx][1].ip4[1] = (ip >> 16) & 0xFF; - wamr->op_data.addr[idx][1].ip4[2] = (ip >> 8) & 0xFF; - wamr->op_data.addr[idx][1].ip4[3] = ip & 0xFF; - wamr->op_data.addr[idx][1].port = ntohs(ipv4->sin_port); - } else { - auto *ipv6 = (struct sockaddr_in6 *)ss; - wamr->op_data.addr[idx][1].is_4 = false; - const auto *bytes = (const uint8_t *)ipv6->sin6_addr.s6_addr; - for (int i = 0; i < 16; i += 2) { - wamr->op_data.addr[idx][1].ip6[i / 2] = (bytes[i] << 8) + bytes[i + 1]; - } - wamr->op_data.addr[idx][1].port = ntohs(ipv6->sin6_port); - } - free(ss); - } else if (sock_data.is_server) { - wamr->op_data.size--; - } - } - } - SPDLOG_DEBUG("dest_addr: {}.{}.{}.{}:{}", wamr->op_data.addr[idx][1].ip4[0], - wamr->op_data.addr[idx][1].ip4[1], wamr->op_data.addr[idx][1].ip4[2], - wamr->op_data.addr[idx][1].ip4[3], wamr->op_data.addr[idx][1].port); - wamr->op_data.size += 1; - } - // Create a socket - if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - SPDLOG_ERROR("socket error"); - throw std::runtime_error("socket error"); - } - - addr.sin_family = AF_INET; - addr.sin_port = htons(MVVM_SOCK_PORT); - - // Convert IPv4 and IPv6 addresses from text to binary form - if (inet_pton(AF_INET, MVVM_SOCK_ADDR, &addr.sin_addr) <= 0) { - SPDLOG_ERROR("AF_INET not supported"); - exit(EXIT_FAILURE); - } - // Connect to the server - if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { - SPDLOG_ERROR("Connection Failed {}", errno); - exit(EXIT_FAILURE); - } - - SPDLOG_DEBUG("Connected successfully"); - rc = send(fd, &wamr->op_data, sizeof(struct mvvm_op_data), 0); - if (rc == -1) { - SPDLOG_ERROR("send error"); - exit(EXIT_FAILURE); - } - - // Clean up - close(fd); - } -#endif -#if WASM_ENABLE_LIB_PTHREAD != 0 - if (wamr->ready < all_count) { - // Then wait for someone else to get here and finish the job - std::condition_variable as_cv; - as_cv.wait(as_ul); - } - wasm_cluster_suspend_all_except_self(cluster, instance); - auto elem = (WASMExecEnv *)bh_list_first_elem(&cluster->exec_env_list); - while (elem) { - instance = elem; -#endif // windows has no threads so only does it once - auto a = new WAMRExecEnv(); - dump(a, instance); - as.emplace_back(a); -#if WASM_ENABLE_LIB_PTHREAD != 0 - elem = (WASMExecEnv *)bh_list_elem_next(elem); - } - // finish filling vector -#endif - auto used_memory = get_rss(); - struct_pack::serialize_to(*writer, as); - auto end = std::chrono::high_resolution_clock::now(); - // get duration in us - auto dur = std::chrono::duration_cast(end - start); - SPDLOG_INFO("Snapshot time: {} s", dur.count() / 1000000.0); - SPDLOG_INFO("Memory usage: {} MB", (used_memory - snapshot_memory) / 1024 /1024); - exit(EXIT_SUCCESS); -} int main(int argc, char *argv[]) { spdlog::cfg::load_env_levels(); @@ -227,19 +53,10 @@ int main(int argc, char *argv[]) { "i,is_debug", "The value for is_debug value", cxxopts::value()->default_value("false"))( "f,function", "The function index to test execution", cxxopts::value()->default_value("0"))( "x,function_count", "The function count to stop", cxxopts::value()->default_value("0"))( + "o,offload_addr", "The next hop to offload", cxxopts::value()->default_value(""))( + "s,offload_port", "The next hop port to offload", cxxopts::value()->default_value("0"))( "c,count", "The step index to test execution", cxxopts::value()->default_value("0")); - auto removeExtension = [](std::string &filename) { - size_t dotPos = filename.find_last_of('.'); - std::string res; - if (dotPos != std::string::npos) { - // Extract the substring before the period - res = filename.substr(0, dotPos); - } else { - // If there's no period in the string, it means there's no extension. - SPDLOG_ERROR("No extension found."); - } - return res; - }; + auto result = options.parse(argc, argv); if (result["help"].as()) { std::cout << options.help() << std::endl; @@ -252,6 +69,8 @@ int main(int argc, char *argv[]) { auto env = result["env"].as>(); auto arg = result["arg"].as>(); auto addr = result["addr"].as>(); + auto offload_addr = result["offload_addr"].as(); + auto offload_port = result["offload_port"].as(); auto ns_pool = result["ns_pool"].as>(); snapshot_threshold = result["count"].as(); stop_func_threshold = result["function_count"].as(); @@ -270,13 +89,15 @@ int main(int argc, char *argv[]) { SPDLOG_DEBUG("arg {}", e); } register_sigtrap(); - - writer = new FwriteStream((removeExtension(target) + ".bin").c_str()); + if (offload_addr.empty()) + writer = new FwriteStream((removeExtension(target) + ".bin").c_str()); + else + writer = new SocketWriteStream(offload_addr.c_str(), offload_port); wamr = new WAMRInstance(target.c_str(), is_jit); wamr->set_wasi_args(dir, map_dir, env, arg, addr, ns_pool); wamr->instantiate(); - // wamr->get_int3_addr(); - // wamr->replace_int3_with_nop(); + wamr->get_int3_addr(); + wamr->replace_int3_with_nop(); // freopen("output.txt", "w", stdout); #if defined(_WIN32) diff --git a/src/profile.cpp b/src/profile.cpp index 93e23a4..8c3d950 100644 --- a/src/profile.cpp +++ b/src/profile.cpp @@ -32,8 +32,6 @@ std::vector> as; std::mutex as_mtx; long snapshot_memory = 0; -void serialize_to_file(WASMExecEnv *instance) {} - std::vector> stack_record; void unwind(WASMExecEnv *instance) { auto cur_frame = (AOTFrame *)instance->cur_frame; @@ -46,9 +44,7 @@ void unwind(WASMExecEnv *instance) { stack_record.emplace_back(stack); } -void profile_sigint_handler(int sig) { - wamr->replace_nop_with_int3(); -} +void profile_sigint_handler(int sig) { wamr->replace_nop_with_int3(); } void profile_sigtrap_handler(int sig) { auto exec_env = wamr->get_exec_env(); @@ -84,9 +80,8 @@ void profile_register_sigint() { int main(int argc, char *argv[]) { spdlog::cfg::load_env_levels(); - cxxopts::Options options( - "MVVM_profile", - "Migratable Velocity Virtual Machine profile part, to find the hotspot function."); + cxxopts::Options options("MVVM_profile", + "Migratable Velocity Virtual Machine profile part, to find the hotspot function."); options.add_options()("t,target", "The webassembly file to execute", cxxopts::value()->default_value("./test/counter.wasm"))( "j,jit", "Whether the jit mode or interp mode", cxxopts::value()->default_value("false"))( @@ -105,7 +100,7 @@ int main(int argc, char *argv[]) { "f,function", "The function index to test execution", cxxopts::value()->default_value("0"))( "x,function_count", "The function count to stop", cxxopts::value()->default_value("0"))( "c,count", "The step index to test execution", cxxopts::value()->default_value("0"))( - "w,wasm", "The wasm file", cxxopts::value()->default_value("")); + "w,wasm", "The wasm file", cxxopts::value()->default_value("")); auto removeExtension = [](std::string &filename) { size_t dotPos = filename.find_last_of('.'); std::string res; @@ -232,7 +227,7 @@ int main(int argc, char *argv[]) { func_name[e] = name; } pclose(pipe); - + // print the result std::cout << "Last level function called count\n" << "--------------------------------\n" diff --git a/src/restore.cpp b/src/restore.cpp index 695eecb..7b12ff7 100644 --- a/src/restore.cpp +++ b/src/restore.cpp @@ -24,12 +24,11 @@ #include #endif -FreadStream *reader; -FwriteStream *writer; +ReadStream *reader; +WriteStream *writer; WAMRInstance *wamr = nullptr; - -void serialize_to_file(WASMExecEnv *instance) {} - +std::vector> as; +long snapshot_memory = 0; int main(int argc, char **argv) { spdlog::cfg::load_env_levels(); cxxopts::Options options("MVVM", "Migratable Velocity Virtual Machine, to ship the VM state to another machine"); @@ -37,20 +36,12 @@ int main(int argc, char **argv) { cxxopts::value()->default_value("./test/counter.wasm"))( "j,jit", "Whether the jit mode or interp mode", cxxopts::value()->default_value("false"))( "h,help", "The value for epoch value", cxxopts::value()->default_value("false"))( + "i,source_addr", "The next hop to offload", cxxopts::value()->default_value(""))( + "e,source_port", "The next hop port to offload", cxxopts::value()->default_value("0"))( + "o,offload_addr", "The next hop to offload", cxxopts::value()->default_value(""))( + "s,offload_port", "The next hop port to offload", cxxopts::value()->default_value("0"))( "c,count", "The value for epoch value", cxxopts::value()->default_value("0")); // Can first discover from the wasi context. - auto removeExtension = [](std::string &filename) { - size_t dotPos = filename.find_last_of('.'); - std::string res; - if (dotPos != std::string::npos) { - // Extract the substring before the period - res = filename.substr(0, dotPos); - } else { - // If there's no period in the string, it means there's no extension. - SPDLOG_ERROR("No extension found."); - } - return res; - }; auto result = options.parse(argc, argv); if (result["help"].as()) { @@ -58,13 +49,30 @@ int main(int argc, char **argv) { exit(0); } auto target = result["target"].as(); + auto source_addr = result["source_addr"].as(); + auto source_port = result["source_port"].as(); + auto offload_addr = result["offload_addr"].as(); + auto offload_port = result["offload_port"].as(); auto count = result["count"].as(); snapshot_threshold = count; register_sigtrap(); - reader = new FreadStream((removeExtension(target) + ".bin").c_str()); wamr = new WAMRInstance(target.c_str(), false); + wamr->instantiate(); + + wamr->get_int3_addr(); + wamr->replace_int3_with_nop(); + if (source_addr.empty()) + reader = new FreadStream((removeExtension(target) + ".bin").c_str()); // writer + else + reader = new SocketReadStream(source_addr.c_str(), source_port); + + if (offload_addr.empty()) + writer = new FwriteStream((removeExtension(target) + ".bin").c_str()); + else + writer = new SocketWriteStream(offload_addr.c_str(), offload_port); + auto a = struct_pack::deserialize>>(*reader).value(); // is server for all and the is server? #if !defined(_WIN32) @@ -78,7 +86,7 @@ int main(int argc, char **argv) { SPDLOG_DEBUG("new ip {}.{}.{}.{}:{}", src_addr.ip4[0], src_addr.ip4[1], src_addr.ip4[2], src_addr.ip4[3], src_addr.port); // got from wamr - for (auto &[fd, socketMetaData] : a[a.size() - 1]->module_inst.wasi_ctx.socket_fd_map) { + for (auto &[_, socketMetaData] : a[a.size() - 1]->module_inst.wasi_ctx.socket_fd_map) { wamr->op_data.is_tcp |= socketMetaData.type; is_tcp_server |= socketMetaData.is_server; } diff --git a/src/wamr.cpp b/src/wamr.cpp index 1494308..0e062f0 100644 --- a/src/wamr.cpp +++ b/src/wamr.cpp @@ -1,14 +1,14 @@ /* -* The WebAssembly Live Migration Project -* -* By: Aibo Hu -* Yiwei Yang -* Brian Zhao -* Andrew Quinn -* -* Copyright 2024 Regents of the Univeristy of California -* UC Santa Cruz Sluglab. -*/ + * The WebAssembly Live Migration Project + * + * By: Aibo Hu + * Yiwei Yang + * Brian Zhao + * Andrew Quinn + * + * Copyright 2024 Regents of the Univeristy of California + * UC Santa Cruz Sluglab. + */ #include "wamr.h" #include "platform_api_vmcore.h" @@ -30,384 +30,399 @@ #if defined(__linux__) #include #elif defined(__APPLE__) -#include #include +#include #else -#include #include +#include #endif WAMRInstance::ThreadArgs **argptr; std::counting_semaphore<100> wakeup(0); std::counting_semaphore<100> thread_init(0); - +extern long snapshot_memory; +extern WriteStream *writer; +extern std::vector> as; + +std::string removeExtension(std::string &filename) { + size_t dotPos = filename.find_last_of('.'); + std::string res; + if (dotPos != std::string::npos) { + // Extract the substring before the period + res = filename.substr(0, dotPos); + } else { + // If there's no period in the string, it means there's no extension. + SPDLOG_ERROR("No extension found."); + } + return res; +}; static auto string_vec_to_cstr_array = [](const std::vector &vecStr) { - std::vector cstrArray(vecStr.size()); - if (vecStr.data() == nullptr || vecStr[0].empty()) - return std::vector(0); - SPDLOG_DEBUG("vecStr[0]: {}", vecStr[0]); - std::transform(vecStr.begin(), vecStr.end(), cstrArray.begin(), [](const std::string &str) { return str.c_str(); }); - return cstrArray; + std::vector cstrArray(vecStr.size()); + if (vecStr.data() == nullptr || vecStr[0].empty()) + return std::vector(0); + SPDLOG_DEBUG("vecStr[0]: {}", vecStr[0]); + std::transform(vecStr.begin(), vecStr.end(), cstrArray.begin(), [](const std::string &str) { return str.c_str(); }); + return cstrArray; }; WAMRInstance::WAMRInstance(const char *wasm_path, bool is_jit, std::string policy) : is_jit(is_jit), policy(policy) { - { - std::string path(wasm_path); - - if (path.substr(path.length() - 5) == ".wasm") { - is_aot = false; - wasm_file_path = path; - aot_file_path = path.substr(0, path.length() - 5) + ".aot"; - } else if (path.substr(path.length() - 4) == ".aot") { - is_aot = true; - wasm_file_path = path.substr(0, path.length() - 4) + ".wasm"; - aot_file_path = path; - } else { - std::cout << "Invalid file extension. Please provide a path ending in either '.wasm' or '.aot'." - << std::endl; - throw; - } - } - - RuntimeInitArgs wasm_args; - memset(&wasm_args, 0, sizeof(RuntimeInitArgs)); - wasm_args.mem_alloc_type = Alloc_With_Allocator; - wasm_args.mem_alloc_option.allocator.malloc_func = ((void *)malloc); - wasm_args.mem_alloc_option.allocator.realloc_func = ((void *)realloc); - wasm_args.mem_alloc_option.allocator.free_func = ((void *)free); - wasm_args.max_thread_num = 16; - if (!is_jit) - wasm_args.running_mode = RunningMode::Mode_Interp; - else - wasm_args.running_mode = RunningMode::Mode_LLVM_JIT; - // static char global_heap_buf[512 * 1024];// what is this? - // wasm_args.mem_alloc_type = Alloc_With_Pool; - // wasm_args.mem_alloc_option.pool.heap_buf = global_heap_buf; - // wasm_args.mem_alloc_option.pool.heap_size = sizeof(global_heap_buf); - bh_log_set_verbose_level(0); - if (!wasm_runtime_full_init(&wasm_args)) { - SPDLOG_ERROR("Init runtime environment failed."); - throw; - } - // initialiseWAMRNatives(); - char *buffer{}; - if (!load_wasm_binary(wasm_path, &buffer)) { - SPDLOG_ERROR("Load wasm binary failed.\n"); - throw; - } - module = wasm_runtime_load((uint8_t *)buffer, buf_size, error_buf, sizeof(error_buf)); - if (!module) { - SPDLOG_ERROR("Load wasm module failed. error: {}", error_buf); - throw; - } + { + std::string path(wasm_path); + + if (path.substr(path.length() - 5) == ".wasm") { + is_aot = false; + wasm_file_path = path; + aot_file_path = path.substr(0, path.length() - 5) + ".aot"; + } else if (path.substr(path.length() - 4) == ".aot") { + is_aot = true; + wasm_file_path = path.substr(0, path.length() - 4) + ".wasm"; + aot_file_path = path; + } else { + std::cout << "Invalid file extension. Please provide a path ending in either '.wasm' or '.aot'." + << std::endl; + throw; + } + } + + RuntimeInitArgs wasm_args; + memset(&wasm_args, 0, sizeof(RuntimeInitArgs)); + wasm_args.mem_alloc_type = Alloc_With_Allocator; + wasm_args.mem_alloc_option.allocator.malloc_func = ((void *)malloc); + wasm_args.mem_alloc_option.allocator.realloc_func = ((void *)realloc); + wasm_args.mem_alloc_option.allocator.free_func = ((void *)free); + wasm_args.max_thread_num = 16; + if (!is_jit) + wasm_args.running_mode = RunningMode::Mode_Interp; + else + wasm_args.running_mode = RunningMode::Mode_LLVM_JIT; + // static char global_heap_buf[512 * 1024];// what is this? + // wasm_args.mem_alloc_type = Alloc_With_Pool; + // wasm_args.mem_alloc_option.pool.heap_buf = global_heap_buf; + // wasm_args.mem_alloc_option.pool.heap_size = sizeof(global_heap_buf); + bh_log_set_verbose_level(0); + if (!wasm_runtime_full_init(&wasm_args)) { + SPDLOG_ERROR("Init runtime environment failed."); + throw; + } + // initialiseWAMRNatives(); + char *buffer{}; + if (!load_wasm_binary(wasm_path, &buffer)) { + SPDLOG_ERROR("Load wasm binary failed.\n"); + throw; + } + module = wasm_runtime_load((uint8_t *)buffer, buf_size, error_buf, sizeof(error_buf)); + if (!module) { + SPDLOG_ERROR("Load wasm module failed. error: {}", error_buf); + throw; + } #if !defined(_WIN32) - struct ifaddrs *ifaddr, *ifa; - int family, s; - char host[NI_MAXHOST]; - - if (getifaddrs(&ifaddr) == -1) { - SPDLOG_ERROR("getifaddrs"); - exit(EXIT_FAILURE); - } - - for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) { - if (ifa->ifa_addr == nullptr) - continue; - - if (ifa->ifa_addr->sa_family == AF_INET) { - // IPv4 - auto *ipv4 = (struct sockaddr_in *)ifa->ifa_addr; - uint32_t ip = ntohl(ipv4->sin_addr.s_addr); - if (is_ip_in_cidr(MVVM_SOCK_ADDR, MVVM_SOCK_MASK, ip)) { - // Extract IPv4 address - local_addr.ip4[0] = (ip >> 24) & 0xFF; - local_addr.ip4[1] = (ip >> 16) & 0xFF; - local_addr.ip4[2] = (ip >> 8) & 0xFF; - local_addr.ip4[3] = ip & 0xFF; - if (local_addr.ip4[1] == 17) { - break; - } - } - - } else if (ifa->ifa_addr->sa_family == AF_INET6) { - // IPv6 - auto *ipv6 = (struct sockaddr_in6 *)ifa->ifa_addr; - // Extract IPv6 address - const auto *bytes = (const uint8_t *)ipv6->sin6_addr.s6_addr; - if (is_ipv6_in_cidr(MVVM_SOCK_ADDR6, MVVM_SOCK_MASK6, &ipv6->sin6_addr)) { - for (int i = 0; i < 16; i += 2) { - local_addr.ip6[i / 2] = (bytes[i] << 8) + bytes[i + 1]; - } - } - } - } - local_addr.is_4 = true; - - freeifaddrs(ifaddr); + struct ifaddrs *ifaddr, *ifa; + int family, s; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) == -1) { + SPDLOG_ERROR("getifaddrs"); + exit(EXIT_FAILURE); + } + + for (ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) { + if (ifa->ifa_addr == nullptr) + continue; + + if (ifa->ifa_addr->sa_family == AF_INET) { + // IPv4 + auto *ipv4 = (struct sockaddr_in *)ifa->ifa_addr; + uint32_t ip = ntohl(ipv4->sin_addr.s_addr); + if (is_ip_in_cidr(MVVM_SOCK_ADDR, MVVM_SOCK_MASK, ip)) { + // Extract IPv4 address + local_addr.ip4[0] = (ip >> 24) & 0xFF; + local_addr.ip4[1] = (ip >> 16) & 0xFF; + local_addr.ip4[2] = (ip >> 8) & 0xFF; + local_addr.ip4[3] = ip & 0xFF; + if (local_addr.ip4[1] == 17) { + break; + } + } + + } else if (ifa->ifa_addr->sa_family == AF_INET6) { + // IPv6 + auto *ipv6 = (struct sockaddr_in6 *)ifa->ifa_addr; + // Extract IPv6 address + const auto *bytes = (const uint8_t *)ipv6->sin6_addr.s6_addr; + if (is_ipv6_in_cidr(MVVM_SOCK_ADDR6, MVVM_SOCK_MASK6, &ipv6->sin6_addr)) { + for (int i = 0; i < 16; i += 2) { + local_addr.ip6[i / 2] = (bytes[i] << 8) + bytes[i + 1]; + } + } + } + } + local_addr.is_4 = true; + + freeifaddrs(ifaddr); #endif } bool WAMRInstance::load_wasm_binary(const char *wasm_path, char **buffer_ptr) { - *buffer_ptr = bh_read_file_to_buffer(wasm_path, &buf_size); - if (!*buffer_ptr) { - SPDLOG_ERROR("Open wasm app file failed.\n"); - return false; - } - if ((get_package_type((const uint8_t *)*buffer_ptr, buf_size) != Wasm_Module_Bytecode) && - (get_package_type((const uint8_t *)*buffer_ptr, buf_size) != Wasm_Module_AoT)) { - SPDLOG_ERROR("WASM bytecode or AOT object is expected but other file format"); - - BH_FREE(*buffer_ptr); - return false; - } - - return true; + *buffer_ptr = bh_read_file_to_buffer(wasm_path, &buf_size); + if (!*buffer_ptr) { + SPDLOG_ERROR("Open wasm app file failed.\n"); + return false; + } + if ((get_package_type((const uint8_t *)*buffer_ptr, buf_size) != Wasm_Module_Bytecode) && + (get_package_type((const uint8_t *)*buffer_ptr, buf_size) != Wasm_Module_AoT)) { + SPDLOG_ERROR("WASM bytecode or AOT object is expected but other file format"); + + BH_FREE(*buffer_ptr); + return false; + } + + return true; } WAMRInstance::~WAMRInstance() { - if (!exec_env) - wasm_runtime_destroy_exec_env(exec_env); - if (!module_inst) - wasm_runtime_deinstantiate(module_inst); - if (!module) - wasm_runtime_unload(module); - wasm_runtime_destroy(); + if (!exec_env) + wasm_runtime_destroy_exec_env(exec_env); + if (!module_inst) + wasm_runtime_deinstantiate(module_inst); + if (!module) + wasm_runtime_unload(module); + wasm_runtime_destroy(); } void WAMRInstance::find_func(const char *name) { - if (!(func = wasm_runtime_lookup_function(module_inst, name, nullptr))) { - SPDLOG_ERROR("The wasi\"{}\"function is not found.", name); - auto target_module = get_module_instance()->e; - for (int i = 0; i < target_module->function_count; i++) { - auto cur_func = &target_module->functions[i]; - if (cur_func->is_import_func) { - SPDLOG_DEBUG("{} {}", cur_func->u.func_import->field_name, i); - - if (!strcmp(cur_func->u.func_import->field_name, name)) { - - func = ((WASMFunctionInstanceCommon *)cur_func); - break; - } - - } else { - SPDLOG_DEBUG("{} {}", cur_func->u.func->field_name, i); - - if (!strcmp(cur_func->u.func->field_name, name)) { - func = ((WASMFunctionInstanceCommon *)cur_func); - break; - } - } - } - } + if (!(func = wasm_runtime_lookup_function(module_inst, name, nullptr))) { + SPDLOG_ERROR("The wasi\"{}\"function is not found.", name); + auto target_module = get_module_instance()->e; + for (int i = 0; i < target_module->function_count; i++) { + auto cur_func = &target_module->functions[i]; + if (cur_func->is_import_func) { + SPDLOG_DEBUG("{} {}", cur_func->u.func_import->field_name, i); + + if (!strcmp(cur_func->u.func_import->field_name, name)) { + + func = ((WASMFunctionInstanceCommon *)cur_func); + break; + } + + } else { + SPDLOG_DEBUG("{} {}", cur_func->u.func->field_name, i); + + if (!strcmp(cur_func->u.func->field_name, name)) { + func = ((WASMFunctionInstanceCommon *)cur_func); + break; + } + } + } + } } int WAMRInstance::invoke_main() { - if (!(func = wasm_runtime_lookup_wasi_start_function(module_inst))) { - SPDLOG_ERROR("The wasi mode main function is not found."); - return -1; - } + if (!(func = wasm_runtime_lookup_wasi_start_function(module_inst))) { + SPDLOG_ERROR("The wasi mode main function is not found."); + return -1; + } - return wasm_runtime_call_wasm(exec_env, func, 0, nullptr); + return wasm_runtime_call_wasm(exec_env, func, 0, nullptr); } void WAMRInstance::invoke_init_c() { - auto name1 = "__wasm_call_ctors"; - if (!(func = wasm_runtime_lookup_function(module_inst, name1, nullptr))) { - SPDLOG_ERROR("The wasi ", name1, " function is not found."); - } else { - wasm_runtime_call_wasm(exec_env, func, 0, nullptr); - } + auto name1 = "__wasm_call_ctors"; + if (!(func = wasm_runtime_lookup_function(module_inst, name1, nullptr))) { + SPDLOG_ERROR("The wasi ", name1, " function is not found."); + } else { + wasm_runtime_call_wasm(exec_env, func, 0, nullptr); + } } int WAMRInstance::invoke_fopen(std::string &path, uint32 option) { - char *buffer_ = nullptr; - uint32_t buffer_for_wasm; - find_func("o_"); - buffer_for_wasm = wasm_runtime_module_malloc(module_inst, path.size(), (void **)&buffer_); - if (buffer_for_wasm != 0) { - uint32 argv[3]; - argv[0] = buffer_for_wasm; // pass the buffer_ address for WASM space - argv[1] = option; // the size of buffer_ - argv[2] = 15; // the size of buffer_ - strncpy(buffer_, path.c_str(), path.size()); // use native address for accessing in runtime - wasm_runtime_call_wasm(exec_env, func, 3, argv); - wasm_runtime_module_free(module_inst, buffer_for_wasm); - return ((int)argv[0]); - } - return -1; + char *buffer_ = nullptr; + uint32_t buffer_for_wasm; + find_func("o_"); + buffer_for_wasm = wasm_runtime_module_malloc(module_inst, path.size(), (void **)&buffer_); + if (buffer_for_wasm != 0) { + uint32 argv[3]; + argv[0] = buffer_for_wasm; // pass the buffer_ address for WASM space + argv[1] = option; // the size of buffer_ + argv[2] = 15; // the size of buffer_ + strncpy(buffer_, path.c_str(), path.size()); // use native address for accessing in runtime + wasm_runtime_call_wasm(exec_env, func, 3, argv); + wasm_runtime_module_free(module_inst, buffer_for_wasm); + return ((int)argv[0]); + } + return -1; }; int WAMRInstance::invoke_frenumber(uint32 fd, uint32 to) { - uint32 argv[2] = {fd, to}; - find_func("__wasi_fd_renumber"); - wasm_runtime_call_wasm(exec_env, func, 2, argv); - return argv[0]; + uint32 argv[2] = {fd, to}; + find_func("__wasi_fd_renumber"); + wasm_runtime_call_wasm(exec_env, func, 2, argv); + return argv[0]; }; int WAMRInstance::invoke_sock_open(uint32_t domain, uint32_t socktype, uint32_t protocol, uint32_t sockfd) { - uint32 argv[4] = {domain, socktype, protocol, sockfd}; - find_func("s_"); - auto res = wasm_runtime_call_wasm(exec_env, func, 4, argv); - return argv[0]; + uint32 argv[4] = {domain, socktype, protocol, sockfd}; + find_func("s_"); + auto res = wasm_runtime_call_wasm(exec_env, func, 4, argv); + return argv[0]; } int WAMRInstance::invoke_sock_client_connect(uint32_t sockfd, struct sockaddr *sock, socklen_t sock_size) { - uint32 argv[1] = {sockfd}; - find_func("ci_"); - wasm_runtime_call_wasm(exec_env, func, 1, argv); - int res = argv[0]; - return -1; + uint32 argv[1] = {sockfd}; + find_func("ci_"); + wasm_runtime_call_wasm(exec_env, func, 1, argv); + int res = argv[0]; + return -1; } int WAMRInstance::invoke_sock_server_connect(uint32_t sockfd, struct sockaddr *sock, socklen_t sock_size) { - uint32 argv[1] = {sockfd}; - find_func("si_"); - wasm_runtime_call_wasm(exec_env, func, 1, argv); - int res = argv[0]; - return -1; + uint32 argv[1] = {sockfd}; + find_func("si_"); + wasm_runtime_call_wasm(exec_env, func, 1, argv); + int res = argv[0]; + return -1; } int WAMRInstance::invoke_sock_accept(uint32_t sockfd, struct sockaddr *sock, socklen_t sock_size) { - char *buffer1_ = nullptr; - char *buffer2_ = nullptr; - uint32_t buffer1_for_wasm; - uint32_t buffer2_for_wasm; - find_func("accept"); - - buffer1_for_wasm = - wasm_runtime_module_malloc(module_inst, sizeof(struct sockaddr), reinterpret_cast(&buffer1_)); - buffer2_for_wasm = - wasm_runtime_module_malloc(module_inst, sizeof(struct sockaddr), reinterpret_cast(&buffer2_)); - if (buffer1_for_wasm != 0 && buffer2_for_wasm != 0) { - uint32 argv[3]; - memcpy(buffer1_, sock, sizeof(struct sockaddr)); // use native address for accessing in runtime - memcpy(buffer2_, &sock_size, sizeof(socklen_t)); // use native address for accessing in runtime - argv[0] = sockfd; // pass the buffer_ address for WASM space - argv[1] = buffer1_for_wasm; - argv[2] = buffer2_for_wasm; - wasm_runtime_call_wasm(exec_env, func, 3, argv); - int res = argv[0]; - wasm_runtime_module_free(module_inst, buffer1_for_wasm); - wasm_runtime_module_free(module_inst, buffer2_for_wasm); - return res; - } - return -1; + char *buffer1_ = nullptr; + char *buffer2_ = nullptr; + uint32_t buffer1_for_wasm; + uint32_t buffer2_for_wasm; + find_func("accept"); + + buffer1_for_wasm = + wasm_runtime_module_malloc(module_inst, sizeof(struct sockaddr), reinterpret_cast(&buffer1_)); + buffer2_for_wasm = + wasm_runtime_module_malloc(module_inst, sizeof(struct sockaddr), reinterpret_cast(&buffer2_)); + if (buffer1_for_wasm != 0 && buffer2_for_wasm != 0) { + uint32 argv[3]; + memcpy(buffer1_, sock, sizeof(struct sockaddr)); // use native address for accessing in runtime + memcpy(buffer2_, &sock_size, sizeof(socklen_t)); // use native address for accessing in runtime + argv[0] = sockfd; // pass the buffer_ address for WASM space + argv[1] = buffer1_for_wasm; + argv[2] = buffer2_for_wasm; + wasm_runtime_call_wasm(exec_env, func, 3, argv); + int res = argv[0]; + wasm_runtime_module_free(module_inst, buffer1_for_wasm); + wasm_runtime_module_free(module_inst, buffer2_for_wasm); + return res; + } + return -1; } int WAMRInstance::invoke_sock_getsockname(uint32_t sockfd, struct sockaddr **sock, socklen_t *sock_size) { - char *buffer1_ = nullptr; - char *buffer2_ = nullptr; - uint32_t buffer1_for_wasm; - uint32_t buffer2_for_wasm; - find_func("getsockname"); - buffer1_for_wasm = wasm_runtime_module_malloc(module_inst, *sock_size, reinterpret_cast(&buffer1_)); - buffer2_for_wasm = wasm_runtime_module_malloc(module_inst, sizeof(socklen_t), reinterpret_cast(&buffer2_)); - if (buffer1_for_wasm != 0) { - uint32 argv[3]; - memcpy(buffer1_, *sock, sizeof(struct sockaddr)); - argv[0] = sockfd; - argv[1] = buffer1_for_wasm; - argv[2] = buffer2_for_wasm; - wasm_runtime_call_wasm(exec_env, func, 3, argv); - memcpy(*sock, buffer1_, sizeof(struct sockaddr)); - int res = argv[0]; - wasm_runtime_module_free(module_inst, buffer1_for_wasm); - return res; - } - return -1; + char *buffer1_ = nullptr; + char *buffer2_ = nullptr; + uint32_t buffer1_for_wasm; + uint32_t buffer2_for_wasm; + find_func("getsockname"); + buffer1_for_wasm = wasm_runtime_module_malloc(module_inst, *sock_size, reinterpret_cast(&buffer1_)); + buffer2_for_wasm = wasm_runtime_module_malloc(module_inst, sizeof(socklen_t), reinterpret_cast(&buffer2_)); + if (buffer1_for_wasm != 0) { + uint32 argv[3]; + memcpy(buffer1_, *sock, sizeof(struct sockaddr)); + argv[0] = sockfd; + argv[1] = buffer1_for_wasm; + argv[2] = buffer2_for_wasm; + wasm_runtime_call_wasm(exec_env, func, 3, argv); + memcpy(*sock, buffer1_, sizeof(struct sockaddr)); + int res = argv[0]; + wasm_runtime_module_free(module_inst, buffer1_for_wasm); + return res; + } + return -1; } int WAMRInstance::invoke_fseek(uint32 fd, uint32 flags, uint32 offset) { - // return 0; - find_func("__wasi_fd_seek"); - uint32 argv[5] = {fd, offset, 0, flags, 0}; - auto res = wasm_runtime_call_wasm(exec_env, func, 5, argv); - return argv[0]; + // return 0; + find_func("__wasi_fd_seek"); + uint32 argv[5] = {fd, offset, 0, flags, 0}; + auto res = wasm_runtime_call_wasm(exec_env, func, 5, argv); + return argv[0]; } int WAMRInstance::invoke_preopen(uint32 fd, const std::string &path) { - char *buffer_ = nullptr; - uint32_t buffer_for_wasm; - find_func("__wasilibc_nocwd_openat_nomode"); - buffer_for_wasm = wasm_runtime_module_malloc(module_inst, 100, reinterpret_cast(&buffer_)); - if (buffer_for_wasm != 0) { - uint32 argv[3]; - strncpy(buffer_, path.c_str(), path.size()); // use native address for accessing in runtime - argv[0] = fd; // pass the buffer_ address for WASM space - argv[1] = buffer_for_wasm; // the size of buffer_ - argv[2] = 102; // O_RW | O_CREATE - wasm_runtime_call_wasm(exec_env, func, 3, argv); - int res = argv[0]; - wasm_runtime_module_free(module_inst, buffer_for_wasm); - return res; - } - return -1; + char *buffer_ = nullptr; + uint32_t buffer_for_wasm; + find_func("__wasilibc_nocwd_openat_nomode"); + buffer_for_wasm = wasm_runtime_module_malloc(module_inst, 100, reinterpret_cast(&buffer_)); + if (buffer_for_wasm != 0) { + uint32 argv[3]; + strncpy(buffer_, path.c_str(), path.size()); // use native address for accessing in runtime + argv[0] = fd; // pass the buffer_ address for WASM space + argv[1] = buffer_for_wasm; // the size of buffer_ + argv[2] = 102; // O_RW | O_CREATE + wasm_runtime_call_wasm(exec_env, func, 3, argv); + int res = argv[0]; + wasm_runtime_module_free(module_inst, buffer_for_wasm); + return res; + } + return -1; } int WAMRInstance::invoke_recv(int sockfd, uint8 **buf, size_t len, int flags) { - char *buffer_ = nullptr; - uint32_t buffer_for_wasm; - find_func("recv"); - - buffer_for_wasm = wasm_runtime_module_malloc(module_inst, len, reinterpret_cast(&buffer_)); - if (buffer_for_wasm != 0) { - uint32 argv[4]; - memcpy(buffer_, *buf, len); // use native address for accessing in runtime - argv[0] = sockfd; // pass the buffer_ address for WASM space - argv[1] = buffer_for_wasm; // the size of buffer_ - argv[2] = len; - argv[3] = flags; - wasm_runtime_call_wasm(exec_env, func, 4, argv); - int res = argv[0]; - memcpy(*buf, buffer_, len); - wasm_runtime_module_free(module_inst, buffer_for_wasm); - return res; - } - return -1; + char *buffer_ = nullptr; + uint32_t buffer_for_wasm; + find_func("recv"); + + buffer_for_wasm = wasm_runtime_module_malloc(module_inst, len, reinterpret_cast(&buffer_)); + if (buffer_for_wasm != 0) { + uint32 argv[4]; + memcpy(buffer_, *buf, len); // use native address for accessing in runtime + argv[0] = sockfd; // pass the buffer_ address for WASM space + argv[1] = buffer_for_wasm; // the size of buffer_ + argv[2] = len; + argv[3] = flags; + wasm_runtime_call_wasm(exec_env, func, 4, argv); + int res = argv[0]; + memcpy(*buf, buffer_, len); + wasm_runtime_module_free(module_inst, buffer_for_wasm); + return res; + } + return -1; } int WAMRInstance::invoke_recvfrom(int sockfd, uint8 **buf, size_t len, int flags, struct sockaddr *src_addr, - socklen_t *addrlen) { - char *buffer1_ = nullptr; - char *buffer2_ = nullptr; - char *buffer3_ = nullptr; - uint32_t buffer1_for_wasm; - uint32_t buffer2_for_wasm; - uint32_t buffer3_for_wasm; - find_func("recvfrom"); - - buffer1_for_wasm = wasm_runtime_module_malloc(module_inst, len, reinterpret_cast(&buffer1_)); - buffer2_for_wasm = - wasm_runtime_module_malloc(module_inst, sizeof(struct sockaddr), reinterpret_cast(&buffer2_)); - buffer3_for_wasm = wasm_runtime_module_malloc(module_inst, sizeof(socklen_t), reinterpret_cast(&buffer3_)); - if (buffer1_for_wasm != 0 && buffer2_for_wasm != 0 && buffer3_for_wasm != 0) { - uint32 argv[6]; - memcpy(buffer1_, *buf, len); // use native address for accessing in runtime - memcpy(buffer2_, src_addr, sizeof(struct sockaddr)); // use native address for accessing in runtime - memcpy(buffer3_, addrlen, sizeof(socklen_t)); // use native address for accessing in runtime - argv[0] = sockfd; // pass the buffer_ address for WASM space - argv[1] = buffer1_for_wasm; // the size of buffer_ - argv[2] = len; - argv[3] = flags; - argv[4] = buffer2_for_wasm; - argv[5] = buffer3_for_wasm; - wasm_runtime_call_wasm(exec_env, func, 6, argv); - int res = argv[0]; - memcpy(*buf, buffer1_, len); - wasm_runtime_module_free(module_inst, buffer1_for_wasm); - wasm_runtime_module_free(module_inst, buffer2_for_wasm); - wasm_runtime_module_free(module_inst, buffer3_for_wasm); - return res; - } - return -1; + socklen_t *addrlen) { + char *buffer1_ = nullptr; + char *buffer2_ = nullptr; + char *buffer3_ = nullptr; + uint32_t buffer1_for_wasm; + uint32_t buffer2_for_wasm; + uint32_t buffer3_for_wasm; + find_func("recvfrom"); + + buffer1_for_wasm = wasm_runtime_module_malloc(module_inst, len, reinterpret_cast(&buffer1_)); + buffer2_for_wasm = + wasm_runtime_module_malloc(module_inst, sizeof(struct sockaddr), reinterpret_cast(&buffer2_)); + buffer3_for_wasm = wasm_runtime_module_malloc(module_inst, sizeof(socklen_t), reinterpret_cast(&buffer3_)); + if (buffer1_for_wasm != 0 && buffer2_for_wasm != 0 && buffer3_for_wasm != 0) { + uint32 argv[6]; + memcpy(buffer1_, *buf, len); // use native address for accessing in runtime + memcpy(buffer2_, src_addr, sizeof(struct sockaddr)); // use native address for accessing in runtime + memcpy(buffer3_, addrlen, sizeof(socklen_t)); // use native address for accessing in runtime + argv[0] = sockfd; // pass the buffer_ address for WASM space + argv[1] = buffer1_for_wasm; // the size of buffer_ + argv[2] = len; + argv[3] = flags; + argv[4] = buffer2_for_wasm; + argv[5] = buffer3_for_wasm; + wasm_runtime_call_wasm(exec_env, func, 6, argv); + int res = argv[0]; + memcpy(*buf, buffer1_, len); + wasm_runtime_module_free(module_inst, buffer1_for_wasm); + wasm_runtime_module_free(module_inst, buffer2_for_wasm); + wasm_runtime_module_free(module_inst, buffer3_for_wasm); + return res; + } + return -1; } WASMExecEnv *WAMRInstance::get_exec_env() { - return cur_env; // should return the current thread's + return cur_env; // should return the current thread's } WASMModuleInstance *WAMRInstance::get_module_instance() const { - return reinterpret_cast(exec_env->module_inst); + return reinterpret_cast(exec_env->module_inst); } #if WASM_ENABLE_AOT != 0 AOTModule *WAMRInstance::get_module() const { - return reinterpret_cast(reinterpret_cast(exec_env->module_inst)->module); + return reinterpret_cast(reinterpret_cast(exec_env->module_inst)->module); } #endif void restart_execution(uint32 id) { - WAMRInstance::ThreadArgs *targs = argptr[id]; - wasm_interp_call_func_bytecode((WASMModuleInstance *)targs->exec_env->module_inst, targs->exec_env, - targs->exec_env->cur_frame->function, targs->exec_env->cur_frame->prev_frame); + WAMRInstance::ThreadArgs *targs = argptr[id]; + wasm_interp_call_func_bytecode((WASMModuleInstance *)targs->exec_env->module_inst, targs->exec_env, + targs->exec_env->cur_frame->function, targs->exec_env->cur_frame->prev_frame); } #if WASM_ENABLE_LIB_PTHREAD != 0 extern "C" { @@ -415,388 +430,554 @@ korp_mutex syncop_mutex; korp_cond syncop_cv; } void WAMRInstance::replay_sync_ops(bool main, wasm_exec_env_t exec_env) { - if (main) { - std::map ref_map = {}; - for (auto &i : sync_ops) { - // remap to new tids so that if we reserialize it'll be correct - i.tid = tid_map[i.tid]; - if (ref_map.find(i.ref) != ref_map.end()) { - pthread_mutex_init_wrapper(exec_env, &i.ref, nullptr); - ref_map[i.ref] = i.tid; - } - } - // start from the beginning - sync_iter = sync_ops.begin(); - thread_init.release(100); - } else { - // wait for remap to finish - thread_init.acquire(); - } - // Actually replay - os_mutex_lock(&syncop_mutex); - while (sync_iter != sync_ops.end()) { - SPDLOG_INFO("test {} == {}, op {}\n", (uint64_t)exec_env->handle, (uint64_t)sync_iter->tid, - ((int)sync_iter->sync_op)); - if (((uint64_t)(*sync_iter).tid) == ((uint64_t)exec_env->handle)) { - SPDLOG_INFO("replay {}, op {}\n", sync_iter->tid, ((int)sync_iter->sync_op)); - auto mysync = sync_iter; - ++sync_iter; - // do op - switch (mysync->sync_op) { - case SYNC_OP_MUTEX_LOCK: - pthread_mutex_lock_wrapper(exec_env, &(mysync->ref)); - break; - case SYNC_OP_MUTEX_UNLOCK: - pthread_mutex_unlock_wrapper(exec_env, &(mysync->ref)); - break; - case SYNC_OP_COND_WAIT: - pthread_cond_wait_wrapper(exec_env, &(mysync->ref), nullptr); - break; - case SYNC_OP_COND_SIGNAL: - pthread_cond_signal_wrapper(exec_env, &(mysync->ref)); - break; - case SYNC_OP_COND_BROADCAST: - pthread_cond_broadcast_wrapper(exec_env, &(mysync->ref)); - break; - case SYNC_OP_ATOMIC_WAIT: - wasm_runtime_atomic_wait( - exec_env->module_inst, - ((uint8_t *)((WASMModuleInstance *)exec_env->module_inst)->memories[0]->memory_data + mysync->ref), - mysync->expected, -1, mysync->wait64); - break; - case SYNC_OP_ATOMIC_NOTIFY: - break; - } - // wakeup everyone - os_cond_signal(&syncop_cv); - } else { - os_cond_reltimedwait(&syncop_cv, &syncop_mutex, 10); - } - } - os_mutex_unlock(&syncop_mutex); + if (main) { + std::map ref_map = {}; + for (auto &i : sync_ops) { + // remap to new tids so that if we reserialize it'll be correct + i.tid = tid_map[i.tid]; + if (ref_map.find(i.ref) != ref_map.end()) { + pthread_mutex_init_wrapper(exec_env, &i.ref, nullptr); + ref_map[i.ref] = i.tid; + } + } + // start from the beginning + sync_iter = sync_ops.begin(); + thread_init.release(100); + } else { + // wait for remap to finish + thread_init.acquire(); + } + // Actually replay + os_mutex_lock(&syncop_mutex); + while (sync_iter != sync_ops.end()) { + SPDLOG_INFO("test {} == {}, op {}\n", (uint64_t)exec_env->handle, (uint64_t)sync_iter->tid, + ((int)sync_iter->sync_op)); + if (((uint64_t)(*sync_iter).tid) == ((uint64_t)exec_env->handle)) { + SPDLOG_INFO("replay {}, op {}\n", sync_iter->tid, ((int)sync_iter->sync_op)); + auto mysync = sync_iter; + ++sync_iter; + // do op + switch (mysync->sync_op) { + case SYNC_OP_MUTEX_LOCK: + pthread_mutex_lock_wrapper(exec_env, &(mysync->ref)); + break; + case SYNC_OP_MUTEX_UNLOCK: + pthread_mutex_unlock_wrapper(exec_env, &(mysync->ref)); + break; + case SYNC_OP_COND_WAIT: + pthread_cond_wait_wrapper(exec_env, &(mysync->ref), nullptr); + break; + case SYNC_OP_COND_SIGNAL: + pthread_cond_signal_wrapper(exec_env, &(mysync->ref)); + break; + case SYNC_OP_COND_BROADCAST: + pthread_cond_broadcast_wrapper(exec_env, &(mysync->ref)); + break; + case SYNC_OP_ATOMIC_WAIT: + wasm_runtime_atomic_wait( + exec_env->module_inst, + ((uint8_t *)((WASMModuleInstance *)exec_env->module_inst)->memories[0]->memory_data + mysync->ref), + mysync->expected, -1, mysync->wait64); + break; + case SYNC_OP_ATOMIC_NOTIFY: + break; + } + // wakeup everyone + os_cond_signal(&syncop_cv); + } else { + os_cond_reltimedwait(&syncop_cv, &syncop_mutex, 10); + } + } + os_mutex_unlock(&syncop_mutex); } // End Sync Op Specific Stuff #endif WAMRExecEnv *child_env; // will call pthread create wrapper if needed? void WAMRInstance::recover(std::vector> *e_) { - execEnv.reserve(e_->size()); - std::transform(e_->begin(), e_->end(), std::back_inserter(execEnv), - [](const std::unique_ptr &uniquePtr) { return uniquePtr ? uniquePtr.get() : nullptr; }); - // got this done tommorrow. - // order threads by id (descending) - std::sort(execEnv.begin(), execEnv.end(), [](const auto &a, const auto &b) { - return a->frames.back()->function_index < b->frames.back()->function_index; - }); - - argptr = (ThreadArgs **)malloc(sizeof(void *) * execEnv.size()); - set_wasi_args(execEnv.front()->module_inst.wasi_ctx); + execEnv.reserve(e_->size()); + std::transform(e_->begin(), e_->end(), std::back_inserter(execEnv), + [](const std::unique_ptr &uniquePtr) { return uniquePtr ? uniquePtr.get() : nullptr; }); + // got this done tommorrow. + // order threads by id (descending) + std::sort(execEnv.begin(), execEnv.end(), [](const auto &a, const auto &b) { + return a->frames.back()->function_index < b->frames.back()->function_index; + }); - instantiate(); - auto mi = module_inst; + argptr = (ThreadArgs **)malloc(sizeof(void *) * execEnv.size()); + set_wasi_args(execEnv.front()->module_inst.wasi_ctx); - get_int3_addr(); - replace_int3_with_nop(); - this->time = std::chrono::high_resolution_clock::now(); + this->time = std::chrono::high_resolution_clock::now(); - restore(execEnv.front(), cur_env); - if (tid_start_arg_map.find(execEnv.back()->cur_count) != tid_start_arg_map.end()) { - std::sort(execEnv.begin() + 1, execEnv.end(), [&](const auto &a, const auto &b) { - return tid_start_arg_map[a->cur_count].second < tid_start_arg_map[b->cur_count].second; - }); - } + restore(execEnv.front(), cur_env); + if (tid_start_arg_map.find(execEnv.back()->cur_count) != tid_start_arg_map.end()) { + std::sort(execEnv.begin() + 1, execEnv.end(), [&](const auto &a, const auto &b) { + return tid_start_arg_map[a->cur_count].second < tid_start_arg_map[b->cur_count].second; + }); + } - auto main_env = cur_env; - auto main_saved_call_chain = main_env->restore_call_chain; - cur_thread = ((uint64_t)main_env->handle); + auto main_env = cur_env; + auto main_saved_call_chain = main_env->restore_call_chain; + cur_thread = ((uint64_t)main_env->handle); - fprintf(stderr, "main_env created %p %p\n\n", main_env, main_saved_call_chain); + fprintf(stderr, "main_env created %p %p\n\n", main_env, main_saved_call_chain); - main_env->is_restore = true; + main_env->is_restore = true; - main_env->restore_call_chain = nullptr; + main_env->restore_call_chain = nullptr; - invoke_init_c(); +// invoke_init_c(); #if WASM_ENABLE_LIB_PTHREAD != 0 - spawn_child(main_env, true); + spawn_child(main_env, true); #endif - // restart main thread execution - if (!is_aot) { - wasm_interp_call_func_bytecode(get_module_instance(), get_exec_env(), get_exec_env()->cur_frame->function, - get_exec_env()->cur_frame->prev_frame); - } else { - exec_env = cur_env = main_env; - module_inst = main_env->module_inst; - - fprintf(stderr, "invoke_init_c\n"); - fprintf(stderr, "wakeup.release\n"); - wakeup.release(100); - - cur_env->is_restore = true; - cur_env->restore_call_chain = main_saved_call_chain; + // restart main thread execution + if (!is_aot) { + wasm_interp_call_func_bytecode(get_module_instance(), get_exec_env(), get_exec_env()->cur_frame->function, + get_exec_env()->cur_frame->prev_frame); + } else { + exec_env = cur_env = main_env; + module_inst = main_env->module_inst; + + fprintf(stderr, "invoke_init_c\n"); + fprintf(stderr, "wakeup.release\n"); + wakeup.release(100); + + cur_env->is_restore = true; + cur_env->restore_call_chain = main_saved_call_chain; #if WASM_ENABLE_LIB_PTHREAD != 0 - fprintf(stderr, "invoke main %p %p\n", cur_env, cur_env->restore_call_chain); - // replay sync ops to get OS state matching - wamr_handle_map(execEnv.front()->cur_count, ((uint64_t)main_env->handle)); + fprintf(stderr, "invoke main %p %p\n", cur_env, cur_env->restore_call_chain); + // replay sync ops to get OS state matching + wamr_handle_map(execEnv.front()->cur_count, ((uint64_t)main_env->handle)); - replay_sync_ops(true, main_env); + replay_sync_ops(true, main_env); #endif - if (this->time != std::chrono::time_point()) { - auto end = std::chrono::high_resolution_clock::now(); - // get duration in us - auto dur = std::chrono::duration_cast(end - this->time); - this->time = std::chrono::time_point(); - SPDLOG_INFO("Recover time: {}\n", dur.count() / 1000000.0); - // put things back - } - invoke_main(); - } + if (this->time != std::chrono::time_point()) { + auto end = std::chrono::high_resolution_clock::now(); + // get duration in us + auto dur = std::chrono::duration_cast(end - this->time); + this->time = std::chrono::time_point(); + SPDLOG_INFO("Recover time: {}\n", dur.count() / 1000000.0); + // put things back + } + invoke_main(); + } } #if WASM_ENABLE_LIB_PTHREAD != 0 void WAMRInstance::spawn_child(WASMExecEnv *cur_env, bool main) { - static std::vector::iterator iter; - static uint64 parent; - if (main) { - iter = ++(execEnv.begin()); - parent = 0; - } - // Each thread needs it's own thread arg - auto thread_arg = ThreadArgs{cur_env}; - static std::mutex mtx; - static std::condition_variable cv; - std::unique_lock ul(mtx); - - while (iter != execEnv.end()) { - // Get parent's virtual TID from child's OS TID - if (parent == 0) { - child_env = *iter; - parent = child_env->cur_count; - if (tid_start_arg_map.find(child_env->cur_count) != tid_start_arg_map.end()) { - parent = tid_start_arg_map[parent].second; - } - parent = child_tid_map[parent]; - for (auto &[tid, vtid] : tid_start_arg_map) { - if (vtid.second == parent) { - parent = tid; - break; - } - } - SPDLOG_ERROR("{} {}", parent, child_env->cur_count); - } // calculate parent TID once - if (parent != ((uint64_t)cur_env->handle) && (parent != !main)) { - cv.wait(ul); - continue; - } - // requires to record the args and callback for the pthread. - argptr[id] = &thread_arg; - // restart thread execution - SPDLOG_DEBUG("pthread_create_wrapper, func {}\n", child_env->cur_count); - // module_inst = wasm_runtime_instantiate(module, stack_size, heap_size, error_buf, sizeof(error_buf)); - if (tid_start_arg_map.find(child_env->cur_count) != tid_start_arg_map.end()) { - // find the parent env - auto *saved_env = cur_env->restore_call_chain; - cur_env->restore_call_chain = NULL; - exec_env->is_restore = true; - // main thread - thread_spawn_wrapper(cur_env, tid_start_arg_map[child_env->cur_count].first); - cur_env->restore_call_chain = saved_env; - exec_env->is_restore = false; - - } else { - exec_env->is_restore = true; - pthread_create_wrapper(cur_env, nullptr, nullptr, id, id); // tid_map - } - fprintf(stderr, "child spawned %p %p\n\n", cur_env, child_env); - // sleep(1); - thread_init.acquire(); - // advance ptr - ++iter; - parent = 0; - cv.notify_all(); - } + static std::vector::iterator iter; + static uint64 parent; + if (main) { + iter = ++(execEnv.begin()); + parent = 0; + } + // Each thread needs it's own thread arg + auto thread_arg = ThreadArgs{cur_env}; + static std::mutex mtx; + static std::condition_variable cv; + std::unique_lock ul(mtx); + + while (iter != execEnv.end()) { + // Get parent's virtual TID from child's OS TID + if (parent == 0) { + child_env = *iter; + parent = child_env->cur_count; + if (tid_start_arg_map.find(child_env->cur_count) != tid_start_arg_map.end()) { + parent = tid_start_arg_map[parent].second; + } + parent = child_tid_map[parent]; + for (auto &[tid, vtid] : tid_start_arg_map) { + if (vtid.second == parent) { + parent = tid; + break; + } + } + SPDLOG_ERROR("{} {}", parent, child_env->cur_count); + } // calculate parent TID once + if (parent != ((uint64_t)cur_env->handle) && (parent != !main)) { + cv.wait(ul); + continue; + } + // requires to record the args and callback for the pthread. + argptr[id] = &thread_arg; + // restart thread execution + SPDLOG_DEBUG("pthread_create_wrapper, func {}\n", child_env->cur_count); + // module_inst = wasm_runtime_instantiate(module, stack_size, heap_size, error_buf, sizeof(error_buf)); + if (tid_start_arg_map.find(child_env->cur_count) != tid_start_arg_map.end()) { + // find the parent env + auto *saved_env = cur_env->restore_call_chain; + cur_env->restore_call_chain = NULL; + exec_env->is_restore = true; + // main thread + thread_spawn_wrapper(cur_env, tid_start_arg_map[child_env->cur_count].first); + cur_env->restore_call_chain = saved_env; + exec_env->is_restore = false; + + } else { + exec_env->is_restore = true; + pthread_create_wrapper(cur_env, nullptr, nullptr, id, id); // tid_map + } + fprintf(stderr, "child spawned %p %p\n\n", cur_env, child_env); + // sleep(1); + thread_init.acquire(); + // advance ptr + ++iter; + parent = 0; + cv.notify_all(); + } } #endif WASMFunction *WAMRInstance::get_func() { return static_cast(func); } void WAMRInstance::set_func(WASMFunction *f) { func = static_cast(f); } void WAMRInstance::set_wasi_args(const std::vector &dir_list, const std::vector &map_dir_list, - const std::vector &env_list, const std::vector &arg_list, - const std::vector &addr_list, - const std::vector &ns_lookup_pool) { - - dir_ = string_vec_to_cstr_array(dir_list); - map_dir_ = string_vec_to_cstr_array(map_dir_list); - env_ = string_vec_to_cstr_array(env_list); - arg_ = string_vec_to_cstr_array(arg_list); - addr_ = string_vec_to_cstr_array(addr_list); - ns_pool_ = string_vec_to_cstr_array(ns_lookup_pool); - - wasm_runtime_set_wasi_args_ex(this->module, dir_.data(), dir_.size(), map_dir_.data(), map_dir_.size(), env_.data(), - env_.size(), const_cast(arg_.data()), arg_.size(), 0, 1, 2); - - wasm_runtime_set_wasi_addr_pool(module, addr_.data(), addr_.size()); - wasm_runtime_set_wasi_ns_lookup_pool(module, ns_pool_.data(), ns_pool_.size()); + const std::vector &env_list, const std::vector &arg_list, + const std::vector &addr_list, + const std::vector &ns_lookup_pool) { + + dir_ = string_vec_to_cstr_array(dir_list); + map_dir_ = string_vec_to_cstr_array(map_dir_list); + env_ = string_vec_to_cstr_array(env_list); + arg_ = string_vec_to_cstr_array(arg_list); + addr_ = string_vec_to_cstr_array(addr_list); + ns_pool_ = string_vec_to_cstr_array(ns_lookup_pool); + + wasm_runtime_set_wasi_args_ex(this->module, dir_.data(), dir_.size(), map_dir_.data(), map_dir_.size(), env_.data(), + env_.size(), const_cast(arg_.data()), arg_.size(), 0, 1, 2); + + wasm_runtime_set_wasi_addr_pool(module, addr_.data(), addr_.size()); + wasm_runtime_set_wasi_ns_lookup_pool(module, ns_pool_.data(), ns_pool_.size()); } void WAMRInstance::set_wasi_args(WAMRWASIContext &context) { - set_wasi_args(context.dir, context.map_dir, context.env_list, context.argv_list, context.addr_pool, - context.ns_lookup_list); + set_wasi_args(context.dir, context.map_dir, context.env_list, context.argv_list, context.addr_pool, + context.ns_lookup_list); } extern WAMRInstance *wamr; extern "C" { // stop name mangling so it can be linked externally void wamr_wait(wasm_exec_env_t exec_env) { - SPDLOG_DEBUG("child getting ready to wait {}", fmt::ptr(exec_env)); - thread_init.release(1); - wamr->spawn_child(exec_env, false); - SPDLOG_DEBUG("finish child restore"); - wakeup.acquire(); + SPDLOG_DEBUG("child getting ready to wait {}", fmt::ptr(exec_env)); + thread_init.release(1); + wamr->spawn_child(exec_env, false); + SPDLOG_DEBUG("finish child restore"); + wakeup.acquire(); #if WASM_ENABLE_LIB_PTHREAD != 0 - SPDLOG_DEBUG("go child!! {}", ((uint64_t)exec_env->handle)); - wamr->replay_sync_ops(false, exec_env); - SPDLOG_DEBUG("finish syncing"); + SPDLOG_DEBUG("go child!! {}", ((uint64_t)exec_env->handle)); + wamr->replay_sync_ops(false, exec_env); + SPDLOG_DEBUG("finish syncing"); #endif - if (wamr->time != std::chrono::time_point()) { - auto end = std::chrono::high_resolution_clock::now(); - // get duration in us - auto dur = std::chrono::duration_cast(end - wamr->time); - wamr->time = std::chrono::time_point(); - SPDLOG_INFO("Recover time: {}\n", dur.count() / 1000000.0); - // put things back - } - // finished restoring - exec_env->is_restore = true; - fprintf(stderr, "invoke side%p\n", ((WASMModuleInstance *)exec_env->module_inst)->global_data); + if (wamr->time != std::chrono::time_point()) { + auto end = std::chrono::high_resolution_clock::now(); + // get duration in us + auto dur = std::chrono::duration_cast(end - wamr->time); + wamr->time = std::chrono::time_point(); + SPDLOG_INFO("Recover time: {}\n", dur.count() / 1000000.0); + // put things back + } + // finished restoring + exec_env->is_restore = true; + fprintf(stderr, "invoke side%p\n", ((WASMModuleInstance *)exec_env->module_inst)->global_data); } WASMExecEnv *restore_env(WASMModuleInstanceCommon *module_inst) { - auto exec_env = wasm_exec_env_create_internal(module_inst, wamr->stack_size); - restore(child_env, exec_env); + auto exec_env = wasm_exec_env_create_internal(module_inst, wamr->stack_size); + restore(child_env, exec_env); - auto s = exec_env->restore_call_chain; + auto s = exec_env->restore_call_chain; - wamr->cur_thread = ((uint64_t)exec_env->handle); - exec_env->is_restore = true; - fprintf(stderr, "restore_env: %p %p\n", exec_env, s); + wamr->cur_thread = ((uint64_t)exec_env->handle); + exec_env->is_restore = true; + fprintf(stderr, "restore_env: %p %p\n", exec_env, s); - return exec_env; + return exec_env; } } void WAMRInstance::instantiate() { - module_inst = wasm_runtime_instantiate(module, stack_size, heap_size, error_buf, sizeof(error_buf)); - if (!module_inst) { - SPDLOG_ERROR("Instantiate wasm module failed. error: {}", error_buf); - throw; - } - cur_env = exec_env = wasm_runtime_create_exec_env(module_inst, stack_size); + module_inst = wasm_runtime_instantiate(module, stack_size, heap_size, error_buf, sizeof(error_buf)); + if (!module_inst) { + SPDLOG_ERROR("Instantiate wasm module failed. error: {}", error_buf); + throw; + } + cur_env = exec_env = wasm_runtime_create_exec_env(module_inst, stack_size); } bool is_ip_in_cidr(const char *base_ip, int subnet_mask_len, uint32_t ip) { - uint32_t base_ip_bin, subnet_mask, network_addr, broadcast_addr; - SPDLOG_DEBUG("base_ip: {} subnet_mask_len: {}", base_ip, subnet_mask_len); - SPDLOG_DEBUG("ip: {}.{}.{}.{}", (ip >> 24) & 0xFF, (ip >> 16) & 0xFF, (ip >> 8) & 0xFF, ip & 0xFF); - - // Convert base IP to binary - if (inet_pton(AF_INET, base_ip, &base_ip_bin) != 1) { - fprintf(stderr, "Error converting base IP to binary\n"); - return false; - } - - // Ensure that the subnet mask length is valid - if (subnet_mask_len < 0 || subnet_mask_len > 32) { - fprintf(stderr, "Invalid subnet mask length\n"); - return false; - } - - // Calculate subnet mask in binary - subnet_mask = htonl(~((1 << (32 - subnet_mask_len)) - 1)); - - // Calculate network and broadcast addresses - network_addr = base_ip_bin & subnet_mask; - broadcast_addr = network_addr | ~subnet_mask; - - // Ensure ip is in network byte order - uint32_t ip_net_order = htonl(ip); - - // Check if IP is within range - return ip_net_order >= network_addr && ip_net_order <= broadcast_addr; + uint32_t base_ip_bin, subnet_mask, network_addr, broadcast_addr; + SPDLOG_DEBUG("base_ip: {} subnet_mask_len: {}", base_ip, subnet_mask_len); + SPDLOG_DEBUG("ip: {}.{}.{}.{}", (ip >> 24) & 0xFF, (ip >> 16) & 0xFF, (ip >> 8) & 0xFF, ip & 0xFF); + + // Convert base IP to binary + if (inet_pton(AF_INET, base_ip, &base_ip_bin) != 1) { + fprintf(stderr, "Error converting base IP to binary\n"); + return false; + } + + // Ensure that the subnet mask length is valid + if (subnet_mask_len < 0 || subnet_mask_len > 32) { + fprintf(stderr, "Invalid subnet mask length\n"); + return false; + } + + // Calculate subnet mask in binary + subnet_mask = htonl(~((1 << (32 - subnet_mask_len)) - 1)); + + // Calculate network and broadcast addresses + network_addr = base_ip_bin & subnet_mask; + broadcast_addr = network_addr | ~subnet_mask; + + // Ensure ip is in network byte order + uint32_t ip_net_order = htonl(ip); + + // Check if IP is within range + return ip_net_order >= network_addr && ip_net_order <= broadcast_addr; } bool is_ipv6_in_cidr(const char *base_ip_str, int subnet_mask_len, struct in6_addr *ip) { - struct in6_addr base_ip { - }, subnet_mask{}, network_addr{}, ip_min{}, ip_max{}; - unsigned char mask; - - // Convert base IP to binary - inet_pton(AF_INET6, base_ip_str, &base_ip); - - // Clear subnet_mask and network_addr - memset(&subnet_mask, 0, sizeof(subnet_mask)); - memset(&network_addr, 0, sizeof(network_addr)); - - // Create the subnet mask and network address - for (int i = 0; i < subnet_mask_len / 8; i++) { - subnet_mask.s6_addr[i] = 0xff; - } - if (subnet_mask_len % 8) { - mask = (0xff << (8 - (subnet_mask_len % 8))); - subnet_mask.s6_addr[subnet_mask_len / 8] = mask; - } - - // Apply the subnet mask to the base IP to get the network address - for (int i = 0; i < 16; i++) { - network_addr.s6_addr[i] = base_ip.s6_addr[i] & subnet_mask.s6_addr[i]; - } - - // Calculate the first and last IPs in the range - ip_min = network_addr; - ip_max = network_addr; - for (int i = 15; i >= subnet_mask_len / 8; i--) { - ip_max.s6_addr[i] = 0xff; - } - - // Check if IP is within range - for (int i = 0; i < 16; i++) { - if (ip->s6_addr[i] < ip_min.s6_addr[i] || ip->s6_addr[i] > ip_max.s6_addr[i]) { - return false; - } - } - return true; + struct in6_addr base_ip { + }, subnet_mask{}, network_addr{}, ip_min{}, ip_max{}; + unsigned char mask; + + // Convert base IP to binary + inet_pton(AF_INET6, base_ip_str, &base_ip); + + // Clear subnet_mask and network_addr + memset(&subnet_mask, 0, sizeof(subnet_mask)); + memset(&network_addr, 0, sizeof(network_addr)); + + // Create the subnet mask and network address + for (int i = 0; i < subnet_mask_len / 8; i++) { + subnet_mask.s6_addr[i] = 0xff; + } + if (subnet_mask_len % 8) { + mask = (0xff << (8 - (subnet_mask_len % 8))); + subnet_mask.s6_addr[subnet_mask_len / 8] = mask; + } + + // Apply the subnet mask to the base IP to get the network address + for (int i = 0; i < 16; i++) { + network_addr.s6_addr[i] = base_ip.s6_addr[i] & subnet_mask.s6_addr[i]; + } + + // Calculate the first and last IPs in the range + ip_min = network_addr; + ip_max = network_addr; + for (int i = 15; i >= subnet_mask_len / 8; i--) { + ip_max.s6_addr[i] = 0xff; + } + + // Check if IP is within range + for (int i = 0; i < 16; i++) { + if (ip->s6_addr[i] < ip_min.s6_addr[i] || ip->s6_addr[i] > ip_max.s6_addr[i]) { + return false; + } + } + return true; } long get_rss() { #if defined(__linux__) - long rss = 0; - std::ifstream statm("/proc/self/statm"); - if (statm.is_open()) { - long dummy; // to skip other fields - statm >> dummy >> rss; // skip the first field (size) and read rss - statm.close(); - } else { - std::cerr << "Unable to open /proc/self/statm" << std::endl; - } - return rss * sysconf(_SC_PAGESIZE); // mul + long rss = 0; + std::ifstream statm("/proc/self/statm"); + if (statm.is_open()) { + long dummy; // to skip other fields + statm >> dummy >> rss; // skip the first field (size) and read rss + statm.close(); + } else { + std::cerr << "Unable to open /proc/self/statm" << std::endl; + } + return rss * sysconf(_SC_PAGESIZE); // mul // multiply by page size to get bytes #elif defined(__APPLE__) - mach_msg_type_number_t infoCount = TASK_BASIC_INFO_COUNT; - task_basic_info_data_t taskInfo; - - if (KERN_SUCCESS != task_info(mach_task_self(), - TASK_BASIC_INFO, - (task_info_t)&taskInfo, - &infoCount)) { - std::cerr << "Failed to get task info" << std::endl; - return -1; // Indicate failure - } else { - // taskInfo.resident_size is in bytes - return taskInfo.resident_size; - } + mach_msg_type_number_t infoCount = TASK_BASIC_INFO_COUNT; + task_basic_info_data_t taskInfo; + + if (KERN_SUCCESS != task_info(mach_task_self(), TASK_BASIC_INFO, (task_info_t)&taskInfo, &infoCount)) { + std::cerr << "Failed to get task info" << std::endl; + return -1; // Indicate failure + } else { + // taskInfo.resident_size is in bytes + return taskInfo.resident_size; + } #else - PROCESS_MEMORY_COUNTERS pmc; - if (GetProcessMemoryInfo(GetCurrentProcess(), &pmc, sizeof(pmc))) { - return pmc.WorkingSetSize; // RSS in bytes - } else { - std::cerr << "Failed to get process memory info" << std::endl; - return -1; // Indicate failure - } + PROCESS_MEMORY_COUNTERS pmc; + if (GetProcessMemoryInfo(GetCurrentProcess(), &pmc, sizeof(pmc))) { + return pmc.WorkingSetSize; // RSS in bytes + } else { + std::cerr << "Failed to get process memory info" << std::endl; + return -1; // Indicate failure + } +#endif +} +void serialize_to_file(WASMExecEnv *instance) { + // gateway + auto start = std::chrono::high_resolution_clock::now(); + if (snapshot_memory == 0) + snapshot_memory = get_rss(); +#if WASM_ENABLE_LIB_PTHREAD != 0 + auto cluster = wasm_exec_env_get_cluster(instance); + auto all_count = bh_list_length(&cluster->exec_env_list); + // fill vector + + std::unique_lock as_ul(wamr->as_mtx); + SPDLOG_DEBUG("get lock"); + wamr->ready++; + wamr->lwcp_list[((uint64_t)instance->handle)]++; + if (wamr->ready == all_count) { + wamr->should_snapshot = true; + } + // If we're not all ready + SPDLOG_DEBUG("thread {}, with {} ready out of {} total", ((uint64_t)instance->handle), wamr->ready, all_count); +#endif +#if !defined(_WIN32) + if (!wamr->socket_fd_map_.empty() && wamr->should_snapshot) { + // tell gateway to keep alive the server + struct sockaddr_in addr {}; + int fd = 0; + ssize_t rc; + SocketAddrPool src_addr{}; + bool is_server = false; + for (auto [tmp_fd, sock_data] : wamr->socket_fd_map_) { + if (sock_data.is_server) { + is_server = true; + break; + } + } + wamr->op_data.op = is_server ? MVVM_SOCK_SUSPEND_TCP_SERVER : MVVM_SOCK_SUSPEND; + + for (auto [tmp_fd, sock_data] : wamr->socket_fd_map_) { + int idx = wamr->op_data.size; + src_addr = sock_data.socketAddress; + auto tmp_ip4 = + fmt::format("{}.{}.{}.{}", src_addr.ip4[0], src_addr.ip4[1], src_addr.ip4[2], src_addr.ip4[3]); + auto tmp_ip6 = + fmt::format("{}:{}:{}:{}:{}:{}:{}:{}", src_addr.ip6[0], src_addr.ip6[1], src_addr.ip6[2], + src_addr.ip6[3], src_addr.ip6[4], src_addr.ip6[5], src_addr.ip6[6], src_addr.ip6[7]); + if (src_addr.is_4 && tmp_ip4 == "0.0.0.0" || !src_addr.is_4 && tmp_ip6 == "0:0:0:0:0:0:0:0") { + src_addr = wamr->local_addr; + src_addr.port = sock_data.socketAddress.port; + } + SPDLOG_INFO("addr: {} {}.{}.{}.{} port: {}", tmp_fd, src_addr.ip4[0], src_addr.ip4[1], src_addr.ip4[2], + src_addr.ip4[3], src_addr.port); + // make the rest coroutine? + tmp_ip4 = fmt::format("{}.{}.{}.{}", sock_data.socketSentToData.dest_addr.ip.ip4[0], + sock_data.socketSentToData.dest_addr.ip.ip4[1], + sock_data.socketSentToData.dest_addr.ip.ip4[2], + sock_data.socketSentToData.dest_addr.ip.ip4[3]); + tmp_ip6 = fmt::format( + "{}:{}:{}:{}:{}:{}:{}:{}", sock_data.socketSentToData.dest_addr.ip.ip6[0], + sock_data.socketSentToData.dest_addr.ip.ip6[1], sock_data.socketSentToData.dest_addr.ip.ip6[2], + sock_data.socketSentToData.dest_addr.ip.ip6[3], sock_data.socketSentToData.dest_addr.ip.ip6[4], + sock_data.socketSentToData.dest_addr.ip.ip6[5], sock_data.socketSentToData.dest_addr.ip.ip6[6], + sock_data.socketSentToData.dest_addr.ip.ip6[7]); + if (tmp_ip4 == "0.0.0.0" || tmp_ip6 == "0:0:0:0:0:0:0:0") { + if (!wamr->op_data.is_tcp) { + if (sock_data.socketSentToData.dest_addr.ip.is_4 && tmp_ip4 == "0.0.0.0" || + !sock_data.socketSentToData.dest_addr.ip.is_4 && tmp_ip6 == "0:0:0:0:0:0:0:0") { + wamr->op_data.addr[idx][1].is_4 = sock_data.socketRecvFromDatas[0].src_addr.ip.is_4; + std::memcpy(wamr->op_data.addr[idx][1].ip4, sock_data.socketRecvFromDatas[0].src_addr.ip.ip4, + sizeof(sock_data.socketRecvFromDatas[0].src_addr.ip.ip4)); + std::memcpy(wamr->op_data.addr[idx][1].ip6, sock_data.socketRecvFromDatas[0].src_addr.ip.ip6, + sizeof(sock_data.socketRecvFromDatas[0].src_addr.ip.ip6)); + wamr->op_data.addr[idx][1].port = sock_data.socketRecvFromDatas[0].src_addr.port; + } else { + wamr->op_data.addr[idx][1].is_4 = sock_data.socketSentToData.dest_addr.ip.is_4; + std::memcpy(wamr->op_data.addr[idx][1].ip4, sock_data.socketSentToData.dest_addr.ip.ip4, + sizeof(sock_data.socketSentToData.dest_addr.ip.ip4)); + std::memcpy(wamr->op_data.addr[idx][1].ip6, sock_data.socketSentToData.dest_addr.ip.ip6, + sizeof(sock_data.socketSentToData.dest_addr.ip.ip6)); + wamr->op_data.addr[idx][1].port = sock_data.socketSentToData.dest_addr.port; + } + } else { + // if it's not socket + if (!is_server) { + int tmp_fd = 0; + unsigned int size_ = sizeof(sockaddr_in); + sockaddr_in *ss = (sockaddr_in *)malloc(size_); + wamr->invoke_sock_getsockname(tmp_fd, (sockaddr **)&ss, &size_); + if (ss->sin_family == AF_INET) { + auto *ipv4 = (struct sockaddr_in *)ss; + uint32_t ip = ntohl(ipv4->sin_addr.s_addr); + wamr->op_data.addr[idx][1].is_4 = true; + wamr->op_data.addr[idx][1].ip4[0] = (ip >> 24) & 0xFF; + wamr->op_data.addr[idx][1].ip4[1] = (ip >> 16) & 0xFF; + wamr->op_data.addr[idx][1].ip4[2] = (ip >> 8) & 0xFF; + wamr->op_data.addr[idx][1].ip4[3] = ip & 0xFF; + wamr->op_data.addr[idx][1].port = ntohs(ipv4->sin_port); + } else { + auto *ipv6 = (struct sockaddr_in6 *)ss; + wamr->op_data.addr[idx][1].is_4 = false; + const auto *bytes = (const uint8_t *)ipv6->sin6_addr.s6_addr; + for (int i = 0; i < 16; i += 2) { + wamr->op_data.addr[idx][1].ip6[i / 2] = (bytes[i] << 8) + bytes[i + 1]; + } + wamr->op_data.addr[idx][1].port = ntohs(ipv6->sin6_port); + } + free(ss); + } else if (sock_data.is_server) { + wamr->op_data.size--; + } + } + } + SPDLOG_DEBUG("dest_addr: {}.{}.{}.{}:{}", wamr->op_data.addr[idx][1].ip4[0], + wamr->op_data.addr[idx][1].ip4[1], wamr->op_data.addr[idx][1].ip4[2], + wamr->op_data.addr[idx][1].ip4[3], wamr->op_data.addr[idx][1].port); + wamr->op_data.size += 1; + } + // Create a socket + if ((fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + SPDLOG_ERROR("socket error"); + throw std::runtime_error("socket error"); + } + + addr.sin_family = AF_INET; + addr.sin_port = htons(MVVM_SOCK_PORT); + + // Convert IPv4 and IPv6 addresses from text to binary form + if (inet_pton(AF_INET, MVVM_SOCK_ADDR, &addr.sin_addr) <= 0) { + SPDLOG_ERROR("AF_INET not supported"); + exit(EXIT_FAILURE); + } + // Connect to the server + if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + SPDLOG_ERROR("Connection Failed {}", errno); + exit(EXIT_FAILURE); + } + + SPDLOG_DEBUG("Connected successfully"); + rc = send(fd, &wamr->op_data, sizeof(struct mvvm_op_data), 0); + if (rc == -1) { + SPDLOG_ERROR("send error"); + exit(EXIT_FAILURE); + } + + // Clean up + close(fd); + } +#endif +#if WASM_ENABLE_LIB_PTHREAD != 0 + if (wamr->ready < all_count) { + // Then wait for someone else to get here and finish the job + std::condition_variable as_cv; + as_cv.wait(as_ul); + } + wasm_cluster_suspend_all_except_self(cluster, instance); + auto elem = (WASMExecEnv *)bh_list_first_elem(&cluster->exec_env_list); + while (elem) { + instance = elem; +#endif // windows has no threads so only does it once + auto a = new WAMRExecEnv(); + dump(a, instance); + as.emplace_back(a); +#if WASM_ENABLE_LIB_PTHREAD != 0 + elem = (WASMExecEnv *)bh_list_elem_next(elem); + } + // finish filling vector #endif + auto used_memory = get_rss(); + struct_pack::serialize_to(*writer, as); + auto end = std::chrono::high_resolution_clock::now(); + // get duration in us + auto dur = std::chrono::duration_cast(end - start); + SPDLOG_INFO("Snapshot time: {} s", dur.count() / 1000000.0); + SPDLOG_INFO("Memory usage: {} MB", (used_memory - snapshot_memory) / 1024 / 1024); + exit(EXIT_SUCCESS); } \ No newline at end of file