Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

init xa trx log #835

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 26 additions & 25 deletions pkg/runtime/transaction/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@ func NewXAHook(tenant string, enable bool) (*xaHook, error) {
}

trxStateChangeFunc := map[runtime.TxState]handleFunc{
runtime.TrxActive: xh.onActive,
runtime.TrxPreparing: xh.onPreparing,
runtime.TrxPrepared: xh.onPrepared,
runtime.TrxCommitting: xh.onCommitting,
runtime.TrxCommitted: xh.onCommitted,
runtime.TrxAborting: xh.onAborting,
runtime.TrxRollback: xh.onRollbackOnly,
runtime.TrxRolledBack: xh.onRolledBack,
runtime.TrxStarted: xh.onActive,
runtime.TrxPreparing: xh.onPreparing,
runtime.TrxPrepared: xh.onPrepared,
runtime.TrxCommitting: xh.onCommitting,
runtime.TrxCommitted: xh.onCommitted,
runtime.TrxFailed: xh.onAborting,
runtime.TrxRolledBacking: xh.onRollbackOnly,
runtime.TrxRolledBacked: xh.onRolledBack,
}

xh.trxMgr = trxMgr
xh.trxLog = &TrxLog{}
xh.trxLog = &GlobalTrxLog{}
xh.trxStateChangeFunc = trxStateChangeFunc

