Skip to content

Commit

Permalink
Add send timer
Browse files Browse the repository at this point in the history
  • Loading branch information
victoryang00 committed Jan 27, 2024
1 parent 34262cf commit dce471b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 28 deletions.
6 changes: 5 additions & 1 deletion include/wamr.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
#include "wamr_read_write.h"
#include "wamr_wasi_context.h"
#include "wasm_runtime.h"
#include <chrono>
#include <condition_variable>
#include <functional>
#include <iterator>
#include <mutex>
#include <numeric>
#include <ranges>
#include <semaphore>
#include <tuple>
Expand All @@ -43,7 +45,7 @@ class WAMRInstance {
std::vector<const char *> ns_pool_{};
std::map<int, std::tuple<std::string, std::vector<std::tuple<int, int, fd_op>>>> fd_map_{};
// add offset to pair->tuple, 3rd param 'int'
std::map<int,int> new_sock_map_{};
std::map<int, int> new_sock_map_{};
std::map<int, SocketMetaData, std::greater<int>> socket_fd_map_{};
SocketAddrPool local_addr{};
// lwcp is LightWeight CheckPoint
Expand Down Expand Up @@ -80,6 +82,8 @@ class WAMRInstance {
bool replace_mfence_with_nop();
bool replace_nop_with_int3();
bool replace_switch_with_nop();
std::chrono::time_point<std::chrono::high_resolution_clock> time;
std::vector<long long> latencies;
WASMFunction *get_func();
void set_func(WASMFunction *);
#if WASM_ENABLE_AOT != 0
Expand Down
25 changes: 19 additions & 6 deletions src/wamr_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ void insert_sock_send_to_data(uint32_t sock, uint8 *si_data, uint32 si_data_len,

void insert_sock_recv_from_data(uint32_t sock, uint8 *ri_data, uint32 ri_data_len, uint16_t ri_flags,
__wasi_addr_t *src_addr) {

if(wamr->time != std::chrono::high_resolution_clock::time_point())
wamr->latencies.emplace_back(
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::high_resolution_clock::now() - wamr->time)
.count());
wamr->time = std::chrono::high_resolution_clock::now();
if (wamr->latencies.size() % 1000 ==0) {
long long sum = std::accumulate(wamr->latencies.begin(), wamr->latencies.end(), 0LL);
double average_latency = static_cast<double>(sum) / wamr->latencies.size();
fprintf(stderr, "average latency %f\n", average_latency);


}
if (wamr->socket_fd_map_.find(sock) != wamr->socket_fd_map_.end()) {
WasiSockRecvFromData recvFromData{};
recvFromData.sock = sock;
Expand Down Expand Up @@ -145,8 +158,8 @@ int get_sock_fd(int fd) {
/** fopen, fseek, fwrite, fread */
void insert_fd(int fd, const char *path, int flags, int offset, enum fd_op op) {
if (fd > 2) {
LOGV(INFO) << "insert_fd(fd,filename,flags, offset) fd:" << fd << " flags:" << flags << " offset:" << offset
<< " op:" << op;
// LOGV(INFO) << "insert_fd(fd,filename,flags, offset) fd:" << fd << " flags:" << flags << " offset:" << offset
// << " op:" << op;
std::string path_;
std::vector<std::tuple<int, int, enum fd_op>> ops_;
std::tie(path_, ops_) = wamr->fd_map_[fd];
Expand Down Expand Up @@ -234,7 +247,7 @@ void update_socket_fd_address(int fd, SocketAddrPool *address) {
wamr->socket_fd_map_[fd].socketAddress.ip6[7] = address->ip6[7];
}
}

#if !defined(_WIN32)
void init_gateway(SocketAddrPool *address) {
// tell gateway to keep alive the server
if (wamr->op_data.op != MVVM_SOCK_RESUME && wamr->op_data.op != MVVM_SOCK_RESUME_TCP_SERVER) {
Expand Down Expand Up @@ -276,7 +289,7 @@ void init_gateway(SocketAddrPool *address) {
close(fd);
}
}

#endif
#if defined(__APPLE__)
int gettid() {
uint64_t tid;
Expand All @@ -288,8 +301,8 @@ int gettid() { return GetCurrentThreadId(); }
#endif

void insert_sync_op(wasm_exec_env_t exec_env, uint32 *mutex, enum sync_op locking) {
printf("insert sync on offset %d, as op: %d\n", *mutex, locking);
struct sync_op_t sync_op = {.tid = exec_env->cur_count, .ref = *mutex, .sync_op = locking};
// printf("insert sync on offset %d, as op: %d\n", *mutex, locking);
struct sync_op_t sync_op = {.tid = ((uint32)exec_env->cur_count), .ref = *mutex, .sync_op = locking};
wamr->sync_ops.push_back(sync_op);
}

Expand Down
43 changes: 22 additions & 21 deletions src/wamr_wasi_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,29 +120,30 @@ void WAMRWASIContext::restore_impl(WASIArguments *env) {
int r;

if (!wamr->should_snapshot_socket) {
#if !defined(_WIN32)
#if defined(_WIN32)
wamr->should_snapshot_socket = true;
#endif
for (auto [fd, res] : this->fd_map) {
// differ from path from file
auto path = std::get<0>(res);
LOGV(INFO) << "fd: " << fd << " path: " << path;
for (auto [flags, offset, op] : std::get<1>(res)) {
for (auto [fd, res] : this->fd_map) {
// differ from path from file
LOGV(INFO) << "fd: " << fd << " path: " << path << " flag: " << flags << " op: " << op;
switch (op) {
case MVVM_FOPEN:
r = wamr->invoke_fopen(path, fd);
LOGV(ERROR) << r;
if (r != fd)
wamr->invoke_frenumber(r, fd);
wamr->fd_map_[fd] = res;
break;
case MVVM_FWRITE:
case MVVM_FREAD:
case MVVM_FSEEK:
wamr->invoke_fseek(fd, flags);
break;
auto path = std::get<0>(res);
LOGV(INFO) << "fd: " << fd << " path: " << path;
for (auto [flags, offset, op] : std::get<1>(res)) {
// differ from path from file
LOGV(INFO) << "fd: " << fd << " path: " << path << " flag: " << flags << " op: " << op;
switch (op) {
case MVVM_FOPEN:
r = wamr->invoke_fopen(path, fd);
LOGV(ERROR) << r;
if (r != fd)
wamr->invoke_frenumber(r, fd);
wamr->fd_map_[fd] = res;
break;
case MVVM_FWRITE:
case MVVM_FREAD:
case MVVM_FSEEK:
wamr->invoke_fseek(fd, flags);
break;
}
}
}
}
Expand Down Expand Up @@ -203,7 +204,7 @@ void WAMRWASIContext::restore_impl(WASIArguments *env) {
if (is_tcp_server) {
wamr->new_sock_map_[fd] =
wamr->invoke_sock_accept(old_fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4));

// This ip should be old ip?
} else {
wamr->invoke_sock_connect(fd, (struct sockaddr *)&sockaddr4, sizeof(sockaddr4));
Expand Down

0 comments on commit dce471b

Please sign in to comment.