diff --git a/pkg/checksum/checker.go b/pkg/checksum/checker.go
index 790b336..5fc5e5b 100644
--- a/pkg/checksum/checker.go
+++ b/pkg/checksum/checker.go
@@ -170,6 +170,12 @@ func (c *Checker) RecentValue() string {
 }
 
+// GetLowWatermark returns the low watermark reported by the checker's chunker.
+// It returns an error, rather than dereferencing a nil pointer, when the
+// chunker has not been set yet.
 func (c *Checker) GetLowWatermark() (string, error) {
+	if c.chunker == nil {
+		return "", errors.New("chunker not initialized")
+	}
 	return c.chunker.GetLowWatermark()
 }
 
diff --git a/pkg/checksum/checker_test.go b/pkg/checksum/checker_test.go
index 76f5d91..abeca61 100644
--- a/pkg/checksum/checker_test.go
+++ b/pkg/checksum/checker_test.go
@@ -258,3 +258,39 @@ func TestChangeDataTypeDatetime(t *testing.T) {
 	assert.NoError(t, err)
 	assert.NoError(t, checker.Run(context.Background())) // fails
 }
+
+// TestFromWatermark checks that a Checker built with a pre-set
+// config.Watermark (bounds [2, 3) on key "a") can be created and run
+// without error against two identical single-row tables.
+func TestFromWatermark(t *testing.T) {
+	testutils.RunSQL(t, "DROP TABLE IF EXISTS tfromwatermark, _tfromwatermark_new, _tfromwatermark_chkpnt")
+	testutils.RunSQL(t, "CREATE TABLE tfromwatermark (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
+	testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
+	testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_chkpnt (a INT)") // for binlog advancement
+	testutils.RunSQL(t, "INSERT INTO tfromwatermark VALUES (1, 2, 3)")
+	testutils.RunSQL(t, "INSERT INTO _tfromwatermark_new VALUES (1, 2, 3)")
+
+	db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
+	assert.NoError(t, err)
+
+	t1 := table.NewTableInfo(db, "test", "tfromwatermark")
+	assert.NoError(t, t1.SetInfo(context.TODO()))
+	t2 := table.NewTableInfo(db, "test", "_tfromwatermark_new")
+	assert.NoError(t, t2.SetInfo(context.TODO()))
+	logger := logrus.New()
+
+	cfg, err := mysql.ParseDSN(testutils.DSN())
+	assert.NoError(t, err)
+	feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
+		Logger:          logger,
+		Concurrency:     4,
+		TargetBatchTime: time.Second,
+	})
+	assert.NoError(t, feed.Run())
+
+	config := NewCheckerDefaultConfig()
+	config.Watermark = "{\"Key\":[\"a\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"2\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"3\"],\"Inclusive\":false}}"
+	checker, err := NewChecker(db, t1, t2, feed, config)
+	assert.NoError(t, err)
+	assert.NoError(t, checker.Run(context.Background()))
+}
diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go
index 58dbf84..01609a1 100644
--- a/pkg/migration/runner.go
+++ b/pkg/migration/runner.go
@@ -42,7 +42,7 @@ const (
 
 // These are really consts, but set to var for testing.
 var (
-	checkpointDumpInterval  = 2 * time.Second
+	checkpointDumpInterval  = 50 * time.Second
 	tableStatUpdateInterval = 5 * time.Minute
 	statusInterval          = 30 * time.Second
 	sentinelCheckInterval   = 1 * time.Second
diff --git a/pkg/migration/runner_test.go b/pkg/migration/runner_test.go
index ca98c49..adaac15 100644
--- a/pkg/migration/runner_test.go
+++ b/pkg/migration/runner_test.go
@@ -955,6 +955,82 @@ func TestCheckpointRestore(t *testing.T) {
 	assert.True(t, r2.usedResumeFromCheckpoint)
 }
 
+// TestCheckpointResumeDuringChecksum dumps a checkpoint after the checksum
+// has been run and verifies that a second runner resumes from that
+// checkpoint with a non-empty checksum watermark.
+func TestCheckpointResumeDuringChecksum(t *testing.T) {
+	tbl := `CREATE TABLE cptresume (
+		id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
+		id2 INT NOT NULL,
+		pad VARCHAR(100) NOT NULL default 0)`
+	cfg, err := mysql.ParseDSN(testutils.DSN())
+	assert.NoError(t, err)
+	testutils.RunSQL(t, `DROP TABLE IF EXISTS cptresume, _cptresume_new, _cptresume_chkpnt, _cptresume_sentinel`)
+	testutils.RunSQL(t, tbl)
+	testutils.RunSQL(t, `CREATE TABLE _cptresume_sentinel (id INT NOT NULL PRIMARY KEY)`)
+	testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM dual`)
+	testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume`)
+	testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume a JOIN cptresume b JOIN cptresume c`)
+
+	r, err := NewRunner(&Migration{
+		Host:            cfg.Addr,
+		Username:        cfg.User,
+		Password:        cfg.Passwd,
+		Database:        cfg.DBName,
+		Threads:         4,
+		TargetChunkTime: 100 * time.Millisecond,
+		Table:           "cptresume",
+		Alter:           "ENGINE=InnoDB",
+		Checksum:        true,
+	})
+	assert.NoError(t, err)
+
+	// Call r.Run() in a goroutine with a cancellable context.
+	// Once it is waiting on the sentinel table we run the checksum ourselves,
+	// dump a checkpoint and close the runner.
+	// A new runner should then resume from the checkpoint into the checksum state.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel() // safe to call twice; releases the background Run if the test bails out early.
+	// Run returns an error once the context is cancelled; that is expected here.
+	go r.Run(ctx) //nolint:errcheck
+	// Wait for the sentinel table, but with a deadline: if Run exits before
+	// reaching that state the state never advances, and an unbounded loop
+	// would spin until the global go test timeout.
+	deadline := time.Now().Add(time.Minute)
+	for r.getCurrentState() < stateWaitingOnSentinelTable {
+		if time.Now().After(deadline) {
+			t.Fatal("timed out waiting for the runner to reach the sentinel wait state")
+		}
+		time.Sleep(time.Millisecond)
+	}
+
+	assert.NoError(t, r.checksum(context.TODO()))       // run the checksum, the original Run is blocked on sentinel.
+	assert.NoError(t, r.dumpCheckpoint(context.TODO())) // dump a checkpoint with the watermark.
+	cancel()                                            // unblock the original waiting on sentinel.
+	assert.NoError(t, r.Close())                        // close the run.
+
+	// drop the sentinel table.
+	testutils.RunSQL(t, `DROP TABLE _cptresume_sentinel`)
+
+	// Start again as a new runner,
+	r2, err := NewRunner(&Migration{
+		Host:            cfg.Addr,
+		Username:        cfg.User,
+		Password:        cfg.Passwd,
+		Database:        cfg.DBName,
+		Threads:         4,
+		TargetChunkTime: 100 * time.Millisecond,
+		Table:           "cptresume",
+		Alter:           "ENGINE=InnoDB",
+		Checksum:        true,
+	})
+	assert.NoError(t, err)
+	err = r2.Run(context.Background())
+	assert.NoError(t, err)
+	assert.True(t, r2.usedResumeFromCheckpoint)
+	assert.NotEmpty(t, r2.checksumWatermark) // it had a checksum watermark
+}
+
 func TestCheckpointDifferentRestoreOptions(t *testing.T) {
 	tbl := `CREATE TABLE cpt1difft1 (
 		id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,