Skip to content

Commit

Permalink
TL/UCP: Add stubs for sliding window allreduce
Browse files Browse the repository at this point in the history
  • Loading branch information
nsarka committed Mar 1, 2024
1 parent c9f1286 commit 5f50edf
Show file tree
Hide file tree
Showing 15 changed files with 626 additions and 57 deletions.
10 changes: 6 additions & 4 deletions src/components/tl/ucp/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ alltoallv = \
alltoallv/alltoallv_onesided.c

allreduce = \
allreduce/allreduce.h \
allreduce/allreduce.c \
allreduce/allreduce_knomial.c \
allreduce/allreduce_sra_knomial.c \
allreduce/allreduce.h \
allreduce/allreduce.c \
allreduce/allreduce_knomial.c \
allreduce/allreduce_sra_knomial.c \
allreduce/allreduce_sliding_window.c \
allreduce/allreduce_sliding_window_setup.c \
allreduce/allreduce_dbt.c

barrier = \
Expand Down
14 changes: 14 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ ucc_base_coll_alg_info_t
.name = "dbt",
.desc = "alreduce over double binary tree where a leaf in one tree "
"will be intermediate in other (optimized for BW)"},
[UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW] =
{.id = UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
.name = "sliding_window",
.desc = "sliding window allreduce (optimized for running on DPU)"},
[UCC_TL_UCP_ALLREDUCE_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};

Expand All @@ -51,3 +55,13 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
out:
return status;
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t __attribute__((unused)) *coll_args, //NOLINT
ucc_base_team_t __attribute__((unused)) *team, //NOLINT
ucc_coll_task_t __attribute__((unused)) **task_h) //NOLINT
{
ucc_coll_task_t *coll_task = NULL;
ucc_tl_ucp_allreduce_sliding_window_progress(coll_task);
return UCC_OK;
}
90 changes: 90 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
enum {
UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL,
UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL,
UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
UCC_TL_UCP_ALLREDUCE_ALG_DBT,
UCC_TL_UCP_ALLREDUCE_ALG_LAST
};
Expand All @@ -36,16 +37,105 @@ ucc_status_t ucc_tl_ucp_allreduce_init(ucc_tl_ucp_task_t *task);
#define ALLREDUCE_TASK_CHECK(_args, _team) \
CHECK_SAME_MEMTYPE((_args), (_team));

#define ALLREDUCE_PACKED_KEY_MAX_LEN 1024

typedef struct ucc_tl_ucp_allreduce_sw_global_work_buf_info {
void *packed_src_memh;
void *packed_dst_memh;
} ucc_tl_ucp_allreduce_sw_global_work_buf_info_t;

typedef enum ucc_tl_ucp_allreduce_sw_buf_state
{
FREE,
RECVING,
REDUCING,
REDUCED,
SENDING,
IDLE,
} ucc_tl_ucp_allreduce_sw_buf_state_t;

typedef struct ucc_tl_ucp_allreduce_sw_buf {
void *buf;
ucc_tl_ucp_allreduce_sw_buf_state_t state;
ucs_status_ptr_t ucp_req;
size_t count;
size_t bytes;
} ucc_tl_ucp_allreduce_sw_buf_t;

typedef struct ucc_tl_ucp_allreduce_sw_pipeline {
ucc_tl_ucp_allreduce_sw_buf_t accbuf;
ucc_tl_ucp_allreduce_sw_buf_t *getbuf;
ucs_status_ptr_t *put_requests;
size_t buffer_size;
size_t num_buffers;
size_t avail_buffs;
size_t my_count;
size_t my_offset;
size_t count_issued;
size_t count_received;
size_t count_reduced;
size_t count_serviced;
size_t get_idx;
size_t red_idx;
ucc_rank_t src_rank;
ucc_rank_t dst_rank;
int done_get;
int done_red;
int done_put;
int posted_put;
} ucc_tl_ucp_allreduce_sw_pipeline_t;

struct ucc_tl_ucp_allreduce_sw_export_buf {
ucp_context_h ucp_context;
ucp_mem_h memh;
void *packed_memh;
size_t packed_memh_len;
void *packed_key;
size_t packed_key_len;
uint64_t memh_id;
};

typedef struct ucc_tl_ucp_allreduce_sw_host_allgather {
void *src_buf;
void *dst_buf;
char packed_src_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
char packed_dst_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
} ucc_tl_ucp_allreduce_sw_host_allgather_t;

ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task_h);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);

ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_tl_ucp_task_t * task);

ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(
ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task);

ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *task);

void ucc_tl_ucp_allreduce_knomial_progress(ucc_coll_task_t *task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task);

