From 735b40c229d989e2515acef07909e29cc8cf8a16 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Mon, 12 Sep 2022 16:33:26 -0400 Subject: [PATCH 01/15] queue TCP send packets and send on their own thread to prevent garbling --- protocol/client/whist_client.cpp | 2 + protocol/server/main.c | 2 + protocol/whist/network/network.c | 1 + protocol/whist/network/tcp.c | 101 ++++++++++++++++++++++++------- protocol/whist/network/tcp.h | 14 ++++- 5 files changed, 98 insertions(+), 22 deletions(-) diff --git a/protocol/client/whist_client.cpp b/protocol/client/whist_client.cpp index 6780d40ab49..f7131d98048 100644 --- a/protocol/client/whist_client.cpp +++ b/protocol/client/whist_client.cpp @@ -494,6 +494,8 @@ int whist_client_main(int argc, const char* argv[]) { // Destroy any resources being used by the client LOG_INFO("Closing Client..."); + destroy_tcp_sender(); + destroy_frontend(frontend); LOG_INFO("Client frontend has exited..."); diff --git a/protocol/server/main.c b/protocol/server/main.c index 4462468794c..9d5688e0656 100644 --- a/protocol/server/main.c +++ b/protocol/server/main.c @@ -674,6 +674,8 @@ int main(int argc, char* argv[]) { // Mark the client as permanently deactivated, which lets the threads reap permanently_deactivate_client(server_state.client); + destroy_tcp_sender(); + destroy_input_device(server_state.input_device); server_state.input_device = NULL; diff --git a/protocol/whist/network/network.c b/protocol/whist/network/network.c index 6dcb5405862..dbb420825f5 100644 --- a/protocol/whist/network/network.c +++ b/protocol/whist/network/network.c @@ -250,6 +250,7 @@ void set_tos(SOCKET socket, WhistTOSValue tos) { } void whist_init_networking(void) { + init_tcp_sender(); // Initialize any uninitialized port mappings with the identity for (int i = 0; i <= USHRT_MAX; i++) { if (port_mappings[i] == 0) { diff --git a/protocol/whist/network/tcp.c b/protocol/whist/network/tcp.c index 4d3c8d4a583..2482e92588a 100644 --- a/protocol/whist/network/tcp.c +++ b/protocol/whist/network/tcp.c @@ -15,6 +15,7 @@ Includes #include #include #include "whist/core/features.h" +#include "whist/utils/queue.h" #if !OS_IS(OS_WIN32) #include @@ -90,6 +91,16 @@ typedef struct { WhistTimer last_recvp; } TCPContext; +// TCP send is not atomic, so we have to hold packets in a queue and send on a separate thread +typedef struct TCPQueueItem { + TCPContext* context; + TCPNetworkPacket* packet; + int packet_size; +} TCPQueueItem; +WhistThread tcp_send_thread = NULL; +QueueContext* tcp_send_queue = NULL; +bool run_tcp_sender = false; + // Time between consecutive pings #define TCP_PING_INTERVAL_SEC 2.0 // Time before a ping to be considered "lost", and reconnection starts @@ -160,6 +171,18 @@ int create_tcp_client_context(TCPContext* context, char* destination, int port, */ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet); +/** + * @brief Multithreaded function to asynchronously + * send all TCP packets on the same thread. + * This prevents garbled TCP messages from + * being sent since large TCP sends are not atomic. + * + * @param opaque Unused pointer, pass NULL + * + * @returns 0 on exit + */ +int multithreaded_tcp_send(void* opaque); + /** * @brief Returns the size, in bytes, of the relevant part of * the TCPPacket, that must be sent over the network @@ -625,6 +648,18 @@ int create_tcp_listen_socket(SOCKET* sock, int port, int timeout_ms) { return 0; } +void init_tcp_sender() { + run_tcp_sender = true; + tcp_send_queue = fifo_queue_create(sizeof(TCPQueueItem), 16); + tcp_send_thread = whist_create_thread(multithreaded_tcp_send, "multithreaded_tcp_send", NULL); +} + +void destroy_tcp_sender() { + run_tcp_sender = false; + whist_wait_thread(tcp_send_thread, NULL); + fifo_queue_destroy(tcp_send_queue); +} + /* ============================ Private Function Implementations @@ -763,33 +798,57 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet) { memcpy(network_packet->payload, packet, packet_size); } - int tcp_packet_size = get_tcp_network_packet_size(network_packet); + // Add TCPNetworkPacket to the queue to be sent on the TCP send thread + TCPQueueItem* queue_item = safe_malloc(sizeof(TCPQueueItem)); + queue_item->context = context; + queue_item->packet = network_packet; + queue_item->packet_size = packet_size; + return fifo_queue_enqueue_item(tcp_send_queue, queue_item); +} - // For now, the TCP network throttler is NULL, so this is a no-op. - network_throttler_wait_byte_allocation(context->network_throttler, tcp_packet_size); +int multithreaded_tcp_send(void* opaque) { + UNUSED(opaque); - // This is useful enough to print, even outside of LOG_NETWORKING GUARDS - LOG_INFO("Sending a WhistPacket of size %d (Total %d bytes), over TCP", packet_size, - tcp_packet_size); + TCPQueueItem queue_item; + TCPNetworkPacket* network_packet = NULL; + TCPContext* context = NULL; + while (true) { + if (fifo_queue_dequeue_item_timeout(tcp_send_queue, &queue_item, 5000) < 0) { + if (!run_tcp_sender) { + break; + } + continue; + } - // Send the packet - bool failed = false; - int ret = send(context->socket, (const char*)network_packet, tcp_packet_size, 0); - if (ret < 0) { - int error = get_last_network_error(); - if (error == WHIST_ECONNRESET) { - LOG_WARNING("TCP Connection reset by peer"); - context->connection_lost = true; - } else { - LOG_WARNING("Unexpected TCP Packet Error: %d", error); + network_packet = queue_item.packet; + context = queue_item.context; + + int tcp_packet_size = get_tcp_network_packet_size(network_packet); + + // For now, the TCP network throttler is NULL, so this is a no-op. + network_throttler_wait_byte_allocation(context->network_throttler, tcp_packet_size); + + // This is useful enough to print, even outside of LOG_NETWORKING GUARDS + LOG_INFO("Sending a WhistPacket of size %d (Total %d bytes), over TCP", queue_item.packet_size, + tcp_packet_size); + + // Send the packet + int ret = send(context->socket, (const char*)network_packet, tcp_packet_size, 0); + if (ret < 0) { + int error = get_last_network_error(); + if (error == WHIST_ECONNRESET) { + LOG_WARNING("TCP Connection reset by peer"); + context->connection_lost = true; + } else { + LOG_WARNING("Unexpected TCP Packet Error: %d", error); + } } - failed = true; - } - // Free the encrypted allocation - deallocate_region(network_packet); + // Free the encrypted allocation + deallocate_region(network_packet); + } - return failed ? -1 : 0; + return 0; } int get_tcp_packet_size(TCPPacket* tcp_packet) { diff --git a/protocol/whist/network/tcp.h b/protocol/whist/network/tcp.h index a4aeb48f02b..f2523e30daa 100644 --- a/protocol/whist/network/tcp.h +++ b/protocol/whist/network/tcp.h @@ -54,7 +54,7 @@ bool create_tcp_socket_context(SocketContext* context, char* destination, int po char* binary_aes_private_key); /** - * @brief Creates a tcp listen socket, that can be used in SocketContext + * @brief Creates a tcp listen socket, that can be used in SocketContext * * @param sock The socket that will be initialized * @param port The port to listen on @@ -62,4 +62,16 @@ bool create_tcp_socket_context(SocketContext* context, char* destination, int po * @return 0 on success, otherwise failure. */ int create_tcp_listen_socket(SOCKET* sock, int port, int timeout_ms); + +/** + * @brief Create TCP send thread and resources + * + */ +void init_tcp_sender(void); + +/** + * @brief Destroy TCP send thread and resources + * + */ +void destroy_tcp_sender(void); #endif // WHIST_TCP_H From 19c1d8e653da0d92d67cda54541674b33828a557 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Mon, 12 Sep 2022 16:38:18 -0400 Subject: [PATCH 02/15] if partial TCP packet was sent, keep sending the rest of the packet until complete or failure --- protocol/whist/network/tcp.c | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/protocol/whist/network/tcp.c b/protocol/whist/network/tcp.c index 2482e92588a..57e8d7a7938 100644 --- a/protocol/whist/network/tcp.c +++ b/protocol/whist/network/tcp.c @@ -832,15 +832,22 @@ int multithreaded_tcp_send(void* opaque) { LOG_INFO("Sending a WhistPacket of size %d (Total %d bytes), over TCP", queue_item.packet_size, tcp_packet_size); - // Send the packet - int ret = send(context->socket, (const char*)network_packet, tcp_packet_size, 0); - if (ret < 0) { - int error = get_last_network_error(); - if (error == WHIST_ECONNRESET) { - LOG_WARNING("TCP Connection reset by peer"); - context->connection_lost = true; + // Send the packet. If a partial packet is sent, keep sending until full packet has been sent. + int total_sent = 0; + while (total_sent < tcp_packet_size) { + int ret = send(context->socket, (const char*)(network_packet + total_sent), tcp_packet_size, 0); + if (ret < 0) { + int error = get_last_network_error(); + if (error == WHIST_ECONNRESET) { + LOG_WARNING("TCP Connection reset by peer"); + context->connection_lost = true; + } else { + LOG_WARNING("Unexpected TCP Packet Error: %d", error); + } + // Don't attempt to send the rest of the packet if there was a failure + break; } else { - LOG_WARNING("Unexpected TCP Packet Error: %d", error); + total_sent += ret; } } From fdab3c78339c183f455f3dbe9cf69efb2ff26342 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Tue, 13 Sep 2022 15:05:50 -0400 Subject: [PATCH 03/15] make tcp send queue per-socket, add blocking enqueue impl --- protocol/client/frontend/virtual/impl.c | 2 +- .../client/frontend/virtual/interface.cpp | 2 +- protocol/client/whist_client.cpp | 2 - protocol/server/main.c | 2 - protocol/whist/network/network.c | 1 - protocol/whist/network/tcp.c | 87 ++++++++++++------- protocol/whist/network/tcp.h | 12 --- protocol/whist/network/udp.c | 6 +- protocol/whist/utils/queue.c | 35 +++++++- protocol/whist/utils/queue.h | 3 +- 10 files changed, 94 insertions(+), 58 deletions(-) diff --git a/protocol/client/frontend/virtual/impl.c b/protocol/client/frontend/virtual/impl.c index 21ff1ef6c71..485fd1d85f4 100644 --- a/protocol/client/frontend/virtual/impl.c +++ b/protocol/client/frontend/virtual/impl.c @@ -223,7 +223,7 @@ bool virtual_wait_event(WhistFrontend* frontend, WhistFrontendEvent* event, int void virtual_interrupt(WhistFrontend* frontend) { WhistFrontendEvent event; event.type = FRONTEND_EVENT_INTERRUPT; - if (fifo_queue_enqueue_item(events_queue, &event) != 0) { + if (fifo_queue_enqueue_item(events_queue, &event, false) != 0) { LOG_ERROR("Virtual frontend interrupt failed"); } } diff --git a/protocol/client/frontend/virtual/interface.cpp b/protocol/client/frontend/virtual/interface.cpp index 9dc32a62ea9..eb089a3902a 100644 --- a/protocol/client/frontend/virtual/interface.cpp +++ b/protocol/client/frontend/virtual/interface.cpp @@ -209,7 +209,7 @@ static void vi_api_send_event(const WhistFrontendEvent* frontend_event) { requested_width = frontend_event->resize.width; requested_height = frontend_event->resize.height; } - if (fifo_queue_enqueue_item(events_queue, frontend_event) != 0) { + if (fifo_queue_enqueue_item(events_queue, frontend_event, false) != 0) { LOG_ERROR("Virtual event queuing failed"); } } diff --git a/protocol/client/whist_client.cpp b/protocol/client/whist_client.cpp index f7131d98048..6780d40ab49 100644 --- a/protocol/client/whist_client.cpp +++ b/protocol/client/whist_client.cpp @@ -494,8 +494,6 @@ int whist_client_main(int argc, const char* argv[]) { // Destroy any resources being used by the client LOG_INFO("Closing Client..."); - destroy_tcp_sender(); - destroy_frontend(frontend); LOG_INFO("Client frontend has exited..."); diff --git a/protocol/server/main.c b/protocol/server/main.c index 9d5688e0656..4462468794c 100644 --- a/protocol/server/main.c +++ b/protocol/server/main.c @@ -674,8 +674,6 @@ int main(int argc, char* argv[]) { // Mark the client as permanently deactivated, which lets the threads reap permanently_deactivate_client(server_state.client); - destroy_tcp_sender(); - destroy_input_device(server_state.input_device); server_state.input_device = NULL; diff --git a/protocol/whist/network/network.c b/protocol/whist/network/network.c index dbb420825f5..6dcb5405862 100644 --- a/protocol/whist/network/network.c +++ b/protocol/whist/network/network.c @@ -250,7 +250,6 @@ void set_tos(SOCKET socket, WhistTOSValue tos) { } void whist_init_networking(void) { - init_tcp_sender(); // Initialize any uninitialized port mappings with the identity for (int i = 0; i <= USHRT_MAX; i++) { if (port_mappings[i] == 0) { diff --git a/protocol/whist/network/tcp.c b/protocol/whist/network/tcp.c index 57e8d7a7938..0f8f3df4f79 100644 --- a/protocol/whist/network/tcp.c +++ b/protocol/whist/network/tcp.c @@ -89,17 +89,19 @@ typedef struct { // Only recvp every RECV_INTERVAL_MS, to keep CPU usage low. // This is because a recvp takes ~8ms sometimes WhistTimer last_recvp; + + // TCP send is not atomic, so we have to hold packets in a queue and send on a separate thread + WhistThread send_thread; + QueueContext* send_queue; + WhistSemaphore send_semaphore; + bool run_sender; } TCPContext; -// TCP send is not atomic, so we have to hold packets in a queue and send on a separate thread +// Struct for holding packets on queue typedef struct TCPQueueItem { - TCPContext* context; TCPNetworkPacket* packet; int packet_size; } TCPQueueItem; -WhistThread tcp_send_thread = NULL; -QueueContext* tcp_send_queue = NULL; -bool run_tcp_sender = false; // Time between consecutive pings #define TCP_PING_INTERVAL_SEC 2.0 @@ -173,11 +175,12 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet); /** * @brief Multithreaded function to asynchronously - * send all TCP packets on the same thread. + * send all TCP packets for one socket context + * on the same thread. * This prevents garbled TCP messages from * being sent since large TCP sends are not atomic. * - * @param opaque Unused pointer, pass NULL + * @param opaque Pointer to associated socket context * * @returns 0 on exit */ @@ -515,6 +518,14 @@ static void tcp_destroy_socket_context(void* raw_context) { FATAL_ASSERT(raw_context != NULL); TCPContext* context = raw_context; + // Destroy TCP send queue resources + context->run_sender = false; + + // Any pending TCP packets will be dropped + whist_wait_thread(context->send_thread, NULL); + TCPQueueItem queue_item; + fifo_queue_destroy(context->send_queue); + closesocket(context->socket); closesocket(context->listen_socket); whist_destroy_mutex(context->mutex); @@ -583,6 +594,9 @@ bool create_tcp_socket_context(SocketContext* network_context, char* destination context->last_pong_id = -1; start_timer(&context->last_ping_timer); context->connection_lost = false; + context->send_queue = NULL; + context->send_semaphore = NULL; + context->send_thread = NULL; start_timer(&context->last_recvp); int ret; @@ -601,6 +615,21 @@ bool create_tcp_socket_context(SocketContext* network_context, char* destination return false; } + // Set up TCP send queue + context->run_sender = true; + if ((context->send_queue = fifo_queue_create(sizeof(TCPQueueItem), 16)) == NULL || + (context->send_semaphore = whist_create_semaphore(0)) == NULL || + (context->send_thread = whist_create_thread(multithreaded_tcp_send, "multithreaded_tcp_send", context)) == NULL) { + // If any of the created resources are NULL, there was a failure and we need to clean up and return false + if (context->send_queue) + fifo_queue_destroy(context->send_queue); + if (context->send_semaphore) + whist_destroy_semaphore(context->send_semaphore); + free(context); + network_context->context = NULL; + return false; + } + // Restore the original timeout set_timeout(context->socket, context->timeout); @@ -648,18 +677,6 @@ int create_tcp_listen_socket(SOCKET* sock, int port, int timeout_ms) { return 0; } -void init_tcp_sender() { - run_tcp_sender = true; - tcp_send_queue = fifo_queue_create(sizeof(TCPQueueItem), 16); - tcp_send_thread = whist_create_thread(multithreaded_tcp_send, "multithreaded_tcp_send", NULL); -} - -void destroy_tcp_sender() { - run_tcp_sender = false; - whist_wait_thread(tcp_send_thread, NULL); - fifo_queue_destroy(tcp_send_queue); -} - /* ============================ Private Function Implementations @@ -799,29 +816,35 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet) { } // Add TCPNetworkPacket to the queue to be sent on the TCP send thread - TCPQueueItem* queue_item = safe_malloc(sizeof(TCPQueueItem)); - queue_item->context = context; - queue_item->packet = network_packet; - queue_item->packet_size = packet_size; - return fifo_queue_enqueue_item(tcp_send_queue, queue_item); + TCPQueueItem queue_item; + queue_item.packet = network_packet; + queue_item.packet_size = packet_size; + if (fifo_queue_enqueue_item(context->send_queue, &queue_item, false) < 0) + return -1; + whist_post_semaphore(context->send_semaphore); + return 0; } int multithreaded_tcp_send(void* opaque) { - UNUSED(opaque); - TCPQueueItem queue_item; TCPNetworkPacket* network_packet = NULL; - TCPContext* context = NULL; + TCPContext* context = (TCPContext*) opaque; while (true) { - if (fifo_queue_dequeue_item_timeout(tcp_send_queue, &queue_item, 5000) < 0) { - if (!run_tcp_sender) { - break; - } + whist_wait_semaphore(context->send_semaphore); + // Check to see if the sender thread needs to stop running + if (!context->run_sender) + break; + // If the connection is lost, re-increment the semaphore and continue + // to try again later + if (context->connection_lost) { + whist_post_semaphore(context->send_semaphore); continue; } + // If there is no item to be dequeued, continue + if (fifo_queue_dequeue_item(context->send_queue, &queue_item) < 0) + continue; network_packet = queue_item.packet; - context = queue_item.context; int tcp_packet_size = get_tcp_network_packet_size(network_packet); diff --git a/protocol/whist/network/tcp.h b/protocol/whist/network/tcp.h index f2523e30daa..10821c5e958 100644 --- a/protocol/whist/network/tcp.h +++ b/protocol/whist/network/tcp.h @@ -62,16 +62,4 @@ bool create_tcp_socket_context(SocketContext* context, char* destination, int po * @return 0 on success, otherwise failure. */ int create_tcp_listen_socket(SOCKET* sock, int port, int timeout_ms); - -/** - * @brief Create TCP send thread and resources - * - */ -void init_tcp_sender(void); - -/** - * @brief Destroy TCP send thread and resources - * - */ -void destroy_tcp_sender(void); #endif // WHIST_TCP_H diff --git a/protocol/whist/network/udp.c b/protocol/whist/network/udp.c index 5e6efc41b21..70b0c9d7ffe 100644 --- a/protocol/whist/network/udp.c +++ b/protocol/whist/network/udp.c @@ -1765,7 +1765,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) { nack_id.frame_id = packet->udp_nack_data.id; if ((short)packet->udp_nack_data.index >= 0) { nack_id.packet_index = packet->udp_nack_data.index; - if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) { + if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) { LOG_ERROR("Failed to enqueue NACK request"); } } else { @@ -1786,7 +1786,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) { if (LOG_NACKING) { LOG_INFO("Generating Nack for Frame ID %d, index %d", nack_id.frame_id, i); } - if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) { + if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) { LOG_ERROR("Failed to enqueue NACK request"); } } @@ -1809,7 +1809,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) { i < packet->udp_bitarray_nack_data.numBits; i++) { if (bit_array_test_bit(bit_arr, i)) { nack_id.packet_index = i; - if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) { + if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) { LOG_ERROR("Failed to enqueue NACK request"); } } diff --git a/protocol/whist/utils/queue.c b/protocol/whist/utils/queue.c index 6d161b6e1c8..0fddb7145da 100644 --- a/protocol/whist/utils/queue.c +++ b/protocol/whist/utils/queue.c @@ -15,7 +15,9 @@ typedef struct QueueContext { int max_items; WhistMutex mutex; WhistCondition cond; + WhistSemaphore sem; void *data; + bool destroying; } QueueContext; static void increment_idx(QueueContext *context, int *idx) { @@ -55,19 +57,34 @@ QueueContext *fifo_queue_create(size_t item_size, int max_items) { return NULL; } + context->sem = whist_create_semaphore(max_items); + if (context->sem == NULL) { + fifo_queue_destroy(context); + return NULL; + } + context->item_size = item_size; context->max_items = max_items; + context->destroying = false; return context; } -int fifo_queue_enqueue_item(QueueContext *context, const void *item) { +int fifo_queue_enqueue_item(QueueContext *context, const void *item, bool blocking) { if (context == NULL) { return -1; } whist_lock_mutex(context->mutex); - if (context->num_items >= context->max_items) { + while (context->num_items >= context->max_items) { whist_unlock_mutex(context->mutex); - return -1; + if (blocking) { + whist_wait_semaphore(context->sem); + if (context->destroying) { + return -1; + } + whist_lock_mutex(context->mutex); + } else { + return -1; + } } context->num_items++; void *target_item = (uint8_t *)context->data + (context->item_size * context->write_idx); @@ -75,6 +92,11 @@ int fifo_queue_enqueue_item(QueueContext *context, const void *item) { increment_idx(context, &context->write_idx); whist_broadcast_cond(context->cond); whist_unlock_mutex(context->mutex); + // If this enqueue call is not blocking, we still need to decrement the semaphore. + // Since we just successfully dequeued an element, we know that the semaphore + // count is greater than 0 and that this wait will be successful. + if (!blocking) + whist_wait_timeout_semaphore(context->sem, 0); return 0; } @@ -89,6 +111,7 @@ int fifo_queue_dequeue_item(QueueContext *context, void *item) { } dequeue_item(context, item); whist_unlock_mutex(context->mutex); + whist_post_semaphore(context->sem); return 0; } @@ -118,6 +141,12 @@ void fifo_queue_destroy(QueueContext *context) { if (context == NULL) { return; } + if (context->sem != NULL) { + // This ensures that a blocking enqueue will release + context->destroying = true; + whist_post_semaphore(context->sem); + whist_destroy_semaphore(context->sem); + } if (context->data != NULL) { free(context->data); } diff --git a/protocol/whist/utils/queue.h b/protocol/whist/utils/queue.h index 55c05c46daf..5cf67569901 100644 --- a/protocol/whist/utils/queue.h +++ b/protocol/whist/utils/queue.h @@ -23,10 +23,11 @@ QueueContext *fifo_queue_create(size_t item_size, int max_items); * * @param queue_context Queue's context pointer * @param item Pointer to the item that needs to be enqueued + * @param blocking Whether the enqueue operation should be blocking * * @returns 0 on success, -1 on failure */ -int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item); +int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item, bool blocking); /** * @brief Dequeue an item from the FIFO queue. If an item is not available, From 925930b74c2464ad0233a97ce2136e5ff740b431 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Tue, 13 Sep 2022 15:19:38 -0400 Subject: [PATCH 04/15] update queue test --- protocol/test/protocol_test.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/protocol/test/protocol_test.cpp b/protocol/test/protocol_test.cpp index 36565dd832d..f349ed180a4 100644 --- a/protocol/test/protocol_test.cpp +++ b/protocol/test/protocol_test.cpp @@ -1914,21 +1914,21 @@ TEST_F(ProtocolTest, QueueTest) { EXPECT_EQ(fifo_queue_dequeue_item(fifo_queue, &item), -1); EXPECT_EQ(fifo_queue_dequeue_item_timeout(fifo_queue, &item, 50), -1); item = 1; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), 0); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, false), 0); item = 2; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), 0); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, true), 0); EXPECT_EQ(fifo_queue_dequeue_item(fifo_queue, &item), 0); EXPECT_EQ(item, 1); item = 3; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), 0); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, false), 0); item = 4; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), 0); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, false), 0); item = 5; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), 0); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, true), 0); item = 6; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), 0); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, false), 0); item = 7; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), -1); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, false), -1); EXPECT_EQ(fifo_queue_dequeue_item(fifo_queue, &item), 0); EXPECT_EQ(item, 2); EXPECT_EQ(fifo_queue_dequeue_item_timeout(fifo_queue, &item, 100), 0); @@ -1950,7 +1950,7 @@ TEST_F(ProtocolTest, QueueTest) { EXPECT_EQ(item, 6); fifo_queue_destroy(fifo_queue); EXPECT_EQ(fifo_queue_dequeue_item(NULL, &item), -1); - EXPECT_EQ(fifo_queue_enqueue_item(NULL, &item), -1); + EXPECT_EQ(fifo_queue_enqueue_item(NULL, &item, false), -1); } int test_virtual_intr(void* arg) { From 514d36bd9ffc5e4e4fee6f3a62a63d4ff9600c2e Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Tue, 13 Sep 2022 15:21:14 -0400 Subject: [PATCH 05/15] move context->destroying = true to outside of semaphore condition --- protocol/whist/utils/queue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/whist/utils/queue.c b/protocol/whist/utils/queue.c index 0fddb7145da..4e4a5494941 100644 --- a/protocol/whist/utils/queue.c +++ b/protocol/whist/utils/queue.c @@ -141,9 +141,9 @@ void fifo_queue_destroy(QueueContext *context) { if (context == NULL) { return; } + context->destroying = true; if (context->sem != NULL) { // This ensures that a blocking enqueue will release - context->destroying = true; whist_post_semaphore(context->sem); whist_destroy_semaphore(context->sem); } From a314c0021a8a7fce87d0b05192714ef621eee736 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Tue, 13 Sep 2022 15:24:51 -0400 Subject: [PATCH 06/15] clang-format --- protocol/whist/network/tcp.c | 37 ++++++++++++++++++------------------ protocol/whist/utils/queue.c | 3 +-- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/protocol/whist/network/tcp.c b/protocol/whist/network/tcp.c index 0f8f3df4f79..97290778b58 100644 --- a/protocol/whist/network/tcp.c +++ b/protocol/whist/network/tcp.c @@ -179,9 +179,9 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet); * on the same thread. * This prevents garbled TCP messages from * being sent since large TCP sends are not atomic. - * + * * @param opaque Pointer to associated socket context - * + * * @returns 0 on exit */ int multithreaded_tcp_send(void* opaque); @@ -619,12 +619,12 @@ bool create_tcp_socket_context(SocketContext* network_context, char* destination context->run_sender = true; if ((context->send_queue = fifo_queue_create(sizeof(TCPQueueItem), 16)) == NULL || (context->send_semaphore = whist_create_semaphore(0)) == NULL || - (context->send_thread = whist_create_thread(multithreaded_tcp_send, "multithreaded_tcp_send", context)) == NULL) { - // If any of the created resources are NULL, there was a failure and we need to clean up and return false - if (context->send_queue) - fifo_queue_destroy(context->send_queue); - if (context->send_semaphore) - whist_destroy_semaphore(context->send_semaphore); + (context->send_thread = whist_create_thread(multithreaded_tcp_send, + "multithreaded_tcp_send", context)) == NULL) { + // If any of the created resources are NULL, there was a failure and we need to clean up and + // return false + if (context->send_queue) fifo_queue_destroy(context->send_queue); + if (context->send_semaphore) whist_destroy_semaphore(context->send_semaphore); free(context); network_context->context = NULL; return false; @@ -819,8 +819,7 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet) { TCPQueueItem queue_item; queue_item.packet = network_packet; queue_item.packet_size = packet_size; - if (fifo_queue_enqueue_item(context->send_queue, &queue_item, false) < 0) - return -1; + if (fifo_queue_enqueue_item(context->send_queue, &queue_item, false) < 0) return -1; whist_post_semaphore(context->send_semaphore); return 0; } @@ -828,12 +827,11 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet) { int multithreaded_tcp_send(void* opaque) { TCPQueueItem queue_item; TCPNetworkPacket* network_packet = NULL; - TCPContext* context = (TCPContext*) opaque; + TCPContext* context = (TCPContext*)opaque; while (true) { whist_wait_semaphore(context->send_semaphore); // Check to see if the sender thread needs to stop running - if (!context->run_sender) - break; + if (!context->run_sender) break; // If the connection is lost, re-increment the semaphore and continue // to try again later if (context->connection_lost) { @@ -841,8 +839,7 @@ int multithreaded_tcp_send(void* opaque) { continue; } // If there is no item to be dequeued, continue - if (fifo_queue_dequeue_item(context->send_queue, &queue_item) < 0) - continue; + if (fifo_queue_dequeue_item(context->send_queue, &queue_item) < 0) continue; network_packet = queue_item.packet; @@ -852,13 +849,15 @@ int multithreaded_tcp_send(void* opaque) { network_throttler_wait_byte_allocation(context->network_throttler, tcp_packet_size); // This is useful enough to print, even outside of LOG_NETWORKING GUARDS - LOG_INFO("Sending a WhistPacket of size %d (Total %d bytes), over TCP", queue_item.packet_size, - tcp_packet_size); + LOG_INFO("Sending a WhistPacket of size %d (Total %d bytes), over TCP", + queue_item.packet_size, tcp_packet_size); - // Send the packet. If a partial packet is sent, keep sending until full packet has been sent. + // Send the packet. If a partial packet is sent, keep sending until full packet has been + // sent. int total_sent = 0; while (total_sent < tcp_packet_size) { - int ret = send(context->socket, (const char*)(network_packet + total_sent), tcp_packet_size, 0); + int ret = send(context->socket, (const char*)(network_packet + total_sent), + tcp_packet_size, 0); if (ret < 0) { int error = get_last_network_error(); if (error == WHIST_ECONNRESET) { diff --git a/protocol/whist/utils/queue.c b/protocol/whist/utils/queue.c index 4e4a5494941..02b2b0a1a63 100644 --- a/protocol/whist/utils/queue.c +++ b/protocol/whist/utils/queue.c @@ -95,8 +95,7 @@ int fifo_queue_enqueue_item(QueueContext *context, const void *item, bool blocki // If this enqueue call is not blocking, we still need to decrement the semaphore. // Since we just successfully dequeued an element, we know that the semaphore // count is greater than 0 and that this wait will be successful. - if (!blocking) - whist_wait_timeout_semaphore(context->sem, 0); + if (!blocking) whist_wait_timeout_semaphore(context->sem, 0); return 0; } From 69ce55e6642f09ccc2382764583e4c3ffbbd1a16 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Wed, 14 Sep 2022 10:54:46 -0400 Subject: [PATCH 07/15] remove extraneous queue_item --- protocol/whist/network/tcp.c | 1 - 1 file changed, 1 deletion(-) diff --git a/protocol/whist/network/tcp.c b/protocol/whist/network/tcp.c index 97290778b58..145246a7454 100644 --- a/protocol/whist/network/tcp.c +++ b/protocol/whist/network/tcp.c @@ -523,7 +523,6 @@ static void tcp_destroy_socket_context(void* raw_context) { // Any pending TCP packets will be dropped whist_wait_thread(context->send_thread, NULL); - TCPQueueItem queue_item; fifo_queue_destroy(context->send_queue); closesocket(context->socket); From 44e32a5573e313d524a94758c392c03b707a4252 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Wed, 14 Sep 2022 11:17:52 -0400 Subject: [PATCH 08/15] update queue semaphore comments --- protocol/whist/utils/queue.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/protocol/whist/utils/queue.c b/protocol/whist/utils/queue.c index 02b2b0a1a63..fb756dda07d 100644 --- a/protocol/whist/utils/queue.c +++ b/protocol/whist/utils/queue.c @@ -57,6 +57,7 @@ QueueContext *fifo_queue_create(size_t item_size, int max_items) { return NULL; } + // The semaphore value corresponds to the number of available spaces in the queue context->sem = whist_create_semaphore(max_items); if (context->sem == NULL) { fifo_queue_destroy(context); @@ -93,8 +94,9 @@ int fifo_queue_enqueue_item(QueueContext *context, const void *item, bool blocki whist_broadcast_cond(context->cond); whist_unlock_mutex(context->mutex); // If this enqueue call is not blocking, we still need to decrement the semaphore. - // Since we just successfully dequeued an element, we know that the semaphore - // count is greater than 0 and that this wait will be successful. + // Since we just successfully enqueued an element without decrementing the + // semaphore, we know that the semaphore count is greater than 0 and that this + // wait will be successful. if (!blocking) whist_wait_timeout_semaphore(context->sem, 0); return 0; } From 81d375dcad13720d6c7f4c08a8793ba733e6a898 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Wed, 14 Sep 2022 14:27:33 -0400 Subject: [PATCH 09/15] change queue to use condvar for blocking enqueue and have different function for timeout/blocking --- protocol/client/frontend/virtual/impl.c | 2 +- .../client/frontend/virtual/interface.cpp | 2 +- protocol/test/protocol_test.cpp | 18 ++-- protocol/whist/network/tcp.c | 2 +- protocol/whist/network/udp.c | 6 +- protocol/whist/utils/queue.c | 97 ++++++++++++------- protocol/whist/utils/queue.h | 19 +++- 7 files changed, 95 insertions(+), 51 deletions(-) diff --git a/protocol/client/frontend/virtual/impl.c b/protocol/client/frontend/virtual/impl.c index 485fd1d85f4..21ff1ef6c71 100644 --- a/protocol/client/frontend/virtual/impl.c +++ b/protocol/client/frontend/virtual/impl.c @@ -223,7 +223,7 @@ bool virtual_wait_event(WhistFrontend* frontend, WhistFrontendEvent* event, int void virtual_interrupt(WhistFrontend* frontend) { WhistFrontendEvent event; event.type = FRONTEND_EVENT_INTERRUPT; - if (fifo_queue_enqueue_item(events_queue, &event, false) != 0) { + if (fifo_queue_enqueue_item(events_queue, &event) != 0) { LOG_ERROR("Virtual frontend interrupt failed"); } } diff --git a/protocol/client/frontend/virtual/interface.cpp b/protocol/client/frontend/virtual/interface.cpp index eb089a3902a..9dc32a62ea9 100644 --- a/protocol/client/frontend/virtual/interface.cpp +++ b/protocol/client/frontend/virtual/interface.cpp @@ -209,7 +209,7 @@ static void vi_api_send_event(const WhistFrontendEvent* frontend_event) { requested_width = frontend_event->resize.width; requested_height = frontend_event->resize.height; } - if (fifo_queue_enqueue_item(events_queue, frontend_event, false) != 0) { + if (fifo_queue_enqueue_item(events_queue, frontend_event) != 0) { LOG_ERROR("Virtual event queuing failed"); } } diff --git a/protocol/test/protocol_test.cpp b/protocol/test/protocol_test.cpp index f349ed180a4..d98f0e8378c 100644 --- a/protocol/test/protocol_test.cpp +++ b/protocol/test/protocol_test.cpp @@ -1914,26 +1914,26 @@ TEST_F(ProtocolTest, QueueTest) { EXPECT_EQ(fifo_queue_dequeue_item(fifo_queue, &item), -1); EXPECT_EQ(fifo_queue_dequeue_item_timeout(fifo_queue, &item, 50), -1); item = 1; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, false), 0); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), 0); item = 2; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, true), 0); + EXPECT_EQ(fifo_queue_enqueue_item_timeout(fifo_queue, &item, -1), 0); EXPECT_EQ(fifo_queue_dequeue_item(fifo_queue, &item), 0); EXPECT_EQ(item, 1); item = 3; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, false), 0); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), 0); item = 4; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, false), 0); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), 0); item = 5; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, true), 0); + EXPECT_EQ(fifo_queue_enqueue_item_timeout(fifo_queue, &item, 50), 0); item = 6; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, false), 0); + EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item), 0); item = 7; - EXPECT_EQ(fifo_queue_enqueue_item(fifo_queue, &item, false), -1); + EXPECT_EQ(fifo_queue_enqueue_item_timeout(fifo_queue, &item, 50), -1); EXPECT_EQ(fifo_queue_dequeue_item(fifo_queue, &item), 0); EXPECT_EQ(item, 2); EXPECT_EQ(fifo_queue_dequeue_item_timeout(fifo_queue, &item, 100), 0); EXPECT_EQ(item, 3); - EXPECT_EQ(fifo_queue_dequeue_item_timeout(fifo_queue, &item, 100), 0); + EXPECT_EQ(fifo_queue_dequeue_item_timeout(fifo_queue, &item, -1), 0); EXPECT_EQ(item, 4); EXPECT_EQ(fifo_queue_dequeue_item(fifo_queue, &item), 0); EXPECT_EQ(item, 5); @@ -1950,7 +1950,7 @@ TEST_F(ProtocolTest, QueueTest) { EXPECT_EQ(item, 6); fifo_queue_destroy(fifo_queue); EXPECT_EQ(fifo_queue_dequeue_item(NULL, &item), -1); - EXPECT_EQ(fifo_queue_enqueue_item(NULL, &item, false), -1); + EXPECT_EQ(fifo_queue_enqueue_item(NULL, &item), -1); } int test_virtual_intr(void* arg) { diff --git a/protocol/whist/network/tcp.c b/protocol/whist/network/tcp.c index 145246a7454..91ea4975052 100644 --- a/protocol/whist/network/tcp.c +++ b/protocol/whist/network/tcp.c @@ -818,7 +818,7 @@ int tcp_send_constructed_packet(TCPContext* context, TCPPacket* packet) { TCPQueueItem queue_item; queue_item.packet = network_packet; queue_item.packet_size = packet_size; - if (fifo_queue_enqueue_item(context->send_queue, &queue_item, false) < 0) return -1; + if (fifo_queue_enqueue_item_timeout(context->send_queue, &queue_item, -1) < 0) return -1; whist_post_semaphore(context->send_semaphore); return 0; } diff --git a/protocol/whist/network/udp.c b/protocol/whist/network/udp.c index 70b0c9d7ffe..5e6efc41b21 100644 --- a/protocol/whist/network/udp.c +++ b/protocol/whist/network/udp.c @@ -1765,7 +1765,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) { nack_id.frame_id = packet->udp_nack_data.id; if ((short)packet->udp_nack_data.index >= 0) { nack_id.packet_index = packet->udp_nack_data.index; - if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) { + if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) { LOG_ERROR("Failed to enqueue NACK request"); } } else { @@ -1786,7 +1786,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) { if (LOG_NACKING) { LOG_INFO("Generating Nack for Frame ID %d, index %d", nack_id.frame_id, i); } - if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) { + if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) { LOG_ERROR("Failed to enqueue NACK request"); } } @@ -1809,7 +1809,7 @@ void udp_handle_message(UDPContext* context, UDPPacket* packet) { i < packet->udp_bitarray_nack_data.numBits; i++) { if (bit_array_test_bit(bit_arr, i)) { nack_id.packet_index = i; - if (fifo_queue_enqueue_item(context->nack_queue, &nack_id, false) < 0) { + if (fifo_queue_enqueue_item(context->nack_queue, &nack_id) < 0) { LOG_ERROR("Failed to enqueue NACK request"); } } diff --git a/protocol/whist/utils/queue.c b/protocol/whist/utils/queue.c index fb756dda07d..8459af085d7 100644 --- a/protocol/whist/utils/queue.c +++ b/protocol/whist/utils/queue.c @@ -14,8 +14,8 @@ typedef struct QueueContext { int num_items; int max_items; WhistMutex mutex; - WhistCondition cond; - WhistSemaphore sem; + WhistCondition avail_items_cond; + WhistCondition avail_space_cond; void *data; bool destroying; } QueueContext; @@ -31,6 +31,15 @@ static void dequeue_item(QueueContext *context, void *item) { void *source_item = (uint8_t *)context->data + (context->item_size * context->read_idx); memcpy(item, source_item, context->item_size); increment_idx(context, &context->read_idx); + whist_broadcast_cond(context->avail_space_cond); +} + +static void enqueue_item(QueueContext *context, void *item) { + context->num_items++; + void *target_item = (uint8_t *)context->data + (context->item_size * context->write_idx); + memcpy(target_item, item, context->item_size); + increment_idx(context, &context->write_idx); + whist_broadcast_cond(context->avail_items_cond); } QueueContext *fifo_queue_create(size_t item_size, int max_items) { @@ -51,15 +60,14 @@ QueueContext *fifo_queue_create(size_t item_size, int max_items) { return NULL; } - context->cond = whist_create_cond(); - if (context->cond == NULL) { + context->avail_items_cond = whist_create_cond(); + if (context->avail_items_cond == NULL) { fifo_queue_destroy(context); return NULL; } - // The semaphore value corresponds to the number of available spaces in the queue - context->sem = whist_create_semaphore(max_items); - if (context->sem == NULL) { + context->avail_space_cond = whist_create_cond(); + if (context->avail_space_cond == NULL) { fifo_queue_destroy(context); return NULL; } @@ -70,34 +78,48 @@ QueueContext *fifo_queue_create(size_t item_size, int max_items) { return context; } -int fifo_queue_enqueue_item(QueueContext *context, const void *item, bool blocking) { +int fifo_queue_enqueue_item(QueueContext *context, const void *item) { if (context == NULL) { return -1; } whist_lock_mutex(context->mutex); - while (context->num_items >= context->max_items) { + if (context->num_items >= context->max_items) { whist_unlock_mutex(context->mutex); - if (blocking) { - whist_wait_semaphore(context->sem); - if (context->destroying) { + return -1; + } + enqueue_item(context, item); + whist_unlock_mutex(context->mutex); + return 0; +} + +int fifo_queue_enqueue_item_timeout(QueueContext *context, const void *item, int timeout_ms) { + if (context == NULL) { + return -1; + } + WhistTimer timer; + start_timer(&timer); + int current_timeout_ms = timeout_ms; + whist_lock_mutex(context->mutex); + while (context->num_items >= context->max_items) { + if (context->destroying) { + whist_unlock_mutex(context->mutex); + return -1; + } + if (timeout_ms >= 0) { + bool res = whist_timedwait_cond(context->avail_space_cond, context->mutex, current_timeout_ms); + if (res == false) { // In case of a timeout simply exit + whist_unlock_mutex(context->mutex); return -1; } - whist_lock_mutex(context->mutex); + int elapsed_ms = (int)(get_timer(&timer) * MS_IN_SECOND); + current_timeout_ms = max(timeout_ms - elapsed_ms, 0); } else { - return -1; + // Negative timeout_ms indicates block until available, not timeout + whist_wait_cond(context->avail_space_cond, context->mutex); } } - context->num_items++; - void *target_item = (uint8_t *)context->data + (context->item_size * context->write_idx); - memcpy(target_item, item, context->item_size); - increment_idx(context, &context->write_idx); - whist_broadcast_cond(context->cond); + enqueue_item(context, item); whist_unlock_mutex(context->mutex); - // If this enqueue call is not blocking, we still need to decrement the semaphore. - // Since we just successfully enqueued an element without decrementing the - // semaphore, we know that the semaphore count is greater than 0 and that this - // wait will be successful. - if (!blocking) whist_wait_timeout_semaphore(context->sem, 0); return 0; } @@ -125,13 +147,22 @@ int fifo_queue_dequeue_item_timeout(QueueContext *context, void *item, int timeo int current_timeout_ms = timeout_ms; whist_lock_mutex(context->mutex); while (context->num_items <= 0) { - bool res = whist_timedwait_cond(context->cond, context->mutex, current_timeout_ms); - if (res == false) { // In case of a timeout simply exit + if (context->destroying) { whist_unlock_mutex(context->mutex); return -1; } - int elapsed_ms = (int)(get_timer(&timer) * MS_IN_SECOND); - current_timeout_ms = max(timeout_ms - elapsed_ms, 0); + if (timeout_ms >= 0) { + bool res = whist_timedwait_cond(context->avail_items_cond, context->mutex, current_timeout_ms); + if (res == false) { // In case of a timeout simply exit + whist_unlock_mutex(context->mutex); + return -1; + } + int elapsed_ms = (int)(get_timer(&timer) * MS_IN_SECOND); + current_timeout_ms = max(timeout_ms - elapsed_ms, 0); + } else { + // Negative timeout_ms indicates block until available, not timeout + whist_wait_cond(context->avail_items_cond, context->mutex); + } } dequeue_item(context, item); whist_unlock_mutex(context->mutex); @@ -142,12 +173,12 @@ void fifo_queue_destroy(QueueContext *context) { if (context == NULL) { return; } + + // Make sure that all blocking calls release context->destroying = true; - if (context->sem != NULL) { - // This ensures that a blocking enqueue will release - whist_post_semaphore(context->sem); - whist_destroy_semaphore(context->sem); - } + whist_broadcast_cond(context->avail_items_cond); + whist_broadcast_cond(context->avail_space_cond); + if (context->data != NULL) { free(context->data); } diff --git a/protocol/whist/utils/queue.h b/protocol/whist/utils/queue.h index 5cf67569901..e3d09531167 100644 --- a/protocol/whist/utils/queue.h +++ b/protocol/whist/utils/queue.h @@ -23,11 +23,23 @@ QueueContext *fifo_queue_create(size_t item_size, int max_items); * * @param queue_context Queue's context pointer * @param item Pointer to the item that needs to be enqueued - * @param blocking Whether the enqueue operation should be blocking * * @returns 0 on success, -1 on failure */ -int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item, bool blocking); +int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item); + +/** + * @brief Enqueue an item to the FIFO queue, If an item is not available, + * then wait till a timeout. + * + * @param queue_context Queue's context pointer + * @param item Pointer to the item that needs to be enqueued + * @param timeout_ms The number of milliseconds to wait for. -1 for wait without + * timeout. + * + * @returns 0 on success, -1 on failure + */ +int fifo_queue_enqueue_item_blocking(QueueContext *queue_context, const void *item, int timeout); /** * @brief Dequeue an item from the FIFO queue. If an item is not available, @@ -46,7 +58,8 @@ int fifo_queue_dequeue_item(QueueContext *queue_context, void *item); * * @param queue_context Queue's context pointer * @param item Pointer to the memory where dequeued item will be stored - * @param timeout_ms The number of milliseconds to wait for. + * @param timeout_ms The number of milliseconds to wait for. -1 for wait without + * timeout. * * @returns 0 on success, -1 on failure */ From 718376efcaa455cd3dd0ffaaea4d078ffd106e5c Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Wed, 14 Sep 2022 15:04:04 -0400 Subject: [PATCH 10/15] get rid of busy loop when socket connection is lost --- protocol/whist/network/tcp.c | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/protocol/whist/network/tcp.c b/protocol/whist/network/tcp.c index 91ea4975052..ea3fadf3fc1 100644 --- a/protocol/whist/network/tcp.c +++ b/protocol/whist/network/tcp.c @@ -31,6 +31,11 @@ Defines // Currently set to the "large enough" 1GB #define MAX_TCP_PAYLOAD_SIZE 1000000000 +// How many packets to allow to be queued up on +// a single TCP sending thread before queueing +// up the next packet will block. +#define TCP_SEND_QUEUE_SIZE 16 + typedef enum { TCP_PING, TCP_PONG, @@ -616,7 +621,7 @@ bool create_tcp_socket_context(SocketContext* network_context, char* destination // Set up TCP send queue context->run_sender = true; - if ((context->send_queue = fifo_queue_create(sizeof(TCPQueueItem), 16)) == NULL || + if ((context->send_queue = fifo_queue_create(sizeof(TCPQueueItem), TCP_SEND_QUEUE_SIZE)) == NULL || (context->send_semaphore = whist_create_semaphore(0)) == NULL || (context->send_thread = whist_create_thread(multithreaded_tcp_send, "multithreaded_tcp_send", context)) == NULL) { @@ -831,12 +836,17 @@ int multithreaded_tcp_send(void* opaque) { whist_wait_semaphore(context->send_semaphore); // Check to see if the sender thread needs to stop running if (!context->run_sender) break; - // If the connection is lost, re-increment the semaphore and continue - // to try again later + // If connection is lost, then wait for up to TCP_PING_MAX_RECONNECTION_TIME_SEC + // before continuing. if (context->connection_lost) { + // Need to re-increment semaphore because wait_semaphore at the top of the loop + // will have decremented semaphore for a packet we are not sending yet. whist_post_semaphore(context->send_semaphore); - continue; + // If the wait for another packet times out, then we return to the top of the loop + if (!whist_wait_timeout_semaphore(context->send_semaphore, TCP_PING_MAX_RECONNECTION_TIME_SEC * 1000)) + continue; } + // If there is no item to be dequeued, continue if (fifo_queue_dequeue_item(context->send_queue, &queue_item) < 0) continue; From 05efb0f61d0085e83a9faefccb4b2ad32f5098a7 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Wed, 14 Sep 2022 15:06:30 -0400 Subject: [PATCH 11/15] clang-format --- protocol/whist/network/tcp.c | 6 ++++-- protocol/whist/utils/queue.c | 6 ++++-- protocol/whist/utils/queue.h | 4 ++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/protocol/whist/network/tcp.c b/protocol/whist/network/tcp.c index ea3fadf3fc1..324ac12580b 100644 --- a/protocol/whist/network/tcp.c +++ b/protocol/whist/network/tcp.c @@ -621,7 +621,8 @@ bool create_tcp_socket_context(SocketContext* network_context, char* destination // Set up TCP send queue context->run_sender = true; - if ((context->send_queue = fifo_queue_create(sizeof(TCPQueueItem), TCP_SEND_QUEUE_SIZE)) == NULL || + if ((context->send_queue = fifo_queue_create(sizeof(TCPQueueItem), TCP_SEND_QUEUE_SIZE)) == + NULL || (context->send_semaphore = whist_create_semaphore(0)) == NULL || (context->send_thread = whist_create_thread(multithreaded_tcp_send, "multithreaded_tcp_send", context)) == NULL) { @@ -843,7 +844,8 @@ int multithreaded_tcp_send(void* opaque) { // will have decremented semaphore for a packet we are not sending yet. whist_post_semaphore(context->send_semaphore); // If the wait for another packet times out, then we return to the top of the loop - if (!whist_wait_timeout_semaphore(context->send_semaphore, TCP_PING_MAX_RECONNECTION_TIME_SEC * 1000)) + if (!whist_wait_timeout_semaphore(context->send_semaphore, + TCP_PING_MAX_RECONNECTION_TIME_SEC * 1000)) continue; } diff --git a/protocol/whist/utils/queue.c b/protocol/whist/utils/queue.c index 8459af085d7..0c33ab54f81 100644 --- a/protocol/whist/utils/queue.c +++ b/protocol/whist/utils/queue.c @@ -106,7 +106,8 @@ int fifo_queue_enqueue_item_timeout(QueueContext *context, const void *item, int return -1; } if (timeout_ms >= 0) { - bool res = whist_timedwait_cond(context->avail_space_cond, context->mutex, current_timeout_ms); + bool res = + whist_timedwait_cond(context->avail_space_cond, context->mutex, current_timeout_ms); if (res == false) { // In case of a timeout simply exit whist_unlock_mutex(context->mutex); return -1; @@ -152,7 +153,8 @@ int fifo_queue_dequeue_item_timeout(QueueContext *context, void *item, int timeo return -1; } if (timeout_ms >= 0) { - bool res = whist_timedwait_cond(context->avail_items_cond, context->mutex, current_timeout_ms); + bool res = + whist_timedwait_cond(context->avail_items_cond, context->mutex, current_timeout_ms); if (res == false) { // In case of a timeout simply exit whist_unlock_mutex(context->mutex); return -1; diff --git a/protocol/whist/utils/queue.h b/protocol/whist/utils/queue.h index e3d09531167..4009d1450b1 100644 --- a/protocol/whist/utils/queue.h +++ b/protocol/whist/utils/queue.h @@ -34,7 +34,7 @@ int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item); * * @param queue_context Queue's context pointer * @param item Pointer to the item that needs to be enqueued - * @param timeout_ms The number of milliseconds to wait for. -1 for wait without + * @param timeout_ms The number of milliseconds to wait for. -1 for wait without * timeout. * * @returns 0 on success, -1 on failure @@ -58,7 +58,7 @@ int fifo_queue_dequeue_item(QueueContext *queue_context, void *item); * * @param queue_context Queue's context pointer * @param item Pointer to the memory where dequeued item will be stored - * @param timeout_ms The number of milliseconds to wait for. -1 for wait without + * @param timeout_ms The number of milliseconds to wait for. -1 for wait without * timeout. * * @returns 0 on success, -1 on failure From 8493a14a04f7fb2978df5598c3f11999867f0b78 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Wed, 14 Sep 2022 15:38:11 -0400 Subject: [PATCH 12/15] add const qualifier in enqueue_item --- protocol/whist/utils/queue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocol/whist/utils/queue.c b/protocol/whist/utils/queue.c index 0c33ab54f81..9ccb6eaf21b 100644 --- a/protocol/whist/utils/queue.c +++ b/protocol/whist/utils/queue.c @@ -34,7 +34,7 @@ static void dequeue_item(QueueContext *context, void *item) { whist_broadcast_cond(context->avail_space_cond); } -static void enqueue_item(QueueContext *context, void *item) { +static void enqueue_item(QueueContext *context, const void *item) { context->num_items++; void *target_item = (uint8_t *)context->data + (context->item_size * context->write_idx); memcpy(target_item, item, context->item_size); From f073f453621deeb57466cfd16eb8adedcbe889be Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Wed, 14 Sep 2022 15:40:18 -0400 Subject: [PATCH 13/15] remove semaphore remnants, add cond changes, change signature --- protocol/whist/utils/queue.c | 8 +++++--- protocol/whist/utils/queue.h | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/protocol/whist/utils/queue.c b/protocol/whist/utils/queue.c index 9ccb6eaf21b..34f0338bc20 100644 --- a/protocol/whist/utils/queue.c +++ b/protocol/whist/utils/queue.c @@ -135,7 +135,6 @@ int fifo_queue_dequeue_item(QueueContext *context, void *item) { } dequeue_item(context, item); whist_unlock_mutex(context->mutex); - whist_post_semaphore(context->sem); return 0; } @@ -187,8 +186,11 @@ void fifo_queue_destroy(QueueContext *context) { if (context->mutex != NULL) { whist_destroy_mutex(context->mutex); } - if (context->cond != NULL) { - whist_destroy_cond(context->cond); + if (context->avail_items_cond != NULL) { + whist_destroy_cond(context->avail_items_cond); + } + if (context->avail_space_cond != NULL) { + whist_destroy_cond(context->avail_space_cond); } free(context); } diff --git a/protocol/whist/utils/queue.h b/protocol/whist/utils/queue.h index 4009d1450b1..225ab47cce2 100644 --- a/protocol/whist/utils/queue.h +++ b/protocol/whist/utils/queue.h @@ -39,7 +39,7 @@ int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item); * * @returns 0 on success, -1 on failure */ -int fifo_queue_enqueue_item_blocking(QueueContext *queue_context, const void *item, int timeout); +int fifo_queue_enqueue_item_timeout(QueueContext *queue_context, const void *item, int timeout_ms); /** * @brief Dequeue an item from the FIFO queue. If an item is not available, From e8231dfaf0a7ab3d233a8bb9f8510dbc08cdbd7c Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Wed, 14 Sep 2022 16:11:05 -0400 Subject: [PATCH 14/15] update queue docs --- protocol/test.c | 77 ++++++++++++++++++++++++++++++++++++ protocol/whist/utils/queue.h | 7 ++-- 2 files changed, 81 insertions(+), 3 deletions(-) create mode 100644 protocol/test.c diff --git a/protocol/test.c b/protocol/test.c new file mode 100644 index 00000000000..ce6d1d7494c --- /dev/null +++ b/protocol/test.c @@ -0,0 +1,77 @@ +/** + * @copyright Copyright 2022 Whist Technologies, Inc. + * @file queue.h + * @brief API of thread-safe fifo queue. + */ +#ifndef WHIST_UTILS_QUEUE_H +#define WHIST_UTILS_QUEUE_H + +typedef struct QueueContext QueueContext; + +/** + * @brief Create a FIFO queue + * + * @param item_size Size of each item in the queue + * @param max_items Maximum number of items in the queue + * + * @returns Opaque context pointer for the queue + */ +QueueContext *fifo_queue_create(size_t item_size, int max_items); + +/** + * @brief Enqueue an item to the FIFO queue (nonblocking) If queue is full, + * then return immediately without any waiting. + * + * @param queue_context Queue's context pointer + * @param item Pointer to the item that needs to be enqueued + * + * @returns 0 on success, -1 when queue is full and on failure + */ +int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item); + +/** + * @brief Enqueue an item to the FIFO queue, If an item is not available, + * then wait till a timeout. + * + * @param queue_context Queue's context pointer + * @param item Pointer to the item that needs to be enqueued + * @param timeout_ms The number of milliseconds to wait for. -1 for wait without + * timeout. + * + * @returns 0 on success, -1 on failure + */ +int fifo_queue_enqueue_item_timeout(QueueContext *queue_context, const void *item, int timeout_ms); + +/** + * @brief Dequeue an item from the FIFO queue. If an item is not available, + * then return immediately without any waiting. + * + * @param queue_context Queue's context pointer + * @param item Pointer to the memory where dequeued item will be stored + * + * @returns 0 on success, -1 when queue is empty and on failure + */ +int fifo_queue_dequeue_item(QueueContext *queue_context, void *item); + +/** + * @brief Dequeue an item from the FIFO queue. If an item is not available, + * then wait till a timeout. + * + * @param queue_context Queue's context pointer + * @param item Pointer to the memory where dequeued item will be stored + * @param timeout_ms The number of milliseconds to wait for. -1 for wait without + * timeout. + * + * @returns 0 on success, -1 on failure + */ +int fifo_queue_dequeue_item_timeout(QueueContext *queue_context, void *item, int timeout_ms); + +/** + * @brief Destroys the FIFO queue + * + * @param queue_context Queue's context pointer + * + */ +void fifo_queue_destroy(QueueContext *queue_context); + +#endif diff --git a/protocol/whist/utils/queue.h b/protocol/whist/utils/queue.h index 225ab47cce2..ce6d1d7494c 100644 --- a/protocol/whist/utils/queue.h +++ b/protocol/whist/utils/queue.h @@ -19,12 +19,13 @@ typedef struct QueueContext QueueContext; QueueContext *fifo_queue_create(size_t item_size, int max_items); /** - * @brief Enqueue an item to the FIFO queue + * @brief Enqueue an item to the FIFO queue (nonblocking) If queue is full, + * then return immediately without any waiting. * * @param queue_context Queue's context pointer * @param item Pointer to the item that needs to be enqueued * - * @returns 0 on success, -1 on failure + * @returns 0 on success, -1 when queue is full and on failure */ int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item); @@ -48,7 +49,7 @@ int fifo_queue_enqueue_item_timeout(QueueContext *queue_context, const void *ite * @param queue_context Queue's context pointer * @param item Pointer to the memory where dequeued item will be stored * - * @returns 0 on success, -1 on failure + * @returns 0 on success, -1 when queue is empty and on failure */ int fifo_queue_dequeue_item(QueueContext *queue_context, void *item); From 9e369bb2aaa30eced6f8a17e893c6b4f2b7dfc14 Mon Sep 17 00:00:00 2001 From: Suriya Kandaswamy Date: Wed, 14 Sep 2022 16:39:34 -0400 Subject: [PATCH 15/15] remove test.c oops --- protocol/test.c | 77 ------------------------------------------------- 1 file changed, 77 deletions(-) delete mode 100644 protocol/test.c diff --git a/protocol/test.c b/protocol/test.c deleted file mode 100644 index ce6d1d7494c..00000000000 --- a/protocol/test.c +++ /dev/null @@ -1,77 +0,0 @@ -/** - * @copyright Copyright 2022 Whist Technologies, Inc. - * @file queue.h - * @brief API of thread-safe fifo queue. - */ -#ifndef WHIST_UTILS_QUEUE_H -#define WHIST_UTILS_QUEUE_H - -typedef struct QueueContext QueueContext; - -/** - * @brief Create a FIFO queue - * - * @param item_size Size of each item in the queue - * @param max_items Maximum number of items in the queue - * - * @returns Opaque context pointer for the queue - */ -QueueContext *fifo_queue_create(size_t item_size, int max_items); - -/** - * @brief Enqueue an item to the FIFO queue (nonblocking) If queue is full, - * then return immediately without any waiting. - * - * @param queue_context Queue's context pointer - * @param item Pointer to the item that needs to be enqueued - * - * @returns 0 on success, -1 when queue is full and on failure - */ -int fifo_queue_enqueue_item(QueueContext *queue_context, const void *item); - -/** - * @brief Enqueue an item to the FIFO queue, If an item is not available, - * then wait till a timeout. - * - * @param queue_context Queue's context pointer - * @param item Pointer to the item that needs to be enqueued - * @param timeout_ms The number of milliseconds to wait for. -1 for wait without - * timeout. - * - * @returns 0 on success, -1 on failure - */ -int fifo_queue_enqueue_item_timeout(QueueContext *queue_context, const void *item, int timeout_ms); - -/** - * @brief Dequeue an item from the FIFO queue. If an item is not available, - * then return immediately without any waiting. - * - * @param queue_context Queue's context pointer - * @param item Pointer to the memory where dequeued item will be stored - * - * @returns 0 on success, -1 when queue is empty and on failure - */ -int fifo_queue_dequeue_item(QueueContext *queue_context, void *item); - -/** - * @brief Dequeue an item from the FIFO queue. If an item is not available, - * then wait till a timeout. - * - * @param queue_context Queue's context pointer - * @param item Pointer to the memory where dequeued item will be stored - * @param timeout_ms The number of milliseconds to wait for. -1 for wait without - * timeout. - * - * @returns 0 on success, -1 on failure - */ -int fifo_queue_dequeue_item_timeout(QueueContext *queue_context, void *item, int timeout_ms); - -/** - * @brief Destroys the FIFO queue - * - * @param queue_context Queue's context pointer - * - */ -void fifo_queue_destroy(QueueContext *queue_context); - -#endif