Skip to content

Commit

Permalink
Add a test for checkpoint in checksum
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo committed Sep 2, 2024
1 parent 7488738 commit fdabcd1
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/checksum/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ func (c *Checker) RecentValue() string {
}

func (c *Checker) GetLowWatermark() (string, error) {
if c.chunker == nil {
return "", errors.New("chunker not initialized")
}
return c.chunker.GetLowWatermark()
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/checksum/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,36 @@ func TestChangeDataTypeDatetime(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, checker.Run(context.Background())) // fails
}

func TestFromWatermark(t *testing.T) {
testutils.RunSQL(t, "DROP TABLE IF EXISTS tfromwatermark, _tfromwatermark_new, _tfromwatermark_chkpnt")
testutils.RunSQL(t, "CREATE TABLE tfromwatermark (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_new (a INT NOT NULL, b INT, c INT, PRIMARY KEY (a))")
testutils.RunSQL(t, "CREATE TABLE _tfromwatermark_chkpnt (a INT)") // for binlog advancement
testutils.RunSQL(t, "INSERT INTO tfromwatermark VALUES (1, 2, 3)")
testutils.RunSQL(t, "INSERT INTO _tfromwatermark_new VALUES (1, 2, 3)")

db, err := dbconn.New(testutils.DSN(), dbconn.NewDBConfig())
assert.NoError(t, err)

t1 := table.NewTableInfo(db, "test", "tfromwatermark")
assert.NoError(t, t1.SetInfo(context.TODO()))
t2 := table.NewTableInfo(db, "test", "_tfromwatermark_new")
assert.NoError(t, t2.SetInfo(context.TODO()))
logger := logrus.New()

cfg, err := mysql.ParseDSN(testutils.DSN())
assert.NoError(t, err)
feed := repl.NewClient(db, cfg.Addr, t1, t2, cfg.User, cfg.Passwd, &repl.ClientConfig{
Logger: logger,
Concurrency: 4,
TargetBatchTime: time.Second,
})
assert.NoError(t, feed.Run())

config := NewCheckerDefaultConfig()
config.Watermark = "{\"Key\":[\"a\"],\"ChunkSize\":1000,\"LowerBound\":{\"Value\": [\"2\"],\"Inclusive\":true},\"UpperBound\":{\"Value\": [\"3\"],\"Inclusive\":false}}"
checker, err := NewChecker(db, t1, t2, feed, config)
assert.NoError(t, err)
assert.NoError(t, checker.Run(context.Background()))
}
2 changes: 1 addition & 1 deletion pkg/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (

// These are really consts, but set to var for testing.
var (
checkpointDumpInterval = 2 * time.Second
checkpointDumpInterval = 50 * time.Second
tableStatUpdateInterval = 5 * time.Minute
statusInterval = 30 * time.Second
sentinelCheckInterval = 1 * time.Second
Expand Down
69 changes: 69 additions & 0 deletions pkg/migration/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,75 @@ func TestCheckpointRestore(t *testing.T) {
assert.True(t, r2.usedResumeFromCheckpoint)
}

func TestCheckpointResumeDuringChecksum(t *testing.T) {
tbl := `CREATE TABLE cptresume (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
id2 INT NOT NULL,
pad VARCHAR(100) NOT NULL default 0)`
cfg, err := mysql.ParseDSN(testutils.DSN())
assert.NoError(t, err)
testutils.RunSQL(t, `DROP TABLE IF EXISTS cptresume, _cptresume_new, _cptresume_chkpnt, _cptresume_sentinel`)
testutils.RunSQL(t, tbl)
testutils.RunSQL(t, `CREATE TABLE _cptresume_sentinel (id INT NOT NULL PRIMARY KEY)`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM dual`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume`)
testutils.RunSQL(t, `insert into cptresume (id2,pad) SELECT 1, REPEAT('a', 100) FROM cptresume a JOIN cptresume b JOIN cptresume c`)

r, err := NewRunner(&Migration{
Host: cfg.Addr,
Username: cfg.User,
Password: cfg.Passwd,
Database: cfg.DBName,
Threads: 4,
TargetChunkTime: 100 * time.Millisecond,
Table: "cptresume",
Alter: "ENGINE=InnoDB",
Checksum: true,
})
assert.NoError(t, err)

// Call r.Run() with our context in a go-routine.
// When we see that we are waiting on the sentinel table,
// we then manually start the first bits of checksum, and then close()
// We should be able to resume from the checkpoint into the checksum state.
ctx, cancel := context.WithCancel(context.Background())
// it will error as the context is cancelled
go r.Run(ctx) //nolint:errcheck
for {
// Wait for the sentinel table.
if r.getCurrentState() >= stateWaitingOnSentinelTable {
break
}
time.Sleep(time.Millisecond)
}

assert.NoError(t, r.checksum(context.TODO())) // run the checksum, the original Run is blocked on sentinel.
assert.NoError(t, r.dumpCheckpoint(context.TODO())) // dump a checkpoint with the watermark.
cancel() // unblock the original waiting on sentinel.
assert.NoError(t, r.Close()) // close the run.

// drop the sentinel table.
testutils.RunSQL(t, `DROP TABLE _cptresume_sentinel`)

// Start again as a new runner,
r2, err := NewRunner(&Migration{
Host: cfg.Addr,
Username: cfg.User,
Password: cfg.Passwd,
Database: cfg.DBName,
Threads: 4,
TargetChunkTime: 100 * time.Millisecond,
Table: "cptresume",
Alter: "ENGINE=InnoDB",
Checksum: true,
})
assert.NoError(t, err)
err = r2.Run(context.Background())
assert.NoError(t, err)
assert.True(t, r2.usedResumeFromCheckpoint)
assert.NotEmpty(t, r2.checksumWatermark) // it had a checksum watermark
}

func TestCheckpointDifferentRestoreOptions(t *testing.T) {
tbl := `CREATE TABLE cpt1difft1 (
id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
Expand Down

0 comments on commit fdabcd1

Please sign in to comment.