Skip to content

Commit

Permalink
WIP: Add get_by_ct call.
Browse files Browse the repository at this point in the history
  • Loading branch information
vehre committed Oct 25, 2024
1 parent 7db0001 commit c74aabb
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ endif()
if ( gfortran_compiler AND ( NOT CMAKE_Fortran_COMPILER_VERSION VERSION_LESS 8.0.0 ) )
add_definitions(-DGCC_GE_8) # Tell library to build against GFortran 8.x bindings w/ descriptor change
endif()
if ( gfortran_compiler AND ( NOT CMAKE_Fortran_COMPILER_VERSION VERSION_LESS 14.0.0 ) )
add_definitions(-DGCC_GE_15) # Tell library to build against GFortran 15.x bindings
endif()

if(gfortran_compiler)
set(OLD_REQUIRED_FLAGS ${CMAKE_REQUIRED_FLAGS})
Expand Down
9 changes: 9 additions & 0 deletions src/application-binary-interface/libcaf.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,15 @@ void PREFIX(caf_sendget)(caf_token_t, size_t, int, gfc_descriptor_t *,
gfc_descriptor_t *, caf_vector_t *, int, int, bool,
int *);

#ifdef GCC_GE_15
void PREFIX(get_by_ct)(
caf_token_t token, int image_index, size_t bufsize,
void *set_buf /*void (*set)(void *buffer, void *set_data)*/, void *set_data,
void (*get)(void **buffer, bool *free_buffer, void *base, void *get_data),
void *get_data, size_t get_data_size, int *stat,
caf_team_t *team __attribute__((unused)),
int *team_number __attribute__((unused)));
#endif
#ifdef GCC_GE_8
void PREFIX(get_by_ref)(caf_token_t, int, gfc_descriptor_t *dst,
caf_reference_t *refs, int dst_kind, int src_kind,
Expand Down
137 changes: 119 additions & 18 deletions src/runtime-libraries/mpi/mpi_caf.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ typedef struct
int dest_image;
int dest_tag;
int flags;
void (*access)(void *dst, void *base, void *data);
void (*access)(void **dst, bool *free_dst, void *base, void *data);
char data[];
} ct_msg_t;

Expand Down Expand Up @@ -436,6 +436,9 @@ communication_thread(void *)
MPI_Status status;
MPI_Message msg_han;
MPI_Comm comm;
void *baseptr, *buffer;
int flag;
bool free_buffer;

dprint("ct: Started.\n");

