
Commit

Add resume from checksum
morgo committed Sep 2, 2024
1 parent 31b5fad commit 7488738
Showing 3 changed files with 117 additions and 52 deletions.
pkg/checksum/checker.go (74 changes: 47 additions & 27 deletions)
@@ -25,22 +25,23 @@ import (

type Checker struct {
sync.Mutex
- table            *table.TableInfo
- newTable         *table.TableInfo
- concurrency      int
- feed             *repl.Client
- db               *sql.DB
- trxPool          *dbconn.TrxPool
- isInvalid        bool
- chunker          table.Chunker
- startTime        time.Time
- ExecTime         time.Duration
- recentValue      interface{} // used for status
- dbConfig         *dbconn.DBConfig
- logger           loggers.Advanced
- fixDifferences   bool
- differencesFound atomic.Uint64
- recopyLock       sync.Mutex
+ table                  *table.TableInfo
+ newTable               *table.TableInfo
+ concurrency            int
+ feed                   *repl.Client
+ db                     *sql.DB
+ trxPool                *dbconn.TrxPool
+ isInvalid              bool
+ chunker                table.Chunker
+ startTime              time.Time
+ ExecTime               time.Duration
+ recentValue            interface{} // used for status
+ dbConfig               *dbconn.DBConfig
+ logger                 loggers.Advanced
+ fixDifferences         bool
+ differencesFound       atomic.Uint64
+ recopyLock             sync.Mutex
+ isResumeFromCheckpoint bool
}

type CheckerConfig struct {
@@ -49,6 +50,7 @@ type CheckerConfig struct {
DBConfig *dbconn.DBConfig
Logger loggers.Advanced
FixDifferences bool
+ Watermark string // optional; defines a watermark to start from
}

func NewCheckerDefaultConfig() *CheckerConfig {
@@ -76,16 +78,25 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c
if err != nil {
return nil, err
}
+ // If there is a watermark, we need to open the chunker at that watermark.
+ // Overwrite the previously attached chunker with a new one.
+ if config.Watermark != "" {
+     config.Logger.Warnf("opening checksum chunker at watermark: %s", config.Watermark)
+     if err := chunker.OpenAtWatermark(config.Watermark, newTable.MaxValue()); err != nil {
+         return nil, err
+     }
+ }
checksum := &Checker{
-     table:          tbl,
-     newTable:       newTable,
-     concurrency:    config.Concurrency,
-     db:             db,
-     feed:           feed,
-     chunker:        chunker,
-     dbConfig:       config.DBConfig,
-     logger:         config.Logger,
-     fixDifferences: config.FixDifferences,
+     table:                  tbl,
+     newTable:               newTable,
+     concurrency:            config.Concurrency,
+     db:                     db,
+     feed:                   feed,
+     chunker:                chunker,
+     dbConfig:               config.DBConfig,
+     logger:                 config.Logger,
+     fixDifferences:         config.FixDifferences,
+     isResumeFromCheckpoint: config.Watermark != "",
}
return checksum, nil
}
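
For orientation, a minimal sketch of how a caller would use the new Watermark field to resume a checksum. This is not code from the commit; the setup of db, tbl, newTbl, feed, logger, and savedWatermark is assumed:

    cfg := &checksum.CheckerConfig{
        Concurrency:     4,
        TargetChunkTime: 500 * time.Millisecond,
        DBConfig:        dbconn.NewDBConfig(),
        Logger:          logger,
        FixDifferences:  true,
        Watermark:       savedWatermark, // "" would mean a fresh checksum
    }
    checker, err := checksum.NewChecker(db, tbl, newTbl, feed, cfg)
    if err != nil {
        return err
    }
    // Run() skips chunker.Open() here, because NewChecker already opened
    // the chunker at the watermark (isResumeFromCheckpoint is true).
    err = checker.Run(ctx)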
@@ -158,6 +169,10 @@ func (c *Checker) RecentValue() string {
return fmt.Sprintf("%v", c.recentValue)
}

+ func (c *Checker) GetLowWatermark() (string, error) {
+     return c.chunker.GetLowWatermark()
+ }

func (c *Checker) inspectDifferences(trx *sql.Tx, chunk *table.Chunk) error {
sourceSubquery := fmt.Sprintf("SELECT CRC32(CONCAT(%s)) as row_checksum, %s FROM %s WHERE %s",
c.intersectColumns(),
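
For reference, the watermark string returned by GetLowWatermark is the chunker's serialized position: a JSON document recording the chunk key and current bounds, like the example used in the test further down:

    {"Key":["id"],"ChunkSize":1000,"LowerBound":{"Value":["53926425"],"Inclusive":true},"UpperBound":{"Value":["53926425"],"Inclusive":false}}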
@@ -362,8 +377,13 @@ func (c *Checker) Run(ctx context.Context) error {
defer func() {
c.ExecTime = time.Since(c.startTime)
}()
- if err := c.chunker.Open(); err != nil {
-     return err
+ // Open the chunker if it's not open.
+ // It will already be open if this is a resume from checkpoint.
+ // This is a little annoying, but just the way the chunker API works.
+ if !c.isResumeFromCheckpoint {
+     if err := c.chunker.Open(); err != nil {
+         return err
+     }
}
c.Unlock()

pkg/migration/runner.go (78 changes: 56 additions & 22 deletions)
@@ -42,7 +42,7 @@ const (

// These are really consts, but set to var for testing.
var (
- checkpointDumpInterval = 50 * time.Second
+ checkpointDumpInterval = 2 * time.Second
tableStatUpdateInterval = 5 * time.Minute
statusInterval = 30 * time.Second
sentinelCheckInterval = 1 * time.Second
@@ -92,6 +92,9 @@ type Runner struct {
checker *checksum.Checker
checkerLock sync.Mutex

+ // used to recover direct to checksum.
+ checksumWatermark string
+
// Track some key statistics.
startTime time.Time
sentinelWaitStartTime time.Time
@@ -252,7 +255,9 @@ func (r *Runner) Run(originalCtx context.Context) error {
go r.dumpCheckpointContinuously(ctx) // start periodically dumping the checkpoint.

// Perform the main copy rows task. This is where the majority
- // of migrations usually spend time.
+ // of migrations usually spend time. It is not strictly necessary,
+ // but we always recopy the last-bit, even if we are resuming
+ // partially through the checksum.
r.setCurrentState(stateCopyRows)
if err := r.copier.Run(ctx); err != nil {
return err
@@ -630,7 +635,16 @@ func (r *Runner) createCheckpointTable(ctx context.Context) error {
if err := dbconn.Exec(ctx, r.db, "DROP TABLE IF EXISTS %n.%n", r.table.SchemaName, cpName); err != nil {
return err
}
if err := dbconn.Exec(ctx, r.db, "CREATE TABLE %n.%n (id int NOT NULL AUTO_INCREMENT PRIMARY KEY, low_watermark TEXT, binlog_name VARCHAR(255), binlog_pos INT, rows_copied BIGINT, rows_copied_logical BIGINT, alter_statement TEXT)",
if err := dbconn.Exec(ctx, r.db, `CREATE TABLE %n.%n (
id int NOT NULL AUTO_INCREMENT PRIMARY KEY,
copier_watermark TEXT,
checksum_watermark TEXT,
binlog_name VARCHAR(255),
binlog_pos INT,
rows_copied BIGINT,
rows_copied_logical BIGINT,
alter_statement TEXT
)`,
r.table.SchemaName, cpName); err != nil {
return err
}
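
A note on the schema change above: low_watermark is renamed to copier_watermark, and a new checksum_watermark column is added alongside it. As dumpCheckpoint (below) shows, checksum_watermark is written as an empty string until the migration reaches the checksum state. On resume, the copier still re-runs from its own watermark (the commit comments note the last bit is always recopied), and the checksum phase then opens at checksum_watermark instead of starting over.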
@@ -740,17 +754,21 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
// Make sure we can read from the new table.
if err := dbconn.Exec(ctx, r.db, "SELECT * FROM %n.%n LIMIT 1",
r.migration.Database, newName); err != nil {
return fmt.Errorf("could not read from table '%s'", newName)
return fmt.Errorf("could not find any checkpoints in table '%s'", newName)
}

query := fmt.Sprintf("SELECT low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement FROM `%s`.`%s` ORDER BY id DESC LIMIT 1",
// We intentionally SELECT * FROM the checkpoint table because if the structure
// changes, we want this operation to fail. This will indicate that the checkpoint
// was created by either an earlier or later version of spirit, in which case
// we do not support recovery.
query := fmt.Sprintf("SELECT * FROM `%s`.`%s` ORDER BY id DESC LIMIT 1",
r.migration.Database, cpName)
- var lowWatermark, binlogName, alterStatement string
- var binlogPos int
+ var copierWatermark, binlogName, alterStatement string
+ var id, binlogPos int
var rowsCopied, rowsCopiedLogical uint64
err := r.db.QueryRow(query).Scan(&lowWatermark, &binlogName, &binlogPos, &rowsCopied, &rowsCopiedLogical, &alterStatement)
err := r.db.QueryRow(query).Scan(&id, &copierWatermark, &r.checksumWatermark, &binlogName, &binlogPos, &rowsCopied, &rowsCopiedLogical, &alterStatement)
if err != nil {
return fmt.Errorf("could not read from table '%s'", cpName)
return fmt.Errorf("could not read from table '%s', err:%v", cpName, err)
}
if r.migration.Alter != alterStatement {
return ErrMismatchedAlter
@@ -777,8 +795,7 @@
Logger: r.logger,
MetricsSink: r.metricsSink,
DBConfig: r.dbConfig,
- }, lowWatermark, rowsCopied, rowsCopiedLogical)
-
+ }, copierWatermark, rowsCopied, rowsCopiedLogical)
if err != nil {
return err
}
@@ -796,9 +813,6 @@
})

r.checkpointTable = table.NewTableInfo(r.db, r.table.SchemaName, cpName)
- if err != nil {
-     return err
- }

// Start the replClient now. This is because if the checkpoint is so old there
// are no longer binary log files, we want to abandon resume-from-checkpoint
@@ -808,7 +822,7 @@
r.logger.Warnf("resuming from checkpoint failed because resuming from the previous binlog position failed. log-file: %s log-pos: %d", binlogName, binlogPos)
return err
}
r.logger.Warnf("resuming from checkpoint. low-watermark: %s log-file: %s log-pos: %d copy-rows: %d", lowWatermark, binlogName, binlogPos, rowsCopied)
r.logger.Warnf("resuming from checkpoint. copier-watermark: %s checksum-watermark: %s log-file: %s log-pos: %d copy-rows: %d", copierWatermark, r.checksumWatermark, binlogName, binlogPos, rowsCopied)
r.usedResumeFromCheckpoint = true
return nil
}
@@ -821,20 +835,24 @@ func (r *Runner) checksum(ctx context.Context) error {
// - background flushing
// - checkpoint thread
// - checksum "replaceChunk" DB connections
- // Handle a case in the tests not having a dbConfig
+ // Handle a case just in the tests not having a dbConfig
if r.dbConfig == nil {
r.dbConfig = dbconn.NewDBConfig()
}
r.db.SetMaxOpenConns(r.dbConfig.MaxOpenConnections + 2)
var err error
for i := range 3 { // try the checksum up to 3 times.
+ if i > 0 {
+     r.checksumWatermark = "" // reset the watermark if we are retrying.
+ }
r.checkerLock.Lock()
r.checker, err = checksum.NewChecker(r.db, r.table, r.newTable, r.replClient, &checksum.CheckerConfig{
Concurrency: r.migration.Threads,
TargetChunkTime: r.migration.TargetChunkTime,
DBConfig: r.dbConfig,
Logger: r.logger,
FixDifferences: true, // we want to repair the differences.
+ Watermark: r.checksumWatermark,
})
r.checkerLock.Unlock()
if err != nil {
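
Note the retry interaction above: only the first attempt uses the saved r.checksumWatermark. Any retry clears it first, presumably so that a checksum that found (and repaired) differences is re-verified across the whole table rather than only the portion after the old watermark.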
@@ -881,26 +899,42 @@ func (r *Runner) setCurrentState(s migrationState) {
atomic.StoreInt32((*int32)(&r.currentState), int32(s))
}

+ // dumpCheckpoint is called approximately every minute.
+ // It writes the current state of the migration to the checkpoint table,
+ // which can be used in recovery. Previously resuming from checkpoint
+ // would always restart at the copier, but it can now also resume at
+ // the checksum phase.
func (r *Runner) dumpCheckpoint(ctx context.Context) error {
// Retrieve the binlog position first and under a mutex.
// Currently, it never advances, but it's possible it might in future
// and this race condition is missed.
binlog := r.replClient.GetBinlogApplyPosition()
- lowWatermark, err := r.copier.GetLowWatermark()
+ copierWatermark, err := r.copier.GetLowWatermark()
if err != nil {
return err // it might not be ready, we can try again.
}
+ // We only dump the checksumWatermark if we are in >= checksum state.
+ // We require a mutex because the checker can be replaced during
+ // operation, leaving a race condition.
+ var checksumWatermark string
+ if r.getCurrentState() >= stateChecksum {
+     r.checkerLock.Lock()
+     checksumWatermark, err = r.checker.GetLowWatermark()
+     r.checkerLock.Unlock()
+     if err != nil {
+         return err
+     }
+ }
copyRows := atomic.LoadUint64(&r.copier.CopyRowsCount)
logicalCopyRows := atomic.LoadUint64(&r.copier.CopyRowsLogicalCount)
// Note: when we dump the lowWatermark to the log, we are exposing the PK values,
// when using the composite chunker are based on actual user-data.
// We believe this is OK but may change it in the future. Please do not
// add any other fields to this log line.
r.logger.Infof("checkpoint: low-watermark=%s log-file=%s log-pos=%d rows-copied=%d rows-copied-logical=%d", lowWatermark, binlog.Name, binlog.Pos, copyRows, logicalCopyRows)
return dbconn.Exec(ctx, r.db, "INSERT INTO %n.%n (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (%?, %?, %?, %?, %?, %?)",
r.logger.Infof("checkpoint: low-watermark=%s log-file=%s log-pos=%d rows-copied=%d rows-copied-logical=%d", copierWatermark, binlog.Name, binlog.Pos, copyRows, logicalCopyRows)
return dbconn.Exec(ctx, r.db, "INSERT INTO %n.%n (copier_watermark, checksum_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (%?, %?, %?, %?, %?, %?, %?)",
r.checkpointTable.SchemaName,
r.checkpointTable.TableName,
- lowWatermark,
+ copierWatermark,
+ checksumWatermark,
binlog.Name,
binlog.Pos,
copyRows,
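
Putting the pieces together, a simplified trace of the resume-to-checksum flow wired up by this commit (commentary, not verbatim code):

    // During the checksum phase, every checkpointDumpInterval:
    w, _ := r.checker.GetLowWatermark() // guarded by r.checkerLock
    // ...INSERTed into the checkpoint table as checksum_watermark.

    // After a restart:
    // resumeFromCheckpoint() scans the row back into r.checksumWatermark,
    // checksum() passes it as CheckerConfig.Watermark, and NewChecker calls
    // chunker.OpenAtWatermark(), so Checker.Run() continues mid-checksum.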
pkg/migration/runner_test.go (17 changes: 14 additions & 3 deletions)
@@ -924,9 +924,20 @@ func TestCheckpointRestore(t *testing.T) {
// from issue #125
watermark := "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\":[\"53926425\"],\"Inclusive\":true},\"UpperBound\":{\"Value\":[\"53926425\"],\"Inclusive\":false}}"
binlog := r.replClient.GetBinlogApplyPosition()
query := fmt.Sprintf("INSERT INTO %s (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (?, ?, ?, ?, ?, ?)",
r.checkpointTable.QuotedName)
_, err = r.db.ExecContext(context.TODO(), query, watermark, binlog.Name, binlog.Pos, 0, 0, r.migration.Alter)
err = dbconn.Exec(context.TODO(), r.db, `INSERT INTO %n.%n
(copier_watermark, checksum_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement)
VALUES
(%?, %?, %?, %?, %?, %?, %?)`,
r.checkpointTable.SchemaName,
r.checkpointTable.TableName,
watermark,
"",
binlog.Name,
binlog.Pos,
0,
0,
r.migration.Alter,
)
assert.NoError(t, err)

r2, err := NewRunner(&Migration{
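
(Note that the test inserts an empty checksum_watermark, so this restore exercises the resume-at-copier path; a non-empty value here would instead drive the new resume-at-checksum path.)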
