Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
TL/UCP: Add sliding window allreduce algorithm
Browse files Browse the repository at this point in the history
nsarka committed Jan 23, 2024
1 parent 483b91b commit 9beead6
Showing 15 changed files with 1,273 additions and 62 deletions.
10 changes: 6 additions & 4 deletions src/components/tl/ucp/Makefile.am
Original file line number Diff line number Diff line change
@@ -34,10 +34,12 @@ alltoallv = \
alltoallv/alltoallv_pairwise.c \
alltoallv/alltoallv_hybrid.c

allreduce = \
allreduce/allreduce.h \
allreduce/allreduce.c \
allreduce/allreduce_knomial.c \
allreduce = \
allreduce/allreduce.h \
allreduce/allreduce.c \
allreduce/allreduce_knomial.c \
allreduce/allreduce_sliding_window.c \
allreduce/allreduce_sliding_window_setup.c \
allreduce/allreduce_sra_knomial.c

barrier = \
45 changes: 43 additions & 2 deletions src/components/tl/ucp/allreduce/allreduce.c
Original file line number Diff line number Diff line change
@@ -13,13 +13,17 @@ ucc_base_coll_alg_info_t
[UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL] =
{.id = UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL,
.name = "knomial",
.desc =
"recursive knomial with arbitrary radix (optimized for latency)"},
.desc = "recursive knomial with arbitrary radix (optimized for "
"latency)"},
[UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL] =
{.id = UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL,
.name = "sra_knomial",
.desc = "recursive knomial scatter-reduce followed by knomial "
"allgather (optimized for BW)"},
[UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW] =
{.id = UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
.name = "sliding_window",
.desc = "sliding window allreduce (optimized for running on DPU)"},
[UCC_TL_UCP_ALLREDUCE_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};

@@ -46,3 +50,40 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
out:
return status;
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h)
{
ucc_status_t status = UCC_OK;
ucc_tl_ucp_team_t * tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_tl_ucp_task_t * task;
ucc_ee_executor_params_t params;

ALLREDUCE_TASK_CHECK(coll_args->args, tl_team);

task = ucc_tl_ucp_init_task(coll_args, team);
if (ucc_unlikely(!task)) {
ucc_error("couldnt allocate task");
return UCC_ERR_NO_MEMORY;
}
*task_h = &task->super;
task->super.post = ucc_tl_ucp_allreduce_sliding_window_start;
task->super.progress = ucc_tl_ucp_allreduce_sliding_window_progress;
task->super.finalize = ucc_tl_ucp_allreduce_sliding_window_finalize;

ucc_tl_ucp_allreduce_sliding_window_task_init(coll_args, team, task);

params.mask = UCC_EE_EXECUTOR_PARAM_FIELD_TYPE;
params.ee_type = UCC_EE_CPU_THREAD;
status =
ucc_ee_executor_init(&params, &task->allreduce_sliding_window.executor);

if (UCC_OK != status) {
ucc_error("failed to init executor: %s", ucc_status_string(status));
}

out:
return status;
}
102 changes: 99 additions & 3 deletions src/components/tl/ucp/allreduce/allreduce.h
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
enum {
UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL,
UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL,
UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
UCC_TL_UCP_ALLREDUCE_ALG_LAST
};

@@ -35,21 +36,116 @@ ucc_status_t ucc_tl_ucp_allreduce_init(ucc_tl_ucp_task_t *task);
#define ALLREDUCE_TASK_CHECK(_args, _team) \
CHECK_SAME_MEMTYPE((_args), (_team));

#define ALLREDUCE_PACKED_KEY_MAX_LEN 1024

typedef struct ucc_tl_ucp_allreduce_sw_global_work_buf_info {
void *packed_src_memh;
void *packed_dst_memh;
} ucc_tl_ucp_allreduce_sw_global_work_buf_info;

typedef enum ucc_tl_ucp_allreduce_sw_buf_state
{
FREE,
RECVING,
REDUCING,
REDUCED,
SENDING,
IDLE,
} ucc_tl_ucp_allreduce_sw_buf_state;

typedef struct ucc_tl_ucp_allreduce_sw_buf {
void * buf;
ucc_tl_ucp_allreduce_sw_buf_state state;
ucs_status_ptr_t ucp_req;
size_t count;
size_t bytes;
} ucc_tl_ucp_allreduce_sw_buf;

typedef struct ucc_tl_ucp_allreduce_sw_pipeline {
ucc_tl_ucp_allreduce_sw_buf accbuf;
ucc_tl_ucp_allreduce_sw_buf *getbuf;
ucs_status_ptr_t * put_requests;
size_t buffer_size;
size_t num_buffers;
size_t avail_buffs;
size_t my_count;
size_t my_offset;
size_t count_issued;
size_t count_received;
size_t count_reduced;
size_t count_serviced;
size_t get_idx;
size_t red_idx;
ucc_rank_t src_rank;
ucc_rank_t dst_rank;
int done_get;
int done_red;
int done_put;
int posted_put;
} ucc_tl_ucp_allreduce_sw_pipeline;

struct ucc_tl_ucp_allreduce_sw_export_buf {
ucp_context_h ucp_context;
ucp_mem_h memh;
void * packed_memh;
size_t packed_memh_len;
void * packed_key;
size_t packed_key_len;
uint64_t memh_id;
};

typedef struct ucc_tl_ucp_allreduce_sw_host_allgather {
void *src_buf;
void *dst_buf;
char packed_src_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
char packed_dst_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
} ucc_tl_ucp_allreduce_sw_host_allgather;

ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);

ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_tl_ucp_task_t * task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_tl_ucp_task_t * task);

ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(
ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task);

ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *task);

void ucc_tl_ucp_allreduce_knomial_progress(ucc_coll_task_t *task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task);

void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_knomial_finalize(ucc_coll_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);
ucc_status_t
ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);

ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_start(ucc_coll_task_t *task);

Loading

0 comments on commit 9beead6

Please sign in to comment.