Skip to content

Commit

Permalink
push the serializer for socket and benchmark result
Browse files Browse the repository at this point in the history
  • Loading branch information
victoryang00 committed Mar 6, 2024
1 parent c77f752 commit f8c00e8
Show file tree
Hide file tree
Showing 11 changed files with 1,037 additions and 871 deletions.
Empty file added doc/burst_computing.md
Empty file.
1 change: 1 addition & 0 deletions doc/optimistic_computing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Optimistic Computing
1 change: 1 addition & 0 deletions include/wamr.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ struct mvvm_op_data {
int size;
SocketAddrPool addr[MVVM_MAX_ADDR][2];
};
std::string removeExtension(std::string &);
bool is_ip_in_cidr(const char *base_ip, int subnet_mask_len, uint32_t ip);
bool is_ipv6_in_cidr(const char *base_ip_str, int subnet_mask_len, struct in6_addr *ip);
long get_rss();
Expand Down
1 change: 1 addition & 0 deletions include/wamr_module_instance_extra.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#if WASM_ENABLE_WASI_NN != 0
#include "wamr_wasi_nn_context.h"
#endif
#include "wamr_serializer.h"
#include "wasm_runtime.h"
#include <memory>
#include <vector>
Expand Down
148 changes: 144 additions & 4 deletions include/wamr_read_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,165 @@
#ifndef MVVM_WAMR_READ_WRITE_H
#define MVVM_WAMR_READ_WRITE_H
#include "struct_pack/struct_pack.hpp"
#include <spdlog/spdlog.h>
#ifndef _WIN32
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#endif
struct FwriteStream {
struct WriteStream {
virtual bool write(const char *data, std::size_t sz) const = 0;
};
struct ReadStream {
virtual bool read(char *data, std::size_t sz) const = 0;
virtual bool ignore(std::size_t sz) const = 0;
virtual std::size_t tellg() const = 0;
};
struct FwriteStream : public WriteStream {
FILE *file;
bool write(const char *data, std::size_t sz) const { return fwrite(data, sz, 1, file) == 1; }
explicit FwriteStream(const char *file_name) : file(fopen(file_name, "wb")) {}
~FwriteStream() { fclose(file); }
};

struct FreadStream {
struct FreadStream : public ReadStream {
FILE *file;
bool read(char *data, std::size_t sz) const { return fread(data, sz, 1, file) == 1; }
[[nodiscard]] bool ignore(std::size_t sz) const { return fseek(file, sz, SEEK_CUR) == 0; }
[[nodiscard]] std::size_t tellg() const {
bool ignore(std::size_t sz) const { return fseek(file, sz, SEEK_CUR) == 0; }
std::size_t tellg() const {
// if you worry about ftell performance, just use an variable to record it.
return ftell(file);
}
explicit FreadStream(const char *file_name) : file(fopen(file_name, "rb")) {}
~FreadStream() { fclose(file); }
};
static_assert(ReaderStreamTrait<FreadStream, char>, "Reader must conform to ReaderStreamTrait");
static_assert(WriterStreamTrait<FwriteStream, char>, "Writer must conform to WriterStreamTrait");

struct SocketWriteStream : public WriteStream {
int sock_fd; // Socket file descriptor

bool write(const char *data, std::size_t sz) const {
std::size_t totalSent = 0;
while (totalSent < sz) {
ssize_t sent = send(sock_fd, data + totalSent, sz - totalSent, 0);
if (sent == -1) {
// Handle error. For simplicity, just returning false here.
return false;
}
totalSent += sent;
}
return true;
}
explicit SocketWriteStream(const char *address, int port) {
// Example: Initialize a client socket and connect to the given address and port
sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (sock_fd == -1) {
SPDLOG_ERROR("Socket creation failed\n");
return;
}

sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
// Convert address from text to binary form
inet_pton(AF_INET, address, &server_addr.sin_addr);

if (connect(sock_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
SPDLOG_ERROR("Connection failed\n");
close(sock_fd);
exit(EXIT_FAILURE);
}
}

~SocketWriteStream() {
close(sock_fd); // Close the socket descriptor
}
};
struct SocketReadStream : public ReadStream {
int sock_fd; // Socket file descriptor
int client_fd;
mutable std::size_t position = 0; // Track the amount of data read

// bool read(char *data, std::size_t sz) const {
// ssize_t bytes_read = recv(client_fd, data, sz, 0);
// SPDLOG_DEBUG("{}, {}",data,sz);
// if (bytes_read > 0) {
// position += bytes_read;
// return static_cast<std::size_t>(bytes_read) == sz;
// }
// return false;
// }
bool read(char *data, std::size_t sz) const {
std::size_t totalReceived = 0;
while (totalReceived < sz) {
ssize_t received = recv(client_fd, data + totalReceived, sz - totalReceived, 0);
if (received == -1) {
// Handle error. For simplicity, just returning false here.
return false;
} else if (received == 0) {
// Connection closed
return false;
}
totalReceived += received;
}
position += totalReceived;
return true;
}

explicit SocketReadStream(const char *address, int port) {
// Example: Initialize a client socket and connect to the given address and port
sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (sock_fd == -1) {
SPDLOG_ERROR("Socket creation failed\n");
return;
}

sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
// Convert address from text to binary form
inet_pton(AF_INET, address, &server_addr.sin_addr);
auto addr_len = sizeof(server_addr);
SPDLOG_INFO("[Server] Bind socket {} {}\n", address, port);
if (bind(sock_fd, (struct sockaddr *)&server_addr, addr_len) < 0) {
SPDLOG_ERROR("Bind failed");
exit(EXIT_FAILURE);
}

SPDLOG_INFO("[Server] Listening on socket\n");
if (listen(sock_fd, 3) < 0) {
SPDLOG_ERROR("Listen failed");
exit(EXIT_FAILURE);
}
client_fd = accept(sock_fd, (struct sockaddr *)&server_addr, (socklen_t *)&addr_len);
}
// "Ignore" sz bytes of data
bool ignore(std::size_t sz) const {
char buffer[1024]; // Temporary buffer to discard data
std::size_t total_ignored = 0;

while (total_ignored < sz) {
std::size_t to_ignore = std::min(sz - total_ignored, sizeof(buffer));
ssize_t ignored = recv(client_fd, buffer, to_ignore, 0);
if (ignored <= 0) { // Check for error or close
return false;
}
total_ignored += ignored;
position += ignored; // Update position
}

return true;
}

// Report the current position
std::size_t tellg() const { return position; }
~SocketReadStream() {
close(sock_fd); // Close the socket descriptor
}
};
static_assert(ReaderStreamTrait<SocketReadStream, char>, "Reader must conform to ReaderStreamTrait");
static_assert(WriterStreamTrait<SocketWriteStream, char>, "Writer must conform to WriterStreamTrait");

#endif /* MVVM_WAMR_READ_WRITE_H */
16 changes: 16 additions & 0 deletions include/wamr_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,20 @@ concept CheckerTrait = requires(T &t, K k) {
{ t->equal_impl(k) } -> std::convertible_to<bool>;
};

template <typename T, typename WriteDataType>
concept WriterStreamTrait = requires(T &t, const WriteDataType *data, std::size_t size) {
// Requires a write method that accepts WriteDataType and returns void or a boolean.
{ t.write(data, size) } -> std::same_as<bool>;
};

template <typename T, typename ReadDataType>
concept ReaderStreamTrait = requires(T &t, ReadDataType *data, std::size_t size) {
// Requires a read method that accepts a pointer to ReadDataType and size, returns bool.
{ t.read(data, size) } -> std::same_as<bool>;
// Requires an ignore method that accepts size and returns bool.
{ t.ignore(size) } -> std::same_as<bool>;
// Requires a tellg method that returns std::size_t.
{ t.tellg() } -> std::same_as<std::size_t>;
};

#endif // MVVM_WAMR_SERIALIZER_H
2 changes: 2 additions & 0 deletions include/wamr_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#ifndef MVVM_WAMR_TYPE_H
#define MVVM_WAMR_TYPE_H
#include "wasm_runtime.h"
#include "wamr_serializer.h"

struct WAMRType {
uint16 param_count;
uint16 result_count;
Expand Down
Loading

0 comments on commit f8c00e8

Please sign in to comment.