Skip to content
This repository has been archived by the owner on Aug 26, 2024. It is now read-only.

Commit

Permalink
Merge Add observers for payload sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
martin-sucha committed Sep 22, 2023
2 parents 3b7e681 + 891afea commit 0770336
Show file tree
Hide file tree
Showing 8 changed files with 485 additions and 144 deletions.
4 changes: 2 additions & 2 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2179,8 +2179,8 @@ func TestNegativeStream(t *testing.T) {
conn := getRandomConn(t, session)

const stream = -50
writer := frameWriterFunc(func(f *framer, streamID int) error {
f.writeHeader(0, opOptions, stream)
writer := frameWriterFunc(func(f *framer, streamID int) (outFrameInfo, error) {
f.writeHeader(0, FrameOpcodeOptions, stream)
return f.finish()
})

Expand Down
4 changes: 4 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ type ClusterConfig struct {
// Use it to collect metrics / stats from frames by providing an implementation of FrameHeaderObserver.
FrameHeaderObserver FrameHeaderObserver

// FrameObserver will be notified of all received frames that were read.
// FrameObserver will not see frames that were discarded.
FrameObserver FrameObserver

// StreamObserver will be notified of stream state changes.
// This can be used to track in-flight protocol requests and responses.
StreamObserver StreamObserver
Expand Down
95 changes: 71 additions & 24 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,12 @@ type Conn struct {
r *bufio.Reader
w contextWriter

timeout time.Duration
writeTimeout time.Duration
cfg *ConnConfig
frameObserver FrameHeaderObserver
streamObserver StreamObserver
timeout time.Duration
writeTimeout time.Duration
cfg *ConnConfig
frameHeaderObserver FrameHeaderObserver
frameObserver FrameObserver
streamObserver StreamObserver

headerBuf [maxFrameHeaderSize]byte

Expand Down Expand Up @@ -279,19 +280,20 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *

ctx, cancel := context.WithCancel(ctx)
c := &Conn{
conn: dialedHost.Conn,
r: bufio.NewReader(dialedHost.Conn),
cfg: cfg,
calls: make(map[int]*callReq),
version: uint8(cfg.ProtoVersion),
addr: dialedHost.Conn.RemoteAddr().String(),
errorHandler: errorHandler,
compressor: cfg.Compressor,
session: s,
streams: s.streamIDGenerator(cfg.ProtoVersion),
host: host,
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
frameObserver: s.frameObserver,
conn: dialedHost.Conn,
r: bufio.NewReader(dialedHost.Conn),
cfg: cfg,
calls: make(map[int]*callReq),
version: uint8(cfg.ProtoVersion),
addr: dialedHost.Conn.RemoteAddr().String(),
errorHandler: errorHandler,
compressor: cfg.Compressor,
session: s,
streams: s.streamIDGenerator(cfg.ProtoVersion),
host: host,
isSchemaV2: true, // Try using "system.peers_v2" until proven otherwise
frameHeaderObserver: s.frameHeaderObserver,
frameObserver: s.frameObserver,
w: &deadlineContextWriter{
w: dialedHost.Conn,
timeout: writeTimeout,
Expand Down Expand Up @@ -713,17 +715,29 @@ func (c *Conn) recv(ctx context.Context) error {
return err
}

if c.frameObserver != nil {
c.frameObserver.ObserveFrameHeader(context.Background(), ObservedFrameHeader{
var parseObserver frameParseObserver
if c.frameHeaderObserver != nil || c.frameObserver != nil {
observedHeader := ObservedFrameHeader{
Version: protoVersion(head.version),
Flags: head.flags,
Stream: int16(head.stream),
Opcode: frameOp(head.op),
Opcode: FrameOpcode(head.op),
Length: int32(head.length),
Start: headStartTime,
End: headEndTime,
Host: c.host,
})
}

if c.frameHeaderObserver != nil {
c.frameHeaderObserver.ObserveFrameHeader(context.Background(), observedHeader)
}

if c.frameObserver != nil {
parseObserver = frameParseObserver{
head: observedHeader,
frameObserver: c.frameObserver,
}
}
}

if head.stream > c.streams.NumStreams {
Expand All @@ -734,6 +748,9 @@ func (c *Conn) recv(ctx context.Context) error {
if err := framer.readFrame(c, &head); err != nil {
return err
}
if c.frameObserver != nil {
framer.observer = parseObserver
}
go c.session.handleEvent(framer)
return nil
} else if head.stream <= 0 {
Expand All @@ -743,6 +760,9 @@ func (c *Conn) recv(ctx context.Context) error {
if err := framer.readFrame(c, &head); err != nil {
return err
}
if c.frameObserver != nil {
framer.observer = parseObserver
}

frame, err := framer.parseFrame()
if err != nil {
Expand Down Expand Up @@ -779,6 +799,9 @@ func (c *Conn) recv(ctx context.Context) error {
return err
}
}
if c.frameObserver != nil {
framer.observer = parseObserver
}

// we either, return a response to the caller, the caller timedout, or the
// connection has closed. Either way we should never block indefinatly here
Expand Down Expand Up @@ -1099,13 +1122,20 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram
framer.trace()
}

ofi, err := req.buildFrame(framer, stream)
// The error is handled after we call the stream observer.

if call.streamObserverContext != nil {
call.streamObserverContext.StreamStarted(ObservedStream{
Host: c.host,
Host: c.host,
FrameOpcode: ofi.op,
FramePayloadUncompressedSize: ofi.uncompressedSize,
FramePayloadCompressedSize: ofi.compressedSize,
QueryValuesSize: ofi.queryValuesSize,
QueryCount: ofi.queryCount,
})
}

err := req.buildFrame(framer, stream)
if err != nil {
// closeWithError will block waiting for this stream to either receive a response
// or for us to timeout.
Expand Down Expand Up @@ -1217,6 +1247,23 @@ func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*fram
type ObservedStream struct {
// Host of the connection used to send the stream.
Host *HostInfo
// FrameOpcode is the frame operation (type) that was used.
FrameOpcode FrameOpcode
// FramePayloadUncompressedSize is the uncompressed size of the frame payload (without frame header).
// This field is only available in StreamStarted.
FramePayloadUncompressedSize int
// FramePayloadCompressedSize is the compressed size of the frame payload (without frame header).
// This field is only available in StreamStarted.
// FramePayloadCompressedSize is zero if the frame was not compressed.
FramePayloadCompressedSize int
// QueryValuesSize is the total uncompressed size of query values in the frame (without other query options).
// For a batch, it is the sum for all queries in the batch.
// For frames that contain no query values QueryValuesSize is zero.
// This field is only available in StreamStarted.
QueryValuesSize int
// QueryCount is 1 for EXECUTE/QUERY and size of the batch for BATCH frames.
// This field is only available in StreamStarted.
QueryCount int
}

// StreamObserver is notified about request/response pairs.
Expand Down
38 changes: 19 additions & 19 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,10 +676,10 @@ func TestStream0(t *testing.T) {

var buf bytes.Buffer
f := newFramer(nil, protoVersion4)
f.writeHeader(0, opResult, 0)
f.writeHeader(0, FrameOpcodeResult, 0)
f.writeInt(resultKindVoid)
f.buf[0] |= 0x80
if err := f.finish(); err != nil {
if _, err := f.finish(); err != nil {
t.Fatal(err)
}
if err := f.writeTo(&buf); err != nil {
Expand Down Expand Up @@ -734,7 +734,7 @@ func TestContext_CanceledBeforeExec(t *testing.T) {
addr: "127.0.0.1:0",
protocol: defaultProto,
recvHook: func(f *framer) {
if f.header.op == opStartup || f.header.op == opOptions {
if f.header.op == FrameOpcodeStartup || f.header.op == FrameOpcodeOptions {
// ignore statup and heartbeat messages
return
}
Expand Down Expand Up @@ -966,7 +966,7 @@ func TestFrameHeaderObserver(t *testing.T) {
}

frames := observer.getFrames()
expFrames := []frameOp{opSupported, opReady, opResult}
expFrames := []FrameOpcode{FrameOpcodeSupported, FrameOpcodeReady, FrameOpcodeResult}
if len(frames) != len(expFrames) {
t.Fatalf("Expected to receive %d frames, instead received %d", len(expFrames), len(frames))
}
Expand Down Expand Up @@ -1209,7 +1209,7 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string]
respFrame := newFramer(nil, reqFrame.proto)

switch head.op {
case opStartup:
case FrameOpcodeStartup:
if atomic.LoadInt32(&srv.TimeoutOnStartup) > 0 {
// Do not respond to startup command
// wait until we get a cancel signal
Expand All @@ -1218,11 +1218,11 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string]
return
}
}
respFrame.writeHeader(0, opReady, head.stream)
case opOptions:
respFrame.writeHeader(0, opSupported, head.stream)
respFrame.writeHeader(0, FrameOpcodeReady, head.stream)
case FrameOpcodeOptions:
respFrame.writeHeader(0, FrameOpcodeSupported, head.stream)
respFrame.writeStringMultiMap(exts)
case opQuery:
case FrameOpcodeQuery:
query := reqFrame.readLongString()
first := query
if n := strings.Index(query, " "); n > 0 {
Expand All @@ -1231,21 +1231,21 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string]
switch strings.ToLower(first) {
case "kill":
atomic.AddInt64(&srv.nKillReq, 1)
respFrame.writeHeader(0, opError, head.stream)
respFrame.writeHeader(0, FrameOpcodeError, head.stream)
respFrame.writeInt(0x1001)
respFrame.writeString("query killed")
case "use":
respFrame.writeInt(resultKindKeyspace)
respFrame.writeString(strings.TrimSpace(query[3:]))
case "void":
respFrame.writeHeader(0, opResult, head.stream)
respFrame.writeHeader(0, FrameOpcodeResult, head.stream)
respFrame.writeInt(resultKindVoid)
case "timeout":
<-srv.ctx.Done()
return
case "slow":
go func() {
respFrame.writeHeader(0, opResult, head.stream)
respFrame.writeHeader(0, FrameOpcodeResult, head.stream)
respFrame.writeInt(resultKindVoid)
respFrame.buf[0] = srv.protocol | 0x80
select {
Expand All @@ -1260,32 +1260,32 @@ func (srv *TestServer) process(conn net.Conn, reqFrame *framer, exts map[string]
case "speculative":
atomic.AddInt64(&srv.nKillReq, 1)
if atomic.LoadInt64(&srv.nKillReq) > 3 {
respFrame.writeHeader(0, opResult, head.stream)
respFrame.writeHeader(0, FrameOpcodeResult, head.stream)
respFrame.writeInt(resultKindVoid)
respFrame.writeString("speculative query success on the node " + srv.Address)
} else {
respFrame.writeHeader(0, opError, head.stream)
respFrame.writeHeader(0, FrameOpcodeError, head.stream)
respFrame.writeInt(0x1001)
respFrame.writeString("speculative error")
rand.Seed(time.Now().UnixNano())
<-time.After(time.Millisecond * 120)
}
default:
respFrame.writeHeader(0, opResult, head.stream)
respFrame.writeHeader(0, FrameOpcodeResult, head.stream)
respFrame.writeInt(resultKindVoid)
}
case opError:
respFrame.writeHeader(0, opError, head.stream)
case FrameOpcodeError:
respFrame.writeHeader(0, FrameOpcodeError, head.stream)
respFrame.buf = append(respFrame.buf, reqFrame.buf...)
default:
respFrame.writeHeader(0, opError, head.stream)
respFrame.writeHeader(0, FrameOpcodeError, head.stream)
respFrame.writeInt(0)
respFrame.writeString("not supported")
}

respFrame.buf[0] = srv.protocol | 0x80

if err := respFrame.finish(); err != nil {
if _, err := respFrame.finish(); err != nil {
srv.errorLocked(err)
}

Expand Down
Loading

0 comments on commit 0770336

Please sign in to comment.