From cab4f931d64f405fc5630a33a118544aed4a29b8 Mon Sep 17 00:00:00 2001
From: pk910
Date: Mon, 11 Mar 2024 11:17:34 +0100
Subject: [PATCH] add manual GC loop that cleans up memory before next epoch

---
Review note: runEpochGC (last hunk) was revised after the original export, so
the post-image blob id on the index line below is stale; hunk headers and
diffstat are updated to match.

 pkg/coordinator/coordinator.go | 98 ++++++++++++++++++++++++++++++++++
 1 file changed, 98 insertions(+)

diff --git a/pkg/coordinator/coordinator.go b/pkg/coordinator/coordinator.go
index 7d278c6..c3c63ab 100644
--- a/pkg/coordinator/coordinator.go
+++ b/pkg/coordinator/coordinator.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"net/http"
+	"runtime"
 	"runtime/debug"
 	"sort"
 	"sync"
@@ -11,6 +12,7 @@ import (
 
 	"github.com/ethpandaops/assertoor/pkg/coordinator/buildinfo"
 	"github.com/ethpandaops/assertoor/pkg/coordinator/clients"
+	"github.com/ethpandaops/assertoor/pkg/coordinator/clients/consensus"
 	"github.com/ethpandaops/assertoor/pkg/coordinator/logger"
 	"github.com/ethpandaops/assertoor/pkg/coordinator/names"
 	"github.com/ethpandaops/assertoor/pkg/coordinator/test"
@@ -138,6 +140,9 @@ func (c *Coordinator) Run(ctx context.Context) error {
 	// start test cleanup routine
 	go c.runTestCleanup(ctx)
 
+	// start per epoch GC routine
+	go c.runEpochGC(ctx)
+
 	// run tests
 	c.runTestExecutionLoop(ctx)
 
@@ -542,3 +547,96 @@ func (c *Coordinator) cleanupTestHistory(retentionTime time.Duration) {
 
 	c.testHistory = cleanedHistory
 }
+
+// runEpochGC forces a garbage collection once per epoch, shortly before the
+// next epoch starts, and blocks until ctx is cancelled. Run starts it in its
+// own goroutine.
+func (c *Coordinator) runEpochGC(ctx context.Context) {
+	defer func() {
+		if r := recover(); r != nil {
+			// panic values are not necessarily errors; an unchecked r.(error)
+			// would panic again inside this handler and lose the original value
+			err, ok := r.(error)
+			if !ok {
+				err = fmt.Errorf("%v", r)
+			}
+
+			c.log.GetLogger().WithError(err).Panicf("uncaught panic in coordinator.runEpochGC: %v, stack: %v", r, string(debug.Stack()))
+		}
+	}()
+
+	// await client readiness, which implies cache initialization
+	if c.clientPool.GetConsensusPool().AwaitReadyEndpoint(ctx, consensus.AnyClient) == nil {
+		return
+	}
+
+	genesis := c.clientPool.GetConsensusPool().GetBlockCache().GetGenesis()
+	specs := c.clientPool.GetConsensusPool().GetBlockCache().GetSpecs()
+
+	// the slot math below divides by both values; stop instead of panicking
+	if specs.SlotsPerEpoch == 0 || specs.SecondsPerSlot <= 0 {
+		c.log.GetLogger().Warnf("epoch GC disabled: invalid chain specs (slots per epoch: %v, seconds per slot: %v)", specs.SlotsPerEpoch, specs.SecondsPerSlot)
+		return
+	}
+
+	// the GC slot starts gcSlotDiff+1 slots before the end of the epoch;
+	// epochs shorter than 4 slots use a distance of 1 instead of 2
+	gcSlotDiff := uint64(2)
+	if gcSlotDiff > specs.SlotsPerEpoch/2 {
+		gcSlotDiff = 1
+	}
+
+	// unsigned subtraction: without this guard a single-slot epoch would wrap
+	// around to a huge index and the loop below would spin without sleeping
+	gcSlotIndex := uint64(0)
+	if specs.SlotsPerEpoch > gcSlotDiff+1 {
+		gcSlotIndex = specs.SlotsPerEpoch - gcSlotDiff - 1
+	}
+
+	for {
+		var sleepTime time.Duration
+
+		if networkTime := time.Since(genesis.GenesisTime); networkTime < 0 {
+			// genesis is still ahead: sleep until the network starts
+			sleepTime = networkTime.Abs()
+		} else {
+			currentSlot := uint64(networkTime / specs.SecondsPerSlot)
+			currentEpoch := currentSlot / specs.SlotsPerEpoch
+			currentSlotIndex := currentSlot % specs.SlotsPerEpoch
+
+			var nextGcSlot uint64
+
+			switch {
+			case currentSlotIndex == gcSlotIndex:
+				// collect in the middle of the GC slot
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(specs.SecondsPerSlot / 2):
+				}
+
+				nextEpochDuration := time.Until(genesis.GenesisTime.Add(time.Duration((currentEpoch+1)*specs.SlotsPerEpoch) * specs.SecondsPerSlot))
+
+				c.log.GetLogger().Infof("run GC (slot %v, %v sec before epoch %v)", currentSlot, nextEpochDuration.Seconds(), currentEpoch+1)
+				runtime.GC()
+
+				nextGcSlot = currentSlot + specs.SlotsPerEpoch
+			case currentSlotIndex < gcSlotIndex:
+				// the GC slot of the current epoch is still ahead
+				nextGcSlot = currentSlot + (gcSlotIndex - currentSlotIndex)
+			default:
+				// the GC slot of this epoch has passed: target the next epoch
+				nextGcSlot = currentSlot + (specs.SlotsPerEpoch - currentSlotIndex) + gcSlotIndex
+			}
+
+			nextRunTime := genesis.GenesisTime.Add(time.Duration(nextGcSlot) * specs.SecondsPerSlot)
+			sleepTime = time.Until(nextRunTime)
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(sleepTime):
+		}
+	}
+}