From 54f176bd6dda4f84e20fde4ba87616088fc70a6b Mon Sep 17 00:00:00 2001 From: victoryang00 Date: Fri, 26 Jan 2024 06:49:35 +0000 Subject: [PATCH] TCP works --- gateway/main.cpp | 4 +- include/wamr.h | 4 +- include/wamr_export.h | 1 + include/wamr_wasi_context.h | 2 +- src/wamr.cpp | 2 +- src/wamr_export.cpp | 34 ++++++++++------ src/wamr_wasi_context.cpp | 80 ++++++++++++++++++++++++++----------- test/tcp_server.c | 8 ++-- 8 files changed, 89 insertions(+), 46 deletions(-) diff --git a/gateway/main.cpp b/gateway/main.cpp index b28ba44..2470152 100644 --- a/gateway/main.cpp +++ b/gateway/main.cpp @@ -484,7 +484,7 @@ int main() { new_client = socket(AF_INET, SOCK_STREAM, 0); to_start.new_client = new_client; address.sin_family = AF_INET; - address.sin_port = htons(client_port); + address.sin_port = htons(server_port); if (inet_pton(AF_INET, std::get<2>(tmp_tuple).c_str(), &address.sin_addr) <= 0) { LOGV(ERROR) << "Invalid address/ Address not supported"; exit(EXIT_FAILURE); @@ -538,7 +538,7 @@ int main() { }); to_start.new_server = new_server; to_start.is_sleep = false; - tcp_pair[fmt::format("{}:{}", server_ip, server_port)] = to_start; + tcp_pair[fmt::format("{}:{}", std::get<2>(tmp_tuple), server_port)] = to_start; } else is_forward = true; break; diff --git a/include/wamr.h b/include/wamr.h index b792055..0f3de08 100644 --- a/include/wamr.h +++ b/include/wamr.h @@ -16,6 +16,7 @@ #include "wamr_wasi_context.h" #include "wasm_runtime.h" #include +#include #include #include #include @@ -42,7 +43,8 @@ class WAMRInstance { std::vector ns_pool_{}; std::map>>> fd_map_{}; // add offset to pair->tuple, 3rd param 'int' - std::map socket_fd_map_{}; + std::map new_sock_map_{}; + std::map> socket_fd_map_{}; SocketAddrPool local_addr{}; // lwcp is LightWeight CheckPoint std::map lwcp_list; diff --git a/include/wamr_export.h b/include/wamr_export.h index a4331e5..e7d254b 100644 --- a/include/wamr_export.h +++ b/include/wamr_export.h @@ -39,6 +39,7 @@ void insert_socket(int, int, int, int); void update_socket_fd_address(int, struct SocketAddrPool *); void init_gateway(struct SocketAddrPool *address); void set_tcp(); +int get_sock_fd(int); void insert_sync_op(wasm_exec_env_t exec_env, uint32 *mutex, enum sync_op locking); void restart_execution(uint32 targs); extern int pthread_create_wrapper(wasm_exec_env_t exec_env, uint32 *thread, const void *attr, uint32 elem_index, diff --git a/include/wamr_wasi_context.h b/include/wamr_wasi_context.h index 24de88a..8df8e48 100644 --- a/include/wamr_wasi_context.h +++ b/include/wamr_wasi_context.h @@ -67,7 +67,7 @@ struct SocketMetaData { }; struct WAMRWASIContext { std::map>>> fd_map; - std::map> socket_fd_map; + std::map socket_fd_map; std::vector sync_ops; std::vector dir; std::vector map_dir; diff --git a/src/wamr.cpp b/src/wamr.cpp index 893a748..02a7d0e 100644 --- a/src/wamr.cpp +++ b/src/wamr.cpp @@ -70,7 +70,7 @@ WAMRInstance::WAMRInstance(const char *wasm_path, bool is_jit) : is_jit(is_jit) LOGV(ERROR) << "Init runtime environment failed.\n"; throw; } - initialiseWAMRNatives(); + // initialiseWAMRNatives(); char *buffer{}; if (!load_wasm_binary(wasm_path, &buffer)) { LOGV(ERROR) << "Load wasm binary failed.\n"; diff --git a/src/wamr_export.cpp b/src/wamr_export.cpp index 373f84c..81162b1 100644 --- a/src/wamr_export.cpp +++ b/src/wamr_export.cpp @@ -135,6 +135,13 @@ void replay_sock_recv_from_data(uint32_t sock, uint8 **ri_data, unsigned long *r #endif // only support one type of requests at a time void set_tcp() { wamr->op_data.is_tcp = true; } +int get_sock_fd(int fd) { + if (wamr->socket_fd_map_.find(fd) != wamr->socket_fd_map_.end()) + return wamr->new_sock_map_[fd]; + else + return fd; +}; + /** fopen, fseek, fwrite, fread */ void insert_fd(int fd, const char *path, int flags, int offset, enum fd_op op) { if (fd > 2) { @@ -180,6 +187,7 @@ void remove_fd(int fd) { LOGV(ERROR) << "fd not found " << fd; } } + /* create fd-socketmetadata map and store the "domain", "type", "protocol" value */ @@ -226,7 +234,7 @@ void update_socket_fd_address(int fd, SocketAddrPool *address) { wamr->socket_fd_map_[fd].socketAddress.ip6[7] = address->ip6[7]; } } -#if !defined(_WIN32) + void init_gateway(SocketAddrPool *address) { // tell gateway to keep alive the server if (wamr->op_data.op != MVVM_SOCK_RESUME && wamr->op_data.op != MVVM_SOCK_RESUME_TCP_SERVER) { @@ -268,7 +276,7 @@ void init_gateway(SocketAddrPool *address) { close(fd); } } -#endif + #if defined(__APPLE__) int gettid() { uint64_t tid; @@ -281,7 +289,7 @@ int gettid() { return GetCurrentThreadId(); } void insert_sync_op(wasm_exec_env_t exec_env, uint32 *mutex, enum sync_op locking) { printf("insert sync on offset %d, as op: %d\n", *mutex, locking); - struct sync_op_t sync_op = {.tid = ((uint32)exec_env->cur_count), .ref = *mutex, .sync_op = locking}; + struct sync_op_t sync_op = {.tid = exec_env->cur_count, .ref = *mutex, .sync_op = locking}; wamr->sync_ops.push_back(sync_op); } @@ -290,11 +298,11 @@ void lightweight_checkpoint(WASMExecEnv *exec_env) { if (((AOTFrame *)exec_env->cur_frame)) { fid = (((AOTFrame *)exec_env->cur_frame)->func_index); } - // LOGV(DEBUG) << "checkpoint " << gettid() << " func(" << fid << ")"; - // if (fid == -1) { - // LOGV(DEBUG) << "skip checkpoint"; - // return; - // } + LOGV(DEBUG) << "checkpoint " << gettid() << " func(" << fid << ")"; + if (fid == -1) { + LOGV(DEBUG) << "skip checkpoint"; + return; + } std::unique_lock as_ul(wamr->as_mtx); wamr->lwcp_list[gettid()]++; @@ -306,11 +314,11 @@ void lightweight_uncheckpoint(WASMExecEnv *exec_env) { if (((AOTFrame *)exec_env->cur_frame)) { fid = (((AOTFrame *)exec_env->cur_frame)->func_index); } - // LOGV(DEBUG) << "uncheckpoint " << gettid() << " func(" << fid << ")"; - // if (fid == -1) { - // LOGV(DEBUG) << "skip uncheckpoint"; - // return; - // } + LOGV(DEBUG) << "uncheckpoint " << gettid() << " func(" << fid << ")"; + if (fid == -1) { + LOGV(DEBUG) << "skip uncheckpoint"; + return; + } std::unique_lock as_ul(wamr->as_mtx); if (wamr->lwcp_list[gettid()] == 0) { // someone has reset our counter diff --git a/src/wamr_wasi_context.cpp b/src/wamr_wasi_context.cpp index 697a06a..e2ab583 100644 --- a/src/wamr_wasi_context.cpp +++ b/src/wamr_wasi_context.cpp @@ -148,47 +148,79 @@ void WAMRWASIContext::restore_impl(WASIArguments *env) { wamr->should_snapshot_socket = true; bool is_tcp_server = false; int old_fd = 0; + int res; + bool is_4 = false; for (auto [fd, socketMetaData] : this->socket_fd_map) { wamr->op_data.is_tcp |= socketMetaData.type; is_tcp_server |= socketMetaData.is_server; + is_4 |= socketMetaData.socketAddress.is_4; if (!socketMetaData.is_server) { old_fd = fd; } } is_tcp_server &= wamr->op_data.is_tcp; - for (auto [fd, socketMetaData] : this->socket_fd_map) { - // udp? - if (!wamr->op_data.is_tcp) { // udp - auto res = - wamr->invoke_sock_open(socketMetaData.domain, socketMetaData.type, socketMetaData.protocol, fd); - } else if (socketMetaData.socketAddress.is_4) { - struct sockaddr_in sockaddr4 = sockaddr_from_ip4(socketMetaData.socketAddress); - inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr4.sin_addr); + for (auto [fd, socketMetaData] : this->socket_fd_map) { + if (!socketMetaData.is_server) { + if (!wamr->op_data.is_tcp) { // udp + res = + wamr->invoke_sock_open(socketMetaData.domain, socketMetaData.type, socketMetaData.protocol, fd); + } else if (socketMetaData.socketAddress.is_4 || is_4) { + struct sockaddr_in sockaddr4 = sockaddr_from_ip4(socketMetaData.socketAddress); + inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr4.sin_addr); - socklen_t sockaddr4_size = sizeof(sockaddr4); + socklen_t sockaddr4_size = sizeof(sockaddr4); - if (is_tcp_server && socketMetaData.is_server) { - wamr->invoke_sock_accept(old_fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4)); - // This ip should be old ip? - } else { wamr->invoke_sock_connect(fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4)); - } - } else { - struct sockaddr_in6 sockaddr6 = sockaddr_from_ip6(socketMetaData.socketAddress); - inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr6.sin6_addr); - - socklen_t sockaddr6_size = sizeof(sockaddr6); - if (is_tcp_server && socketMetaData.is_server) { - wamr->invoke_sock_accept(old_fd, (struct sockaddr *)&sockaddr6, sizeof(sockaddr6)); } else { + struct sockaddr_in6 sockaddr6 = sockaddr_from_ip6(socketMetaData.socketAddress); + inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr6.sin6_addr); + + socklen_t sockaddr6_size = sizeof(sockaddr6); wamr->invoke_sock_connect(fd, (struct sockaddr *)&sockaddr6, sizeof(sockaddr6)); } + + // renumber or not? + // LOGV(INFO) << "tmp_sock_fd " << res << " fd" << fd; + wamr->socket_fd_map_[fd] = socketMetaData; } + } + for (auto [fd, socketMetaData] : this->socket_fd_map) { + if (socketMetaData.is_server) { + if (!wamr->op_data.is_tcp) { // udp + res = + wamr->invoke_sock_open(socketMetaData.domain, socketMetaData.type, socketMetaData.protocol, fd); + } else if (socketMetaData.socketAddress.is_4 || is_4) { + struct sockaddr_in sockaddr4 = sockaddr_from_ip4(socketMetaData.socketAddress); + inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr4.sin_addr); - // renumber or not? - // LOGV(INFO) << "tmp_sock_fd " << res << " fd" << fd; - wamr->socket_fd_map_[fd] = socketMetaData; + socklen_t sockaddr4_size = sizeof(sockaddr4); + + if (is_tcp_server) { + wamr->new_sock_map_[fd] = + wamr->invoke_sock_accept(old_fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4)); + + // This ip should be old ip? + } else { + wamr->invoke_sock_connect(fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4)); + } + } else { + struct sockaddr_in6 sockaddr6 = sockaddr_from_ip6(socketMetaData.socketAddress); + inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr6.sin6_addr); + + socklen_t sockaddr6_size = sizeof(sockaddr6); + if (is_tcp_server) { + wamr->new_sock_map_[fd] = + wamr->invoke_sock_accept(old_fd, (struct sockaddr *)&sockaddr6, sizeof(sockaddr6)); + } else { + wamr->invoke_sock_connect(fd, (struct sockaddr *)&sockaddr6, sizeof(sockaddr6)); + } + } + + // renumber or not? + // LOGV(INFO) << "tmp_sock_fd " << res << " fd" << fd; + wamr->socket_fd_map_[fd] = socketMetaData; + } } } #endif diff --git a/test/tcp_server.c b/test/tcp_server.c index 3628c09..909d2b9 100644 --- a/test/tcp_server.c +++ b/test/tcp_server.c @@ -72,10 +72,10 @@ void init_connect(int socket_fd) { init_sockaddr_inet((struct sockaddr_in *)&addr); printf("[Server] Create socket\n"); - socket_fd = socket(af, SOCK_STREAM, 0); - if (socket_fd < 0) { - perror("Create socket failed"); - exit(-1); + int ret = socket(AF_INET, SOCK_STREAM, 0); + while (ret != socket_fd) { + ret = socket(AF_INET, SOCK_STREAM, 0); + printf("connect %d %d\n", ret, socket_fd); } printf("[Server] Bind socket\n");