Skip to content
This repository has been archived by the owner on Aug 26, 2024. It is now read-only.

Commit

Permalink
Export frameOp type
Browse files Browse the repository at this point in the history
frameOp is returned in public APIs, so it should be exported.
  • Loading branch information
martin-sucha committed Sep 22, 2023
1 parent 169eaf1 commit 891afea
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 89 deletions.
2 changes: 1 addition & 1 deletion cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2180,7 +2180,7 @@ func TestNegativeStream(t *testing.T) {

const stream = -50
writer := frameWriterFunc(func(f *framer, streamID int) (outFrameInfo, error) {
f.writeHeader(0, opOptions, stream)
f.writeHeader(0, FrameOpcodeOptions, stream)
return f.finish()
})

Expand Down
4 changes: 2 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ func (c *Conn) recv(ctx context.Context) error {
Version: protoVersion(head.version),
Flags: head.flags,
Stream: int16(head.stream),
Opcode: frameOp(head.op),
Opcode: FrameOpcode(head.op),
Length: int32(head.length),
Start: headStartTime,
End: headEndTime,
Expand Down Expand Up @@ -1248,7 +1248,7 @@ type ObservedStream struct {
// Host of the connection used to send the stream.
Host *HostInfo
// FrameOpcode is the frame operation (type) that was used.
FrameOpcode frameOp
FrameOpcode FrameOpcode
// FramePayloadUncompressedSize is the uncompressed size of the frame payload (without frame header).
// This field is only available in StreamStarted.
FramePayloadUncompressedSize int
Expand Down
34 changes: 17 additions & 17 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func TestStream0(t *testing.T) {

var buf bytes.Buffer
f := newFramer(nil, protoVersion4)
f.writeHeader(0, opResult, 0)
f.writeHeader(0, FrameOpcodeResult, 0)
f.writeInt(resultKindVoid)
f.buf[0] |= 0x80
if _, err := f.finish(); err != nil {
Expand Down Expand Up @@ -734,7 +734,7 @@ func TestContext_CanceledBeforeExec(t *testing.T) {
addr: "127.0.0.1:0",
protocol: defaultProto,
recvHook: func(f *framer) {
if f.header.op == opStartup || f.header.op == opOptions {
if f.header.op == FrameOpcodeStartup || f.header.op == FrameOpcodeOptions {
// ignore statup and heartbeat messages
return
}
Expand Down Expand Up @@ -966,7 +966,7 @@ func TestFrameHeaderObserver(t *testing.T) {
}

frames := observer.getFrames()
expFrames := []frameOp{opSupported, opReady, opResult}
expFrames := []FrameOpcode{FrameOpcodeSupported, FrameOpcodeReady, FrameOpcodeResult}
if len(frames) != len(expFrames) {
t.Fatalf("Expected to receive %d frames, instead received %d", len(expFrames), len(frames))
}
Expand Down Expand Up @@ -1209,7 +1209,7 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string]
respFrame := newFramer(nil, reqFrame.proto)

switch head.op {
case opStartup:
case FrameOpcodeStartup:
if atomic.LoadInt32(&srv.TimeoutOnStartup) > 0 {
// Do not respond to startup command
// wait until we get a cancel signal
Expand All @@ -1218,11 +1218,11 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string]
return
}
}
respFrame.writeHeader(0, opReady, head.stream)
case opOptions:
respFrame.writeHeader(0, opSupported, head.stream)
respFrame.writeHeader(0, FrameOpcodeReady, head.stream)
case FrameOpcodeOptions:
respFrame.writeHeader(0, FrameOpcodeSupported, head.stream)
respFrame.writeStringMultiMap(exts)
case opQuery:
case FrameOpcodeQuery:
query := reqFrame.readLongString()
first := query
if n := strings.Index(query, " "); n > 0 {
Expand All @@ -1231,21 +1231,21 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string]
switch strings.ToLower(first) {
case "kill":
atomic.AddInt64(&srv.nKillReq, 1)
respFrame.writeHeader(0, opError, head.stream)
respFrame.writeHeader(0, FrameOpcodeError, head.stream)
respFrame.writeInt(0x1001)
respFrame.writeString("query killed")
case "use":
respFrame.writeInt(resultKindKeyspace)
respFrame.writeString(strings.TrimSpace(query[3:]))
case "void":
respFrame.writeHeader(0, opResult, head.stream)
respFrame.writeHeader(0, FrameOpcodeResult, head.stream)
respFrame.writeInt(resultKindVoid)
case "timeout":
<-srv.ctx.Done()
return
case "slow":
go func() {
respFrame.writeHeader(0, opResult, head.stream)
respFrame.writeHeader(0, FrameOpcodeResult, head.stream)
respFrame.writeInt(resultKindVoid)
respFrame.buf[0] = srv.protocol | 0x80
select {
Expand All @@ -1260,25 +1260,25 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string]
case "speculative":
atomic.AddInt64(&srv.nKillReq, 1)
if atomic.LoadInt64(&srv.nKillReq) > 3 {
respFrame.writeHeader(0, opResult, head.stream)
respFrame.writeHeader(0, FrameOpcodeResult, head.stream)
respFrame.writeInt(resultKindVoid)
respFrame.writeString("speculative query success on the node " + srv.Address)
} else {
respFrame.writeHeader(0, opError, head.stream)
respFrame.writeHeader(0, FrameOpcodeError, head.stream)
respFrame.writeInt(0x1001)
respFrame.writeString("speculative error")
rand.Seed(time.Now().UnixNano())
<-time.After(time.Millisecond * 120)
}
default:
respFrame.writeHeader(0, opResult, head.stream)
respFrame.writeHeader(0, FrameOpcodeResult, head.stream)
respFrame.writeInt(resultKindVoid)
}
case opError:
respFrame.writeHeader(0, opError, head.stream)
case FrameOpcodeError:
respFrame.writeHeader(0, FrameOpcodeError, head.stream)
respFrame.buf = append(respFrame.buf, reqFrame.buf...)
default:
respFrame.writeHeader(0, opError, head.stream)
respFrame.writeHeader(0, FrameOpcodeError, head.stream)
respFrame.writeInt(0)
respFrame.writeString("not supported")
}
Expand Down
116 changes: 59 additions & 57 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,61 +75,63 @@ func (p protoVersion) String() string {
return fmt.Sprintf("[version=%d direction=%s]", p.version(), dir)
}

type frameOp byte
// FrameOpcode enumerates frame operation codes from the CQL protocol.
// See https://martin-sucha.github.io/cqlprotodoc/native_protocol_v4.html#s2.4
type FrameOpcode byte

const (
// header ops
opError frameOp = 0x00
opStartup frameOp = 0x01
opReady frameOp = 0x02
opAuthenticate frameOp = 0x03
opOptions frameOp = 0x05
opSupported frameOp = 0x06
opQuery frameOp = 0x07
opResult frameOp = 0x08
opPrepare frameOp = 0x09
opExecute frameOp = 0x0A
opRegister frameOp = 0x0B
opEvent frameOp = 0x0C
opBatch frameOp = 0x0D
opAuthChallenge frameOp = 0x0E
opAuthResponse frameOp = 0x0F
opAuthSuccess frameOp = 0x10
FrameOpcodeError FrameOpcode = 0x00
FrameOpcodeStartup FrameOpcode = 0x01
FrameOpcodeReady FrameOpcode = 0x02
FrameOpcodeAuthenticate FrameOpcode = 0x03
FrameOpcodeOptions FrameOpcode = 0x05
FrameOpcodeSupported FrameOpcode = 0x06
FrameOpcodeQuery FrameOpcode = 0x07
FrameOpcodeResult FrameOpcode = 0x08
FrameOpcodePrepare FrameOpcode = 0x09
FrameOpcodeExecute FrameOpcode = 0x0A
FrameOpcodeRegister FrameOpcode = 0x0B
FrameOpcodeEvent FrameOpcode = 0x0C
FrameOpcodeBatch FrameOpcode = 0x0D
FrameOpcodeAuthChallenge FrameOpcode = 0x0E
FrameOpcodeAuthResponse FrameOpcode = 0x0F
FrameOpcodeAuthSuccess FrameOpcode = 0x10
)

func (f frameOp) String() string {
func (f FrameOpcode) String() string {
switch f {
case opError:
case FrameOpcodeError:
return "ERROR"
case opStartup:
case FrameOpcodeStartup:
return "STARTUP"
case opReady:
case FrameOpcodeReady:
return "READY"
case opAuthenticate:
case FrameOpcodeAuthenticate:
return "AUTHENTICATE"
case opOptions:
case FrameOpcodeOptions:
return "OPTIONS"
case opSupported:
case FrameOpcodeSupported:
return "SUPPORTED"
case opQuery:
case FrameOpcodeQuery:
return "QUERY"
case opResult:
case FrameOpcodeResult:
return "RESULT"
case opPrepare:
case FrameOpcodePrepare:
return "PREPARE"
case opExecute:
case FrameOpcodeExecute:
return "EXECUTE"
case opRegister:
case FrameOpcodeRegister:
return "REGISTER"
case opEvent:
case FrameOpcodeEvent:
return "EVENT"
case opBatch:
case FrameOpcodeBatch:
return "BATCH"
case opAuthChallenge:
case FrameOpcodeAuthChallenge:
return "AUTH_CHALLENGE"
case opAuthResponse:
case FrameOpcodeAuthResponse:
return "AUTH_RESPONSE"
case opAuthSuccess:
case FrameOpcodeAuthSuccess:
return "AUTH_SUCCESS"
default:
return fmt.Sprintf("UNKNOWN_OP_%d", f)
Expand Down Expand Up @@ -301,7 +303,7 @@ type frameHeader struct {
version protoVersion
flags byte
stream int
op frameOp
op FrameOpcode
length int
warnings []string
}
Expand All @@ -320,7 +322,7 @@ type ObservedFrameHeader struct {
Version protoVersion
Flags byte
Stream int16
Opcode frameOp
Opcode FrameOpcode
Length int32

// StartHeader is the time we started reading the frame header off the network connection.
Expand Down Expand Up @@ -380,7 +382,7 @@ type framer struct {
compres Compressor
headSize int
// if writeHeader was called, outFrameOp will contain the frame operation.
outFrameOp frameOp
outFrameOp FrameOpcode
// if this frame was read then the header will be here
header *frameHeader
// ucompressedSize is size of the frame payload after decompression.
Expand Down Expand Up @@ -524,15 +526,15 @@ func readHeader(r io.Reader, p []byte) (head frameHeader, err error) {
}

head.stream = int(int16(p[2])<<8 | int16(p[3]))
head.op = frameOp(p[4])
head.op = FrameOpcode(p[4])
head.length = int(readInt(p[5:]))
} else {
if len(p) != 8 {
return frameHeader{}, fmt.Errorf("not enough bytes to read header require 8 got: %d", len(p))
}

head.stream = int(int8(p[2]))
head.op = frameOp(p[3])
head.op = FrameOpcode(p[3])
head.length = int(readInt(p[4:]))
}

Expand Down Expand Up @@ -620,21 +622,21 @@ func (f *framer) parseFrame() (frame frame, err error) {

// assumes that the frame body has been read into rbuf
switch f.header.op {
case opError:
case FrameOpcodeError:
frame = f.parseErrorFrame()
case opReady:
case FrameOpcodeReady:
frame = f.parseReadyFrame()
case opResult:
case FrameOpcodeResult:
frame, err = f.parseResultFrame()
case opSupported:
case FrameOpcodeSupported:
frame = f.parseSupportedFrame()
case opAuthenticate:
case FrameOpcodeAuthenticate:
frame = f.parseAuthenticateFrame()
case opAuthChallenge:
case FrameOpcodeAuthChallenge:
frame = f.parseAuthChallengeFrame()
case opAuthSuccess:
case FrameOpcodeAuthSuccess:
frame = f.parseAuthSuccessFrame()
case opEvent:
case FrameOpcodeEvent:
frame = f.parseEventFrame()
default:
return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op)
Expand Down Expand Up @@ -785,7 +787,7 @@ func (f *framer) readErrorMap() (errMap ErrorMap) {
return
}

func (f *framer) writeHeader(flags byte, op frameOp, stream int) {
func (f *framer) writeHeader(flags byte, op FrameOpcode, stream int) {
f.outFrameOp = op

f.buf = f.buf[:0]
Expand Down Expand Up @@ -829,7 +831,7 @@ func (f *framer) setLength(length int) {

type outFrameInfo struct {
// op is the type of the frame.
op frameOp
op FrameOpcode
// compressedSize of the frame payload (without header).
compressedSize int
// uncompressedSize of the frame payload (without header).
Expand Down Expand Up @@ -916,7 +918,7 @@ func (w writeStartupFrame) String() string {
}

func (w *writeStartupFrame) buildFrame(f *framer, streamID int) (outFrameInfo, error) {
f.writeHeader(f.flags&^flagCompress, opStartup, streamID)
f.writeHeader(f.flags&^flagCompress, FrameOpcodeStartup, streamID)
f.writeStringMap(w.opts)

return f.finish()
Expand All @@ -932,7 +934,7 @@ func (w *writePrepareFrame) buildFrame(f *framer, streamID int) (outFrameInfo, e
if len(w.customPayload) > 0 {
f.payload()
}
f.writeHeader(f.flags, opPrepare, streamID)
f.writeHeader(f.flags, FrameOpcodePrepare, streamID)
f.writeCustomPayload(&w.customPayload)
f.writeLongString(w.statement)

Expand Down Expand Up @@ -1529,7 +1531,7 @@ func (a *writeAuthResponseFrame) buildFrame(framer *framer, streamID int) (outFr
}

func (f *framer) writeAuthResponseFrame(streamID int, data []byte) (outFrameInfo, error) {
f.writeHeader(f.flags, opAuthResponse, streamID)
f.writeHeader(f.flags, FrameOpcodeAuthResponse, streamID)
f.writeBytes(data)
return f.finish()
}
Expand Down Expand Up @@ -1683,7 +1685,7 @@ func (f *framer) writeQueryFrame(streamID int, statement string, params *queryPa
if len(customPayload) > 0 {
f.payload()
}
f.writeHeader(f.flags, opQuery, streamID)
f.writeHeader(f.flags, FrameOpcodeQuery, streamID)
f.writeCustomPayload(&customPayload)
f.writeLongString(statement)
valuesSize := f.writeQueryParams(params)
Expand Down Expand Up @@ -1724,7 +1726,7 @@ func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *quer
if len(*customPayload) > 0 {
f.payload()
}
f.writeHeader(f.flags, opExecute, streamID)
f.writeHeader(f.flags, FrameOpcodeExecute, streamID)
f.writeCustomPayload(customPayload)
f.writeShortBytes(preparedID)
var valuesSize int
Expand Down Expand Up @@ -1781,7 +1783,7 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload
if len(customPayload) > 0 {
f.payload()
}
f.writeHeader(f.flags, opBatch, streamID)
f.writeHeader(f.flags, FrameOpcodeBatch, streamID)
f.writeCustomPayload(&customPayload)
f.writeByte(byte(w.typ))

Expand Down Expand Up @@ -1870,7 +1872,7 @@ func (w *writeOptionsFrame) buildFrame(framer *framer, streamID int) (outFrameIn
}

func (f *framer) writeOptionsFrame(stream int, _ *writeOptionsFrame) (outFrameInfo, error) {
f.writeHeader(f.flags&^flagCompress, opOptions, stream)
f.writeHeader(f.flags&^flagCompress, FrameOpcodeOptions, stream)
return f.finish()
}

Expand All @@ -1883,7 +1885,7 @@ func (w *writeRegisterFrame) buildFrame(framer *framer, streamID int) (outFrameI
}

func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) (outFrameInfo, error) {
f.writeHeader(f.flags, opRegister, streamID)
f.writeHeader(f.flags, FrameOpcodeRegister, streamID)
f.writeStringList(w.events)

return f.finish()
Expand Down
Loading

0 comments on commit 891afea

Please sign in to comment.