From 09e78c27d2f62e121d18dbedc7f06cd81fe01c33 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Fri, 16 Aug 2019 15:41:42 +0800 Subject: [PATCH] mydumper: add select filter --- conf/mydumper.ini.sample | 8 ++++++++ src/cmd/config.go | 22 ++++++++++++++++++++++ src/common/common.go | 1 + src/common/dumper.go | 28 +++++++++++++++++++++------- 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/conf/mydumper.ini.sample b/conf/mydumper.ini.sample index efadf09..4f9e6a4 100644 --- a/conf/mydumper.ini.sample +++ b/conf/mydumper.ini.sample @@ -20,3 +20,11 @@ vars= "xx=xx;xx=xx;" [where] sample_table1 = created_at >= DATE_SUB(NOW(), INTERVAL 7 DAY) sample_table2 = created_at >= DATE_SUB(NOW(), INTERVAL 7 DAY) + +# Use this to override value returned from tables. These are optional +[select] +user.salt = 'reset salt of all system users' +user.password = 'reset password of all system users' + +customer.first_name = CONCAT('Bohu', id) +customer.last_name = 'Last' diff --git a/src/cmd/config.go b/src/cmd/config.go index 50e34a4..6a56d65 100644 --- a/src/cmd/config.go +++ b/src/cmd/config.go @@ -12,6 +12,7 @@ package main import ( "common" "fmt" + "strings" ini "github.com/dlintw/goconf" ) @@ -65,6 +66,27 @@ func parseDumperConfig(file string) (*common.Args, error) { return nil, err } + var selects []string + if selects, err = cfg.GetOptions("select"); err != nil { + return nil, err + } + for _, tblcol := range selects { + var table, column string + split := strings.Split(tblcol, ".") + table = split[0] + column = split[1] + + if args.Selects == nil { + args.Selects = make(map[string]map[string]string) + } + if args.Selects[table] == nil { + args.Selects[table] = make(map[string]string, 0) + } + if args.Selects[table][column], err = cfg.GetString("select", tblcol); err != nil { + return nil, err + } + } + args.Address = fmt.Sprintf("%s:%d", host, port) args.User = user args.Password = password diff --git a/src/common/common.go b/src/common/common.go index 39d312b..d2a4986 100644 --- a/src/common/common.go +++ b/src/common/common.go @@ -38,6 +38,7 @@ type Args struct { Allrows uint64 OverwriteTables bool Wheres map[string]string + Selects map[string]map[string]string // Interval in millisecond. IntervalMs int diff --git a/src/common/dumper.go b/src/common/dumper.go index 6605488..8657928 100644 --- a/src/common/dumper.go +++ b/src/common/dumper.go @@ -49,20 +49,34 @@ func dumpTable(log *xlog.Log, conn *Connection, args *Args, table string) { var allBytes uint64 var allRows uint64 var where string + var selfields []string + + fields := make([]string, 0, 16) + { + cursor, err := conn.StreamFetch(fmt.Sprintf("SELECT * FROM `%s`.`%s` LIMIT 1", args.Database, table)) + AssertNil(err) + + flds := cursor.Fields() + for _, fld := range flds { + fields = append(fields, fmt.Sprintf("`%s`", fld.Name)) + replacement, ok := args.Selects[table][fld.Name] + if ok { + selfields = append(selfields, fmt.Sprintf("%s AS `%s`", replacement, fld.Name)) + } else { + selfields = append(selfields, fmt.Sprintf("`%s`", fld.Name)) + } + } + err = cursor.Close() + AssertNil(err) + } if v, ok := args.Wheres[table]; ok { where = fmt.Sprintf(" WHERE %v", v) } - cursor, err := conn.StreamFetch(fmt.Sprintf("SELECT * FROM `%s`.`%s` %s", args.Database, table, where)) + cursor, err := conn.StreamFetch(fmt.Sprintf("SELECT %s FROM `%s`.`%s` %s", strings.Join(selfields, ", "), args.Database, table, where)) AssertNil(err) - fields := make([]string, 0, 16) - flds := cursor.Fields() - for _, fld := range flds { - fields = append(fields, fmt.Sprintf("`%s`", fld.Name)) - } - fileNo := 1 stmtsize := 0 chunkbytes := 0