Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Export distribution versions #2014

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions tools/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func ToolCmd() *cobra.Command {
}
toolsCmd.AddCommand(scanner.ScanCmd())
toolsCmd.AddCommand(migration.MigrateCmd())
toolsCmd.AddCommand(migration.MigrateSSCmd())
toolsCmd.AddCommand(migration.VerifyMigrationCmd())
toolsCmd.AddCommand(migration.GenerateStats())
return toolsCmd
Expand Down
39 changes: 39 additions & 0 deletions tools/migration/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,45 @@ func migrateSC(version int64, homeDir string, db dbm.DB) error {
return migrator.Migrate(version)
}

func MigrateSSCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "migrate-ss",
Short: "A tool to migrate full ss distribution module",
Run: executeSS,
}
cmd.PersistentFlags().String("home-dir", "/root/.sei", "Sei home directory")
return cmd
}

func executeSS(cmd *cobra.Command, _ []string) {
homeDir, _ := cmd.Flags().GetString("home-dir")
dataDir := filepath.Join(homeDir, "data")
db, err := dbm.NewGoLevelDB("application", dataDir)
if err != nil {
panic(err)
}
latestVersion := rootmulti.GetLatestVersion(db)
fmt.Printf("latest version: %d\n", latestVersion)

if err = migrateSS(latestVersion, homeDir, db); err != nil {
panic(err)
}
}

func migrateSS(version int64, homeDir string, db dbm.DB) error {
ssConfig := config.DefaultStateStoreConfig()
ssConfig.Enable = true
ssConfig.KeepRecent = 0

stateStore, err := sstypes.NewStateStore(log.NewNopLogger(), homeDir, ssConfig)
if err != nil {
return err
}

migrator := ss.NewMigrator(db, stateStore)
return migrator.Migrate(version, homeDir)
}

func VerifyMigrationCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "verify-migration",
Expand Down
162 changes: 141 additions & 21 deletions tools/migration/ss/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"bytes"
"fmt"
"sync"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -30,47 +31,166 @@
}

