-
Notifications
You must be signed in to change notification settings - Fork 979
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
XA全局事务成功后,DTM Admin 显示正常,但是本地数据库XA事务未提交,要修改的数据未变动,需要手动执行XA事务提交才生效 #470
Comments
能有可以复现的例子吗? |
具体的示例的如下,有两个文件,一个是test_dtm.go,代码如下: package main
import (
"crypto/md5"
"fmt"
"github.com/bwmarrin/snowflake"
"github.com/dtm-labs/client/dtmcli"
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
"github.com/google/uuid"
"log"
"strings"
)
func CryptToMD5(v1 []byte, v2 []byte, uppercase bool) string {
var rs = make([]string, 0)
m := md5.New()
m.Write(v1)
bm := m.Sum(v2)
for _, v := range bm {
if uppercase {
rs = append(rs, fmt.Sprintf("%02X", v))
} else {
rs = append(rs, fmt.Sprintf("%02x", v))
}
}
return strings.Join(rs, "")
/*return hex.EncodeToString(h.Sum(format.ToByte(v2)))*/ //第二种返回字符串的方法,返回的参数是小写
}
func NewGid() string {
u := uuid.New()
return fmt.Sprintf("%s", u)
}
func TransXa(serverUrl, businessUrl string) {
gid := NewGid()
transInReq := &gin.H{
"transInUserId": 1001,
"amount": 100,
}
transOutReq := &gin.H{
"transOutUserId": 1002,
"amount": 100,
}
err := dtmcli.XaGlobalTransaction(serverUrl, gid, func(xa *dtmcli.Xa) (*resty.Response, error) {
resp, err := xa.CallBranch(transOutReq, businessUrl+"/trans-out-xa")
if err != nil {
return resp, err
}
return xa.CallBranch(transInReq, businessUrl+"/trans-in-xa")
})
if err != nil {
log.Fatal(err)
}
log.Printf("transaction:%s xa success", gid)
}
func main() {
serverUrl := "http://192.168.0.101:31980/api/dtmsvr"
businessUrl := "http://192.168.0.101:8686/api/v1/business"
TransXa(serverUrl, businessUrl)
} 一个文件是test_dtm_http_xa.go,代码如下: package main
import (
"database/sql"
"fmt"
"github.com/dtm-labs/client/dtmcli"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"github.com/go-playground/locales/en"
"github.com/go-playground/locales/zh"
ut "github.com/go-playground/universal-translator"
"github.com/go-playground/validator/v10"
enTranslations "github.com/go-playground/validator/v10/translations/en"
zhTranslations "github.com/go-playground/validator/v10/translations/zh"
jsoniter "github.com/json-iterator/go"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/pkg/errors"
"gorm.io/driver/mysql"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"log"
"net/http"
"net/url"
"time"
)
type TransInXaRequest struct {
TransInUserId int `json:"transInUserId" binding:"required" desc:"转入账号id"` //转入账号id
Amount float64 `json:"amount" binding:"required" desc:"转入金额"` //转入金额
}
type TransOutXaRequest struct {
TransOutUserId int `json:"transOutUserId" binding:"required" desc:"转出账号id"` //转出账号id
Amount float64 `json:"amount" binding:"required" desc:"转出金额"` //转出金额
}
func initValidator(locale string) (ut.Translator, error) {
if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
zhTranslator := zh.New()
enTranslator := en.New()
uni := ut.New(enTranslator, zhTranslator, enTranslator)
translator, ok := uni.GetTranslator(locale)
if !ok {
return nil, errors.New("init validator failed")
}
var err error
switch locale {
case "en":
err = enTranslations.RegisterDefaultTranslations(v, translator)
case "zh":
err = zhTranslations.RegisterDefaultTranslations(v, translator)
default:
err = enTranslations.RegisterDefaultTranslations(v, translator)
}
return translator, err
}
return nil, errors.New("init validator failed")
}
func ValidateError(ctx *gin.Context, err error) bool {
if err == nil {
return false
}
errs, ok := err.(validator.ValidationErrors)
if ok {
translator, err := initValidator("zh")
if err == nil {
return false
}
errMsg, _ := jsoniter.Marshal(errs.Translate(translator))
ctx.JSON(http.StatusOK, gin.H{
"code": -1,
"message": "validate request failed:" + string(errMsg),
})
} else {
ctx.JSON(http.StatusOK, gin.H{
"code": -1,
"message": "read request failed:" + err.Error(),
})
}
return true
}
func ResponseFailed(ctx *gin.Context, message string, err error) {
if err == nil {
ctx.JSON(http.StatusOK, gin.H{
"code": -1,
"message": message + "!",
})
} else {
ctx.JSON(http.StatusOK, gin.H{
"code": -1,
"message": message + "," + err.Error(),
})
}
}
func ResponseSuccess(ctx *gin.Context, message string, data ...interface{}) {
if len(data) > 0 {
ctx.JSON(http.StatusOK, gin.H{
"code": 0,
"message": message + "!",
"data": data[0],
})
} else {
ctx.JSON(http.StatusOK, gin.H{
"code": 0,
"message": message + "!",
})
}
}
func ResponseWithStatusCode(ctx *gin.Context, statusCode int, code int, message string, data ...interface{}) {
if statusCode != 200 {
code = statusCode
}
if len(data) > 0 {
ctx.JSON(statusCode, gin.H{
"code": code,
"message": message,
"data": data[0],
})
} else {
ctx.JSON(statusCode, gin.H{
"code": code,
"message": message,
})
}
}
type DBConf struct {
User string `yaml:"user" desc:"用户名"`
Password string `yaml:"password" desc:"用户密码"`
Host string `yaml:"host" desc:"主机名"`
Port int `yaml:"port" desc:"主机端口"`
Database string `yaml:"database" desc:"数据库名"`
Dialect string `yaml:"dialect" desc:"数据库类型"`
DBSource string `yaml:"db_source" desc:"数据库源"`
DBDebug bool `yaml:"db_debug" desc:"是否输出gorm数据库调试语句"`
MaxAge int `yaml:"max_age" desc:"日志最大保留时间"`
RotateTimeLevel int `yaml:"rotate_time_level" desc:"日志分片时间等级 0 自定义时间分片 1 日分片 2 1小时分片 3 1分钟分片"`
RotateTime int `yaml:"rotate_time" desc:"自定义时间分片时长 单位为:min"`
}
const RotateByTimestamp = 0 //自定义时间分片
const RotateByDate = 1 //日分片
const RotateByHour = 2 //1小时分片
const RotateByMinute = 3 //1分钟分片
type Dao struct {
DB *gorm.DB
}
func NewDBFromRawDB(db *sql.DB, dbConf DBConf) (*gorm.DB, error) {
var newDb *gorm.DB
var err error
if dbConf.DBDebug {
var err error
var loggerWriteSyncer *rotatelogs.RotateLogs
var loggerFileName = fmt.Sprintf("%s-db-debug.log", dbConf.Database)
switch dbConf.RotateTimeLevel {
case RotateByTimestamp:
loggerWriteSyncer, err = rotatelogs.New(
"../logs/"+loggerFileName+".%Y%m%d%H%M",
rotatelogs.WithLinkName("../logs/"+loggerFileName),
rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour),
rotatelogs.WithRotationTime(time.Duration(dbConf.RotateTime)*time.Minute))
case RotateByDate:
loggerWriteSyncer, err = rotatelogs.New(
"../logs/"+loggerFileName+".%Y%m%d",
rotatelogs.WithLinkName("../logs/"+loggerFileName),
rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour))
case RotateByHour:
loggerWriteSyncer, err = rotatelogs.New(
"../logs/"+loggerFileName+".%Y%m%d%H",
rotatelogs.WithLinkName("../logs/"+loggerFileName),
rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour),
rotatelogs.WithRotationTime(time.Hour))
case RotateByMinute:
loggerWriteSyncer, err = rotatelogs.New(
"../logs/"+loggerFileName+".%Y%m%d%H%M",
rotatelogs.WithLinkName("../logs/"+loggerFileName),
rotatelogs.WithMaxAge(time.Duration(dbConf.MaxAge)*time.Hour),
rotatelogs.WithRotationTime(time.Minute))
}
if err != nil {
return nil, err
}
dbLogger := logger.New(log.New(loggerWriteSyncer, "\r\n", log.LstdFlags), logger.Config{
SlowThreshold: time.Second,
LogLevel: logger.Silent,
Colorful: true,
})
switch dbConf.Dialect {
case "mysql":
newDb, err = gorm.Open(mysql.New(mysql.Config{
Conn: db,
}), &gorm.Config{
Logger: dbLogger,
})
break
case "postgres":
newDb, err = gorm.Open(postgres.New(postgres.Config{
Conn: db,
}), &gorm.Config{
Logger: dbLogger,
})
break
}
newDb = newDb.Debug()
} else {
switch dbConf.Dialect {
case "mysql":
newDb, err = gorm.Open(mysql.New(mysql.Config{
Conn: db,
}))
break
case "postgres":
newDb, err = gorm.Open(postgres.New(postgres.Config{
Conn: db,
}))
break
}
}
return newDb, err
}
func (dao *Dao) XaLocalTransaction(qs url.Values, f func(dao *Dao) error) (err error) {
dbConf := DBConf{
User: "root",
Password: "CodeMan2022080^2*1",
Host: "127.0.0.1",
Port: 3306,
Database: "test",
Dialect: "mysql",
DBSource: "root:CodeMan2022080^2*1@tcp(127.0.0.1:3306)/test?charset=utf8mb4&parseTime=True&loc=Local",
DBDebug: true,
RotateTime: 1,
RotateTimeLevel: 1,
}
err = dtmcli.XaLocalTransaction(qs, dtmcli.DBConf{
Driver: dbConf.Dialect,
Host: dbConf.Host,
Port: int64(dbConf.Port),
User: dbConf.User,
Password: dbConf.Password,
Db: dbConf.Database,
}, func(db *sql.DB, xa *dtmcli.Xa) error {
dao := &Dao{}
dao.DB, err = NewDBFromRawDB(db, dbConf)
return f(dao)
})
return
}
func (dao *Dao) TransInXa(userId int, amount float64) error {
result := dao.DB.Exec("update user_account set balance = balance + ? where user_id = ?", amount, userId)
if result.Error != nil {
return result.Error
}
return nil
}
func (dao *Dao) TransOutXa(userId int, amount float64) error {
result := dao.DB.Exec("update user_account set balance = balance - ? where user_id = ?", amount, userId)
if result.Error != nil {
return result.Error
}
return nil
}
func transInXa(ctx *gin.Context) {
var req TransInXaRequest
err := ctx.ShouldBind(&req)
if ValidateError(ctx, err) {
return
}
qs := ctx.Request.URL.Query()
dao := Dao{}
err = dao.XaLocalTransaction(qs, func(dao *Dao) error {
err = dao.TransInXa(req.TransInUserId, req.Amount)
if err != nil {
return errors.New("数据库更新出错!")
}
return err
})
if err != nil {
ResponseWithStatusCode(ctx, 409, 409, "trans in xa failed:"+err.Error())
return
}
ResponseSuccess(ctx, "trans in xa success")
}
func transOutXa(ctx *gin.Context) {
var req TransOutXaRequest
err := ctx.ShouldBind(&req)
if ValidateError(ctx, err) {
return
}
qs := ctx.Request.URL.Query()
dao := Dao{}
err = dao.XaLocalTransaction(qs, func(dao *Dao) error {
err = dao.TransOutXa(req.TransOutUserId, req.Amount)
if err != nil {
return errors.New("数据库更新出错!")
}
return err
})
if err != nil {
ResponseWithStatusCode(ctx, 409, 409, "trans out xa failed:"+err.Error())
return
}
ResponseSuccess(ctx, "trans out xa success")
}
func main() {
router := gin.Default()
router.POST("/api/v1/business/trans-in-xa", transInXa)
router.POST("/api/v1/business/trans-out-xa", transOutXa)
router.Run(":8686")
} 运行环境:golang版本1.21,mysql版本8.0.32 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
原始数据库表截图如下:
使用Dtm XA分布式事务,实现如下场景:用户1002 balance转出 100 1001 balance转入100,
程序执行成功,并执行成功后,dtm admin 的截图如下:
在Dtm XA分布式事务完成后,数据库表截图如下:
可见结果未达到预期,在本地数据库中使用xa recover命令后,发现如下截图:
在本地数据库使用如下命令手动提交XA事务:
数据库表数据发生了预期的变化,数据库表截图如下:
The text was updated successfully, but these errors were encountered: