From 74887385aec34bd1f35f733190a54b3cd3022ba4 Mon Sep 17 00:00:00 2001
From: Morgan Tocker
Date: Mon, 2 Sep 2024 09:03:20 -0600
Subject: [PATCH] Add resume from checksum

---
 pkg/checksum/checker.go      | 74 +++++++++++++++++++++-------------
 pkg/migration/runner.go      | 78 ++++++++++++++++++++++++++----------
 pkg/migration/runner_test.go | 17 ++++++--
 3 files changed, 117 insertions(+), 52 deletions(-)

diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go
index 60c0596..790b336 100644
--- a/pkg/checksum/checker.go
+++ b/pkg/checksum/checker.go
@@ -25,22 +25,23 @@ import (
 type Checker struct {
     sync.Mutex
-    table            *table.TableInfo
-    newTable         *table.TableInfo
-    concurrency      int
-    feed             *repl.Client
-    db               *sql.DB
-    trxPool          *dbconn.TrxPool
-    isInvalid        bool
-    chunker          table.Chunker
-    startTime        time.Time
-    ExecTime         time.Duration
-    recentValue      interface{} // used for status
-    dbConfig         *dbconn.DBConfig
-    logger           loggers.Advanced
-    fixDifferences   bool
-    differencesFound atomic.Uint64
-    recopyLock       sync.Mutex
+    table                  *table.TableInfo
+    newTable               *table.TableInfo
+    concurrency            int
+    feed                   *repl.Client
+    db                     *sql.DB
+    trxPool                *dbconn.TrxPool
+    isInvalid              bool
+    chunker                table.Chunker
+    startTime              time.Time
+    ExecTime               time.Duration
+    recentValue            interface{} // used for status
+    dbConfig               *dbconn.DBConfig
+    logger                 loggers.Advanced
+    fixDifferences         bool
+    differencesFound       atomic.Uint64
+    recopyLock             sync.Mutex
+    isResumeFromCheckpoint bool
 }

 type CheckerConfig struct {
@@ -49,6 +50,7 @@ type CheckerConfig struct {
     DBConfig       *dbconn.DBConfig
     Logger         loggers.Advanced
     FixDifferences bool
+    Watermark      string // optional; defines a watermark to start from
 }

 func NewCheckerDefaultConfig() *CheckerConfig {
@@ -76,16 +78,25 @@ func NewChecker(db *sql.DB, tbl, newTable *table.TableInfo, feed *repl.Client, c
     if err != nil {
         return nil, err
     }
+    // If there is a watermark, we need to open the chunker at that watermark.
+    // Overwrite the previously attached chunker with a new one.
+    if config.Watermark != "" {
+        config.Logger.Warnf("opening checksum chunker at watermark: %s", config.Watermark)
+        if err := chunker.OpenAtWatermark(config.Watermark, newTable.MaxValue()); err != nil {
+            return nil, err
+        }
+    }
     checksum := &Checker{
-        table:          tbl,
-        newTable:       newTable,
-        concurrency:    config.Concurrency,
-        db:             db,
-        feed:           feed,
-        chunker:        chunker,
-        dbConfig:       config.DBConfig,
-        logger:         config.Logger,
-        fixDifferences: config.FixDifferences,
+        table:                  tbl,
+        newTable:               newTable,
+        concurrency:            config.Concurrency,
+        db:                     db,
+        feed:                   feed,
+        chunker:                chunker,
+        dbConfig:               config.DBConfig,
+        logger:                 config.Logger,
+        fixDifferences:         config.FixDifferences,
+        isResumeFromCheckpoint: config.Watermark != "",
     }
     return checksum, nil
 }
@@ -158,6 +169,10 @@ func (c *Checker) RecentValue() string {
     return fmt.Sprintf("%v", c.recentValue)
 }

+func (c *Checker) GetLowWatermark() (string, error) {
+    return c.chunker.GetLowWatermark()
+}
+
 func (c *Checker) inspectDifferences(trx *sql.Tx, chunk *table.Chunk) error {
     sourceSubquery := fmt.Sprintf("SELECT CRC32(CONCAT(%s)) as row_checksum, %s FROM %s WHERE %s",
         c.intersectColumns(),
@@ -362,8 +377,13 @@ func (c *Checker) Run(ctx context.Context) error {
     defer func() {
         c.ExecTime = time.Since(c.startTime)
     }()
-    if err := c.chunker.Open(); err != nil {
-        return err
+    // Open the chunker if it's not open.
+    // It will already be open if this is a resume from checkpoint.
+    // This is a little annoying, but just the way the chunker API works.
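+    // (On resume, NewChecker has already opened the chunker via OpenAtWatermark.)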
+    if !c.isResumeFromCheckpoint {
+        if err := c.chunker.Open(); err != nil {
+            return err
+        }
     }
     c.Unlock()

diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go
index 3de7927..58dbf84 100644
--- a/pkg/migration/runner.go
+++ b/pkg/migration/runner.go
@@ -42,7 +42,7 @@ const (
 // These are really consts, but set to var for testing.
 var (
-    checkpointDumpInterval  = 50 * time.Second
+    checkpointDumpInterval  = 2 * time.Second
     tableStatUpdateInterval = 5 * time.Minute
     statusInterval          = 30 * time.Second
     sentinelCheckInterval   = 1 * time.Second
@@ -92,6 +92,9 @@ type Runner struct {
     checker     *checksum.Checker
     checkerLock sync.Mutex
+    // used to resume directly at the checksum phase.
+    checksumWatermark string
+
     // Track some key statistics.
     startTime             time.Time
     sentinelWaitStartTime time.Time
@@ -252,7 +255,9 @@ func (r *Runner) Run(originalCtx context.Context) error {
     go r.dumpCheckpointContinuously(ctx) // start periodically dumping the checkpoint.

     // Perform the main copy rows task. This is where the majority
-    // of migrations usually spend time.
+    // of migrations usually spend time. It is not strictly necessary,
+    // but we always recopy the last bit, even if we are resuming
+    // partially through the checksum.
     r.setCurrentState(stateCopyRows)
     if err := r.copier.Run(ctx); err != nil {
         return err
@@ -630,7 +635,16 @@ func (r *Runner) createCheckpointTable(ctx context.Context) error {
     if err := dbconn.Exec(ctx, r.db, "DROP TABLE IF EXISTS %n.%n", r.table.SchemaName, cpName); err != nil {
         return err
     }
-    if err := dbconn.Exec(ctx, r.db, "CREATE TABLE %n.%n (id int NOT NULL AUTO_INCREMENT PRIMARY KEY, low_watermark TEXT, binlog_name VARCHAR(255), binlog_pos INT, rows_copied BIGINT, rows_copied_logical BIGINT, alter_statement TEXT)",
+    if err := dbconn.Exec(ctx, r.db, `CREATE TABLE %n.%n (
+    id int NOT NULL AUTO_INCREMENT PRIMARY KEY,
+    copier_watermark TEXT,
+    checksum_watermark TEXT,
+    binlog_name VARCHAR(255),
+    binlog_pos INT,
+    rows_copied BIGINT,
+    rows_copied_logical BIGINT,
+    alter_statement TEXT
+    )`,
         r.table.SchemaName, cpName); err != nil {
         return err
     }
@@ -740,17 +754,21 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {

     // Make sure we can read from the new table.
     if err := dbconn.Exec(ctx, r.db, "SELECT * FROM %n.%n LIMIT 1", r.migration.Database, newName); err != nil {
-        return fmt.Errorf("could not read from table '%s'", newName)
+        return fmt.Errorf("could not find any checkpoints in table '%s'", newName)
     }

-    query := fmt.Sprintf("SELECT low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement FROM `%s`.`%s` ORDER BY id DESC LIMIT 1",
+    // We intentionally SELECT * FROM the checkpoint table because if the structure
+    // changes, we want this operation to fail. This will indicate that the checkpoint
+    // was created by either an earlier or later version of spirit, in which case
+    // we do not support recovery.
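+    // Because we SELECT *, the Scan() call below must list its destinations in
+    // the same order as the columns are defined in createCheckpointTable.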
+    query := fmt.Sprintf("SELECT * FROM `%s`.`%s` ORDER BY id DESC LIMIT 1",
         r.migration.Database, cpName)
-    var lowWatermark, binlogName, alterStatement string
-    var binlogPos int
+    var copierWatermark, binlogName, alterStatement string
+    var id, binlogPos int
     var rowsCopied, rowsCopiedLogical uint64
-    err := r.db.QueryRow(query).Scan(&lowWatermark, &binlogName, &binlogPos, &rowsCopied, &rowsCopiedLogical, &alterStatement)
+    err := r.db.QueryRow(query).Scan(&id, &copierWatermark, &r.checksumWatermark, &binlogName, &binlogPos, &rowsCopied, &rowsCopiedLogical, &alterStatement)
     if err != nil {
-        return fmt.Errorf("could not read from table '%s'", cpName)
+        return fmt.Errorf("could not read from table '%s', err: %v", cpName, err)
     }
     if r.migration.Alter != alterStatement {
         return ErrMismatchedAlter
@@ -777,8 +795,7 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
         Logger:      r.logger,
         MetricsSink: r.metricsSink,
         DBConfig:    r.dbConfig,
-    }, lowWatermark, rowsCopied, rowsCopiedLogical)
-
+    }, copierWatermark, rowsCopied, rowsCopiedLogical)
     if err != nil {
         return err
     }
@@ -796,9 +813,6 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
     })
     r.checkpointTable = table.NewTableInfo(r.db, r.table.SchemaName, cpName)
-    if err != nil {
-        return err
-    }

     // Start the replClient now. This is because if the checkpoint is so old there
     // are no longer binary log files, we want to abandon resume-from-checkpoint
@@ -808,7 +822,7 @@ func (r *Runner) resumeFromCheckpoint(ctx context.Context) error {
         r.logger.Warnf("resuming from checkpoint failed because resuming from the previous binlog position failed. log-file: %s log-pos: %d", binlogName, binlogPos)
         return err
     }
-    r.logger.Warnf("resuming from checkpoint. low-watermark: %s log-file: %s log-pos: %d copy-rows: %d", lowWatermark, binlogName, binlogPos, rowsCopied)
+    r.logger.Warnf("resuming from checkpoint. copier-watermark: %s checksum-watermark: %s log-file: %s log-pos: %d copy-rows: %d", copierWatermark, r.checksumWatermark, binlogName, binlogPos, rowsCopied)
     r.usedResumeFromCheckpoint = true
     return nil
 }
@@ -821,13 +835,16 @@ func (r *Runner) checksum(ctx context.Context) error {
     // - background flushing
     // - checkpoint thread
     // - checksum "replaceChunk" DB connections
-    // Handle a case in the tests not having a dbConfig
+    // Handle a case that only occurs in tests, where dbConfig is not set.
     if r.dbConfig == nil {
         r.dbConfig = dbconn.NewDBConfig()
     }
     r.db.SetMaxOpenConns(r.dbConfig.MaxOpenConnections + 2)
     var err error
     for i := range 3 { // try the checksum up to 3 times.
+        if i > 0 {
+            r.checksumWatermark = "" // reset the watermark if we are retrying.
+        }
         r.checkerLock.Lock()
         r.checker, err = checksum.NewChecker(r.db, r.table, r.newTable, r.replClient, &checksum.CheckerConfig{
             Concurrency: r.migration.Threads,
@@ -835,6 +852,7 @@
             DBConfig:       r.dbConfig,
             Logger:         r.logger,
             FixDifferences: true, // we want to repair the differences.
+            Watermark:      r.checksumWatermark,
         })
         r.checkerLock.Unlock()
         if err != nil {
@@ -881,26 +899,42 @@ func (r *Runner) setCurrentState(s migrationState) {
     atomic.StoreInt32((*int32)(&r.currentState), int32(s))
 }

+// dumpCheckpoint is called periodically (every checkpointDumpInterval).
+// It writes the current state of the migration to the checkpoint table,
+// which can be used in recovery. Previously resuming from checkpoint
+// would always restart at the copier, but it can now also resume at
+// the checksum phase.
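+// The checksum watermark is only written once the migration has reached the
+// checksum state; until then the checksum_watermark column is left empty.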
 func (r *Runner) dumpCheckpoint(ctx context.Context) error {
     // Retrieve the binlog position first and under a mutex.
-    // Currently, it never advances, but it's possible it might in future
-    // and this race condition is missed.
     binlog := r.replClient.GetBinlogApplyPosition()
-    lowWatermark, err := r.copier.GetLowWatermark()
+    copierWatermark, err := r.copier.GetLowWatermark()
     if err != nil {
         return err // it might not be ready, we can try again.
     }
+    // We only dump the checksumWatermark if we are in >= checksum state.
+    // We require a mutex because the checker can be replaced during
+    // operation, leaving a race condition.
+    var checksumWatermark string
+    if r.getCurrentState() >= stateChecksum {
+        r.checkerLock.Lock()
+        checksumWatermark, err = r.checker.GetLowWatermark()
+        r.checkerLock.Unlock()
+        if err != nil {
+            return err
+        }
+    }
     copyRows := atomic.LoadUint64(&r.copier.CopyRowsCount)
     logicalCopyRows := atomic.LoadUint64(&r.copier.CopyRowsLogicalCount)
     // Note: when we dump the lowWatermark to the log, we are exposing the PK values,
     // when using the composite chunker are based on actual user-data.
     // We believe this is OK but may change it in the future. Please do not
     // add any other fields to this log line.
-    r.logger.Infof("checkpoint: low-watermark=%s log-file=%s log-pos=%d rows-copied=%d rows-copied-logical=%d", lowWatermark, binlog.Name, binlog.Pos, copyRows, logicalCopyRows)
-    return dbconn.Exec(ctx, r.db, "INSERT INTO %n.%n (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (%?, %?, %?, %?, %?, %?)",
+    r.logger.Infof("checkpoint: low-watermark=%s log-file=%s log-pos=%d rows-copied=%d rows-copied-logical=%d", copierWatermark, binlog.Name, binlog.Pos, copyRows, logicalCopyRows)
+    return dbconn.Exec(ctx, r.db, "INSERT INTO %n.%n (copier_watermark, checksum_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (%?, %?, %?, %?, %?, %?, %?)",
         r.checkpointTable.SchemaName,
         r.checkpointTable.TableName,
-        lowWatermark,
+        copierWatermark,
+        checksumWatermark,
         binlog.Name,
         binlog.Pos,
         copyRows,
diff --git a/pkg/migration/runner_test.go b/pkg/migration/runner_test.go
index 59f4be6..ca98c49 100644
--- a/pkg/migration/runner_test.go
+++ b/pkg/migration/runner_test.go
@@ -924,9 +924,20 @@ func TestCheckpointRestore(t *testing.T) {
     // from issue #125
     watermark := "{\"Key\":[\"id\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\":[\"53926425\"],\"Inclusive\":true},\"UpperBound\":{\"Value\":[\"53926425\"],\"Inclusive\":false}}"
     binlog := r.replClient.GetBinlogApplyPosition()
-    query := fmt.Sprintf("INSERT INTO %s (low_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement) VALUES (?, ?, ?, ?, ?, ?)",
-        r.checkpointTable.QuotedName)
-    _, err = r.db.ExecContext(context.TODO(), query, watermark, binlog.Name, binlog.Pos, 0, 0, r.migration.Alter)
+    err = dbconn.Exec(context.TODO(), r.db, `INSERT INTO %n.%n
+    (copier_watermark, checksum_watermark, binlog_name, binlog_pos, rows_copied, rows_copied_logical, alter_statement)
+    VALUES
+    (%?, %?, %?, %?, %?, %?, %?)`,
+        r.checkpointTable.SchemaName,
+        r.checkpointTable.TableName,
+        watermark,
+        "",
+        binlog.Name,
+        binlog.Pos,
+        0,
+        0,
+        r.migration.Alter,
+    )
     assert.NoError(t, err)

     r2, err := NewRunner(&Migration{