Skip to content

Commit

Permalink
[bug] proxy: session cannot be transferred after load data local infi…
Browse files Browse the repository at this point in the history
…le (#18787)

In the case of "load data local infile" statement, client sends the
first packet, then server sends response, which is "0xFB + filename",
after that, client sends content of filename and an empty packet, at
last, server sends OK packet. The sequence ID of this OK packet is not
1, and will cause the session cannot be transferred after this stmt finished.
So, the solution is: when server sends 0xFB and the sequence ID of
next packet is 3 bigger than last one, the next packet MUST be an
OK packet, and the transfer is allowed.

Approved by: @aylei, @sukki37, @zhangxu19830126
  • Loading branch information
volgariver6 authored Sep 14, 2024
1 parent e74f4b9 commit 2da268f
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 28 deletions.
37 changes: 29 additions & 8 deletions pkg/proxy/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,9 @@ func (p *pipe) kickoff(ctx context.Context, peer *pipe) (e error) {
p.mu.started = false
p.mu.cond.Broadcast()
}

var firstCond bool
var currSeq int16
var lastSeq int16 = -1
var rotated bool
prepareNextMessage := func() (terminate bool, err error) {
Expand Down Expand Up @@ -557,8 +560,6 @@ func (p *pipe) kickoff(ctx context.Context, peer *pipe) (e error) {
// set txn status and cmd time within the mutex together.
// only server->client pipe need to set the txn status.
if p.name == pipeServerToClient {
var currSeq int16

// issue#16042
if len(tempBuf) > 3 {
currSeq = int16(tempBuf[3])
Expand All @@ -576,7 +577,27 @@ func (p *pipe) kickoff(ctx context.Context, peer *pipe) (e error) {
rotated = false
}

inTxn, ok := checkTxnStatus(tempBuf)
// seqID is mainly used for server side. It records the sequence ID of
// each packet.
// In the case of "load data local infile" statement, client sends the
// first packet, then server sends response, which is "0xFB + filename",
// after that, client sends content of filename and an empty packet, at
// last, server sends OK packet. The sequence ID of this OK packet is not
// 1, and will cause the session cannot be transferred after this stmt
// finished.
// So, the solution is: when server sends 0xFB and the sequence ID of
// next packet is 3 bigger than last one, the next packet MUST be an
// OK packet, and the transfer is allowed.
// Related issue: https://github.com/matrixorigin/mo-cloud/issues/4088
var mustOK bool
if !firstCond {
firstCond = isLoadDataLocalInfileRespPacket(tempBuf)
} else {
mustOK = currSeq-lastSeq == 3
firstCond = false
}

inTxn, ok := checkTxnStatus(tempBuf, mustOK)
if ok {
p.mu.inTxn = inTxn
}
Expand Down Expand Up @@ -721,10 +742,10 @@ func txnStatus(status uint16) bool {
}

// handleOKPacket handles the OK packet from server to update the txn state.
func handleOKPacket(msg []byte) bool {
func handleOKPacket(msg []byte, mustOK bool) bool {
var mp *frontend.MysqlProtocolImpl
// the sequence ID should be 1 for OK packet.
if msg[3] != 1 {
// if the mustOK is false, then the sequence ID should be 1 for OK packet.
if !mustOK && msg[3] != 1 {
return txnStatus(0)
}
pos := 5
Expand Down Expand Up @@ -754,14 +775,14 @@ func handleEOFPacket(msg []byte) bool {
// the first return value is the txn status, and the second return value
// indicates if we can get the txn status from the packet. If it is a ERROR
// packet, the second return value is false.
func checkTxnStatus(msg []byte) (bool, bool) {
func checkTxnStatus(msg []byte, mustOK bool) (bool, bool) {
ok := true
inTxn := true
// For the server->client pipe, we get the transaction status from the
// OK and EOF packet, which is used in connection transfer. If the session
// is in a transaction, a transfer should not start.
if isOKPacket(msg) {
inTxn = handleOKPacket(msg)
inTxn = handleOKPacket(msg, mustOK)
} else if isEOFPacket(msg) {
inTxn = handleEOFPacket(msg)
} else if isErrPacket(msg) {
Expand Down
80 changes: 60 additions & 20 deletions pkg/proxy/tunnel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,24 +691,64 @@ func TestReplaceServerConn(t *testing.T) {
}

func TestCheckTxnStatus(t *testing.T) {
inTxn, ok := checkTxnStatus(nil)
require.True(t, ok)
require.True(t, inTxn)

inTxn, ok = checkTxnStatus(makeErrPacket(8))
require.False(t, ok)
require.True(t, inTxn)

p1 := makeOKPacket(5)
value := frontend.SERVER_QUERY_WAS_SLOW | frontend.SERVER_STATUS_NO_GOOD_INDEX_USED
binary.LittleEndian.PutUint16(p1[7:], value)
inTxn, ok = checkTxnStatus(p1)
require.True(t, ok)
require.False(t, inTxn)

value |= frontend.SERVER_STATUS_IN_TRANS
binary.LittleEndian.PutUint16(p1[7:], value)
inTxn, ok = checkTxnStatus(p1)
require.True(t, ok)
require.True(t, inTxn)
t.Run("mustOK false", func(t *testing.T) {
inTxn, ok := checkTxnStatus(nil, false)
require.True(t, ok)
require.True(t, inTxn)

inTxn, ok = checkTxnStatus(makeErrPacket(8), false)
require.False(t, ok)
require.True(t, inTxn)

p1 := makeOKPacket(5)
value := frontend.SERVER_QUERY_WAS_SLOW | frontend.SERVER_STATUS_NO_GOOD_INDEX_USED
binary.LittleEndian.PutUint16(p1[7:], value)
inTxn, ok = checkTxnStatus(p1, false)
require.True(t, ok)
require.False(t, inTxn)

value |= frontend.SERVER_STATUS_IN_TRANS
binary.LittleEndian.PutUint16(p1[7:], value)
inTxn, ok = checkTxnStatus(p1, false)
require.True(t, ok)
require.True(t, inTxn)
})

t.Run("mustOK true", func(t *testing.T) {
inTxn, ok := checkTxnStatus(nil, true)
require.True(t, ok)
require.True(t, inTxn)

inTxn, ok = checkTxnStatus(makeErrPacket(8), true)
require.False(t, ok)
require.True(t, inTxn)

p1 := makeOKPacket(5)
value := frontend.SERVER_QUERY_WAS_SLOW | frontend.SERVER_STATUS_NO_GOOD_INDEX_USED
binary.LittleEndian.PutUint16(p1[7:], value)
inTxn, ok = checkTxnStatus(p1, true)
require.True(t, ok)
require.False(t, inTxn)

value |= frontend.SERVER_STATUS_IN_TRANS
binary.LittleEndian.PutUint16(p1[7:], value)
inTxn, ok = checkTxnStatus(p1, true)
require.True(t, ok)
require.True(t, inTxn)

value ^= frontend.SERVER_STATUS_IN_TRANS
binary.LittleEndian.PutUint16(p1[7:], value)
inTxn, ok = checkTxnStatus(p1, true)
require.True(t, ok)
require.False(t, inTxn)

p1[3] = 4
inTxn, ok = checkTxnStatus(p1, false)
require.True(t, ok)
require.True(t, inTxn)

inTxn, ok = checkTxnStatus(p1, true)
require.True(t, ok)
require.False(t, inTxn)
})
}
9 changes: 9 additions & 0 deletions pkg/proxy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ func isErrPacket(p []byte) bool {
return false
}

// isLoadDataLocalInfileRespPacket returns true if []byte is a packet
// of load data local infile response.
func isLoadDataLocalInfileRespPacket(p []byte) bool {
if len(p) > 4 && p[4] == 0xFB {
return true
}
return false
}

// isEmptyPacket returns true if []byte is an empty packet.
func isEmptyPacket(p []byte) bool {
return len(p) == 0
Expand Down
14 changes: 14 additions & 0 deletions pkg/proxy/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,20 @@ func TestIsErrPacket(t *testing.T) {
require.True(t, ret)
}

func TestIsLoadDataLocalInfileRespPacket(t *testing.T) {
var data []byte
ret := isLoadDataLocalInfileRespPacket(data)
require.False(t, ret)

data = []byte{0, 0, 0, 0, 2, 0}
ret = isLoadDataLocalInfileRespPacket(data)
require.False(t, ret)

data = []byte{0, 0, 0, 0, 0xFB, 0}
ret = isLoadDataLocalInfileRespPacket(data)
require.True(t, ret)
}

func TestIsDeallocatePacket(t *testing.T) {
var data []byte
ret := isDeallocatePacket(data)
Expand Down

0 comments on commit 2da268f

Please sign in to comment.