Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce telemetry logging #458

Merged
merged 2 commits into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 13 additions & 29 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"io"
"strings"

"github.com/Snowflake-Labs/sansshell/auth/opa/rpcauth"
"github.com/go-logr/logr"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
Expand All @@ -40,11 +39,10 @@ const (
// context of the invoker.
func UnaryClientLogInterceptor(logger logr.Logger) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
l := logger.WithValues("method", method, "target", cc.Target())
logCtx := logr.NewContext(ctx, l)
logCtx := logr.NewContext(ctx, logger)
logCtx = passAlongMetadata(logCtx)
l = logMetadata(logCtx, l)
l.Info("new client request")
l := logMetadata(logCtx, logger)
l.V(1).Info("new client request", "method", method, "target", cc.Target())
err := invoker(logCtx, method, req, reply, cc, opts...)
if err != nil {
l.Error(err, "")
Expand All @@ -53,16 +51,14 @@ func UnaryClientLogInterceptor(logger logr.Logger) grpc.UnaryClientInterceptor {
}
}

// StreamClientLogInterceptor returns a new grpc.StreamClientInterceptor that logs
// client requests using the supplied logger, as well as injecting it into the context
// StreamClientLogInterceptor returns a new grpc.StreamClientInterceptor that injects it into the context
// of the created stream.
func StreamClientLogInterceptor(logger logr.Logger) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
l := logger.WithValues("method", method, "target", cc.Target())
logCtx := logr.NewContext(ctx, l)
logCtx := logr.NewContext(ctx, logger)
logCtx = passAlongMetadata(logCtx)
l = logMetadata(logCtx, l)
l.Info("new client stream")
l := logMetadata(logCtx, logger)
l.V(1).Info("new client stream", "method", method, "target", cc.Target())
stream, err := streamer(logCtx, desc, cc, method, opts...)
if err != nil {
l.Error(err, "create stream")
Expand Down Expand Up @@ -164,20 +160,14 @@ func (l *loggedClientStream) CloseSend() error {
return err
}

// UnaryServerLogInterceptor returns a new gprc.UnaryServerInterceptor that logs
// incoming requests using the supplied logger, as well as injecting it into the
// UnaryServerLogInterceptor returns a new gprc.UnaryServerInterceptor that injects it into the
// context of downstream handlers. If incoming calls require client side provided justification
// (which is logged) then the justification parameter should be true and a required
// key of ReqJustKey must be in the context when the interceptor runs.
func UnaryServerLogInterceptor(logger logr.Logger) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
l := logger.WithValues("method", info.FullMethod)
p := rpcauth.PeerInputFromContext(ctx)
if p != nil {
l = l.WithValues("peer", p)
}
l = logMetadata(ctx, l)
l.Info("new request")
l := logMetadata(ctx, logger)
l.V(1).Info("new request", "method", info.FullMethod)
logCtx := logr.NewContext(ctx, l)
resp, err := handler(logCtx, req)
if err != nil {
Expand All @@ -187,20 +177,14 @@ func UnaryServerLogInterceptor(logger logr.Logger) grpc.UnaryServerInterceptor {
}
}

// StreamServerLogInterceptor returns a new grpc.StreamServerInterceptor that logs
// incoming streams using the supplied logger, and makes it available via the stream
// StreamServerLogInterceptor returns a new grpc.StreamServerInterceptor that makes it available via the stream
// context to stream handlers. If incoming calls require client side provided justification
// (which is logged) then the justification parameter should be true and a required
// key of ReqJustKey must be in the context when the interceptor runs.
func StreamServerLogInterceptor(logger logr.Logger) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
l := logger.WithValues("method", info.FullMethod)
p := rpcauth.PeerInputFromContext(ss.Context())
if p != nil {
l = l.WithValues("peer", p)
}
l = logMetadata(ss.Context(), l)
l.Info("new stream")
l := logMetadata(ss.Context(), logger)
l.V(1).Info("new stream", "method", info.FullMethod)
stream := &loggedStream{
ServerStream: ss,
logger: l,
Expand Down
8 changes: 4 additions & 4 deletions telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestUnaryClient(t *testing.T) {
fn := func(p, a string) {
args = a
}
logger := funcr.New(fn, funcr.Options{})
logger := funcr.New(fn, funcr.Options{Verbosity: 1})

intercept := UnaryClientLogInterceptor(logger)

Expand Down Expand Up @@ -131,7 +131,7 @@ func TestStreamClient(t *testing.T) {
fn := func(p, a string) {
args = a
}
logger := funcr.New(fn, funcr.Options{})
logger := funcr.New(fn, funcr.Options{Verbosity: 1})

intercept := StreamClientLogInterceptor(logger)

Expand Down Expand Up @@ -221,7 +221,7 @@ func TestUnaryServer(t *testing.T) {
fn := func(p, a string) {
args = a
}
logger := funcr.New(fn, funcr.Options{})
logger := funcr.New(fn, funcr.Options{Verbosity: 1})

intercept := UnaryServerLogInterceptor(logger)

Expand Down Expand Up @@ -266,7 +266,7 @@ func TestStreamServer(t *testing.T) {
fn := func(p, a string) {
args = a
}
logger := funcr.New(fn, funcr.Options{})
logger := funcr.New(fn, funcr.Options{Verbosity: 1})

intercept := StreamServerLogInterceptor(logger)

Expand Down
Loading