Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Window Allreduce #862

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/components/tl/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ alltoallv = \
alltoallv/alltoallv_pairwise.c \
alltoallv/alltoallv_hybrid.c

allreduce = \
allreduce/allreduce.h \
allreduce/allreduce.c \
allreduce/allreduce_knomial.c \
allreduce = \
allreduce/allreduce.h \
allreduce/allreduce.c \
allreduce/allreduce_knomial.c \
allreduce/allreduce_sliding_window.c \
allreduce/allreduce_sliding_window_setup.c \
allreduce/allreduce_sra_knomial.c

barrier = \
Expand Down
45 changes: 43 additions & 2 deletions src/components/tl/ucp/allreduce/allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ ucc_base_coll_alg_info_t
[UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL] =
{.id = UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL,
.name = "knomial",
.desc =
"recursive knomial with arbitrary radix (optimized for latency)"},
.desc = "recursive knomial with arbitrary radix (optimized for "
"latency)"},
[UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL] =
{.id = UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL,
.name = "sra_knomial",
.desc = "recursive knomial scatter-reduce followed by knomial "
"allgather (optimized for BW)"},
[UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW] =
{.id = UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
.name = "sliding_window",
.desc = "sliding window allreduce (optimized for running on DPU)"},
[UCC_TL_UCP_ALLREDUCE_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};

Expand All @@ -46,3 +50,40 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
out:
return status;
}

/**
 * Create a TL/UCP task for the sliding-window allreduce algorithm.
 *
 * Allocates the task, installs the sliding-window post/progress/finalize
 * callbacks, runs the algorithm-specific task initialization and finally
 * creates a CPU-thread EE executor stored in
 * task->allreduce_sliding_window.executor.
 *
 * @param coll_args  Collective arguments; coll_args->args is validated with
 *                   ALLREDUCE_TASK_CHECK before anything is allocated.
 * @param team       Base team; used as a ucc_tl_ucp_team_t.
 * @param task_h     [out] Set to &task->super as soon as the task has been
 *                   allocated (also on the later failure paths).
 *
 * @return UCC_OK on success, UCC_ERR_NO_MEMORY if the task could not be
 *         allocated, otherwise the failing status of the task check, the
 *         sliding-window task init or the executor init.
 */
ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
                                         ucc_base_team_t *     team,
                                         ucc_coll_task_t **    task_h)
{
    ucc_status_t             status  = UCC_OK;
    ucc_tl_ucp_team_t *      tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
    ucc_tl_ucp_task_t *      task;
    ucc_ee_executor_params_t params;

    /* NOTE(review): the macro presumably sets `status` and jumps to `out`
     * on failure (the label has no other user here) - confirm against
     * CHECK_SAME_MEMTYPE. */
    ALLREDUCE_TASK_CHECK(coll_args->args, tl_team);

    task = ucc_tl_ucp_init_task(coll_args, team);
    if (ucc_unlikely(!task)) {
        ucc_error("couldnt allocate task");
        return UCC_ERR_NO_MEMORY;
    }
    *task_h              = &task->super;
    task->super.post     = ucc_tl_ucp_allreduce_sliding_window_start;
    task->super.progress = ucc_tl_ucp_allreduce_sliding_window_progress;
    task->super.finalize = ucc_tl_ucp_allreduce_sliding_window_finalize;

    /* The task-init status used to be dropped, so a failed setup still went
     * on to create an executor and could report UCC_OK. Propagate it. */
    status = ucc_tl_ucp_allreduce_sliding_window_task_init(coll_args, team,
                                                           task);
    if (ucc_unlikely(UCC_OK != status)) {
        ucc_error("failed to init sliding window task: %s",
                  ucc_status_string(status));
        /* NOTE(review): the task is not released here; confirm whether the
         * caller is expected to finalize it through *task_h on error. */
        goto out;
    }

    params.mask    = UCC_EE_EXECUTOR_PARAM_FIELD_TYPE;
    params.ee_type = UCC_EE_CPU_THREAD;
    status =
        ucc_ee_executor_init(&params, &task->allreduce_sliding_window.executor);

    if (UCC_OK != status) {
        ucc_error("failed to init executor: %s", ucc_status_string(status));
    }

out:
    return status;
}
102 changes: 99 additions & 3 deletions src/components/tl/ucp/allreduce/allreduce.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
/*
 * Identifiers of the allreduce algorithms provided by TL/UCP.
 * The values are used both as the `.id` of each algorithm and as the index
 * of its entry in the algorithm info table in allreduce.c.
 * UCC_TL_UCP_ALLREDUCE_ALG_LAST is not an algorithm: it indexes the
 * terminating table entry (NULL name/desc), so it must stay last.
 */
