Skip to content

Commit

Permalink
parallel.c: Generalize parallel task framework.
Browse files Browse the repository at this point in the history
The parallel task framework was initially written for and
embedded in net_imap. However, this code is completely generic
and could be used for anything, and anticipating that, move
this code out of net_imap into the code, renaming appropriately.
  • Loading branch information
InterLinked1 committed Jan 27, 2024
1 parent 9233077 commit 1758e7f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 46 deletions.
78 changes: 47 additions & 31 deletions nets/net_imap/imap_client_parallel.c → bbs/parallel.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,45 @@

/*! \file
*
* \brief IMAP Client Parallel Operation Framework
* \brief Parallel Task Framework
*
* \author Naveen Albert <[email protected]>
*/

#include "include/bbs.h"

#include "include/alertpipe.h"

#include "nets/net_imap/imap.h"
#include "nets/net_imap/imap_client.h"
#include "nets/net_imap/imap_client_parallel.h"

extern unsigned int maxuserproxies;
#include "include/parallel.h"
#include "include/utils.h"

/* Yes, this kind of looks like a threadpool.
* No, it's not a threadpool. */

/* Unlike normal non-parallel operations, we need to carefully coordinate
/*! \note This code was originally written specifically for and embedded in net_imap.
* While the code is completely generic, there are still some net_imap specific comments
* that have been left here. */

/* imap/imap_client_list.c: Unlike normal non-parallel operations, we need to carefully coordinate
* use of the different imap_client's, because we can't have 2 threads
* trying to use the same client at once.
* Naturally won't happen for LIST, where we do one per account,
* but could happen for STATUS if we do potentially multiple folders/acct at once. */

#define MAX_CONCURRENT_TASKS 10
#define DEBUG_PARALLEL_TASKS

void bbs_parallel_init(struct bbs_parallel *p, unsigned int min, unsigned int max)
{
memset(p, 0, sizeof(struct bbs_parallel));
p->min_parallel_tasks = min;
p->max_parallel_tasks = max;
#ifdef DEBUG_PARALLEL_TASKS
bbs_debug(7, "Initializing parallel task set (parallelism range: %u-%u)\n", min, max);
#endif
}

