Skip to content

Commit

Permalink
TL/UCP: Add sliding window allreduce algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
nsarka committed Nov 1, 2023
1 parent 483b91b commit 1739d91
Show file tree
Hide file tree
Showing 6 changed files with 479 additions and 4 deletions.
9 changes: 5 additions & 4 deletions src/components/tl/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ alltoallv = \
alltoallv/alltoallv_pairwise.c \
alltoallv/alltoallv_hybrid.c

allreduce = \
allreduce/allreduce.h \
allreduce/allreduce.c \
allreduce/allreduce_knomial.c \
allreduce = \
allreduce/allreduce.h \
allreduce/allreduce.c \
allreduce/allreduce_knomial.c \
allreduce/allreduce_sliding_window.c \
allreduce/allreduce_sra_knomial.c

barrier = \
Expand Down
66 changes: 66 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ ucc_base_coll_alg_info_t
.name = "sra_knomial",
.desc = "recursive knomial scatter-reduce followed by knomial "
"allgather (optimized for BW)"},
[UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW] =
{.id = UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
.name = "sliding_window",
.desc = "sliding window allreduce (DPU based)"},
[UCC_TL_UCP_ALLREDUCE_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};

Expand All @@ -46,3 +50,65 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
out:
return status;
}

/**
 * Create and set up a sliding window allreduce task.
 *
 * The collective must carry a global work buffer
 * (UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER); that buffer is interpreted as a
 * ucc_tl_ucp_allreduce_sw_global_work_buf_info and the per-thread resources
 * it describes (rkeys, endpoints, worker, pipeline) are attached to the task.
 * A CPU-thread executor is then initialized for the task.
 *
 * @param coll_args  collective arguments supplied by the caller
 * @param team       base team; its params.size is used to locate this
 *                   thread's slice of the rkey/endpoint arrays
 * @param task_h     on success receives the new task; left untouched on
 *                   failure
 *
 * @return UCC_OK on success, UCC_ERR_NOT_SUPPORTED when the arguments do not
 *         satisfy the checks below, or the status returned by
 *         ucc_ee_executor_init.
 */
ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
                                         ucc_base_team_t      *team,
                                         ucc_coll_task_t     **task_h)
{
    ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
    ucc_rank_t         size    = (ucc_rank_t)team->params.size;
    ucc_status_t       status  = UCC_OK;
    ucc_tl_ucp_allreduce_sw_global_work_buf_info *gwbi_p;
    ucc_tl_ucp_task_t                            *task;
    ucc_ee_executor_params_t                      params;

    /* Expands to CHECK_SAME_MEMTYPE; `status` and the `out` label are kept
     * in this function for it. */
    ALLREDUCE_TASK_CHECK(coll_args->args, tl_team);

    if (!(coll_args->args.mask & UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER)) {
        tl_error(UCC_TL_TEAM_LIB(tl_team),
                 "global work buffer not provided nor associated with team");
        status = UCC_ERR_NOT_SUPPORTED;
        goto out;
    }

    /* NOTE(review): when UCC_COLL_ARGS_FIELD_FLAGS is absent from the mask
     * the memory-mapped requirement is not enforced at all - confirm that
     * this is intended. */
    if ((coll_args->args.mask & UCC_COLL_ARGS_FIELD_FLAGS) &&
        !(coll_args->args.flags & UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS)) {
        tl_error(UCC_TL_TEAM_LIB(tl_team),
                 "non memory mapped buffers are not supported");
        status = UCC_ERR_NOT_SUPPORTED;
        goto out;
    }

    task = ucc_tl_ucp_init_task(coll_args, team);

    /* The mask bit alone does not guarantee a usable pointer; refuse a NULL
     * buffer instead of dereferencing it below. */
    gwbi_p = TASK_ARGS(task).global_work_buffer;
    if (!gwbi_p) {
        tl_error(UCC_TL_TEAM_LIB(tl_team),
                 "global work buffer not provided nor associated with team");
        status = UCC_ERR_NOT_SUPPORTED;
        goto err_put_task;
    }

    task->super.post     = ucc_tl_ucp_allreduce_sliding_window_start;
    task->super.progress = ucc_tl_ucp_allreduce_sliding_window_progress;
    task->super.finalize = ucc_tl_ucp_allreduce_sliding_window_finalize;

    /* rkeys and endpoints are laid out as one run of `size` entries per
     * thread; pick the run that belongs to thread `tid`. */
    task->allreduce_sliding_window.src_rkeys =
        &gwbi_p->src_rkeys[size * gwbi_p->tid];
    task->allreduce_sliding_window.dst_rkeys =
        &gwbi_p->dst_rkeys[size * gwbi_p->tid];
    task->allreduce_sliding_window.host_eps =
        &gwbi_p->host_eps[size * gwbi_p->tid];
    task->allreduce_sliding_window.worker =
        gwbi_p->ucp_thread_workers[gwbi_p->tid];
    task->allreduce_sliding_window.sbufs = gwbi_p->sbufs;
    task->allreduce_sliding_window.rbufs = gwbi_p->rbufs;
    task->allreduce_sliding_window.pipe  = &gwbi_p->pipes[gwbi_p->tid];
    /* One of the num_bufs buffers is not counted as a get buffer. */
    task->allreduce_sliding_window.num_get_bufs = gwbi_p->num_bufs - 1;
    task->allreduce_sliding_window.window_size  = gwbi_p->window_size;
    task->allreduce_sliding_window.tid          = gwbi_p->tid;
    task->allreduce_sliding_window.nthreads     = gwbi_p->nthreads;
    task->allreduce_sliding_window.put_requests =
        gwbi_p->pipes[gwbi_p->tid].put_requests;

    params.mask    = UCC_EE_EXECUTOR_PARAM_FIELD_TYPE;
    params.ee_type = UCC_EE_CPU_THREAD;
    status         = ucc_ee_executor_init(
        &params, &task->allreduce_sliding_window.executor);
    if (UCC_OK != status) {
        ucc_error("failed to init executor: %s", ucc_status_string(status));
        goto err_put_task;
    }

    /* Publish the task only once it is fully usable. */
    *task_h = &task->super;
    return UCC_OK;

err_put_task:
    /* Release the task obtained from ucc_tl_ucp_init_task so a failed init
     * does not leak it. */
    ucc_tl_ucp_put_task(task);
out:
    return status;
}
25 changes: 25 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
enum {
UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL,
UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL,
UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
UCC_TL_UCP_ALLREDUCE_ALG_LAST
};

