From 089eee8bc59ab093142de528d27ab79392815b8c Mon Sep 17 00:00:00 2001 From: victoryang00 Date: Wed, 20 Mar 2024 02:22:42 -0700 Subject: [PATCH] add rdma support --- .github/workflows/build-ubuntu.yml | 2 +- CMakeLists.txt | 1 + gateway/playground.cpp | 4 +- include/wamr_read_write.h | 977 +++++++++++++++++++++-------- src/checkpoint.cpp | 8 +- src/profile.cpp | 10 +- src/restore.cpp | 13 +- src/wamr.cpp | 10 +- src/wamr_native.cpp | 8 + 9 files changed, 773 insertions(+), 260 deletions(-) diff --git a/.github/workflows/build-ubuntu.yml b/.github/workflows/build-ubuntu.yml index f861fb8..d358134 100644 --- a/.github/workflows/build-ubuntu.yml +++ b/.github/workflows/build-ubuntu.yml @@ -21,7 +21,7 @@ jobs: - uses: actions/checkout@v2 - name: Install Dependency - run: sudo apt install libssl-dev gcc-13 g++-13 libspdlog-dev libfmt-dev llvm-14-dev libedit-dev libcxxopts-dev libpfm4-dev ninja-build libpcap-dev libopenblas-pthread-dev + run: sudo apt install libssl-dev gcc-13 g++-13 libspdlog-dev libfmt-dev llvm-14-dev libedit-dev libcxxopts-dev libpfm4-dev ninja-build libpcap-dev libopenblas-pthread-dev ibverbs-providers - name: Checkout run: | diff --git a/CMakeLists.txt b/CMakeLists.txt index a58f9e5..529776b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,6 +41,7 @@ elseif (LINUX) /opt/OpenBLAS/include/ $ENV{BLAS_HOME}/include) message(${BLAS_INCLUDE_DIRS}) + set(WIN_EXTRA_LIBS ibverbs rdmacm) else () set(WAMR_BUILD_PLATFORM "windows") set(WAMR_BUILD_LIB_PTHREAD 1) diff --git a/gateway/playground.cpp b/gateway/playground.cpp index b005587..4c8d8d2 100644 --- a/gateway/playground.cpp +++ b/gateway/playground.cpp @@ -167,8 +167,8 @@ task SyncAndBlockTraffic(string src_ip, string dst_ip, short_word dstport, Sniffer(filter, iface, PacketHandler); // TCP connections setup - auto tcp_v_to_s = make_unique(src_ip, dst_ip, srcport, dstport, iface, TCPConnection::ESTABLISHED); - auto tcp_s_to_v = make_unique(dst_ip, src_ip, dstport, srcport, iface, TCPConnection::ESTABLISHED); + auto tcp_v_to_s = make_unique(src_ip, dst_ip, srcport, dstport, iface, 4); + auto tcp_s_to_v = make_unique(dst_ip, src_ip, dstport, srcport, iface, 4); // Synchronize the ACK and SEQ numbers tcp_v_to_s->Sync(); diff --git a/include/wamr_read_write.h b/include/wamr_read_write.h index e9a7af0..d6c8f04 100644 --- a/include/wamr_read_write.h +++ b/include/wamr_read_write.h @@ -9,10 +9,10 @@ * Copyright 2024 Regents of the Univeristy of California * UC Santa Cruz Sluglab. */ - #ifndef MVVM_WAMR_READ_WRITE_H #define MVVM_WAMR_READ_WRITE_H #include "struct_pack/struct_pack.hpp" +#include #include #ifndef _WIN32 #include @@ -21,43 +21,39 @@ #include #endif struct WriteStream { - virtual bool write(const char *data, std::size_t sz) const {}; + virtual bool write(const char *data, std::size_t sz) const { return false; }; + virtual ~WriteStream() = default; }; struct ReadStream { - virtual bool read(char *data, std::size_t sz) const {}; - virtual bool ignore(std::size_t sz) const {}; - virtual std::size_t tellg() const {}; + virtual bool read(char *data, std::size_t sz) const { return false; }; + virtual bool ignore(std::size_t sz) const { return false; }; + virtual std::size_t tellg() const { return 0; }; + virtual ~ReadStream() = default; }; struct FwriteStream : public WriteStream { FILE *file; - bool write(const char *data, std::size_t sz) const { return fwrite(data, sz, 1, file) == 1; } + bool write(const char *data, std::size_t sz) const override { return fwrite(data, sz, 1, file) == 1; } explicit FwriteStream(const char *file_name) : file(fopen(file_name, "wb")) {} - ~FwriteStream() { fclose(file); } + ~FwriteStream() override { fclose(file); } }; - struct FreadStream : public ReadStream { FILE *file; - bool read(char *data, std::size_t sz) const { return fread(data, sz, 1, file) == 1; } - bool ignore(std::size_t sz) const { return fseek(file, sz, SEEK_CUR) == 0; } - std::size_t tellg() const { - // if you worry about ftell performance, just use an variable to record it. - return ftell(file); - } + bool read(char *data, std::size_t sz) const override { return fread(data, sz, 1, file) == 1; } + bool ignore(std::size_t sz) const override { return fseek(file, sz, SEEK_CUR) == 0; } + std::size_t tellg() const override { return ftell(file); } explicit FreadStream(const char *file_name) : file(fopen(file_name, "rb")) {} - ~FreadStream() { fclose(file); } + ~FreadStream() override { fclose(file); } }; static_assert(ReaderStreamTrait, "Reader must conform to ReaderStreamTrait"); static_assert(WriterStreamTrait, "Writer must conform to WriterStreamTrait"); #ifndef _WIN32 struct SocketWriteStream : public WriteStream { - int sock_fd; // Socket file descriptor - - bool write(const char *data, std::size_t sz) const { + int sock_fd; + bool write(const char *data, std::size_t sz) const override { std::size_t totalSent = 0; while (totalSent < sz) { ssize_t sent = send(sock_fd, data + totalSent, sz - totalSent, 0); if (sent == -1) { - // Handle error. For simplicity, just returning false here. return false; } totalSent += sent; @@ -65,44 +61,34 @@ struct SocketWriteStream : public WriteStream { return true; } explicit SocketWriteStream(const char *address, int port) { - // Example: Initialize a client socket and connect to the given address and port sock_fd = socket(AF_INET, SOCK_STREAM, 0); if (sock_fd == -1) { SPDLOG_ERROR("Socket creation failed\n"); return; } - sockaddr_in server_addr; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); - // Convert address from text to binary form inet_pton(AF_INET, address, &server_addr.sin_addr); - if (connect(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) { SPDLOG_ERROR("Connection failed\n"); close(sock_fd); exit(EXIT_FAILURE); } } - - ~SocketWriteStream() { - close(sock_fd); // Close the socket descriptor - } + ~SocketWriteStream() override { close(sock_fd); } }; struct SocketReadStream : public ReadStream { - int sock_fd; // Socket file descriptor + int sock_fd; int client_fd; - mutable std::size_t position = 0; // Track the amount of data read - - bool read(char *data, std::size_t sz) const { + mutable std::size_t position = 0; + bool read(char *data, std::size_t sz) const override { std::size_t totalReceived = 0; while (totalReceived < sz) { ssize_t received = recv(client_fd, data + totalReceived, sz - totalReceived, 0); if (received == -1) { - // Handle error. For simplicity, just returning false here. return false; } else if (received == 0) { - // Connection closed return false; } totalReceived += received; @@ -110,19 +96,15 @@ struct SocketReadStream : public ReadStream { position += totalReceived; return true; } - explicit SocketReadStream(const char *address, int port) { - // Example: Initialize a client socket and connect to the given address and port sock_fd = socket(AF_INET, SOCK_STREAM, 0); if (sock_fd == -1) { SPDLOG_ERROR("Socket creation failed\n"); return; } - sockaddr_in server_addr; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); - // Convert address from text to binary form inet_pton(AF_INET, address, &server_addr.sin_addr); auto addr_len = sizeof(server_addr); SPDLOG_INFO("[Server] Bind socket {} {}\n", address, port); @@ -130,7 +112,6 @@ struct SocketReadStream : public ReadStream { SPDLOG_ERROR("Bind failed"); exit(EXIT_FAILURE); } - SPDLOG_INFO("[Server] Listening on socket\n"); if (listen(sock_fd, 3) < 0) { SPDLOG_ERROR("Listen failed"); @@ -138,252 +119,750 @@ struct SocketReadStream : public ReadStream { } client_fd = accept(sock_fd, (struct sockaddr *)&server_addr, (socklen_t *)&addr_len); } - // "Ignore" sz bytes of data - bool ignore(std::size_t sz) const { - char buffer[1024]; // Temporary buffer to discard data + bool ignore(std::size_t sz) const override { + char buffer[1024]; std::size_t total_ignored = 0; - while (total_ignored < sz) { std::size_t to_ignore = std::min(sz - total_ignored, sizeof(buffer)); ssize_t ignored = recv(client_fd, buffer, to_ignore, 0); - if (ignored <= 0) { // Check for error or close + if (ignored <= 0) { return false; } total_ignored += ignored; - position += ignored; // Update position + position += ignored; } - return true; } - - // Report the current position - std::size_t tellg() const { return position; } - ~SocketReadStream() { - close(sock_fd); // Close the socket descriptor - } + std::size_t tellg() const override { return position; } + ~SocketReadStream() override { close(sock_fd); } }; static_assert(ReaderStreamTrait, "Reader must conform to ReaderStreamTrait"); static_assert(WriterStreamTrait, "Writer must conform to WriterStreamTrait"); #endif -#ifdef __LINUX__ +#if __linux__ #include - -class RDMAReadStream { - struct ibv_context *context; - struct ibv_pd *pd; - struct ibv_cq *cq; - struct ibv_qp *qp; - struct ibv_mr *mr; - - mutable std::size_t position = 0; - void *buffer; - std::size_t buffer_size; - +#include +struct __attribute((packed)) rdma_buffer_attr { + uint64_t address; + uint32_t length; + union stag { + uint32_t local_stag; + uint32_t remote_stag; + } stag; +}; +#define CQ_CAPACITY (16) +#define MAX_SGE (2) +#define MAX_WR (8) +#define DEFAULT_RDMA_PORT (20886) +class RDMAEndpoint { public: - explicit RDMAReadStream(const char *device_name, std::size_t buffer_size) : buffer_size(buffer_size) { - // Initialize RDMA device - struct ibv_device **dev_list = ibv_get_device_list(NULL); - context = ibv_open_device(*dev_list); - - // Allocate Protection Domain - pd = ibv_alloc_pd(context); - - // Create Completion Queue - cq = ibv_create_cq(context, 1, NULL, NULL, 0); - - // Initialize QP - struct ibv_qp_init_attr qp_init_attr; - memset(&qp_init_attr, 0, sizeof(qp_init_attr)); - qp_init_attr.send_cq = cq; - qp_init_attr.recv_cq = cq; - qp_init_attr.qp_type = IBV_QPT_RC; // Reliable Connection - qp_init_attr.cap.max_send_wr = 10; // Max Work Requests - qp_init_attr.cap.max_recv_wr = 10; - qp_init_attr.cap.max_send_sge = 1; // Max Scatter/Gather Elements - qp_init_attr.cap.max_recv_sge = 1; - qp = ibv_create_qp(pd, &qp_init_attr); - - // Allocate and register memory region - buffer = std::malloc(buffer_size); - mr = ibv_reg_mr(pd, buffer, buffer_size, - IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE); - - // Connection setup, exchange QP info with the peer, etc., are omitted for simplicity + struct rdma_event_channel *cm_event_channel = nullptr; + struct rdma_cm_id *cm_server_id = nullptr, *cm_client_id = nullptr; + struct ibv_pd *pd = nullptr; + struct ibv_comp_channel *io_completion_channel = nullptr; + struct ibv_cq *cq = nullptr; + struct ibv_qp_init_attr qp_init_attr {}; + struct ibv_qp *client_qp = nullptr; + struct ibv_mr *client_metadata_mr = nullptr, *server_buffer_mr = nullptr, *server_metadata_mr = nullptr; + struct rdma_buffer_attr client_metadata_attr{}, server_metadata_attr{}; + struct ibv_recv_wr client_recv_wr {}, *bad_client_recv_wr = nullptr; + struct ibv_send_wr server_send_wr {}, *bad_server_send_wr = nullptr; + struct ibv_sge client_recv_sge {}, server_send_sge{}; + RDMAEndpoint() = default; + static void show_rdma_cmid(struct rdma_cm_id *id) { + if (!id) { + printf("Passed ptr is nullptr\n"); + return; + } + printf("RDMA cm id at %p \n", id); + if (id->verbs && id->verbs->device) + printf("dev_ctx: %p (device name: %s) \n", id->verbs, id->verbs->device->name); + if (id->channel) + printf("cm event channel %p\n", id->channel); + printf("QP: %p, port_space %x, port_num %u \n", id->qp, id->ps, id->port_num); } - - bool read(char *data, std::size_t sz) const { - if (sz > buffer_size) { - std::cerr << "Requested read size exceeds buffer size" << std::endl; - return false; + static void show_rdma_buffer_attr(struct rdma_buffer_attr *attr) { + if (!attr) { + printf("Passed attr is nullptr\n"); + return; } - - struct ibv_sge sge; - memset(&sge, 0, sizeof(sge)); - sge.addr = (uintptr_t)buffer; - sge.length = sz; - sge.lkey = mr->lkey; - - struct ibv_send_wr wr; - memset(&wr, 0, sizeof(wr)); - wr.wr_id = 0; // Use this for identifying the WR in the completion queue - wr.sg_list = &sge; - wr.num_sge = 1; - wr.opcode = IBV_WR_RDMA_READ; - wr.send_flags = IBV_SEND_SIGNALED; - wr.wr.rdma.remote_addr = remote_address; // Needs to be set beforehand - wr.wr.rdma.rkey = remote_key; // Needs to be exchanged with the peer beforehand - - struct ibv_send_wr *bad_wr; - if (ibv_post_send(qp, &wr, &bad_wr)) { - std::cerr << "Failed to post RDMA read WR" << std::endl; - return false; + printf("---------------------------------------------------------\n"); + printf("buffer attr, addr: %p , len: %u , stag : 0x%x \n", (void *)attr->address, (unsigned int)attr->length, + attr->stag.local_stag); + printf("---------------------------------------------------------\n"); + } + static struct ibv_mr *rdma_buffer_alloc(struct ibv_pd *pd, uint32_t size, enum ibv_access_flags permission) { + struct ibv_mr *mr = nullptr; + if (!pd) { + printf("Protection domain is nullptr \n"); + return nullptr; } - - // Poll for completion - struct ibv_wc wc; + void *buf = calloc(1, size); + if (!buf) { + printf("failed to allocate buffer, -ENOMEM\n"); + return nullptr; + } + mr = rdma_buffer_register(pd, buf, size, permission); + if (!mr) { + free(buf); + } + return mr; + } + static struct ibv_mr *rdma_buffer_register(struct ibv_pd *pd, void *addr, uint32_t length, + enum ibv_access_flags permission) { + struct ibv_mr *mr = nullptr; + if (!pd) { + printf("Protection domain is nullptr, ignoring \n"); + return nullptr; + } + mr = ibv_reg_mr(pd, addr, length, permission); + if (!mr) { + printf("Failed to create mr on buffer, errno: %d \n", -errno); + return nullptr; + } + return mr; + } + static void rdma_buffer_free(struct ibv_mr *mr) { + if (!mr) { + printf("Passed memory region is nullptr, ignoring\n"); + return; + } + void *to_free = mr->addr; + rdma_buffer_deregister(mr); + free(to_free); + } + static void rdma_buffer_deregister(struct ibv_mr *mr) { + if (!mr) { + printf("Passed memory region is nullptr, ignoring\n"); + return; + } + ibv_dereg_mr(mr); + } + static int process_rdma_cm_event(struct rdma_event_channel *echannel, enum rdma_cm_event_type expected_event, + struct rdma_cm_event **cm_event) { + int ret = 1; + ret = rdma_get_cm_event(echannel, cm_event); + if (ret) { + printf("Failed to retrieve a cm event, errno: %d \n", -errno); + return -errno; + } + if (0 != (*cm_event)->status) { + printf("CM event has non zero status: %d\n", (*cm_event)->status); + ret = -((*cm_event)->status); + rdma_ack_cm_event(*cm_event); + return ret; + } + if ((*cm_event)->event != expected_event) { + printf("Unexpected event received: %s [ expecting: %s ]", rdma_event_str((*cm_event)->event), + rdma_event_str(expected_event)); + rdma_ack_cm_event(*cm_event); + return -1; + } + return ret; + } + static int process_work_completion_events(struct ibv_comp_channel *comp_channel, struct ibv_wc *wc, int max_wc) { + struct ibv_cq *cq_ptr = nullptr; + void *context = nullptr; + int ret = -1, i, total_wc = 0; + ret = ibv_get_cq_event(comp_channel, &cq_ptr, &context); + if (ret) { + printf("Failed to get next CQ event due to %d \n", -errno); + return -errno; + } + ret = ibv_req_notify_cq(cq_ptr, 0); + if (ret) { + printf("Failed to request further notifications %d \n", -errno); + return -errno; + } + total_wc = 0; do { - int nc = ibv_poll_cq(cq, 1, &wc); - if (nc < 0) { - std::cerr << "Poll CQ failed" << std::endl; - return false; + ret = ibv_poll_cq(cq_ptr, max_wc - total_wc, wc + total_wc); + if (ret < 0) { + printf("Failed to poll cq for wc due to %d \n", ret); } - } while (wc.status != IBV_WC_SUCCESS || wc.wr_id != wr.wr_id); - - // Copy data to user buffer - memcpy(data, buffer, sz); - position += sz; // Update position - return true; - } - bool ignore(std::size_t sz) const { - char buffer[1024]; // Temporary buffer to discard data - std::size_t total_ignored = 0; - - while (total_ignored < sz) { - std::size_t to_ignore = std::min(sz - total_ignored, sizeof(buffer)); - ssize_t ignored = read(buffer, to_ignore); - if (ignored <= 0) { // Check for error or close - return false; + total_wc += ret; + } while (total_wc < max_wc); + for (i = 0; i < total_wc; i++) { + if (wc[i].status != IBV_WC_SUCCESS) { + printf("Work completion (WC) has error status: %s at index %d", ibv_wc_status_str(wc[i].status), i); + return -(wc[i].status); } - total_ignored += ignored; - position += ignored; // Update position } - + ibv_ack_cq_events(cq_ptr, 1); + return total_wc; + } + ~RDMAEndpoint() = default; +}; +class RDMAReadStream : public ReadStream, public RDMAEndpoint { + mutable long position = 0; + mutable uint8_t *buffer; + mutable long buffer_size; + int setup_client_resources() { + int ret = -1; + if (!cm_client_id) { + printf("Client id is still nullptr \n"); + return -EINVAL; + } + pd = ibv_alloc_pd(cm_client_id->verbs); + if (!pd) { + printf("Failed to allocate a protection domain errno: %d\n", -errno); + return -errno; + } + io_completion_channel = ibv_create_comp_channel(cm_client_id->verbs); + if (!io_completion_channel) { + printf("Failed to create an I/O completion event channel, %d\n", -errno); + return -errno; + } + cq = ibv_create_cq(cm_client_id->verbs, CQ_CAPACITY, nullptr, io_completion_channel, 0); + if (!cq) { + printf("Failed to create a completion queue (cq), errno: %d\n", -errno); + return -errno; + } + ret = ibv_req_notify_cq(cq, 0); + if (ret) { + printf("Failed to request notifications on CQ errno: %d \n", -errno); + return -errno; + } + bzero(&qp_init_attr, sizeof qp_init_attr); + qp_init_attr.cap.max_recv_sge = MAX_SGE; + qp_init_attr.cap.max_recv_wr = MAX_WR; + qp_init_attr.cap.max_send_sge = MAX_SGE; + qp_init_attr.cap.max_send_wr = MAX_WR; + qp_init_attr.qp_type = IBV_QPT_RC; + qp_init_attr.recv_cq = cq; + qp_init_attr.send_cq = cq; + ret = rdma_create_qp(cm_client_id, pd, &qp_init_attr); + if (ret) { + printf("Failed to create QP due to errno: %d\n", -errno); + return -errno; + } + client_qp = cm_client_id->qp; + return ret; + } + int start_rdma_server(struct sockaddr_in *server_addr) { + struct rdma_cm_event *cm_event = nullptr; + int ret = -1; + cm_event_channel = rdma_create_event_channel(); + if (!cm_event_channel) { + printf("Creating cm event channel failed with errno : (%d)", -errno); + return -errno; + } + ret = rdma_create_id(cm_event_channel, &cm_server_id, nullptr, RDMA_PS_TCP); + if (ret) { + printf("Creating server cm id failed with errno: %d ", -errno); + return -errno; + } + ret = rdma_bind_addr(cm_server_id, (struct sockaddr *)server_addr); + if (ret) { + printf("Failed to bind server address, errno: %d \n", -errno); + return -errno; + } + ret = rdma_listen(cm_server_id, 8); + if (ret) { + printf("rdma_listen failed to listen on server address, errno: %d ", -errno); + return -errno; + } + printf("Server is listening successfully at: %s , port: %d \n", inet_ntoa(server_addr->sin_addr), + ntohs(server_addr->sin_port)); + ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_CONNECT_REQUEST, &cm_event); + if (ret) { + printf("Failed to get cm event, ret = %d \n", ret); + return ret; + } + cm_client_id = cm_event->id; + ret = rdma_ack_cm_event(cm_event); + if (ret) { + printf("Failed to acknowledge the cm event errno: %d \n", -errno); + return -errno; + } + return ret; + } + int accept_client_connection() { + struct rdma_conn_param conn_param; + struct rdma_cm_event *cm_event = nullptr; + struct sockaddr_in remote_sockaddr; + int ret = -1; + if (!cm_client_id || !client_qp) { + printf("Client resources are not properly setup\n"); + return -EINVAL; + } + client_metadata_mr = + rdma_buffer_register(pd, &client_metadata_attr, sizeof(client_metadata_attr), (IBV_ACCESS_LOCAL_WRITE)); + if (!client_metadata_mr) { + printf("Failed to register client attr buffer\n"); + return -ENOMEM; + } + client_recv_sge.addr = (uint64_t)client_metadata_mr->addr; + client_recv_sge.length = client_metadata_mr->length; + client_recv_sge.lkey = client_metadata_mr->lkey; + bzero(&client_recv_wr, sizeof(client_recv_wr)); + client_recv_wr.sg_list = &client_recv_sge; + client_recv_wr.num_sge = 1; + ret = ibv_post_recv(client_qp, &client_recv_wr, &bad_client_recv_wr); + if (ret) { + printf("Failed to pre-post the receive buffer, errno: %d \n", ret); + return ret; + } + memset(&conn_param, 0, sizeof(conn_param)); + conn_param.initiator_depth = 3; + conn_param.responder_resources = 3; + ret = rdma_accept(cm_client_id, &conn_param); + if (ret) { + printf("Failed to accept the connection, errno: %d \n", -errno); + return -errno; + } + ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_ESTABLISHED, &cm_event); + if (ret) { + printf("Failed to get the cm event, errnp: %d \n", -errno); + return -errno; + } + ret = rdma_ack_cm_event(cm_event); + if (ret) { + printf("Failed to acknowledge the cm event %d\n", -errno); + return -errno; + } + memcpy(&remote_sockaddr, rdma_get_peer_addr(cm_client_id), sizeof(struct sockaddr_in)); + printf("A new connection is accepted from %s \n", inet_ntoa(remote_sockaddr.sin_addr)); + return ret; + } + int send_server_metadata_to_client() { + struct ibv_wc wc; + int ret = -1; + ret = process_work_completion_events(io_completion_channel, &wc, 1); + if (ret != 1) { + printf("Failed to receive , ret = %d \n", ret); + return ret; + } + show_rdma_buffer_attr(&client_metadata_attr); + printf("The client has requested buffer length of : %u bytes \n", client_metadata_attr.length); + server_buffer_mr = + rdma_buffer_alloc(pd, client_metadata_attr.length, + (enum ibv_access_flags)(((int)IBV_ACCESS_LOCAL_WRITE) | ((int)IBV_ACCESS_REMOTE_READ) | + ((int)IBV_ACCESS_REMOTE_WRITE))); + if (!server_buffer_mr) { + printf("Server failed to create a buffer \n"); + return -ENOMEM; + } + server_metadata_attr.address = (uint64_t)server_buffer_mr->addr; + server_metadata_attr.length = (uint32_t)server_buffer_mr->length; + server_metadata_attr.stag.local_stag = (uint32_t)server_buffer_mr->lkey; + server_metadata_mr = + rdma_buffer_register(pd, &server_metadata_attr, sizeof(server_metadata_attr), IBV_ACCESS_LOCAL_WRITE); + if (!server_metadata_mr) { + printf("Server failed to create to hold server metadata \n"); + return -ENOMEM; + } + server_send_sge.addr = (uint64_t)&server_metadata_attr; + server_send_sge.length = sizeof(server_metadata_attr); + server_send_sge.lkey = server_metadata_mr->lkey; + bzero(&server_send_wr, sizeof(server_send_wr)); + server_send_wr.sg_list = &server_send_sge; + server_send_wr.num_sge = 1; + server_send_wr.opcode = IBV_WR_SEND; + server_send_wr.send_flags = IBV_SEND_SIGNALED; + ret = ibv_post_send(client_qp, &server_send_wr, &bad_server_send_wr); + if (ret) { + printf("Posting of server metdata failed, errno: %d \n", -errno); + return -errno; + } + ret = process_work_completion_events(io_completion_channel, &wc, 1); + if (ret != 1) { + printf("Failed to send server metadata, ret = %d \n", ret); + return ret; + } + usleep(1); + printf("Received buffer contents: "); + for (int i = 0; i < 10; i++) { + printf("%x", ((uint8_t *)server_buffer_mr->addr)[i]); + } + printf("\n"); + return 0; + } + int disconnect_and_cleanup() { + struct rdma_cm_event *cm_event = nullptr; + int ret = -1; + ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_DISCONNECTED, &cm_event); + if (ret) { + printf("Failed to get disconnect event, ret = %d \n", ret); + return ret; + } + ret = rdma_ack_cm_event(cm_event); + if (ret) { + printf("Failed to acknowledge the cm event %d\n", -errno); + return -errno; + } + printf("A disconnect event is received from the client...\n"); + rdma_destroy_qp(cm_client_id); + ret = rdma_destroy_id(cm_client_id); + if (ret) { + printf("Failed to destroy client id cleanly, %d \n", -errno); + } + ret = ibv_destroy_cq(cq); + if (ret) { + printf("Failed to destroy completion queue cleanly, %d \n", -errno); + } + ret = ibv_destroy_comp_channel(io_completion_channel); + if (ret) { + printf("Failed to destroy completion channel cleanly, %d \n", -errno); + } + rdma_buffer_free(server_buffer_mr); + rdma_buffer_deregister(server_metadata_mr); + rdma_buffer_deregister(client_metadata_mr); + ret = ibv_dealloc_pd(pd); + if (ret) { + printf("Failed to destroy client protection domain cleanly, %d \n", -errno); + } + ret = rdma_destroy_id(cm_server_id); + if (ret) { + printf("Failed to destroy server id cleanly, %d \n", -errno); + } + rdma_destroy_event_channel(cm_event_channel); + printf("Server shut-down is complete \n"); + return 0; + } +public: + explicit RDMAReadStream(const char *server_name, int port) : RDMAEndpoint() { + struct sockaddr_in server_sockaddr; + int ret = -1; + bzero(&server_sockaddr, sizeof(server_sockaddr)); + server_sockaddr.sin_family = AF_INET; + server_sockaddr.sin_port = htons(port); + inet_pton(AF_INET, server_name, &server_sockaddr.sin_addr); + ret = start_rdma_server(&server_sockaddr); + if (ret) { + printf("RDMA server failed to start cleanly, ret = %d \n", ret); + } + ret = setup_client_resources(); + if (ret) { + printf("Failed to setup client resources, ret = %d \n", ret); + } + ret = accept_client_connection(); + if (ret) { + printf("Failed to handle client cleanly, ret = %d \n", ret); + } + ret = send_server_metadata_to_client(); + if (ret) { + printf("Failed to send server metadata to the client, ret = %d \n", ret); + } + buffer = (uint8_t *)server_buffer_mr->addr; + buffer_size = client_metadata_attr.length; + } + bool read(char *data, std::size_t sz) const override { + if (position + sz > buffer_size) { + std::throw_with_nested(std::runtime_error("Buffer overflow")); + } + memcpy(data, buffer + position, sz); + position += sz; return true; } - std::size_t tellg() const { return position; } - - ~RDMAReadStream() { - ibv_dereg_mr(mr); - std::free(buffer); - ibv_destroy_qp(qp); - ibv_destroy_cq(cq); - ibv_dealloc_pd(pd); - ibv_close_device(context); - // Note: Proper error checking and handling are omitted for brevity + bool ignore(std::size_t sz) const override { + position += sz; + return true; } + std::size_t tellg() const override { return position; } + ~RDMAReadStream() override { disconnect_and_cleanup(); }; }; -class RDMAWriteStream { - struct ibv_context *context; - struct ibv_pd *pd; - struct ibv_cq *cq; - struct ibv_qp *qp; - struct ibv_mr *mr; - - void *buffer; - std::size_t buffer_size; - uint64_t remote_address; - uint32_t remote_key; - +class RDMAWriteStream : public WriteStream, public RDMAEndpoint { public: - explicit RDMAWriteStream(const char *device_name, std::size_t buffer_size) : buffer_size(buffer_size) { - // Initialization (device, PD, CQ, QP, buffer, and MR) is similar to RDMAReadStream - // Initialize RDMA device - struct ibv_device **dev_list = ibv_get_device_list(NULL); - context = ibv_open_device(*dev_list); - - // Allocate Protection Domain - pd = ibv_alloc_pd(context); - - // Create Completion Queue - cq = ibv_create_cq(context, 1, NULL, NULL, 0); - - // Initialize QP - struct ibv_qp_init_attr qp_init_attr; - memset(&qp_init_attr, 0, sizeof(qp_init_attr)); - qp_init_attr.send_cq = cq; - qp_init_attr.recv_cq = cq; - qp_init_attr.qp_type = IBV_QPT_RC; // Reliable Connection - qp_init_attr.cap.max_send_wr = 10; // Max Work Requests - qp_init_attr.cap.max_recv_wr = 10; - qp_init_attr.cap.max_send_sge = 1; // Max Scatter/Gather Elements - qp_init_attr.cap.max_recv_sge = 1; - qp = ibv_create_qp(pd, &qp_init_attr); - - // Allocate and register memory region - buffer = std::malloc(buffer_size); - mr = ibv_reg_mr(pd, buffer, buffer_size, - IBV_ACCESS_LOCAL_READ | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ); - // Assume remote_address and remote_key are set up through some initialization method + mutable char *buffer; + mutable long position = 0; + struct ibv_cq *client_cq = nullptr; + struct ibv_sge client_send_sge, server_recv_sge; + struct sockaddr_in server_sockaddr; + struct rdma_conn_param conn_param; + struct rdma_cm_event *cm_event = nullptr; + struct ibv_mr *client_metadata_mr = nullptr, *client_src_mr = nullptr, *client_dst_mr = nullptr, + *server_metadata_mr = nullptr; + struct ibv_send_wr client_send_wr, *bad_client_send_wr = nullptr; + struct ibv_recv_wr server_recv_wr, *bad_server_recv_wr = nullptr; + int client_prepare_connection(struct sockaddr_in *s_addr) { + struct rdma_cm_event *cm_event = nullptr; + int ret = -1; + cm_event_channel = rdma_create_event_channel(); + if (!cm_event_channel) { + printf("Creating cm event channel failed, errno: %d \n", -errno); + return -errno; + } + ret = rdma_create_id(cm_event_channel, &cm_client_id, nullptr, RDMA_PS_TCP); + if (ret) { + printf("Creating cm id failed with errno: %d \n", -errno); + return -errno; + } + ret = rdma_resolve_addr(cm_client_id, nullptr, (struct sockaddr *)s_addr, 2000); + if (ret) { + printf("Failed to resolve address, errno: %d \n", -errno); + return -errno; + } + ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_ADDR_RESOLVED, &cm_event); + if (ret) { + printf("Failed to receive a valid event, ret = %d \n", ret); + return ret; + } + ret = rdma_ack_cm_event(cm_event); + if (ret) { + printf("Failed to acknowledge the CM event, errno: %d\n", -errno); + return -errno; + } + ret = rdma_resolve_route(cm_client_id, 2000); + if (ret) { + printf("Failed to resolve route, erno: %d \n", -errno); + return -errno; + } + ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_ROUTE_RESOLVED, &cm_event); + if (ret) { + printf("Failed to receive a valid event, ret = %d \n", ret); + return ret; + } + ret = rdma_ack_cm_event(cm_event); + if (ret) { + printf("Failed to acknowledge the CM event, errno: %d \n", -errno); + return -errno; + } + printf("Trying to connect to server at : %s port: %d \n", inet_ntoa(s_addr->sin_addr), ntohs(s_addr->sin_port)); + pd = ibv_alloc_pd(cm_client_id->verbs); + if (!pd) { + printf("Failed to alloc pd, errno: %d \n", -errno); + return -errno; + } + io_completion_channel = ibv_create_comp_channel(cm_client_id->verbs); + if (!io_completion_channel) { + printf("Failed to create IO completion event channel, errno: %d\n", -errno); + return -errno; + } + client_cq = ibv_create_cq(cm_client_id->verbs, CQ_CAPACITY, nullptr, io_completion_channel, 0); + if (!client_cq) { + printf("Failed to create CQ, errno: %d \n", -errno); + return -errno; + } + ret = ibv_req_notify_cq(client_cq, 0); + if (ret) { + printf("Failed to request notifications, errno: %d\n", -errno); + return -errno; + } + bzero(&qp_init_attr, sizeof qp_init_attr); + qp_init_attr.cap.max_recv_sge = MAX_SGE; + qp_init_attr.cap.max_recv_wr = MAX_WR; + qp_init_attr.cap.max_send_sge = MAX_SGE; + qp_init_attr.cap.max_send_wr = MAX_WR; + qp_init_attr.qp_type = IBV_QPT_RC; + qp_init_attr.recv_cq = client_cq; + qp_init_attr.send_cq = client_cq; + ret = rdma_create_qp(cm_client_id, pd, &qp_init_attr); + if (ret) { + printf("Failed to create QP, errno: %d \n", -errno); + return -errno; + } + client_qp = cm_client_id->qp; + return 0; } - - virtual bool write(const char *data, std::size_t sz) const { - if (sz > buffer_size) { - std::cerr << "Write size exceeds buffer size" << std::endl; - return false; + int client_pre_post_recv_buffer() { + int ret = -1; + server_metadata_mr = + rdma_buffer_register(pd, &server_metadata_attr, sizeof(server_metadata_attr), (IBV_ACCESS_LOCAL_WRITE)); + if (!server_metadata_mr) { + printf("Failed to setup the server metadata mr , -ENOMEM\n"); + return -ENOMEM; } - - // Copy the data to the local buffer - memcpy(buffer, data, sz); - - // Prepare scatter/gather entry - struct ibv_sge sge; - memset(&sge, 0, sizeof(sge)); - sge.addr = (uintptr_t)buffer; - sge.length = sz; - sge.lkey = mr->lkey; - - // Prepare the work request - struct ibv_send_wr wr; - memset(&wr, 0, sizeof(wr)); - wr.wr_id = 0; // Unique identifier - wr.sg_list = &sge; - wr.num_sge = 1; - wr.opcode = IBV_WR_RDMA_WRITE; - wr.send_flags = IBV_SEND_SIGNALED; - wr.wr.rdma.remote_addr = remote_address; - wr.wr.rdma.rkey = remote_key; - - struct ibv_send_wr *bad_wr; - - // Post the RDMA write work request - if (ibv_post_send(qp, &wr, &bad_wr)) { - std::cerr << "Failed to post RDMA write WR" << std::endl; - return false; + server_recv_sge.addr = (uint64_t)server_metadata_mr->addr; + server_recv_sge.length = (uint32_t)server_metadata_mr->length; + server_recv_sge.lkey = (uint32_t)server_metadata_mr->lkey; + bzero(&server_recv_wr, sizeof(server_recv_wr)); + server_recv_wr.sg_list = &server_recv_sge; + server_recv_wr.num_sge = 1; + ret = ibv_post_recv(client_qp, &server_recv_wr, &bad_server_recv_wr); + if (ret) { + printf("Failed to pre-post the receive buffer, errno: %d \n", ret); + return ret; } - - // Poll for completion + return 0; + } + int client_connect_to_server() { + struct rdma_conn_param conn_param {}; + struct rdma_cm_event *cm_event = nullptr; + int ret = -1; + bzero(&conn_param, sizeof(conn_param)); + conn_param.initiator_depth = 3; + conn_param.responder_resources = 3; + conn_param.retry_count = 3; + ret = rdma_connect(cm_client_id, &conn_param); + if (ret) { + printf("Failed to connect to remote host , errno: %d\n", -errno); + return -errno; + } + ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_ESTABLISHED, &cm_event); + if (ret) { + printf("Failed to get cm event, ret = %d \n", ret); + return ret; + } + ret = rdma_ack_cm_event(cm_event); + if (ret) { + printf("Failed to acknowledge cm event, errno: %d\n", -errno); + return -errno; + } + printf("The client is connected successfully \n"); + return 0; + } + int client_xchange_metadata_with_server() { + struct ibv_wc wc[2]; + int ret = -1; + client_src_mr = + rdma_buffer_register(pd, buffer, position, + (enum ibv_access_flags)(((int)IBV_ACCESS_LOCAL_WRITE) | ((int)IBV_ACCESS_REMOTE_READ) | + ((int)IBV_ACCESS_REMOTE_WRITE))); + if (!client_src_mr) { + printf("Failed to register the first buffer, ret = %d \n", ret); + return ret; + } + client_metadata_attr.address = (uint64_t)client_src_mr->addr; + client_metadata_attr.length = client_src_mr->length; + client_metadata_attr.stag.local_stag = client_src_mr->lkey; + client_metadata_mr = + rdma_buffer_register(pd, &client_metadata_attr, sizeof(client_metadata_attr), IBV_ACCESS_LOCAL_WRITE); + if (!client_metadata_mr) { + printf("Failed to register the client metadata buffer, ret = %d \n", ret); + return ret; + } + client_send_sge.addr = (uint64_t)client_metadata_mr->addr; + client_send_sge.length = (uint32_t)client_metadata_mr->length; + client_send_sge.lkey = client_metadata_mr->lkey; + bzero(&client_send_wr, sizeof(client_send_wr)); + client_send_wr.sg_list = &client_send_sge; + client_send_wr.num_sge = 1; + client_send_wr.opcode = IBV_WR_SEND; + client_send_wr.send_flags = IBV_SEND_SIGNALED; + ret = ibv_post_send(client_qp, &client_send_wr, &bad_client_send_wr); + if (ret) { + printf("Failed to send client metadata, errno: %d \n", -errno); + return -errno; + } + ret = process_work_completion_events(io_completion_channel, wc, 2); + if (ret != 2) { + printf("We failed to get 2 work completions , ret = %d \n", ret); + return ret; + } + show_rdma_buffer_attr(&server_metadata_attr); + return 0; + } + int client_remote_memory_ops() { struct ibv_wc wc; - do { - int nc = ibv_poll_cq(cq, 1, &wc); - if (nc < 0) { - std::cerr << "Poll CQ failed" << std::endl; - return false; - } - } while (wc.status != IBV_WC_SUCCESS || wc.wr_id != wr.wr_id); - - return true; // Successfully written + int ret = -1; + client_send_sge.addr = (uint64_t)client_src_mr->addr; + client_send_sge.length = (uint32_t)client_src_mr->length; + client_send_sge.lkey = client_src_mr->lkey; + bzero(&client_send_wr, sizeof(client_send_wr)); + client_send_wr.sg_list = &client_send_sge; + client_send_wr.num_sge = 1; + client_send_wr.opcode = IBV_WR_RDMA_WRITE; + client_send_wr.send_flags = IBV_SEND_SIGNALED; + client_send_wr.wr.rdma.rkey = server_metadata_attr.stag.remote_stag; + client_send_wr.wr.rdma.remote_addr = server_metadata_attr.address; + ret = ibv_post_send(client_qp, &client_send_wr, &bad_client_send_wr); + if (ret) { + printf("Failed to write client src buffer, errno: %d \n", -errno); + return -errno; + } + ret = process_work_completion_events(io_completion_channel, &wc, 1); + if (ret != 1) { + printf("We failed to get 1 work completions , ret = %d \n", ret); + return ret; + } + ret = ibv_post_send(client_qp, &client_send_wr, &bad_client_send_wr); + if (ret) { + printf("Failed to write client src buffer, errno: %d \n", -errno); + return -errno; + } + ret = process_work_completion_events(io_completion_channel, &wc, 1); + if (ret != 1) { + printf("We failed to get 1 work completions , ret = %d \n", ret); + return ret; + } + return 0; } - - ~RDMAWriteStream() { - // Deallocate resources - ibv_dereg_mr(mr); - std::free(buffer); - ibv_destroy_qp(qp); - ibv_destroy_cq(cq); - ibv_dealloc_pd(pd); - ibv_close_device(context); + int client_disconnect_and_clean() { + struct rdma_cm_event *cm_event = nullptr; + int ret = -1; + ret = rdma_disconnect(cm_client_id); + if (ret) { + printf("Failed to disconnect, errno: %d \n", -errno); + } + ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_DISCONNECTED, &cm_event); + if (ret) { + printf("Failed to get RDMA_CM_EVENT_DISCONNECTED event, ret = %d\n", ret); + } + ret = rdma_ack_cm_event(cm_event); + if (ret) { + printf("Failed to acknowledge cm event, errno: %d\n", -errno); + } + rdma_destroy_qp(cm_client_id); + ret = rdma_destroy_id(cm_client_id); + if (ret) { + printf("Failed to destroy client id cleanly, %d \n", -errno); + } + ret = ibv_destroy_cq(client_cq); + if (ret) { + printf("Failed to destroy completion queue cleanly, %d \n", -errno); + } + ret = ibv_destroy_comp_channel(io_completion_channel); + if (ret) { + printf("Failed to destroy completion channel cleanly, %d \n", -errno); + } + rdma_buffer_deregister(server_metadata_mr); + rdma_buffer_deregister(client_metadata_mr); + rdma_buffer_deregister(client_src_mr); + ret = ibv_dealloc_pd(pd); + if (ret) { + printf("Failed to destroy client protection domain cleanly, %d \n", -errno); + } + rdma_destroy_event_channel(cm_event_channel); + printf("Client resource clean up is complete \n"); + return 0; } +public: + explicit RDMAWriteStream(const char *server_name, int port) : RDMAEndpoint() { + struct rdma_cm_event *cm_event = nullptr; + int ret = -1; + bzero(&server_sockaddr, sizeof(server_sockaddr)); + server_sockaddr.sin_family = AF_INET; + server_sockaddr.sin_port = htons(port); + inet_pton(AF_INET, server_name, &server_sockaddr.sin_addr); + ret = client_prepare_connection(&server_sockaddr); + if (ret) { + printf("Failed to setup client connection , ret = %d \n", ret); + } + ret = client_pre_post_recv_buffer(); + if (ret) { + printf("Failed to setup client connection , ret = %d \n", ret); + } + ret = client_connect_to_server(); + if (ret) { + printf("Failed to setup client connection , ret = %d \n", ret); + } + } + virtual bool write(const char *data, std::size_t sz) const { + memcpy(buffer + position, data, sz); + position += sz; + return true; + } + ~RDMAWriteStream() { + int ret = -1; + for (int i = 0; i < 100; i++) { + printf("%x", (uint8_t)buffer[i]); + } + printf("\n"); + ret = client_xchange_metadata_with_server(); + if (ret) { + printf("Failed to setup client connection , ret = %d \n", ret); + } + ret = client_remote_memory_ops(); + if (ret) { + printf("Failed to finish remote memory ops, ret = %d \n", ret); + } + ret = client_disconnect_and_clean(); + if (ret) { + printf("Failed to clean up client resources, ret = %d \n", ret); + } + }; }; static_assert(ReaderStreamTrait, "Reader must conform to ReaderStreamTrait"); static_assert(WriterStreamTrait, "Writer must conform to WriterStreamTrait"); diff --git a/src/checkpoint.cpp b/src/checkpoint.cpp index 3e63319..2161476 100644 --- a/src/checkpoint.cpp +++ b/src/checkpoint.cpp @@ -55,7 +55,8 @@ int main(int argc, char *argv[]) { "x,function_count", "The function count to stop", cxxopts::value()->default_value("0"))( "o,offload_addr", "The next hop to offload", cxxopts::value()->default_value(""))( "s,offload_port", "The next hop port to offload", cxxopts::value()->default_value("0"))( - "c,count", "The step index to test execution", cxxopts::value()->default_value("0")); + "c,count", "The step index to test execution", cxxopts::value()->default_value("0"))( + "r,rdma", "Whether to use RDMA device", cxxopts::value()->default_value("0")); auto result = options.parse(argc, argv); if (result["help"].as()) { @@ -72,6 +73,7 @@ int main(int argc, char *argv[]) { auto offload_addr = result["offload_addr"].as(); auto offload_port = result["offload_port"].as(); auto ns_pool = result["ns_pool"].as>(); + auto rdma = result["rdma"].as(); snapshot_threshold = result["count"].as(); stop_func_threshold = result["function_count"].as(); is_debug = result["is_debug"].as(); @@ -93,6 +95,10 @@ int main(int argc, char *argv[]) { if (offload_addr.empty()) writer = new FwriteStream((removeExtension(target) + ".bin").c_str()); #ifndef _WIN32 +#if __linux__ + else if (rdma) + writer = new RDMAWriteStream(offload_addr.c_str(), offload_port); +#endif else writer = new SocketWriteStream(offload_addr.c_str(), offload_port); #endif diff --git a/src/profile.cpp b/src/profile.cpp index 4251686..db9572a 100644 --- a/src/profile.cpp +++ b/src/profile.cpp @@ -245,10 +245,10 @@ int main(int argc, char *argv[]) { std::sort(last_func_idx.begin(), last_func_idx.end(), [&last_func_count](size_t a, size_t b) { return last_func_count[a] > last_func_count[b]; }); for (const auto &e : last_func_idx) { - std::cout << std::format("{} {}\n", func_name[e], last_func_count[e]); + std::cout << fmt::format("{} {}\n", func_name[e], last_func_count[e]); std::cout << "IP count:\n"; for (auto [ip, cnt] : ip_count_per_func[e]) { - std::cout << std::format("func {} ip {} count {}\n", e, ip, cnt); + std::cout << fmt::format("func {} ip {} count {}\n", e, ip, cnt); } } std::cout << std::endl; @@ -257,7 +257,7 @@ int main(int argc, char *argv[]) { std::sort(func_idx.begin(), func_idx.end(), [&func_count](size_t a, size_t b) { return func_count[a] > func_count[b]; }); for (const auto &e : func_idx) { - std::cout << std::format("{} {}\n", func_name[e], func_count[e]); + std::cout << fmt::format("{} {}\n", func_name[e], func_count[e]); } std::ofstream out(target + ".pgo"); @@ -267,10 +267,10 @@ int main(int argc, char *argv[]) { for (auto [ip, cnt] : ip_count_per_func[e]) { auto freq = (double)cnt / (double)total_sample_count; if (freq > 0.15) { - std::cout << std::format("pgo name {} idx {} ip {} freq {}\n", func_name[e], aot_idx[e], ip, freq); + std::cout << fmt::format("pgo name {} idx {} ip {} freq {}\n", func_name[e], aot_idx[e], ip, freq); pgo_list.emplace_back(aot_idx[e], ip); } else { - std::cout << std::format("no pgo name {} idx {} ip {} freq {}\n", func_name[e], aot_idx[e], ip, freq); + std::cout << fmt::format("no pgo name {} idx {} ip {} freq {}\n", func_name[e], aot_idx[e], ip, freq); } } } diff --git a/src/restore.cpp b/src/restore.cpp index 022a69c..d3f9e94 100644 --- a/src/restore.cpp +++ b/src/restore.cpp @@ -29,6 +29,7 @@ WriteStream *writer; WAMRInstance *wamr = nullptr; std::vector> as; long snapshot_memory = 0; + int main(int argc, char **argv) { spdlog::cfg::load_env_levels(); cxxopts::Options options("MVVM", "Migratable Velocity Virtual Machine, to ship the VM state to another machine"); @@ -40,7 +41,8 @@ int main(int argc, char **argv) { "e,source_port", "The next hop port to offload", cxxopts::value()->default_value("0"))( "o,offload_addr", "The next hop to offload", cxxopts::value()->default_value(""))( "s,offload_port", "The next hop port to offload", cxxopts::value()->default_value("0"))( - "c,count", "The value for epoch value", cxxopts::value()->default_value("0")); + "c,count", "The value for epoch value", cxxopts::value()->default_value("0"))( + "r,rdma", "Whether to use RDMA device", cxxopts::value()->default_value("0")); // Can first discover from the wasi context. auto result = options.parse(argc, argv); @@ -54,6 +56,7 @@ int main(int argc, char **argv) { auto offload_addr = result["offload_addr"].as(); auto offload_port = result["offload_port"].as(); auto count = result["count"].as(); + auto rdma = result["rdma"].as(); snapshot_threshold = count; register_sigtrap(); @@ -66,6 +69,10 @@ int main(int argc, char **argv) { if (source_addr.empty()) reader = new FreadStream((removeExtension(target) + ".bin").c_str()); // writer #if !defined(_WIN32) +#if __linux__ + else if(rdma) + reader = new RDMAReadStream(source_addr.c_str(), source_port); +#endif else reader = new SocketReadStream(source_addr.c_str(), source_port); #endif @@ -73,6 +80,10 @@ int main(int argc, char **argv) { if (offload_addr.empty()) writer = new FwriteStream((removeExtension(target) + ".bin").c_str()); #if !defined(_WIN32) +#if __linux__ + else if(rdma) + writer = new RDMAWriteStream(offload_addr.c_str(), offload_port); +#endif else writer = new SocketWriteStream(offload_addr.c_str(), offload_port); // is server for all and the is server? diff --git a/src/wamr.cpp b/src/wamr.cpp index 0e062f0..dbf7b5d 100644 --- a/src/wamr.cpp +++ b/src/wamr.cpp @@ -15,6 +15,7 @@ #include "platform_common.h" #include "wamr_export.h" #include "wamr_native.h" +#include "wamr_read_write.h" #include "wasm_export.h" #include "wasm_interp.h" #include "wasm_runtime.h" @@ -973,11 +974,18 @@ void serialize_to_file(WASMExecEnv *instance) { // finish filling vector #endif auto used_memory = get_rss(); - struct_pack::serialize_to(*writer, as); + if (dynamic_cast(writer)) { + auto buffer = struct_pack::serialize(as); + ((RDMAWriteStream *)writer)->buffer = buffer.data(); + ((RDMAWriteStream *)writer)->position = buffer.size(); + } else + struct_pack::serialize_to(*writer, as); + auto end = std::chrono::high_resolution_clock::now(); // get duration in us auto dur = std::chrono::duration_cast(end - start); SPDLOG_INFO("Snapshot time: {} s", dur.count() / 1000000.0); SPDLOG_INFO("Memory usage: {} MB", (used_memory - snapshot_memory) / 1024 / 1024); + delete writer; exit(EXIT_SUCCESS); } \ No newline at end of file diff --git a/src/wamr_native.cpp b/src/wamr_native.cpp index f48e5a2..3bdbf2d 100644 --- a/src/wamr_native.cpp +++ b/src/wamr_native.cpp @@ -70,6 +70,14 @@ static void dgemm_wrapper(wasm_exec_env_t exec_env, int32_t m, int32_t n, int32_ fprintf(stderr, "CPU matrix mul took: %f [s]\n", time_spent); #endif #endif +} +static void lambda_read_wrapper(wasm_exec_env_t exec_env, int32_t m){ + fprintf(stderr, "lambda_read_wrapper: %d\n", m); + +} +static void lambda_write_wrapper(wasm_exec_env_t exec_env, int32_t m){ + fprintf(stderr, "lambda_write_wrapper: %d\n", m); + } static NativeSymbol ns1[] = { REG_NATIVE_FUNC(dgemm, "(iiiF*i*iF*i)"),