Sliding Window Allreduce #862
Conversation
Can one of the admins verify this patch?
@nsarka Does this run (optimally or suboptimally) on hosts without DPUs?
Yes, it should work suboptimally on hosts without DPUs, so long as the global work buffer info struct is populated with the correct UCP info.
Hi Nicolas! Thank you for this PR, which looks really great. I left a bunch of remarks; most of them are really minor and only deal with the code style. Let me stress a couple of items:
- The collective needs to be non-blocking; however, I fear that the progress function ucc_tl_ucp_allreduce_sliding_window_req_test is blocking and needs to be fixed (see the sketch after this list).
- I may be wrong, but there might be an issue with ucc_tl_ucp_allreduce_sliding_window_reduction, which seems to only post the reduction but does not poll for completion.
- Can you please update the tests to cover this algorithm as well?
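A minimal sketch (not the PR's actual code) of the non-blocking shape this is asking for: the reduction is posted once on the executor and then re-tested on every progress call instead of being waited on inline. Only the ucc_ee_executor_task_* calls follow UCC's executor API as I understand it; everything else is illustrative.

/* Sketch only: poll a previously posted executor reduction instead of
 * blocking on it.  The caller re-enters this on each progress cycle. */
static ucc_status_t poll_posted_reduction(ucc_ee_executor_task_t *etask)
{
    ucc_status_t status = ucc_ee_executor_task_test(etask);

    if (status == UCC_INPROGRESS) {
        /* Not done yet: return and let the progress engine call again,
         * keeping the collective non-blocking. */
        return UCC_INPROGRESS;
    }
    if (status == UCC_OK) {
        status = ucc_ee_executor_task_finalize(etask);
    }
    return status;
}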
params.mask    = UCC_EE_EXECUTOR_PARAM_FIELD_TYPE;
params.ee_type = UCC_EE_CPU_THREAD;
status = ucc_ee_executor_init(&params,
Why do we need a different executor than the one initialized by the core?
If I don't initialize an executor here, then there is a segfault in the reduction that indicates it's trying to use an executor that doesn't exist. I have not debugged the issue further since initializing one here seemed to fix the problem. If I remember correctly, the reduction would call a function that looks for an executor on the task.super object, going upwards until it finds one. In this case, it never found one.
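For context, a hedged sketch of the alternative the review seems to hint at: reusing the executor that core attaches to the collective task rather than creating a private one. The helper name below follows my reading of the UCC tree and may not be exact; the snippet is illustrative, not the PR's code.

/* Sketch only: look up the executor associated with the collective
 * task instead of initializing a separate CPU executor.  A failure
 * here would match the symptom described above (no executor found
 * while walking up from task.super). */
ucc_ee_executor_t *exec;
ucc_status_t       st;

st = ucc_coll_task_get_executor(&task->super, &exec);
if (st != UCC_OK) {
    return st;
}
/* exec can then be handed to ucc_ee_executor_task_post() for the
 * reduction. */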
    + get_offset;
dst_addr = getbuf->buf;
assert(getbuf->state == FREE);
Use ucc_assert instead of assert here.
Is there a way to ensure that the buffer is free? I see that pipe->avail_buffs > 0, but is it sufficient to ensure that this precise buffer is free?
If the getbuf is not free, it is because it is either RECVING or REDUCING. It cannot be RECVING, because avail_bufs > 0 and get_idx is incremented after the buffer is set to RECVING. It cannot be REDUCING, because red_idx is always behind get_idx.
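To make that invariant easier to see, here is a small illustrative sketch; the state names and fields come from the discussion above, but the enum is not necessarily how the PR spells it.

/* Illustrative only: the three buffer states discussed above and the
 * check the review asked about, expressed with ucc_assert. */
typedef enum {
    FREE,     /* buffer is not in use                          */
    RECVING,  /* a get into this buffer is still in flight     */
    REDUCING  /* buffer is currently being reduced into accbuf */
} buf_state_t;

/* getbuf cannot be RECVING here: avail_buffs > 0, and get_idx is only
 * advanced after a buffer is marked RECVING.  It cannot be REDUCING
 * either, because red_idx always trails get_idx.  Hence: */
ucc_assert(getbuf->state == FREE);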
data_size = count * dt_size;
src_rank  = pipe->src_rank;
getbuf    = accbuf->state == FREE ?
            accbuf : &pipe->getbuf[get_idx];
What do you think about using a memory pool (see mpool) to dynamically request buffers from, instead of managing them manually like here?
Sorry, can you please point me to where pipe->getbuf is allocated?
For locality reasons, the buffers are allocated outside of UCC.
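For completeness, a rough sketch of what the mpool variant suggested above could look like; it was not adopted here precisely because the buffers live outside UCC. The pool field name is hypothetical, and the pool is assumed to have been set up with ucc_mpool_init elsewhere.

/* Hypothetical sketch of the mpool-based alternative (not used in
 * this PR, since the buffers are allocated outside of UCC). */
void *getbuf = ucc_mpool_get(&team->getbuf_mp); /* team->getbuf_mp is hypothetical */
if (!getbuf) {
    return UCC_ERR_NO_MEMORY;                   /* pool exhausted */
}
/* ... post the get and the reduction using getbuf ... */
ucc_mpool_put(getbuf);                          /* return it once reduced */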
    + pipe->my_offset;
data_size = count * dt_size;
src_rank  = pipe->src_rank;
getbuf    = accbuf->state == FREE ?
Would it be a good idea to merge accbuf into getbuf and set accbuf to be the first entry of getbuf? Or do you prefer to keep them separated?
They are allocated this way inside the UROM UCC worker. There is one big allocation, and getbuf is set to the second buffer in the allocation, like this:
accbuf = malloc(buffer_size * num_bufs);
getbufs = accbuf + buffer_size;
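A slightly expanded sketch of that same layout, using char pointers so the offset arithmetic is explicit; num_bufs and buffer_size are the quantities from the snippet above, and the index variable i is purely illustrative.

/* One contiguous allocation: accbuf first, the get buffers behind it. */
char *base    = malloc(buffer_size * num_bufs);
char *accbuf  = base;                    /* accumulation buffer */
char *getbufs = base + buffer_size;      /* first get buffer    */
/* i-th get buffer, for 0 <= i < num_bufs - 1: */
char *getbuf_i = getbufs + i * buffer_size;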
Hi Sam, I have taken care of all of the above items. Please let me know if it looks good to you.
Force-pushed from 8f8fcd6 to 9beead6.
This PR implements Sliding Window Allreduce, to be used by the UROM UCC worker. The algorithm is passed all of the data it needs (host source buffers, destination buffers, src rkeys, dst rkeys, UCP workers, endpoints, ...) via the global work buffer. It is set up to use one UCP worker per thread. All UCP setup is done by the UROM UCC worker, outside of this UCC change.
The algorithm itself is meant to run on the DPU.
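To make the description above concrete, here is an entirely hypothetical sketch of the per-rank information such a global work buffer could carry; the real struct is defined by the UROM UCC worker, and its field names will differ.

#include <ucp/api/ucp.h>

/* Hypothetical sketch of the per-rank payload of the global work
 * buffer described above; not the actual UROM/UCC definition. */
typedef struct sliding_window_work_buf {
    void         *src_buf;         /* host source buffer            */
    void         *dst_buf;         /* host destination buffer       */
    void         *packed_src_rkey; /* packed UCP rkey for src_buf   */
    size_t        src_rkey_len;
    void         *packed_dst_rkey; /* packed UCP rkey for dst_buf   */
    size_t        dst_rkey_len;
    ucp_worker_h  worker;          /* one UCP worker per thread     */
    ucp_ep_h     *eps;             /* endpoints to the other ranks  */
} sliding_window_work_buf_t;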