Skip to content

Commit

Permalink
TEST: do local checks for gather (openucx#901)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergei-Lebedev authored Mar 4, 2024
1 parent e353542 commit 9ceb057
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 48 deletions.
6 changes: 4 additions & 2 deletions src/utils/ucc_component.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* See file LICENSE for terms.
*/
#include "config.h"
Expand Down Expand Up @@ -52,7 +52,9 @@ static ucc_status_t ucc_component_load_one(const char *so_path,

handle = dlopen(so_path, RTLD_LAZY);
if (!handle) {
ucc_debug("failed to load UCC component library: %s", so_path);
error = dlerror();
ucc_debug("failed to load UCC component library: %s (%s)",
so_path, error);
goto error;
}
iface = (ucc_component_iface_t *)dlsym(handle, iface_struct);
Expand Down
20 changes: 20 additions & 0 deletions src/utils/ucc_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@
#define ucc_coll_trace_debug(_fmt, ...) \
ucc_log_component_collective_trace(UCS_LOG_LEVEL_DEBUG, _fmt, ##__VA_ARGS__)

/**
* Debug-only print helper.
*
* Forwards a message to ucs_log_dispatch() at UCS_LOG_LEVEL_PRINT, using the
* UCC global log component (ucc_global_config.log_component) and tagging it
* with the caller's file, line and function name.
*
* The macro itself does not compare against the current log level, which
* makes it convenient for adding a few temporary prints while debugging
* without changing the configured log level.
*
* NOTE(review): the original comment states that output can be
* enabled/disabled via environment variable/configuration settings, and that
* it can be restricted to specific processes (e.g. only rank 0 when debugging
* MPI). No such check is visible in this macro; any filtering would have to
* happen inside ucs_log_dispatch() or the log component - confirm.
*
* The body is wrapped in do { ... } while(0) so that an invocation followed
* by a semicolon behaves as a single statement (safe inside unbraced
* if/else). "## __VA_ARGS__" drops the preceding comma when no variadic
* arguments are passed (GNU extension).
*
* This macro is intended for debugging only. It should not be used in
* production code.
*
* @param _fmt Format string for the message (presumably printf-style -
*             verify against ucs_log_dispatch()).
* @param ...  Optional arguments referenced by _fmt.
*/

#define ucc_print(_fmt, ...) \
do { \
ucs_log_dispatch(__FILE__, __LINE__, __FUNCTION__, \
UCS_LOG_LEVEL_PRINT, \
&ucc_global_config.log_component, \
_fmt, ## __VA_ARGS__); \
} while(0)

static inline const char* ucc_coll_type_str(ucc_coll_type_t ct)
{
Expand Down
6 changes: 1 addition & 5 deletions test/mpi/test_allgather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ ucc_status_t TestAllgather::set_input(int iter_persistent)
size_t single_rank_count = msgsize / dt_size;
size_t single_rank_size = single_rank_count * dt_size;
int rank;
void *buf, *check;
void *buf;

this->iter_persistent = iter_persistent;
MPI_Comm_rank(team.comm, &rank);
Expand All @@ -60,12 +60,9 @@ ucc_status_t TestAllgather::set_input(int iter_persistent)
} else {
buf = sbuf;
}
check = PTR_OFFSET(check_buf, rank * single_rank_size);

init_buffer(buf, single_rank_count, dt, mem_type,
rank * (iter_persistent + 1));
UCC_CHECK(ucc_mc_memcpy(check, buf, single_rank_size,
UCC_MEMORY_TYPE_HOST, mem_type));
return UCC_OK;
}

Expand All @@ -83,7 +80,6 @@ ucc_status_t TestAllgather::check()
i * (iter_persistent + 1));
}


return compare_buffers(rbuf, check_buf, single_rank_count * size, dt,
mem_type);
}
10 changes: 5 additions & 5 deletions test/mpi/test_bcast.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ ucc_status_t TestBcast::set_input(int iter_persistent)
MPI_Comm_rank(team.comm, &rank);
if (rank == root) {
init_buffer(sbuf, count, dt, mem_type, rank * (iter_persistent + 1));
UCC_CHECK(ucc_mc_memcpy(check_buf, sbuf, count * dt_size,
UCC_MEMORY_TYPE_HOST, mem_type));
}
return UCC_OK;
}
Expand All @@ -61,9 +59,11 @@ ucc_status_t TestBcast::check()
int rank;

MPI_Comm_rank(team.comm, &rank);
if (rank == root) {
return UCC_OK;
}

init_buffer(check_buf, count, dt, UCC_MEMORY_TYPE_HOST,
root * (iter_persistent + 1));
return (rank == root)
? UCC_OK
: compare_buffers(sbuf, check_buf, count, dt, mem_type);
return compare_buffers(sbuf, check_buf, count, dt, mem_type);
}
35 changes: 16 additions & 19 deletions test/mpi/test_gather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ ucc_status_t TestGather::set_input(int iter_persistent)
size_t single_rank_count = msgsize / dt_size;
size_t single_rank_size = single_rank_count * dt_size;
int rank;
void *buf, *check;
void *buf;

this->iter_persistent = iter_persistent;
MPI_Comm_rank(team.comm, &rank);
if (rank == root) {
if (inplace) {
Expand All @@ -87,34 +88,30 @@ ucc_status_t TestGather::set_input(int iter_persistent)
} else {
buf = sbuf;
}
check = PTR_OFFSET(check_buf, rank * single_rank_size);

init_buffer(buf, single_rank_count, dt, mem_type,
rank * (iter_persistent + 1));
UCC_CHECK(ucc_mc_memcpy(check, buf, single_rank_size,
UCC_MEMORY_TYPE_HOST, mem_type));
return UCC_OK;
}

ucc_status_t TestGather::check()
{
size_t single_rank_count = msgsize / ucc_dt_size(dt);
MPI_Datatype mpi_dt = ucc_dt_to_mpi(dt);
MPI_Request req;
int size, rank, completed;
int size, rank, i;
size_t dt_size, single_rank_count;

MPI_Comm_size(team.comm, &size);
MPI_Comm_rank(team.comm, &rank);
if (rank != root) {
return UCC_OK;
}

MPI_Iallgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, check_buf,
single_rank_count, mpi_dt, team.comm, &req);
do {
MPI_Test(&req, &completed, MPI_STATUS_IGNORE);
ucc_context_progress(team.ctx);
} while(!completed);
dt_size = ucc_dt_size(dt);
single_rank_count = msgsize / dt_size;
for (i = 0; i < size; i++) {
init_buffer(PTR_OFFSET(check_buf, i * single_rank_count * dt_size),
single_rank_count, dt, UCC_MEMORY_TYPE_HOST,
i * (iter_persistent + 1));
}

return (rank != root)
? UCC_OK
: compare_buffers(rbuf, check_buf, single_rank_count * size, dt,
mem_type);
return compare_buffers(rbuf, check_buf, single_rank_count * size, dt,
mem_type);
}
30 changes: 14 additions & 16 deletions test/mpi/test_gatherv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ ucc_status_t TestGatherv::set_input(int iter_persistent)
{
size_t dt_size = ucc_dt_size(dt);
int rank;
void *buf, *check;
void *buf;

this->iter_persistent = iter_persistent;
MPI_Comm_rank(team.comm, &rank);
if (rank == root) {
if (inplace) {
Expand All @@ -118,11 +119,8 @@ ucc_status_t TestGatherv::set_input(int iter_persistent)
} else {
buf = sbuf;
}
check = PTR_OFFSET(check_buf, displacements[rank] * dt_size);

init_buffer(buf, counts[rank], dt, mem_type, rank * (iter_persistent + 1));
UCC_CHECK(ucc_mc_memcpy(check, buf, counts[rank] * dt_size,
UCC_MEMORY_TYPE_HOST, mem_type));

return UCC_OK;
}

Expand All @@ -138,21 +136,21 @@ TestGatherv::~TestGatherv()

ucc_status_t TestGatherv::check()
{
size_t count = msgsize / ucc_dt_size(dt);
MPI_Datatype mpi_dt = ucc_dt_to_mpi(dt);
MPI_Request req;
int size, rank, completed;
size_t count = msgsize / ucc_dt_size(dt);
int size, rank, i;

MPI_Comm_size(team.comm, &size);
MPI_Comm_rank(team.comm, &rank);

MPI_Iallgatherv(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, check_buf,
(int *)counts, (int *)displacements, mpi_dt, team.comm,
&req);
do {
MPI_Test(&req, &completed, MPI_STATUS_IGNORE);
ucc_context_progress(team.ctx);
} while(!completed);
if (rank != root) {
return UCC_OK;
}

for (i = 0; i < size; i++) {
init_buffer(PTR_OFFSET(check_buf, displacements[i] * ucc_dt_size(dt)),
counts[i], dt, UCC_MEMORY_TYPE_HOST,
i * (iter_persistent + 1));
}

return (rank != root)
? UCC_OK
Expand Down
1 change: 0 additions & 1 deletion test/mpi/test_scatter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ TestScatter::TestScatter(ucc_test_team_t &_team, TestCaseParams &params) :
TEST_SKIP_MEM_LIMIT, team.comm)) {
return;
}

if (rank == root) {
UCC_CHECK(ucc_mc_alloc(&sbuf_mc_header, msgsize * size, mem_type));
sbuf = sbuf_mc_header->addr;
Expand Down

0 comments on commit 9ceb057

Please sign in to comment.