Expand All @@ -451,37 +454,41 @@ communication_thread(void *)
{
MPI_Get_count(&status, MPI_BYTE, &cnt);

ct_msg_t *msg;
if (cnt >= sizeof(ct_msg_t))
{
msg = alloca(cnt);
ct_msg_t *msg = alloca(cnt);

ierr = MPI_Mrecv(msg, cnt, MPI_BYTE, &msg_han, &status);
chk_err(ierr);
dprint("ct: Received request of size %ld.\n", cnt);

void *bptr;
int flag;
ierr = MPI_Win_get_attr(msg->win, MPI_WIN_BASE, &bptr, &flag);
ierr = MPI_Win_get_attr(msg->win, MPI_WIN_BASE, &baseptr, &flag);
chk_err(ierr);
dprint("ct: Local base for win %ld is %p (set: %b).\n", msg->win, bptr,
flag);
dprint("ct: Local base for win %ld is %p (set: %b) Executing getter at "
"%p.\n",
msg->win, baseptr, flag, msg->access);
if (!flag)
{
dprint("ct: Error: Window %p memory is not allocated.\n", msg->win);
}
msg->access(&buffer, &free_buffer, baseptr, msg->data);
dprint("ct: getter executed.\n");
comm = (msg->flags & CT_INTER_CT) ? ct_COMM : CAF_COMM_WORLD;
dprint("ct: Sending %ld bytes to image %d, tag %d on comm %x (%s).\n",
msg->transfer_size, msg->dest_image, msg->dest_tag, comm,
comm == CAF_COMM_WORLD ? "CAF_COMM_WORLD" : "ct_COMM");
ierr = MPI_Send(bptr, msg->transfer_size, MPI_BYTE, msg->dest_image,
ierr = MPI_Send(buffer, msg->transfer_size, MPI_BYTE, msg->dest_image,
msg->dest_tag, comm);
chk_err(ierr);
if (free_buffer)
free(buffer);
}
else if (!commthread_running)
{
/* Pickup empty message. */
dprint("ct: Got termination message. Terminating.\n");
ierr = MPI_Mrecv(&msg, cnt, MPI_BYTE, &msg_han, &status);
baseptr = NULL;
ierr = MPI_Mrecv(baseptr, cnt, MPI_BYTE, &msg_han, &status);
chk_err(ierr);
}
else
Expand Down Expand Up @@ -3669,6 +3676,13 @@ PREFIX(send)(caf_token_t token, size_t offset, int image_index,
}
}

void
get_access(void **dst, bool *dst_is_tmp, void *base, void *)
{
*dst = base;
*dst_is_tmp = false;
}

/* Get array data from a remote src to a local dest. */

void
Expand Down Expand Up @@ -3810,18 +3824,19 @@ PREFIX(get)(caf_token_t token, size_t offset, int image_index,
{
const size_t trans_size
= ((dst_size > src_size) ? src_size : dst_size) * size;
ct_msg_t *buf = alloca(sizeof(ct_msg_t));
buf->win = *p;
buf->transfer_size = trans_size;
buf->dest_image = mpi_this_image;
buf->dest_tag = CAF_CT_TAG + 1;
buf->flags = 0;
ierr = MPI_Send(buf, sizeof(ct_msg_t), MPI_BYTE, remote_image,
ct_msg_t *msg = alloca(sizeof(ct_msg_t));
msg->win = *p;
msg->transfer_size = trans_size;
msg->dest_image = mpi_this_image;
msg->dest_tag = CAF_CT_TAG + 1;
msg->flags = 0;
msg->access = &get_access;
ierr = MPI_Send(msg, sizeof(ct_msg_t), MPI_BYTE, remote_image,
CAF_CT_TAG, ct_COMM);
chk_err(ierr);
ierr
= MPI_Recv(dest->base_addr, trans_size, MPI_BYTE, image_index - 1,
buf->dest_tag, CAF_COMM_WORLD, MPI_STATUS_IGNORE);
msg->dest_tag, CAF_COMM_WORLD, MPI_STATUS_IGNORE);
chk_err(ierr);

// CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p);
Expand Down Expand Up @@ -4890,6 +4905,92 @@ get_for_ref(caf_reference_t *ref, size_t *i, size_t dst_index,
}
}

#ifdef GCC_GE_15
void
PREFIX(get_by_ct)(
caf_token_t token, int image_index, size_t bufsize, void *set_buf,
/*void (*set)(void *buffer, void *set_data), */ void *set_data,
void (*get)(void **buffer, bool *free_buffer, void *base, void *get_data),
void *get_data, size_t get_data_size, int *stat,
caf_team_t *team __attribute__((unused)),
int *team_number __attribute__((unused)))
{
MPI_Group current_team_group, win_group;
int ierr, this_image, remote_image;
int trans_ranks[2];
bool free_t_buff, free_msg;
void *t_buff;
ct_msg_t *msg;
const size_t msg_size = sizeof(ct_msg_t) + get_data_size;

if (stat)
*stat = 0;

// Get mapped remote image
ierr = MPI_Comm_group(CAF_COMM_WORLD, &current_team_group);
chk_err(ierr);
ierr = MPI_Win_get_group(*TOKEN(token), &win_group);
chk_err(ierr);
ierr = MPI_Group_translate_ranks(current_team_group, 2,
(int[]){image_index - 1, mpi_this_image},
win_group, trans_ranks);
chk_err(ierr);
remote_image = trans_ranks[0];
this_image = trans_ranks[1];
ierr = MPI_Group_free(&current_team_group);
chk_err(ierr);
ierr = MPI_Group_free(&win_group);
chk_err(ierr);

check_image_health(remote_image, stat);

dprint("Entering get_by_ct(), win_rank = %d, this_rank = %d, getter: %p.\n",
remote_image, this_image, get);

// create get msg
if ((free_msg = (((msg = alloca(msg_size))) == NULL)))
{
msg = malloc(msg_size);
if (msg == NULL)
caf_runtime_error("Unable to allocate memory "
"for internal message in get_by_ct().");
}
msg->win = *TOKEN(token);
msg->transfer_size = bufsize;
msg->dest_image = mpi_this_image;
msg->dest_tag = CAF_CT_TAG + 1;
msg->flags = 0;
msg->access = get;
memcpy(msg->data, get_data, get_data_size);

// call get on remote
ierr = MPI_Send(msg, msg_size, MPI_BYTE, remote_image, CAF_CT_TAG, ct_COMM);
chk_err(ierr);

// allocate local buffer
if ((free_t_buff = (((t_buff = alloca(bufsize))) == NULL)))
{
t_buff = malloc(bufsize);
if (t_buff == NULL)
caf_runtime_error("Unable to allocate memory "
"for internal buffer in get_by_ct().");
}
ierr = MPI_Recv(t_buff, bufsize, MPI_BYTE, image_index - 1, msg->dest_tag,
CAF_COMM_WORLD, MPI_STATUS_IGNORE);
chk_err(ierr);

// set (buffer, set_data)
memcpy(set_buf, t_buff, bufsize);
// set(t_buff, set_data);

// free (buffer)
if (free_msg)
free(msg);
if (free_t_buff)
free(t_buff);
}
#endif

void
PREFIX(get_by_ref)(caf_token_t token, int image_index, gfc_descriptor_t *dst,
caf_reference_t *refs, int dst_kind, int src_kind,
Expand Down

0 comments on commit c74aabb

Please sign in to comment.