From f407d8f25cdcf670e1e9843378010e6d295cc402 Mon Sep 17 00:00:00 2001 From: Awbrey Hughlett Date: Wed, 4 Dec 2024 09:50:01 -0600 Subject: [PATCH] make tests pass again --- pkg/solana/logpoller/job.go | 8 +- pkg/solana/logpoller/loader.go | 139 ++++++++++----- pkg/solana/logpoller/loader_test.go | 267 ++++++++++++++++++---------- 3 files changed, 274 insertions(+), 140 deletions(-) diff --git a/pkg/solana/logpoller/job.go b/pkg/solana/logpoller/job.go index ce62de190..165c0b5fe 100644 --- a/pkg/solana/logpoller/job.go +++ b/pkg/solana/logpoller/job.go @@ -55,12 +55,18 @@ func (j *processEventJob) Run(_ context.Context) error { return j.parser.Process(j.event) } +type wrappedParser interface { + ProgramEventProcessor + ExpectBlock(uint64) + ExpectTxs(uint64, int) +} + // getTransactionsFromBlockJob is a job that fetches transaction signatures from a block and loads // the job queue with getTransactionLogsJobs for each transaction found in the block. type getTransactionsFromBlockJob struct { slotNumber uint64 client RPCClient - parser *orderedParser + parser wrappedParser chJobs chan Job } diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index 25118c6bc..8791e2a9f 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -44,7 +44,8 @@ type EncodedLogCollector struct { // dependencies and configuration client RPCClient - parser *orderedParser + ordered *orderedParser + unordered *unorderedParser lggr logger.Logger rpcTimeLimit time.Duration @@ -66,7 +67,7 @@ func NewEncodedLogCollector( ) *EncodedLogCollector { c := &EncodedLogCollector{ client: client, - parser: newOrderedParser(parser), + unordered: newUnorderedParser(parser), chSlot: make(chan uint64), chBlock: make(chan uint64, 1), chJobs: make(chan Job, 1), @@ -78,8 +79,9 @@ func NewEncodedLogCollector( Name: "EncodedLogCollector", NewSubServices: func(lggr logger.Logger) []services.Service { c.workers = NewWorkerGroup(DefaultWorkerCount, lggr) + c.ordered = newOrderedParser(parser, lggr) - return []services.Service{c.workers} + return []services.Service{c.workers, c.ordered} }, Start: c.start, Close: c.close, @@ -131,7 +133,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{ slotNumber: sig.Slot, client: c.client, - parser: c.parser, + parser: c.unordered, chJobs: c.chJobs, }); err != nil { return err @@ -142,7 +144,7 @@ func (c *EncodedLogCollector) BackfillForAddress(ctx context.Context, address st return nil } -func (c *EncodedLogCollector) start(ctx context.Context) error { +func (c *EncodedLogCollector) start(_ context.Context) error { c.engine.Go(c.runSlotPolling) c.engine.Go(c.runSlotProcessing) c.engine.Go(c.runBlockProcessing) @@ -227,7 +229,7 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) { if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{ slotNumber: slot, client: c.client, - parser: c.parser, + parser: c.ordered, chJobs: c.chJobs, }); err != nil { c.lggr.Errorf("failed to add job to queue: %s", err) @@ -279,7 +281,7 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en } for _, block := range result { - c.parser.ExpectBlock(block) + c.ordered.ExpectBlock(block) select { case <-ctx.Done(): return nil @@ -290,7 +292,26 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en return nil } +type unorderedParser struct { + parser ProgramEventProcessor +} + +func newUnorderedParser(parser ProgramEventProcessor) *unorderedParser { + return &unorderedParser{parser: parser} +} + +func (p *unorderedParser) ExpectBlock(_ uint64) {} +func (p *unorderedParser) ExpectTxs(_ uint64, _ int) {} +func (p *unorderedParser) Process(evt ProgramEvent) error { + return p.parser.Process(evt) +} + type orderedParser struct { + // service state management + services.Service + engine *services.Engine + + // internal state parser ProgramEventProcessor mu sync.Mutex blocks []uint64 @@ -299,14 +320,22 @@ type orderedParser struct { actual map[uint64][]ProgramEvent } -func newOrderedParser(parser ProgramEventProcessor) *orderedParser { - return &orderedParser{ +func newOrderedParser(parser ProgramEventProcessor, lggr logger.Logger) *orderedParser { + op := &orderedParser{ parser: parser, blocks: make([]uint64, 0), ready: make([]uint64, 0), expect: make(map[uint64]int), actual: make(map[uint64][]ProgramEvent), } + + op.Service, op.engine = services.Config{ + Name: "OrderedParser", + Start: op.start, + Close: op.close, + }.NewServiceEngine(lggr) + + return op } func (p *orderedParser) ExpectBlock(block uint64) { @@ -333,71 +362,97 @@ func (p *orderedParser) Process(event ProgramEvent) error { p.mu.Lock() defer p.mu.Unlock() - meetsExpectations, err := p.addAndCompareExpectations(event) - if err != nil { - return err - } - - // incoming event does not meet expectations for transaction - // event is added to actual and no error is returned - if !meetsExpectations { + if err := p.addToExpectations(event); err != nil { + // TODO: log error because this is an unrecoverable error return nil } - p.clearEmptyBlocks() - p.setReady(event.SlotNumber) - return p.sendReadySlots() } -func (p *orderedParser) addAndCompareExpectations(evt ProgramEvent) (bool, error) { +func (p *orderedParser) start(_ context.Context) error { + p.engine.GoTick(services.NewTicker(time.Second), p.run) + + return nil +} + +func (p *orderedParser) close() error { + return nil +} + +func (p *orderedParser) addToExpectations(evt ProgramEvent) error { expectations, ok := p.expect[evt.SlotNumber] if !ok { - return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) + return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) } evts, ok := p.actual[evt.SlotNumber] if !ok { - return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) + return fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) } p.actual[evt.SlotNumber] = append(evts, evt) - return expectations == len(evts)+1, nil -} - -func (p *orderedParser) clearEmptyBlocks() { - rmvIdx := make([]int, 0) - - for idx, block := range p.blocks { - exp, ok := p.expect[block] - if !ok { - // transaction expectations have not been applied for this block yet - continue - } + if expectations == len(evts)+1 { + p.setReady(evt.SlotNumber) + } - if exp == 0 { - rmvIdx = append(rmvIdx, idx) + return nil +} - delete(p.expect, block) - delete(p.actual, block) - } +func (p *orderedParser) expectations(block uint64) (int, bool, error) { + expectations, ok := p.expect[block] + if !ok { + return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block) } - for count, idx := range rmvIdx { - p.blocks = remove(p.blocks, idx-count) + evts, ok := p.actual[block] + if !ok { + return 0, false, fmt.Errorf("%w: %d", errExpectationsNotSet, block) } + + return expectations, expectations == len(evts), nil +} + +func (p *orderedParser) clearExpectations(block uint64) { + delete(p.expect, block) + delete(p.actual, block) } func (p *orderedParser) setReady(slot uint64) { p.ready = append(p.ready, slot) } +func (p *orderedParser) run(_ context.Context) { + p.mu.Lock() + defer p.mu.Unlock() + + _ = p.sendReadySlots() +} + func (p *orderedParser) sendReadySlots() error { rmvIdx := make([]int, 0) // start at the lowest block and find ready blocks for idx, block := range p.blocks { + // if no expectations are set, we are still waiting on information for the block. + // if expectations set and not met, we are still waiting on information for the block + // no other block data should be sent until this is resolved + exp, met, err := p.expectations(block) + if err != nil || !met { + break + } + + // if expectations are 0 -> remove and continue + if exp == 0 { + p.clearExpectations(block) + rmvIdx = append(rmvIdx, idx) + + continue + } + + // if expectations set and met -> forward, remove, and continue + // to ensure ordered delivery, break from the loop if a ready block isn't found // this function should be preceded by clearEmptyBlocks rIdx, ok := getIdx(p.ready, block) diff --git a/pkg/solana/logpoller/loader_test.go b/pkg/solana/logpoller/loader_test.go index d752b0054..166ed9b24 100644 --- a/pkg/solana/logpoller/loader_test.go +++ b/pkg/solana/logpoller/loader_test.go @@ -33,6 +33,8 @@ var ( ) func TestEncodedLogCollector_StartClose(t *testing.T) { + t.Parallel() + client := new(mocks.RPCClient) ctx := tests.Context(t) @@ -43,6 +45,8 @@ func TestEncodedLogCollector_StartClose(t *testing.T) { } func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { + t.Parallel() + client := new(mocks.RPCClient) parser := new(testParser) ctx := tests.Context(t) @@ -54,39 +58,56 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { require.NoError(t, collector.Close()) }) - slot := uint64(42) - sig := solana.Signature{2, 1, 4, 2} - blockHeight := uint64(21) + var latest atomic.Uint64 - client.EXPECT().GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized).Return(&rpc.GetLatestBlockhashResult{ - RPCContext: rpc.RPCContext{ - Context: rpc.Context{ - Slot: slot, - }, - }, - }, nil) + latest.Store(uint64(40)) - client.EXPECT().GetBlocks(mock.Anything, uint64(1), mock.MatchedBy(func(val *uint64) bool { - return val != nil && *val == slot - }), mock.Anything).Return(rpc.BlocksResult{slot}, nil) + client.EXPECT(). + GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + RunAndReturn(latestBlockhashReturnFunc(&latest)) - client.EXPECT().GetBlockWithOpts(mock.Anything, slot, mock.Anything).Return(&rpc.GetBlockResult{ - Transactions: []rpc.TransactionWithMeta{ - { - Meta: &rpc.TransactionMeta{ - LogMessages: messages, - }, - }, - }, - Signatures: []solana.Signature{sig}, - BlockHeight: &blockHeight, - }, nil).Twice() + client.EXPECT(). + GetBlocks( + mock.Anything, + mock.MatchedBy(getBlocksStartValMatcher), + mock.MatchedBy(getBlocksEndValMatcher(&latest)), + rpc.CommitmentFinalized, + ). + RunAndReturn(getBlocksReturnFunc(false)) + + client.EXPECT(). + GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + height := slot - 1 + + result := rpc.GetBlockResult{ + Transactions: []rpc.TransactionWithMeta{}, + Signatures: []solana.Signature{}, + BlockHeight: &height, + } + + _, _ = rand.Read(result.Blockhash[:]) + + if slot == 42 { + var sig solana.Signature + _, _ = rand.Read(sig[:]) + + result.Signatures = []solana.Signature{sig} + result.Transactions = []rpc.TransactionWithMeta{ + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, + }, + } + } + + return &result, nil + }) tests.AssertEventually(t, func() bool { return parser.Called() }) - - client.AssertExpectations(t) } func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { @@ -118,39 +139,19 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { client.EXPECT(). GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). - RunAndReturn(func(ctx context.Context, ct rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { - defer func() { - latest.Store(latest.Load() + 2) - }() - - return &rpc.GetLatestBlockhashResult{ - RPCContext: rpc.RPCContext{ - Context: rpc.Context{ - Slot: latest.Load(), - }, - }, - }, nil - }) + RunAndReturn(latestBlockhashReturnFunc(&latest)) client.EXPECT(). - GetBlocks(mock.Anything, mock.MatchedBy(func(val uint64) bool { - return val > uint64(0) - }), mock.MatchedBy(func(val *uint64) bool { - return val != nil && *val <= latest.Load() - }), mock.Anything). - RunAndReturn(func(_ context.Context, u1 uint64, u2 *uint64, _ rpc.CommitmentType) (rpc.BlocksResult, error) { - blocks := make([]uint64, *u2-u1+1) - for idx := range blocks { - blocks[idx] = u1 + uint64(idx) - } - - return rpc.BlocksResult(blocks), nil - }) + GetBlocks( + mock.Anything, + mock.MatchedBy(getBlocksStartValMatcher), + mock.MatchedBy(getBlocksEndValMatcher(&latest)), + rpc.CommitmentFinalized, + ). + RunAndReturn(getBlocksReturnFunc(false)) client.EXPECT(). - GetBlockWithOpts(mock.Anything, mock.MatchedBy(func(val uint64) bool { - return true - }), mock.Anything). + GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { slotIdx := -1 for idx, slt := range slots { @@ -227,6 +228,8 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { } func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { + t.Parallel() + client := new(mocks.RPCClient) parser := new(testParser) ctx := tests.Context(t) @@ -241,65 +244,91 @@ func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { pubKey := solana.PublicKey{2, 1, 4, 2} slots := []uint64{44, 43, 42} sigs := make([]solana.Signature, len(slots)*2) - blockHeights := []uint64{21, 22, 23, 50} for idx := range len(sigs) { _, _ = rand.Read(sigs[idx][:]) } + var latest atomic.Uint64 + + latest.Store(uint64(40)) + // GetLatestBlockhash might be called at start-up; make it take some time because the result isn't needed for this test - client.EXPECT().GetLatestBlockhash(mock.Anything, mock.Anything).Return(&rpc.GetLatestBlockhashResult{ - RPCContext: rpc.RPCContext{ - Context: rpc.Context{ - Slot: slots[0], - }, - }, - Value: &rpc.LatestBlockhashResult{ - LastValidBlockHeight: 42, - }, - }, nil).After(2 * time.Second).Maybe() + client.EXPECT(). + GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + RunAndReturn(latestBlockhashReturnFunc(&latest)). + After(2 * time.Second). + Maybe() client.EXPECT(). - GetSignaturesForAddressWithOpts(mock.Anything, pubKey, mock.MatchedBy(func(opts *rpc.GetSignaturesForAddressOpts) bool { - return opts != nil && opts.Before.String() == solana.Signature{}.String() - })). - Return([]*rpc.TransactionSignature{ - {Slot: slots[0], Signature: sigs[0]}, - {Slot: slots[0], Signature: sigs[1]}, - {Slot: slots[1], Signature: sigs[2]}, - {Slot: slots[1], Signature: sigs[3]}, - {Slot: slots[2], Signature: sigs[4]}, - {Slot: slots[2], Signature: sigs[5]}, - }, nil) - - client.EXPECT().GetSignaturesForAddressWithOpts(mock.Anything, pubKey, mock.Anything).Return([]*rpc.TransactionSignature{}, nil) - - for idx := range len(slots) { - client.EXPECT().GetBlockWithOpts(mock.Anything, slots[idx], mock.Anything).Return(&rpc.GetBlockResult{ - Transactions: []rpc.TransactionWithMeta{ - { - Meta: &rpc.TransactionMeta{ - LogMessages: messages, + GetBlocks( + mock.Anything, + mock.MatchedBy(getBlocksStartValMatcher), + mock.MatchedBy(getBlocksEndValMatcher(&latest)), + rpc.CommitmentFinalized, + ). + RunAndReturn(getBlocksReturnFunc(true)) + + client.EXPECT(). + GetSignaturesForAddressWithOpts(mock.Anything, pubKey, mock.Anything). + RunAndReturn(func(_ context.Context, pk solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) { + ret := []*rpc.TransactionSignature{} + + if opts != nil && opts.Before.String() == (solana.Signature{}).String() { + for idx := range slots { + ret = append(ret, &rpc.TransactionSignature{Slot: slots[idx], Signature: sigs[idx*2]}) + ret = append(ret, &rpc.TransactionSignature{Slot: slots[idx], Signature: sigs[(idx*2)+1]}) + } + } + + return ret, nil + }) + + client.EXPECT(). + GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + idx := -1 + for sIdx, slt := range slots { + if slt == slot { + idx = sIdx + + break + } + } + + height := slot - 1 + + if idx == -1 { + return &rpc.GetBlockResult{ + Transactions: []rpc.TransactionWithMeta{}, + Signatures: []solana.Signature{}, + BlockHeight: &height, + }, nil + } + + return &rpc.GetBlockResult{ + Transactions: []rpc.TransactionWithMeta{ + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, }, - }, - { - Meta: &rpc.TransactionMeta{ - LogMessages: messages, + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, }, }, - }, - Signatures: []solana.Signature{sigs[idx*2], sigs[(idx*2)+1]}, - BlockHeight: &blockHeights[idx], - }, nil).Twice() - } + Signatures: []solana.Signature{sigs[idx*2], sigs[(idx*2)+1]}, + BlockHeight: &height, + }, nil + }) assert.NoError(t, collector.BackfillForAddress(ctx, pubKey.String(), 42)) tests.AssertEventually(t, func() bool { return parser.Count() == 6 }) - - client.AssertExpectations(t) } func BenchmarkEncodedLogCollector(b *testing.B) { @@ -516,3 +545,47 @@ func (p *testParser) Events() []logpoller.ProgramEvent { return p.events } + +func latestBlockhashReturnFunc(latest *atomic.Uint64) func(context.Context, rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { + return func(ctx context.Context, ct rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { + defer func() { + latest.Store(latest.Load() + 2) + }() + + return &rpc.GetLatestBlockhashResult{ + RPCContext: rpc.RPCContext{ + Context: rpc.Context{ + Slot: latest.Load(), + }, + }, + Value: &rpc.LatestBlockhashResult{ + LastValidBlockHeight: latest.Load() - 1, + }, + }, nil + } +} + +func getBlocksReturnFunc(empty bool) func(context.Context, uint64, *uint64, rpc.CommitmentType) (rpc.BlocksResult, error) { + return func(_ context.Context, u1 uint64, u2 *uint64, _ rpc.CommitmentType) (rpc.BlocksResult, error) { + blocks := []uint64{} + + if !empty { + blocks = make([]uint64, *u2-u1+1) + for idx := range blocks { + blocks[idx] = u1 + uint64(idx) + } + } + + return rpc.BlocksResult(blocks), nil + } +} + +func getBlocksStartValMatcher(val uint64) bool { + return val > uint64(0) +} + +func getBlocksEndValMatcher(latest *atomic.Uint64) func(*uint64) bool { + return func(val *uint64) bool { + return val != nil && *val <= latest.Load() + } +}