From 3975e7d26276e3e01b6c2ef61d749ffd112b5edc Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Wed, 20 Sep 2023 10:56:18 +0200 Subject: [PATCH 1/6] Track payload sizes of outgoing frames We want to see the effect of changing compression settings and also the size of queries. --- cassandra_test.go | 2 +- conn.go | 24 +++++++- conn_test.go | 4 +- frame.go | 98 +++++++++++++++++++++---------- frame_test.go | 145 +++++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 238 insertions(+), 35 deletions(-) diff --git a/cassandra_test.go b/cassandra_test.go index 02eed613e..9f44ce784 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -2179,7 +2179,7 @@ func TestNegativeStream(t *testing.T) { conn := getRandomConn(t, session) const stream = -50 - writer := frameWriterFunc(func(f *framer, streamID int) error { + writer := frameWriterFunc(func(f *framer, streamID int) (outFrameInfo, error) { f.writeHeader(0, opOptions, stream) return f.finish() }) diff --git a/conn.go b/conn.go index 1a7eb787d..7c2ce1cf2 100644 --- a/conn.go +++ b/conn.go @@ -1099,13 +1099,19 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram framer.trace() } + ofi, err := req.buildFrame(framer, stream) + // The error is handled after we call the stream observer. + if call.streamObserverContext != nil { call.streamObserverContext.StreamStarted(ObservedStream{ - Host: c.host, + Host: c.host, + FramePayloadUncompressedSize: ofi.uncompressedSize, + FramePayloadCompressedSize: ofi.compressedSize, + QueryValuesSize: ofi.queryValuesSize, + QueryCount: ofi.queryCount, }) } - err := req.buildFrame(framer, stream) if err != nil { // closeWithError will block waiting for this stream to either receive a response // or for us to timeout. @@ -1217,6 +1223,20 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram type ObservedStream struct { // Host of the connection used to send the stream. Host *HostInfo + // FramePayloadUncompressedSize is the uncompressed size of the frame payload (without frame header). + // This field is only available in StreamStarted. + FramePayloadUncompressedSize int + // FramePayloadCompressedSize is the compressed size of the frame payload (without frame header). + // This field is only available in StreamStarted. + FramePayloadCompressedSize int + // QueryValuesSize is the total uncompressed size of query values in the frame (without other query options). + // For a batch, it is the sum for all queries in the batch. + // For frames that contain no query values QueryValuesSize is zero. + // This field is only available in StreamStarted. + QueryValuesSize int + // QueryCount is 1 for EXECUTE/QUERY and size of the batch for BATCH frames. + // This field is only available in StreamStarted. + QueryCount int } // StreamObserver is notified about request/response pairs. diff --git a/conn_test.go b/conn_test.go index 69d775664..f1a0bc19f 100644 --- a/conn_test.go +++ b/conn_test.go @@ -679,7 +679,7 @@ func TestStream0(t *testing.T) { f.writeHeader(0, opResult, 0) f.writeInt(resultKindVoid) f.buf[0] |= 0x80 - if err := f.finish(); err != nil { + if _, err := f.finish(); err != nil { t.Fatal(err) } if err := f.writeTo(&buf); err != nil { @@ -1285,7 +1285,7 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string] respFrame.buf[0] = srv.protocol | 0x80 - if err := respFrame.finish(); err != nil { + if _, err := respFrame.finish(); err != nil { srv.errorLocked(err) } diff --git a/frame.go b/frame.go index 22d722205..218c97ce8 100644 --- a/frame.go +++ b/frame.go @@ -764,11 +764,26 @@ func (f *framer) setLength(length int) { f.buf[p+3] = byte(length) } -func (f *framer) finish() error { +type outFrameInfo struct { + // compressedSize of the frame payload (without header). + compressedSize int + // uncompressedSize of the frame payload (without header). + uncompressedSize int + // queryValuesSize is sum of sizes of query values. + queryValuesSize int + // queryCount is number of queries executed by the query/execute/batch frame. + queryCount int +} + +func (f *framer) finish() (outFrameInfo, error) { if len(f.buf) > maxFrameSize { // huge app frame, lets remove it so it doesn't bloat the heap f.buf = make([]byte, defaultBufSize) - return ErrFrameTooBig + return outFrameInfo{}, ErrFrameTooBig + } + + info := outFrameInfo{ + uncompressedSize: len(f.buf) - f.headSize, } if f.buf[1]&flagCompress == flagCompress { @@ -779,15 +794,16 @@ func (f *framer) finish() error { // TODO: only compress frames which are big enough compressed, err := f.compres.Encode(f.buf[f.headSize:]) if err != nil { - return err + return info, err } f.buf = append(f.buf[:f.headSize], compressed...) } length := len(f.buf) - f.headSize + info.compressedSize = length f.setLength(length) - return nil + return info, nil } func (f *framer) writeTo(w io.Writer) error { @@ -833,7 +849,7 @@ func (w writeStartupFrame) String() string { return fmt.Sprintf("[startup opts=%+v]", w.opts) } -func (w *writeStartupFrame) buildFrame(f *framer, streamID int) error { +func (w *writeStartupFrame) buildFrame(f *framer, streamID int) (outFrameInfo, error) { f.writeHeader(f.flags&^flagCompress, opStartup, streamID) f.writeStringMap(w.opts) @@ -846,7 +862,7 @@ type writePrepareFrame struct { customPayload map[string][]byte } -func (w *writePrepareFrame) buildFrame(f *framer, streamID int) error { +func (w *writePrepareFrame) buildFrame(f *framer, streamID int) (outFrameInfo, error) { if len(w.customPayload) > 0 { f.payload() } @@ -1436,11 +1452,11 @@ func (a *writeAuthResponseFrame) String() string { return fmt.Sprintf("[auth_response data=%q]", a.data) } -func (a *writeAuthResponseFrame) buildFrame(framer *framer, streamID int) error { +func (a *writeAuthResponseFrame) buildFrame(framer *framer, streamID int) (outFrameInfo, error) { return framer.writeAuthResponseFrame(streamID, a.data) } -func (f *framer) writeAuthResponseFrame(streamID int, data []byte) error { +func (f *framer) writeAuthResponseFrame(streamID int, data []byte) (outFrameInfo, error) { f.writeHeader(f.flags, opAuthResponse, streamID) f.writeBytes(data) return f.finish() @@ -1474,11 +1490,13 @@ func (q queryParams) String() string { q.consistency, q.skipMeta, q.pageSize, q.pagingState, q.serialConsistency, q.defaultTimestamp, q.values, q.keyspace) } -func (f *framer) writeQueryParams(opts *queryParams) { +// writeQueryParams writes the queryParameters to the buffer. +// It returns the total size of the values. +func (f *framer) writeQueryParams(opts *queryParams) int { f.writeConsistency(opts.consistency) if f.proto == protoVersion1 { - return + return 0 } var flags byte @@ -1526,6 +1544,7 @@ func (f *framer) writeQueryParams(opts *queryParams) { f.writeByte(flags) } + startIdx := len(f.buf) if n := len(opts.values); n > 0 { f.writeShort(uint16(n)) @@ -1540,6 +1559,7 @@ func (f *framer) writeQueryParams(opts *queryParams) { } } } + valuesSize := len(f.buf) - startIdx if opts.pageSize > 0 { f.writeInt(int32(opts.pageSize)) @@ -1567,6 +1587,8 @@ func (f *framer) writeQueryParams(opts *queryParams) { if opts.keyspace != "" { f.writeString(opts.keyspace) } + + return valuesSize } type writeQueryFrame struct { @@ -1581,29 +1603,32 @@ func (w *writeQueryFrame) String() string { return fmt.Sprintf("[query statement=%q params=%v]", w.statement, w.params) } -func (w *writeQueryFrame) buildFrame(framer *framer, streamID int) error { +func (w *writeQueryFrame) buildFrame(framer *framer, streamID int) (outFrameInfo, error) { return framer.writeQueryFrame(streamID, w.statement, &w.params, w.customPayload) } -func (f *framer) writeQueryFrame(streamID int, statement string, params *queryParams, customPayload map[string][]byte) error { +func (f *framer) writeQueryFrame(streamID int, statement string, params *queryParams, customPayload map[string][]byte) (outFrameInfo, error) { if len(customPayload) > 0 { f.payload() } f.writeHeader(f.flags, opQuery, streamID) f.writeCustomPayload(&customPayload) f.writeLongString(statement) - f.writeQueryParams(params) + valuesSize := f.writeQueryParams(params) - return f.finish() + ofi, err := f.finish() + ofi.queryValuesSize = valuesSize + ofi.queryCount = 1 + return ofi, err } type frameBuilder interface { - buildFrame(framer *framer, streamID int) error + buildFrame(framer *framer, streamID int) (outFrameInfo, error) } -type frameWriterFunc func(framer *framer, streamID int) error +type frameWriterFunc func(framer *framer, streamID int) (outFrameInfo, error) -func (f frameWriterFunc) buildFrame(framer *framer, streamID int) error { +func (f frameWriterFunc) buildFrame(framer *framer, streamID int) (outFrameInfo, error) { return f(framer, streamID) } @@ -1619,20 +1644,22 @@ func (e *writeExecuteFrame) String() string { return fmt.Sprintf("[execute id=% X params=%v]", e.preparedID, &e.params) } -func (e *writeExecuteFrame) buildFrame(fr *framer, streamID int) error { +func (e *writeExecuteFrame) buildFrame(fr *framer, streamID int) (outFrameInfo, error) { return fr.writeExecuteFrame(streamID, e.preparedID, &e.params, &e.customPayload) } -func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *queryParams, customPayload *map[string][]byte) error { +func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *queryParams, customPayload *map[string][]byte) (outFrameInfo, error) { if len(*customPayload) > 0 { f.payload() } f.writeHeader(f.flags, opExecute, streamID) f.writeCustomPayload(customPayload) f.writeShortBytes(preparedID) + var valuesSize int if f.proto > protoVersion1 { - f.writeQueryParams(params) + valuesSize = f.writeQueryParams(params) } else { + startIdx := len(f.buf) n := len(params.values) f.writeShort(uint16(n)) for i := 0; i < n; i++ { @@ -1642,10 +1669,14 @@ func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *quer f.writeBytes(params.values[i].value) } } + valuesSize = len(f.buf) - startIdx f.writeConsistency(params.consistency) } - return f.finish() + ofi, err := f.finish() + ofi.queryValuesSize = valuesSize + ofi.queryCount = 1 + return ofi, err } // TODO: can we replace BatchStatemt with batchStatement? As they prety much @@ -1670,11 +1701,11 @@ type writeBatchFrame struct { customPayload map[string][]byte } -func (w *writeBatchFrame) buildFrame(framer *framer, streamID int) error { +func (w *writeBatchFrame) buildFrame(framer *framer, streamID int) (outFrameInfo, error) { return framer.writeBatchFrame(streamID, w, w.customPayload) } -func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload map[string][]byte) error { +func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload map[string][]byte) (outFrameInfo, error) { if len(customPayload) > 0 { f.payload() } @@ -1687,6 +1718,8 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload var flags byte + var queryParamsSize int + for i := 0; i < n; i++ { b := &w.statements[i] if len(b.preparedID) == 0 { @@ -1697,6 +1730,8 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload f.writeShortBytes(b.preparedID) } + startIdx := len(f.buf) + f.writeShort(uint16(len(b.values))) for j := range b.values { col := b.values[j] @@ -1704,7 +1739,7 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload // TODO: move this check into the caller and set a flag on writeBatchFrame // to indicate using named values if f.proto <= protoVersion5 { - return fmt.Errorf("gocql: named query values are not supported in batches, please see https://issues.apache.org/jira/browse/CASSANDRA-10246") + return outFrameInfo{}, fmt.Errorf("gocql: named query values are not supported in batches, please see https://issues.apache.org/jira/browse/CASSANDRA-10246") } flags |= flagWithNameValues f.writeString(col.name) @@ -1715,6 +1750,8 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload f.writeBytes(col.value) } } + + queryParamsSize += len(f.buf) - startIdx } f.writeConsistency(w.consistency) @@ -1748,16 +1785,19 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload } } - return f.finish() + ofi, err := f.finish() + ofi.queryValuesSize = queryParamsSize + ofi.queryCount = n + return ofi, err } type writeOptionsFrame struct{} -func (w *writeOptionsFrame) buildFrame(framer *framer, streamID int) error { +func (w *writeOptionsFrame) buildFrame(framer *framer, streamID int) (outFrameInfo, error) { return framer.writeOptionsFrame(streamID, w) } -func (f *framer) writeOptionsFrame(stream int, _ *writeOptionsFrame) error { +func (f *framer) writeOptionsFrame(stream int, _ *writeOptionsFrame) (outFrameInfo, error) { f.writeHeader(f.flags&^flagCompress, opOptions, stream) return f.finish() } @@ -1766,11 +1806,11 @@ type writeRegisterFrame struct { events []string } -func (w *writeRegisterFrame) buildFrame(framer *framer, streamID int) error { +func (w *writeRegisterFrame) buildFrame(framer *framer, streamID int) (outFrameInfo, error) { return framer.writeRegisterFrame(streamID, w) } -func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) error { +func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) (outFrameInfo, error) { f.writeHeader(f.flags, opRegister, streamID) f.writeStringList(w.events) diff --git a/frame_test.go b/frame_test.go index 6b8eb228e..9fa379ada 100644 --- a/frame_test.go +++ b/frame_test.go @@ -66,7 +66,7 @@ func TestFrameWriteTooLong(t *testing.T) { framer.writeHeader(0, opStartup, 1) framer.writeBytes(make([]byte, maxFrameSize+1)) - err := framer.finish() + _, err := framer.finish() if err != ErrFrameTooBig { t.Fatalf("expected to get %v got %v", ErrFrameTooBig, err) } @@ -103,3 +103,146 @@ func TestFrameReadTooLong(t *testing.T) { t.Fatalf("expected to get header %v got %v", opReady, head.op) } } + +func TestOutFrameInfo(t *testing.T) { + tests := map[string]struct { + frame frameBuilder + expectedInfo outFrameInfo + }{ + "query": { + frame: &writeQueryFrame{ + statement: "SELECT * FROM mytable WHERE id=? AND x=?", + params: queryParams{ + consistency: One, + skipMeta: false, + values: []queryValues{ + { + value: []byte{'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}, + }, + { + value: []byte{'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}, + }, + }, + pageSize: 5000, + pagingState: nil, + serialConsistency: 0, + defaultTimestamp: false, + defaultTimestampValue: 0, + keyspace: "", + }, + customPayload: nil, + }, + expectedInfo: outFrameInfo{ + uncompressedSize: 81, + compressedSize: 72, + queryValuesSize: 30, + queryCount: 1, + }, + }, + "execute": { + frame: &writeExecuteFrame{ + preparedID: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5}, + params: queryParams{ + values: []queryValues{ + { + value: []byte{'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}, + }, + { + value: []byte{'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}, + }, + }, + }, + customPayload: nil, + }, + expectedInfo: outFrameInfo{ + compressedSize: 50, + uncompressedSize: 51, + queryValuesSize: 30, + queryCount: 1, + }, + }, + "batch": { + frame: &writeBatchFrame{ + typ: UnloggedBatch, + statements: []batchStatment{ + { + preparedID: nil, + statement: "SELECT * FROM mytable WHERE id=? AND x=?", + values: []queryValues{ + { + value: []byte{'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}, + }, + { + value: []byte{'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}, + }, + }, + }, + { + preparedID: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5}, + statement: "", + values: []queryValues{ + { + value: []byte{'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}, + }, + { + value: []byte{'H', 'e', 'l', 'l', 'o', 'W', 'o', 'r', 'l', 'd'}, + }, + }, + }, + }, + consistency: One, + serialConsistency: 0, + defaultTimestamp: false, + defaultTimestampValue: 0, + customPayload: nil, + }, + expectedInfo: outFrameInfo{ + compressedSize: 96, + uncompressedSize: 130, + queryValuesSize: 60, + queryCount: 2, + }, + }, + "options": { + frame: &writeOptionsFrame{}, + expectedInfo: outFrameInfo{ + compressedSize: 0, + uncompressedSize: 0, + queryValuesSize: 0, + queryCount: 0, + }, + }, + "register": { + frame: &writeRegisterFrame{ + events: []string{"event1", "event2"}, + }, + expectedInfo: outFrameInfo{ + compressedSize: 20, + uncompressedSize: 18, + queryValuesSize: 0, + queryCount: 0, + }, + }, + } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + fr := newFramer(SnappyCompressor{}, 4) + ofi, err := test.frame.buildFrame(fr, 42) + if err != nil { + t.Fatal(err) + } + if ofi.queryCount != test.expectedInfo.queryCount { + t.Errorf("expected queryCount %d, but got %d", test.expectedInfo.queryCount, ofi.queryCount) + } + if ofi.queryValuesSize != test.expectedInfo.queryValuesSize { + t.Errorf("expected queryValuesSize %d, but got %d", test.expectedInfo.queryValuesSize, ofi.queryValuesSize) + } + if ofi.uncompressedSize != test.expectedInfo.uncompressedSize { + t.Errorf("expected uncompressedSize %d, but got %d", test.expectedInfo.uncompressedSize, ofi.uncompressedSize) + } + if ofi.compressedSize != test.expectedInfo.compressedSize { + t.Errorf("expected compressedSize %d, but got %d", test.expectedInfo.compressedSize, ofi.compressedSize) + } + }) + } +} From a368c8c7e9720120ced54f6f1f30e09f3068a15c Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Thu, 21 Sep 2023 17:31:00 +0200 Subject: [PATCH 2/6] Track payload sizes of incoming frames --- cluster.go | 4 ++++ conn.go | 65 ++++++++++++++++++++++++++++++++++++------------------ frame.go | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++ session.go | 14 +++++++----- 4 files changed, 117 insertions(+), 27 deletions(-) diff --git a/cluster.go b/cluster.go index 16796d7e6..61dd7e1cd 100644 --- a/cluster.go +++ b/cluster.go @@ -235,6 +235,10 @@ type ClusterConfig struct { // Use it to collect metrics / stats from frames by providing an implementation of FrameHeaderObserver. FrameHeaderObserver FrameHeaderObserver + // FrameObserver will be notified of all received frames that were read. + // FrameObserver will not see frames that were discarded. + FrameObserver FrameObserver + // StreamObserver will be notified of stream state changes. // This can be used to track in-flight protocol requests and responses. StreamObserver StreamObserver diff --git a/conn.go b/conn.go index 7c2ce1cf2..e857331af 100644 --- a/conn.go +++ b/conn.go @@ -170,11 +170,12 @@ type Conn struct { r *bufio.Reader w contextWriter - timeout time.Duration - writeTimeout time.Duration - cfg *ConnConfig - frameObserver FrameHeaderObserver - streamObserver StreamObserver + timeout time.Duration + writeTimeout time.Duration + cfg *ConnConfig + frameHeaderObserver FrameHeaderObserver + frameObserver FrameObserver + streamObserver StreamObserver headerBuf [maxFrameHeaderSize]byte @@ -279,19 +280,20 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg * ctx, cancel := context.WithCancel(ctx) c := &Conn{ - conn: dialedHost.Conn, - r: bufio.NewReader(dialedHost.Conn), - cfg: cfg, - calls: make(map[int]*callReq), - version: uint8(cfg.ProtoVersion), - addr: dialedHost.Conn.RemoteAddr().String(), - errorHandler: errorHandler, - compressor: cfg.Compressor, - session: s, - streams: s.streamIDGenerator(cfg.ProtoVersion), - host: host, - isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise - frameObserver: s.frameObserver, + conn: dialedHost.Conn, + r: bufio.NewReader(dialedHost.Conn), + cfg: cfg, + calls: make(map[int]*callReq), + version: uint8(cfg.ProtoVersion), + addr: dialedHost.Conn.RemoteAddr().String(), + errorHandler: errorHandler, + compressor: cfg.Compressor, + session: s, + streams: s.streamIDGenerator(cfg.ProtoVersion), + host: host, + isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise + frameHeaderObserver: s.frameHeaderObserver, + frameObserver: s.frameObserver, w: &deadlineContextWriter{ w: dialedHost.Conn, timeout: writeTimeout, @@ -713,8 +715,9 @@ func (c *Conn) recv(ctx context.Context) error { return err } - if c.frameObserver != nil { - c.frameObserver.ObserveFrameHeader(context.Background(), ObservedFrameHeader{ + var parseObserver frameParseObserver + if c.frameHeaderObserver != nil || c.frameObserver != nil { + observedHeader := ObservedFrameHeader{ Version: protoVersion(head.version), Flags: head.flags, Stream: int16(head.stream), @@ -723,7 +726,18 @@ func (c *Conn) recv(ctx context.Context) error { Start: headStartTime, End: headEndTime, Host: c.host, - }) + } + + if c.frameHeaderObserver != nil { + c.frameHeaderObserver.ObserveFrameHeader(context.Background(), observedHeader) + } + + if c.frameObserver != nil { + parseObserver = frameParseObserver{ + head: observedHeader, + frameObserver: c.frameObserver, + } + } } if head.stream > c.streams.NumStreams { @@ -734,6 +748,9 @@ func (c *Conn) recv(ctx context.Context) error { if err := framer.readFrame(c, &head); err != nil { return err } + if c.frameObserver != nil { + framer.observer = parseObserver + } go c.session.handleEvent(framer) return nil } else if head.stream <= 0 { @@ -743,6 +760,9 @@ func (c *Conn) recv(ctx context.Context) error { if err := framer.readFrame(c, &head); err != nil { return err } + if c.frameObserver != nil { + framer.observer = parseObserver + } frame, err := framer.parseFrame() if err != nil { @@ -779,6 +799,9 @@ func (c *Conn) recv(ctx context.Context) error { return err } } + if c.frameObserver != nil { + framer.observer = parseObserver + } // we either, return a response to the caller, the caller timedout, or the // connection has closed. Either way we should never block indefinatly here diff --git a/frame.go b/frame.go index 218c97ce8..dfc677b0f 100644 --- a/frame.go +++ b/frame.go @@ -344,6 +344,31 @@ type FrameHeaderObserver interface { ObserveFrameHeader(context.Context, ObservedFrameHeader) } +// ObservedFrame describes a frame that was read (not discarded). +type ObservedFrame struct { + ObservedFrameHeader + + // UncompressedSize is a size of the frame payload after decompression. + // See ObservedFrameHeader.Length to get the compressed size. + // UncompressedSize is zero if the frame was not compressed. + UncompressedSize int + + // RowCount is count of result rows. + // Only set for RESULT frame with Rows kind. + RowCount int + + // RowsSize is sum of sizes of all rows in the result. + // Only set for RESULT frame with Rows kind. + RowsSize int +} + +// FrameObserver allows observing received frames. +// +// Experimental, this interface and use may change. +type FrameObserver interface { + ObserveFrame(context.Context, ObservedFrame) +} + // a framer is responsible for reading, writing and parsing frames on a single stream type framer struct { proto byte @@ -353,6 +378,12 @@ type framer struct { headSize int // if this frame was read then the header will be here header *frameHeader + // ucompressedSize is size of the frame payload after decompression. + // It is zero if the frame was not compressed. + // It will be set if this frame was read. + uncompressedSize int + + observer frameParseObserver // if tracing flag is set this is not nil traceID []byte @@ -369,6 +400,26 @@ type framer struct { rateLimitingErrorCode int } +type frameParseObserver struct { + head ObservedFrameHeader + frameObserver FrameObserver +} + +func (fpo *frameParseObserver) observeFrame(ff *framer, f frame) { + if fpo.frameObserver == nil { + return + } + of := ObservedFrame{ + ObservedFrameHeader: fpo.head, + UncompressedSize: ff.uncompressedSize, + } + if rows, ok := f.(resultRowsFrame); ok { + of.RowCount = rows.numRows + of.RowsSize = rows.rowsContentSize + } + fpo.frameObserver.ObserveFrame(context.TODO(), of) +} + func newFramer(compressor Compressor, version byte) *framer { buf := make([]byte, defaultBufSize) f := &framer{ @@ -527,6 +578,8 @@ func (f *framer) readFrame(r io.Reader, head *frameHeader) error { if err != nil { return err } + + f.uncompressedSize = len(f.buf) } f.header = head @@ -581,6 +634,8 @@ func (f *framer) parseFrame() (frame frame, err error) { return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op) } + f.observer.observeFrame(f, frame) + return } @@ -1152,6 +1207,11 @@ type resultRowsFrame struct { meta resultMetadata // dont parse the rows here as we only need to do it once numRows int + // rowsContentSize is size of the frame after row_count. + // It approximates the size of rows_content: + // Currently it measures rows_content, + // but theoretically more fields could be added after rows_content in the future. + rowsContentSize int } func (f *resultRowsFrame) String() string { @@ -1166,6 +1226,7 @@ func (f *framer) parseResultRows() frame { if result.numRows < 0 { panic(fmt.Errorf("invalid row_count in result frame: %d", result.numRows)) } + result.rowsContentSize = len(f.buf) return result } diff --git a/session.go b/session.go index ba1c8cd7c..3422a2c2c 100644 --- a/session.go +++ b/session.go @@ -41,7 +41,8 @@ type Session struct { batchObserver BatchObserver connectObserver ConnectObserver disconnectObserver DisconnectObserver - frameObserver FrameHeaderObserver + frameHeaderObserver FrameHeaderObserver + frameObserver FrameObserver streamObserver StreamObserver hostSource *ringDescriber ringRefresher *refreshDebouncer @@ -175,7 +176,8 @@ func NewSession(cfg ClusterConfig) (*Session, error) { s.queryObserver = cfg.QueryObserver s.batchObserver = cfg.BatchObserver s.connectObserver = cfg.ConnectObserver - s.frameObserver = cfg.FrameHeaderObserver + s.frameHeaderObserver = cfg.FrameHeaderObserver + s.frameObserver = cfg.FrameObserver s.streamObserver = cfg.StreamObserver //Check the TLS Config before trying to connect to anything external @@ -2145,10 +2147,10 @@ type routingKeyInfoLRU struct { } type routingKeyInfo struct { - indexes []int - types []TypeInfo - keyspace string - table string + indexes []int + types []TypeInfo + keyspace string + table string lwt bool partitioner partitioner } From 58ed26dd140ed23920c92a135493c95379536932 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Fri, 22 Sep 2023 12:43:29 +0200 Subject: [PATCH 3/6] Add IsRowsResult flag to ObservedFrame Without the flag the user cannot distinguish between an empty RESULT ROWS frame and a frame that is not RESULT ROWS. --- frame.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/frame.go b/frame.go index dfc677b0f..71069447d 100644 --- a/frame.go +++ b/frame.go @@ -353,12 +353,15 @@ type ObservedFrame struct { // UncompressedSize is zero if the frame was not compressed. UncompressedSize int + // IsRowsResult indicates that the frame was a RESULT op with ROWS kind. + IsRowsResult bool + // RowCount is count of result rows. - // Only set for RESULT frame with Rows kind. + // Only set if IsRowsResult is true. RowCount int // RowsSize is sum of sizes of all rows in the result. - // Only set for RESULT frame with Rows kind. + // Only set if IsRowsResult is true. RowsSize int } @@ -414,6 +417,7 @@ func (fpo *frameParseObserver) observeFrame(ff *framer, f frame) { UncompressedSize: ff.uncompressedSize, } if rows, ok := f.(resultRowsFrame); ok { + of.IsRowsResult = true of.RowCount = rows.numRows of.RowsSize = rows.rowsContentSize } From 4fe12283dad332371ca0fd204093dbf2fd69ed7d Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Fri, 22 Sep 2023 12:56:46 +0200 Subject: [PATCH 4/6] Expose frame opcode of outgoing frames We need to track the frame count by opcode to compute some of the derived metrics. --- conn.go | 3 +++ frame.go | 7 +++++++ frame_test.go | 8 ++++++++ 3 files changed, 18 insertions(+) diff --git a/conn.go b/conn.go index e857331af..09a8bb74e 100644 --- a/conn.go +++ b/conn.go @@ -1128,6 +1128,7 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram if call.streamObserverContext != nil { call.streamObserverContext.StreamStarted(ObservedStream{ Host: c.host, + FrameOpcode: ofi.op, FramePayloadUncompressedSize: ofi.uncompressedSize, FramePayloadCompressedSize: ofi.compressedSize, QueryValuesSize: ofi.queryValuesSize, @@ -1246,6 +1247,8 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram type ObservedStream struct { // Host of the connection used to send the stream. Host *HostInfo + // FrameOpcode is the frame operation (type) that was used. + FrameOpcode frameOp // FramePayloadUncompressedSize is the uncompressed size of the frame payload (without frame header). // This field is only available in StreamStarted. FramePayloadUncompressedSize int diff --git a/frame.go b/frame.go index 71069447d..ce5d3403c 100644 --- a/frame.go +++ b/frame.go @@ -379,6 +379,8 @@ type framer struct { flags byte compres Compressor headSize int + // if writeHeader was called, outFrameOp will contain the frame operation. + outFrameOp frameOp // if this frame was read then the header will be here header *frameHeader // ucompressedSize is size of the frame payload after decompression. @@ -784,6 +786,8 @@ func (f *framer) readErrorMap() (errMap ErrorMap) { } func (f *framer) writeHeader(flags byte, op frameOp, stream int) { + f.outFrameOp = op + f.buf = f.buf[:0] f.buf = append(f.buf, f.proto, @@ -824,6 +828,8 @@ func (f *framer) setLength(length int) { } type outFrameInfo struct { + // op is the type of the frame. + op frameOp // compressedSize of the frame payload (without header). compressedSize int // uncompressedSize of the frame payload (without header). @@ -842,6 +848,7 @@ func (f *framer) finish() (outFrameInfo, error) { } info := outFrameInfo{ + op: f.outFrameOp, uncompressedSize: len(f.buf) - f.headSize, } diff --git a/frame_test.go b/frame_test.go index 9fa379ada..7a36d23e3 100644 --- a/frame_test.go +++ b/frame_test.go @@ -133,6 +133,7 @@ func TestOutFrameInfo(t *testing.T) { customPayload: nil, }, expectedInfo: outFrameInfo{ + op: opQuery, uncompressedSize: 81, compressedSize: 72, queryValuesSize: 30, @@ -155,6 +156,7 @@ func TestOutFrameInfo(t *testing.T) { customPayload: nil, }, expectedInfo: outFrameInfo{ + op: opExecute, compressedSize: 50, uncompressedSize: 51, queryValuesSize: 30, @@ -197,6 +199,7 @@ func TestOutFrameInfo(t *testing.T) { customPayload: nil, }, expectedInfo: outFrameInfo{ + op: opBatch, compressedSize: 96, uncompressedSize: 130, queryValuesSize: 60, @@ -206,6 +209,7 @@ func TestOutFrameInfo(t *testing.T) { "options": { frame: &writeOptionsFrame{}, expectedInfo: outFrameInfo{ + op: opOptions, compressedSize: 0, uncompressedSize: 0, queryValuesSize: 0, @@ -217,6 +221,7 @@ func TestOutFrameInfo(t *testing.T) { events: []string{"event1", "event2"}, }, expectedInfo: outFrameInfo{ + op: opRegister, compressedSize: 20, uncompressedSize: 18, queryValuesSize: 0, @@ -231,6 +236,9 @@ func TestOutFrameInfo(t *testing.T) { if err != nil { t.Fatal(err) } + if ofi.op != test.expectedInfo.op { + t.Errorf("expected op %s, but got %s", test.expectedInfo.op.String(), ofi.op.String()) + } if ofi.queryCount != test.expectedInfo.queryCount { t.Errorf("expected queryCount %d, but got %d", test.expectedInfo.queryCount, ofi.queryCount) } From 169eaf1921157b0f1c8b3917551b8cebe9377bbf Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Fri, 22 Sep 2023 13:11:42 +0200 Subject: [PATCH 5/6] Expose whether an outgoing frame was compressed or not Clients might want to track this info. --- conn.go | 1 + frame.go | 6 +++--- frame_test.go | 27 +++++++++++++++++++++++++-- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/conn.go b/conn.go index 09a8bb74e..73137ad52 100644 --- a/conn.go +++ b/conn.go @@ -1254,6 +1254,7 @@ type ObservedStream struct { FramePayloadUncompressedSize int // FramePayloadCompressedSize is the compressed size of the frame payload (without frame header). // This field is only available in StreamStarted. + // FramePayloadCompressedSize is zero if the frame was not compressed. FramePayloadCompressedSize int // QueryValuesSize is the total uncompressed size of query values in the frame (without other query options). // For a batch, it is the sum for all queries in the batch. diff --git a/frame.go b/frame.go index ce5d3403c..8fd7258cc 100644 --- a/frame.go +++ b/frame.go @@ -864,10 +864,10 @@ func (f *framer) finish() (outFrameInfo, error) { } f.buf = append(f.buf[:f.headSize], compressed...) + + info.compressedSize = len(f.buf) - f.headSize } - length := len(f.buf) - f.headSize - info.compressedSize = length - f.setLength(length) + f.setLength(len(f.buf) - f.headSize) return info, nil } diff --git a/frame_test.go b/frame_test.go index 7a36d23e3..299ffbcc4 100644 --- a/frame_test.go +++ b/frame_test.go @@ -107,6 +107,7 @@ func TestFrameReadTooLong(t *testing.T) { func TestOutFrameInfo(t *testing.T) { tests := map[string]struct { frame frameBuilder + compress bool expectedInfo outFrameInfo }{ "query": { @@ -132,6 +133,7 @@ func TestOutFrameInfo(t *testing.T) { }, customPayload: nil, }, + compress: true, expectedInfo: outFrameInfo{ op: opQuery, uncompressedSize: 81, @@ -155,6 +157,7 @@ func TestOutFrameInfo(t *testing.T) { }, customPayload: nil, }, + compress: true, expectedInfo: outFrameInfo{ op: opExecute, compressedSize: 50, @@ -198,6 +201,7 @@ func TestOutFrameInfo(t *testing.T) { defaultTimestampValue: 0, customPayload: nil, }, + compress: true, expectedInfo: outFrameInfo{ op: opBatch, compressedSize: 96, @@ -207,7 +211,8 @@ func TestOutFrameInfo(t *testing.T) { }, }, "options": { - frame: &writeOptionsFrame{}, + frame: &writeOptionsFrame{}, + compress: true, expectedInfo: outFrameInfo{ op: opOptions, compressedSize: 0, @@ -220,6 +225,7 @@ func TestOutFrameInfo(t *testing.T) { frame: &writeRegisterFrame{ events: []string{"event1", "event2"}, }, + compress: true, expectedInfo: outFrameInfo{ op: opRegister, compressedSize: 20, @@ -228,10 +234,27 @@ func TestOutFrameInfo(t *testing.T) { queryCount: 0, }, }, + "register uncompressed": { + frame: &writeRegisterFrame{ + events: []string{"event1", "event2"}, + }, + compress: false, + expectedInfo: outFrameInfo{ + op: opRegister, + compressedSize: 0, + uncompressedSize: 18, + queryValuesSize: 0, + queryCount: 0, + }, + }, } for name, test := range tests { t.Run(name, func(t *testing.T) { - fr := newFramer(SnappyCompressor{}, 4) + var compressor Compressor + if test.compress { + compressor = SnappyCompressor{} + } + fr := newFramer(compressor, 4) ofi, err := test.frame.buildFrame(fr, 42) if err != nil { t.Fatal(err) From 891afea6c5682d7d44611847aa5bc8cb9db545ce Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Fri, 22 Sep 2023 13:19:56 +0200 Subject: [PATCH 6/6] Export frameOp type frameOp is returned in public APIs, so it should be exported. --- cassandra_test.go | 2 +- conn.go | 4 +- conn_test.go | 34 ++++++------- frame.go | 116 ++++++++++++++++++++++--------------------- frame_test.go | 22 ++++---- framer_bench_test.go | 2 +- 6 files changed, 91 insertions(+), 89 deletions(-) diff --git a/cassandra_test.go b/cassandra_test.go index 9f44ce784..b8303b2f5 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -2180,7 +2180,7 @@ func TestNegativeStream(t *testing.T) { const stream = -50 writer := frameWriterFunc(func(f *framer, streamID int) (outFrameInfo, error) { - f.writeHeader(0, opOptions, stream) + f.writeHeader(0, FrameOpcodeOptions, stream) return f.finish() }) diff --git a/conn.go b/conn.go index 73137ad52..cd9773d01 100644 --- a/conn.go +++ b/conn.go @@ -721,7 +721,7 @@ func (c *Conn) recv(ctx context.Context) error { Version: protoVersion(head.version), Flags: head.flags, Stream: int16(head.stream), - Opcode: frameOp(head.op), + Opcode: FrameOpcode(head.op), Length: int32(head.length), Start: headStartTime, End: headEndTime, @@ -1248,7 +1248,7 @@ type ObservedStream struct { // Host of the connection used to send the stream. Host *HostInfo // FrameOpcode is the frame operation (type) that was used. - FrameOpcode frameOp + FrameOpcode FrameOpcode // FramePayloadUncompressedSize is the uncompressed size of the frame payload (without frame header). // This field is only available in StreamStarted. FramePayloadUncompressedSize int diff --git a/conn_test.go b/conn_test.go index f1a0bc19f..84b1b3f0f 100644 --- a/conn_test.go +++ b/conn_test.go @@ -676,7 +676,7 @@ func TestStream0(t *testing.T) { var buf bytes.Buffer f := newFramer(nil, protoVersion4) - f.writeHeader(0, opResult, 0) + f.writeHeader(0, FrameOpcodeResult, 0) f.writeInt(resultKindVoid) f.buf[0] |= 0x80 if _, err := f.finish(); err != nil { @@ -734,7 +734,7 @@ func TestContext_CanceledBeforeExec(t *testing.T) { addr: "127.0.0.1:0", protocol: defaultProto, recvHook: func(f *framer) { - if f.header.op == opStartup || f.header.op == opOptions { + if f.header.op == FrameOpcodeStartup || f.header.op == FrameOpcodeOptions { // ignore statup and heartbeat messages return } @@ -966,7 +966,7 @@ func TestFrameHeaderObserver(t *testing.T) { } frames := observer.getFrames() - expFrames := []frameOp{opSupported, opReady, opResult} + expFrames := []FrameOpcode{FrameOpcodeSupported, FrameOpcodeReady, FrameOpcodeResult} if len(frames) != len(expFrames) { t.Fatalf("Expected to receive %d frames, instead received %d", len(expFrames), len(frames)) } @@ -1209,7 +1209,7 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string] respFrame := newFramer(nil, reqFrame.proto) switch head.op { - case opStartup: + case FrameOpcodeStartup: if atomic.LoadInt32(&srv.TimeoutOnStartup) > 0 { // Do not respond to startup command // wait until we get a cancel signal @@ -1218,11 +1218,11 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string] return } } - respFrame.writeHeader(0, opReady, head.stream) - case opOptions: - respFrame.writeHeader(0, opSupported, head.stream) + respFrame.writeHeader(0, FrameOpcodeReady, head.stream) + case FrameOpcodeOptions: + respFrame.writeHeader(0, FrameOpcodeSupported, head.stream) respFrame.writeStringMultiMap(exts) - case opQuery: + case FrameOpcodeQuery: query := reqFrame.readLongString() first := query if n := strings.Index(query, " "); n > 0 { @@ -1231,21 +1231,21 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string] switch strings.ToLower(first) { case "kill": atomic.AddInt64(&srv.nKillReq, 1) - respFrame.writeHeader(0, opError, head.stream) + respFrame.writeHeader(0, FrameOpcodeError, head.stream) respFrame.writeInt(0x1001) respFrame.writeString("query killed") case "use": respFrame.writeInt(resultKindKeyspace) respFrame.writeString(strings.TrimSpace(query[3:])) case "void": - respFrame.writeHeader(0, opResult, head.stream) + respFrame.writeHeader(0, FrameOpcodeResult, head.stream) respFrame.writeInt(resultKindVoid) case "timeout": <-srv.ctx.Done() return case "slow": go func() { - respFrame.writeHeader(0, opResult, head.stream) + respFrame.writeHeader(0, FrameOpcodeResult, head.stream) respFrame.writeInt(resultKindVoid) respFrame.buf[0] = srv.protocol | 0x80 select { @@ -1260,25 +1260,25 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string] case "speculative": atomic.AddInt64(&srv.nKillReq, 1) if atomic.LoadInt64(&srv.nKillReq) > 3 { - respFrame.writeHeader(0, opResult, head.stream) + respFrame.writeHeader(0, FrameOpcodeResult, head.stream) respFrame.writeInt(resultKindVoid) respFrame.writeString("speculative query success on the node " + srv.Address) } else { - respFrame.writeHeader(0, opError, head.stream) + respFrame.writeHeader(0, FrameOpcodeError, head.stream) respFrame.writeInt(0x1001) respFrame.writeString("speculative error") rand.Seed(time.Now().UnixNano()) <-time.After(time.Millisecond * 120) } default: - respFrame.writeHeader(0, opResult, head.stream) + respFrame.writeHeader(0, FrameOpcodeResult, head.stream) respFrame.writeInt(resultKindVoid) } - case opError: - respFrame.writeHeader(0, opError, head.stream) + case FrameOpcodeError: + respFrame.writeHeader(0, FrameOpcodeError, head.stream) respFrame.buf = append(respFrame.buf, reqFrame.buf...) default: - respFrame.writeHeader(0, opError, head.stream) + respFrame.writeHeader(0, FrameOpcodeError, head.stream) respFrame.writeInt(0) respFrame.writeString("not supported") } diff --git a/frame.go b/frame.go index 8fd7258cc..3640cd98f 100644 --- a/frame.go +++ b/frame.go @@ -75,61 +75,63 @@ func (p protoVersion) String() string { return fmt.Sprintf("[version=%d direction=%s]", p.version(), dir) } -type frameOp byte +// FrameOpcode enumerates frame operation codes from the CQL protocol. +// See https://martin-sucha.github.io/cqlprotodoc/native_protocol_v4.html#s2.4 +type FrameOpcode byte const ( // header ops - opError frameOp = 0x00 - opStartup frameOp = 0x01 - opReady frameOp = 0x02 - opAuthenticate frameOp = 0x03 - opOptions frameOp = 0x05 - opSupported frameOp = 0x06 - opQuery frameOp = 0x07 - opResult frameOp = 0x08 - opPrepare frameOp = 0x09 - opExecute frameOp = 0x0A - opRegister frameOp = 0x0B - opEvent frameOp = 0x0C - opBatch frameOp = 0x0D - opAuthChallenge frameOp = 0x0E - opAuthResponse frameOp = 0x0F - opAuthSuccess frameOp = 0x10 + FrameOpcodeError FrameOpcode = 0x00 + FrameOpcodeStartup FrameOpcode = 0x01 + FrameOpcodeReady FrameOpcode = 0x02 + FrameOpcodeAuthenticate FrameOpcode = 0x03 + FrameOpcodeOptions FrameOpcode = 0x05 + FrameOpcodeSupported FrameOpcode = 0x06 + FrameOpcodeQuery FrameOpcode = 0x07 + FrameOpcodeResult FrameOpcode = 0x08 + FrameOpcodePrepare FrameOpcode = 0x09 + FrameOpcodeExecute FrameOpcode = 0x0A + FrameOpcodeRegister FrameOpcode = 0x0B + FrameOpcodeEvent FrameOpcode = 0x0C + FrameOpcodeBatch FrameOpcode = 0x0D + FrameOpcodeAuthChallenge FrameOpcode = 0x0E + FrameOpcodeAuthResponse FrameOpcode = 0x0F + FrameOpcodeAuthSuccess FrameOpcode = 0x10 ) -func (f frameOp) String() string { +func (f FrameOpcode) String() string { switch f { - case opError: + case FrameOpcodeError: return "ERROR" - case opStartup: + case FrameOpcodeStartup: return "STARTUP" - case opReady: + case FrameOpcodeReady: return "READY" - case opAuthenticate: + case FrameOpcodeAuthenticate: return "AUTHENTICATE" - case opOptions: + case FrameOpcodeOptions: return "OPTIONS" - case opSupported: + case FrameOpcodeSupported: return "SUPPORTED" - case opQuery: + case FrameOpcodeQuery: return "QUERY" - case opResult: + case FrameOpcodeResult: return "RESULT" - case opPrepare: + case FrameOpcodePrepare: return "PREPARE" - case opExecute: + case FrameOpcodeExecute: return "EXECUTE" - case opRegister: + case FrameOpcodeRegister: return "REGISTER" - case opEvent: + case FrameOpcodeEvent: return "EVENT" - case opBatch: + case FrameOpcodeBatch: return "BATCH" - case opAuthChallenge: + case FrameOpcodeAuthChallenge: return "AUTH_CHALLENGE" - case opAuthResponse: + case FrameOpcodeAuthResponse: return "AUTH_RESPONSE" - case opAuthSuccess: + case FrameOpcodeAuthSuccess: return "AUTH_SUCCESS" default: return fmt.Sprintf("UNKNOWN_OP_%d", f) @@ -301,7 +303,7 @@ type frameHeader struct { version protoVersion flags byte stream int - op frameOp + op FrameOpcode length int warnings []string } @@ -320,7 +322,7 @@ type ObservedFrameHeader struct { Version protoVersion Flags byte Stream int16 - Opcode frameOp + Opcode FrameOpcode Length int32 // StartHeader is the time we started reading the frame header off the network connection. @@ -380,7 +382,7 @@ type framer struct { compres Compressor headSize int // if writeHeader was called, outFrameOp will contain the frame operation. - outFrameOp frameOp + outFrameOp FrameOpcode // if this frame was read then the header will be here header *frameHeader // ucompressedSize is size of the frame payload after decompression. @@ -524,7 +526,7 @@ func readHeader(r io.Reader, p []byte) (head frameHeader, err error) { } head.stream = int(int16(p[2])<<8 | int16(p[3])) - head.op = frameOp(p[4]) + head.op = FrameOpcode(p[4]) head.length = int(readInt(p[5:])) } else { if len(p) != 8 { @@ -532,7 +534,7 @@ func readHeader(r io.Reader, p []byte) (head frameHeader, err error) { } head.stream = int(int8(p[2])) - head.op = frameOp(p[3]) + head.op = FrameOpcode(p[3]) head.length = int(readInt(p[4:])) } @@ -620,21 +622,21 @@ func (f *framer) parseFrame() (frame frame, err error) { // assumes that the frame body has been read into rbuf switch f.header.op { - case opError: + case FrameOpcodeError: frame = f.parseErrorFrame() - case opReady: + case FrameOpcodeReady: frame = f.parseReadyFrame() - case opResult: + case FrameOpcodeResult: frame, err = f.parseResultFrame() - case opSupported: + case FrameOpcodeSupported: frame = f.parseSupportedFrame() - case opAuthenticate: + case FrameOpcodeAuthenticate: frame = f.parseAuthenticateFrame() - case opAuthChallenge: + case FrameOpcodeAuthChallenge: frame = f.parseAuthChallengeFrame() - case opAuthSuccess: + case FrameOpcodeAuthSuccess: frame = f.parseAuthSuccessFrame() - case opEvent: + case FrameOpcodeEvent: frame = f.parseEventFrame() default: return nil, NewErrProtocol("unknown op in frame header: %s", f.header.op) @@ -785,7 +787,7 @@ func (f *framer) readErrorMap() (errMap ErrorMap) { return } -func (f *framer) writeHeader(flags byte, op frameOp, stream int) { +func (f *framer) writeHeader(flags byte, op FrameOpcode, stream int) { f.outFrameOp = op f.buf = f.buf[:0] @@ -829,7 +831,7 @@ func (f *framer) setLength(length int) { type outFrameInfo struct { // op is the type of the frame. - op frameOp + op FrameOpcode // compressedSize of the frame payload (without header). compressedSize int // uncompressedSize of the frame payload (without header). @@ -916,7 +918,7 @@ func (w writeStartupFrame) String() string { } func (w *writeStartupFrame) buildFrame(f *framer, streamID int) (outFrameInfo, error) { - f.writeHeader(f.flags&^flagCompress, opStartup, streamID) + f.writeHeader(f.flags&^flagCompress, FrameOpcodeStartup, streamID) f.writeStringMap(w.opts) return f.finish() @@ -932,7 +934,7 @@ func (w *writePrepareFrame) buildFrame(f *framer, streamID int) (outFrameInfo, e if len(w.customPayload) > 0 { f.payload() } - f.writeHeader(f.flags, opPrepare, streamID) + f.writeHeader(f.flags, FrameOpcodePrepare, streamID) f.writeCustomPayload(&w.customPayload) f.writeLongString(w.statement) @@ -1529,7 +1531,7 @@ func (a *writeAuthResponseFrame) buildFrame(framer *framer, streamID int) (outFr } func (f *framer) writeAuthResponseFrame(streamID int, data []byte) (outFrameInfo, error) { - f.writeHeader(f.flags, opAuthResponse, streamID) + f.writeHeader(f.flags, FrameOpcodeAuthResponse, streamID) f.writeBytes(data) return f.finish() } @@ -1683,7 +1685,7 @@ func (f *framer) writeQueryFrame(streamID int, statement string, params *queryPa if len(customPayload) > 0 { f.payload() } - f.writeHeader(f.flags, opQuery, streamID) + f.writeHeader(f.flags, FrameOpcodeQuery, streamID) f.writeCustomPayload(&customPayload) f.writeLongString(statement) valuesSize := f.writeQueryParams(params) @@ -1724,7 +1726,7 @@ func (f *framer) writeExecuteFrame(streamID int, preparedID []byte, params *quer if len(*customPayload) > 0 { f.payload() } - f.writeHeader(f.flags, opExecute, streamID) + f.writeHeader(f.flags, FrameOpcodeExecute, streamID) f.writeCustomPayload(customPayload) f.writeShortBytes(preparedID) var valuesSize int @@ -1781,7 +1783,7 @@ func (f *framer) writeBatchFrame(streamID int, w *writeBatchFrame, customPayload if len(customPayload) > 0 { f.payload() } - f.writeHeader(f.flags, opBatch, streamID) + f.writeHeader(f.flags, FrameOpcodeBatch, streamID) f.writeCustomPayload(&customPayload) f.writeByte(byte(w.typ)) @@ -1870,7 +1872,7 @@ func (w *writeOptionsFrame) buildFrame(framer *framer, streamID int) (outFrameIn } func (f *framer) writeOptionsFrame(stream int, _ *writeOptionsFrame) (outFrameInfo, error) { - f.writeHeader(f.flags&^flagCompress, opOptions, stream) + f.writeHeader(f.flags&^flagCompress, FrameOpcodeOptions, stream) return f.finish() } @@ -1883,7 +1885,7 @@ func (w *writeRegisterFrame) buildFrame(framer *framer, streamID int) (outFrameI } func (f *framer) writeRegisterFrame(streamID int, w *writeRegisterFrame) (outFrameInfo, error) { - f.writeHeader(f.flags, opRegister, streamID) + f.writeHeader(f.flags, FrameOpcodeRegister, streamID) f.writeStringList(w.events) return f.finish() diff --git a/frame_test.go b/frame_test.go index 299ffbcc4..8844747fc 100644 --- a/frame_test.go +++ b/frame_test.go @@ -64,7 +64,7 @@ func TestFrameWriteTooLong(t *testing.T) { framer := newFramer(nil, 2) - framer.writeHeader(0, opStartup, 1) + framer.writeHeader(0, FrameOpcodeStartup, 1) framer.writeBytes(make([]byte, maxFrameSize+1)) _, err := framer.finish() if err != ErrFrameTooBig { @@ -80,13 +80,13 @@ func TestFrameReadTooLong(t *testing.T) { r := &bytes.Buffer{} r.Write(make([]byte, maxFrameSize+1)) // write a new header right after this frame to verify that we can read it - r.Write([]byte{0x02, 0x00, 0x00, byte(opReady), 0x00, 0x00, 0x00, 0x00}) + r.Write([]byte{0x02, 0x00, 0x00, byte(FrameOpcodeReady), 0x00, 0x00, 0x00, 0x00}) framer := newFramer(nil, 2) head := frameHeader{ version: 2, - op: opReady, + op: FrameOpcodeReady, length: r.Len() - 8, } @@ -99,8 +99,8 @@ func TestFrameReadTooLong(t *testing.T) { if err != nil { t.Fatal(err) } - if head.op != opReady { - t.Fatalf("expected to get header %v got %v", opReady, head.op) + if head.op != FrameOpcodeReady { + t.Fatalf("expected to get header %v got %v", FrameOpcodeReady, head.op) } } @@ -135,7 +135,7 @@ func TestOutFrameInfo(t *testing.T) { }, compress: true, expectedInfo: outFrameInfo{ - op: opQuery, + op: FrameOpcodeQuery, uncompressedSize: 81, compressedSize: 72, queryValuesSize: 30, @@ -159,7 +159,7 @@ func TestOutFrameInfo(t *testing.T) { }, compress: true, expectedInfo: outFrameInfo{ - op: opExecute, + op: FrameOpcodeExecute, compressedSize: 50, uncompressedSize: 51, queryValuesSize: 30, @@ -203,7 +203,7 @@ func TestOutFrameInfo(t *testing.T) { }, compress: true, expectedInfo: outFrameInfo{ - op: opBatch, + op: FrameOpcodeBatch, compressedSize: 96, uncompressedSize: 130, queryValuesSize: 60, @@ -214,7 +214,7 @@ func TestOutFrameInfo(t *testing.T) { frame: &writeOptionsFrame{}, compress: true, expectedInfo: outFrameInfo{ - op: opOptions, + op: FrameOpcodeOptions, compressedSize: 0, uncompressedSize: 0, queryValuesSize: 0, @@ -227,7 +227,7 @@ func TestOutFrameInfo(t *testing.T) { }, compress: true, expectedInfo: outFrameInfo{ - op: opRegister, + op: FrameOpcodeRegister, compressedSize: 20, uncompressedSize: 18, queryValuesSize: 0, @@ -240,7 +240,7 @@ func TestOutFrameInfo(t *testing.T) { }, compress: false, expectedInfo: outFrameInfo{ - op: opRegister, + op: FrameOpcodeRegister, compressedSize: 0, uncompressedSize: 18, queryValuesSize: 0, diff --git a/framer_bench_test.go b/framer_bench_test.go index 5b7b442f9..95f3aa6ba 100644 --- a/framer_bench_test.go +++ b/framer_bench_test.go @@ -34,7 +34,7 @@ func BenchmarkParseRowsFrame(b *testing.B) { framer := &framer{ header: &frameHeader{ version: protoVersion4 | 0x80, - op: opResult, + op: FrameOpcodeResult, length: len(data), }, buf: data,