From c872b8d8533a79878f31d101c22ffda3e0668ea3 Mon Sep 17 00:00:00 2001
From: Nick Sarkauskas <nsarkauskas@nvidia.com>
Date: Tue, 23 Jan 2024 09:36:37 -0800
Subject: [PATCH] TL/UCP: Add stubs for sliding window allreduce

---
 config/m4/ucx.m4                              |   1 +
 src/components/tl/ucp/Makefile.am             |  11 +-
 src/components/tl/ucp/allreduce/allreduce.c   |  14 ++
 src/components/tl/ucp/allreduce/allreduce.h   |  26 +++
 .../ucp/allreduce/allreduce_sliding_window.c  |  90 ++++++++
 .../ucp/allreduce/allreduce_sliding_window.h  |  77 +++++++
 .../allreduce_sliding_window_setup.c          |  43 ++++
 src/components/tl/ucp/tl_ucp.c                |  17 ++
 src/components/tl/ucp/tl_ucp.h                |   3 +
 src/components/tl/ucp/tl_ucp_coll.c           |   3 +
 src/components/tl/ucp/tl_ucp_coll.h           |  22 ++
 test/gtest/Makefile.am                        | 102 ++++-----
 test/gtest/coll/test_allreduce.cc             |  59 +++++-
 .../coll/test_allreduce_sliding_window.cc     | 197 ++++++++++++++++++
 .../coll/test_allreduce_sliding_window.h      |  36 ++++
 15 files changed, 648 insertions(+), 53 deletions(-)
 create mode 100644 src/components/tl/ucp/allreduce/allreduce_sliding_window.c
 create mode 100644 src/components/tl/ucp/allreduce/allreduce_sliding_window.h
 create mode 100644 src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c
 create mode 100644 test/gtest/coll/test_allreduce_sliding_window.cc
 create mode 100644 test/gtest/coll/test_allreduce_sliding_window.h

