From 49d2d40e81c3598cc49c38efe2f5db77d37f49b7 Mon Sep 17 00:00:00 2001 From: victoryang00 Date: Thu, 21 Mar 2024 03:57:14 -0700 Subject: [PATCH] commit rdma memory --- .github/workflows/build-ubuntu.yml | 2 +- include/wamr_memory_instance.h | 8 +- include/wamr_read_write.h | 408 ++++++++++++++++++----------- lib/yalantinglibs | 2 +- src/checkpoint.cpp | 1 - src/profile.cpp | 1 - src/restore.cpp | 3 +- src/wamr.cpp | 15 +- 8 files changed, 274 insertions(+), 166 deletions(-) diff --git a/.github/workflows/build-ubuntu.yml b/.github/workflows/build-ubuntu.yml index d358134..f43f6e0 100644 --- a/.github/workflows/build-ubuntu.yml +++ b/.github/workflows/build-ubuntu.yml @@ -21,7 +21,7 @@ jobs: - uses: actions/checkout@v2 - name: Install Dependency - run: sudo apt install libssl-dev gcc-13 g++-13 libspdlog-dev libfmt-dev llvm-14-dev libedit-dev libcxxopts-dev libpfm4-dev ninja-build libpcap-dev libopenblas-pthread-dev ibverbs-providers + run: sudo apt install libssl-dev gcc-13 g++-13 libspdlog-dev libfmt-dev llvm-14-dev libedit-dev libcxxopts-dev libpfm4-dev ninja-build libpcap-dev libopenblas-pthread-dev libibverbs-dev librdmacm-dev - name: Checkout run: | diff --git a/include/wamr_memory_instance.h b/include/wamr_memory_instance.h index c7fd41a..4b040c2 100644 --- a/include/wamr_memory_instance.h +++ b/include/wamr_memory_instance.h @@ -15,6 +15,7 @@ #include "wamr_serializer.h" #include "wasm_runtime.h" #include +#include #include struct WAMRMemoryInstance { /* Module type */ @@ -35,7 +36,7 @@ struct WAMRMemoryInstance { * when memory is re-allocated, the heap data and memory data * must be copied to new memory also */ - std::vector memory_data; + std::span memory_data; /* Heap data base address */ std::vector heap_data; @@ -47,8 +48,9 @@ struct WAMRMemoryInstance { cur_page_count = env->cur_page_count; max_page_count = env->max_page_count; is_shared_memory = env->is_shared_memory; - memory_data.resize(env->memory_data_size); - memcpy(memory_data.data(), env->memory_data, env->memory_data_size); + // memory_data.resize(env->memory_data_size); + memory_data = std::span(env->memory_data, env->memory_data_size); + // memcpy(memory_data.data(), env->memory_data, env->memory_data_size); heap_data = std::vector(env->heap_data, env->heap_data_end); }; void restore_impl(WASMMemoryInstance *env); diff --git a/include/wamr_read_write.h b/include/wamr_read_write.h index d6c8f04..4b03c8d 100644 --- a/include/wamr_read_write.h +++ b/include/wamr_read_write.h @@ -11,7 +11,7 @@ */ #ifndef MVVM_WAMR_READ_WRITE_H #define MVVM_WAMR_READ_WRITE_H -#include "struct_pack/struct_pack.hpp" +#include "ylt/struct_pack.hpp" #include #include #ifndef _WIN32 @@ -20,6 +20,7 @@ #include #include #endif + struct WriteStream { virtual bool write(const char *data, std::size_t sz) const { return false; }; virtual ~WriteStream() = default; @@ -27,6 +28,7 @@ struct WriteStream { struct ReadStream { virtual bool read(char *data, std::size_t sz) const { return false; }; virtual bool ignore(std::size_t sz) const { return false; }; + virtual const char *read_view(size_t len) { return nullptr; } virtual std::size_t tellg() const { return 0; }; virtual ~ReadStream() = default; }; @@ -39,6 +41,14 @@ struct FwriteStream : public WriteStream { struct FreadStream : public ReadStream { FILE *file; bool read(char *data, std::size_t sz) const override { return fread(data, sz, 1, file) == 1; } + const char *read_view(size_t len) override { + char *buffer = new char[len]; + if (fread(buffer, len, 1, file) != 1) { + delete[] buffer; + return nullptr; + } + return buffer; + } bool ignore(std::size_t sz) const override { return fseek(file, sz, SEEK_CUR) == 0; } std::size_t tellg() const override { return ftell(file); } explicit FreadStream(const char *file_name) : file(fopen(file_name, "rb")) {} @@ -63,15 +73,15 @@ struct SocketWriteStream : public WriteStream { explicit SocketWriteStream(const char *address, int port) { sock_fd = socket(AF_INET, SOCK_STREAM, 0); if (sock_fd == -1) { - SPDLOG_ERROR("Socket creation failed\n"); + SPDLOG_ERROR("Socket creation failed"); return; } - sockaddr_in server_addr; + sockaddr_in server_addr{}; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); inet_pton(AF_INET, address, &server_addr.sin_addr); if (connect(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) { - SPDLOG_ERROR("Connection failed\n"); + SPDLOG_ERROR("Connection failed"); close(sock_fd); exit(EXIT_FAILURE); } @@ -96,23 +106,37 @@ struct SocketReadStream : public ReadStream { position += totalReceived; return true; } + const char *read_view(size_t len) override { + char *buffer = new char[len]; + std::size_t totalReceived = 0; + while (totalReceived < len) { + ssize_t received = recv(client_fd, buffer + totalReceived, len - totalReceived, 0); + if (received == -1 || received == 0) { + return nullptr; + } + totalReceived += received; + } + position += len; + + return buffer; + } explicit SocketReadStream(const char *address, int port) { sock_fd = socket(AF_INET, SOCK_STREAM, 0); if (sock_fd == -1) { - SPDLOG_ERROR("Socket creation failed\n"); + SPDLOG_ERROR("Socket creation failed"); return; } - sockaddr_in server_addr; + sockaddr_in server_addr{}; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); inet_pton(AF_INET, address, &server_addr.sin_addr); auto addr_len = sizeof(server_addr); - SPDLOG_INFO("[Server] Bind socket {} {}\n", address, port); + SPDLOG_INFO("[Server] Bind socket {} {}", address, port); if (bind(sock_fd, (struct sockaddr *)&server_addr, addr_len) < 0) { SPDLOG_ERROR("Bind failed"); exit(EXIT_FAILURE); } - SPDLOG_INFO("[Server] Listening on socket\n"); + SPDLOG_INFO("[Server] Listening on socket"); if (listen(sock_fd, 3) < 0) { SPDLOG_ERROR("Listen failed"); exit(EXIT_FAILURE); @@ -154,7 +178,7 @@ struct __attribute((packed)) rdma_buffer_attr { #define CQ_CAPACITY (16) #define MAX_SGE (2) #define MAX_WR (8) -#define DEFAULT_RDMA_PORT (20886) +#define BUFFER_SIZE (1024 * 1024 * 1024L) class RDMAEndpoint { public: struct rdma_event_channel *cm_event_channel = nullptr; @@ -165,42 +189,46 @@ class RDMAEndpoint { struct ibv_qp_init_attr qp_init_attr {}; struct ibv_qp *client_qp = nullptr; struct ibv_mr *client_metadata_mr = nullptr, *server_buffer_mr = nullptr, *server_metadata_mr = nullptr; - struct rdma_buffer_attr client_metadata_attr{}, server_metadata_attr{}; - struct ibv_recv_wr client_recv_wr {}, *bad_client_recv_wr = nullptr; - struct ibv_send_wr server_send_wr {}, *bad_server_send_wr = nullptr; - struct ibv_sge client_recv_sge {}, server_send_sge{}; + struct rdma_buffer_attr client_metadata_attr {}; + struct rdma_buffer_attr server_metadata_attr {}; + struct ibv_recv_wr client_recv_wr {}; + struct ibv_recv_wr *bad_client_recv_wr = nullptr; + struct ibv_send_wr server_send_wr {}; + struct ibv_send_wr *bad_server_send_wr = nullptr; + struct ibv_sge client_recv_sge {}; + struct ibv_sge server_send_sge {}; RDMAEndpoint() = default; static void show_rdma_cmid(struct rdma_cm_id *id) { if (!id) { - printf("Passed ptr is nullptr\n"); + SPDLOG_ERROR("Passed ptr is nullptr"); return; } - printf("RDMA cm id at %p \n", id); + SPDLOG_DEBUG("RDMA cm id at {:p} ", ((void *)id)); if (id->verbs && id->verbs->device) - printf("dev_ctx: %p (device name: %s) \n", id->verbs, id->verbs->device->name); + SPDLOG_DEBUG("dev_ctx: {:p} (device name: {}) ", ((void *)id->verbs), id->verbs->device->name); if (id->channel) - printf("cm event channel %p\n", id->channel); - printf("QP: %p, port_space %x, port_num %u \n", id->qp, id->ps, id->port_num); + SPDLOG_DEBUG("cm event channel {:p} ", ((void *)id->channel)); + SPDLOG_DEBUG("QP: {:p} , port_space {:x}, port_num {} ", ((void *)id->qp), id->ps, id->port_num); } static void show_rdma_buffer_attr(struct rdma_buffer_attr *attr) { if (!attr) { - printf("Passed attr is nullptr\n"); + SPDLOG_ERROR("Passed attr is nullptr"); return; } - printf("---------------------------------------------------------\n"); - printf("buffer attr, addr: %p , len: %u , stag : 0x%x \n", (void *)attr->address, (unsigned int)attr->length, - attr->stag.local_stag); - printf("---------------------------------------------------------\n"); + SPDLOG_DEBUG("---------------------------------------------------------"); + SPDLOG_DEBUG("buffer attr, addr: {:p} , len: {} , stag : 0x{:x} ", (void *)attr->address, + (unsigned int)attr->length, (uint8_t)attr->stag.local_stag); + SPDLOG_DEBUG("---------------------------------------------------------"); } static struct ibv_mr *rdma_buffer_alloc(struct ibv_pd *pd, uint32_t size, enum ibv_access_flags permission) { struct ibv_mr *mr = nullptr; if (!pd) { - printf("Protection domain is nullptr \n"); + SPDLOG_ERROR("Protection domain is nullptr "); return nullptr; } void *buf = calloc(1, size); if (!buf) { - printf("failed to allocate buffer, -ENOMEM\n"); + SPDLOG_ERROR("failed to allocate buffer, -ENOMEM"); return nullptr; } mr = rdma_buffer_register(pd, buf, size, permission); @@ -213,19 +241,19 @@ class RDMAEndpoint { enum ibv_access_flags permission) { struct ibv_mr *mr = nullptr; if (!pd) { - printf("Protection domain is nullptr, ignoring \n"); + SPDLOG_ERROR("Protection domain is nullptr, ignoring "); return nullptr; } mr = ibv_reg_mr(pd, addr, length, permission); if (!mr) { - printf("Failed to create mr on buffer, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to create mr on buffer, errno: {} ", -errno); return nullptr; } return mr; } static void rdma_buffer_free(struct ibv_mr *mr) { if (!mr) { - printf("Passed memory region is nullptr, ignoring\n"); + SPDLOG_ERROR("Passed memory region is nullptr, ignoring"); return; } void *to_free = mr->addr; @@ -234,7 +262,7 @@ class RDMAEndpoint { } static void rdma_buffer_deregister(struct ibv_mr *mr) { if (!mr) { - printf("Passed memory region is nullptr, ignoring\n"); + SPDLOG_ERROR("Passed memory region is nullptr, ignoring"); return; } ibv_dereg_mr(mr); @@ -244,18 +272,18 @@ class RDMAEndpoint { int ret = 1; ret = rdma_get_cm_event(echannel, cm_event); if (ret) { - printf("Failed to retrieve a cm event, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to retrieve a cm event, errno: {} ", -errno); return -errno; } if (0 != (*cm_event)->status) { - printf("CM event has non zero status: %d\n", (*cm_event)->status); + SPDLOG_ERROR("CM event has non zero status: {}", (*cm_event)->status); ret = -((*cm_event)->status); rdma_ack_cm_event(*cm_event); return ret; } if ((*cm_event)->event != expected_event) { - printf("Unexpected event received: %s [ expecting: %s ]", rdma_event_str((*cm_event)->event), - rdma_event_str(expected_event)); + SPDLOG_ERROR("Unexpected event received: {} [ expecting: {} ]", rdma_event_str((*cm_event)->event), + rdma_event_str(expected_event)); rdma_ack_cm_event(*cm_event); return -1; } @@ -267,25 +295,26 @@ class RDMAEndpoint { int ret = -1, i, total_wc = 0; ret = ibv_get_cq_event(comp_channel, &cq_ptr, &context); if (ret) { - printf("Failed to get next CQ event due to %d \n", -errno); + SPDLOG_ERROR("Failed to get next CQ event due to {} ", -errno); return -errno; } ret = ibv_req_notify_cq(cq_ptr, 0); if (ret) { - printf("Failed to request further notifications %d \n", -errno); + SPDLOG_ERROR("Failed to request further notifications {} ", -errno); return -errno; } total_wc = 0; do { ret = ibv_poll_cq(cq_ptr, max_wc - total_wc, wc + total_wc); if (ret < 0) { - printf("Failed to poll cq for wc due to %d \n", ret); + SPDLOG_ERROR("Failed to poll cq for wc due to {} ", ret); } total_wc += ret; } while (total_wc < max_wc); for (i = 0; i < total_wc; i++) { if (wc[i].status != IBV_WC_SUCCESS) { - printf("Work completion (WC) has error status: %s at index %d", ibv_wc_status_str(wc[i].status), i); + SPDLOG_ERROR("Work completion (WC) has error status: {} at index {}", ibv_wc_status_str(wc[i].status), + i); return -(wc[i].status); } } @@ -296,32 +325,32 @@ class RDMAEndpoint { }; class RDMAReadStream : public ReadStream, public RDMAEndpoint { mutable long position = 0; - mutable uint8_t *buffer; + mutable std::vector buffer; mutable long buffer_size; int setup_client_resources() { int ret = -1; if (!cm_client_id) { - printf("Client id is still nullptr \n"); + SPDLOG_ERROR("Client id is still nullptr "); return -EINVAL; } pd = ibv_alloc_pd(cm_client_id->verbs); if (!pd) { - printf("Failed to allocate a protection domain errno: %d\n", -errno); + SPDLOG_ERROR("Failed to allocate a protection domain errno: {}", -errno); return -errno; } io_completion_channel = ibv_create_comp_channel(cm_client_id->verbs); if (!io_completion_channel) { - printf("Failed to create an I/O completion event channel, %d\n", -errno); + SPDLOG_ERROR("Failed to create an I/O completion event channel, {}", -errno); return -errno; } cq = ibv_create_cq(cm_client_id->verbs, CQ_CAPACITY, nullptr, io_completion_channel, 0); if (!cq) { - printf("Failed to create a completion queue (cq), errno: %d\n", -errno); + SPDLOG_ERROR("Failed to create a completion queue (cq), errno: {}", -errno); return -errno; } ret = ibv_req_notify_cq(cq, 0); if (ret) { - printf("Failed to request notifications on CQ errno: %d \n", -errno); + SPDLOG_ERROR("Failed to request notifications on CQ errno: {} ", -errno); return -errno; } bzero(&qp_init_attr, sizeof qp_init_attr); @@ -334,7 +363,7 @@ class RDMAReadStream : public ReadStream, public RDMAEndpoint { qp_init_attr.send_cq = cq; ret = rdma_create_qp(cm_client_id, pd, &qp_init_attr); if (ret) { - printf("Failed to create QP due to errno: %d\n", -errno); + SPDLOG_ERROR("Failed to create QP due to errno: {}", -errno); return -errno; } client_qp = cm_client_id->qp; @@ -345,35 +374,35 @@ class RDMAReadStream : public ReadStream, public RDMAEndpoint { int ret = -1; cm_event_channel = rdma_create_event_channel(); if (!cm_event_channel) { - printf("Creating cm event channel failed with errno : (%d)", -errno); + SPDLOG_ERROR("Creating cm event channel failed with errno : ({})", -errno); return -errno; } ret = rdma_create_id(cm_event_channel, &cm_server_id, nullptr, RDMA_PS_TCP); if (ret) { - printf("Creating server cm id failed with errno: %d ", -errno); + SPDLOG_ERROR("Creating server cm id failed with errno: {} ", -errno); return -errno; } ret = rdma_bind_addr(cm_server_id, (struct sockaddr *)server_addr); if (ret) { - printf("Failed to bind server address, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to bind server address, errno: {} ", -errno); return -errno; } ret = rdma_listen(cm_server_id, 8); if (ret) { - printf("rdma_listen failed to listen on server address, errno: %d ", -errno); + SPDLOG_ERROR("rdma_listen failed to listen on server address, errno: {} ", -errno); return -errno; } - printf("Server is listening successfully at: %s , port: %d \n", inet_ntoa(server_addr->sin_addr), - ntohs(server_addr->sin_port)); + SPDLOG_INFO("Server is listening successfully at: {} , port: {} ", inet_ntoa(server_addr->sin_addr), + ntohs(server_addr->sin_port)); ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_CONNECT_REQUEST, &cm_event); if (ret) { - printf("Failed to get cm event, ret = %d \n", ret); + SPDLOG_ERROR("Failed to get cm event, ret = {} ", ret); return ret; } cm_client_id = cm_event->id; ret = rdma_ack_cm_event(cm_event); if (ret) { - printf("Failed to acknowledge the cm event errno: %d \n", -errno); + SPDLOG_ERROR("Failed to acknowledge the cm event errno: {} ", -errno); return -errno; } return ret; @@ -384,13 +413,13 @@ class RDMAReadStream : public ReadStream, public RDMAEndpoint { struct sockaddr_in remote_sockaddr; int ret = -1; if (!cm_client_id || !client_qp) { - printf("Client resources are not properly setup\n"); + SPDLOG_ERROR("Client resources are not properly setup"); return -EINVAL; } client_metadata_mr = rdma_buffer_register(pd, &client_metadata_attr, sizeof(client_metadata_attr), (IBV_ACCESS_LOCAL_WRITE)); if (!client_metadata_mr) { - printf("Failed to register client attr buffer\n"); + SPDLOG_ERROR("Failed to register client attr buffer"); return -ENOMEM; } client_recv_sge.addr = (uint64_t)client_metadata_mr->addr; @@ -401,7 +430,7 @@ class RDMAReadStream : public ReadStream, public RDMAEndpoint { client_recv_wr.num_sge = 1; ret = ibv_post_recv(client_qp, &client_recv_wr, &bad_client_recv_wr); if (ret) { - printf("Failed to pre-post the receive buffer, errno: %d \n", ret); + SPDLOG_ERROR("Failed to pre-post the receive buffer, errno: {} ", ret); return ret; } memset(&conn_param, 0, sizeof(conn_param)); @@ -409,39 +438,39 @@ class RDMAReadStream : public ReadStream, public RDMAEndpoint { conn_param.responder_resources = 3; ret = rdma_accept(cm_client_id, &conn_param); if (ret) { - printf("Failed to accept the connection, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to accept the connection, errno: {} ", -errno); return -errno; } ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_ESTABLISHED, &cm_event); if (ret) { - printf("Failed to get the cm event, errnp: %d \n", -errno); + SPDLOG_ERROR("Failed to get the cm event, errnp: {} ", -errno); return -errno; } ret = rdma_ack_cm_event(cm_event); if (ret) { - printf("Failed to acknowledge the cm event %d\n", -errno); + SPDLOG_ERROR("Failed to acknowledge the cm event {}", -errno); return -errno; } memcpy(&remote_sockaddr, rdma_get_peer_addr(cm_client_id), sizeof(struct sockaddr_in)); - printf("A new connection is accepted from %s \n", inet_ntoa(remote_sockaddr.sin_addr)); + SPDLOG_INFO("A new connection is accepted from {} ", inet_ntoa(remote_sockaddr.sin_addr)); return ret; } int send_server_metadata_to_client() { - struct ibv_wc wc; + struct ibv_wc wc{}; int ret = -1; ret = process_work_completion_events(io_completion_channel, &wc, 1); if (ret != 1) { - printf("Failed to receive , ret = %d \n", ret); + SPDLOG_ERROR("Failed to receive , ret = {} ", ret); return ret; } show_rdma_buffer_attr(&client_metadata_attr); - printf("The client has requested buffer length of : %u bytes \n", client_metadata_attr.length); + SPDLOG_DEBUG("The client has requested buffer length of : {} bytes ", (uint32_t)client_metadata_attr.length); server_buffer_mr = rdma_buffer_alloc(pd, client_metadata_attr.length, (enum ibv_access_flags)(((int)IBV_ACCESS_LOCAL_WRITE) | ((int)IBV_ACCESS_REMOTE_READ) | ((int)IBV_ACCESS_REMOTE_WRITE))); if (!server_buffer_mr) { - printf("Server failed to create a buffer \n"); + SPDLOG_ERROR("Server failed to create a buffer "); return -ENOMEM; } server_metadata_attr.address = (uint64_t)server_buffer_mr->addr; @@ -450,7 +479,7 @@ class RDMAReadStream : public ReadStream, public RDMAEndpoint { server_metadata_mr = rdma_buffer_register(pd, &server_metadata_attr, sizeof(server_metadata_attr), IBV_ACCESS_LOCAL_WRITE); if (!server_metadata_mr) { - printf("Server failed to create to hold server metadata \n"); + SPDLOG_ERROR("Server failed to create to hold server metadata "); return -ENOMEM; } server_send_sge.addr = (uint64_t)&server_metadata_attr; @@ -463,20 +492,19 @@ class RDMAReadStream : public ReadStream, public RDMAEndpoint { server_send_wr.send_flags = IBV_SEND_SIGNALED; ret = ibv_post_send(client_qp, &server_send_wr, &bad_server_send_wr); if (ret) { - printf("Posting of server metdata failed, errno: %d \n", -errno); + SPDLOG_ERROR("Posting of server metdata failed, errno: {} ", -errno); return -errno; } ret = process_work_completion_events(io_completion_channel, &wc, 1); if (ret != 1) { - printf("Failed to send server metadata, ret = %d \n", ret); + SPDLOG_ERROR("Failed to send server metadata, ret = {} ", ret); return ret; } usleep(1); - printf("Received buffer contents: "); - for (int i = 0; i < 10; i++) { - printf("%x", ((uint8_t *)server_buffer_mr->addr)[i]); + SPDLOG_DEBUG("Received buffer contents: "); + for (int i = 0; i < 4; i++) { + SPDLOG_DEBUG("{}", ((uint8_t *)server_buffer_mr->addr)[i]); } - printf("\n"); return 0; } int disconnect_and_cleanup() { @@ -484,43 +512,43 @@ class RDMAReadStream : public ReadStream, public RDMAEndpoint { int ret = -1; ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_DISCONNECTED, &cm_event); if (ret) { - printf("Failed to get disconnect event, ret = %d \n", ret); + SPDLOG_ERROR("Failed to get disconnect event, ret = {} ", ret); return ret; } ret = rdma_ack_cm_event(cm_event); if (ret) { - printf("Failed to acknowledge the cm event %d\n", -errno); + SPDLOG_ERROR("Failed to acknowledge the cm event {}", -errno); return -errno; } - printf("A disconnect event is received from the client...\n"); + SPDLOG_DEBUG("A disconnect event is received from the client..."); rdma_destroy_qp(cm_client_id); ret = rdma_destroy_id(cm_client_id); if (ret) { - printf("Failed to destroy client id cleanly, %d \n", -errno); + SPDLOG_ERROR("Failed to destroy client id cleanly, {} ", -errno); } ret = ibv_destroy_cq(cq); if (ret) { - printf("Failed to destroy completion queue cleanly, %d \n", -errno); + SPDLOG_ERROR("Failed to destroy completion queue cleanly, {} ", -errno); } ret = ibv_destroy_comp_channel(io_completion_channel); if (ret) { - printf("Failed to destroy completion channel cleanly, %d \n", -errno); + SPDLOG_ERROR("Failed to destroy completion channel cleanly, {} ", -errno); } rdma_buffer_free(server_buffer_mr); rdma_buffer_deregister(server_metadata_mr); rdma_buffer_deregister(client_metadata_mr); ret = ibv_dealloc_pd(pd); if (ret) { - printf("Failed to destroy client protection domain cleanly, %d \n", -errno); + SPDLOG_ERROR("Failed to destroy client protection domain cleanly, {} ", -errno); } ret = rdma_destroy_id(cm_server_id); if (ret) { - printf("Failed to destroy server id cleanly, %d \n", -errno); + SPDLOG_ERROR("Failed to destroy server id cleanly, {} ", -errno); } rdma_destroy_event_channel(cm_event_channel); - printf("Server shut-down is complete \n"); return 0; } + public: explicit RDMAReadStream(const char *server_name, int port) : RDMAEndpoint() { struct sockaddr_in server_sockaddr; @@ -529,48 +557,88 @@ class RDMAReadStream : public ReadStream, public RDMAEndpoint { server_sockaddr.sin_family = AF_INET; server_sockaddr.sin_port = htons(port); inet_pton(AF_INET, server_name, &server_sockaddr.sin_addr); - ret = start_rdma_server(&server_sockaddr); - if (ret) { - printf("RDMA server failed to start cleanly, ret = %d \n", ret); - } - ret = setup_client_resources(); - if (ret) { - printf("Failed to setup client resources, ret = %d \n", ret); - } - ret = accept_client_connection(); - if (ret) { - printf("Failed to handle client cleanly, ret = %d \n", ret); + long received = 0; + { + ret = start_rdma_server(&server_sockaddr); + if (ret) { + SPDLOG_ERROR("RDMA server failed to start cleanly, ret = {} ", ret); + } + ret = setup_client_resources(); + if (ret) { + SPDLOG_ERROR("Failed to setup client resources, ret = {} ", ret); + } + ret = accept_client_connection(); + if (ret) { + SPDLOG_ERROR("Failed to handle client cleanly, ret = {} ", ret); + } + ret = send_server_metadata_to_client(); + if (ret) { + SPDLOG_ERROR("Failed to send server metadata to the client, ret = {} ", ret); + } + received += client_metadata_attr.length; } - ret = send_server_metadata_to_client(); - if (ret) { - printf("Failed to send server metadata to the client, ret = %d \n", ret); + uint8_t *ptr = (uint8_t *)server_buffer_mr->addr; + size_t size = client_metadata_attr.length; + buffer = std::vector(ptr, ptr + size); + + while (received % BUFFER_SIZE == 0) { + disconnect_and_cleanup(); + ret = start_rdma_server(&server_sockaddr); + if (ret) { + SPDLOG_ERROR("RDMA server failed to start cleanly, ret = {} ", ret); + } + ret = setup_client_resources(); + if (ret) { + SPDLOG_ERROR("Failed to setup client resources, ret = {} ", ret); + } + ret = accept_client_connection(); + if (ret) { + SPDLOG_ERROR("Failed to handle client cleanly, ret = {} ", ret); + } + ret = send_server_metadata_to_client(); + if (ret) { + SPDLOG_ERROR("Failed to send server metadata to the client, ret = {} ", ret); + } + received += client_metadata_attr.length; + uint8_t *ptr2 = (uint8_t *)server_buffer_mr->addr; + size_t size2 = client_metadata_attr.length; + buffer.insert(buffer.end(), ptr2, ptr2 + size2); } - buffer = (uint8_t *)server_buffer_mr->addr; - buffer_size = client_metadata_attr.length; + buffer_size = received; + disconnect_and_cleanup(); } bool read(char *data, std::size_t sz) const override { if (position + sz > buffer_size) { std::throw_with_nested(std::runtime_error("Buffer overflow")); } - memcpy(data, buffer + position, sz); + memcpy(data, buffer.data() + position, sz); position += sz; return true; } + const char *read_view(size_t len) override { + if (position + len > buffer_size) { + std::throw_with_nested(std::runtime_error("Buffer overflow")); + } + const char *ret = (const char *)(buffer.data() + position); + position += len; + return ret; + } bool ignore(std::size_t sz) const override { position += sz; return true; } std::size_t tellg() const override { return position; } - ~RDMAReadStream() override { disconnect_and_cleanup(); }; + ~RDMAReadStream() override = default; }; class RDMAWriteStream : public WriteStream, public RDMAEndpoint { public: - mutable char *buffer; + mutable std::vector buffer{}; mutable long position = 0; struct ibv_cq *client_cq = nullptr; - struct ibv_sge client_send_sge, server_recv_sge; - struct sockaddr_in server_sockaddr; - struct rdma_conn_param conn_param; + struct ibv_sge client_send_sge {}; + struct ibv_sge server_recv_sge {}; + struct sockaddr_in server_sockaddr {}; + struct rdma_conn_param conn_param {}; struct rdma_cm_event *cm_event = nullptr; struct ibv_mr *client_metadata_mr = nullptr, *client_src_mr = nullptr, *client_dst_mr = nullptr, *server_metadata_mr = nullptr; @@ -581,63 +649,64 @@ class RDMAWriteStream : public WriteStream, public RDMAEndpoint { int ret = -1; cm_event_channel = rdma_create_event_channel(); if (!cm_event_channel) { - printf("Creating cm event channel failed, errno: %d \n", -errno); + SPDLOG_ERROR("Creating cm event channel failed, errno: {} ", -errno); return -errno; } ret = rdma_create_id(cm_event_channel, &cm_client_id, nullptr, RDMA_PS_TCP); if (ret) { - printf("Creating cm id failed with errno: %d \n", -errno); + SPDLOG_ERROR("Creating cm id failed with errno: {} ", -errno); return -errno; } ret = rdma_resolve_addr(cm_client_id, nullptr, (struct sockaddr *)s_addr, 2000); if (ret) { - printf("Failed to resolve address, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to resolve address, errno: {} ", -errno); return -errno; } ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_ADDR_RESOLVED, &cm_event); if (ret) { - printf("Failed to receive a valid event, ret = %d \n", ret); + SPDLOG_ERROR("Failed to receive a valid event, ret = {} ", ret); return ret; } ret = rdma_ack_cm_event(cm_event); if (ret) { - printf("Failed to acknowledge the CM event, errno: %d\n", -errno); + SPDLOG_ERROR("Failed to acknowledge the CM event, errno: {}", -errno); return -errno; } ret = rdma_resolve_route(cm_client_id, 2000); if (ret) { - printf("Failed to resolve route, erno: %d \n", -errno); + SPDLOG_ERROR("Failed to resolve route, erno: {} ", -errno); return -errno; } ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_ROUTE_RESOLVED, &cm_event); if (ret) { - printf("Failed to receive a valid event, ret = %d \n", ret); + SPDLOG_ERROR("Failed to receive a valid event, ret = {} ", ret); return ret; } ret = rdma_ack_cm_event(cm_event); if (ret) { - printf("Failed to acknowledge the CM event, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to acknowledge the CM event, errno: {} ", -errno); return -errno; } - printf("Trying to connect to server at : %s port: %d \n", inet_ntoa(s_addr->sin_addr), ntohs(s_addr->sin_port)); + SPDLOG_DEBUG("Trying to connect to server at : {} port: {} ", inet_ntoa(s_addr->sin_addr), + ntohs(s_addr->sin_port)); pd = ibv_alloc_pd(cm_client_id->verbs); if (!pd) { - printf("Failed to alloc pd, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to alloc pd, errno: {} ", -errno); return -errno; } io_completion_channel = ibv_create_comp_channel(cm_client_id->verbs); if (!io_completion_channel) { - printf("Failed to create IO completion event channel, errno: %d\n", -errno); + SPDLOG_ERROR("Failed to create IO completion event channel, errno: {}", -errno); return -errno; } client_cq = ibv_create_cq(cm_client_id->verbs, CQ_CAPACITY, nullptr, io_completion_channel, 0); if (!client_cq) { - printf("Failed to create CQ, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to create CQ, errno: {} ", -errno); return -errno; } ret = ibv_req_notify_cq(client_cq, 0); if (ret) { - printf("Failed to request notifications, errno: %d\n", -errno); + SPDLOG_ERROR("Failed to request notifications, errno: {}", -errno); return -errno; } bzero(&qp_init_attr, sizeof qp_init_attr); @@ -650,7 +719,7 @@ class RDMAWriteStream : public WriteStream, public RDMAEndpoint { qp_init_attr.send_cq = client_cq; ret = rdma_create_qp(cm_client_id, pd, &qp_init_attr); if (ret) { - printf("Failed to create QP, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to create QP, errno: {} ", -errno); return -errno; } client_qp = cm_client_id->qp; @@ -661,7 +730,7 @@ class RDMAWriteStream : public WriteStream, public RDMAEndpoint { server_metadata_mr = rdma_buffer_register(pd, &server_metadata_attr, sizeof(server_metadata_attr), (IBV_ACCESS_LOCAL_WRITE)); if (!server_metadata_mr) { - printf("Failed to setup the server metadata mr , -ENOMEM\n"); + SPDLOG_ERROR("Failed to setup the server metadata mr , -ENOMEM"); return -ENOMEM; } server_recv_sge.addr = (uint64_t)server_metadata_mr->addr; @@ -672,7 +741,7 @@ class RDMAWriteStream : public WriteStream, public RDMAEndpoint { server_recv_wr.num_sge = 1; ret = ibv_post_recv(client_qp, &server_recv_wr, &bad_server_recv_wr); if (ret) { - printf("Failed to pre-post the receive buffer, errno: %d \n", ret); + SPDLOG_ERROR("Failed to pre-post the receive buffer, errno: {} ", ret); return ret; } return 0; @@ -687,31 +756,31 @@ class RDMAWriteStream : public WriteStream, public RDMAEndpoint { conn_param.retry_count = 3; ret = rdma_connect(cm_client_id, &conn_param); if (ret) { - printf("Failed to connect to remote host , errno: %d\n", -errno); + SPDLOG_ERROR("Failed to connect to remote host , errno: {}", -errno); return -errno; } ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_ESTABLISHED, &cm_event); if (ret) { - printf("Failed to get cm event, ret = %d \n", ret); + SPDLOG_ERROR("Failed to get cm event, ret = {} ", ret); return ret; } ret = rdma_ack_cm_event(cm_event); if (ret) { - printf("Failed to acknowledge cm event, errno: %d\n", -errno); + SPDLOG_ERROR("Failed to acknowledge cm event, errno: {}", -errno); return -errno; } - printf("The client is connected successfully \n"); + SPDLOG_DEBUG("The client is connected successfully "); return 0; } - int client_xchange_metadata_with_server() { + int client_xchange_metadata_with_server(char *b, long sz) { struct ibv_wc wc[2]; int ret = -1; client_src_mr = - rdma_buffer_register(pd, buffer, position, + rdma_buffer_register(pd, b, sz, (enum ibv_access_flags)(((int)IBV_ACCESS_LOCAL_WRITE) | ((int)IBV_ACCESS_REMOTE_READ) | ((int)IBV_ACCESS_REMOTE_WRITE))); if (!client_src_mr) { - printf("Failed to register the first buffer, ret = %d \n", ret); + SPDLOG_ERROR("Failed to register the first buffer, ret = {} ", ret); return ret; } client_metadata_attr.address = (uint64_t)client_src_mr->addr; @@ -720,7 +789,7 @@ class RDMAWriteStream : public WriteStream, public RDMAEndpoint { client_metadata_mr = rdma_buffer_register(pd, &client_metadata_attr, sizeof(client_metadata_attr), IBV_ACCESS_LOCAL_WRITE); if (!client_metadata_mr) { - printf("Failed to register the client metadata buffer, ret = %d \n", ret); + SPDLOG_ERROR("Failed to register the client metadata buffer, ret = {} ", ret); return ret; } client_send_sge.addr = (uint64_t)client_metadata_mr->addr; @@ -733,12 +802,12 @@ class RDMAWriteStream : public WriteStream, public RDMAEndpoint { client_send_wr.send_flags = IBV_SEND_SIGNALED; ret = ibv_post_send(client_qp, &client_send_wr, &bad_client_send_wr); if (ret) { - printf("Failed to send client metadata, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to send client metadata, errno: {} ", -errno); return -errno; } ret = process_work_completion_events(io_completion_channel, wc, 2); if (ret != 2) { - printf("We failed to get 2 work completions , ret = %d \n", ret); + SPDLOG_ERROR("We failed to get 2 work completions , ret = {} ", ret); return ret; } show_rdma_buffer_attr(&server_metadata_attr); @@ -758,23 +827,24 @@ class RDMAWriteStream : public WriteStream, public RDMAEndpoint { client_send_wr.wr.rdma.rkey = server_metadata_attr.stag.remote_stag; client_send_wr.wr.rdma.remote_addr = server_metadata_attr.address; ret = ibv_post_send(client_qp, &client_send_wr, &bad_client_send_wr); + if (ret) { - printf("Failed to write client src buffer, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to write client src buffer, errno: {} ", -errno); return -errno; } ret = process_work_completion_events(io_completion_channel, &wc, 1); if (ret != 1) { - printf("We failed to get 1 work completions , ret = %d \n", ret); + SPDLOG_ERROR("We failed to get 1 work completions , ret = {} ", ret); return ret; } ret = ibv_post_send(client_qp, &client_send_wr, &bad_client_send_wr); if (ret) { - printf("Failed to write client src buffer, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to write client src buffer, errno: {} ", -errno); return -errno; } ret = process_work_completion_events(io_completion_channel, &wc, 1); if (ret != 1) { - printf("We failed to get 1 work completions , ret = %d \n", ret); + SPDLOG_ERROR("We failed to get 1 work completions , ret = {} ", ret); return ret; } return 0; @@ -784,40 +854,41 @@ class RDMAWriteStream : public WriteStream, public RDMAEndpoint { int ret = -1; ret = rdma_disconnect(cm_client_id); if (ret) { - printf("Failed to disconnect, errno: %d \n", -errno); + SPDLOG_ERROR("Failed to disconnect, errno: {} ", -errno); } ret = process_rdma_cm_event(cm_event_channel, RDMA_CM_EVENT_DISCONNECTED, &cm_event); if (ret) { - printf("Failed to get RDMA_CM_EVENT_DISCONNECTED event, ret = %d\n", ret); + SPDLOG_ERROR("Failed to get RDMA_CM_EVENT_DISCONNECTED event, ret = {}", ret); } ret = rdma_ack_cm_event(cm_event); if (ret) { - printf("Failed to acknowledge cm event, errno: %d\n", -errno); + SPDLOG_ERROR("Failed to acknowledge cm event, errno: {}", -errno); } rdma_destroy_qp(cm_client_id); ret = rdma_destroy_id(cm_client_id); if (ret) { - printf("Failed to destroy client id cleanly, %d \n", -errno); + SPDLOG_ERROR("Failed to destroy client id cleanly, {} ", -errno); } ret = ibv_destroy_cq(client_cq); if (ret) { - printf("Failed to destroy completion queue cleanly, %d \n", -errno); + SPDLOG_ERROR("Failed to destroy completion queue cleanly, {} ", -errno); } ret = ibv_destroy_comp_channel(io_completion_channel); if (ret) { - printf("Failed to destroy completion channel cleanly, %d \n", -errno); + SPDLOG_ERROR("Failed to destroy completion channel cleanly, {} ", -errno); } rdma_buffer_deregister(server_metadata_mr); rdma_buffer_deregister(client_metadata_mr); rdma_buffer_deregister(client_src_mr); ret = ibv_dealloc_pd(pd); if (ret) { - printf("Failed to destroy client protection domain cleanly, %d \n", -errno); + SPDLOG_ERROR("Failed to destroy client protection domain cleanly, {} ", -errno); } rdma_destroy_event_channel(cm_event_channel); - printf("Client resource clean up is complete \n"); + SPDLOG_DEBUG("Client resource clean up is complete "); return 0; } + public: explicit RDMAWriteStream(const char *server_name, int port) : RDMAEndpoint() { struct rdma_cm_event *cm_event = nullptr; @@ -828,39 +899,76 @@ class RDMAWriteStream : public WriteStream, public RDMAEndpoint { inet_pton(AF_INET, server_name, &server_sockaddr.sin_addr); ret = client_prepare_connection(&server_sockaddr); if (ret) { - printf("Failed to setup client connection , ret = %d \n", ret); + SPDLOG_ERROR("Failed to setup client connection , ret = {} ", ret); } ret = client_pre_post_recv_buffer(); if (ret) { - printf("Failed to setup client connection , ret = %d \n", ret); + SPDLOG_ERROR("Failed to setup client connection , ret = {} ", ret); } ret = client_connect_to_server(); if (ret) { - printf("Failed to setup client connection , ret = %d \n", ret); + SPDLOG_ERROR("Failed to setup client connection , ret = {} ", ret); } } virtual bool write(const char *data, std::size_t sz) const { - memcpy(buffer + position, data, sz); + buffer.insert(buffer.end(), data, data + sz); position += sz; return true; } ~RDMAWriteStream() { int ret = -1; - for (int i = 0; i < 100; i++) { - printf("%x", (uint8_t)buffer[i]); + for (int i = 0; i < 4; i++) { + SPDLOG_DEBUG("{:x}", (uint8_t)buffer[i]); + } + for (int i = 0; i < 4; i++) { + SPDLOG_DEBUG("{:x}", (uint8_t)buffer[i + BUFFER_SIZE]); } - printf("\n"); - ret = client_xchange_metadata_with_server(); + long to_send = position; + ret = client_xchange_metadata_with_server(buffer.data(), std::min(to_send, BUFFER_SIZE)); if (ret) { - printf("Failed to setup client connection , ret = %d \n", ret); + SPDLOG_ERROR("Failed to setup client connection , ret = {} ", ret); } ret = client_remote_memory_ops(); if (ret) { - printf("Failed to finish remote memory ops, ret = %d \n", ret); + SPDLOG_ERROR("Failed to finish remote memory ops, ret = {} ", ret); } ret = client_disconnect_and_clean(); if (ret) { - printf("Failed to clean up client resources, ret = %d \n", ret); + SPDLOG_ERROR("Failed to clean up client resources, ret = {} ", ret); + } + to_send -= BUFFER_SIZE; + while (to_send > 0) { + usleep(1024 * 200); + ret = client_prepare_connection(&server_sockaddr); + if (ret) { + SPDLOG_ERROR("Failed to setup client connection , ret = {} ", ret); + } + ret = client_pre_post_recv_buffer(); + if (ret) { + SPDLOG_ERROR("Failed to setup client connection , ret = {} ", ret); + } + ret = client_connect_to_server(); + if (ret) { + SPDLOG_ERROR("Failed to setup client connection , ret = {} ", ret); + } + SPDLOG_DEBUG("{}", position - to_send); + for (int i = 0; i < 4; i++) { + SPDLOG_DEBUG("{:x}", (uint8_t)buffer[i + position - to_send]); + } + ret = client_xchange_metadata_with_server(buffer.data() + (position - to_send), + std::min(to_send, BUFFER_SIZE)); + if (ret) { + SPDLOG_ERROR("Failed to setup client connection , ret = {} ", ret); + } + ret = client_remote_memory_ops(); + if (ret) { + SPDLOG_ERROR("Failed to finish remote memory ops, ret = {} ", ret); + } + ret = client_disconnect_and_clean(); + if (ret) { + SPDLOG_ERROR("Failed to clean up client resources, ret = {} ", ret); + } + to_send -= BUFFER_SIZE; } }; }; diff --git a/lib/yalantinglibs b/lib/yalantinglibs index cb1aec8..3b73dfa 160000 --- a/lib/yalantinglibs +++ b/lib/yalantinglibs @@ -1 +1 @@ -Subproject commit cb1aec86c71f20c89ed0a7b6ae130fdb05ac75f6 +Subproject commit 3b73dfa989b440851dfd7b7265ac09847b68e732 diff --git a/src/checkpoint.cpp b/src/checkpoint.cpp index 2161476..51d56e7 100644 --- a/src/checkpoint.cpp +++ b/src/checkpoint.cpp @@ -29,7 +29,6 @@ std::ostringstream re{}; WriteStream *writer; std::vector> as; std::mutex as_mtx; -long snapshot_memory = 0; int main(int argc, char *argv[]) { spdlog::cfg::load_env_levels(); diff --git a/src/profile.cpp b/src/profile.cpp index db9572a..7be0757 100644 --- a/src/profile.cpp +++ b/src/profile.cpp @@ -30,7 +30,6 @@ std::ostringstream re{}; FwriteStream *writer; std::vector> as; std::mutex as_mtx; -long snapshot_memory = 0; std::vector>> stack_record; void unwind(WASMExecEnv *instance) { diff --git a/src/restore.cpp b/src/restore.cpp index d3f9e94..47d532a 100644 --- a/src/restore.cpp +++ b/src/restore.cpp @@ -10,7 +10,7 @@ * UC Santa Cruz Sluglab. */ -#include "struct_pack/struct_pack.hpp" +#include "ylt/struct_pack.hpp" #include "wamr.h" #include "wamr_exec_env.h" #include "wamr_export.h" @@ -28,7 +28,6 @@ ReadStream *reader; WriteStream *writer; WAMRInstance *wamr = nullptr; std::vector> as; -long snapshot_memory = 0; int main(int argc, char **argv) { spdlog::cfg::load_env_levels(); diff --git a/src/wamr.cpp b/src/wamr.cpp index dbf7b5d..ed77f5a 100644 --- a/src/wamr.cpp +++ b/src/wamr.cpp @@ -41,7 +41,6 @@ WAMRInstance::ThreadArgs **argptr; std::counting_semaphore<100> wakeup(0); std::counting_semaphore<100> thread_init(0); -extern long snapshot_memory; extern WriteStream *writer; extern std::vector> as; @@ -811,8 +810,7 @@ long get_rss() { void serialize_to_file(WASMExecEnv *instance) { // gateway auto start = std::chrono::high_resolution_clock::now(); - if (snapshot_memory == 0) - snapshot_memory = get_rss(); + #if WASM_ENABLE_LIB_PTHREAD != 0 auto cluster = wasm_exec_env_get_cluster(instance); auto all_count = bh_list_length(&cluster->exec_env_list); @@ -973,19 +971,22 @@ void serialize_to_file(WASMExecEnv *instance) { } // finish filling vector #endif - auto used_memory = get_rss(); +#if __linux__ if (dynamic_cast(writer)) { auto buffer = struct_pack::serialize(as); - ((RDMAWriteStream *)writer)->buffer = buffer.data(); + ((RDMAWriteStream *)writer)->buffer = buffer; ((RDMAWriteStream *)writer)->position = buffer.size(); + SPDLOG_DEBUG("Snapshot size: {}\n", buffer.size()); + delete ((RDMAWriteStream *)writer); + } else +#endif struct_pack::serialize_to(*writer, as); auto end = std::chrono::high_resolution_clock::now(); // get duration in us auto dur = std::chrono::duration_cast(end - start); SPDLOG_INFO("Snapshot time: {} s", dur.count() / 1000000.0); - SPDLOG_INFO("Memory usage: {} MB", (used_memory - snapshot_memory) / 1024 / 1024); - delete writer; + SPDLOG_INFO("Memory usage: {} MB", get_rss() / 1024 / 1024); exit(EXIT_SUCCESS); } \ No newline at end of file