return xh, nil
Expand All @@ -63,15 +63,15 @@ func NewXAHook(tenant string, enable bool) (*xaHook, error) {
type xaHook struct {
enable bool
trxMgr *TrxManager
trxLog *TrxLog
trxLog *GlobalTrxLog
trxStateChangeFunc map[runtime.TxState]handleFunc
}

func (xh *xaHook) OnTxStateChange(ctx context.Context, state runtime.TxState, tx runtime.CompositeTx) error {
if !xh.enable {
return nil
}
xh.trxLog.State = state
xh.trxLog.Status = state
handle, ok := xh.trxStateChangeFunc[state]
if ok {
return handle(ctx, tx)
Expand All @@ -84,17 +84,18 @@ func (xh *xaHook) OnCreateBranchTx(ctx context.Context, tx runtime.BranchTx) {
if !xh.enable {
return
}
xh.trxLog.Participants = append(xh.trxLog.Participants, TrxParticipant{
NodeID: "",
RemoteAddr: tx.GetConn().GetDatabaseConn().GetNetConn().RemoteAddr().String(),
Schema: tx.GetConn().DBName(),
})
// TODO: add branch trx log
//xh.trxLog.BranchTrxLogs = append(xh.trxLog.BranchTrxLogs, BranchTrxLog{
// NodeID: "",
// RemoteAddr: tx.GetConn().GetDatabaseConn().GetNetConn().RemoteAddr().String(),
// Schema: tx.GetConn().DBName(),
//})
}

func (xh *xaHook) onActive(ctx context.Context, tx runtime.CompositeTx) error {
tx.SetBeginFunc(StartXA)
xh.trxLog.TrxID = tx.GetTrxID()
xh.trxLog.State = tx.GetTxState()
xh.trxLog.Status = tx.GetTxState()
xh.trxLog.Tenant = tx.GetTenant()
return nil
}
Expand All @@ -103,14 +104,14 @@ func (xh *xaHook) onPreparing(ctx context.Context, tx runtime.CompositeTx) error
tx.Range(func(tx runtime.BranchTx) {
tx.SetPrepareFunc(PrepareXA)
})
if err := xh.trxMgr.trxLog.AddOrUpdateTxLog(*xh.trxLog); err != nil {
if err := xh.trxMgr.trxLog.AddOrUpdateGlobalTxLog(*xh.trxLog); err != nil {
return err
}
return nil
}

func (xh *xaHook) onPrepared(ctx context.Context, tx runtime.CompositeTx) error {
if err := xh.trxMgr.trxLog.AddOrUpdateTxLog(*xh.trxLog); err != nil {
if err := xh.trxMgr.trxLog.AddOrUpdateGlobalTxLog(*xh.trxLog); err != nil {
return err
}
return nil
Expand All @@ -120,14 +121,14 @@ func (xh *xaHook) onCommitting(ctx context.Context, tx runtime.CompositeTx) erro
tx.Range(func(tx runtime.BranchTx) {
tx.SetCommitFunc(CommitXA)
})
if err := xh.trxMgr.trxLog.AddOrUpdateTxLog(*xh.trxLog); err != nil {
if err := xh.trxMgr.trxLog.AddOrUpdateGlobalTxLog(*xh.trxLog); err != nil {
return err
}
return nil
}

func (xh *xaHook) onCommitted(ctx context.Context, tx runtime.CompositeTx) error {
if err := xh.trxMgr.trxLog.AddOrUpdateTxLog(*xh.trxLog); err != nil {
if err := xh.trxMgr.trxLog.AddOrUpdateGlobalTxLog(*xh.trxLog); err != nil {
return err
}
return nil
Expand All @@ -137,7 +138,7 @@ func (xh *xaHook) onAborting(ctx context.Context, tx runtime.CompositeTx) error
tx.Range(func(bTx runtime.BranchTx) {
bTx.SetCommitFunc(RollbackXA)
})
if err := xh.trxMgr.trxLog.AddOrUpdateTxLog(*xh.trxLog); err != nil {
if err := xh.trxMgr.trxLog.AddOrUpdateGlobalTxLog(*xh.trxLog); err != nil {
return err
}
// auto execute XA rollback action
Expand All @@ -151,15 +152,15 @@ func (xh *xaHook) onRollbackOnly(ctx context.Context, tx runtime.CompositeTx) er
tx.Range(func(tx runtime.BranchTx) {
tx.SetCommitFunc(RollbackXA)
})
if err := xh.trxMgr.trxLog.AddOrUpdateTxLog(*xh.trxLog); err != nil {
if err := xh.trxMgr.trxLog.AddOrUpdateGlobalTxLog(*xh.trxLog); err != nil {
return err
}
return nil
}

func (xh *xaHook) onRolledBack(ctx context.Context, tx runtime.CompositeTx) error {
xh.trxLog.State = runtime.TrxRolledBack
if err := xh.trxMgr.trxLog.AddOrUpdateTxLog(*xh.trxLog); err != nil {
xh.trxLog.Status = runtime.TrxRolledBacking
if err := xh.trxMgr.trxLog.AddOrUpdateGlobalTxLog(*xh.trxLog); err != nil {
return err
}
return nil
Expand Down
161 changes: 101 additions & 60 deletions pkg/runtime/transaction/trx_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -48,28 +47,46 @@

const (
// TODO 启用 mysql 的二级分区功能,解决清理 tx log 的问题
_initTxLog = `
CREATE TABLE IF NOT EXISTS __arana_trx_log
(
log_id bigint(20) auto_increment COMMENT 'primary key',
txr_id varchar(255) NOT NULL COMMENT 'transaction uniq id',
tenant varchar(255) NOT NULL COMMENT 'tenant info',
server_id int(10) UNSIGNED NOT NULL COMMENT 'arana server node id',
status int(10) NOT NULL COMMENT 'transaction status, preparing:2,prepared:3,committing:4,committed:5,aborting:6,rollback:7,finish:8,rolledBack:9',
participant varchar(500) COMMENT 'transaction participants, content is mysql node info',
start_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (log_id),
UNIQUE KEY (txr_id)
) ENGINE = InnoDB
CHARSET = utf8
`
insSql = "REPLACE INTO __arana_trx_log(trx_id, tenant, server_id, status, participant, start_time, update_time) VALUES (?,?,?,?,?,sysdate(),sysdate())"
delSql = "DELETE FROM __arana_trx_log WHERE trx_id = ?"
selectSql = "SELECT trx_id, tenant, server_id, status, participant, start_time, update_time FROM __arana_trx_log WHERE 1=1 %s ORDER BY update_time LIMIT ? OFFSET ?"
_initGlobalTxLog = `
CREATE TABLE __arana_global_trx_log (
log_id bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
txr_id varchar(255) NOT NULL COMMENT 'transaction uniq id',
tenant varchar(255) NOT NULL COMMENT 'tenant info',
server_id int unsigned NOT NULL COMMENT 'arana server node id',
status int NOT NULL COMMENT 'transaction status: started:1,preparing:2,prepared:3,committing:4,committed:5,rollbacking:6,rollbacked:7,failed:8',
start_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'transaction start time',
expected_end_time datetime NOT NULL COMMENT 'global transaction expected end time',
update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (log_id),
UNIQUE KEY txr_id (txr_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
`
insertGlobalSql = "INSERT INTO __arana_global_trx_log (txr_id, tenant, server_id, status, start_time, expected_end_time) VALUES (?, ?, ?, ?, ?, ?);"
deleteGlobalSql = "DELETE FROM __arana_global_trx_log WHERE trx_id = ?"
selectGlobalSql = "SELECT log_id, txr_id, tenant, server_id, status, start_time, expected_end_time, update_time FROM __arana_trx_log WHERE 1=1 %s ORDER BY expected_end_time LIMIT ? OFFSET ?"
)

const (
_initBranchTxLog = `
CREATE TABLE __arana_branch_trx_log (
log_id bigint NOT NULL AUTO_INCREMENT COMMENT 'primary key',
txr_id varchar(255) NOT NULL COMMENT 'transaction uniq id',
branch_id varchar(255) NOT NULL COMMENT 'branch transaction key',
participant_id int unsigned NOT NULL COMMENT 'transaction participants, content is mysql node info',
status int NOT NULL COMMENT 'transaction status: started:1,preparing:2,prepared:3,committing:4,committed:5,rollbacking:6,rollbacked:7,failed:8',
start_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'branch transaction start time',
update_time datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (log_id),
UNIQUE KEY txr_branch_id (txr_id, branch_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
`
insertBranchSql = "INSERT INTO __arana_branch_trx_log (txr_id, branch_id, participant_id, status, start_time) VALUES (?, ?, ?, ?, ?);"
deleteBranchSql = "DELETE FROM __arana_global_trx_log WHERE trx_id = ? and branch_id=?"
selectBranchSql = "SELECT log_id, txr_id, branch_id, participant_id, status, start_time, update_time FROM __arana_branch_trx_log WHERE 1=1 %s ORDER BY expected_end_time LIMIT ? OFFSET ?"
)

// TxLogManager Transaction log management
// TODO
type TxLogManager struct {
sysDB proto.DB
}
Expand All @@ -79,60 +96,73 @@
var err error
_initTxLogOnce.Do(func() {
ctx := context.Background()
res, _, err := gm.sysDB.Call(ctx, _initTxLog)
res, _, err := gm.sysDB.Call(ctx, _initGlobalTxLog)
if err != nil {
return
}
_, _ = res.RowsAffected()
_txLogCleanTimer = time.AfterFunc(delay, gm.runCleanTxLogTask)
_txLogCleanTimer = time.AfterFunc(delay, gm.runCleanGlobalTxLogTask)

res, _, err = gm.sysDB.Call(ctx, _initBranchTxLog)
if err != nil {
return
}
_, _ = res.RowsAffected()
_txLogCleanTimer = time.AfterFunc(delay, gm.runCleanBranchTxLogTask)
})
return err
}

// AddOrUpdateTxLog Add or update transaction log
func (gm *TxLogManager) AddOrUpdateTxLog(l TrxLog) error {
participants, err := json.Marshal(l.Participants)
if err != nil {
return err
}
// AddOrUpdateTxLog Add or update global transaction log
func (gm *TxLogManager) AddOrUpdateGlobalTxLog(l GlobalTrxLog) error {
trxIdVal, _ := proto.NewValue(l.TrxID)
tenantVal, _ := proto.NewValue(l.Tenant)
serverIdVal, _ := proto.NewValue(l.ServerID)
stateVal, _ := proto.NewValue(int32(l.State))
participantsVal, _ := proto.NewValue(string(participants))
statusVal, _ := proto.NewValue(int32(l.Status))
expectedEndTimeVal, _ := proto.NewValue(l.ExpectedEndTime)
args := []proto.Value{
trxIdVal,
tenantVal,
serverIdVal,
stateVal,
participantsVal,
statusVal,
expectedEndTimeVal,
}
_, _, err = gm.sysDB.Call(context.Background(), insSql, args...)
_, _, err := gm.sysDB.Call(context.Background(), insertGlobalSql, args...)
return err
}

// AddOrUpdateTxLog Add or update branch transaction log
func (gm *TxLogManager) AddOrUpdateBranchTxLog(l BranchTrxLog) error {
panic("implement me")
}

// DeleteTxLog Delete transaction log
func (gm *TxLogManager) DeleteTxLog(l TrxLog) error {
func (gm *TxLogManager) DeleteGlobalTxLog(l GlobalTrxLog) error {
trxIdVal, _ := proto.NewValue(l.TrxID)
args := []proto.Value{
trxIdVal,
}
_, _, err := gm.sysDB.Call(context.Background(), delSql, args...)
_, _, err := gm.sysDB.Call(context.Background(), deleteGlobalSql, args...)
return err
}

// ScanTxLog Scanning transaction
func (gm *TxLogManager) ScanTxLog(pageNo, pageSize uint64, conditions []Condition) (uint32, []TrxLog, error) {
// TODO
func (gm *TxLogManager) DeleteBranchTxLog(l BranchTrxLog) error {
panic("implement me")
}

// Global ScanTxLog Scanning transaction
func (gm *TxLogManager) ScanGlobalTxLog(pageNo, pageSize uint64, conditions []Condition) (uint32, []GlobalTrxLog, error) {
var (
whereBuilder []string
args []proto.Value
logs []TrxLog
num uint32
dest []proto.Value
log TrxLog
participants []TrxParticipant
serverId int64
state int64
whereBuilder []string
args []proto.Value
logs []GlobalTrxLog
num uint32
dest []proto.Value
serverId int64
expectedEndTime int64
startTime int64
state int64
)

for i := range conditions {
Expand All @@ -149,7 +179,7 @@
offset := proto.NewValueUint64((pageNo - 1) * pageSize)

args = append(args, limit, offset)
conditionSelectSql := fmt.Sprintf(selectSql, strings.Join(whereBuilder, " "))
conditionSelectSql := fmt.Sprintf(selectGlobalSql, strings.Join(whereBuilder, " "))
rows, _, err := gm.sysDB.Call(context.Background(), conditionSelectSql, args...)
if err != nil {
return 0, nil, err
Expand All @@ -163,44 +193,50 @@
if row == nil {
break
}
if err := row.Scan(dest[:]); err != nil {
var log GlobalTrxLog
if err = row.Scan(dest[:]); err != nil {
return 0, nil, err
}
log.TrxID = dest[0].String()
log.Tenant = dest[1].String()
serverId, _ = dest[2].Int64()
log.ServerID = int32(serverId)
state, _ = dest[3].Int64()
log.State = runtime.TxState(int32(state))

if err := json.Unmarshal([]byte(dest[4].String()), &participants); err != nil {
return 0, nil, err
}
log.Participants = participants
log.Status = runtime.TxState(int32(state))
Fixed Show fixed Hide fixed
expectedEndTime, _ = dest[4].Int64()
log.ExpectedEndTime = expectedEndTime
startTime, _ = dest[5].Int64()
log.StartTime = startTime
logs = append(logs, log)
num++
}
return num, logs, nil
}

// Branch ScanTxLog Scanning transaction
// TODO
func (gm *TxLogManager) ScanBranchTxLog(pageNo, pageSize uint64, conditions []Condition) (uint32, []BranchTrxLog, error) {
panic("implement me")
}

// runCleanTxLogTask execute the transaction log cleanup action, and clean up the __arana_tx_log secondary
// partition table according to the day level or hour level.
// the execution of this task requires distributed task preemption based on the metadata DB
func (gm *TxLogManager) runCleanTxLogTask() {
func (gm *TxLogManager) runCleanGlobalTxLogTask() {
var (
pageNo uint64
pageSize uint64 = 50
conditions = []Condition{
{
FiledName: "status",
Operation: Equal,
Value: runtime.TrxFinish,
Operation: In,
Value: []int32{int32(runtime.TrxRolledBacked), int32(runtime.TrxCommitted), int32(runtime.TrxFailed)},
},
}
)
var txLogs []TrxLog
var txLogs []GlobalTrxLog
for {
total, logs, err := gm.ScanTxLog(pageNo, pageSize, conditions)
total, logs, err := gm.ScanGlobalTxLog(pageNo, pageSize, conditions)
if err != nil {
break
}
Expand All @@ -210,6 +246,11 @@
}
}
for _, l := range txLogs {
gm.DeleteTxLog(l)
gm.DeleteGlobalTxLog(l)
}
}

// TODO
func (gm *TxLogManager) runCleanBranchTxLogTask() {
panic("implement me")
}
Loading
Loading