Skip to content

Commit

Permalink
tests(ticdc): fix bank test (#11407) (#11809)
Browse files Browse the repository at this point in the history
close #11806
  • Loading branch information
ti-chi-bot authored Jan 3, 2025
1 parent 61c4ac5 commit e187ff5
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
2 changes: 2 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func (m *ddlManager) tick(
}

if job != nil && job.BinlogInfo != nil {
// Note: do not change the key words in the log, it is used to search the
// FinishTS of the DDL job. Some integration tests and users depend on it.
log.Info("handle a ddl job",
zap.String("namespace", m.changfeedID.Namespace),
zap.String("ID", m.changfeedID.ID),
Expand Down
70 changes: 69 additions & 1 deletion tests/integration_tests/bank/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@
package main

import (
"bufio"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -623,7 +628,8 @@ func getDownStreamSyncedEndTs(ctx context.Context, db *sql.DB, tidbAPIEndpoint,
log.Error("get downstream sync end ts failed due to timeout", zap.String("table", tableName), zap.Error(ctx.Err()))
return 0, ctx.Err()
case <-time.After(2 * time.Second):
result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName)
// result, ok := tryGetEndTs(db, tidbAPIEndpoint, tableName)
result, ok := tryGetEndTsFromLog(db, tableName)
if ok {
return result, nil
}
Expand Down Expand Up @@ -675,3 +681,65 @@ func tryGetEndTs(db *sql.DB, tidbAPIEndpoint, tableName string) (result uint64,
zap.Uint64("ts", ddlJob[0].Binlog.FinishedTS))
return ddlJob[0].Binlog.FinishedTS, true
}

func tryGetEndTsFromLog(_ *sql.DB, tableName string) (result uint64, ok bool) {
log.Info("try parse finishedTs from ticdc log", zap.String("tableName", tableName))

logFilePath := "/tmp/tidb_cdc_test/bank"
cdcLogFiles := make([]string, 0)
// walk all file with cdc prefix
err := filepath.WalkDir(logFilePath, func(path string, d os.DirEntry, err error) error {
if err != nil {
return err
}
if !d.IsDir() {
if strings.Contains(d.Name(), "down") && strings.Contains(d.Name(), "cdc") && strings.Contains(d.Name(), "log") {
cdcLogFiles = append(cdcLogFiles, path)
fmt.Println(path)
}
}
return nil
})
if err != nil {
log.Error("Failed to walk dir: %v", zap.Error(err))
}
log.Info("total files", zap.Any("file", cdcLogFiles))

logRegex := regexp.MustCompile(`handle a ddl job`)
tableNameRegex := regexp.MustCompile(tableName + "`")
timeStampRegex := regexp.MustCompile(`finishedTs=([0-9]+)`)
for _, f := range cdcLogFiles {
file, err := os.Open(f)
if err != nil {
log.Error("Failed to open file: %v", zap.Error(err))
}
defer file.Close()

reader := bufio.NewReader(file)
for {
bs, _, err := reader.ReadLine()
if err != nil {
if err != io.EOF {
fmt.Printf("Error reading file: %v\n", err)
}
return 0, false
}
line := string(bs)
if !logRegex.MatchString(line) || !tableNameRegex.MatchString(line) {
continue
}

matches := timeStampRegex.FindStringSubmatch(line)
if len(matches) > 1 {
fmt.Println("found first match line, Match Result: ", matches[1], ", line: ", line)
// convert to uint64
result, err := strconv.ParseUint(matches[1], 10, 64)
if err != nil {
log.Error("Failed to parse uint64: %v", zap.Error(err))
}
return result, true
}
}
}
return 0, false
}
4 changes: 3 additions & 1 deletion tests/integration_tests/bank/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ function prepare() {
run_sql "CREATE DATABASE bank" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

run_cdc_cli changefeed create --sink-uri="mysql://root@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/"

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8400" --pd "http://${DOWN_PD_HOST}:${DOWN_PD_PORT}" --logsuffix "down"
run_cdc_cli changefeed create --sink-uri="blackhole://" -c "changefeed-for-find-finished-ts" --server "http://127.0.0.1:8400"
}

trap stop_tidb_cluster EXIT
Expand Down

0 comments on commit e187ff5

Please sign in to comment.