Skip to content

Commit

Permalink
CT: Fixup get().
Browse files Browse the repository at this point in the history
  • Loading branch information
vehre committed Oct 22, 2024
1 parent 2e4b5ee commit 7db0001
Showing 1 changed file with 70 additions and 45 deletions.
115 changes: 70 additions & 45 deletions src/runtime-libraries/mpi/mpi_caf.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,35 @@ MPI_Datatype *dts;
char *msgbody;
pthread_mutex_t lock_am;
int done_am = 0;

/* Communication thread variables, constants and structures. */
static const int CAF_CT_TAG = 13;
pthread_t commthread;
MPI_Comm ct_COMM;
bool commthread_running = true;
static const int CAF_CT_TAG = 13;
enum CT_MSG_FLAGS
{
/* Use the inter communication thread communicator. */
CT_INTER_CT = 1,
/* Use 1 << 1 for next flag. */
};

typedef struct
{
MPI_Win win;
size_t transfer_size;
int dest_image;
int dest_tag;
int flags;
void (*access)(void *dst, void *base, void *data);
char data[];
} ct_msg_t;

char err_buffer[MPI_MAX_ERROR_STRING];

/* All CAF runtime calls should use this comm instead of MPI_COMM_WORLD for
* interoperability purposes. */
MPI_Comm CAF_COMM_WORLD;
MPI_Comm ct_COMM;

static const int CT_STATUS_TERM_REQ = -1;

static caf_teams_list *teams_list = NULL;
static caf_used_teams_list *used_teams = NULL;
Expand Down Expand Up @@ -415,59 +432,71 @@ helperFunction()
void *
communication_thread(void *)
{
int ierr = 0;
int cnt;
int ierr = 0, cnt;
MPI_Status status;
MPI_Message msg_han;
MPI_Comm comm;

dprint("ct: Started.\n");

do
{
dprint("ct: Waiting for request.\n");
ierr = MPI_Probe(MPI_ANY_SOURCE, CAF_CT_TAG, ct_COMM, &status);
dprint("ct: Woke up.\n");
if (status.MPI_ERROR == MPI_SUCCESS)
dprint("ct: Probing for incoming message.\n");
ierr = MPI_Mprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ct_COMM, &msg_han, &status);
chk_err(ierr);
dprint("ct: Message received from %d, tag %d, mpi-status: %d, processing "
"...\n",
status.MPI_SOURCE, status.MPI_TAG, status.MPI_ERROR);
if (status.MPI_TAG == CAF_CT_TAG && status.MPI_ERROR == MPI_SUCCESS)
{
MPI_Get_count(&status, MPI_BYTE, &cnt);

struct
{
MPI_Win win;
size_t sz;
} msg;
if (cnt >= sizeof(msg))
ct_msg_t *msg;
if (cnt >= sizeof(ct_msg_t))
{
ierr = MPI_Recv(&msg, cnt, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG,
ct_COMM, &status);
msg = alloca(cnt);
ierr = MPI_Mrecv(msg, cnt, MPI_BYTE, &msg_han, &status);
chk_err(ierr);
dprint("ct: Received request of size %ld.\n", cnt);

void *bptr;
int flag;
ierr = MPI_Win_get_attr(msg.win, MPI_WIN_BASE, &bptr, &flag);
ierr = MPI_Win_get_attr(msg->win, MPI_WIN_BASE, &bptr, &flag);
chk_err(ierr);
dprint("ct: Local base for win %ld is %p (set: %b).\n", msg.win, bptr,
dprint("ct: Local base for win %ld is %p (set: %b).\n", msg->win, bptr,
flag);
if (!flag)
{
dprint("ct: Error: Window %p memory is not allocated.\n", msg.win);
dprint("ct: Error: Window %p memory is not allocated.\n", msg->win);
}
ierr = MPI_Send(bptr, msg.sz, MPI_BYTE, status.MPI_SOURCE,
status.MPI_TAG + 1, ct_COMM);
comm = (msg->flags & CT_INTER_CT) ? ct_COMM : CAF_COMM_WORLD;
dprint("ct: Sending %ld bytes to image %d, tag %d on comm %x (%s).\n",
msg->transfer_size, msg->dest_image, msg->dest_tag, comm,
comm == CAF_COMM_WORLD ? "CAF_COMM_WORLD" : "ct_COMM");
ierr = MPI_Send(bptr, msg->transfer_size, MPI_BYTE, msg->dest_image,
msg->dest_tag, comm);
chk_err(ierr);
}
else if (!commthread_running)
{
/* Pickup empty message. */
MPI_Recv(&msg, cnt, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG,
ct_COMM, &status);
dprint("ct: Got termination message. Terminating.\n");
ierr = MPI_Mrecv(&msg, cnt, MPI_BYTE, &msg_han, &status);
chk_err(ierr);
}
else
{
dprint("ct: Error: message to small, ignoring (got: %ld, exp: %ld).\n",
cnt, sizeof(msg));
cnt, sizeof(ct_msg_t));
}
}
else if (ierr == MPI_SUCCESS)
{
/* There is a message, but not for us. */
dprint("ct: Message not for us received. Setting it free again.\n");
// ierr = MPI_Request_free(&msg_han);
chk_err(ierr);
}
else
chk_err(ierr);
} while (commthread_running);
Expand Down Expand Up @@ -1062,7 +1091,7 @@ PREFIX(init)(int *argc, char ***argv)
}
#endif

