From e187ff5eba103e96618626d326a205accf21333c Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 3 Jan 2025 12:03:03 +0800 Subject: [PATCH] tests(ticdc): fix bank test (#11407) (#11809) close pingcap/tiflow#11806 --- cdc/owner/ddl_manager.go | 2 + tests/integration_tests/bank/case.go | 70 +++++++++++++++++++++++++++- tests/integration_tests/bank/run.sh | 4 +- 3 files changed, 74 insertions(+), 2 deletions(-) diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index 1185f3538b0..220a4d54fdd 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -210,6 +210,8 @@ func (m *ddlManager) tick( } if job != nil && job.BinlogInfo != nil { + // Note: do not change the key words in the log, it is used to search the + // FinishTS of the DDL job. Some integration tests and users depend on it. log.Info("handle a ddl job", zap.String("namespace", m.changfeedID.Namespace), zap.String("ID", m.changfeedID.ID), diff --git a/tests/integration_tests/bank/case.go b/tests/integration_tests/bank/case.go index 6b00903dfd5..1c25fdc5f15 100644 --- a/tests/integration_tests/bank/case.go +++ b/tests/integration_tests/bank/case.go @@ -14,6 +14,7 @@ package main import ( + "bufio" "context" "database/sql" "encoding/json" @@ -21,6 +22,10 @@ import ( "io" "math/rand" "net/http" + "os" + "path/filepath" + "regexp" + "strconv" "strings" "sync/atomic" "time" @@ -623,7 +628,8 @@ func getDownStreamSyncedEndTs(ctx context.Context, db *sql.DB, tidbAPIEndpoint, log.Error("get downstream sync end ts failed due to timeout", zap.String("table", tableName), zap.Error(ctx.Err())) return 0, ctx.Err() case <-time.After(2 * time.Second): - result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName) + // result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName) + result, ok := tryGetEndTsFromLog(db, tableName) if ok { return result, nil } @@ -675,3 +681,65 @@ func tryGetEndTs(db *sql.DB, tidbAPIEndpoint, tableName string) (result uint64, zap.Uint64("ts", ddlJob[0].Binlog.FinishedTS)) return ddlJob[0].Binlog.FinishedTS, true } + +func tryGetEndTsFromLog(_ *sql.DB, tableName string) (result uint64, ok bool) { + log.Info("try parse finishedTs from ticdc log", zap.String("tableName", tableName)) + + logFilePath := "/tmp/tidb_cdc_test/bank" + cdcLogFiles := make([]string, 0) + // walk all file with cdc prefix + err := filepath.WalkDir(logFilePath, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if !d.IsDir() { + if strings.Contains(d.Name(), "down") && strings.Contains(d.Name(), "cdc") && strings.Contains(d.Name(), "log") { + cdcLogFiles = append(cdcLogFiles, path) + fmt.Println(path) + } + } + return nil + }) + if err != nil { + log.Error("Failed to walk dir: %v", zap.Error(err)) + } + log.Info("total files", zap.Any("file", cdcLogFiles)) + + logRegex := regexp.MustCompile(`handle a ddl job`) + tableNameRegex := regexp.MustCompile(tableName + "`") + timeStampRegex := regexp.MustCompile(`finishedTs=([0-9]+)`) + for _, f := range cdcLogFiles { + file, err := os.Open(f) + if err != nil { + log.Error("Failed to open file: %v", zap.Error(err)) + } + defer file.Close() + + reader := bufio.NewReader(file) + for { + bs, _, err := reader.ReadLine() + if err != nil { + if err != io.EOF { + fmt.Printf("Error reading file: %v\n", err) + } + return 0, false + } + line := string(bs) + if !logRegex.MatchString(line) || !tableNameRegex.MatchString(line) { + continue + } + + matches := timeStampRegex.FindStringSubmatch(line) + if len(matches) > 1 { + fmt.Println("found first match line, Match Result: ", matches[1], ", line: ", line) + // convert to uint64 + result, err := strconv.ParseUint(matches[1], 10, 64) + if err != nil { + log.Error("Failed to parse uint64: %v", zap.Error(err)) + } + return result, true + } + } + } + return 0, false +} diff --git a/tests/integration_tests/bank/run.sh b/tests/integration_tests/bank/run.sh index e0c1a5cd6de..36cee26803f 100644 --- a/tests/integration_tests/bank/run.sh +++ b/tests/integration_tests/bank/run.sh @@ -20,8 +20,10 @@ function prepare() { run_sql "CREATE DATABASE bank" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - run_cdc_cli changefeed create --sink-uri="mysql://root@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/" + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8400" --pd "http://${DOWN_PD_HOST}:${DOWN_PD_PORT}" --logsuffix "down" + run_cdc_cli changefeed create --sink-uri="blackhole://" -c "changefeed-for-find-finished-ts" --server "http://127.0.0.1:8400" } trap stop_tidb_cluster EXIT