enum {
UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL,
UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL,
UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
UCC_TL_UCP_ALLREDUCE_ALG_LAST
};

Expand All @@ -35,21 +36,116 @@ ucc_status_t ucc_tl_ucp_allreduce_init(ucc_tl_ucp_task_t *task);
/*
 * Argument validation performed by the allreduce *_init functions before a
 * task is allocated. Forwards to CHECK_SAME_MEMTYPE (defined elsewhere).
 * NOTE(review): the expansion already ends with ';', so the usual
 * `ALLREDUCE_TASK_CHECK(...);` call site yields an extra empty statement -
 * harmless at statement level, but unsafe as the sole body of an unbraced
 * if/else.
 */
#define ALLREDUCE_TASK_CHECK(_args, _team) \
CHECK_SAME_MEMTYPE((_args), (_team));

/*
 * Size in bytes of the fixed packed-key arrays in
 * ucc_tl_ucp_allreduce_sw_host_allgather.
 * NOTE(review): presumably an upper bound on a packed UCP rkey - confirm that
 * the packing code checks the real key length against this limit.
 */
#define ALLREDUCE_PACKED_KEY_MAX_LEN 1024

/*
 * Pair of opaque pointers describing the source and destination buffers of a
 * sliding-window allreduce.
 * NOTE(review): judging by the field names these are packed (exported) UCP
 * memory handles; ownership and lifetime are not visible here - confirm.
 */
typedef struct ucc_tl_ucp_allreduce_sw_global_work_buf_info {
void *packed_src_memh;
void *packed_dst_memh;
} ucc_tl_ucp_allreduce_sw_global_work_buf_info;

/*
 * State tag stored in ucc_tl_ucp_allreduce_sw_buf::state.
 * NOTE(review): the names suggest a FREE -> RECVING -> REDUCING -> REDUCED ->
 * SENDING lifecycle, but the actual transitions live in the sliding-window
 * implementation, which is not visible here - confirm before relying on the
 * order. The enumerators are unprefixed and visible to every includer of this
 * header.
 */
typedef enum ucc_tl_ucp_allreduce_sw_buf_state
{
FREE,
RECVING,
REDUCING,
REDUCED,
SENDING,
IDLE,
} ucc_tl_ucp_allreduce_sw_buf_state;

/*
 * One staging buffer of the sliding-window pipeline: a data pointer, its
 * current state and the UCX request associated with it.
 */
typedef struct ucc_tl_ucp_allreduce_sw_buf {
void * buf; /* buffer memory */
ucc_tl_ucp_allreduce_sw_buf_state state; /* current buffer state */
ucs_status_ptr_t ucp_req; /* UCX status/request pointer for this buffer */
size_t count; /* presumably number of elements held - TODO confirm */
size_t bytes; /* presumably size of that data in bytes - TODO confirm */
} ucc_tl_ucp_allreduce_sw_buf;

/*
 * Bookkeeping for the sliding-window allreduce pipeline: one embedded
 * accumulation buffer, a pointer to get buffers, an array of put requests,
 * plus counters, indices, peer ranks and completion flags.
 * NOTE(review): the per-field notes below are inferred from the names only;
 * the code that reads and writes them is not visible here - confirm.
 */