static int task_match(struct imap_parallel *p, int started, int completed, struct imap_parallel_task *task, unsigned long hash)
static int task_match(struct bbs_parallel *p, int started, int completed, struct bbs_parallel_task *task, unsigned long hash)
{
struct imap_parallel_task *t;
struct bbs_parallel_task *t;
RWLIST_TRAVERSE(&p->tasks, t, entry) {
if (t == task) {
continue;
Expand All @@ -59,10 +69,10 @@ static int task_match(struct imap_parallel *p, int started, int completed, struc
}

/*! \note Must be called locked */
static struct imap_parallel_task *next_task(struct imap_parallel *p, int maxconcurrent, int *throttled)
static struct bbs_parallel_task *next_task(struct bbs_parallel *p, unsigned int maxconcurrent, int *throttled)
{
int total_running = 0;
struct imap_parallel_task *t;
unsigned int total_running = 0;
struct bbs_parallel_task *t;
RWLIST_TRAVERSE(&p->tasks, t, entry) {
if (t->started) { /* t->started implies t->completed, no need to check that as well */
if (!t->completed) {
Expand All @@ -71,7 +81,7 @@ static struct imap_parallel_task *next_task(struct imap_parallel *p, int maxconc
continue;
}
/* We can't execute this task if another task is currently running with the same hash. */
/* This absolutely MUST be enforced. imap_client's are not thread safe,
/* imap/imap_client_list.c: This absolutely MUST be enforced. imap_client's are not thread safe,
* we cannot have two different tasks using the same underlying imap_client at ANY time. */
if (task_match(p, 1, 0, t, t->hash)) {
#ifdef DEBUG_PARALLEL_TASKS
Expand All @@ -83,10 +93,10 @@ static struct imap_parallel_task *next_task(struct imap_parallel *p, int maxconc
}
if (maxconcurrent && total_running >= maxconcurrent) { /* Reached concurrent thread limit */
*throttled = 1;
bbs_debug(6, "Delaying subsequent task execution (currently at %d concurrent)\n", total_running);
bbs_debug(6, "Delaying subsequent task execution (currently at %u concurrent)\n", total_running);
return NULL;
} else if (!t && total_running) { /* Couldn't find a suitable task */
bbs_debug(6, "Unable to find suitable task for immediate execution (%d running)\n", total_running);
bbs_debug(6, "Unable to find suitable task for immediate execution (%u running)\n", total_running);
*throttled = 1;
return NULL;
}
Expand All @@ -96,12 +106,12 @@ static struct imap_parallel_task *next_task(struct imap_parallel *p, int maxconc
static void *run_task(void *varg)
{
int throttled = 0;
struct imap_parallel_task *t = varg;
struct imap_parallel *p = t->p;
struct bbs_parallel_task *t = varg;
struct bbs_parallel *p = t->p;

bbs_debug(6, "Spawned thread for task %p\n", t);
for (;;) {
struct imap_parallel_task *t2;
struct bbs_parallel_task *t2;
t->res = t->cb(t->data);
t->completed = 1;

Expand Down Expand Up @@ -136,15 +146,15 @@ static void *run_task(void *varg)
}

/* \retval 0 if a task was scheduled, -1 if tasks could not be scheduled, 1 if there are no further tasks to schedule */
static int run_scheduler(struct imap_parallel *p)
static int run_scheduler(struct bbs_parallel *p)
{
int remaining = 0;
int throttled = 0;
struct imap_parallel_task *t;
struct bbs_parallel_task *t;

/* Look through the list for a suitable task to schedule */
RWLIST_WRLOCK(&p->tasks);
t = next_task(p, MAX_CONCURRENT_TASKS, &throttled);
t = next_task(p, p->max_parallel_tasks, &throttled);
if (!t) {
/* Perhaps we've reached the concurrent execution limit */
RWLIST_UNLOCK(&p->tasks);
Expand Down Expand Up @@ -181,28 +191,31 @@ static unsigned long fast_hash(unsigned const char *restrict s)
return hash;
}

int imap_client_parallel_schedule_task(struct imap_parallel *p, const char *restrict prefix, void *data, int (*cb)(void *data), void *(*duplicate)(void *data), void (*cleanup)(void *data))
int bbs_parallel_schedule_task(struct bbs_parallel *p, const char *restrict prefix, void *data, int (*cb)(void *data), void *(*duplicate)(void *data), void (*cleanup)(void *data))
{
struct imap_parallel_task *t;
struct bbs_parallel_task *t;
void *datadup;

/* XXX Or if we know there won't be any more tasks coming (either only 1, or this is the last one),
* we might as well just do it directly as well */
if (maxuserproxies <= 1) {
/* Use the stack allocated version directly, since we won't be able to take advantage of concurrent proxies anyways. */
/* XXX Problem is this MIGHT not be safe if one is already in use, in a parallel task?
if (p->max_parallel_tasks <= 1) {
/* Use the stack allocated version directly, since we won't be able to take advantage of concurrency anyways. */
/* imap/imap_client_list.c: XXX Problem is this MIGHT not be safe if one is already in use, in a parallel task?
* imap_client_get should create a new client if we already have one but it's in use for a parallel job.
* That'll take care of the case where a static job like this requests it,
* or if we accidentally request it for a parallel job (which shouldn't happen,
* since we shouldn't be scheduling tasks for the same IMAP client at the same time). */
#ifdef DEBUG_PARALLEL_TASKS
bbs_debug(7, "Parallelism inhibited (max parallel tasks: %u), running task serially\n", p->max_parallel_tasks);
#endif
return cb(data);
}

/* Set up a parallel task instead */
datadup = duplicate(data); /* Allocate a heap allocated version of the stack structure since we can't execute this in the current thread */
if (!datadup) {
/* If duplicate function returns NULL, then we must execute in serial.
* This might not always indicate failure. Maybe for this task, it's been determined that it's better to do it this way
* imap/imap_client_list.c: This might not always indicate failure. Maybe for this task, it's been determined that it's better to do it this way
* (e.g. the remote server supports LIST-STATUS, so this will be a fast operation) */
return cb(data);
}
Expand Down Expand Up @@ -242,12 +255,15 @@ int imap_client_parallel_schedule_task(struct imap_parallel *p, const char *rest
return run_scheduler(p);
}

int imap_client_parallel_join(struct imap_parallel *p)
int bbs_parallel_join(struct bbs_parallel *p)
{
struct imap_parallel_task *t;
struct bbs_parallel_task *t;
int res; /* Maybe there are no tasks (everything was executed serially), assume success by default */

if (!p->initialized) {
#ifdef DEBUG_PARALLEL_TASKS
bbs_debug(3, "No parallel tasks initialized, nothing to join\n");
#endif
return 0;
}

Expand Down
27 changes: 18 additions & 9 deletions nets/net_imap/imap_client_parallel.h → include/parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,43 @@

/*! \file
*
* \brief IMAP Client Parallel Operation Framework
* \brief Parallel Task Framework
*
*/

#include "include/linkedlists.h"

struct imap_parallel_task {
struct imap_parallel *p; /* Parent to which this task belongs */
struct bbs_parallel_task {
struct bbs_parallel *p; /* Parent to which this task belongs */
unsigned long hash; /* Prefix hash value */
int (*cb)(void *data); /* Callback to execute the task */
void (*cleanup)(void *data); /* Cleanup function */
void *data; /* Callback data for the task */
pthread_t thread; /* Thread responsible for the task */
int res; /* Return code of task */
RWLIST_ENTRY(imap_parallel_task) entry;
RWLIST_ENTRY(bbs_parallel_task) entry;
unsigned int started:1; /* Task has been started (i.e. is either running or completed) */
unsigned int completed:1; /* Task has been completed */
};

RWLIST_HEAD(parallel_tasks, imap_parallel_task);
RWLIST_HEAD(parallel_tasks, bbs_parallel_task);

struct imap_parallel {
struct bbs_parallel {
struct parallel_tasks tasks;
int alertpipe[2];
unsigned int min_parallel_tasks; /* Minimum number of tasks to run in parallel */
unsigned int max_parallel_tasks; /* Maximum number of tasks to run in parallel */
unsigned int waiting:1; /* In the "waiting" phase, i.e. there are no more tasks to schedule */
unsigned int initialized:1;
};

/*!
* \brief Init a bbs_parallel
* \param min Minimum number of tasks to run in parallel instead of serially. This option is currently ignored.
* \param max Maximum number of tasks that may run in parallel at once
*/
void bbs_parallel_init(struct bbs_parallel *p, unsigned int min, unsigned int max);

/*!
* \brief Schedule a task for execution. The task may be executed immediately or delayed, in a separate thread
* \param p Parallel job series structure
Expand All @@ -47,13 +56,13 @@ struct imap_parallel {
* \param cleanup Function to destroy a heap allocated callback data structure
* \return Task return code, if executed immediately
* \return Scheduler return code, if not being executed immediately.
* \note You must call imap_client_parallel_join to ensure all tasks finish execeution, at some point before p goes out of scope
* \note You must call bbs_parallel_join to ensure all tasks finish execeution, at some point before p goes out of scope
* \note This function should only be called from the (same) parent thread
*/
int imap_client_parallel_schedule_task(struct imap_parallel *p, const char *restrict prefix, void *data, int (*cb)(void *data), void *(*duplicate)(void *data), void (*cleanup)(void *data));
int bbs_parallel_schedule_task(struct bbs_parallel *p, const char *restrict prefix, void *data, int (*cb)(void *data), void *(*duplicate)(void *data), void (*cleanup)(void *data));

/*!
* \brief Wait for all pending tasks to finish execution
* \retval Bitwise OR of all task return values, ORed with the status of this function (0 on success, nonzero on failure)
*/
int imap_client_parallel_join(struct imap_parallel *p);
int bbs_parallel_join(struct bbs_parallel *p);
14 changes: 8 additions & 6 deletions nets/net_imap/imap_client_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "include/bbs.h"

#include "include/node.h"
#include "include/parallel.h"

#include "include/mod_mail.h"

Expand All @@ -29,7 +30,8 @@
#include "nets/net_imap/imap_server_list.h"
#include "nets/net_imap/imap_client_list.h"
#include "nets/net_imap/imap_client_status.h" /* use remove_size */
#include "nets/net_imap/imap_client_parallel.h"

extern unsigned int maxuserproxies;

static int remote_list(struct imap_client *client, struct list_command *lcmd, const char *prefix)
{
Expand Down Expand Up @@ -293,7 +295,7 @@ static int remote_list_cb(void *data)
return res;
}

static int remote_list_parallel(struct imap_parallel *p, const char *restrict prefix, struct list_command *lcmd, struct imap_session *imap, const char *server)
static int remote_list_parallel(struct bbs_parallel *p, const char *restrict prefix, struct list_command *lcmd, struct imap_session *imap, const char *server)
{
struct remote_list_info rinfo; /* No memset needed */

Expand All @@ -305,7 +307,7 @@ static int remote_list_parallel(struct imap_parallel *p, const char *restrict pr
/* This variable will not be modified, but since the dynamic version allocates and frees it, the type cannot be const */
rinfo.server = (char*) server;
#pragma GCC diagnostic pop
return imap_client_parallel_schedule_task(p, prefix, &rinfo, remote_list_cb, remote_list_dup, remote_list_destroy);
return bbs_parallel_schedule_task(p, prefix, &rinfo, remote_list_cb, remote_list_dup, remote_list_destroy);
}

/*! \brief Mutex to prevent recursion */
Expand All @@ -317,7 +319,7 @@ int list_virtual(struct imap_session *imap, struct list_command *lcmd)
char virtfile[256];
char line[256];
int l = 0;
struct imap_parallel p;
struct bbs_parallel p;

/* Folders from the proxied mailbox will need to be translated back and forth */
if (pthread_mutex_trylock(&virt_lock)) {
Expand Down Expand Up @@ -349,7 +351,7 @@ int list_virtual(struct imap_session *imap, struct list_command *lcmd)
}

stringlist_empty(&imap->remotemailboxes);
memset(&p, 0, sizeof(p));
bbs_parallel_init(&p, 2, maxuserproxies);

/* Note that we cache all the directories on all servers at once, since we truncate the file. */
while ((fgets(line, sizeof(line), fp))) {
Expand All @@ -369,7 +371,7 @@ int list_virtual(struct imap_session *imap, struct list_command *lcmd)
}
fclose(fp);

imap_client_parallel_join(&p);
bbs_parallel_join(&p);
pthread_mutex_unlock(&virt_lock);
return 0;
}

0 comments on commit 1758e7f

Please sign in to comment.