Skip to content

Commit

Permalink
Merge pull request #788 from openziti/improve-parse-message
Browse files Browse the repository at this point in the history
Improve parse message
  • Loading branch information
ekoby authored Dec 10, 2024
2 parents 4365c76 + fa4623b commit 1aa86a8
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 35 deletions.
11 changes: 7 additions & 4 deletions inc_internal/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@

#define MAGIC_INIT {0x3, 0x6, 0x9, 0xC}

typedef char magic_t[4];
typedef union {
char magic[4];
int32_t magint;
} magic_t;

#define HEADER_FIELDS(XX) \
XX(content, uint32_t)\
Expand All @@ -52,7 +55,7 @@ static header_t EMPTY_HEADER = {
typedef struct {
uint32_t header_id;
uint32_t length;
uint8_t *value;
const uint8_t *value;
} hdr_t;

#define var_header(id, var) header(id, sizeof(var), &(var))
Expand Down Expand Up @@ -97,9 +100,9 @@ bool message_get_bytes_header(message *m, int header_id, const uint8_t **ptr, si

uint8_t *write_hdr(const hdr_t *h, uint8_t *buf);

int parse_hdrs(uint8_t *buf, uint32_t len, hdr_t **hp);
int parse_hdrs(const uint8_t *buf, uint32_t len, hdr_t **hp);

message *message_new_from_header(pool_t *pool, uint8_t buf[HEADER_SIZE]);
int message_new_from_header(pool_t *pool, uint8_t buf[HEADER_SIZE], message **msg_p);

message *message_new(pool_t *pool, uint32_t content, const hdr_t *headers, int nheaders, size_t body_len);

Expand Down
2 changes: 2 additions & 0 deletions includes/ziti/errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ is offline or did not respond to the request*/
#define ZITI_CERT_FAILED_VALIDATION (-36)
/** returned when the certificate doesn't have an externalId") \*/
#define ZITI_MISSING_CERT_CLAIM (-37)
/** ziti could not allocate memory */
#define ZITI_ALLOC_FAILED (-38)


// Put new error codes here and add error string in error.c
Expand Down
29 changes: 21 additions & 8 deletions library/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,6 @@ static bool is_edge(uint32_t content) {
static void dispatch_message(ziti_channel_t *ch, message *m) {
struct waiter_s *w = NULL;

m->nhdrs = parse_hdrs(m->headers, m->header.headers_len, &m->hdrs);

uint32_t reply_to;
bool is_reply = message_get_int32_header(m, ReplyForHeader, (int32_t*)&reply_to);

Expand Down Expand Up @@ -566,6 +564,7 @@ static void dispatch_message(ziti_channel_t *ch, message *m) {
static void process_inbound(ziti_channel_t *ch) {
uint8_t *ptr;
ssize_t len;
int rc = 0;
do {
if (ch->in_next == NULL && pool_has_available(ch->in_msg_pool)) {
if (buffer_available(ch->incoming) < HEADER_SIZE) {
Expand All @@ -583,7 +582,9 @@ static void process_inbound(ziti_channel_t *ch) {

assert(header_read == HEADER_SIZE);

ch->in_next = message_new_from_header(ch->in_msg_pool, header_buf);

rc = message_new_from_header(ch->in_msg_pool, header_buf, &ch->in_next);
if (rc != ZITI_OK) break;
ch->in_body_offset = 0;

CH_LOG(TRACE, "<= ct[%04X] seq[%d] len[%d] hdrs[%d]", ch->in_next->header.content,
Expand All @@ -608,17 +609,29 @@ static void process_inbound(ziti_channel_t *ch) {
ch->in_body_offset += len;

if (ch->in_body_offset == total) {
CH_LOG(TRACE, "message is complete seq[%d] ct[%04X]", ch->in_next->header.seq,
ch->in_next->header.content);

dispatch_message(ch, ch->in_next);

message *msg = ch->in_next;
ch->in_next = NULL;

CH_LOG(TRACE, "message is complete seq[%d] ct[%04X]",
msg->header.seq, msg->header.content);

rc = parse_hdrs(msg->headers, msg->header.headers_len, &msg->hdrs);
if (rc < 0) {
pool_return_obj(msg);
CH_LOG(ERROR, "failed to parse incoming message: %s", ziti_errorstr(rc));
break;
}
msg->nhdrs = rc;
rc = 0;
dispatch_message(ch, msg);
}
}
} while (1);

buffer_cleanup(ch->incoming);
if (rc != 0) {
on_channel_close(ch, rc, 0);
}
}

static void latency_reply_cb(void *ctx, message *reply, int err) {
Expand Down
1 change: 1 addition & 0 deletions library/errors.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
XX(CERT_IN_USE,"the provided certificate already in use") \
XX(CERT_FAILED_VALIDATION, "the provided key/cert are invalid") \
XX(MISSING_CERT_CLAIM, "the certificate is expected to contain an externalId but none was not found") \
XX(ALLOC_FAILED, "memory allocation failed") \
XX(WTF, "WTF: programming error")


Expand Down
64 changes: 45 additions & 19 deletions library/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "message.h"
#include <stdlib.h>
#include <string.h>
#include <ziti/errors.h>

#include "utils.h"
#include "endian_internal.h"
Expand All @@ -25,7 +26,7 @@ static const uint8_t *read_int32(const uint8_t *p, uint32_t *val) {
}

void header_to_buffer(header_t *h, uint8_t *buf) {
memcpy(buf, h->magic, sizeof(h->magic));
memcpy(buf, h->magic.magic, sizeof(h->magic));
uint8_t *offset = buf + sizeof(h->magic);

#define write_field(n,t) {\
Expand All @@ -38,7 +39,7 @@ void header_to_buffer(header_t *h, uint8_t *buf) {
};

void header_from_buffer(header_t *h, uint8_t *buf) {
memcpy(h->magic, buf, sizeof(h->magic));
memcpy(h->magic.magic, buf, sizeof(h->magic));
uint8_t *offset = buf + sizeof(h->magic);

#define read_field(n,t) {\
Expand Down Expand Up @@ -83,26 +84,39 @@ uint8_t *write_hdr(const hdr_t *h, uint8_t *buf) {
return buf + h->length;
}

int parse_hdrs(uint8_t *buf, uint32_t len, hdr_t **hp) {
int parse_hdrs(const uint8_t *buf, uint32_t len, hdr_t **hp) {
const uint8_t *p = buf;

hdr_t *headers = NULL;
int count = 0;
while (p < buf + len) {
if (headers == NULL) {
headers = malloc(sizeof(hdr_t));
}
else {
headers = realloc(headers, (count + 1) * sizeof(hdr_t));
}

p = read_int32(p, &headers[count].header_id);
p = read_int32(p, &headers[count].length);
headers[count].value = p;
p += headers[count].length;
while (p < buf + len - 2 * sizeof(uint32_t)) {
uint32_t length;
p += sizeof(uint32_t);
p = read_int32(p, &length);
p += length;
count++;
}

if (p != buf + len) {
ZITI_LOG(ERROR, "misaligned message headers");
return ZITI_INVALID_STATE;
}

hdr_t *headers = calloc(count, sizeof(hdr_t));
if (headers == NULL) {
ZITI_LOG(ERROR, "failed to allocates message headers");
return ZITI_ALLOC_FAILED;
}

p = buf;
int idx = 0;
while (p < buf + len) {
p = read_int32(p, &headers[idx].header_id);
p = read_int32(p, &headers[idx].length);
headers[idx].value = p;
p += headers[idx].length;
idx++;
}

*hp = headers;
return count;
}
Expand All @@ -119,7 +133,7 @@ static hdr_t *find_header(message *m, int header_id) {
bool message_get_bool_header(message *m, int header_id, bool *v) {
hdr_t *h = find_header(m, header_id);
if (h != NULL) {
int8_t val = *h->value;
char val = (char)h->value[0];
*v = (val != 0);
return true;
}
Expand Down Expand Up @@ -165,19 +179,30 @@ bool message_get_bytes_header(message *m, int header_id, const uint8_t **v, size
return false;
}

message *message_new_from_header(pool_t *pool, uint8_t buf[HEADER_SIZE]) {
int message_new_from_header(pool_t *pool, uint8_t buf[HEADER_SIZE], message **msg_p) {
header_t h;
header_from_buffer(&h, buf);

if (h.magic.magint != EMPTY_HEADER.magic.magint) {
return ZITI_INVALID_STATE;
}

size_t msgbuflen = HEADER_SIZE + h.headers_len + h.body_len;
message *m = pool ? pool_alloc_obj(pool) : alloc_unpooled_obj(sizeof(message) + msgbuflen,
(void (*)(void *)) message_free);

if (m == NULL) {
return ZITI_ALLOC_FAILED;
}
m->msgbuflen = msgbuflen;

size_t msgsize = sizeof(message) + msgbuflen;
if (msgsize > pool_obj_size(m)) {
m->msgbufp = malloc(msgbuflen);
if (m->msgbufp == NULL) {
pool_return_obj(m);
return ZITI_ALLOC_FAILED;
}
}
else {
m->msgbufp = m->msgbuf;
Expand All @@ -186,7 +211,8 @@ message *message_new_from_header(pool_t *pool, uint8_t buf[HEADER_SIZE]) {
memcpy(&m->header, &h, sizeof(h));
m->headers = m->msgbufp + HEADER_SIZE;
m->body = m->headers + h.headers_len;
return m;
*msg_p = m;
return ZITI_OK;
}

message *message_new(pool_t *pool, uint32_t content, const hdr_t *hdrs, int nhdrs, size_t body_len) {
Expand Down
2 changes: 1 addition & 1 deletion programs/ziti-prox-c/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Run: public CLI::App {

Run(): App("run proxy", "run"),
debug(2) {
add_option("--debug,-d", debug, "log level")->envname("ZITI_LOG");
add_option("--debug,-d", debug, "log level");
add_option("--identity,-i", identity, "identity config")->required();
add_option("listener", intercepts, "<name:port>");
add_option("--bind,-b", bindings, "bind service <name:host:port>");
Expand Down
10 changes: 7 additions & 3 deletions tests/message_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <cstring>
#include "message.h"
#include "edge_protocol.h"
#include "ziti/errors.h"

TEST_CASE("simple", "[model]") {
auto p = pool_new(sizeof(message) + 200, 3, (void (*)(void *)) message_free);
Expand All @@ -39,7 +40,8 @@ TEST_CASE("simple", "[model]") {
strncpy(reinterpret_cast<char *>(m1->body), content1, strlen(content1));
message_set_seq(m1, &s1);

auto m2 = message_new_from_header(p, m1->msgbufp);
message *m2;
REQUIRE(message_new_from_header(p, m1->msgbufp, &m2) == ZITI_OK);
CHECK(m2->header.seq == 3334);
CHECK(m2->msgbuflen == m1->msgbuflen);
memcpy(m2->msgbufp, m1->msgbufp, m1->msgbuflen);
Expand Down Expand Up @@ -83,7 +85,8 @@ TEST_CASE("large", "[model]") {
message_set_seq(m1, &seq);


auto m2 = message_new_from_header(p, m1->msgbufp);
message *m2;
REQUIRE(message_new_from_header(p, m1->msgbufp, &m2) == ZITI_OK);
CHECK(m2->header.seq == 3334);
CHECK(seq == 3334);
CHECK(m2->msgbuflen == m1->msgbuflen);
Expand Down Expand Up @@ -126,7 +129,8 @@ TEST_CASE("large unpooled", "[model]") {
strncpy(reinterpret_cast<char *>(m1->body), content1, strlen(content1));
message_set_seq(m1, &seq);

auto m2 = message_new_from_header(nullptr, m1->msgbufp);
message *m2;
REQUIRE(message_new_from_header(nullptr, m1->msgbufp, &m2) == ZITI_OK);
CHECK(m2->header.seq == 3334);
CHECK(seq == 3334);
CHECK(m2->msgbuflen == m1->msgbuflen);
Expand Down

0 comments on commit 1aa86a8

Please sign in to comment.