Skip to content

Commit

Permalink
Halt worker threads in a more deterministic fashion
Browse files Browse the repository at this point in the history
Previously, we just had the sync thread tell all of the threads to
stop at once and then just wait for them all to exit.

This was starting to introduce deadlock scenarios on exit where
a producing thread would be blocked trying to write its final
messages to a full queue when the corresponding consumer had
already exited.

Now we halt threads in batches based on their role, with
upstream producers being halted first followed by their consumers.
This prevents us from halting a consumer before the producer has
stopped and should greatly reduce the frequency of collectors
hanging on exit.
  • Loading branch information
salcock committed Nov 22, 2024
1 parent 460a94a commit cde064e
Show file tree
Hide file tree
Showing 18 changed files with 142 additions and 57 deletions.
4 changes: 4 additions & 0 deletions src/collector/collector.c
Original file line number Diff line number Diff line change
Expand Up @@ -2160,6 +2160,7 @@ static int init_sip_worker_thread(openli_sip_worker_t *sipworker,
sipworker->shared = &(glob->sharedinfo);
sipworker->shared_mutex = &(glob->config_mutex);
sipworker->collector_queues = NULL;
sipworker->haltinfo = NULL;

/* It is ok to initialize this mutex here because this method will
* be called by the main collector thread before we start any packet
Expand Down Expand Up @@ -2313,6 +2314,7 @@ int main(int argc, char *argv[]) {
//forwarder only needs CTX if ctx exists and is enabled
glob->forwarders[i].RMQ_conf = glob->RMQ_conf;
glob->forwarders[i].ampq_blocked = 0;
glob->forwarders[i].haltinfo = NULL;

pthread_create(&(glob->forwarders[i].threadid), NULL,
start_forwarding_thread, (void *)&(glob->forwarders[i]));
Expand Down Expand Up @@ -2363,6 +2365,7 @@ int main(int argc, char *argv[]) {
glob->emailworkers[i].zmq_ingest_recvsock = NULL;
glob->emailworkers[i].zmq_colthread_recvsock = NULL;
glob->emailworkers[i].zmq_ii_sock = NULL;
glob->emailworkers[i].haltinfo = NULL;

glob->emailworkers[i].timeouts = NULL;
glob->emailworkers[i].allintercepts = NULL;
Expand Down Expand Up @@ -2405,6 +2408,7 @@ int main(int argc, char *argv[]) {
glob->seqtrackers[i].zmq_pushjobsock = NULL;
glob->seqtrackers[i].zmq_recvpublished = NULL;
glob->seqtrackers[i].intercepts = NULL;
glob->seqtrackers[i].haltinfo = NULL;
glob->seqtrackers[i].colident = &(glob->sharedinfo);
glob->seqtrackers[i].colident_mutex = &(glob->config_mutex);
glob->seqtrackers[i].encoding_method = glob->encoding_method;
Expand Down
4 changes: 3 additions & 1 deletion src/collector/collector_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ typedef struct seqtracker_thread_data {
exporter_intercept_state_t *intercepts;
removed_intercept_t *removedints;
uint8_t encoding_method;

halt_info_t *haltinfo;

} seqtracker_thread_data_t;

Expand All @@ -217,6 +217,7 @@ typedef struct forwarding_thread_data {
pthread_t threadid;
int forwardid;
int encoders;
int encoders_over;
int colthreads;

void *zmq_ctrlsock;
Expand Down Expand Up @@ -245,6 +246,7 @@ typedef struct forwarding_thread_data {
amqp_socket_t *ampq_sock;
uint8_t ampq_blocked;
openli_RMQ_config_t RMQ_conf;
halt_info_t *haltinfo;

} forwarding_thread_data_t;

Expand Down
24 changes: 17 additions & 7 deletions src/collector/collector_forwarder.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ static int handle_ctrl_message(forwarding_thread_data_t *fwd,
openli_export_recv_t *msg) {

if (msg->type == OPENLI_EXPORT_HALT) {
fwd->haltinfo = msg->data.haltinfo;
free(msg);
return 0;
}
Expand Down Expand Up @@ -666,10 +667,10 @@ static void connect_export_targets(forwarding_thread_data_t *fwd) {

static int drain_incoming_etsi(forwarding_thread_data_t *fwd) {

int x, encoders_over = 0, i, msgcnt;
int x, i, msgcnt;
openli_encoded_result_t res[MAX_ENCODED_RESULT_BATCH];

do {
while (fwd->encoders_over < fwd->encoders) {
x = zmq_recv(fwd->zmq_pullressock, res, sizeof(res),
ZMQ_DONTWAIT);
if (x < 0 && errno != EAGAIN) {
Expand All @@ -690,15 +691,12 @@ static int drain_incoming_etsi(forwarding_thread_data_t *fwd) {
for (i = 0; i < msgcnt; i++) {

if (res[i].liid == NULL && res[i].destid == 0) {
if (fwd->forwardid == 0) {
logger(LOG_INFO, "OpenLI: encoder %d has ceased encoding", encoders_over);
}
encoders_over ++;
fwd->encoders_over ++;
}

free_encoded_result(&(res[i]));
}
} while (encoders_over < fwd->encoders);
}

return 1;
}
Expand Down Expand Up @@ -730,6 +728,11 @@ static int receive_incoming_etsi(forwarding_thread_data_t *fwd) {
msgcnt = x / sizeof(openli_encoded_result_t);

for (i = 0; i < msgcnt; i++) {
if (res[i].liid == NULL || res[i].destid == 0) {
fwd->encoders_over ++;
free_encoded_result(&(res[i]));
break;
}
if (handle_encoded_result(fwd, &(res[i])) < 0) {
return -1;
}
Expand Down Expand Up @@ -1202,6 +1205,7 @@ void *start_forwarding_thread(void *data) {
char sockname[128];
int zero = 0;

fwd->encoders_over = 0;
fwd->zmq_ctrlsock = zmq_socket(fwd->zmq_ctxt, ZMQ_PULL);
snprintf(sockname, 128, "inproc://openliforwardercontrol_sync-%d",
fwd->forwardid);
Expand Down Expand Up @@ -1248,6 +1252,12 @@ void *start_forwarding_thread(void *data) {
remove_all_destinations(fwd);
logger(LOG_DEBUG, "OpenLI: halting forwarding thread %d",
fwd->forwardid);
if (fwd->haltinfo) {
pthread_mutex_lock(&(fwd->haltinfo->mutex));
fwd->haltinfo->halted ++;
pthread_cond_signal(&(fwd->haltinfo->cond));
pthread_mutex_unlock(&(fwd->haltinfo->mutex));
}
pthread_exit(NULL);
}

Expand Down
8 changes: 8 additions & 0 deletions src/collector/collector_publish.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#ifndef OPENLI_COLLECTOR_PUBLISH_H_
#define OPENLI_COLLECTOR_PUBLISH_H_

#include <pthread.h>
#include <libtrace.h>
#include <zmq.h>

Expand All @@ -36,6 +37,12 @@
#include "internetaccess.h"
#include "location.h"

/* Shared state used to confirm that a batch of worker threads has halted.
 *
 * A pointer to one of these travels inside each OPENLI_EXPORT_HALT message.
 * A worker that received the message increments `halted` and signals `cond`
 * (both while holding `mutex`) just before it exits; the thread that issued
 * the halt waits on `cond` until `halted` reaches the number of workers it
 * asked to stop.
 */
typedef struct halt_info {
/* Number of workers that have reported halting; guarded by `mutex`. */
size_t halted;
/* Protects `halted` and is the mutex paired with `cond`. */
pthread_mutex_t mutex;
/* Signalled by a worker each time it increments `halted`. */
pthread_cond_t cond;
} halt_info_t;

enum {
OPENLI_EXPORT_HALT_WORKER = 1,
OPENLI_EXPORT_PACKET_FIN = 2,
Expand Down Expand Up @@ -203,6 +210,7 @@ struct openli_export_recv {
openli_rawip_job_t rawip;
openli_emailiri_job_t emailiri;
openli_emailcc_job_t emailcc;
halt_info_t *haltinfo;
} data;
};

Expand Down
10 changes: 10 additions & 0 deletions src/collector/collector_seqtracker.c
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ static void seqtracker_main(seqtracker_thread_data_t *seqdata) {
switch(job->type) {
case OPENLI_EXPORT_HALT:
halted = 1;
seqdata->haltinfo = job->data.haltinfo;
free(job);
break;

Expand Down Expand Up @@ -587,8 +588,17 @@ void *start_seqtracker_thread(void *data) {
free_intercept_state(seqdata, intstate);
}

logger(LOG_INFO, "OpenLI: halting sequence tracker %d\n",
seqdata->trackerid);
zmq_close(seqdata->zmq_recvpublished);
zmq_close(seqdata->zmq_pushjobsock);

if (seqdata->haltinfo) {
pthread_mutex_lock(&(seqdata->haltinfo->mutex));
seqdata->haltinfo->halted ++;
pthread_cond_signal(&(seqdata->haltinfo->cond));
pthread_mutex_unlock(&(seqdata->haltinfo->mutex));
}
pthread_exit(NULL);
}

Expand Down
69 changes: 42 additions & 27 deletions src/collector/collector_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,36 @@ collector_sync_t *init_sync_data(collector_global_t *glob) {

}

/* HALT_THREADS(socks, count)
 *
 * Sends a halt message to every socket in `socks` (an array of `count`
 * ZeroMQ sockets) and then blocks until `count` workers have reported back
 * through `haltinfo`.
 *
 * This is a statement macro with hidden dependencies on its expansion site:
 *   - locals named `haltinfo` (halt_info_t), `haltfails` and `haltattempts`
 *     must be in scope;
 *   - it must be expanded directly inside a loop, because a failed send
 *     bumps `haltattempts`, sleeps for 250ms and then `continue`s that loop.
 *     For the same reason it must NOT be wrapped in do { } while (0): the
 *     `continue` would then bind to the wrapper instead of the retry loop.
 *
 * The mutex is released before the sleep-and-retry path. Without that, the
 * next pass through the loop would try to lock a mutex this thread already
 * owns (self-deadlock), and giving up after the final attempt would leave
 * the mutex locked when it is destroyed.
 *
 * NOTE(review): a retry restarts the enclosing loop from its first
 * HALT_THREADS expansion and resets `haltinfo.halted` to zero; confirm that
 * groups which were already halted (or partially halted) cannot leave the
 * wait below stuck short of `count`.
 */
#define HALT_THREADS(socks, count) \
    pthread_mutex_lock(&(haltinfo.mutex)); \
    haltinfo.halted = 0; \
    haltfails = send_halt_message_to_zmq_socket_array( \
            (socks), (count), &haltinfo); \
    \
    if (haltfails) { \
        pthread_mutex_unlock(&(haltinfo.mutex)); \
        haltattempts ++; \
        usleep(250000); \
        continue; \
    } \
    \
    while ((count) > 0 && haltinfo.halted < (count)) { \
        pthread_cond_wait(&(haltinfo.cond), &(haltinfo.mutex)); \
    } \
    pthread_mutex_unlock(&(haltinfo.mutex));


void clean_sync_data(collector_sync_t *sync) {

int zero=0;
int haltattempts = 0, haltfails = 0;
ip_to_session_t *iter, *tmp;
default_radius_user_t *raditer, *radtmp;
halt_info_t haltinfo;

if (sync->instruct_fd != -1) {
close(sync->instruct_fd);
sync->instruct_fd = -1;
}
if (sync->instruct_fd != -1) {
close(sync->instruct_fd);
sync->instruct_fd = -1;
}

HASH_ITER(hh, sync->activeips, iter, tmp) {
HASH_DELETE(hh, sync->activeips, iter);
Expand Down Expand Up @@ -186,6 +205,9 @@ void clean_sync_data(collector_sync_t *sync) {
sync->radiusplugin = NULL;
sync->activeips = NULL;

pthread_mutex_init(&(haltinfo.mutex), NULL);
pthread_cond_init(&(haltinfo.cond), NULL);

while (haltattempts < 10) {
haltfails = 0;

Expand Down Expand Up @@ -213,24 +235,17 @@ void clean_sync_data(collector_sync_t *sync) {
sync->zmq_colsock = NULL;
}

haltfails += send_halt_message_to_zmq_socket_array(sync->zmq_pubsocks,
sync->pubsockcount);
haltfails += send_halt_message_to_zmq_socket_array(
sync->zmq_fwdctrlsocks, sync->forwardcount);
haltfails += send_halt_message_to_zmq_socket_array(
sync->zmq_emailsocks, sync->emailcount);
haltfails += send_halt_message_to_zmq_socket_array(
sync->zmq_gtpsocks, sync->gtpcount);
haltfails += send_halt_message_to_zmq_socket_array(
sync->zmq_sipsocks, sync->sipcount);

if (haltfails == 0) {
break;
}
haltattempts ++;
usleep(250000);
HALT_THREADS(sync->zmq_sipsocks, sync->sipcount);
HALT_THREADS(sync->zmq_emailsocks, sync->emailcount);
HALT_THREADS(sync->zmq_gtpsocks, sync->gtpcount);
HALT_THREADS(sync->zmq_pubsocks, sync->pubsockcount);
HALT_THREADS(sync->zmq_fwdctrlsocks, sync->forwardcount);
break;
}

pthread_mutex_destroy(&(haltinfo.mutex));
pthread_cond_destroy(&(haltinfo.cond));

free(sync->zmq_emailsocks);
free(sync->zmq_sipsocks);
free(sync->zmq_gtpsocks);
Expand Down Expand Up @@ -290,7 +305,7 @@ static inline void push_coreserver_msg(collector_sync_t *sync,

void sync_thread_publish_reload(collector_sync_t *sync) {

int i;
size_t i;
openli_export_recv_t *expmsg;

for (i = 0; i < sync->pubsockcount; i++) {
Expand Down Expand Up @@ -870,7 +885,7 @@ static void push_ipintercept_update_to_threads(collector_sync_t *sync,
static int new_mediator(collector_sync_t *sync, uint8_t *provmsg,
uint16_t msglen) {

int i;
size_t i;
openli_mediator_t med;
openli_export_recv_t *expmsg;

Expand Down Expand Up @@ -902,7 +917,7 @@ static int new_mediator(collector_sync_t *sync, uint8_t *provmsg,
static int remove_mediator(collector_sync_t *sync, uint8_t *provmsg,
uint16_t msglen) {

int i;
size_t i;
openli_mediator_t med;
openli_export_recv_t *expmsg;

Expand Down Expand Up @@ -1114,7 +1129,7 @@ static inline void remove_vendormirror_id(collector_sync_t *sync,
static void remove_ip_intercept(collector_sync_t *sync, ipintercept_t *ipint) {

openli_export_recv_t *expmsg;
int i;
size_t i;

if (!ipint) {
logger(LOG_INFO,
Expand Down Expand Up @@ -1273,7 +1288,7 @@ static int halt_ipintercept(collector_sync_t *sync, uint8_t *intmsg,

void sync_drop_all_mediators(collector_sync_t *sync) {
openli_export_recv_t *expmsg;
int i;
size_t i;

for (i = 0; i < sync->forwardcount; i++) {
expmsg = (openli_export_recv_t *)calloc(1,
Expand All @@ -1287,7 +1302,7 @@ void sync_drop_all_mediators(collector_sync_t *sync) {

void sync_reconnect_all_mediators(collector_sync_t *sync) {
openli_export_recv_t *expmsg;
int i;
size_t i;

for (i = 0; i < sync->forwardcount; i++) {
expmsg = (openli_export_recv_t *)calloc(1,
Expand Down Expand Up @@ -1781,7 +1796,7 @@ static inline void touch_all_intercepts(ipintercept_t *intlist) {
void sync_disconnect_provisioner(collector_sync_t *sync, uint8_t dropmeds) {

openli_export_recv_t *expmsg;
int i;
size_t i;

destroy_net_buffer(sync->outgoing, NULL);
destroy_net_buffer(sync->incoming, NULL);
Expand Down
10 changes: 5 additions & 5 deletions src/collector/collector_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ typedef struct colsync_data {
collector_identity_t *info;
pthread_rwlock_t *info_mutex;

int pubsockcount;
int forwardcount;
int emailcount;
int gtpcount;
int sipcount;
size_t pubsockcount;
size_t forwardcount;
size_t emailcount;
size_t gtpcount;
size_t sipcount;

void **zmq_pubsocks;
void **zmq_fwdctrlsocks;
Expand Down
6 changes: 5 additions & 1 deletion src/collector/collector_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ void clear_zmq_socket_array(void **zmq_socks, int sockcount) {
free(zmq_socks);
}

int send_halt_message_to_zmq_socket_array(void **zmq_socks, int sockcount) {
int send_halt_message_to_zmq_socket_array(void **zmq_socks, int sockcount,
halt_info_t *haltinfo) {
openli_export_recv_t *haltmsg;
int zero = 0, ret, i, failed;

Expand All @@ -86,6 +87,8 @@ int send_halt_message_to_zmq_socket_array(void **zmq_socks, int sockcount) {
haltmsg = (openli_export_recv_t *)calloc(1,
sizeof(openli_export_recv_t));
haltmsg->type = OPENLI_EXPORT_HALT;
haltmsg->data.haltinfo = haltinfo;

ret = zmq_send(zmq_socks[i], &haltmsg, sizeof(haltmsg), ZMQ_NOBLOCK);
if (ret < 0 && errno == EAGAIN) {
failed ++;
Expand All @@ -97,6 +100,7 @@ int send_halt_message_to_zmq_socket_array(void **zmq_socks, int sockcount) {
}
zmq_setsockopt(zmq_socks[i], ZMQ_LINGER, &zero, sizeof(zero));
zmq_close(zmq_socks[i]);
zmq_socks[i] = NULL;
}
return failed;
}
Expand Down
5 changes: 4 additions & 1 deletion src/collector/collector_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
#ifndef OPENLI_COLLECTOR_UTIL_H_
#define OPENLI_COLLECTOR_UTIL_H_

#include "export_buffer.h"

int init_zmq_socket_array(void **zmq_socks, int sockcount,
const char *basename, void *zmq_ctxt);
void clear_zmq_socket_array(void **zmq_socks, int sockcount);
int send_halt_message_to_zmq_socket_array(void **zmq_socks, int sockcount);
int send_halt_message_to_zmq_socket_array(void **zmq_socks, int sockcount,
halt_info_t *haltinfo);

#endif
// vim: set sw=4 tabstop=4 softtabstop=4 expandtab :
Loading

0 comments on commit cde064e

Please sign in to comment.