typedef struct ucc_tl_ucp_allreduce_sw_pipeline {
ucc_tl_ucp_allreduce_sw_buf accbuf; /* presumably the accumulation buffer */
ucc_tl_ucp_allreduce_sw_buf *getbuf; /* presumably an array of num_buffers get buffers - TODO confirm */
ucs_status_ptr_t * put_requests; /* UCX request pointers for puts; length not visible here */
size_t buffer_size;
size_t num_buffers;
size_t avail_buffs;
size_t my_count;
size_t my_offset;
size_t count_issued;
size_t count_received;
size_t count_reduced;
size_t count_serviced;
size_t get_idx;
size_t red_idx;
ucc_rank_t src_rank;
ucc_rank_t dst_rank;
int done_get; /* int used as a flag/counter - semantics defined by the implementation */
int done_red;
int done_put;
int posted_put;
} ucc_tl_ucp_allreduce_sw_pipeline;

/*
 * UCP handles and packed blobs describing one exported buffer.
 * Unlike the neighbouring types this one has no typedef, so it must be
 * referred to as `struct ucc_tl_ucp_allreduce_sw_export_buf`.
 * NOTE(review): who allocates and releases packed_memh / packed_key is not
 * visible here - confirm against the setup and finalize code.
 */
struct ucc_tl_ucp_allreduce_sw_export_buf {
ucp_context_h ucp_context;
ucp_mem_h memh;
void * packed_memh;
size_t packed_memh_len; /* presumably length of packed_memh in bytes - TODO confirm */
void * packed_key;
size_t packed_key_len; /* presumably length of packed_key in bytes - TODO confirm */
uint64_t memh_id;
};

/*
 * Fixed-size record holding the source/destination buffer pointers together
 * with a packed key for each, stored inline in arrays of
 * ALLREDUCE_PACKED_KEY_MAX_LEN bytes.
 * NOTE(review): the name suggests one record per rank is exchanged through an
 * allgather; a packed key longer than ALLREDUCE_PACKED_KEY_MAX_LEN would not
 * fit - confirm the producer enforces that bound.
 */
typedef struct ucc_tl_ucp_allreduce_sw_host_allgather {
void *src_buf;
void *dst_buf;
char packed_src_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
char packed_dst_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
} ucc_tl_ucp_allreduce_sw_host_allgather;

ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);

/*
 * Create a sliding-window allreduce task (defined in allreduce.c).
 * On success *task_h points at the new task's ucc_coll_task_t; returns
 * UCC_ERR_NO_MEMORY when the task cannot be allocated.
 */
ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);

ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task);

/*
 * Sliding-window setup/teardown helpers. Their definitions are not part of
 * this header; the notes below state only what the declarations and the
 * visible caller show.
 */

/* NOTE(review): presumably allocates the pipeline state for `task` - confirm. */
ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_tl_ucp_task_t * task);

/*
 * Algorithm-specific task initialization; called by
 * ucc_tl_ucp_allreduce_sliding_window_init after the task callbacks are
 * installed and before the executor is created. Returns a status that callers
 * should check.
 */
ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_tl_ucp_task_t * task);

/* NOTE(review): presumably completes the service allgather `scoll_req` for
 * `sw_task` - confirm against the definition. */
ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(
ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task);

/* NOTE(review): "gwbi" presumably stands for the global work buffer info
 * (ucc_tl_ucp_allreduce_sw_global_work_buf_info) - confirm. */
ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task);

ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *task);

void ucc_tl_ucp_allreduce_knomial_progress(ucc_coll_task_t *task);

/*
 * Task callbacks of the sliding-window algorithm.
 * ucc_tl_ucp_allreduce_sliding_window_init installs them as the task's
 * `post`, `progress` and `finalize` handlers respectively.
 */
ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task);

void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_knomial_finalize(ucc_coll_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);
ucc_status_t
ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);

ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_start(ucc_coll_task_t *task);

Expand Down
Loading