Skip to content

Commit

Permalink
tcp recv is not blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
victoryang00 committed Jan 18, 2024
1 parent b82863a commit c017f62
Show file tree
Hide file tree
Showing 7 changed files with 348 additions and 287 deletions.
532 changes: 294 additions & 238 deletions gateway/main.cpp

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion include/wamr.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class WAMRInstance {
int invoke_sock_open(uint32_t domain, uint32_t socktype, uint32_t protocol, uint32_t sockfd);
int invoke_sock_listen(uint32_t sockfd, uint32_t fd);
int invoke_sock_bind(uint32_t sockfd, struct sockaddr *sock, socklen_t sock_size);
int invoke_sock_getsockname(uint32_t sockfd, struct sockaddr **sock, socklen_t sock_size);
int invoke_sock_getsockname(uint32_t sockfd, struct sockaddr **sock, socklen_t *sock_size);
int invoke_recv(int sockfd, uint8 **buf, size_t len, int flags);
int invoke_recvfrom(int sockfd, uint8 **buf, size_t len, int flags, struct sockaddr *src_addr, socklen_t *addrlen);
~WAMRInstance();
Expand Down
21 changes: 11 additions & 10 deletions src/checkpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@
#include "wamr.h"
#include "wamr_wasi_context.h"
#include "wasm_runtime.h"
#if !defined(_WIN32)
#include "thread_manager.h"
#include <arpa/inet.h>
#include <sys/socket.h>
#endif
#include <condition_variable>
#include <cstdio>
#include <cxxopts.hpp>
Expand All @@ -22,6 +17,11 @@
#include <string>
#include <thread>
#include <tuple>
#if !defined(_WIN32)
#include "thread_manager.h"
#include <arpa/inet.h>
#include <sys/socket.h>
#endif

WAMRInstance *wamr = nullptr;
std::ostringstream re{};
Expand Down Expand Up @@ -127,9 +127,10 @@ void serialize_to_file(WASMExecEnv *instance) {
wamr->op_data.addr[idx][1].port = sock_data.socketSentToData.dest_addr.port;
}
} else {
sockaddr *ss = (sockaddr *)malloc(sizeof(sockaddr));
wamr->invoke_sock_getsockname(fd, &ss, sizeof(*ss));
if (ss->sa_family == AF_INET) {
unsigned int size_ =sizeof(sockaddr_in);
sockaddr_in *ss = (sockaddr_in *)malloc(size_);
wamr->invoke_sock_getsockname(tmp_fd, (sockaddr**)&ss, &size_);
if (ss->sin_family <= AF_INET) {
auto *ipv4 = (struct sockaddr_in *)ss;
uint32_t ip = ntohl(ipv4->sin_addr.s_addr);
wamr->op_data.addr[idx][1].is_4 = true;
Expand All @@ -138,7 +139,7 @@ void serialize_to_file(WASMExecEnv *instance) {
wamr->op_data.addr[idx][1].ip4[2] = (ip >> 8) & 0xFF;
wamr->op_data.addr[idx][1].ip4[3] = ip & 0xFF;
wamr->op_data.addr[idx][1].port = ntohs(ipv4->sin_port);
} else if (ss->sa_family == AF_INET6) {
} else if (ss->sin_family > AF_INET) {
auto *ipv6 = (struct sockaddr_in6 *)ss;
wamr->op_data.addr[idx][1].is_4 = false;
const auto *bytes = (const uint8_t *)ipv6->sin6_addr.s6_addr;
Expand Down Expand Up @@ -181,7 +182,7 @@ void serialize_to_file(WASMExecEnv *instance) {
LOGV(ERROR) << "send error";
exit(EXIT_FAILURE);
}

// Clean up
close(fd);
}
Expand Down
27 changes: 15 additions & 12 deletions src/wamr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ int WAMRInstance::invoke_sock_bind(uint32_t sockfd, struct sockaddr *sock, sockl
char *buffer_ = nullptr;
uint32_t buffer_for_wasm;

buffer_for_wasm = wasm_runtime_module_malloc(module_inst, 100, reinterpret_cast<void **>(&buffer_));
buffer_for_wasm = wasm_runtime_module_malloc(module_inst, sock_size, reinterpret_cast<void **>(&buffer_));
if (buffer_for_wasm != 0) {
uint32 argv[3];
memcpy(buffer_, sock, sizeof(sockaddr)); // use native address for accessing in runtime
Expand All @@ -299,7 +299,7 @@ int WAMRInstance::invoke_sock_bind(uint32_t sockfd, struct sockaddr *sock, sockl
}
return -1;
}
int WAMRInstance::invoke_sock_getsockname(uint32_t sockfd, struct sockaddr **sock, socklen_t sock_size) {
int WAMRInstance::invoke_sock_getsockname(uint32_t sockfd, struct sockaddr **sock, socklen_t *sock_size) {
auto name = "getsockname";
if (!(func = wasm_runtime_lookup_function(module_inst, name, nullptr))) {
LOGV(ERROR) << "The wasi " << name << " function is not found.";
Expand All @@ -323,20 +323,23 @@ int WAMRInstance::invoke_sock_getsockname(uint32_t sockfd, struct sockaddr **soc
}
}

char *buffer_ = nullptr;
uint32_t buffer_for_wasm;
char *buffer1_ = nullptr;
uint32_t buffer1_for_wasm;
char *buffer2_ = nullptr;
uint32_t buffer2_for_wasm;

buffer_for_wasm = wasm_runtime_module_malloc(module_inst, 100, reinterpret_cast<void **>(&buffer_));
if (buffer_for_wasm != 0) {
buffer1_for_wasm = wasm_runtime_module_malloc(module_inst, *sock_size, reinterpret_cast<void **>(&buffer1_));
buffer2_for_wasm = wasm_runtime_module_malloc(module_inst, sizeof(socklen_t), reinterpret_cast<void **>(&buffer2_));
if (buffer1_for_wasm != 0) {
uint32 argv[3];
memcpy(buffer_, *sock, sizeof(struct sockaddr_storage)); // use native address for accessing in runtime
argv[0] = sockfd; // pass the buffer_ address for WASM space
argv[1] = buffer_for_wasm; // the size of buffer_
argv[2] = sock_size; // O_RW | O_CREATE
memcpy(buffer1_, *sock, sizeof(struct sockaddr));
argv[0] = sockfd;
argv[1] = buffer1_for_wasm;
argv[2] = buffer2_for_wasm;
wasm_runtime_call_wasm(exec_env, func, 3, argv);
memcpy(*sock, buffer_, sizeof(struct sockaddr));
memcpy(*sock, buffer1_, sizeof(struct sockaddr));
int res = argv[0];
wasm_runtime_module_free(module_inst, buffer_for_wasm);
wasm_runtime_module_free(module_inst, buffer1_for_wasm);
return res;
}
return -1;
Expand Down
3 changes: 2 additions & 1 deletion src/wamr_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void insert_sock_recv_from_data(uint32_t sock, uint8 *ri_data, uint32 ri_data_le
recvFromData.src_addr.port = src_addr->addr.ip6.port;
}
LOGV(ERROR) << "insert_sock_recv_from_data " << sock << " " << ((struct mvvm_op_data *)ri_data)->op;
if (((struct mvvm_op_data *)ri_data)->op == MVVM_SOCK_FIN) {
if (((struct mvvm_op_data *)ri_data)->op <= MVVM_SOCK_FIN) {
wamr->socket_fd_map_[sock].is_collection = false;
return;
}
Expand Down Expand Up @@ -184,6 +184,7 @@ void remove_fd(int fd) {
create fd-socketmetadata map and store the "domain", "type", "protocol" value
*/
void insert_socket(int fd, int domain, int type, int protocol) {
// if protocol == 1, is the remote protocol, we need to getsockname?
LOGV(INFO) << fmt::format("insert_socket(fd, domain, type, protocol) {} {} {} {}", fd, domain, type, protocol);

if (wamr->socket_fd_map_.find(fd) != wamr->socket_fd_map_.end()) {
Expand Down
42 changes: 18 additions & 24 deletions test/tcp_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,33 @@
#include <wasi_socket_ext.h>
#endif

static void
init_sockaddr_inet(struct sockaddr_in *addr)
{
static void init_sockaddr_inet(struct sockaddr_in *addr) {
/* 127.0.0.1:1234 */
addr->sin_family = AF_INET;
addr->sin_port = htons(1234);
addr->sin_addr.s_addr = my_inet_addr("172.17.0.1");
addr->sin_addr.s_addr = my_inet_addr("172.17.0.2");
}

static void
init_sockaddr_inet6(struct sockaddr_in6 *addr)
{
static void init_sockaddr_inet6(struct sockaddr_in6 *addr) {
/* [::1]:1234 */
addr->sin6_family = AF_INET6;
addr->sin6_port = htons(1234);
addr->sin6_addr = in6addr_loopback;
}

int
main(int argc, char *argv[])
{
int main(int argc, char *argv[]) {
int socket_fd, ret, total_size = 0, af;
char buffer[1024] = { 0 };
char ip_string[64] = { 0 };
char buffer[1024] = {0};
char ip_string[64] = {0};
socklen_t len;
struct sockaddr_storage server_address = { 0 };
struct sockaddr_storage local_address = { 0 };
struct sockaddr_storage server_address = {0};
struct sockaddr_storage local_address = {0};

if (argc > 1 && strcmp(argv[1], "inet6") == 0) {
af = AF_INET6;
len = sizeof(struct sockaddr_in6);
init_sockaddr_inet6((struct sockaddr_in6 *)&server_address);
}
else {
} else {
af = AF_INET;
len = sizeof(struct sockaddr_in);
init_sockaddr_inet((struct sockaddr_in *)&server_address);
Expand Down Expand Up @@ -76,9 +69,8 @@ main(int argc, char *argv[])
return EXIT_FAILURE;
}

if (sockaddr_to_string((struct sockaddr *)&local_address, ip_string,
sizeof(ip_string) / sizeof(ip_string[0]))
!= 0) {
if (sockaddr_to_string((struct sockaddr *)&local_address, ip_string, sizeof(ip_string) / sizeof(ip_string[0])) !=
0) {
printf("[Client] failed to parse local address\n");
close(socket_fd);
return EXIT_FAILURE;
Expand All @@ -87,18 +79,20 @@ main(int argc, char *argv[])
printf("[Client] Local address is: %s\n", ip_string);

printf("[Client] Client receive\n");
while (1) {
ret = recv(socket_fd, buffer + total_size, sizeof(buffer) - total_size,
0);
if (ret <= 0)
break;
for (int i = 0; i < 1024; i++) {
ret = recv(socket_fd, buffer + total_size, sizeof(buffer) - total_size, 0);
if (ret <= 0) {
sleep(1);
continue;
}
total_size += ret;
}

printf("[Client] %d bytes received:\n", total_size);
if (total_size > 0) {
printf("Buffer recieved:\n%s\n", buffer);
}
send(socket_fd, "Hello from client", 17, 0);

close(socket_fd);
printf("[Client] BYE \n");
Expand Down
8 changes: 7 additions & 1 deletion test/tcp_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@ run(void *arg)
printf("[Server] Communicate with the new connection #%u @ %p ..\n",
new_socket, (void *)(uintptr_t)pthread_self());

for (i = 0; i < 1000; i++) {
for (i = 0; i < 1024; i++) {
if (send(new_socket, message, strlen(message), 0) < 0) {
perror("Send failed");
break;
}
sleep(10);
}
if(recv(new_socket, message, 1024, 0) < 0)
{
perror("recv failed");
}
printf("recv: %s\n", message);

printf("[Server] Shuting down the new connection #%u ..\n", new_socket);
shutdown(new_socket, SHUT_RDWR);
Expand Down

0 comments on commit c017f62

Please sign in to comment.