diff --git a/cassandra_test.go b/cassandra_test.go index 9f44ce784..b8303b2f5 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -2180,7 +2180,7 @@ func TestNegativeStream(t *testing.T) { const stream = -50 writer := frameWriterFunc(func(f *framer, streamID int) (outFrameInfo, error) { - f.writeHeader(0, opOptions, stream) + f.writeHeader(0, FrameOpcodeOptions, stream) return f.finish() }) diff --git a/conn.go b/conn.go index 73137ad52..cd9773d01 100644 --- a/conn.go +++ b/conn.go @@ -721,7 +721,7 @@ func (c *Conn) recv(ctx context.Context) error { Version: protoVersion(head.version), Flags: head.flags, Stream: int16(head.stream), - Opcode: frameOp(head.op), + Opcode: FrameOpcode(head.op), Length: int32(head.length), Start: headStartTime, End: headEndTime, @@ -1248,7 +1248,7 @@ type ObservedStream struct { // Host of the connection used to send the stream. Host *HostInfo // FrameOpcode is the frame operation (type) that was used. - FrameOpcode frameOp + FrameOpcode FrameOpcode // FramePayloadUncompressedSize is the uncompressed size of the frame payload (without frame header). // This field is only available in StreamStarted. FramePayloadUncompressedSize int diff --git a/conn_test.go b/conn_test.go index f1a0bc19f..84b1b3f0f 100644 --- a/conn_test.go +++ b/conn_test.go @@ -676,7 +676,7 @@ func TestStream0(t *testing.T) { var buf bytes.Buffer f := newFramer(nil, protoVersion4) - f.writeHeader(0, opResult, 0) + f.writeHeader(0, FrameOpcodeResult, 0) f.writeInt(resultKindVoid) f.buf[0] |= 0x80 if _, err := f.finish(); err != nil { @@ -734,7 +734,7 @@ func TestContext_CanceledBeforeExec(t *testing.T) { addr: "127.0.0.1:0", protocol: defaultProto, recvHook: func(f *framer) { - if f.header.op == opStartup || f.header.op == opOptions { + if f.header.op == FrameOpcodeStartup || f.header.op == FrameOpcodeOptions { // ignore statup and heartbeat messages return } @@ -966,7 +966,7 @@ func TestFrameHeaderObserver(t *testing.T) { } frames := observer.getFrames() - expFrames := []frameOp{opSupported, opReady, opResult} + expFrames := []FrameOpcode{FrameOpcodeSupported, FrameOpcodeReady, FrameOpcodeResult} if len(frames) != len(expFrames) { t.Fatalf("Expected to receive %d frames, instead received %d", len(expFrames), len(frames)) } @@ -1209,7 +1209,7 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string] respFrame := newFramer(nil, reqFrame.proto) switch head.op { - case opStartup: + case FrameOpcodeStartup: if atomic.LoadInt32(&srv.TimeoutOnStartup) > 0 { // Do not respond to startup command // wait until we get a cancel signal @@ -1218,11 +1218,11 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string] return } } - respFrame.writeHeader(0, opReady, head.stream) - case opOptions: - respFrame.writeHeader(0, opSupported, head.stream) + respFrame.writeHeader(0, FrameOpcodeReady, head.stream) + case FrameOpcodeOptions: + respFrame.writeHeader(0, FrameOpcodeSupported, head.stream) respFrame.writeStringMultiMap(exts) - case opQuery: + case FrameOpcodeQuery: query := reqFrame.readLongString() first := query if n := strings.Index(query, " "); n > 0 { @@ -1231,21 +1231,21 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string] switch strings.ToLower(first) { case "kill": atomic.AddInt64(&srv.nKillReq, 1) - respFrame.writeHeader(0, opError, head.stream) + respFrame.writeHeader(0, FrameOpcodeError, head.stream) respFrame.writeInt(0x1001) respFrame.writeString("query killed") case "use": respFrame.writeInt(resultKindKeyspace) respFrame.writeString(strings.TrimSpace(query[3:])) case "void": - respFrame.writeHeader(0, opResult, head.stream) + respFrame.writeHeader(0, FrameOpcodeResult, head.stream) respFrame.writeInt(resultKindVoid) case "timeout": <-srv.ctx.Done() return case "slow": go func() { - respFrame.writeHeader(0, opResult, head.stream) + respFrame.writeHeader(0, FrameOpcodeResult, head.stream) respFrame.writeInt(resultKindVoid) respFrame.buf[0] = srv.protocol | 0x80 select { @@ -1260,25 +1260,25 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string] case "speculative": atomic.AddInt64(&srv.nKillReq, 1) if atomic.LoadInt64(&srv.nKillReq) > 3 { - respFrame.writeHeader(0, opResult, head.stream) + respFrame.writeHeader(0, FrameOpcodeResult, head.stream) respFrame.writeInt(resultKindVoid) respFrame.writeString("speculative query success on the node " + srv.Address) } else { - respFrame.writeHeader(0, opError, head.stream) + respFrame.writeHeader(0, FrameOpcodeError, head.stream) respFrame.writeInt(0x1001) respFrame.writeString("speculative error") rand.Seed(time.Now().UnixNano()) <-time.After(time.Millisecond * 120) } default: - respFrame.writeHeader(0, opResult, head.stream) + respFrame.writeHeader(0, FrameOpcodeResult, head.stream) respFrame.writeInt(resultKindVoid) } - case opError: - respFrame.writeHeader(0, opError, head.stream) + case FrameOpcodeError: + respFrame.writeHeader(0, FrameOpcodeError, head.stream) respFrame.buf = append(respFrame.buf, reqFrame.buf...) default: - respFrame.writeHeader(0, opError, head.stream) + respFrame.writeHeader(0, FrameOpcodeError, head.stream) respFrame.writeInt(0) respFrame.writeString("not supported") } diff --git a/frame.go b/frame.go index 8fd7258cc..3640cd98f 100644 --- a/frame.go +++ b/frame.go @@ -75,61 +75,63 @@ func (p protoVersion) String() string { return fmt.Sprintf("[version=%d direction=%s]", p.version(), dir) } -type frameOp byte +// FrameOpcode enumerates frame operation codes from the CQL protocol. +// See https://martin-sucha.github.io/cqlprotodoc/native_protocol_v4.html#s2.4 +type FrameOpcode byte const ( // header ops - opError frameOp = 0x00 - opStartup frameOp = 0x01 - opReady frameOp = 0x02 - opAuthenticate frameOp = 0x03 - opOptions frameOp = 0x05 - opSupported frameOp = 0x06 - opQuery frameOp = 0x07 - opResult frameOp = 0x08 - opPrepare frameOp = 0x09 - opExecute frameOp = 0x0A - opRegister frameOp = 0x0B - opEvent frameOp = 0x0C - opBatch frameOp = 0x0D - opAuthChallenge frameOp = 0x0E - opAuthResponse frameOp = 0x0F - opAuthSuccess frameOp = 0x10 + FrameOpcodeError FrameOpcode = 0x00 + FrameOpcodeStartup FrameOpcode = 0x01 + FrameOpcodeReady FrameOpcode = 0x02 + FrameOpcodeAuthenticate FrameOpcode = 0x03 + FrameOpcodeOptions FrameOpcode = 0x05 + FrameOpcodeSupported FrameOpcode = 0x06 + FrameOpcodeQuery FrameOpcode = 0x07 + FrameOpcodeResult FrameOpcode = 0x08 + FrameOpcodePrepare FrameOpcode = 0x09 + FrameOpcodeExecute FrameOpcode = 0x0A + FrameOpcodeRegister FrameOpcode = 0x0B + FrameOpcodeEvent FrameOpcode = 0x0C + FrameOpcodeBatch FrameOpcode = 0x0D + FrameOpcodeAuthChallenge FrameOpcode = 0x0E + FrameOpcodeAuthResponse FrameOpcode = 0x0F + FrameOpcodeAuthSuccess FrameOpcode = 0x10 ) -func (f frameOp) String() string { +func (f FrameOpcode) String() string { switch f { - case opError: + case FrameOpcodeError: return "ERROR" - case opStartup: + case FrameOpcodeStartup: return "STARTUP" - case opReady: + case FrameOpcodeReady: return "READY" - case opAuthenticate: + case FrameOpcodeAuthenticate: return "AUTHENTICATE" - case opOptions: + case FrameOpcodeOptions: return "OPTIONS" - case opSupported: + case FrameOpcodeSupported: return "SUPPORTED" - case opQuery: + case FrameOpcodeQuery: return "QUERY" - case opResult: + case FrameOpcodeResult: return "RESULT" - case opPrepare: + case FrameOpcodePrepare: return "PREPARE" - case opExecute: + case FrameOpcodeExecute: return "EXECUTE" - case opRegister: + case FrameOpcodeRegister: return "REGISTER" - case opEvent: + case FrameOpcodeEvent: return "EVENT" - case opBatch: + case FrameOpcodeBatch: return "BATCH" - case opAuthChallenge: + case FrameOpcodeAuthChallenge: return "AUTH_CHALLENGE" - case opAuthResponse: + case FrameOpcodeAuthResponse: return "AUTH_RESPONSE" - case opAuthSuccess: + case FrameOpcodeAuthSuccess: return "AUTH_SUCCESS" default: return fmt.Sprintf("UNKNOWN_OP_%d", f) @@ -301,7 +303,7 @@ type frameHeader struct { version protoVersion flags byte stream int - op frameOp + op FrameOpcode length int warnings []string } @@ -320,7 +322,7 @@ type ObservedFrameHeader struct { Version protoVersion Flags byte Stream int16 - Opcode frameOp + Opcode FrameOpcode Length int32 // StartHeader is the time we started reading the frame header off the network connection. @@ -380,7 +382,7 @@ type framer struct { compres Compressor headSize int // if writeHeader was called, outFrameOp will contain the frame operation. - outFrameOp frameOp + outFrameOp FrameOpcode // if this frame was read then the header will be here header *frameHeader // ucompressedSize is size of the frame payload after decompression. @@ -524,7 +526,7 @@ func readHeader(r io.Reader, p []byte) (head frameHeader, err error) { } head.stream = int(int16(p[2])<<8 | int16(p[3])) - head.op = frameOp(p[4]) + head.op = FrameOpcode(p[4]) head.length = int(readInt(p[5:])) } else { if len(p) != 8 { @@ -532,7 +534,7 @@ func readHeader(r io.Reader, p []byte) (head frameHeader, err error) { } head.stream = int(int8(p[2])) - head.op = frameOp(p[3]) + head.op = FrameOpcode(p[3]) head.length = int(readInt(p[4:])) } @@ -620,21 +622,21 @@ func (f *framer) parseFrame() (frame frame, err error) { // assumes that the frame body has been read into rbuf switch f.header.op { - case opError: + case FrameOpcodeError: frame = f.parseErrorFrame() - case opReady: + case FrameOpcodeReady: frame = f.parseReadyFrame() - case opResult: + case FrameOpcodeResult: frame, err = f.parseResultFrame() - case opSupported: + case FrameOpcodeSupported: frame = f.parseSupportedFrame() - case opAuthenticate: + case FrameOpcodeAuthenticate: frame = f.parseAuthenticateFrame() - case opAuthChallenge: + case FrameOpcodeAuthChallenge: frame = f.parseAuthChallengeFrame() - case opAuthSuccess: + case FrameOpcodeAuthSuccess: frame = f.parseAuthSuccessFrame() - case opEvent: + case FrameOpcodeEvent: frame = f.parseEventFrame() default: return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op) @@ -785,7 +787,7 @@ func (f *framer) readErrorMap() (errMap ErrorMap) { return } -func (f *framer) writeHeader(flags byte, op frameOp, stream int) { +func (f *framer) writeHeader(flags byte, op FrameOpcode, stream int) { f.outFrameOp = op f.buf = f.buf[:0] @@ -829,7 +831,7 @@ func (f *framer) setLength(length int) { type outFrameInfo struct { // op is the type of the frame. - op frameOp + op FrameOpcode // compressedSize of the frame payload (without header). compressedSize int // uncompressedSize of the frame payload (without header). @@ -916,7 +918,7 @@ func (w writeStartupFrame) String() string { } func (w *writeStartupFrame) buildFrame(f *framer, streamID int) (outFrameInfo, error) { - f.writeHeader(f.flags&^flagCompress, opStartup, streamID) + f.writeHeader(f.flags&^flagCompress, FrameOpcodeStartup, streamID) f.writeStringMap(w.opts) return f.finish() @@ -932,7 +934,7 @@ func (w *writePrepareFrame) buildFrame(f *framer, streamID int) (outFrameInfo, e if len(w.customPayload) > 0 { f.payload() } - f.writeHeader(f.flags, opPrepare, streamID) + f.writeHeader(f.flags, FrameOpcodePrepare, streamID) f.writeCustomPayload(&w.customPayload) f.writeLongString(w.statement) @@ -1529,7 +1531,7 @@ func (a *writeAuthResponseFrame) buildFrame(framer *framer, streamID int) (outFr } func (f *framer) writeAuthResponseFrame(streamID int, data []byte) (outFrameInfo, error) { - f.writeHeader(f.flags, opAuthResponse, streamID) + f.writeHeader(f.flags, FrameOpcodeAuthResponse, streamID) f.writeBytes(data) return f.finish() } @@ -1683,7 +1685,7 @@ func (f *framer) writeQueryFrame(streamID int, statement string, params *queryPa if len(customPayload) > 0 { f.payload() } - f.writeHeader(f.flags, opQuery, streamID) + f.writeHeader(f.flags, FrameOpcodeQuery, streamID) f.writeCustomPayload(&customPayload) f.writeLongString(statement) valuesSize := f.writeQueryParams(params) @@ -1724,7 +1726,7 @@ func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *quer if len(*customPayload) > 0 { f.payload() } - f.writeHeader(f.flags, opExecute, streamID) + f.writeHeader(f.flags, FrameOpcodeExecute, streamID) f.writeCustomPayload(customPayload) f.writeShortBytes(preparedID) var valuesSize int @@ -1781,7 +1783,7 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload if len(customPayload) > 0 { f.payload() } - f.writeHeader(f.flags, opBatch, streamID) + f.writeHeader(f.flags, FrameOpcodeBatch, streamID) f.writeCustomPayload(&customPayload) f.writeByte(byte(w.typ)) @@ -1870,7 +1872,7 @@ func (w *writeOptionsFrame) buildFrame(framer *framer, streamID int) (outFrameIn } func (f *framer) writeOptionsFrame(stream int, _ *writeOptionsFrame) (outFrameInfo, error) { - f.writeHeader(f.flags&^flagCompress, opOptions, stream) + f.writeHeader(f.flags&^flagCompress, FrameOpcodeOptions, stream) return f.finish() } @@ -1883,7 +1885,7 @@ func (w *writeRegisterFrame) buildFrame(framer *framer, streamID int) (outFrameI } func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) (outFrameInfo, error) { - f.writeHeader(f.flags, opRegister, streamID) + f.writeHeader(f.flags, FrameOpcodeRegister, streamID) f.writeStringList(w.events) return f.finish() diff --git a/frame_test.go b/frame_test.go index 299ffbcc4..8844747fc 100644 --- a/frame_test.go +++ b/frame_test.go @@ -64,7 +64,7 @@ func TestFrameWriteTooLong(t *testing.T) { framer := newFramer(nil, 2) - framer.writeHeader(0, opStartup, 1) + framer.writeHeader(0, FrameOpcodeStartup, 1) framer.writeBytes(make([]byte, maxFrameSize+1)) _, err := framer.finish() if err != ErrFrameTooBig { @@ -80,13 +80,13 @@ func TestFrameReadTooLong(t *testing.T) { r := &bytes.Buffer{} r.Write(make([]byte, maxFrameSize+1)) // write a new header right after this frame to verify that we can read it - r.Write([]byte{0x02, 0x00, 0x00, byte(opReady), 0x00, 0x00, 0x00, 0x00}) + r.Write([]byte{0x02, 0x00, 0x00, byte(FrameOpcodeReady), 0x00, 0x00, 0x00, 0x00}) framer := newFramer(nil, 2) head := frameHeader{ version: 2, - op: opReady, + op: FrameOpcodeReady, length: r.Len() - 8, } @@ -99,8 +99,8 @@ func TestFrameReadTooLong(t *testing.T) { if err != nil { t.Fatal(err) } - if head.op != opReady { - t.Fatalf("expected to get header %v got %v", opReady, head.op) + if head.op != FrameOpcodeReady { + t.Fatalf("expected to get header %v got %v", FrameOpcodeReady, head.op) } } @@ -135,7 +135,7 @@ func TestOutFrameInfo(t *testing.T) { }, compress: true, expectedInfo: outFrameInfo{ - op: opQuery, + op: FrameOpcodeQuery, uncompressedSize: 81, compressedSize: 72, queryValuesSize: 30, @@ -159,7 +159,7 @@ func TestOutFrameInfo(t *testing.T) { }, compress: true, expectedInfo: outFrameInfo{ - op: opExecute, + op: FrameOpcodeExecute, compressedSize: 50, uncompressedSize: 51, queryValuesSize: 30, @@ -203,7 +203,7 @@ func TestOutFrameInfo(t *testing.T) { }, compress: true, expectedInfo: outFrameInfo{ - op: opBatch, + op: FrameOpcodeBatch, compressedSize: 96, uncompressedSize: 130, queryValuesSize: 60, @@ -214,7 +214,7 @@ func TestOutFrameInfo(t *testing.T) { frame: &writeOptionsFrame{}, compress: true, expectedInfo: outFrameInfo{ - op: opOptions, + op: FrameOpcodeOptions, compressedSize: 0, uncompressedSize: 0, queryValuesSize: 0, @@ -227,7 +227,7 @@ func TestOutFrameInfo(t *testing.T) { }, compress: true, expectedInfo: outFrameInfo{ - op: opRegister, + op: FrameOpcodeRegister, compressedSize: 20, uncompressedSize: 18, queryValuesSize: 0, @@ -240,7 +240,7 @@ func TestOutFrameInfo(t *testing.T) { }, compress: false, expectedInfo: outFrameInfo{ - op: opRegister, + op: FrameOpcodeRegister, compressedSize: 0, uncompressedSize: 18, queryValuesSize: 0, diff --git a/framer_bench_test.go b/framer_bench_test.go index 5b7b442f9..95f3aa6ba 100644 --- a/framer_bench_test.go +++ b/framer_bench_test.go @@ -34,7 +34,7 @@ func BenchmarkParseRowsFrame(b *testing.B) { framer := &framer{ header: &frameHeader{ version: protoVersion4 | 0x80, - op: opResult, + op: FrameOpcodeResult, length: len(data), }, buf: data,