void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *task);

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_knomial_finalize(ucc_coll_task_t *task);

ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args,
Expand Down
90 changes: 90 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce_sliding_window.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Copyright(c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#include "allreduce.h"
#include "../allgather/allgather.h"
#include "../barrier/barrier.h"
#include "utils/ucc_dt_reduce.h"
#include "tl_ucp_ep.h"


static inline void //NOLINT
ucc_tl_ucp_allreduce_sliding_window_reset_buf(ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *buf) //NOLINT
{
}

static inline void ucc_tl_ucp_allreduce_sliding_window_reset_pipeline( //NOLINT
ucc_tl_ucp_allreduce_sw_pipeline_t __attribute__((unused)) *pipe, ucc_rank_t __attribute__((unused)) rank, //NOLINT
size_t __attribute__((unused)) put_window_size) //NOLINT
{
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t __attribute__((unused)) *coll_task) //NOLINT
{
return UCC_OK;
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t __attribute__((unused)) *coll_task) //NOLINT
{
return UCC_OK;
}

static inline void ucc_tl_ucp_allreduce_sliding_window_reduction(
ucc_coll_task_t __attribute__((unused)) *coll_task, ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *accbuf,//NOLINT
ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *getbuf)//NOLINT
{
}

static inline void
ucc_tl_ucp_allreduce_sliding_window_test_reduction(ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
{
}

static inline ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_req_test(ucs_status_ptr_t __attribute__((unused)) request,//NOLINT
ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
{
return UCC_OK;
}

static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(//NOLINT
ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
{
}

static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(//NOLINT
ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
{
}

static inline void
ucc_tl_ucp_allreduce_sliding_window_barrier(ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
{
}

void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task)//NOLINT
{
ucs_status_ptr_t request = 0;
ucc_tl_ucp_task_t *task = NULL;
ucc_tl_ucp_allreduce_sw_buf_t *accbuf = NULL;
ucc_tl_ucp_allreduce_sw_buf_t *getbuf = NULL;
ucc_tl_ucp_allreduce_sw_pipeline_t *pipe = NULL;

// suppress "function unused" Werrors
ucc_tl_ucp_allreduce_sliding_window_barrier(coll_task);
ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(coll_task);
ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(coll_task);
ucc_tl_ucp_allreduce_sliding_window_req_test(request, task);
ucc_tl_ucp_allreduce_sliding_window_test_reduction(task);
ucc_tl_ucp_allreduce_sliding_window_reduction(coll_task, accbuf, getbuf);
ucc_tl_ucp_allreduce_sliding_window_finalize(coll_task);
ucc_tl_ucp_allreduce_sliding_window_start(coll_task);
ucc_tl_ucp_allreduce_sliding_window_reset_pipeline(pipe, 0, 0);
ucc_tl_ucp_allreduce_sliding_window_reset_buf(accbuf);
}
// NOLINTEND
42 changes: 42 additions & 0 deletions src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright(c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#include "allreduce.h"
#include "../allgather/allgather.h"
#include "utils/ucc_dt_reduce.h"
#include "tl_ucp_ep.h"

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(
ucc_base_coll_args_t __attribute__((unused)) *coll_args,//NOLINT
ucc_base_team_t __attribute__((unused)) *team,//NOLINT
ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
{
return UCC_OK;
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_task_init(
ucc_base_coll_args_t __attribute__((unused)) *coll_args,//NOLINT
ucc_base_team_t __attribute__((unused)) *team,//NOLINT
ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
{
return UCC_OK;
}

ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(//NOLINT
ucc_service_coll_req_t __attribute__((unused)) *scoll_req, //NOLINT
ucc_tl_ucp_task_t __attribute__((unused)) *sw_task)//NOLINT
{
return UCC_OK;
}

ucc_status_t
ucc_tl_ucp_allreduce_sliding_window_free_gwbi(
ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
{
return UCC_OK;
}
3 changes: 0 additions & 3 deletions src/components/tl/ucp/barrier/barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
#include "tl_ucp.h"
#include "barrier.h"

ucc_status_t ucc_tl_ucp_barrier_knomial_start(ucc_coll_task_t *task);
void ucc_tl_ucp_barrier_knomial_progress(ucc_coll_task_t *task);

ucc_base_coll_alg_info_t
ucc_tl_ucp_barrier_algs[UCC_TL_UCP_BARRIER_ALG_LAST + 1] = {
[UCC_TL_UCP_BARRIER_ALG_KNOMIAL] =
Expand Down
3 changes: 3 additions & 0 deletions src/components/tl/ucp/barrier/barrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ extern ucc_base_coll_alg_info_t

ucc_status_t ucc_tl_ucp_barrier_init(ucc_tl_ucp_task_t *task);

ucc_status_t ucc_tl_ucp_barrier_knomial_start(ucc_coll_task_t *task);
void ucc_tl_ucp_barrier_knomial_progress(ucc_coll_task_t *task);

#endif
17 changes: 17 additions & 0 deletions src/components/tl/ucp/tl_ucp.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = {
ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_kn_radix),
UCC_CONFIG_TYPE_UINT_RANGED},

{"ALLREDUCE_SLIDING_WIN_BUF_SIZE", "65536",
"Buffer size of the sliding window allreduce algorithm",
ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sliding_window_buf_size),
UCC_CONFIG_TYPE_MEMUNITS},

{"ALLREDUCE_SLIDING_WIN_PUT_WINDOW_SIZE", "0",
"Max concurrent puts in SW Allreduce. <= 0 means set to team size",
ucc_offsetof(ucc_tl_ucp_lib_config_t,
allreduce_sliding_window_put_window_size),
UCC_CONFIG_TYPE_UINT},

{"ALLREDUCE_SLIDING_WIN_NUM_GET_BUFS", "0",
"Number of get buffers for sliding window AR. <= 0 means set to team size",
ucc_offsetof(ucc_tl_ucp_lib_config_t,
allreduce_sliding_window_num_get_bufs),
UCC_CONFIG_TYPE_UINT},

{"ALLREDUCE_SRA_KN_RADIX", "auto",
"Radix of the scatter-reduce-allgather (SRA) knomial allreduce algorithm",
ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sra_kn_radix),
Expand Down
3 changes: 3 additions & 0 deletions src/components/tl/ucp/tl_ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ typedef struct ucc_tl_ucp_lib_config {
uint32_t fanin_kn_radix;
uint32_t fanout_kn_radix;
uint32_t barrier_kn_radix;
size_t allreduce_sliding_window_buf_size;
uint32_t allreduce_sliding_window_put_window_size;
uint32_t allreduce_sliding_window_num_get_bufs;
ucc_mrange_uint_t allreduce_kn_radix;
ucc_mrange_uint_t allreduce_sra_kn_radix;
uint32_t reduce_scatter_kn_radix;
Expand Down
3 changes: 3 additions & 0 deletions src/components/tl/ucp/tl_ucp_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str,
case UCC_TL_UCP_ALLREDUCE_ALG_DBT:
*init = ucc_tl_ucp_allreduce_dbt_init;
break;
case UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW:
*init = ucc_tl_ucp_allreduce_sliding_window_init;
break;
default:
status = UCC_ERR_INVALID_PARAM;
break;
Expand Down
26 changes: 26 additions & 0 deletions src/components/tl/ucp/tl_ucp_coll.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ enum ucc_tl_ucp_task_flags {
UCC_TL_UCP_TASK_FLAG_SUBSET = UCC_BIT(0),
};

typedef struct ucc_tl_ucp_allreduce_sw_pipeline
ucc_tl_ucp_allreduce_sw_pipeline;
typedef struct ucc_tl_ucp_allreduce_sw_host_allgather
ucc_tl_ucp_allreduce_sw_host_allgather;

typedef struct ucc_tl_ucp_task {
ucc_coll_task_t super;
uint32_t flags;
Expand Down Expand Up @@ -121,6 +126,27 @@ typedef struct ucc_tl_ucp_task {
ucc_ee_executor_task_t *etask;
ucc_ee_executor_t *executor;
} allreduce_kn;
struct {
int reduce_in_progress;
ucp_rkey_h *src_rkeys; //unpacked
ucp_rkey_h *dst_rkeys; //unpacked
ucp_ep_h *eps;
void **sbufs;
void **rbufs;
ucc_coll_task_t *allreduce_task_h;
ucc_tl_ucp_allreduce_sw_pipeline *pipe;
ucc_ee_executor_task_t *etask;
ucc_ee_executor_t *executor;
int put_window_size;
int num_get_bufs;
ucs_status_ptr_t *put_requests;
ucc_service_coll_req_t *allgather_scoll_req;
ucc_tl_ucp_allreduce_sw_host_allgather *allgather_data;
ucc_coll_task_t *barrier_task;
struct ucc_tl_ucp_allreduce_sw_export_buf *src_ebuf;
struct ucc_tl_ucp_allreduce_sw_export_buf *dst_ebuf;
int inplace;
} allreduce_sliding_window;
struct {
int phase;
ucc_knomial_pattern_t p;
Expand Down
Loading

0 comments on commit 5f50edf

Please sign in to comment.