Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

logs are forwarded to a processor in slot and trx order #953

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from

Conversation

EasterTheBunny
Copy link
Contributor

Description

Log ingestion heavily leverages async go-routines but LogPoller will rely on logs delivered in slot/block order. This commit adds a new component to the log ingestion to order logs before forwarding them to LogPoller.

@EasterTheBunny EasterTheBunny force-pushed the etb/ordered-log-processing branch from 094f9cb to 72089b2 Compare December 2, 2024 16:44
@EasterTheBunny EasterTheBunny force-pushed the etb/ordered-log-processing branch from f407d8f to f9824d9 Compare December 4, 2024 18:40
@EasterTheBunny EasterTheBunny marked this pull request as ready for review December 4, 2024 19:02
@EasterTheBunny EasterTheBunny requested a review from a team as a code owner December 4, 2024 19:02
// if expectations set and met -> forward, remove, and continue

// to ensure ordered delivery, break from the loop if a ready block isn't found
// this function should be preceded by clearEmptyBlocks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand this comment. Which function is it referring to that's preceded by cleanEmptyBlocks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was just a comment from a previous iteration. I'll remove it.


func remove[T any](slice []T, s int) []T {
return append(slice[:s], slice[s+1:]...)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use slices.Delete:
https://pkg.go.dev/slices#Delete

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to know. I have not used the slices package yet.

}

return -1, false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use slices.Index instead of defining are own:
https://pkg.go.dev/slices#Index

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here on the slices package.

sort.Slice(p.blocks, func(i, j int) bool {
return p.blocks[i] < p.blocks[j]
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a perfect use case for a heap:
https://pkg.go.dev/container/heap#pkg-index

Instead of re-sorting the list each time a new block is added, you just call h.Push() to add each block and it inserts it efficiently into the tree. Then calling h.Pop() will remove them in order.

func (h *blockHeap) Delete(idx int) {
old := *h
*h = slices.Delete(old, idx, idx+1)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you can use slices.Delete with a heap, because it has to do do some fancy rebalancing while removing an element to preserve the heap invariant.

I think we just need to call:

heap.Remove(h, idx)

and then it does it all for you. No need to define a custom Delete method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: you shouldn't need either this or heap.Remove, just heap.Pop--see other comments

@@ -434,7 +431,7 @@ func (p *orderedParser) sendReadySlots() error {
rmvIdx := make([]int, 0)

// start at the lowest block and find ready blocks
for idx, block := range p.blocks {
for idx, block := range *p.blocks {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterating over them directly like this won't work right. The order is of the elements of a heap is well-defined, but it's not stored in the same order as the underlying elements of the backing slice (and I think some elements might be empty?)

To get the right order, you pop them one at a time. Something like this:

for p.blocks.Len() > 0 {
	block := heap.Pop(p.blocks)
	...
}

I left another comment somewhere else suggesting to replace the Delete method with a call to heap.Remove, but now that I'm looking at this part of the code I realize you shouldn't need either of those, I think Pop is all we need?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: replace heap.Pop line above with block := (*p.blocks)[0], to delay the actual Pop for later when you are sure it can be removed.

@@ -451,12 +448,9 @@ func (p *orderedParser) sendReadySlots() error {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see... you'll probably want to delay the heap.Pop call I have above till here, so that if you break or return early it's not already removed. Instead of the initial heap.Pop you can "peak" at the minimum block number on the heap with block := (*p.blocks)[0]. Even though the rest may be out of order, this should always be guaranteed to be the min element.

I think the two places you'll need the heap.Pop are in place of the 2 instances of rmvIdx = append(rmvIndex, idx), does that sound right?

}

p.ready = slices.Delete(p.ready, rIdx, rIdx+1)
rmvIdx = append(rmvIdx, idx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
rmvIdx = append(rmvIdx, idx)
_ = heap.Pop(p.blocks)

@reductionista
Copy link
Contributor

Almost there, I think this is coming together. Sorry for updating a couple of the comments after writing them hopefully it's clear what needs to be done. I think once you've got the iteration replaced with peaking and popping the heap it should simply things a bit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants