Skip to content

Commit

Permalink
add manual GC loop that cleans up memory before next epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Mar 11, 2024
1 parent c730af7 commit cab4f93
Showing 1 changed file with 72 additions and 0 deletions.
72 changes: 72 additions & 0 deletions pkg/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"fmt"
"net/http"
"runtime"
"runtime/debug"
"sort"
"sync"
"time"

"github.com/ethpandaops/assertoor/pkg/coordinator/buildinfo"
"github.com/ethpandaops/assertoor/pkg/coordinator/clients"
"github.com/ethpandaops/assertoor/pkg/coordinator/clients/consensus"
"github.com/ethpandaops/assertoor/pkg/coordinator/logger"
"github.com/ethpandaops/assertoor/pkg/coordinator/names"
"github.com/ethpandaops/assertoor/pkg/coordinator/test"
Expand Down Expand Up @@ -138,6 +140,9 @@ func (c *Coordinator) Run(ctx context.Context) error {
// start test cleanup routine
go c.runTestCleanup(ctx)

// start per epoch GC routine
go c.runEpochGC(ctx)

// run tests
c.runTestExecutionLoop(ctx)

Expand Down Expand Up @@ -542,3 +547,70 @@ func (c *Coordinator) cleanupTestHistory(retentionTime time.Duration) {

c.testHistory = cleanedHistory
}

func (c *Coordinator) runEpochGC(ctx context.Context) {
defer func() {
if err := recover(); err != nil {
c.log.GetLogger().WithError(err.(error)).Panicf("uncaught panic in coordinator.runEpochGC: %v, stack: %v", err, string(debug.Stack()))
}
}()

// await client readiness, which implies cache initialization
if c.clientPool.GetConsensusPool().AwaitReadyEndpoint(ctx, consensus.AnyClient) == nil {
return
}

genesis := c.clientPool.GetConsensusPool().GetBlockCache().GetGenesis()
specs := c.clientPool.GetConsensusPool().GetBlockCache().GetSpecs()

for {
var sleepTime time.Duration

networkTime := time.Since(genesis.GenesisTime)
if networkTime < 0 {
sleepTime = networkTime.Abs()
} else {
currentSlot := uint64(networkTime / specs.SecondsPerSlot)
currentEpoch := currentSlot / specs.SlotsPerEpoch
currentSlotIndex := currentSlot % specs.SlotsPerEpoch
nextGcSlot := uint64(0)

gcSlotDiff := uint64(2)
if gcSlotDiff > specs.SlotsPerEpoch/2 {
gcSlotDiff = 1
}

gcSlotIndex := specs.SlotsPerEpoch - gcSlotDiff - 1

if currentSlotIndex == gcSlotIndex {
select {
case <-ctx.Done():
return
case <-time.After(specs.SecondsPerSlot / 2):
}

nextEpochDuration := time.Until(genesis.GenesisTime.Add(time.Duration((currentEpoch+1)*specs.SlotsPerEpoch) * specs.SecondsPerSlot))

c.log.GetLogger().Infof("run GC (slot %v, %v sec before epoch %v)", currentSlot, nextEpochDuration.Seconds(), currentEpoch+1)
runtime.GC()

nextGcSlot = currentSlot + specs.SlotsPerEpoch
} else {
if currentSlotIndex < gcSlotIndex {
nextGcSlot = currentSlot + (gcSlotIndex - currentSlotIndex)
} else {
nextGcSlot = currentSlot + (specs.SlotsPerEpoch - currentSlotIndex) + gcSlotIndex
}
}

nextRunTime := genesis.GenesisTime.Add(time.Duration(nextGcSlot) * specs.SecondsPerSlot)
sleepTime = time.Until(nextRunTime)
}

select {
case <-ctx.Done():
return
case <-time.After(sleepTime):
}
}
}

0 comments on commit cab4f93

Please sign in to comment.