Skip to content

Commit

Permalink
TCP works
Browse files Browse the repository at this point in the history
  • Loading branch information
victoryang00 committed Jan 26, 2024
1 parent 135801a commit 54f176b
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 46 deletions.
4 changes: 2 additions & 2 deletions gateway/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ int main() {
new_client = socket(AF_INET, SOCK_STREAM, 0);
to_start.new_client = new_client;
address.sin_family = AF_INET;
address.sin_port = htons(client_port);
address.sin_port = htons(server_port);
if (inet_pton(AF_INET, std::get<2>(tmp_tuple).c_str(), &address.sin_addr) <= 0) {
LOGV(ERROR) << "Invalid address/ Address not supported";
exit(EXIT_FAILURE);
Expand Down Expand Up @@ -538,7 +538,7 @@ int main() {
});
to_start.new_server = new_server;
to_start.is_sleep = false;
tcp_pair[fmt::format("{}:{}", server_ip, server_port)] = to_start;
tcp_pair[fmt::format("{}:{}", std::get<2>(tmp_tuple), server_port)] = to_start;
} else
is_forward = true;
break;
Expand Down
4 changes: 3 additions & 1 deletion include/wamr.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "wamr_wasi_context.h"
#include "wasm_runtime.h"
#include <condition_variable>
#include <functional>
#include <iterator>
#include <mutex>
#include <ranges>
Expand All @@ -42,7 +43,8 @@ class WAMRInstance {
std::vector<const char *> ns_pool_{};
std::map<int, std::tuple<std::string, std::vector<std::tuple<int, int, fd_op>>>> fd_map_{};
// add offset to pair->tuple, 3rd param 'int'
std::map<int, SocketMetaData> socket_fd_map_{};
std::map<int,int> new_sock_map_{};
std::map<int, SocketMetaData, std::greater<int>> socket_fd_map_{};
SocketAddrPool local_addr{};
// lwcp is LightWeight CheckPoint
std::map<ssize_t, int> lwcp_list;
Expand Down
1 change: 1 addition & 0 deletions include/wamr_export.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void insert_socket(int, int, int, int);
void update_socket_fd_address(int, struct SocketAddrPool *);
void init_gateway(struct SocketAddrPool *address);
void set_tcp();
int get_sock_fd(int);
void insert_sync_op(wasm_exec_env_t exec_env, uint32 *mutex, enum sync_op locking);
void restart_execution(uint32 targs);
extern int pthread_create_wrapper(wasm_exec_env_t exec_env, uint32 *thread, const void *attr, uint32 elem_index,
Expand Down
2 changes: 1 addition & 1 deletion include/wamr_wasi_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct SocketMetaData {
};
struct WAMRWASIContext {
std::map<int, std::tuple<std::string, std::vector<std::tuple<int, int, fd_op>>>> fd_map;
std::map<int, SocketMetaData, std::greater<int>> socket_fd_map;
std::map<int, SocketMetaData> socket_fd_map;
std::vector<struct sync_op_t> sync_ops;
std::vector<std::string> dir;
std::vector<std::string> map_dir;
Expand Down
2 changes: 1 addition & 1 deletion src/wamr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ WAMRInstance::WAMRInstance(const char *wasm_path, bool is_jit) : is_jit(is_jit)
LOGV(ERROR) << "Init runtime environment failed.\n";
throw;
}
initialiseWAMRNatives();
// initialiseWAMRNatives();
char *buffer{};
if (!load_wasm_binary(wasm_path, &buffer)) {
LOGV(ERROR) << "Load wasm binary failed.\n";
Expand Down
34 changes: 21 additions & 13 deletions src/wamr_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ void replay_sock_recv_from_data(uint32_t sock, uint8 **ri_data, unsigned long *r
#endif
// only support one type of requests at a time
void set_tcp() { wamr->op_data.is_tcp = true; }
int get_sock_fd(int fd) {
if (wamr->socket_fd_map_.find(fd) != wamr->socket_fd_map_.end())
return wamr->new_sock_map_[fd];
else
return fd;
};

/** fopen, fseek, fwrite, fread */
void insert_fd(int fd, const char *path, int flags, int offset, enum fd_op op) {
if (fd > 2) {
Expand Down Expand Up @@ -180,6 +187,7 @@ void remove_fd(int fd) {
LOGV(ERROR) << "fd not found " << fd;
}
}

/*
create fd-socketmetadata map and store the "domain", "type", "protocol" value
*/
Expand Down Expand Up @@ -226,7 +234,7 @@ void update_socket_fd_address(int fd, SocketAddrPool *address) {
wamr->socket_fd_map_[fd].socketAddress.ip6[7] = address->ip6[7];
}
}
#if !defined(_WIN32)

void init_gateway(SocketAddrPool *address) {
// tell gateway to keep alive the server
if (wamr->op_data.op != MVVM_SOCK_RESUME && wamr->op_data.op != MVVM_SOCK_RESUME_TCP_SERVER) {
Expand Down Expand Up @@ -268,7 +276,7 @@ void init_gateway(SocketAddrPool *address) {
close(fd);
}
}
#endif

#if defined(__APPLE__)
int gettid() {
uint64_t tid;
Expand All @@ -281,7 +289,7 @@ int gettid() { return GetCurrentThreadId(); }

void insert_sync_op(wasm_exec_env_t exec_env, uint32 *mutex, enum sync_op locking) {
printf("insert sync on offset %d, as op: %d\n", *mutex, locking);
struct sync_op_t sync_op = {.tid = ((uint32)exec_env->cur_count), .ref = *mutex, .sync_op = locking};
struct sync_op_t sync_op = {.tid = exec_env->cur_count, .ref = *mutex, .sync_op = locking};
wamr->sync_ops.push_back(sync_op);
}

Expand All @@ -290,11 +298,11 @@ void lightweight_checkpoint(WASMExecEnv *exec_env) {
if (((AOTFrame *)exec_env->cur_frame)) {
fid = (((AOTFrame *)exec_env->cur_frame)->func_index);
}
// LOGV(DEBUG) << "checkpoint " << gettid() << " func(" << fid << ")";
// if (fid == -1) {
// LOGV(DEBUG) << "skip checkpoint";
// return;
// }
LOGV(DEBUG) << "checkpoint " << gettid() << " func(" << fid << ")";
if (fid == -1) {
LOGV(DEBUG) << "skip checkpoint";
return;
}

std::unique_lock as_ul(wamr->as_mtx);
wamr->lwcp_list[gettid()]++;
Expand All @@ -306,11 +314,11 @@ void lightweight_uncheckpoint(WASMExecEnv *exec_env) {
if (((AOTFrame *)exec_env->cur_frame)) {
fid = (((AOTFrame *)exec_env->cur_frame)->func_index);
}
// LOGV(DEBUG) << "uncheckpoint " << gettid() << " func(" << fid << ")";
// if (fid == -1) {
// LOGV(DEBUG) << "skip uncheckpoint";
// return;
// }
LOGV(DEBUG) << "uncheckpoint " << gettid() << " func(" << fid << ")";
if (fid == -1) {
LOGV(DEBUG) << "skip uncheckpoint";
return;
}
std::unique_lock as_ul(wamr->as_mtx);
if (wamr->lwcp_list[gettid()] == 0) {
// someone has reset our counter
Expand Down
80 changes: 56 additions & 24 deletions src/wamr_wasi_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,47 +148,79 @@ void WAMRWASIContext::restore_impl(WASIArguments *env) {
wamr->should_snapshot_socket = true;
bool is_tcp_server = false;
int old_fd = 0;
int res;
bool is_4 = false;
for (auto [fd, socketMetaData] : this->socket_fd_map) {
wamr->op_data.is_tcp |= socketMetaData.type;
is_tcp_server |= socketMetaData.is_server;
is_4 |= socketMetaData.socketAddress.is_4;
if (!socketMetaData.is_server) {
old_fd = fd;
}
}
is_tcp_server &= wamr->op_data.is_tcp;
for (auto [fd, socketMetaData] : this->socket_fd_map) {
// udp?
if (!wamr->op_data.is_tcp) { // udp
auto res =
wamr->invoke_sock_open(socketMetaData.domain, socketMetaData.type, socketMetaData.protocol, fd);

} else if (socketMetaData.socketAddress.is_4) {
struct sockaddr_in sockaddr4 = sockaddr_from_ip4(socketMetaData.socketAddress);
inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr4.sin_addr);
for (auto [fd, socketMetaData] : this->socket_fd_map) {
if (!socketMetaData.is_server) {
if (!wamr->op_data.is_tcp) { // udp
res =
wamr->invoke_sock_open(socketMetaData.domain, socketMetaData.type, socketMetaData.protocol, fd);
} else if (socketMetaData.socketAddress.is_4 || is_4) {
struct sockaddr_in sockaddr4 = sockaddr_from_ip4(socketMetaData.socketAddress);
inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr4.sin_addr);

socklen_t sockaddr4_size = sizeof(sockaddr4);
socklen_t sockaddr4_size = sizeof(sockaddr4);

if (is_tcp_server && socketMetaData.is_server) {
wamr->invoke_sock_accept(old_fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4));
// This ip should be old ip?
} else {
wamr->invoke_sock_connect(fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4));
}
} else {
struct sockaddr_in6 sockaddr6 = sockaddr_from_ip6(socketMetaData.socketAddress);
inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr6.sin6_addr);

socklen_t sockaddr6_size = sizeof(sockaddr6);
if (is_tcp_server && socketMetaData.is_server) {
wamr->invoke_sock_accept(old_fd, (struct sockaddr *)&sockaddr6, sizeof(sockaddr6));
} else {
struct sockaddr_in6 sockaddr6 = sockaddr_from_ip6(socketMetaData.socketAddress);
inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr6.sin6_addr);

socklen_t sockaddr6_size = sizeof(sockaddr6);
wamr->invoke_sock_connect(fd, (struct sockaddr *)&sockaddr6, sizeof(sockaddr6));
}

// renumber or not?
// LOGV(INFO) << "tmp_sock_fd " << res << " fd" << fd;
wamr->socket_fd_map_[fd] = socketMetaData;
}
}
for (auto [fd, socketMetaData] : this->socket_fd_map) {
if (socketMetaData.is_server) {
if (!wamr->op_data.is_tcp) { // udp
res =
wamr->invoke_sock_open(socketMetaData.domain, socketMetaData.type, socketMetaData.protocol, fd);
} else if (socketMetaData.socketAddress.is_4 || is_4) {
struct sockaddr_in sockaddr4 = sockaddr_from_ip4(socketMetaData.socketAddress);
inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr4.sin_addr);

// renumber or not?
// LOGV(INFO) << "tmp_sock_fd " << res << " fd" << fd;
wamr->socket_fd_map_[fd] = socketMetaData;
socklen_t sockaddr4_size = sizeof(sockaddr4);

if (is_tcp_server) {
wamr->new_sock_map_[fd] =
wamr->invoke_sock_accept(old_fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4));

// This ip should be old ip?
} else {
wamr->invoke_sock_connect(fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4));
}
} else {
struct sockaddr_in6 sockaddr6 = sockaddr_from_ip6(socketMetaData.socketAddress);
inet_pton(AF_INET, MVVM_SOCK_ADDR6, &sockaddr6.sin6_addr);

socklen_t sockaddr6_size = sizeof(sockaddr6);
if (is_tcp_server) {
wamr->new_sock_map_[fd] =
wamr->invoke_sock_accept(old_fd, (struct sockaddr *)&sockaddr6, sizeof(sockaddr6));
} else {
wamr->invoke_sock_connect(fd, (struct sockaddr *)&sockaddr6, sizeof(sockaddr6));
}
}

// renumber or not?
// LOGV(INFO) << "tmp_sock_fd " << res << " fd" << fd;
wamr->socket_fd_map_[fd] = socketMetaData;
}
}
}
#endif
Expand Down
8 changes: 4 additions & 4 deletions test/tcp_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ void init_connect(int socket_fd) {
init_sockaddr_inet((struct sockaddr_in *)&addr);

printf("[Server] Create socket\n");
socket_fd = socket(af, SOCK_STREAM, 0);
if (socket_fd < 0) {
perror("Create socket failed");
exit(-1);
int ret = socket(AF_INET, SOCK_STREAM, 0);
while (ret != socket_fd) {
ret = socket(AF_INET, SOCK_STREAM, 0);
printf("connect %d %d\n", ret, socket_fd);
}

printf("[Server] Bind socket\n");
Expand Down

0 comments on commit 54f176b

Please sign in to comment.