From 7db0001df770a327c5c1ed3a68c671a63564180a Mon Sep 17 00:00:00 2001 From: Andre Vehreschild Date: Tue, 22 Oct 2024 11:12:36 +0200 Subject: [PATCH] CT: Fixup get(). --- src/runtime-libraries/mpi/mpi_caf.c | 115 +++++++++++++++++----------- 1 file changed, 70 insertions(+), 45 deletions(-) diff --git a/src/runtime-libraries/mpi/mpi_caf.c b/src/runtime-libraries/mpi/mpi_caf.c index 2fb16fef..ea93818c 100644 --- a/src/runtime-libraries/mpi/mpi_caf.c +++ b/src/runtime-libraries/mpi/mpi_caf.c @@ -233,18 +233,35 @@ MPI_Datatype *dts; char *msgbody; pthread_mutex_t lock_am; int done_am = 0; + +/* Communication thread variables, constants and structures. */ +static const int CAF_CT_TAG = 13; pthread_t commthread; +MPI_Comm ct_COMM; bool commthread_running = true; -static const int CAF_CT_TAG = 13; +enum CT_MSG_FLAGS +{ + /* Use the inter communication thread communicator. */ + CT_INTER_CT = 1, + /* Use 1 << 1 for next flag. */ +}; + +typedef struct +{ + MPI_Win win; + size_t transfer_size; + int dest_image; + int dest_tag; + int flags; + void (*access)(void *dst, void *base, void *data); + char data[]; +} ct_msg_t; char err_buffer[MPI_MAX_ERROR_STRING]; /* All CAF runtime calls should use this comm instead of MPI_COMM_WORLD for * interoperability purposes. */ MPI_Comm CAF_COMM_WORLD; -MPI_Comm ct_COMM; - -static const int CT_STATUS_TERM_REQ = -1; static caf_teams_list *teams_list = NULL; static caf_used_teams_list *used_teams = NULL; @@ -415,59 +432,71 @@ helperFunction() void * communication_thread(void *) { - int ierr = 0; - int cnt; + int ierr = 0, cnt; MPI_Status status; + MPI_Message msg_han; + MPI_Comm comm; dprint("ct: Started.\n"); do { - dprint("ct: Waiting for request.\n"); - ierr = MPI_Probe(MPI_ANY_SOURCE, CAF_CT_TAG, ct_COMM, &status); - dprint("ct: Woke up.\n"); - if (status.MPI_ERROR == MPI_SUCCESS) + dprint("ct: Probing for incoming message.\n"); + ierr = MPI_Mprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, ct_COMM, &msg_han, &status); + chk_err(ierr); + dprint("ct: Message received from %d, tag %d, mpi-status: %d, processing " + "...\n", + status.MPI_SOURCE, status.MPI_TAG, status.MPI_ERROR); + if (status.MPI_TAG == CAF_CT_TAG && status.MPI_ERROR == MPI_SUCCESS) { MPI_Get_count(&status, MPI_BYTE, &cnt); - struct - { - MPI_Win win; - size_t sz; - } msg; - if (cnt >= sizeof(msg)) + ct_msg_t *msg; + if (cnt >= sizeof(ct_msg_t)) { - ierr = MPI_Recv(&msg, cnt, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG, - ct_COMM, &status); + msg = alloca(cnt); + ierr = MPI_Mrecv(msg, cnt, MPI_BYTE, &msg_han, &status); chk_err(ierr); dprint("ct: Received request of size %ld.\n", cnt); void *bptr; int flag; - ierr = MPI_Win_get_attr(msg.win, MPI_WIN_BASE, &bptr, &flag); + ierr = MPI_Win_get_attr(msg->win, MPI_WIN_BASE, &bptr, &flag); chk_err(ierr); - dprint("ct: Local base for win %ld is %p (set: %b).\n", msg.win, bptr, + dprint("ct: Local base for win %ld is %p (set: %b).\n", msg->win, bptr, flag); if (!flag) { - dprint("ct: Error: Window %p memory is not allocated.\n", msg.win); + dprint("ct: Error: Window %p memory is not allocated.\n", msg->win); } - ierr = MPI_Send(bptr, msg.sz, MPI_BYTE, status.MPI_SOURCE, - status.MPI_TAG + 1, ct_COMM); + comm = (msg->flags & CT_INTER_CT) ? ct_COMM : CAF_COMM_WORLD; + dprint("ct: Sending %ld bytes to image %d, tag %d on comm %x (%s).\n", + msg->transfer_size, msg->dest_image, msg->dest_tag, comm, + comm == CAF_COMM_WORLD ? "CAF_COMM_WORLD" : "ct_COMM"); + ierr = MPI_Send(bptr, msg->transfer_size, MPI_BYTE, msg->dest_image, + msg->dest_tag, comm); chk_err(ierr); } else if (!commthread_running) { /* Pickup empty message. */ - MPI_Recv(&msg, cnt, MPI_BYTE, status.MPI_SOURCE, status.MPI_TAG, - ct_COMM, &status); + dprint("ct: Got termination message. Terminating.\n"); + ierr = MPI_Mrecv(&msg, cnt, MPI_BYTE, &msg_han, &status); + chk_err(ierr); } else { dprint("ct: Error: message to small, ignoring (got: %ld, exp: %ld).\n", - cnt, sizeof(msg)); + cnt, sizeof(ct_msg_t)); } } + else if (ierr == MPI_SUCCESS) + { + /* There is a message, but not for us. */ + dprint("ct: Message not for us received. Setting it free again.\n"); + // ierr = MPI_Request_free(&msg_han); + chk_err(ierr); + } else chk_err(ierr); } while (commthread_running); @@ -1062,7 +1091,7 @@ PREFIX(init)(int *argc, char ***argv) } #endif - ierr = MPI_Comm_dup(MPI_COMM_WORLD, &ct_COMM); + ierr = MPI_Comm_dup(CAF_COMM_WORLD, &ct_COMM); chk_err(ierr); ierr = pthread_create(&commthread, NULL, &communication_thread, NULL); chk_err(ierr); @@ -1207,7 +1236,7 @@ finalize_internal(int status_code) dprint("Sending termination signal to communication thread.\n"); commthread_running = false; - ierr = MPI_Send(NULL, 0, MPI_BYTE, caf_this_image - 1, CAF_CT_TAG, ct_COMM); + ierr = MPI_Send(NULL, 0, MPI_BYTE, mpi_this_image, CAF_CT_TAG, ct_COMM); chk_err(ierr); dprint("Termination signal send, waiting for thread join.\n"); ierr = pthread_join(commthread, NULL); @@ -1439,15 +1468,9 @@ void PREFIX(register)(size_t size, caf_register_t type, caf_token_t *token, p = TOKEN(mpi_token); #if MPI_VERSION >= 3 - void *flavor; - int flag = -1; - ierr = MPI_Win_allocate /*_shared*/ (actual_size, 1, MPI_INFO_NULL, - CAF_COMM_WORLD, &mem, p); - chk_err(ierr); - ierr = MPI_Win_get_attr(*p, MPI_WIN_CREATE_FLAVOR, &flavor, &flag); + ierr = MPI_Win_allocate(actual_size, 1, MPI_INFO_NULL, CAF_COMM_WORLD, + &mem, p); chk_err(ierr); - dprint("win %d has create flavor: %x, flag: %d.\n", *p, *(int *)flavor, - flag); CAF_Win_lock_all(*p); #else ierr = MPI_Alloc_mem(actual_size, MPI_INFO_NULL, &mem); @@ -3787,16 +3810,18 @@ PREFIX(get)(caf_token_t token, size_t offset, int image_index, { const size_t trans_size = ((dst_size > src_size) ? src_size : dst_size) * size; - struct - { - MPI_Win win; - size_t sz; - } buf = {*p, trans_size}; - int tag = CAF_CT_TAG; // + caf_this_image) % 0xffff; + ct_msg_t *buf = alloca(sizeof(ct_msg_t)); + buf->win = *p; + buf->transfer_size = trans_size; + buf->dest_image = mpi_this_image; + buf->dest_tag = CAF_CT_TAG + 1; + buf->flags = 0; + ierr = MPI_Send(buf, sizeof(ct_msg_t), MPI_BYTE, remote_image, + CAF_CT_TAG, ct_COMM); + chk_err(ierr); ierr - = MPI_Sendrecv(&buf, sizeof(buf), MPI_BYTE, remote_image, tag, - dest->base_addr, trans_size, MPI_BYTE, - remote_image, tag + 1, ct_COMM, MPI_STATUS_IGNORE); + = MPI_Recv(dest->base_addr, trans_size, MPI_BYTE, image_index - 1, + buf->dest_tag, CAF_COMM_WORLD, MPI_STATUS_IGNORE); chk_err(ierr); // CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p);