func (m *Migrator) Migrate(version int64, homeDir string) error {
// Channel to send RawSnapshotNodes to the importer.
ch := make(chan types.RawSnapshotNode, 1000)
// Channel to capture errors from both goroutines below.
errCh := make(chan error, 2)

// Get the latest key, if any, to resume from
latestKey, err := m.stateStore.GetLatestMigratedKey()
if err != nil {
return fmt.Errorf("failed to get latest key: %w", err)
}

latestModule, err := m.stateStore.GetLatestMigratedModule()
if err != nil {
return fmt.Errorf("failed to get latest module: %w", err)
}

fmt.Println("Starting migration...")
fmt.Printf("Starting migration for 'distribution' module from version %d to %d...\n", 100215000, 106789896)

// Goroutine to iterate through IAVL and export leaf nodes
// Goroutine #1: Export distribution leaf nodes into ch
go func() {
defer close(ch)
errCh <- ExportLeafNodesFromKey(m.iavlDB, ch, latestKey, latestModule)
errCh <- exportDistributionLeafNodes(m.iavlDB, ch, 100215000, 106789896, 30)
}()

// Import nodes into PebbleDB
// Goroutine #2: Import those leaf nodes into PebbleDB
go func() {
errCh <- m.stateStore.RawImport(ch)
}()

// Block until both processes complete
// Wait for both goroutines to complete
for i := 0; i < 2; i++ {
if err := <-errCh; err != nil {
return err
}
}

// Set earliest and latest version in the database
err = m.stateStore.SetEarliestVersion(1, true)
if err != nil {
return err
return nil
}

func exportDistributionLeafNodes(
db dbm.DB,
ch chan<- types.RawSnapshotNode,
startVersion, endVersion int64,
concurrency int,
) error {
fmt.Printf("Starting export at time: %s with concurrency=%d\n", time.Now().Format(time.RFC3339), concurrency)
Fixed Show fixed Hide fixed

if concurrency < 1 {
concurrency = 1
}

return m.stateStore.SetLatestVersion(version)
// We'll collect errors from each goroutine in a separate error channel
errCh := make(chan error, concurrency)

totalVersions := endVersion - startVersion + 1
// Basic integer division to split up the version range
chunkSize := totalVersions / int64(concurrency)
remainder := totalVersions % int64(concurrency)

var wg sync.WaitGroup
wg.Add(concurrency)

startTime := time.Now()
Fixed Show fixed Hide fixed

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
// Atomic or shared counters for tracking exports
var mu sync.Mutex
totalExported := 0

// Helper function that each goroutine will run
workerFunc := func(workerID int, vStart, vEnd int64) {
defer wg.Done()

// Local counter
localExportCount := 0

for ver := vStart; ver <= vEnd; ver++ {
// Load only the distribution module prefix at this version.
tree, err := ReadTree(db, ver, []byte(utils.BuildTreePrefix("distribution")))
if err != nil {
errCh <- fmt.Errorf(
"[worker %d] Error loading distribution tree at version %d: %w",
workerID, ver, err,
)
return
}

var count int
_, err = tree.Iterate(func(key, value []byte) bool {
ch <- types.RawSnapshotNode{
StoreKey: "distribution",
Key: key,
Value: value,
Version: ver,
}
count++
// Use a lock when incrementing total counters
mu.Lock()
totalExported++
mu.Unlock()

// Logging / metrics every 1,000,000 keys in this version subset
if count%1000000 == 0 {
fmt.Printf("[worker %d][%s] Exported %d distribution keys at version %d so far\n",
workerID, time.Now().Format(time.RFC3339), count, ver,
Fixed Show fixed Hide fixed
)
metrics.IncrCounterWithLabels(
[]string{"sei", "migration", "leaf_nodes_exported"},
float32(count),
[]metrics.Label{
{Name: "module", Value: "distribution"},
},
)
}
return false // continue iteration
})
if err != nil {
errCh <- fmt.Errorf(
"[worker %d] Error iterating distribution tree for version %d: %w",
workerID, ver, err,
)
return
}

localExportCount += count

fmt.Printf("[worker %d][%s] Finished versions %d: exported %d distribution keys.\n",
workerID, time.Now().Format(time.RFC3339), ver, localExportCount)
Fixed Show fixed Hide fixed
}

fmt.Printf("[worker %d][%s] Finished versions [%d - %d]: exported %d distribution keys.\n",
workerID, time.Now().Format(time.RFC3339), vStart, vEnd, localExportCount)
Fixed Show fixed Hide fixed
// Signal that we're done successfully (no error).
errCh <- nil
}

// Spawn the workers
var currentVersion int64 = startVersion

Check failure on line 158 in tools/migration/ss/migrator.go

View workflow job for this annotation

GitHub Actions / lint

ST1023: should omit type int64 from declaration; it will be inferred from the right-hand side (stylecheck)
for i := 0; i < concurrency; i++ {
// Each goroutine gets a range: [currentVersion, currentVersion+chunkSize-1]
// plus we handle any remainder in the last chunk(s).
extra := int64(0)
if i < int(remainder) {
extra = 1
}
workerStart := currentVersion
workerEnd := currentVersion + chunkSize + extra - 1
if i == concurrency-1 {
// Make sure the last one includes everything up to endVersion
workerEnd = endVersion
}
currentVersion = workerEnd + 1

go workerFunc(i, workerStart, workerEnd)
Fixed Show fixed Hide fixed
}

// Wait for all goroutines to finish
wg.Wait()

// Check error channel. If any goroutine returned a non-nil error, return that.
close(errCh)
for err := range errCh {
if err != nil {
return err
}
}

fmt.Printf(
"[%s] Completed exporting distribution module from %d to %d. Total keys: %d. Duration: %s\n",
time.Now().Format(time.RFC3339), startVersion, endVersion, totalExported, time.Since(startTime),

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
)
fmt.Printf("Finished export at time: %s\n", time.Now().Format(time.RFC3339))
Fixed Show fixed Hide fixed
return nil
}

func (m *Migrator) Verify(version int64) error {
Expand Down
Loading