Expand All @@ -35,16 +36,40 @@ ucc_status_t ucc_tl_ucp_allreduce_init(ucc_tl_ucp_task_t *task);
/*
 * Argument check used at the top of allreduce init routines (e.g.
 * ucc_tl_ucp_allreduce_sliding_window_init). It simply forwards to
 * CHECK_SAME_MEMTYPE, which is not defined in the visible part of this file.
 *
 * NOTE(review): the expansion already ends with ';', so the usual call form
 * `ALLREDUCE_TASK_CHECK(args, team);` produces an extra empty statement.
 * Do not use it as the unbraced body of an if that has an else.
 */
#define ALLREDUCE_TASK_CHECK(_args, _team) \
CHECK_SAME_MEMTYPE((_args), (_team));

/*
 * Layout expected in the collective's global work buffer by the sliding
 * window allreduce. ucc_tl_ucp_allreduce_sliding_window_init casts the
 * global_work_buffer argument to this type and copies or derives the task's
 * allreduce_sliding_window state from it.
 */
typedef struct ucc_tl_ucp_allreduce_sw_global_work_buf_info {
/* Unpacked remote keys. Init takes the sub-array starting at
 * [team_size * tid] from each of the two arrays. */
ucp_rkey_h *src_rkeys; //unpacked
ucp_rkey_h *dst_rkeys; //unpacked
/* Endpoints, sliced the same way as the rkeys: [team_size * tid]. */
ucp_ep_h *host_eps;
/* Workers indexed by tid; init picks ucp_thread_workers[tid]. */
ucp_worker_h *ucp_thread_workers;
/* Passed through to the task unchanged; presumably the send and receive
 * buffers - confirm against the code that fills this struct. */
void *sbufs;
void *rbufs;
/* Pipelines indexed by tid; init uses &pipes[tid] and that pipeline's
 * put_requests. */
ucc_tl_ucp_allreduce_sw_pipeline_t *pipes;
/* Init stores num_bufs - 1 as the task's num_get_bufs. */
int num_bufs;
/* Copied to the task as window_size. */
int window_size;
/* Selects this thread's slice/entry in the arrays above. */
int tid;
/* Copied to the task as nthreads; presumably the number of threads that
 * share this buffer - TODO confirm. */
int nthreads;
} ucc_tl_ucp_allreduce_sw_global_work_buf_info;

/* Init entry point of the knomial allreduce; implemented in allreduce.c. */
ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);

/*
 * Init entry point of the sliding window allreduce; implemented in
 * allreduce.c. Requires the collective to provide a global work buffer laid
 * out as ucc_tl_ucp_allreduce_sw_global_work_buf_info and returns
 * UCC_ERR_NOT_SUPPORTED when that or the memory-mapped-buffers requirement
 * is not met. The created task is returned through task_h.
 */
ucc_status_t ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h);

/* Knomial allreduce helpers; their definitions are outside this header. */
ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *task);

void ucc_tl_ucp_allreduce_knomial_progress(ucc_coll_task_t *task);

/* Installed as the task's `post` callback by
 * ucc_tl_ucp_allreduce_sliding_window_init. */
ucc_status_t ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task);

/* Installed as the task's `progress` callback by
 * ucc_tl_ucp_allreduce_sliding_window_init. */
void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *task);

/* Installed as the task's `finalize` callback by
 * ucc_tl_ucp_allreduce_sliding_window_init. */
ucc_status_t ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_knomial_finalize(ucc_coll_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args,
Expand Down
Loading

0 comments on commit 1739d91

Please sign in to comment.