Skip to content

Commit

Permalink
TL/MLX5: device memory and topo init (#780)
Browse files Browse the repository at this point in the history
* TL/MLX5: device memory allocation

* TL/MLX5: topo init

* TL/MLX5: fix mask in wait-on-data WQE

* TEST: update tl/mlx5 gtest

* TL/MLX5: minor comments

* TL/MLX5: minor revisions

* TL/MLX5: gather global status after dm alloc

* TL/MLX5: cleanup team if dm alloc fails

* CODESTYLE: clang format

* TL/MLX5: minor comments
  • Loading branch information
samnordmann authored Jun 2, 2023
1 parent 0d07b2f commit 447c786
Show file tree
Hide file tree
Showing 9 changed files with 620 additions and 56 deletions.
4 changes: 3 additions & 1 deletion src/components/tl/mlx5/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ sources = \
tl_mlx5_wqe.c \
tl_mlx5_pd.h \
tl_mlx5_pd.c \
tl_mlx5_rcache.c
tl_mlx5_rcache.c \
tl_mlx5_dm.c \
tl_mlx5_dm.h

module_LTLIBRARIES = libucc_tl_mlx5.la
libucc_tl_mlx5_la_SOURCES = $(sources)
Expand Down
32 changes: 25 additions & 7 deletions src/components/tl/mlx5/tl_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,33 @@ typedef struct ucc_tl_mlx5_context {
UCC_CLASS_DECLARE(ucc_tl_mlx5_context_t, const ucc_base_context_params_t *,
const ucc_base_config_t *);

typedef struct ucc_tl_mlx5_schedule ucc_tl_mlx5_schedule_t;
typedef struct ucc_tl_mlx5_dm_chunk {
ptrdiff_t offset; /* 0 based offset from the beginning of
memic_mr (obtained with ibv_reg_dm_mr) */
ucc_tl_mlx5_schedule_t *task;
} ucc_tl_mlx5_dm_chunk_t;

typedef struct ucc_tl_mlx5_a2a ucc_tl_mlx5_a2a_t;

typedef enum
{
TL_MLX5_TEAM_STATE_INIT,
TL_MLX5_TEAM_STATE_POSTED,
} ucc_tl_mlx5_team_state_t;

typedef struct ucc_tl_mlx5_team {
ucc_tl_team_t super;
ucc_service_coll_req_t *scoll_req;
void * oob_req;
ucc_mpool_t dm_pool;
struct ibv_dm * dm_ptr;
struct ibv_mr * dm_mr;
ucc_tl_mlx5_a2a_t * a2a;
ucc_tl_team_t super;
ucc_status_t status[2];
ucc_service_coll_req_t *scoll_req;
ucc_tl_mlx5_team_state_t state;
void *dm_offset;
ucc_mpool_t dm_pool;
struct ibv_dm *dm_ptr;
struct ibv_mr *dm_mr;
ucc_tl_mlx5_a2a_t *a2a;
ucc_topo_t *topo;
ucc_ep_map_t ctx_map;
} ucc_tl_mlx5_team_t;
UCC_CLASS_DECLARE(ucc_tl_mlx5_team_t, ucc_base_context_t *,
const ucc_base_team_params_t *);
Expand Down
172 changes: 172 additions & 0 deletions src/components/tl/mlx5/tl_mlx5_dm.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/**
* Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#include "tl_mlx5_dm.h"

#define DM_HOST_AUTO_NUM_CHUNKS 8

/* Mpool object constructor: carves the next fixed-size slot out of the
   team's device-memory region and records its zero-based offset in the
   chunk descriptor. Offsets are handed out sequentially; team->dm_offset
   tracks the first not-yet-assigned byte. */
static void ucc_tl_mlx5_dm_chunk_init(ucc_mpool_t *mp, //NOLINT
                                      void *obj, void *chunk) //NOLINT
{
    ucc_tl_mlx5_team_t     *team     = ucc_container_of(mp, ucc_tl_mlx5_team_t,
                                                        dm_pool);
    ucc_tl_mlx5_dm_chunk_t *dm_chunk = (ucc_tl_mlx5_dm_chunk_t *)obj;
    size_t                  slot_len = UCC_TL_MLX5_TEAM_LIB(team)->cfg.dm_buf_size;

    /* Record the current cursor as this chunk's offset, then advance the
       cursor by one buffer size for the next object. */
    dm_chunk->offset = (ptrdiff_t)team->dm_offset;
    team->dm_offset  = PTR_OFFSET(team->dm_offset, slot_len);
}

/* Mpool chunk destructor: frees backing memory obtained by the pool's
   chunk_alloc callback (ucc_mpool_hugetlb_malloc). The mpool argument is
   unused, hence the NOLINT. */
static void ucc_tl_mlx5_dm_chunk_release(ucc_mpool_t *mp, void *chunk) //NOLINT
{
    ucc_free(chunk);
}

/* Mpool callbacks for the device-memory chunk pool: hugetlb-backed chunk
   allocation, per-object offset assignment at init time, and no per-object
   cleanup (offsets need no teardown). */
static ucc_mpool_ops_t ucc_tl_mlx5_dm_ops = {.chunk_alloc = ucc_mpool_hugetlb_malloc,
                                             .chunk_release =
                                                 ucc_tl_mlx5_dm_chunk_release,
                                             .obj_init = ucc_tl_mlx5_dm_chunk_init,
                                             .obj_cleanup = NULL};

/* Releases the team's device-memory resources in reverse order of
 * ucc_tl_mlx5_dm_init: chunk pool first, then the memory region, then the
 * underlying allocation (host memory or device memory, depending on the
 * dm_host config). Safe to call on a team that never allocated DM.
 *
 * Fixes vs. original: the return codes of ibv_dereg_mr()/ibv_free_dm() are
 * checked and logged (both return errno-style codes on failure), and
 * team->dm_ptr is reset to NULL so a second invocation — e.g. cleanup called
 * from a failed ucc_tl_mlx5_dm_init followed by team destruction — does not
 * double-free. */
void ucc_tl_mlx5_dm_cleanup(ucc_tl_mlx5_team_t *team)
{
    if (!team->dm_ptr) {
        return;
    }

    ucc_mpool_cleanup(&team->dm_pool, 1);

    if (ibv_dereg_mr(team->dm_mr)) {
        tl_error(UCC_TL_TEAM_LIB(team), "failed to dereg dm_mr (errno=%d)",
                 errno);
    }
    if (UCC_TL_MLX5_TEAM_LIB(team)->cfg.dm_host) {
        ucc_free(team->dm_ptr);
    } else {
        if (ibv_free_dm(team->dm_ptr)) {
            tl_error(UCC_TL_TEAM_LIB(team), "failed to free dm (errno=%d)",
                     errno);
        }
    }
    /* Mark as released so a repeated cleanup is a no-op. */
    team->dm_ptr = NULL;
}

/**
 * Allocates and registers the memory backing the DM chunk pool.
 *
 * Two modes:
 *  - dm_host != 0: plain host memory is allocated and registered with
 *    ibv_reg_mr (DM emulation for devices/configs without MEMIC).
 *  - dm_host == 0: on-device memory is allocated with ibv_alloc_dm and
 *    registered zero-based with ibv_reg_dm_mr; the allocation is retried
 *    with progressively fewer chunks until it fits in attr.max_dm_size.
 *
 * @param ib_ctx    verbs device context used for queries/DM allocation
 * @param pd        protection domain the MR is registered in
 * @param dm_host   non-zero to emulate DM with host memory
 * @param buf_size  size of a single chunk in bytes
 * @param buf_num_p in: requested chunk count or UCC_ULUNITS_AUTO;
 *                  out: number of chunks actually allocated
 * @param ptr       out: allocated region (host pointer cast or ibv_dm handle)
 * @param mr        out: registered memory region
 * @param lib       logging handle
 * @return UCC_OK on success, error status otherwise
 *
 * Fix vs. original: size_t arguments were logged with %ld/%zd, which is
 * undefined behavior per the C standard's fprintf contract; all size_t
 * values are now printed with %zu.
 */
ucc_status_t ucc_tl_mlx5_dm_alloc_reg(struct ibv_context *ib_ctx,
                                      struct ibv_pd *pd, int dm_host,
                                      size_t buf_size, size_t *buf_num_p,
                                      struct ibv_dm **ptr, struct ibv_mr **mr,
                                      ucc_base_lib_t *lib)
{
    struct ibv_dm            *dm_ptr = NULL;
    struct ibv_mr            *dm_mr;
    struct ibv_device_attr_ex attr;
    struct ibv_alloc_dm_attr  dm_attr;
    int max_chunks_to_alloc, min_chunks_to_alloc, i;

    if (dm_host) {
        /* Host emulation: single malloc of the full pool, ordinary MR. */
        max_chunks_to_alloc = (*buf_num_p == UCC_ULUNITS_AUTO)
                                  ? DM_HOST_AUTO_NUM_CHUNKS
                                  : *buf_num_p;
        dm_attr.length = max_chunks_to_alloc * buf_size;
        dm_ptr         = ucc_malloc(dm_attr.length, "memic_host");
        if (!dm_ptr) {
            tl_error(lib, " memic_host allocation failed");
            return UCC_ERR_NO_MEMORY;
        }

        dm_mr = ibv_reg_mr(pd, dm_ptr, dm_attr.length,
                           IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
        if (!dm_mr) {
            tl_error(lib, "failed to reg host memory");
            ucc_free(dm_ptr);
            return UCC_ERR_NO_MESSAGE;
        }
        *buf_num_p = max_chunks_to_alloc;
    } else {
        /* Real device memory: check capability, then size the request. */
        attr.comp_mask = 0;
        if (ibv_query_device_ex(ib_ctx, NULL, &attr)) {
            tl_error(lib, "failed to query device (errno=%d)", errno);
            return UCC_ERR_NO_MESSAGE;
        }
        if (!attr.max_dm_size) {
            tl_error(lib, "device doesn't support dm allocation");
            return UCC_ERR_NO_RESOURCE;
        }
        max_chunks_to_alloc = min_chunks_to_alloc = *buf_num_p;
        if (*buf_num_p == UCC_ULUNITS_AUTO) {
            max_chunks_to_alloc =
                attr.max_dm_size / buf_size - 1; //keep reserved memory
            min_chunks_to_alloc = 1;
            if (!max_chunks_to_alloc) {
                tl_error(lib,
                         "requested buffer size (=%zu) is too large, "
                         "should be set to be strictly less than %zu. "
                         "max allocation size is %zu",
                         buf_size, attr.max_dm_size / 2, attr.max_dm_size);
                return UCC_ERR_NO_RESOURCE;
            }
        }
        if (attr.max_dm_size < buf_size * min_chunks_to_alloc) {
            tl_error(lib,
                     "cannot allocate %i buffer(s) of size %zu, "
                     "max allocation size is %zu",
                     min_chunks_to_alloc, buf_size, attr.max_dm_size);
            return UCC_ERR_NO_MEMORY;
        }
        memset(&dm_attr, 0, sizeof(dm_attr));
        /* Retry with fewer chunks until ibv_alloc_dm succeeds or we drop
           below the minimum acceptable count. */
        for (i = max_chunks_to_alloc; i >= min_chunks_to_alloc; i--) {
            dm_attr.length = i * buf_size;
            errno          = 0;
            dm_ptr         = ibv_alloc_dm(ib_ctx, &dm_attr);
            if (dm_ptr) {
                break;
            }
        }
        if (!dm_ptr) {
            tl_error(lib,
                     "dev mem allocation failed, requested %zu, attr.max %zu, "
                     "errno %d",
                     dm_attr.length, attr.max_dm_size, errno);
            return errno == ENOMEM || errno == ENOSPC ? UCC_ERR_NO_MEMORY
                                                      : UCC_ERR_NO_MESSAGE;
        }
        /* ZERO_BASED: chunk offsets (not VAs) address into this MR. */
        dm_mr = ibv_reg_dm_mr(pd, dm_ptr, 0, dm_attr.length,
                              IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE |
                                  IBV_ACCESS_ZERO_BASED);
        if (!dm_mr) {
            tl_error(lib, "failed to reg memic");
            ibv_free_dm(dm_ptr);
            return UCC_ERR_NO_MESSAGE;
        }
        *buf_num_p = i;
    }
    *ptr = dm_ptr;
    *mr  = dm_mr;

    return UCC_OK;
}

/* Initializes the team's device-memory pool: allocates and registers the
 * DM region (per the lib's dm_host/dm_buf_size/dm_buf_num config), then
 * builds an mpool of fixed-size chunks over it. On mpool failure the DM
 * allocation is released before returning.
 * NOTE(review): caller appears to be the node-leader rank only — confirm
 * against the team-init path. */
ucc_status_t ucc_tl_mlx5_dm_init(ucc_tl_mlx5_team_t *team)
{
    ucc_tl_mlx5_context_t *ctx = UCC_TL_MLX5_TEAM_CTX(team);
    ucc_tl_mlx5_lib_config_t *cfg = &UCC_TL_MLX5_TEAM_LIB(team)->cfg;
    ucc_status_t status;

    /* On success this fills team->dm_ptr/dm_mr and may shrink
       cfg->dm_buf_num to what the device could actually provide. */
    status = ucc_tl_mlx5_dm_alloc_reg(
        ctx->shared_ctx, ctx->shared_pd, cfg->dm_host, cfg->dm_buf_size,
        &cfg->dm_buf_num, &team->dm_ptr, &team->dm_mr, UCC_TL_TEAM_LIB(team));
    if (status != UCC_OK) {
        tl_error(UCC_TL_TEAM_LIB(team),
                 "failed to alloc and register device memory");
        return status;
    }
    /* Offset cursor consumed by ucc_tl_mlx5_dm_chunk_init as objects are
       constructed; must be reset before the mpool is initialized. */
    team->dm_offset = NULL;

    /* min elems == max elems == dm_buf_num: the pool is fully populated up
       front and never grows beyond the registered region. */
    status = ucc_mpool_init(&team->dm_pool, 0, sizeof(ucc_tl_mlx5_dm_chunk_t),
                            0, UCC_CACHE_LINE_SIZE, cfg->dm_buf_num,
                            cfg->dm_buf_num, &ucc_tl_mlx5_dm_ops,
                            ctx->super.super.ucc_context->thread_mode,
                            "mlx5 dm pool");
    if (status != UCC_OK) {
        tl_error(UCC_TL_TEAM_LIB(team), "failed to init dm pool");
        ucc_tl_mlx5_dm_cleanup(team);
        return status;
    }
    return UCC_OK;
}
17 changes: 17 additions & 0 deletions src/components/tl/mlx5/tl_mlx5_dm.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
 * Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 *
 * See file LICENSE for terms.
 */

/* Fix vs. original: the header had no include guard, so a double include
   would redeclare the prototypes; guard added per C convention. */
#ifndef UCC_TL_MLX5_DM_H_
#define UCC_TL_MLX5_DM_H_

#include "tl_mlx5.h"

/* Allocates and registers the DM region: host memory when dm_host is set,
   device memory (MEMIC) otherwise. On success *buf_num_p holds the number
   of chunks actually allocated; see tl_mlx5_dm.c for details. */
ucc_status_t ucc_tl_mlx5_dm_alloc_reg(struct ibv_context *ib_ctx,
                                      struct ibv_pd *pd, int dm_host,
                                      size_t buf_size, size_t *buf_num_p,
                                      struct ibv_dm **ptr, struct ibv_mr **mr,
                                      ucc_base_lib_t *lib);

/* Releases the team's DM pool, MR, and underlying allocation; no-op if the
   team holds no DM. */
void ucc_tl_mlx5_dm_cleanup(ucc_tl_mlx5_team_t *team);

/* Allocates/registers DM for the team and builds the chunk mpool over it. */
ucc_status_t ucc_tl_mlx5_dm_init(ucc_tl_mlx5_team_t *team);

#endif /* UCC_TL_MLX5_DM_H_ */
99 changes: 97 additions & 2 deletions src/components/tl/mlx5/tl_mlx5_team.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,44 @@
*/

#include "tl_mlx5.h"
#include "tl_mlx5_dm.h"
#include "coll_score/ucc_coll_score.h"
#include "core/ucc_team.h"
#include <sys/shm.h>

/* Builds the team's topology view: first derives a context-level endpoint
   map nested in the core team's map, then initializes the topology object
   over the resulting subset. On topo failure the nested map is destroyed
   so no partial state is left behind. */
static ucc_status_t ucc_tl_mlx5_topo_init(ucc_tl_mlx5_team_t *team)
{
    ucc_subset_t subset;
    ucc_status_t status;

    status = ucc_ep_map_create_nested(&UCC_TL_CORE_TEAM(team)->ctx_map,
                                      &UCC_TL_TEAM_MAP(team), &team->ctx_map);
    if (status != UCC_OK) {
        tl_error(UCC_TL_TEAM_LIB(team), "failed to create ctx map");
        return status;
    }

    subset.myrank = UCC_TL_TEAM_RANK(team);
    subset.map    = team->ctx_map;

    status = ucc_topo_init(subset, UCC_TL_CORE_CTX(team)->topo, &team->topo);
    if (status != UCC_OK) {
        tl_error(UCC_TL_TEAM_LIB(team), "failed to init team topo");
        /* Roll back the map created above. */
        ucc_ep_map_destroy_nested(&team->ctx_map);
        return status;
    }

    return UCC_OK;
}

/* Tears down what ucc_tl_mlx5_topo_init created: the nested context
   endpoint map and the team topology object. */
static void ucc_tl_mlx5_topo_cleanup(ucc_tl_mlx5_team_t *team)
{
    ucc_ep_map_destroy_nested(&team->ctx_map);
    ucc_topo_cleanup(team->topo);
}

UCC_CLASS_INIT_FUNC(ucc_tl_mlx5_team_t, ucc_base_context_t *tl_context,
const ucc_base_team_params_t *params)
{
Expand All @@ -20,12 +54,33 @@ UCC_CLASS_INIT_FUNC(ucc_tl_mlx5_team_t, ucc_base_context_t *tl_context,

self->a2a = NULL;
self->dm_ptr = NULL;
return status;

status = ucc_tl_mlx5_topo_init(self);
if (status != UCC_OK) {
tl_error(ctx->super.super.lib, "failed to init team topo");
return status;
}

if (ucc_topo_get_sbgp(self->topo, UCC_SBGP_NODE)->group_rank == 0) {
status = ucc_tl_mlx5_dm_init(self);
if (UCC_OK != status) {
tl_error(UCC_TL_TEAM_LIB(self), "failed to init device memory");
}
}

self->status[0] = status;
self->state = TL_MLX5_TEAM_STATE_INIT;

tl_debug(tl_context->lib, "posted tl team: %p", self);
return UCC_OK;
}

UCC_CLASS_CLEANUP_FUNC(ucc_tl_mlx5_team_t)
{
tl_debug(self->super.super.context->lib, "finalizing tl team: %p", self);

ucc_tl_mlx5_dm_cleanup(self);
ucc_tl_mlx5_topo_cleanup(self);
}

UCC_CLASS_DEFINE_DELETE_FUNC(ucc_tl_mlx5_team_t, ucc_base_team_t);
Expand All @@ -37,8 +92,48 @@ ucc_status_t ucc_tl_mlx5_team_destroy(ucc_base_team_t *tl_team)
return UCC_OK;
}

ucc_status_t ucc_tl_mlx5_team_create_test(ucc_base_team_t *tl_team) /* NOLINT */
/* Non-blocking team-creation progress function. Runs a two-state machine:
 *  INIT   - post a service allreduce (MIN over per-rank status) so every
 *           rank learns whether the node leaders' DM init succeeded;
 *  POSTED - poll the service collective; once complete, fail team creation
 *           on all ranks if any leader reported an error.
 * Returns UCC_INPROGRESS while the exchange is pending, UCC_OK when the
 * team is ready, or an error status. */
ucc_status_t ucc_tl_mlx5_team_create_test(ucc_base_team_t *team)
{
    ucc_tl_mlx5_team_t *tl_team = ucc_derived_of(team, ucc_tl_mlx5_team_t);
    ucc_team_t *core_team = UCC_TL_CORE_TEAM(tl_team);
    /* Full map: the reduction spans every rank of the core team. */
    ucc_subset_t subset = {.map.type = UCC_EP_MAP_FULL,
                           .map.ep_num = core_team->size,
                           .myrank = core_team->rank};
    ucc_status_t status;

    switch (tl_team->state) {
    case TL_MLX5_TEAM_STATE_INIT:
        /* status[0] holds this rank's local init result (UCC_OK on
           non-leader ranks); MIN-reduce into status[1]. Relies on
           ucc_status_t error codes being negative so MIN picks a failure. */
        status = ucc_service_allreduce(
            core_team, &tl_team->status[0], &tl_team->status[1],
            UCC_DT_INT32, 1, UCC_OP_MIN, subset, &tl_team->scoll_req);
        if (status < 0) {
            tl_error(UCC_TL_TEAM_LIB(tl_team),
                     "failed to collect global status");
            return status;
        }
        tl_team->state = TL_MLX5_TEAM_STATE_POSTED;
        /* fallthrough: immediately start polling the posted collective */
    case TL_MLX5_TEAM_STATE_POSTED:
        status = ucc_service_coll_test(tl_team->scoll_req);
        if (status < 0) {
            tl_error(UCC_TL_TEAM_LIB(tl_team),
                     "failure during service coll exchange: %s",
                     ucc_status_string(status));
            return status;
        }
        if (UCC_INPROGRESS == status) {
            return status;
        }
        ucc_assert(status == UCC_OK);
        /* NOTE(review): finalize's return value is ignored — confirm it
           cannot fail in a way callers should see. */
        ucc_service_coll_finalize(tl_team->scoll_req);
        if (tl_team->status[1] != UCC_OK) {
            /* Some rank (a node leader doing DM init) failed: tear the
               team down everywhere and propagate the reduced status. */
            tl_error(UCC_TL_TEAM_LIB(tl_team),
                     "node leader failed during device memory init: %s",
                     ucc_status_string(tl_team->status[1]));
            ucc_tl_mlx5_team_destroy(team);
            return tl_team->status[1];
        }
    }

    return UCC_OK;
}

Expand Down
2 changes: 1 addition & 1 deletion src/components/tl/mlx5/tl_mlx5_wqe.c
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ ucc_status_t ucc_tl_mlx5_post_wait_on_data(struct ibv_qp *qp, uint64_t value,
wseg->lkey = htobe32(lkey);
wseg->va_fail = htobe64((addr) | (ACTION));
wseg->data = value;
wseg->data_mask = 1;
wseg->data_mask = 0xFFFFFFFF;
mlx5dv_wr_raw_wqe(mqp, wqe_desc);
if (ibv_wr_complete(qp_ex)) {
return UCC_ERR_NO_MESSAGE;
Expand Down
1 change: 1 addition & 0 deletions test/gtest/tl/mlx5/test_tl_mlx5.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <dlfcn.h>
#include "common/test_ucc.h"
#include "components/tl/mlx5/tl_mlx5.h"
#include "components/tl/mlx5/tl_mlx5_dm.h"
#include "components/tl/mlx5/tl_mlx5_ib.h"

typedef ucc_status_t (*ucc_tl_mlx5_create_ibv_ctx_fn_t)(
Expand Down
Loading

0 comments on commit 447c786

Please sign in to comment.