diff --git a/config/m4/ucx.m4 b/config/m4/ucx.m4
index ba57dae303..35bb55f9bd 100644
--- a/config/m4/ucx.m4
+++ b/config/m4/ucx.m4
@@ -119,6 +119,7 @@ AS_IF([test "x$ucx_checked" != "xyes"],[
                 [],
                 [#include <ucs/memory/rcache.h>])
 
+            AC_DEFINE([HAVE_UCX], 1, [Enable UCX support])
 
             AC_COMPILE_IFELSE([AC_LANG_SOURCE([[#include <ucs/config/parser.h>
 					            int main(int argc, char** argv) {
diff --git a/src/components/tl/ucp/Makefile.am b/src/components/tl/ucp/Makefile.am
index 6074ed65c8..013653bfe6 100644
--- a/src/components/tl/ucp/Makefile.am
+++ b/src/components/tl/ucp/Makefile.am
@@ -37,10 +37,13 @@ alltoallv =                        \
 	alltoallv/alltoallv_onesided.c
 
 allreduce =                           \
-	allreduce/allreduce.h             \
-	allreduce/allreduce.c             \
-	allreduce/allreduce_knomial.c     \
-	allreduce/allreduce_sra_knomial.c \
+	allreduce/allreduce.h                      \
+	allreduce/allreduce.c                      \
+	allreduce/allreduce_knomial.c              \
+	allreduce/allreduce_sra_knomial.c          \
+	allreduce/allreduce_sliding_window.h       \
+	allreduce/allreduce_sliding_window.c       \
+	allreduce/allreduce_sliding_window_setup.c \
 	allreduce/allreduce_dbt.c
 
 barrier =                     \
diff --git a/src/components/tl/ucp/allreduce/allreduce.c b/src/components/tl/ucp/allreduce/allreduce.c
index ef3f6f54ad..f9d41a1190 100644
--- a/src/components/tl/ucp/allreduce/allreduce.c
+++ b/src/components/tl/ucp/allreduce/allreduce.c
@@ -25,6 +25,10 @@ ucc_base_coll_alg_info_t
              .name = "dbt",
              .desc = "alreduce over double binary tree where a leaf in one tree "
                      "will be intermediate in other (optimized for BW)"},
+        [UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW] =
+            {.id   = UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
+             .name = "sliding_window",
+             .desc = "sliding window allreduce (optimized for running on DPU)"},
         [UCC_TL_UCP_ALLREDUCE_ALG_LAST] = {
             .id = 0, .name = NULL, .desc = NULL}};
 
@@ -51,3 +55,13 @@ ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
 out:
     return status;
 }
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t __attribute__((unused)) *coll_args, //NOLINT
+                                         ucc_base_team_t __attribute__((unused)) *team, //NOLINT
+                                         ucc_coll_task_t __attribute__((unused)) **task_h) //NOLINT
+{
+    ucc_coll_task_t *coll_task = NULL;
+    ucc_tl_ucp_allreduce_sliding_window_progress(coll_task);
+    return UCC_OK;
+}
diff --git a/src/components/tl/ucp/allreduce/allreduce.h b/src/components/tl/ucp/allreduce/allreduce.h
index 8eb75fb999..3f4c4e2834 100644
--- a/src/components/tl/ucp/allreduce/allreduce.h
+++ b/src/components/tl/ucp/allreduce/allreduce.h
@@ -11,6 +11,7 @@
 enum {
     UCC_TL_UCP_ALLREDUCE_ALG_KNOMIAL,
     UCC_TL_UCP_ALLREDUCE_ALG_SRA_KNOMIAL,
+    UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW,
     UCC_TL_UCP_ALLREDUCE_ALG_DBT,
     UCC_TL_UCP_ALLREDUCE_ALG_LAST
 };
@@ -36,16 +37,41 @@ ucc_status_t ucc_tl_ucp_allreduce_init(ucc_tl_ucp_task_t *task);
 #define ALLREDUCE_TASK_CHECK(_args, _team)                                     \
     CHECK_SAME_MEMTYPE((_args), (_team));
 
+
 ucc_status_t ucc_tl_ucp_allreduce_knomial_init(ucc_base_coll_args_t *coll_args,
                                                ucc_base_team_t *team,
                                                ucc_coll_task_t **task_h);
 
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_init(ucc_base_coll_args_t *coll_args,
+                                         ucc_base_team_t *     team,
+                                         ucc_coll_task_t **    task_h);
+
 ucc_status_t ucc_tl_ucp_allreduce_knomial_init_common(ucc_tl_ucp_task_t *task);
 
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_task_init(ucc_base_coll_args_t *coll_args,
+                                              ucc_base_team_t *     team,
+                                              ucc_tl_ucp_task_t *   task);
+
+ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(
+    ucc_service_coll_req_t *scoll_req, ucc_tl_ucp_task_t *sw_task);
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_free_gwbi(ucc_coll_task_t *coll_task);
+
 ucc_status_t ucc_tl_ucp_allreduce_knomial_start(ucc_coll_task_t *task);
 
 void ucc_tl_ucp_allreduce_knomial_progress(ucc_coll_task_t *task);
 
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t *coll_task);
+
+void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *task);
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t *task);
+
 ucc_status_t ucc_tl_ucp_allreduce_knomial_finalize(ucc_coll_task_t *task);
 
 ucc_status_t ucc_tl_ucp_allreduce_sra_knomial_init(ucc_base_coll_args_t *coll_args,
diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c
new file mode 100644
index 0000000000..59512bcd6d
--- /dev/null
+++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window.c
@@ -0,0 +1,90 @@
+/**
+ * Copyright(c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * See file LICENSE for terms.
+ */
+
+#include "allreduce.h"
+#include "allreduce_sliding_window.h"
+#include "../allgather/allgather.h"
+#include "../barrier/barrier.h"
+#include "utils/ucc_dt_reduce.h"
+#include "tl_ucp_ep.h"
+
+
+static inline void //NOLINT
+ucc_tl_ucp_allreduce_sliding_window_reset_buf(ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused))  *buf) //NOLINT
+{
+}
+
+static inline void ucc_tl_ucp_allreduce_sliding_window_reset_pipeline( //NOLINT
+    ucc_tl_ucp_allreduce_sw_pipeline_t __attribute__((unused)) *pipe, ucc_rank_t __attribute__((unused)) rank, //NOLINT
+    size_t __attribute__((unused)) put_window_size) //NOLINT
+{
+}
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_start(ucc_coll_task_t __attribute__((unused)) *coll_task) //NOLINT
+{
+    return UCC_OK;
+}
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_finalize(ucc_coll_task_t __attribute__((unused)) *coll_task) //NOLINT
+{
+    return UCC_OK;
+}
+
+static inline void ucc_tl_ucp_allreduce_sliding_window_reduction(
+    ucc_coll_task_t __attribute__((unused)) *coll_task, ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *accbuf,//NOLINT
+    ucc_tl_ucp_allreduce_sw_buf_t __attribute__((unused)) *getbuf)//NOLINT
+{
+}
+
+static inline void
+ucc_tl_ucp_allreduce_sliding_window_test_reduction(ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
+{
+}
+
+static inline ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_req_test(ucs_status_ptr_t __attribute__((unused))  request,//NOLINT
+                                             ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
+{
+    return UCC_OK;
+}
+
+static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(//NOLINT
+    ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
+{
+}
+
+static inline void ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(//NOLINT
+    ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
+{
+}
+
+static inline void
+ucc_tl_ucp_allreduce_sliding_window_barrier(ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
+{
+}
+
+void ucc_tl_ucp_allreduce_sliding_window_progress(ucc_coll_task_t *coll_task)//NOLINT
+{
+    ucs_status_ptr_t request = 0;
+    ucc_tl_ucp_task_t *task = NULL;
+    ucc_tl_ucp_allreduce_sw_buf_t *accbuf = NULL;
+    ucc_tl_ucp_allreduce_sw_buf_t *getbuf = NULL;
+    ucc_tl_ucp_allreduce_sw_pipeline_t *pipe = NULL;
+
+    // suppress "function unused" Werrors
+    ucc_tl_ucp_allreduce_sliding_window_barrier(coll_task);
+    ucc_tl_ucp_allreduce_sliding_window_allgather_free_rkeys(coll_task);
+    ucc_tl_ucp_allreduce_sliding_window_allgather_info_test(coll_task);
+    ucc_tl_ucp_allreduce_sliding_window_req_test(request, task);
+    ucc_tl_ucp_allreduce_sliding_window_test_reduction(task);
+    ucc_tl_ucp_allreduce_sliding_window_reduction(coll_task, accbuf, getbuf);
+    ucc_tl_ucp_allreduce_sliding_window_finalize(coll_task);
+    ucc_tl_ucp_allreduce_sliding_window_start(coll_task);
+    ucc_tl_ucp_allreduce_sliding_window_reset_pipeline(pipe, 0, 0);
+    ucc_tl_ucp_allreduce_sliding_window_reset_buf(accbuf);
+}
diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window.h b/src/components/tl/ucp/allreduce/allreduce_sliding_window.h
new file mode 100644
index 0000000000..c6f68592aa
--- /dev/null
+++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window.h
@@ -0,0 +1,77 @@
+/**
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * See file LICENSE for terms.
+ */
+
+#ifndef ALLREDUCE_SW_H_
+#define ALLREDUCE_SW_H_
+
+#include "tl_ucp_coll.h"
+
+#define ALLREDUCE_PACKED_KEY_MAX_LEN 1024
+
+typedef struct ucc_tl_ucp_allreduce_sw_global_work_buf_info {
+    void *packed_src_memh;
+    void *packed_dst_memh;
+} ucc_tl_ucp_allreduce_sw_global_work_buf_info_t;
+
+typedef enum ucc_tl_ucp_allreduce_sw_buf_state
+{
+    FREE,
+    RECVING,
+    REDUCING,
+    REDUCED,
+    SENDING,
+    IDLE,
+} ucc_tl_ucp_allreduce_sw_buf_state_t;
+
+typedef struct ucc_tl_ucp_allreduce_sw_buf {
+    void                               *buf;
+    ucc_tl_ucp_allreduce_sw_buf_state_t state;
+    ucs_status_ptr_t                    ucp_req;
+    size_t                              count;
+    size_t                              bytes;
+} ucc_tl_ucp_allreduce_sw_buf_t;
+
+typedef struct ucc_tl_ucp_allreduce_sw_pipeline {
+    ucc_tl_ucp_allreduce_sw_buf_t  accbuf;
+    ucc_tl_ucp_allreduce_sw_buf_t *getbuf;
+    ucs_status_ptr_t              *put_requests;
+    size_t                         buffer_size;
+    size_t                         num_buffers;
+    size_t                         avail_buffs;
+    size_t                         my_count;
+    size_t                         my_offset;
+    size_t                         count_issued;
+    size_t                         count_received;
+    size_t                         count_reduced;
+    size_t                         count_serviced;
+    size_t                         get_idx;
+    size_t                         red_idx;
+    ucc_rank_t                     src_rank;
+    ucc_rank_t                     dst_rank;
+    int                            done_get;
+    int                            done_red;
+    int                            done_put;
+    int                            posted_put;
+} ucc_tl_ucp_allreduce_sw_pipeline_t;
+
+struct ucc_tl_ucp_allreduce_sw_export_buf {
+    ucp_context_h ucp_context;
+    ucp_mem_h     memh;
+    void         *packed_memh;
+    size_t        packed_memh_len;
+    void         *packed_key;
+    size_t        packed_key_len;
+    uint64_t      memh_id;
+};
+
+typedef struct ucc_tl_ucp_allreduce_sw_host_allgather {
+    void *src_buf;
+    void *dst_buf;
+    char  packed_src_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
+    char  packed_dst_key[ALLREDUCE_PACKED_KEY_MAX_LEN];
+} ucc_tl_ucp_allreduce_sw_host_allgather_t;
+
+#endif
diff --git a/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c
new file mode 100644
index 0000000000..b90f6528ad
--- /dev/null
+++ b/src/components/tl/ucp/allreduce/allreduce_sliding_window_setup.c
@@ -0,0 +1,43 @@
+/**
+ * Copyright(c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * See file LICENSE for terms.
+ */
+
+#include "allreduce.h"
+#include "allreduce_sliding_window.h"
+#include "../allgather/allgather.h"
+#include "utils/ucc_dt_reduce.h"
+#include "tl_ucp_ep.h"
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_alloc_pipe(
+    ucc_base_coll_args_t __attribute__((unused)) *coll_args,//NOLINT
+    ucc_base_team_t __attribute__((unused)) *team,//NOLINT
+    ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
+{
+    return UCC_OK;
+}
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_task_init(
+    ucc_base_coll_args_t __attribute__((unused)) *coll_args,//NOLINT
+    ucc_base_team_t __attribute__((unused)) *team,//NOLINT
+    ucc_tl_ucp_task_t __attribute__((unused)) *task)//NOLINT
+{
+    return UCC_OK;
+}
+
+ucc_status_t ucc_tl_ucp_allreduce_sliding_window_allgather_info_finalize(//NOLINT
+   ucc_service_coll_req_t __attribute__((unused)) *scoll_req, //NOLINT
+   ucc_tl_ucp_task_t __attribute__((unused)) *sw_task)//NOLINT
+{
+    return UCC_OK;
+}
+
+ucc_status_t
+ucc_tl_ucp_allreduce_sliding_window_free_gwbi(
+    ucc_coll_task_t __attribute__((unused)) *coll_task)//NOLINT
+{
+    return UCC_OK;
+}
diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c
index 72586dc5de..2ce32ef72d 100644
--- a/src/components/tl/ucp/tl_ucp.c
+++ b/src/components/tl/ucp/tl_ucp.c
@@ -104,6 +104,23 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = {
      ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_kn_radix),
      UCC_CONFIG_TYPE_UINT_RANGED},
 
+    {"ALLREDUCE_SLIDING_WIN_BUF_SIZE", "65536",
+     "Buffer size of the sliding window allreduce algorithm",
+     ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sliding_window_buf_size),
+     UCC_CONFIG_TYPE_MEMUNITS},
+
+    {"ALLREDUCE_SLIDING_WIN_PUT_WINDOW_SIZE", "0",
+     "Max concurrent puts in SW Allreduce. 0 means set to team size",
+     ucc_offsetof(ucc_tl_ucp_lib_config_t,
+                  allreduce_sliding_window_put_window_size),
+     UCC_CONFIG_TYPE_UINT},
+
+    {"ALLREDUCE_SLIDING_WIN_NUM_GET_BUFS", "0",
+     "Number of get buffers for sliding window AR. 0 means set to team size",
+     ucc_offsetof(ucc_tl_ucp_lib_config_t,
+                  allreduce_sliding_window_num_get_bufs),
+     UCC_CONFIG_TYPE_UINT},
+
     {"ALLREDUCE_SRA_KN_RADIX", "auto",
      "Radix of the scatter-reduce-allgather (SRA) knomial allreduce algorithm",
      ucc_offsetof(ucc_tl_ucp_lib_config_t, allreduce_sra_kn_radix),
diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h
index eac2303443..894a33bc6f 100644
--- a/src/components/tl/ucp/tl_ucp.h
+++ b/src/components/tl/ucp/tl_ucp.h
@@ -48,6 +48,9 @@ typedef struct ucc_tl_ucp_lib_config {
     uint32_t                 fanin_kn_radix;
     uint32_t                 fanout_kn_radix;
     uint32_t                 barrier_kn_radix;
+    size_t                   allreduce_sliding_window_buf_size;
+    uint32_t                 allreduce_sliding_window_put_window_size;
+    uint32_t                 allreduce_sliding_window_num_get_bufs;
     ucc_mrange_uint_t        allreduce_kn_radix;
     ucc_mrange_uint_t        allreduce_sra_kn_radix;
     uint32_t                 reduce_scatter_kn_radix;
diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c
index 46bf6a2ed6..615798f1af 100644
--- a/src/components/tl/ucp/tl_ucp_coll.c
+++ b/src/components/tl/ucp/tl_ucp_coll.c
@@ -281,6 +281,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str,
         case UCC_TL_UCP_ALLREDUCE_ALG_DBT:
             *init = ucc_tl_ucp_allreduce_dbt_init;
             break;
+        case UCC_TL_UCP_ALLREDUCE_ALG_SLIDING_WINDOW:
+            *init = ucc_tl_ucp_allreduce_sliding_window_init;
+            break;
         default:
             status = UCC_ERR_INVALID_PARAM;
             break;
diff --git a/src/components/tl/ucp/tl_ucp_coll.h b/src/components/tl/ucp/tl_ucp_coll.h
index 0a8a340955..2ba037d51f 100644
--- a/src/components/tl/ucp/tl_ucp_coll.h
+++ b/src/components/tl/ucp/tl_ucp_coll.h
@@ -88,6 +88,11 @@ enum ucc_tl_ucp_task_flags {
     UCC_TL_UCP_TASK_FLAG_SUBSET = UCC_BIT(0),
 };
 
+typedef struct ucc_tl_ucp_allreduce_sw_pipeline
+    ucc_tl_ucp_allreduce_sw_pipeline;
+typedef struct ucc_tl_ucp_allreduce_sw_host_allgather
+    ucc_tl_ucp_allreduce_sw_host_allgather;
+
 typedef struct ucc_tl_ucp_task {
     ucc_coll_task_t super;
     uint32_t        flags;
@@ -121,6 +126,23 @@ typedef struct ucc_tl_ucp_task {
             ucc_ee_executor_task_t *etask;
             ucc_ee_executor_t      *executor;
         } allreduce_kn;
+        struct {
+            int                                        reduce_in_progress;
+            ucp_rkey_h                                *src_rkeys; //unpacked
+            ucp_rkey_h                                *dst_rkeys; //unpacked
+            void                                     **sbufs;
+            void                                     **rbufs;
+            ucc_tl_ucp_allreduce_sw_pipeline          *pipe;
+            ucc_ee_executor_task_t                    *etask;
+            ucc_ee_executor_t                         *executor;
+            int                                        put_window_size;
+            int                                        num_get_bufs;
+            ucs_status_ptr_t                          *put_requests;
+            ucc_tl_ucp_allreduce_sw_host_allgather    *allgather_data;
+            ucc_schedule_t                            *sw_sched;
+            struct ucc_tl_ucp_allreduce_sw_export_buf *src_ebuf;
+            struct ucc_tl_ucp_allreduce_sw_export_buf *dst_ebuf;
+        } allreduce_sliding_window;
         struct {
             int                     phase;
             ucc_knomial_pattern_t   p;
diff --git a/test/gtest/Makefile.am b/test/gtest/Makefile.am
index 591b2cf005..49c60fe16b 100644
--- a/test/gtest/Makefile.am
+++ b/test/gtest/Makefile.am
@@ -64,47 +64,48 @@ gtest_CXXFLAGS = -std=gnu++11 \
 	-DGTEST_UCM_HOOK_LIB_DIR="\"${abs_builddir}/ucm/test_dlopen/.libs\"" \
 	-DGTEST_UCC_TOP_SRCDIR="\"${UCC_TOP_SRCDIR}\""
 
-gtest_SOURCES =                     \
-	common/gtest-all.cc             \
-	common/test_obj_size.cc         \
-	common/main.cc                  \
-	common/test_ucc.cc              \
-	tl/tl_test.cc                   \
-	core/test_lib_config.cc         \
-	core/test_lib.cc                \
-	core/test_context_config.cc     \
-	core/test_context.cc            \
-	core/test_mc.cc                 \
-	core/test_mc_reduce.cc          \
-	core/test_team.cc               \
-	core/test_schedule.cc           \
-	core/test_topo.cc               \
-	core/test_service_coll.cc       \
-	core/test_timeout.cc            \
-	core/test_utils.cc              \
-	coll/test_barrier.cc            \
-	coll/test_alltoall.cc           \
-	coll/test_alltoallv.cc          \
-	coll/test_allgather.cc          \
-	coll/test_allgatherv.cc         \
-	coll/test_gather.cc         	\
-	coll/test_gatherv.cc         	\
-	coll/test_bcast.cc              \
-	coll/test_reduce.cc             \
-	coll/test_allreduce.cc          \
-	coll/test_reduce_scatter.cc     \
-	coll/test_reduce_scatterv.cc    \
-	coll/test_scatter.cc         	\
-	coll/test_scatterv.cc         	\
-	utils/test_string.cc            \
-	utils/test_ep_map.cc            \
-	utils/test_lock_free_queue.cc   \
-	utils/test_math.cc              \
-	utils/test_cfg_file.cc          \
-	utils/test_parser.cc            \
-	coll_score/test_score.cc        \
-	coll_score/test_score_str.cc    \
-	coll_score/test_score_update.cc \
+gtest_SOURCES =                           \
+	common/gtest-all.cc                   \
+	common/test_obj_size.cc               \
+	common/main.cc                        \
+	common/test_ucc.cc                    \
+	tl/tl_test.cc                         \
+	core/test_lib_config.cc               \
+	core/test_lib.cc                      \
+	core/test_context_config.cc           \
+	core/test_context.cc                  \
+	core/test_mc.cc                       \
+	core/test_mc_reduce.cc                \
+	core/test_team.cc                     \
+	core/test_schedule.cc                 \
+	core/test_topo.cc                     \
+	core/test_service_coll.cc             \
+	core/test_timeout.cc                  \
+	core/test_utils.cc                    \
+	coll/test_barrier.cc                  \
+	coll/test_alltoall.cc                 \
+	coll/test_alltoallv.cc                \
+	coll/test_allgather.cc                \
+	coll/test_allgatherv.cc               \
+	coll/test_gather.cc                   \
+	coll/test_gatherv.cc                  \
+	coll/test_bcast.cc                    \
+	coll/test_reduce.cc                   \
+	coll/test_allreduce_sliding_window.cc \
+	coll/test_allreduce.cc                \
+	coll/test_reduce_scatter.cc           \
+	coll/test_reduce_scatterv.cc          \
+	coll/test_scatter.cc                  \
+	coll/test_scatterv.cc                 \
+	utils/test_string.cc                  \
+	utils/test_ep_map.cc                  \
+	utils/test_lock_free_queue.cc         \
+	utils/test_math.cc                    \
+	utils/test_cfg_file.cc                \
+	utils/test_parser.cc                  \
+	coll_score/test_score.cc              \
+	coll_score/test_score_str.cc          \
+	coll_score/test_score_update.cc       \
 	active_set/test_active_set.cc
 
 if TL_MLX5_ENABLED
@@ -134,13 +135,20 @@ gtest_LDFLAGS  += $(HIP_LDFLAGS)
 gtest_LDADD    += $(HIP_LIBS)
 endif
 
+if HAVE_UCX
+gtest_CXXFLAGS += $(UCX_CXXFLAGS)
+gtest_CPPFLAGS += $(UCX_CPPFLAGS)
+gtest_LDFLAGS  += $(UCX_LDFLAGS)
+gtest_LDADD    += $(UCX_LIBS) $(UCX_LIBADD)
+endif
 
-noinst_HEADERS =            \
-	common/gtest.h          \
-	common/test.h           \
-	common/test_ucc.h       \
-	core/test_context.h     \
-	core/test_mc_reduce.h   \
+noinst_HEADERS =                         \
+	common/gtest.h                       \
+	common/test.h                        \
+	common/test_ucc.h                    \
+	core/test_context.h                  \
+	core/test_mc_reduce.h                \
+	coll/test_allreduce_sliding_window.h \
 	coll_score/test_score.h
 
 .PHONY: test test gdb valgrind fix_rpath ucc
diff --git a/test/gtest/coll/test_allreduce.cc b/test/gtest/coll/test_allreduce.cc
index 7e718cefaa..8048f8716b 100644
--- a/test/gtest/coll/test_allreduce.cc
+++ b/test/gtest/coll/test_allreduce.cc
@@ -7,6 +7,9 @@
 #include "common/test_ucc.h"
 #include "utils/ucc_math.h"
 
+// For sliding window allreduce
+#include "test_allreduce_sliding_window.h"
+
 #include <array>
 
 template<typename T>
@@ -23,8 +26,9 @@ class test_allreduce : public UccCollArgs, public testing::Test {
             ctxs[r] = (gtest_ucc_coll_ctx_t*)calloc(1, sizeof(gtest_ucc_coll_ctx_t));
             ctxs[r]->args = coll;
 
-            coll->coll_type = UCC_COLL_TYPE_ALLREDUCE;
-            coll->op        = T::redop;
+            coll->coll_type          = UCC_COLL_TYPE_ALLREDUCE;
+            coll->op                 = T::redop;
+            coll->global_work_buffer = NULL;
 
             ctxs[r]->init_buf = ucc_malloc(ucc_dt_size(dt) * count, "init buf");
             EXPECT_NE(ctxs[r]->init_buf, nullptr);
@@ -433,6 +437,57 @@ TYPED_TEST(test_allreduce_alg, rab_pipelined) {
     }
 }
 
+#ifdef HAVE_UCX
+TYPED_TEST(test_allreduce_alg, sliding_window)
+{
+    int              n_procs = 8;
+    ucc_job_env_t    env     = {{"UCC_TL_UCP_TUNE", "allreduce:@2"},
+                                {"UCC_CLS", "all"}};
+    UccJob           job(n_procs, UccJob::UCC_JOB_CTX_GLOBAL_ONESIDED, env);
+    UccTeam_h        team      = job.create_team(n_procs);
+    int              repeat    = 3;
+    test_ucp_info_t *ucp_infos = NULL;
+    std::vector<ucc_memory_type_t> mt = {UCC_MEMORY_TYPE_HOST};
+    ucs_status_t     ucs_status = UCS_OK;
+    UccCollCtxVec    ctxs;
+
+    if (UCC_OK == ucc_mc_available(
+                      UCC_MEMORY_TYPE_CUDA)) { //add cuda_managed for cl hier?
+        mt.push_back(UCC_MEMORY_TYPE_CUDA);
+    }
+
+    for (auto count : {65536, 123567}) {
+        for (auto inplace : {TEST_NO_INPLACE, TEST_INPLACE}) {
+            for (auto m : mt) {
+                SET_MEM_TYPE(m);
+                this->set_inplace(inplace);
+                this->data_init(n_procs, TypeParam::dt, count, ctxs, true);
+
+                // set args->global_work_buffer on each ctx
+                ucs_status = setup_gwbi(n_procs, ctxs, &ucp_infos, inplace == TEST_INPLACE);
+                if (ucs_status != UCS_OK) {
+                    free_gwbi(n_procs, ctxs, ucp_infos, inplace == TEST_INPLACE);
+                    this->data_fini(ctxs);
+                    if (ucs_status == UCS_ERR_UNSUPPORTED) {
+                        GTEST_SKIP() << "Exported memory key not supported";
+                    } else {
+                        GTEST_FAIL() << ucs_status_string(ucs_status);
+                    }
+                }
+
+                for (auto i = 0; i < repeat; i++) {
+                    this->reset(ctxs);
+                }
+
+                free_gwbi(n_procs, ctxs, ucp_infos, inplace == TEST_INPLACE);
+                ucp_infos = NULL;
+                this->data_fini(ctxs);
+            }
+        }
+    }
+}
+#endif
+
 template <typename T>
 class test_allreduce_avg_order : public test_allreduce<T> {
 };
diff --git a/test/gtest/coll/test_allreduce_sliding_window.cc b/test/gtest/coll/test_allreduce_sliding_window.cc
new file mode 100644
index 0000000000..c6aebc9f2f
--- /dev/null
+++ b/test/gtest/coll/test_allreduce_sliding_window.cc
@@ -0,0 +1,197 @@
+/**
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * See file LICENSE for terms.
+ */
+
+/*
+   This file is for setting up the global work buffer for sliding window 
+   allreduce. This entails allocating ucp workers, registering memory,
+   exchanging rkeys, and allocating the pipeline datastructure the 
+   algorithm uses.
+*/
+
+#include "common/test_ucc.h"
+
+#ifdef HAVE_UCX
+
+#include "core/test_mc_reduce.h"
+#include "utils/ucc_math.h"
+
+#include <array>
+
+#include "test_allreduce_sliding_window.h"
+
+void test_init_ucp(ucp_context_h *ucp_ctx, ucp_config_t **ucp_config_p)
+{
+    ucs_status_t  ucs_status;
+    ucp_config_t *ucp_config;
+    ucp_params_t  ucp_params;
+    ucp_context_h ucp_context;
+
+    ucs_status = ucp_config_read(NULL, NULL, &ucp_config);
+    EXPECT_EQ(UCS_OK, ucs_status) << "ucp_config_read() returned error: "
+                                  << ucs_status_string(ucs_status);
+
+    ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES;
+    ucp_params.features   = UCP_FEATURE_TAG | UCP_FEATURE_RMA |
+                          UCP_FEATURE_AMO64 | UCP_FEATURE_EXPORTED_MEMH;
+
+    ucs_status = ucp_init(&ucp_params, ucp_config, &ucp_context);
+    EXPECT_EQ(UCS_OK, ucs_status) << "ucp_init() returned error: "
+                                  << ucs_status_string(ucs_status);
+
+    *ucp_ctx = ucp_context;
+    *ucp_config_p = ucp_config;
+}
+
+ucs_status_t buffer_export_ucc(ucp_context_h ucp_context, void *buf, size_t len,
+                      struct export_buf *ebuf)
+{
+    ucs_status_t           ucs_status = UCS_OK;
+    ucp_mem_map_params_t   params;
+    ucp_memh_pack_params_t pack_params;
+
+    ebuf->ucp_context = ucp_context;
+
+    params.field_mask =
+        UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH;
+    params.address = buf;
+    params.length  = len;
+
+    ucs_status = ucp_mem_map(ucp_context, &params, &ebuf->memh);
+    if (ucs_status != UCS_OK) {
+         printf("ucp_mem_map() returned error: %s\n", ucs_status_string(ucs_status));
+         return ucs_status;
+    }
+
+    pack_params.field_mask = UCP_MEMH_PACK_PARAM_FIELD_FLAGS;
+    pack_params.flags      = UCP_MEMH_PACK_FLAG_EXPORT;
+
+    ucs_status = ucp_memh_pack(ebuf->memh, &pack_params, &ebuf->packed_memh,
+                               &ebuf->packed_memh_len);
+    if (ucs_status != UCS_OK) {
+        printf("ucp_memh_pack() returned error: %s\n", ucs_status_string(ucs_status));
+        return ucs_status;
+    }
+
+    return ucs_status;
+}
+
+ucs_status_t setup_gwbi(int n_procs, UccCollCtxVec &ctxs,
+                test_ucp_info_t **ucp_infos_p /* out */, bool inplace)
+{
+    int i;
+    ucs_status_t ucs_status = UCS_OK;
+
+    test_ucp_info_t *ucp_infos =
+        (test_ucp_info_t *)ucc_malloc(sizeof(test_ucp_info_t) * n_procs);
+    EXPECT_NE(ucp_infos, nullptr);
+    *ucp_infos_p = ucp_infos;
+
+    // allocate gwbi
+    for (auto ctx : ctxs) {
+        global_work_buf_info *gwbi =
+            (global_work_buf_info *)ucc_malloc(
+                sizeof(global_work_buf_info),
+                "global work buf info");
+
+        EXPECT_NE(gwbi, nullptr);
+
+        ctx->args->global_work_buffer = gwbi;
+    }
+
+    // setup ucp contexts and workers
+    for (i = 0; i < n_procs; i++) {
+        test_init_ucp(&ucp_infos[i].ucp_ctx, &ucp_infos[i].ucp_config);
+        ucp_infos[i].src_ebuf = {0};
+        ucp_infos[i].dst_ebuf = {0};
+    }
+
+    // set up packed src/dst memh
+    for (i = 0; i < n_procs; i++) {
+        // my proc's gwbi
+        global_work_buf_info *gwbi =
+            (global_work_buf_info *)ctxs[i]
+                ->args->global_work_buffer;
+        // my proc's ucp_info
+        test_ucp_info_t *       ucp_info = &ucp_infos[i];
+        struct export_buf *dst_ebuf = &ucp_info->dst_ebuf;
+        size_t             dst_len  = ctxs[i]->args->dst.info.count *
+                         ucc_dt_size(ctxs[i]->args->dst.info.datatype);
+
+        ucs_status = buffer_export_ucc(
+                          ucp_info->ucp_ctx, ctxs[i]->args->dst.info.buffer,
+                          dst_len, dst_ebuf);
+        if (ucs_status != UCS_OK) return ucs_status;
+
+        gwbi->packed_dst_memh = dst_ebuf->packed_memh;
+
+        if (!inplace) {
+            size_t src_len = ctxs[i]->args->src.info.count *
+                             ucc_dt_size(ctxs[i]->args->src.info.datatype);
+            struct export_buf *src_ebuf = &ucp_info->src_ebuf;
+            ucs_status = buffer_export_ucc(
+                              ucp_info->ucp_ctx, ctxs[i]->args->src.info.buffer,
+                              src_len, src_ebuf);
+            if (ucs_status != UCS_OK) return ucs_status;
+
+            gwbi->packed_src_memh = src_ebuf->packed_memh;
+        }
+    }
+
+    // set the flag that indicates the global work buffer was passed
+    for (auto ctx : ctxs) {
+        ctx->args->mask |=
+            UCC_COLL_ARGS_FIELD_FLAGS | UCC_COLL_ARGS_FIELD_GLOBAL_WORK_BUFFER;
+        ctx->args->flags |= UCC_COLL_ARGS_FLAG_MEM_MAPPED_BUFFERS;
+    }
+
+    return ucs_status;
+}
+
+void free_gwbi(int n_procs, UccCollCtxVec &ctxs, test_ucp_info_t *ucp_infos,
+               bool inplace)
+{
+    int i, k;
+    ucs_status_t ucs_status;
+
+    // free sbufs, rbufs, src_rkeys, and dst_rkeys
+    for (i = 0; i < n_procs; i++) {
+        // my proc's ucp_info
+        test_ucp_info_t *ucp_info = &ucp_infos[i];
+
+        if (!inplace) {
+            struct export_buf *src_ebuf = &ucp_info->src_ebuf;
+            if (src_ebuf->memh != 0) {
+                ucs_status = ucp_mem_unmap(ucp_info->ucp_ctx, src_ebuf->memh);
+                ASSERT_EQ(UCS_OK, ucs_status) << "ucp_mem_unmap() returned error: "
+                                            << ucs_status_string(ucs_status);
+            }
+        }
+
+        struct export_buf *dst_ebuf = &ucp_info->dst_ebuf;
+        if (dst_ebuf->memh != 0) {
+            ucs_status = ucp_mem_unmap(ucp_info->ucp_ctx, dst_ebuf->memh);
+            ASSERT_EQ(UCS_OK, ucs_status) << "ucp_mem_unmap() returned error: "
+                                        << ucs_status_string(ucs_status);
+        }
+    }
+
+    // free ucp contexts
+    for (i = 0; i < n_procs; i++) {
+        ucp_config_release(ucp_infos[i].ucp_config);
+        ucp_cleanup(ucp_infos[i].ucp_ctx);
+    }
+
+    // free gwbi and each gwbi's set of pipes
+    for (k = 0; k < n_procs; k++) {
+        global_work_buf_info *gwbi =
+            (global_work_buf_info *) ctxs[k]->args->global_work_buffer;
+
+        ucc_free(gwbi);
+    }
+
+    ucc_free(ucp_infos);
+}
+
+#endif
diff --git a/test/gtest/coll/test_allreduce_sliding_window.h b/test/gtest/coll/test_allreduce_sliding_window.h
new file mode 100644
index 0000000000..0c14b7275a
--- /dev/null
+++ b/test/gtest/coll/test_allreduce_sliding_window.h
@@ -0,0 +1,36 @@
+#ifndef TEST_ALLREDUCE_SW_H
+#define TEST_ALLREDUCE_SW_H
+
+#include "common/test_ucc.h"
+
+#ifdef HAVE_UCX
+
+#include <ucp/api/ucp.h>
+
+typedef struct global_work_buf_info {
+    void *packed_src_memh;
+    void *packed_dst_memh;
+} global_work_buf_info;
+
+struct export_buf {
+    ucp_context_h ucp_context;
+    ucp_mem_h     memh;
+    void *        packed_memh;
+    size_t        packed_memh_len;
+    uint64_t      memh_id;
+};
+
+typedef struct test_ucp_info_t {
+    ucp_context_h     ucp_ctx;
+    ucp_config_t     *ucp_config;
+    struct export_buf src_ebuf;
+    struct export_buf dst_ebuf;
+} test_ucp_info_t;
+
+void free_gwbi(int n_procs, UccCollCtxVec &ctxs, test_ucp_info_t *ucp_infos,
+               bool inplace);
+ucs_status_t setup_gwbi(int n_procs, UccCollCtxVec &ctxs,
+                test_ucp_info_t **ucp_infos_p /* out */, bool inplace);
+
+#endif
+#endif