From 853925f43621b2a0843d4107e0b7a058fc9e2c25 Mon Sep 17 00:00:00 2001 From: Ryan Hartlage Date: Sat, 14 Dec 2024 14:34:53 -0500 Subject: [PATCH] Add packet queue support to gea2 interface --- include/tiny_gea2_interface.h | 93 +++--- src/tiny_gea2_interface.c | 425 +++++++++++++----------- src/tiny_gea3_interface.c | 44 +-- test/tests/tiny_gea2_interface_test.cpp | 98 ++++-- 4 files changed, 370 insertions(+), 290 deletions(-) diff --git a/include/tiny_gea2_interface.h b/include/tiny_gea2_interface.h index 0edf4e9..90a20fa 100644 --- a/include/tiny_gea2_interface.h +++ b/include/tiny_gea2_interface.h @@ -2,15 +2,10 @@ * @file * @brief * - * This component is interrupt-aware and handles byte transmit/receive in the - * interrupt context. Publication of messages is done via a background task in - * tiny_gea2_interface_run() so the application does not have to do - * anything special. - * - * Additionally, this component does not do any queueing of packets. If a send - * is in progress and another message is sent, then the currently sending message - * is discarded. In order to prevent this, clients can check whether the interface - * is currently sending and wait before attempting to send a packet. + * This component is interrupt-aware and safely handles byte transmit/receive in + * an interrupt context. Publication of messages is done via a background task in + * tiny_gea2_interface_run() so the application does not have to do anything + * special to maintain context safety when receiving packets. * * If a message is received, all messages received after will be dropped until * tiny_gea2_interface_run() is called. @@ -28,71 +23,73 @@ #include "tiny_crc16.h" #include "tiny_event.h" #include "tiny_fsm.h" +#include "tiny_queue.h" #include "tiny_timer.h" typedef struct { i_tiny_gea_interface_t interface; + tiny_fsm_t fsm; + tiny_event_t on_receive; + tiny_event_t on_diagnostics_event; + tiny_event_subscription_t msec_interrupt_subscription; + tiny_event_subscription_t byte_received_subscription; + i_tiny_uart_t* uart; + tiny_timer_t timer; + uint8_t address; + bool ignore_destination_address; + uint8_t retries; + tiny_timer_group_t timer_group; + struct { - tiny_fsm_t fsm; - tiny_event_t on_receive; - tiny_event_t on_diagnostics_event; - tiny_event_subscription_t msec_interrupt_subscription; - tiny_event_subscription_t byte_received_subscription; - i_tiny_uart_t* uart; - tiny_timer_t timer; - uint8_t address; - bool ignore_destination_address; + tiny_queue_t queue; + tiny_timer_t queue_timer; + uint8_t* buffer; + uint8_t buffer_size; + uint8_t state; + uint8_t offset; + uint16_t crc; + bool escaped; + volatile bool active; + volatile bool packet_queued_in_background; + uint8_t expected_reflection; uint8_t retries; - tiny_timer_group_t timer_group; - - struct - { - uint8_t* buffer; - uint8_t buffer_size; - uint8_t state; - uint8_t offset; - uint16_t crc; - bool escaped; - volatile bool active; - volatile bool packet_queued_in_background; - uint8_t expected_reflection; - uint8_t retries; - } send; + } send; - struct - { - uint8_t* buffer; - uint16_t crc; - uint8_t buffer_size; - uint8_t count; - bool escaped; - volatile bool packet_ready; - } receive; - } _private; + struct + { + uint8_t* buffer; + uint16_t crc; + uint8_t buffer_size; + uint8_t count; + bool escaped; + volatile bool packet_ready; + } receive; } tiny_gea2_interface_t; /*! * Initialize a GEA2 interface. */ void tiny_gea2_interface_init( - tiny_gea2_interface_t* instance, + tiny_gea2_interface_t* self, i_tiny_uart_t* uart, i_tiny_time_source_t* time_source, i_tiny_event_t* msec_interrupt, - uint8_t* receive_buffer, - uint8_t receive_buffer_size, + uint8_t address, uint8_t* send_buffer, uint8_t send_buffer_size, - uint8_t address, + uint8_t* receive_buffer, + uint8_t receive_buffer_size, + uint8_t* send_queue_buffer, + size_t send_queue_buffer_size, bool ignore_destination_address, uint8_t retries); /*! * Run the interface and publish received packets. */ -void tiny_gea2_interface_run(tiny_gea2_interface_t* instance); +void tiny_gea2_interface_run(tiny_gea2_interface_t* self); #endif diff --git a/src/tiny_gea2_interface.c b/src/tiny_gea2_interface.c index 0c8622a..ae70af9 100644 --- a/src/tiny_gea2_interface.c +++ b/src/tiny_gea2_interface.c @@ -67,15 +67,15 @@ static void state_collision_cooldown(tiny_fsm_t* fsm, const tiny_fsm_signal_t si static self_t* interface_from_fsm(tiny_fsm_t* fsm) { - return container_of(self_t, _private.fsm, fsm); + return container_of(self_t, fsm, fsm); } static void byte_received(void* context, const void* _args) { - self_t* instance = context; + self_t* self = context; const tiny_uart_on_receive_args_t* args = _args; - tiny_fsm_send_signal(&instance->_private.fsm, signal_byte_received, &args->byte); + tiny_fsm_send_signal(&self->fsm, signal_byte_received, &args->byte); } #define needs_escape(_byte) ((_byte & 0xFC) == tiny_gea_esc) @@ -84,11 +84,11 @@ static void byte_received(void* context, const void* _args) static void state_idle(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, const void* data) { - self_t* instance = interface_from_fsm(fsm); + self_t* self = interface_from_fsm(fsm); switch(signal) { case tiny_fsm_signal_entry: case signal_send_ready: - if(instance->_private.send.active) { + if(self->send.active) { tiny_fsm_transition(fsm, state_send); } break; @@ -96,11 +96,11 @@ static void state_idle(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, const vo case signal_byte_received: { const uint8_t* byte = data; - if(instance->_private.receive.packet_ready) { + if(self->receive.packet_ready) { break; } - if(*byte == tiny_gea_stx && !instance->_private.receive.packet_ready) { + if(*byte == tiny_gea_stx && !self->receive.packet_ready) { tiny_fsm_transition(fsm, state_receive); } else { @@ -112,109 +112,109 @@ static void state_idle(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, const vo static void reflection_timeout(void* context) { - self_t* instance = context; - tiny_fsm_send_signal(&instance->_private.fsm, signal_reflection_timeout, NULL); + self_t* self = context; + tiny_fsm_send_signal(&self->fsm, signal_reflection_timeout, NULL); } -static bool determine_byte_to_send_considering_escapes(self_t* instance, uint8_t byte, uint8_t* byte_to_send) +static bool determine_byte_to_send_considering_escapes(self_t* self, uint8_t byte, uint8_t* byte_to_send) { - if(!instance->_private.send.escaped && needs_escape(byte)) { - instance->_private.send.escaped = true; + if(!self->send.escaped && needs_escape(byte)) { + self->send.escaped = true; *byte_to_send = tiny_gea_esc; } else { - instance->_private.send.escaped = false; + self->send.escaped = false; *byte_to_send = byte; } - return !instance->_private.send.escaped; + return !self->send.escaped; } -static void send_next_byte(self_t* instance) +static void send_next_byte(self_t* self) { uint8_t byte_to_send = 0; tiny_timer_start( - &instance->_private.timer_group, - &instance->_private.timer, + &self->timer_group, + &self->timer, gea2_reflection_timeout_msec, - instance, + self, reflection_timeout); - switch(instance->_private.send.state) { + switch(self->send.state) { case send_state_stx: byte_to_send = tiny_gea_stx; - instance->_private.send.state = send_state_data; + self->send.state = send_state_data; break; case send_state_data: - if(determine_byte_to_send_considering_escapes(instance, instance->_private.send.buffer[instance->_private.send.offset], &byte_to_send)) { - reinterpret(send_packet, instance->_private.send.buffer, const send_packet_t*); - instance->_private.send.offset++; + if(determine_byte_to_send_considering_escapes(self, self->send.buffer[self->send.offset], &byte_to_send)) { + reinterpret(send_packet, self->send.buffer, const send_packet_t*); + self->send.offset++; - if(instance->_private.send.offset >= send_packet->data_length - data_length_bytes_not_included_in_data) { - instance->_private.send.state = send_state_crc_msb; + if(self->send.offset >= send_packet->data_length - data_length_bytes_not_included_in_data) { + self->send.state = send_state_crc_msb; } } break; case send_state_crc_msb: - byte_to_send = instance->_private.send.crc >> 8; - if(determine_byte_to_send_considering_escapes(instance, byte_to_send, &byte_to_send)) { - instance->_private.send.state = send_state_crc_lsb; + byte_to_send = self->send.crc >> 8; + if(determine_byte_to_send_considering_escapes(self, byte_to_send, &byte_to_send)) { + self->send.state = send_state_crc_lsb; } break; case send_state_crc_lsb: - byte_to_send = instance->_private.send.crc; - if(determine_byte_to_send_considering_escapes(instance, byte_to_send, &byte_to_send)) { - instance->_private.send.state = send_state_etx; + byte_to_send = self->send.crc; + if(determine_byte_to_send_considering_escapes(self, byte_to_send, &byte_to_send)) { + self->send.state = send_state_etx; } break; case send_state_etx: byte_to_send = tiny_gea_etx; - instance->_private.send.state = send_state_done; + self->send.state = send_state_done; break; } - instance->_private.send.expected_reflection = byte_to_send; - tiny_uart_send(instance->_private.uart, byte_to_send); + self->send.expected_reflection = byte_to_send; + tiny_uart_send(self->uart, byte_to_send); } -static void handle_send_failure(self_t* instance) +static void handle_send_failure(self_t* self) { - if(instance->_private.send.retries > 0) { - instance->_private.send.retries--; + if(self->send.retries > 0) { + self->send.retries--; } else { - instance->_private.send.active = false; + self->send.active = false; } - tiny_fsm_transition(&instance->_private.fsm, state_collision_cooldown); + tiny_fsm_transition(&self->fsm, state_collision_cooldown); } static void state_send(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, const void* data) { - self_t* instance = interface_from_fsm(fsm); + self_t* self = interface_from_fsm(fsm); switch(signal) { case tiny_fsm_signal_entry: - instance->_private.send.state = send_state_stx; - instance->_private.send.offset = 0; - instance->_private.send.escaped = false; + self->send.state = send_state_stx; + self->send.offset = 0; + self->send.escaped = false; - send_next_byte(instance); + send_next_byte(self); break; case signal_byte_received: { const uint8_t* byte = data; - if(*byte == instance->_private.send.expected_reflection) { - if(instance->_private.send.state == send_state_done) { - reinterpret(send_packet, instance->_private.send.buffer, send_packet_t*); + if(*byte == self->send.expected_reflection) { + if(self->send.state == send_state_done) { + reinterpret(send_packet, self->send.buffer, send_packet_t*); if(is_broadcast_address(send_packet->destination)) { - instance->_private.send.active = false; + self->send.active = false; tiny_fsm_transition(fsm, state_idle_cooldown); } else { @@ -222,156 +222,156 @@ static void state_send(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, const vo } } else { - send_next_byte(instance); + send_next_byte(self); } } else { - handle_send_failure(instance); + handle_send_failure(self); } } break; case signal_reflection_timeout: - handle_send_failure(instance); + handle_send_failure(self); tiny_fsm_transition(fsm, state_idle_cooldown); break; } } -static void handle_success(self_t* instance) +static void handle_success(self_t* self) { - instance->_private.send.active = false; - tiny_fsm_transition(&instance->_private.fsm, state_idle_cooldown); + self->send.active = false; + tiny_fsm_transition(&self->fsm, state_idle_cooldown); } static void ack_timeout(void* context) { - self_t* instance = context; - tiny_fsm_send_signal(&instance->_private.fsm, signal_ack_timeout, NULL); + self_t* self = context; + tiny_fsm_send_signal(&self->fsm, signal_ack_timeout, NULL); } -static void start_ack_timeout_timer(self_t* instance) +static void start_ack_timeout_timer(self_t* self) { tiny_timer_start( - &instance->_private.timer_group, - &instance->_private.timer, + &self->timer_group, + &self->timer, tiny_gea_ack_timeout_msec, - instance, + self, ack_timeout); } static void state_wait_for_ack(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, const void* data) { - self_t* instance = interface_from_fsm(fsm); + self_t* self = interface_from_fsm(fsm); switch(signal) { case tiny_fsm_signal_entry: - start_ack_timeout_timer(instance); + start_ack_timeout_timer(self); break; case signal_byte_received: { const uint8_t* byte = data; if(*byte == tiny_gea_ack) { - handle_success(instance); + handle_success(self); } else { - handle_send_failure(instance); + handle_send_failure(self); } } break; case signal_ack_timeout: - handle_send_failure(instance); + handle_send_failure(self); break; } } static bool received_packet_has_valid_crc(self_t* self) { - return self->_private.receive.crc == 0; + return self->receive.crc == 0; } static bool received_packet_has_minimum_valid_length(self_t* self) { - return self->_private.receive.count >= packet_bytes_not_included_in_payload; + return self->receive.count >= packet_bytes_not_included_in_payload; } static bool received_packet_has_valid_length(self_t* self) { - reinterpret(packet, self->_private.receive.buffer, tiny_gea_packet_t*); - return (packet->payload_length == self->_private.receive.count + unbuffered_bytes); + reinterpret(packet, self->receive.buffer, tiny_gea_packet_t*); + return (packet->payload_length == self->receive.count + unbuffered_bytes); } -static void buffer_received_byte(self_t* instance, uint8_t byte) +static void buffer_received_byte(self_t* self, uint8_t byte) { - if(instance->_private.receive.count == 0) { - instance->_private.receive.crc = tiny_gea_crc_seed; + if(self->receive.count == 0) { + self->receive.crc = tiny_gea_crc_seed; } - if(instance->_private.receive.count < instance->_private.receive.buffer_size) { - instance->_private.receive.buffer[instance->_private.receive.count++] = byte; + if(self->receive.count < self->receive.buffer_size) { + self->receive.buffer[self->receive.count++] = byte; - instance->_private.receive.crc = tiny_crc16_byte( - instance->_private.receive.crc, + self->receive.crc = tiny_crc16_byte( + self->receive.crc, byte); } } static bool received_packet_is_addressed_to_me(self_t* self) { - reinterpret(packet, self->_private.receive.buffer, tiny_gea_packet_t*); - return (packet->destination == self->_private.address) || + reinterpret(packet, self->receive.buffer, tiny_gea_packet_t*); + return (packet->destination == self->address) || is_broadcast_address(packet->destination) || - self->_private.ignore_destination_address; + self->ignore_destination_address; } -static void send_ack(self_t* instance, uint8_t address) +static void send_ack(self_t* self, uint8_t address) { if(!is_broadcast_address(address)) { - tiny_uart_send(instance->_private.uart, tiny_gea_ack); + tiny_uart_send(self->uart, tiny_gea_ack); } } -static void process_received_byte(self_t* instance, const uint8_t byte) +static void process_received_byte(self_t* self, const uint8_t byte) { - reinterpret(packet, instance->_private.receive.buffer, tiny_gea_packet_t*); + reinterpret(packet, self->receive.buffer, tiny_gea_packet_t*); - if(instance->_private.receive.escaped) { - instance->_private.receive.escaped = false; - buffer_received_byte(instance, byte); + if(self->receive.escaped) { + self->receive.escaped = false; + buffer_received_byte(self, byte); return; } switch(byte) { case tiny_gea_esc: - instance->_private.receive.escaped = true; + self->receive.escaped = true; break; case tiny_gea_stx: - instance->_private.receive.count = 0; + self->receive.count = 0; break; case tiny_gea_etx: - if(!received_packet_has_minimum_valid_length(instance) || !received_packet_has_valid_length(instance)) { + if(!received_packet_has_minimum_valid_length(self) || !received_packet_has_valid_length(self)) { break; } - if(!received_packet_has_valid_crc(instance)) { + if(!received_packet_has_valid_crc(self)) { break; } - if(!received_packet_is_addressed_to_me(instance)) { + if(!received_packet_is_addressed_to_me(self)) { break; } packet->payload_length -= tiny_gea_packet_transmission_overhead; - instance->_private.receive.packet_ready = true; + self->receive.packet_ready = true; - send_ack(instance, packet->destination); + send_ack(self, packet->destination); - tiny_fsm_transition(&instance->_private.fsm, state_idle_cooldown); + tiny_fsm_transition(&self->fsm, state_idle_cooldown); break; default: - buffer_received_byte(instance, byte); + buffer_received_byte(self, byte); break; } } @@ -383,30 +383,30 @@ static tiny_timer_ticks_t get_collision_timeout(uint8_t address, uint8_t pseudo_ static void collision_idle_timeout(void* context) { - self_t* instance = context; - tiny_fsm_send_signal(&instance->_private.fsm, signal_collision_idle_timeout, NULL); + self_t* self = context; + tiny_fsm_send_signal(&self->fsm, signal_collision_idle_timeout, NULL); } -static void start_collision_idle_timeout_timer(self_t* instance) +static void start_collision_idle_timeout_timer(self_t* self) { - uint8_t current_ticks = (uint8_t)tiny_time_source_ticks(instance->_private.timer_group.time_source); - tiny_timer_ticks_t collision_timeout_ticks = get_collision_timeout(instance->_private.address, current_ticks); + uint8_t current_ticks = (uint8_t)tiny_time_source_ticks(self->timer_group.time_source); + tiny_timer_ticks_t collision_timeout_ticks = get_collision_timeout(self->address, current_ticks); tiny_timer_start( - &instance->_private.timer_group, - &instance->_private.timer, + &self->timer_group, + &self->timer, collision_timeout_ticks, - instance, + self, collision_idle_timeout); } static void state_collision_cooldown(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, const void* data) { - self_t* instance = interface_from_fsm(fsm); + self_t* self = interface_from_fsm(fsm); switch(signal) { case tiny_fsm_signal_entry: - start_collision_idle_timeout_timer(instance); + start_collision_idle_timeout_timer(self); break; case signal_collision_idle_timeout: @@ -424,34 +424,34 @@ static void state_collision_cooldown(tiny_fsm_t* fsm, const tiny_fsm_signal_t si static void interbyte_timeout(void* context) { - self_t* instance = context; - tiny_fsm_send_signal(&instance->_private.fsm, signal_interbyte_timeout, NULL); + self_t* self = context; + tiny_fsm_send_signal(&self->fsm, signal_interbyte_timeout, NULL); } -static void start_interbyte_timeout_timer(self_t* instance) +static void start_interbyte_timeout_timer(self_t* self) { tiny_timer_start( - &instance->_private.timer_group, - &instance->_private.timer, + &self->timer_group, + &self->timer, gea2_interbyte_timeout_msec, - instance, + self, interbyte_timeout); } static void state_receive(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, const void* data) { - self_t* instance = interface_from_fsm(fsm); + self_t* self = interface_from_fsm(fsm); switch(signal) { case tiny_fsm_signal_entry: - instance->_private.receive.count = 0; - start_interbyte_timeout_timer(instance); + self->receive.count = 0; + start_interbyte_timeout_timer(self); break; case signal_byte_received: { const uint8_t* byte = data; - start_interbyte_timeout_timer(instance); - process_received_byte(instance, *byte); + start_interbyte_timeout_timer(self); + process_received_byte(self, *byte); break; } @@ -463,8 +463,8 @@ static void state_receive(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, const static void idle_cooldown_timeout(void* context) { - self_t* instance = context; - tiny_fsm_send_signal(&instance->_private.fsm, signal_idle_cooldown_timeout, NULL); + self_t* self = context; + tiny_fsm_send_signal(&self->fsm, signal_idle_cooldown_timeout, NULL); } static tiny_timer_ticks_t get_idle_timeout(uint8_t address) @@ -474,21 +474,21 @@ static tiny_timer_ticks_t get_idle_timeout(uint8_t address) static void state_idle_cooldown(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, const void* data) { - self_t* instance = interface_from_fsm(fsm); + self_t* self = interface_from_fsm(fsm); switch(signal) { case tiny_fsm_signal_entry: tiny_timer_start( - &instance->_private.timer_group, - &instance->_private.timer, - get_idle_timeout(instance->_private.address), - instance, + &self->timer_group, + &self->timer, + get_idle_timeout(self->address), + self, idle_cooldown_timeout); break; case signal_byte_received: { const uint8_t* byte = data; - if(*byte == tiny_gea_stx && !instance->_private.receive.packet_ready) { + if(*byte == tiny_gea_stx && !self->receive.packet_ready) { tiny_fsm_transition(fsm, state_receive); } else { @@ -504,71 +504,114 @@ static void state_idle_cooldown(tiny_fsm_t* fsm, const tiny_fsm_signal_t signal, } } -static void prepare_buffered_packet_for_transmission(self_t* instance) +static void prepare_buffered_packet_for_transmission(self_t* self) { - reinterpret(send_packet, instance->_private.send.buffer, send_packet_t*); + reinterpret(send_packet, self->send.buffer, send_packet_t*); send_packet->data_length += tiny_gea_packet_transmission_overhead; - instance->_private.send.crc = tiny_crc16_block(tiny_gea_crc_seed, (uint8_t*)send_packet, send_packet->data_length - data_length_bytes_not_included_in_data); - instance->_private.send.state = send_state_stx; - instance->_private.send.offset = 0; + self->send.crc = tiny_crc16_block(tiny_gea_crc_seed, (uint8_t*)send_packet, send_packet->data_length - data_length_bytes_not_included_in_data); + self->send.state = send_state_stx; + self->send.offset = 0; + self->send.retries = self->retries; + self->send.active = true; + self->send.packet_queued_in_background = true; } -static bool send_worker( - i_tiny_gea_interface_t* _instance, +static void populate_send_packet( + self_t* self, + tiny_gea_packet_t* packet, uint8_t destination, uint8_t payload_length, tiny_gea_interface_send_callback_t callback, void* context, bool set_source_address) { - reinterpret(instance, _instance, self_t*); - if(instance->_private.send.active) { - return false; + packet->payload_length = payload_length; + callback(context, (tiny_gea_packet_t*)packet); + if(set_source_address) { + packet->source = self->address; } + packet->destination = destination; +} - if(payload_length + send_packet_header_size > instance->_private.send.buffer_size) { - return false; - } +static void stop_polling_queue(self_t* self) +{ + tiny_timer_stop(&self->timer_group, &self->send.queue_timer); +} - reinterpret(send_packet, instance->_private.send.buffer, tiny_gea_packet_t*); - send_packet->payload_length = payload_length; - callback(context, send_packet); +static void poll_queue(void* context) +{ + self_t* self = context; + uint16_t size; - if(set_source_address) { - send_packet->source = instance->_private.address; + if(tiny_queue_count(&self->send.queue) == 0) { + stop_polling_queue(self); + return; } - send_packet->destination = destination; - prepare_buffered_packet_for_transmission(instance); + if(!self->send.active) { + tiny_queue_dequeue(&self->send.queue, self->send.buffer, &size); + prepare_buffered_packet_for_transmission(self); + } +} - instance->_private.send.retries = instance->_private.retries; - instance->_private.send.active = true; - instance->_private.send.packet_queued_in_background = true; +static void start_polling_queue(self_t* self) +{ + tiny_timer_start( + &self->timer_group, + &self->send.queue_timer, + 0, + self, + poll_queue); +} - return true; +static bool send_worker( + i_tiny_gea_interface_t* _self, + uint8_t destination, + uint8_t payload_length, + tiny_gea_interface_send_callback_t callback, + void* context, + bool set_source_address) +{ + reinterpret(self, _self, self_t*); + + if(payload_length + send_packet_header_size > self->send.buffer_size) { + return false; + } + + if(self->send.active) { + uint8_t buffer[255]; + populate_send_packet(self, (tiny_gea_packet_t*)buffer, destination, payload_length, callback, context, set_source_address); + start_polling_queue(self); + return tiny_queue_enqueue(&self->send.queue, buffer, tiny_gea_packet_overhead + payload_length); + } + else { + populate_send_packet(self, (tiny_gea_packet_t*)self->send.buffer, destination, payload_length, callback, context, set_source_address); + prepare_buffered_packet_for_transmission(self); + return true; + } } static void msec_interrupt_callback(void* context, const void* _args) { - self_t* instance = context; + self_t* self = context; (void)_args; - if(instance->_private.send.packet_queued_in_background) { - instance->_private.send.packet_queued_in_background = false; - tiny_fsm_send_signal(&instance->_private.fsm, signal_send_ready, NULL); + if(self->send.packet_queued_in_background) { + self->send.packet_queued_in_background = false; + tiny_fsm_send_signal(&self->fsm, signal_send_ready, NULL); } - tiny_timer_group_run(&instance->_private.timer_group); + tiny_timer_group_run(&self->timer_group); } static bool send( - i_tiny_gea_interface_t* _instance, + i_tiny_gea_interface_t* _self, uint8_t destination, uint8_t payload_length, void* context, tiny_gea_interface_send_callback_t callback) { - return send_worker(_instance, destination, payload_length, callback, context, true); + return send_worker(_self, destination, payload_length, callback, context, true); } static bool forward( @@ -581,63 +624,67 @@ static bool forward( return send_worker(_self, destination, payload_length, callback, context, false); } -static i_tiny_event_t* get_on_receive_event(i_tiny_gea_interface_t* _instance) +static i_tiny_event_t* get_on_receive_event(i_tiny_gea_interface_t* _self) { - reinterpret(instance, _instance, self_t*); - return &instance->_private.on_receive.interface; + reinterpret(self, _self, self_t*); + return &self->on_receive.interface; } static const i_tiny_gea_interface_api_t api = { send, forward, get_on_receive_event }; void tiny_gea2_interface_init( - self_t* instance, + tiny_gea2_interface_t* self, i_tiny_uart_t* uart, i_tiny_time_source_t* time_source, i_tiny_event_t* msec_interrupt, - uint8_t* receive_buffer, - uint8_t receive_buffer_size, + uint8_t address, uint8_t* send_buffer, uint8_t send_buffer_size, - uint8_t address, + uint8_t* receive_buffer, + uint8_t receive_buffer_size, + uint8_t* send_queue_buffer, + size_t send_queue_buffer_size, bool ignore_destination_address, uint8_t retries) { - instance->interface.api = &api; - instance->_private.uart = uart; - instance->_private.address = address; - instance->_private.retries = default_retries; - instance->_private.ignore_destination_address = ignore_destination_address; - instance->_private.receive.buffer = receive_buffer; - instance->_private.receive.buffer_size = receive_buffer_size; - instance->_private.receive.packet_ready = false; - instance->_private.receive.escaped = false; - instance->_private.send.buffer = send_buffer; - instance->_private.send.buffer_size = send_buffer_size; - instance->_private.send.active = false; - instance->_private.send.packet_queued_in_background = false; - instance->_private.retries = retries; + self->interface.api = &api; + self->uart = uart; + self->address = address; + self->retries = default_retries; + self->ignore_destination_address = ignore_destination_address; + self->receive.buffer = receive_buffer; + self->receive.buffer_size = receive_buffer_size; + self->receive.packet_ready = false; + self->receive.escaped = false; + self->send.buffer = send_buffer; + self->send.buffer_size = send_buffer_size; + self->send.active = false; + self->send.packet_queued_in_background = false; + self->retries = retries; + + tiny_queue_init(&self->send.queue, send_queue_buffer, send_queue_buffer_size); - tiny_timer_group_init(&instance->_private.timer_group, time_source); + tiny_timer_group_init(&self->timer_group, time_source); - tiny_event_subscription_init(&instance->_private.byte_received_subscription, instance, byte_received); - tiny_event_subscribe(tiny_uart_on_receive(uart), &instance->_private.byte_received_subscription); + tiny_event_subscription_init(&self->byte_received_subscription, self, byte_received); + tiny_event_subscribe(tiny_uart_on_receive(uart), &self->byte_received_subscription); - tiny_event_subscription_init(&instance->_private.msec_interrupt_subscription, instance, msec_interrupt_callback); - tiny_event_subscribe(msec_interrupt, &instance->_private.msec_interrupt_subscription); + tiny_event_subscription_init(&self->msec_interrupt_subscription, self, msec_interrupt_callback); + tiny_event_subscribe(msec_interrupt, &self->msec_interrupt_subscription); - tiny_event_init(&instance->_private.on_receive); - tiny_event_init(&instance->_private.on_diagnostics_event); + tiny_event_init(&self->on_receive); + tiny_event_init(&self->on_diagnostics_event); - tiny_fsm_init(&instance->_private.fsm, state_idle); + tiny_fsm_init(&self->fsm, state_idle); } -void tiny_gea2_interface_run(self_t* instance) +void tiny_gea2_interface_run(self_t* self) { - if(instance->_private.receive.packet_ready) { + if(self->receive.packet_ready) { tiny_gea_interface_on_receive_args_t args; - args.packet = (const tiny_gea_packet_t*)instance->_private.receive.buffer; + args.packet = (const tiny_gea_packet_t*)self->receive.buffer; - tiny_event_publish(&instance->_private.on_receive, &args); - instance->_private.receive.packet_ready = false; + tiny_event_publish(&self->on_receive, &args); + self->receive.packet_ready = false; } } diff --git a/src/tiny_gea3_interface.c b/src/tiny_gea3_interface.c index c72eb5e..1f13d0b 100644 --- a/src/tiny_gea3_interface.c +++ b/src/tiny_gea3_interface.c @@ -121,15 +121,15 @@ static void byte_received(void* context, const void* _args) } } -static bool determine_byte_to_send_considering_escapes(self_t* self, uint8_t byte, uint8_t* byteToSend) +static bool determine_byte_to_send_considering_escapes(self_t* self, uint8_t byte, uint8_t* byte_to_send) { if(!self->send_escaped && needs_escape(byte)) { self->send_escaped = true; - *byteToSend = tiny_gea_esc; + *byte_to_send = tiny_gea_esc; } else { self->send_escaped = false; - *byteToSend = byte; + *byte_to_send = byte; } return !self->send_escaped; @@ -137,9 +137,9 @@ static bool determine_byte_to_send_considering_escapes(self_t* self, uint8_t byt static void prepare_buffered_packet_for_transmission(self_t* self) { - reinterpret(sendPacket, self->send_buffer, send_packet_t*); - sendPacket->data_length += tiny_gea_packet_transmission_overhead; - self->send_crc = tiny_crc16_block(tiny_gea_crc_seed, (const uint8_t*)sendPacket, sendPacket->data_length - data_length_bytes_not_included_in_data); + reinterpret(send_packet, self->send_buffer, send_packet_t*); + send_packet->data_length += tiny_gea_packet_transmission_overhead; + self->send_crc = tiny_crc16_block(tiny_gea_crc_seed, (const uint8_t*)send_packet, send_packet->data_length - data_length_bytes_not_included_in_data); self->send_state = send_state_data; self->send_offset = 0; } @@ -160,41 +160,41 @@ static void byte_sent(void* context, const void* args) return; } - uint8_t byteToSend = 0; + uint8_t byte_to_send = 0; switch(self->send_state) { case send_state_data: - if(determine_byte_to_send_considering_escapes(self, self->send_buffer[self->send_offset], &byteToSend)) { - reinterpret(sendPacket, self->send_buffer, const send_packet_t*); + if(determine_byte_to_send_considering_escapes(self, self->send_buffer[self->send_offset], &byte_to_send)) { + reinterpret(send_packet, self->send_buffer, const send_packet_t*); self->send_offset++; - if(self->send_offset >= sendPacket->data_length - data_length_bytes_not_included_in_data) { + if(self->send_offset >= send_packet->data_length - data_length_bytes_not_included_in_data) { self->send_state = send_state_crc_msb; } } break; case send_state_crc_msb: - byteToSend = self->send_crc >> 8; - if(determine_byte_to_send_considering_escapes(self, byteToSend, &byteToSend)) { + byte_to_send = self->send_crc >> 8; + if(determine_byte_to_send_considering_escapes(self, byte_to_send, &byte_to_send)) { self->send_state = send_state_crc_lsb; } break; case send_state_crc_lsb: - byteToSend = self->send_crc; - if(determine_byte_to_send_considering_escapes(self, byteToSend, &byteToSend)) { + byte_to_send = self->send_crc; + if(determine_byte_to_send_considering_escapes(self, byte_to_send, &byte_to_send)) { self->send_state = send_state_etx; } break; case send_state_etx: self->send_in_progress = false; - byteToSend = tiny_gea_etx; + byte_to_send = tiny_gea_etx; break; } - tiny_uart_send(self->uart, byteToSend); + tiny_uart_send(self->uart, byte_to_send); } static void populate_send_packet( @@ -204,11 +204,11 @@ static void populate_send_packet( uint8_t payload_length, tiny_gea_interface_send_callback_t callback, void* context, - bool setSourceAddress) + bool set_source_address) { packet->payload_length = payload_length; callback(context, (tiny_gea_packet_t*)packet); - if(setSourceAddress) { + if(set_source_address) { packet->source = self->address; } packet->destination = destination; @@ -220,7 +220,7 @@ static bool send_worker( uint8_t payload_length, tiny_gea_interface_send_callback_t callback, void* context, - bool setSourceAddress) + bool set_source_address) { reinterpret(self, _self, self_t*); @@ -230,14 +230,14 @@ static bool send_worker( if(self->send_in_progress) { uint8_t buffer[255]; - populate_send_packet(self, (tiny_gea_packet_t*)buffer, destination, payload_length, callback, context, setSourceAddress); + populate_send_packet(self, (tiny_gea_packet_t*)buffer, destination, payload_length, callback, context, set_source_address); if(!tiny_queue_enqueue(&self->send_queue, buffer, tiny_gea_packet_overhead + payload_length)) { return false; } } else { - reinterpret(sendPacket, self->send_buffer, tiny_gea_packet_t*); - populate_send_packet(self, sendPacket, destination, payload_length, callback, context, setSourceAddress); + reinterpret(send_packet, self->send_buffer, tiny_gea_packet_t*); + populate_send_packet(self, send_packet, destination, payload_length, callback, context, set_source_address); prepare_buffered_packet_for_transmission(self); self->send_in_progress = true; tiny_uart_send(self->uart, tiny_gea_stx); diff --git a/test/tests/tiny_gea2_interface_test.cpp b/test/tests/tiny_gea2_interface_test.cpp index bef354e..cd64a0a 100644 --- a/test/tests/tiny_gea2_interface_test.cpp +++ b/test/tests/tiny_gea2_interface_test.cpp @@ -21,6 +21,7 @@ extern "C" { enum { address = 0xAD, send_buffer_size = 10, + send_queue_size = 20, receive_buffer_size = 9, idle_cooldown_msec = 10 + (address & 0x1F), gea2_reflection_timeout_msec = 6, @@ -34,11 +35,12 @@ enum { TEST_GROUP(tiny_gea2_interface) { - tiny_gea2_interface_t instance; + tiny_gea2_interface_t self; tiny_uart_double_t uart; tiny_event_subscription_t receiveSubscription; uint8_t send_buffer[send_buffer_size]; uint8_t receive_buffer[receive_buffer_size]; + uint8_t send_queue_buffer[send_queue_size]; tiny_time_source_double_t time_source; tiny_event_t msec_interrupt; @@ -50,52 +52,58 @@ TEST_GROUP(tiny_gea2_interface) tiny_time_source_double_init(&time_source); tiny_gea2_interface_init( - &instance, + &self, &uart.interface, &time_source.interface, &msec_interrupt.interface, - receive_buffer, - receive_buffer_size, - send_buffer, - send_buffer_size, address, + send_buffer, + sizeof(send_buffer), + receive_buffer, + sizeof(receive_buffer), + send_queue_buffer, + sizeof(send_queue_buffer), false, default_retries); tiny_event_subscription_init(&receiveSubscription, NULL, packet_received); - tiny_event_subscribe(tiny_gea_interface_on_receive(&instance.interface), &receiveSubscription); + tiny_event_subscribe(tiny_gea_interface_on_receive(&self.interface), &receiveSubscription); } void given_that_ignore_destination_address_is_enabled() { tiny_gea2_interface_init( - &instance, + &self, &uart.interface, &time_source.interface, &msec_interrupt.interface, - receive_buffer, - receive_buffer_size, - send_buffer, - send_buffer_size, address, + send_buffer, + sizeof(send_buffer), + receive_buffer, + sizeof(receive_buffer), + send_queue_buffer, + sizeof(send_queue_buffer), true, default_retries); - tiny_event_subscribe(tiny_gea_interface_on_receive(&instance.interface), &receiveSubscription); + tiny_event_subscribe(tiny_gea_interface_on_receive(&self.interface), &receiveSubscription); } void given_that_retries_have_been_set_to(uint8_t retries) { tiny_gea2_interface_init( - &instance, + &self, &uart.interface, &time_source.interface, &msec_interrupt.interface, - receive_buffer, - receive_buffer_size, - send_buffer, - send_buffer_size, address, + send_buffer, + sizeof(send_buffer), + receive_buffer, + sizeof(receive_buffer), + send_queue_buffer, + sizeof(send_queue_buffer), false, retries); } @@ -161,7 +169,7 @@ TEST_GROUP(tiny_gea2_interface) void after_the_interface_is_run() { - tiny_gea2_interface_run(&instance); + tiny_gea2_interface_run(&self); } void nothing_should_happen() @@ -202,13 +210,20 @@ TEST_GROUP(tiny_gea2_interface) void when_packet_is_sent(tiny_gea_packet_t * packet) { - tiny_gea_interface_send(&instance.interface, packet->destination, packet->payload_length, packet, send_callback); + tiny_gea_interface_send(&self.interface, packet->destination, packet->payload_length, packet, send_callback); + after_msec_interrupt_fires(); + } + + void when_packets_are_sent(tiny_gea_packet_t * packet_1, tiny_gea_packet_t * packet_2) + { + tiny_gea_interface_send(&self.interface, packet_1->destination, packet_1->payload_length, packet_1, send_callback); + tiny_gea_interface_send(&self.interface, packet_2->destination, packet_2->payload_length, packet_2, send_callback); after_msec_interrupt_fires(); } void when_packet_is_forwarded(tiny_gea_packet_t * packet) { - tiny_gea_interface_forward(&instance.interface, packet->destination, packet->payload_length, packet, send_callback); + tiny_gea_interface_forward(&self.interface, packet->destination, packet->payload_length, packet, send_callback); after_msec_interrupt_fires(); } @@ -1168,19 +1183,40 @@ TEST(tiny_gea2_interface, should_stop_sending_when_an_unexpected_byte_is_receive should_be_able_to_send_a_message_after_collision_cooldown(); } -TEST(tiny_gea2_interface, should_ignore_send_requests_when_already_sending) +TEST(tiny_gea2_interface, should_queue_send_requests_when_already_sending) { - should_send_bytes_via_uart(tiny_gea_stx); - tiny_gea_STATIC_ALLOC_PACKET(packet, 0); - packet->destination = 0x45; - when_packet_is_sent(packet); + tiny_gea_STATIC_ALLOC_PACKET(packet, 1); + packet->destination = 0xFF; + packet->payload[0] = 0xD5; - tiny_gea_STATIC_ALLOC_PACKET(differentPacket, 0); - packet->destination = 0x80; - when_packet_is_sent(differentPacket); + tiny_gea_STATIC_ALLOC_PACKET(another_packet, 1); + another_packet->destination = 0x21; + another_packet->payload[0] = 0x42; - should_send_bytes_via_uart(0x45); - after_bytes_are_received_via_uart(tiny_gea_stx); + given_uart_echoing_is_enabled(); + + should_send_bytes_via_uart( + tiny_gea_stx, + 0xFF, // dst + 0x08, // len + address, // src + 0xD5, // payload + 0xB8, // crc + 0xA9, + tiny_gea_etx); + when_packets_are_sent(packet, another_packet); + + should_send_bytes_via_uart( + tiny_gea_stx, + 0x21, // dst + 0x08, // len + address, // src + 0x42, // payload + 0x41, // crc + 0xAE, + tiny_gea_etx); + after(100); + after_msec_interrupt_fires(); } TEST(tiny_gea2_interface, should_retry_a_message_if_no_ack_is_received)