ierr = MPI_Comm_dup(MPI_COMM_WORLD, &ct_COMM);
ierr = MPI_Comm_dup(CAF_COMM_WORLD, &ct_COMM);
chk_err(ierr);
ierr = pthread_create(&commthread, NULL, &communication_thread, NULL);
chk_err(ierr);
Expand Down Expand Up @@ -1207,7 +1236,7 @@ finalize_internal(int status_code)

dprint("Sending termination signal to communication thread.\n");
commthread_running = false;
ierr = MPI_Send(NULL, 0, MPI_BYTE, caf_this_image - 1, CAF_CT_TAG, ct_COMM);
ierr = MPI_Send(NULL, 0, MPI_BYTE, mpi_this_image, CAF_CT_TAG, ct_COMM);
chk_err(ierr);
dprint("Termination signal send, waiting for thread join.\n");
ierr = pthread_join(commthread, NULL);
Expand Down Expand Up @@ -1439,15 +1468,9 @@ void PREFIX(register)(size_t size, caf_register_t type, caf_token_t *token,
p = TOKEN(mpi_token);

#if MPI_VERSION >= 3
void *flavor;
int flag = -1;
ierr = MPI_Win_allocate /*_shared*/ (actual_size, 1, MPI_INFO_NULL,
CAF_COMM_WORLD, &mem, p);
chk_err(ierr);
ierr = MPI_Win_get_attr(*p, MPI_WIN_CREATE_FLAVOR, &flavor, &flag);
ierr = MPI_Win_allocate(actual_size, 1, MPI_INFO_NULL, CAF_COMM_WORLD,
&mem, p);
chk_err(ierr);
dprint("win %d has create flavor: %x, flag: %d.\n", *p, *(int *)flavor,
flag);
CAF_Win_lock_all(*p);
#else
ierr = MPI_Alloc_mem(actual_size, MPI_INFO_NULL, &mem);
Expand Down Expand Up @@ -3787,16 +3810,18 @@ PREFIX(get)(caf_token_t token, size_t offset, int image_index,
{
const size_t trans_size
= ((dst_size > src_size) ? src_size : dst_size) * size;
struct
{
MPI_Win win;
size_t sz;
} buf = {*p, trans_size};
int tag = CAF_CT_TAG; // + caf_this_image) % 0xffff;
ct_msg_t *buf = alloca(sizeof(ct_msg_t));
buf->win = *p;
buf->transfer_size = trans_size;
buf->dest_image = mpi_this_image;
buf->dest_tag = CAF_CT_TAG + 1;
buf->flags = 0;
ierr = MPI_Send(buf, sizeof(ct_msg_t), MPI_BYTE, remote_image,
CAF_CT_TAG, ct_COMM);
chk_err(ierr);
ierr
= MPI_Sendrecv(&buf, sizeof(buf), MPI_BYTE, remote_image, tag,
dest->base_addr, trans_size, MPI_BYTE,
remote_image, tag + 1, ct_COMM, MPI_STATUS_IGNORE);
= MPI_Recv(dest->base_addr, trans_size, MPI_BYTE, image_index - 1,
buf->dest_tag, CAF_COMM_WORLD, MPI_STATUS_IGNORE);
chk_err(ierr);

// CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p);
Expand Down

0 comments on commit 7db0001

Please sign in to comment.