Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IO injection abstraction #594

Merged
merged 6 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core-bus/src/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ggl/file.h>
#include <ggl/log.h>
#include <ggl/object.h>
#include <ggl/socket.h>
#include <stdbool.h>
#include <stddef.h>

Expand Down Expand Up @@ -50,7 +51,7 @@ GglError ggl_call(
GglBuffer recv_buffer = GGL_BUF(ggl_core_bus_client_payload_array);
EventStreamMessage msg = { 0 };
ret = ggl_client_get_response(
ggl_fd_reader, &conn, recv_buffer, error, &msg
ggl_socket_reader(&conn), recv_buffer, error, &msg
);
if (ret != GGL_ERR_OK) {
return ret;
Expand Down
26 changes: 8 additions & 18 deletions core-bus/src/client_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <ggl/eventstream/encode.h>
#include <ggl/eventstream/types.h>
#include <ggl/file.h>
#include <ggl/io.h>
#include <ggl/log.h>
#include <ggl/object.h>
#include <ggl/socket.h>
Expand Down Expand Up @@ -46,14 +47,6 @@ static GglError interface_connect(GglBuffer interface, int *conn_fd) {
return ggl_connect(socket_path.buf, conn_fd);
}

static GglError payload_writer(GglBuffer *buf, void *payload) {
assert(buf != NULL);
assert(payload != NULL);

GglMap *map = payload;
return ggl_serialize(GGL_OBJ_MAP(*map), buf);
}

GglError ggl_client_send_message(
GglBuffer interface,
GglCoreBusRequestType type,
Expand All @@ -79,7 +72,10 @@ GglError ggl_client_send_message(
size_t headers_len = sizeof(headers) / sizeof(headers[0]);

ret = eventstream_encode(
&send_buffer, headers, headers_len, payload_writer, &params
&send_buffer,
headers,
headers_len,
ggl_serialize_reader(&GGL_OBJ_MAP(params))
);
if (ret != GGL_ERR_OK) {
return ret;
Expand All @@ -96,22 +92,16 @@ GglError ggl_client_send_message(
return GGL_ERR_OK;
}

GglError ggl_fd_reader(void *ctx, GglBuffer buf) {
int *fd_ptr = ctx;
return ggl_read_exact(*fd_ptr, buf);
}

GglError ggl_client_get_response(
GglError (*reader)(void *ctx, GglBuffer buf),
void *reader_ctx,
GglReader reader,
GglBuffer recv_buffer,
GglError *error,
EventStreamMessage *response
) {
GglBuffer prelude_buf = ggl_buffer_substr(recv_buffer, 0, 12);
assert(prelude_buf.len == 12);

GglError ret = reader(reader_ctx, prelude_buf);
GglError ret = ggl_reader_call_exact(reader, prelude_buf);
if (ret != GGL_ERR_OK) {
return ret;
}
Expand All @@ -130,7 +120,7 @@ GglError ggl_client_get_response(
GglBuffer data_section
= ggl_buffer_substr(recv_buffer, 0, prelude.data_len);

ret = reader(reader_ctx, data_section);
ret = ggl_reader_call_exact(reader, data_section);
if (ret != GGL_ERR_OK) {
return ret;
}
Expand Down
6 changes: 2 additions & 4 deletions core-bus/src/client_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ggl/core_bus/constants.h>
#include <ggl/error.h>
#include <ggl/eventstream/decode.h>
#include <ggl/io.h>
#include <ggl/object.h>
#include <stdint.h>

Expand All @@ -25,11 +26,8 @@ GglError ggl_client_send_message(
int *conn_fd
);

GglError ggl_fd_reader(void *ctx, GglBuffer buf);

GglError ggl_client_get_response(
GglError (*reader)(void *ctx, GglBuffer buf),
void *reader_ctx,
GglReader reader,
GglBuffer recv_buffer,
GglError *error,
EventStreamMessage *response
Expand Down
14 changes: 7 additions & 7 deletions core-bus/src/client_subscribe.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <ggl/file.h>
#include <ggl/log.h>
#include <ggl/object.h>
#include <ggl/socket.h>
#include <ggl/socket_epoll.h>
#include <ggl/socket_handle.h>
#include <pthread.h>
Expand Down Expand Up @@ -146,7 +147,7 @@ static GglError make_subscribe_request(
GglBuffer recv_buffer = GGL_BUF(ggl_core_bus_client_payload_array);
EventStreamMessage msg = { 0 };
ret = ggl_client_get_response(
ggl_fd_reader, &conn, recv_buffer, error, &msg
ggl_socket_reader(&conn), recv_buffer, error, &msg
);
if (ret != GGL_ERR_OK) {
return ret;
Expand Down Expand Up @@ -248,11 +249,6 @@ void ggl_client_sub_close(uint32_t handle) {
ggl_socket_handle_close(&pool, handle);
}

static GglError socket_handle_reader(void *ctx, GglBuffer buf) {
uint32_t *handle_ptr = ctx;
return ggl_socket_handle_read(&pool, *handle_ptr, buf);
}

typedef struct {
uint32_t handle;
GglObject data;
Expand Down Expand Up @@ -288,8 +284,12 @@ static GglError get_subscription_response(uint32_t handle) {

GglBuffer recv_buffer = GGL_BUF(sub_resp_payload_array);
EventStreamMessage msg = { 0 };
GglSocketHandleReaderCtx reader_ctx;
GglError ret = ggl_client_get_response(
socket_handle_reader, &handle, recv_buffer, NULL, &msg
ggl_socket_handle_reader(&reader_ctx, &pool, handle),
recv_buffer,
NULL,
&msg
);
if (ret != GGL_ERR_OK) {
return ret;
Expand Down
20 changes: 19 additions & 1 deletion core-bus/src/object_serde.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ggl/bump_alloc.h>
#include <ggl/constants.h>
#include <ggl/error.h>
#include <ggl/io.h>
#include <ggl/log.h>
#include <ggl/object.h>
#include <string.h>
Expand Down Expand Up @@ -414,7 +415,7 @@ static GglError read_obj(
}

GglError ggl_serialize(GglObject obj, GglBuffer *buf) {
assert((buf != NULL) && (buf->data != NULL));
assert(buf != NULL);
GglBumpAlloc mem = ggl_bump_alloc_init(*buf);

NestingState state = {
Expand Down Expand Up @@ -520,3 +521,20 @@ GglError ggl_deserialize(

return GGL_ERR_OK;
}

static GglError obj_read(void *ctx, GglBuffer *buf) {
assert(buf != NULL);

GglObject *obj = ctx;

if ((obj == NULL) || (buf == NULL)) {
return GGL_ERR_INVALID;
}

return ggl_serialize(*obj, buf);
}

GglReader ggl_serialize_reader(GglObject *obj) {
assert(obj != NULL);
return (GglReader) { .read = obj_read, .ctx = obj };
}
5 changes: 5 additions & 0 deletions core-bus/src/object_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ggl/alloc.h>
#include <ggl/buffer.h>
#include <ggl/error.h>
#include <ggl/io.h>
#include <ggl/object.h>
#include <stdbool.h>

Expand All @@ -23,4 +24,8 @@ GglError ggl_deserialize(
GglAlloc *alloc, bool copy_bufs, GglBuffer buf, GglObject *obj
);

/// Reader from which a serialized object can be read.
/// Errors if buffer is not large enough for entire object.
GglReader ggl_serialize_reader(GglObject *obj);

#endif
20 changes: 6 additions & 14 deletions core-bus/src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <ggl/eventstream/decode.h>
#include <ggl/eventstream/encode.h>
#include <ggl/eventstream/types.h>
#include <ggl/io.h>
#include <ggl/log.h>
#include <ggl/object.h>
#include <ggl/socket_handle.h>
Expand Down Expand Up @@ -277,17 +278,6 @@ GglError ggl_listen(
);
}

static GglError payload_writer(GglBuffer *buf, void *payload) {
GglObject *obj = payload;

if (obj == NULL) {
buf->len = 0;
return GGL_ERR_OK;
}

return ggl_serialize(*obj, buf);
}

void ggl_return_err(uint32_t handle, GglError error) {
assert(error != GGL_ERR_OK); // Returning error ok is invalid

Expand All @@ -301,7 +291,7 @@ void ggl_return_err(uint32_t handle, GglError error) {
size_t resp_headers_len = sizeof(resp_headers) / sizeof(resp_headers[0]);

GglError ret = eventstream_encode(
&send_buffer, resp_headers, resp_headers_len, payload_writer, NULL
&send_buffer, resp_headers, resp_headers_len, GGL_NULL_READER
);

if (ret == GGL_ERR_OK) {
Expand Down Expand Up @@ -333,7 +323,9 @@ void ggl_respond(uint32_t handle, GglObject value) {

GglBuffer send_buffer = GGL_BUF(encode_array);

ret = eventstream_encode(&send_buffer, NULL, 0, payload_writer, &value);
ret = eventstream_encode(
&send_buffer, NULL, 0, ggl_serialize_reader(&value)
);
if (ret != GGL_ERR_OK) {
return;
}
Expand Down Expand Up @@ -384,7 +376,7 @@ void ggl_sub_accept(
size_t resp_headers_len = sizeof(resp_headers) / sizeof(resp_headers[0]);

GglError ret = eventstream_encode(
&send_buffer, resp_headers, resp_headers_len, payload_writer, NULL
&send_buffer, resp_headers, resp_headers_len, GGL_NULL_READER
);
if (ret != GGL_ERR_OK) {
return;
Expand Down
5 changes: 3 additions & 2 deletions eventstream/include/ggl/eventstream/encode.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
#include "types.h"
#include <ggl/buffer.h>
#include <ggl/error.h>
#include <ggl/io.h>
#include <stddef.h>

/// Encode an EventStream packet into a buffer.
/// Payload must fail if it does not fit in provided buffer.
GglError eventstream_encode(
GglBuffer *buf,
const EventStreamHeader *headers,
size_t header_count,
GglError (*payload_writer)(GglBuffer *buf, void *payload),
void *payload
GglReader payload
);

#endif
16 changes: 6 additions & 10 deletions eventstream/src/encode.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ggl/buffer.h>
#include <ggl/bump_alloc.h>
#include <ggl/error.h>
#include <ggl/io.h>
#include <ggl/log.h>
#include <string.h>
#include <stdbool.h>
Expand Down Expand Up @@ -103,8 +104,7 @@ GglError eventstream_encode(
GglBuffer *buf,
const EventStreamHeader *headers,
size_t header_count,
GglError (*payload_writer)(GglBuffer *buf, void *payload),
void *payload
GglReader payload
) {
assert((headers == NULL) ? (header_count == 0) : true);

Expand Down Expand Up @@ -144,15 +144,11 @@ GglError eventstream_encode(
write_be_u32(headers_len, headers_len_p);

GglBuffer payload_buf = buf_copy;
if (payload_writer == NULL) {
payload_buf.len = 0;
} else {
GglError err = payload_writer(&payload_buf, payload);
if (err != GGL_ERR_OK) {
return err;
}
buf_copy = ggl_buffer_substr(buf_copy, payload_buf.len, SIZE_MAX);
GglError err = ggl_reader_call(payload, &payload_buf);
if (err != GGL_ERR_OK) {
return err;
}
buf_copy = ggl_buffer_substr(buf_copy, payload_buf.len, SIZE_MAX);

uint32_t message_len = 12 + headers_len + (uint32_t) payload_buf.len + 4;

Expand Down
20 changes: 5 additions & 15 deletions ggipc-client/src/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <ggl/eventstream/rpc.h>
#include <ggl/eventstream/types.h>
#include <ggl/file.h>
#include <ggl/io.h>
#include <ggl/json_decode.h>
#include <ggl/json_encode.h>
#include <ggl/log.h>
Expand All @@ -38,18 +39,6 @@
static uint8_t payload_array[GGL_IPC_MAX_MSG_LEN];
static pthread_mutex_t payload_array_mtx = PTHREAD_MUTEX_INITIALIZER;

static GglError payload_writer(GglBuffer *buf, void *payload) {
assert(buf != NULL);

if (payload == NULL) {
buf->len = 0;
return GGL_ERR_OK;
}

GglMap *map = payload;
return ggl_json_encode(GGL_OBJ_MAP(*map), buf);
}

static GglError send_message(
int conn,
const EventStreamHeader *headers,
Expand All @@ -60,9 +49,10 @@ static GglError send_message(

GglBuffer send_buffer = GGL_BUF(payload_array);

GglError ret = eventstream_encode(
&send_buffer, headers, headers_len, payload_writer, payload
);
GglReader reader = payload != NULL ? ggl_json_reader(&GGL_OBJ_MAP(*payload))
: GGL_NULL_READER;
GglError ret
= eventstream_encode(&send_buffer, headers, headers_len, reader);
if (ret != GGL_ERR_OK) {
return ret;
}
Expand Down
Loading