Skip to content

Commit

Permalink
Add an option to enable metrics collection in sansshell-server (#219)
Browse files Browse the repository at this point in the history
* add metrics instrumentations

* init metric during server initialization

* change metrics structure to allow registration

* Remove comments

* add comments

* fix npe

* Handle metric error. Add proxy metrics

* handle metric error

* handle err

* rm irrelevant files

* add this one back

* add comments

* metric desc variables

* change singleton to contextbased

* remove unused file

* Delete launch.json

* Add comments
Correct otel metrics function definition

* update comment and change otelrecorder to interface

* add comments

* add test for metrics package

* fix comment and remove debug statement.

* naming consistency

* add comment, and change behavior of RegisterInt64Gauge to allow overwriting gauge

* fix comment

* fix comment typo and inconsistency

* add metrics recorder to sansshell server

* server metrics endpoint

* fix metric name

* disable otelgrpc metrics collection

* add method as an attribute of authzDeniedPolicyCounter

* add to other authz failure counters
  • Loading branch information
sfc-gh-elinardi authored Apr 20, 2023
1 parent 6da9d98 commit 254fac1
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 11 deletions.
7 changes: 4 additions & 3 deletions auth/opa/rpcauth/rpcauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"

"github.com/go-logr/logr"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -129,7 +130,7 @@ func (g *Authorizer) Eval(ctx context.Context, input *RPCAuthInput) error {
if errRegister != nil {
logger.V(1).Error(errRegister, "failed to register "+authzFailureEvalErrorCounterName)
}
errCounter := recorder.AddInt64Counter(ctx, authzFailureEvalErrorCounterName, 1)
errCounter := recorder.AddInt64Counter(ctx, authzFailureEvalErrorCounterName, 1, attribute.String("method", input.Method))
if errCounter != nil {
logger.V(1).Error(errCounter, "failed to add counter "+authzFailureEvalErrorCounterName)
}
Expand All @@ -144,7 +145,7 @@ func (g *Authorizer) Eval(ctx context.Context, input *RPCAuthInput) error {
if errRegister != nil {
logger.V(1).Error(errRegister, "failed to register "+authzDenialHintErrorCounterName)
}
errCounter := recorder.AddInt64Counter(ctx, authzDenialHintErrorCounterName, 1)
errCounter := recorder.AddInt64Counter(ctx, authzDenialHintErrorCounterName, 1, attribute.String("method", input.Method))
if errCounter != nil {
logger.V(1).Error(errCounter, "failed to add counter "+authzDenialHintErrorCounterName)
}
Expand All @@ -158,7 +159,7 @@ func (g *Authorizer) Eval(ctx context.Context, input *RPCAuthInput) error {
if errRegister != nil {
logger.V(1).Error(errRegister, "failed to register "+authzDeniedPolicyCounterName)
}
errCounter := recorder.AddInt64Counter(ctx, authzDeniedPolicyCounterName, 1)
errCounter := recorder.AddInt64Counter(ctx, authzDeniedPolicyCounterName, 1, attribute.String("method", input.Method))
if errCounter != nil {
logger.V(1).Error(errCounter, "failed to add counter "+authzDeniedPolicyCounterName)
}
Expand Down
14 changes: 9 additions & 5 deletions cmd/proxy-server/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
otelmetric "go.opentelemetry.io/otel/metric"
"google.golang.org/grpc"

"github.com/Snowflake-Labs/sansshell/auth/mtls"
Expand Down Expand Up @@ -319,19 +320,22 @@ func WithMetricsPort(addr string) Option {

// WithOtelTracing adds the OpenTelemetry gRPC interceptors to all servers and clients.
// The interceptors collect and export tracing data for gRPC requests and responses
func WithOtelTracing(interceptorOpt otelgrpc.Option) Option {
func WithOtelTracing(interceptorOpts ...otelgrpc.Option) Option {
return optionFunc(func(_ context.Context, r *runState) error {
interceptorOpts = append(interceptorOpts,
otelgrpc.WithMeterProvider(otelmetric.NewNoopMeterProvider()), // We don't want otel grpc metrics so discard them
)
r.unaryClientInterceptors = append(r.unaryClientInterceptors,
otelgrpc.UnaryClientInterceptor(interceptorOpt),
otelgrpc.UnaryClientInterceptor(interceptorOpts...),
)
r.streamClientInterceptors = append(r.streamClientInterceptors,
otelgrpc.StreamClientInterceptor(interceptorOpt),
otelgrpc.StreamClientInterceptor(interceptorOpts...),
)
r.unaryInterceptors = append(r.unaryInterceptors,
otelgrpc.UnaryServerInterceptor(interceptorOpt),
otelgrpc.UnaryServerInterceptor(interceptorOpts...),
)
r.streamInterceptors = append(r.streamInterceptors,
otelgrpc.StreamServerInterceptor(interceptorOpt),
otelgrpc.StreamServerInterceptor(interceptorOpts...),
)
return nil
})
Expand Down
22 changes: 22 additions & 0 deletions cmd/sansshell-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (

"github.com/go-logr/logr"
"github.com/go-logr/stdr"
prometheus_exporter "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/metric/global"
otelmetricsdk "go.opentelemetry.io/otel/sdk/metric"
"google.golang.org/grpc"
channelz "google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/reflection"
Expand All @@ -46,6 +49,7 @@ import (
"github.com/Snowflake-Labs/sansshell/auth/opa/rpcauth"
"github.com/Snowflake-Labs/sansshell/cmd/sansshell-server/server"
"github.com/Snowflake-Labs/sansshell/cmd/util"
"github.com/Snowflake-Labs/sansshell/telemetry/metrics"

// Import the server modules you want to expose, they automatically register

Expand Down Expand Up @@ -75,6 +79,7 @@ var (
policyFile = flag.String("policy-file", "", "Path to a file with an OPA policy. If empty, uses --policy.")
hostport = flag.String("hostport", "localhost:50042", "Where to listen for connections.")
debugport = flag.String("debugport", "localhost:50044", "A separate port for http debug pages. Set to an empty string to disable.")
// metricsport is the listen address of the HTTP metrics endpoint; it is passed to server.WithMetricsPort.
metricsport = flag.String("metricsport", "localhost:50047", "A separate port for the http metrics endpoint. Set to an empty string to disable.")
credSource = flag.String("credential-source", mtlsFlags.Name(), fmt.Sprintf("Method used to obtain mTLS credentials (one of [%s])", strings.Join(mtls.Loaders(), ",")))
verbosity = flag.Int("v", 0, "Verbosity level. > 0 indicates more extensive logging")
validate = flag.Bool("validate", false, "If true will evaluate the policy and then exit (non-zero on error)")
Expand Down Expand Up @@ -124,8 +129,23 @@ func main() {
logger := stdr.New(log.New(os.Stderr, "", logOpts)).WithName("sanshell-server")
stdr.SetVerbosity(*verbosity)

// Setup exporter using the default prometheus registry
exporter, err := prometheus_exporter.New()
if err != nil {
log.Fatalf("failed to create prometheus exporter: %v\n", err)
}
global.SetMeterProvider(otelmetricsdk.NewMeterProvider(
otelmetricsdk.WithReader(exporter),
))
meter := global.Meter("sansshell-server")
recorder, err := metrics.NewOtelRecorder(meter, metrics.WithMetricNamePrefix("sansshell-server"))
if err != nil {
log.Fatalf("failed to create OtelRecorder: %v\n", err)
}

policy := util.ChoosePolicy(logger, defaultPolicy, *policyFlag, *policyFile)
ctx := logr.NewContext(context.Background(), logger)
ctx = metrics.NewContextWithRecorder(ctx, recorder)

parsed, err := opa.NewAuthzPolicy(ctx, policy, opa.WithDenialHintsQuery("data.sansshell.authz.denial_hints"))
if err != nil {
Expand All @@ -146,5 +166,7 @@ func main() {
server.WithRawServerOption(func(s *grpc.Server) { reflection.Register(s) }),
server.WithRawServerOption(func(s *grpc.Server) { channelz.RegisterChannelzServiceToServer(s) }),
server.WithDebugPort(*debugport),
server.WithMetricsPort(*metricsport),
server.WithMetricsRecorder(recorder),
)
}
63 changes: 60 additions & 3 deletions cmd/sansshell-server/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@ import (
"os"

"github.com/go-logr/logr"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
otelmetric "go.opentelemetry.io/otel/metric"
"google.golang.org/grpc"

"github.com/Snowflake-Labs/sansshell/auth/mtls"
"github.com/Snowflake-Labs/sansshell/auth/opa"
"github.com/Snowflake-Labs/sansshell/auth/opa/rpcauth"
"github.com/Snowflake-Labs/sansshell/server"
"github.com/Snowflake-Labs/sansshell/telemetry/metrics"
"google.golang.org/grpc/credentials"
)

Expand All @@ -49,6 +53,9 @@ type runState struct {
hostport string
debugport string
debughandler *http.ServeMux
metricsport string
metricshandler *http.ServeMux
metricsRecorder metrics.MetricsRecorder
policy *opa.AuthzPolicy
justification bool
justificationFunc func(string) error
Expand Down Expand Up @@ -210,15 +217,58 @@ func WithDebugPort(addr string) Option {
})
}

// WithMetricsRecorder enables metric instrumentation on the server.
//
// The returned Option stores recorder on the run state, registers a set of
// gRPC server metrics (including the handling-time histogram) with the default
// Prometheus registerer, and appends both the recorder-based and the
// Prometheus-based interceptors to the unary and stream interceptor chains.
//
// The Option returns an error if the gRPC server metrics cannot be registered.
// The underlying registration error is wrapped, so callers can inspect it with
// errors.Is / errors.As.
func WithMetricsRecorder(recorder metrics.MetricsRecorder) Option {
	return optionFunc(func(_ context.Context, r *runState) error {
		r.metricsRecorder = recorder

		// Instrument the gRPC server with Prometheus metrics.
		grpcServerMetrics := grpc_prometheus.NewServerMetrics(
			grpc_prometheus.WithServerHandlingTimeHistogram(),
		)
		if err := prometheus.Register(grpcServerMetrics); err != nil {
			// Wrap with %w (rather than %s) so the cause stays in the error chain.
			return fmt.Errorf("fail to register grpc server metrics: %w", err)
		}

		r.unaryInterceptors = append(r.unaryInterceptors,
			metrics.UnaryServerMetricsInterceptor(recorder),
			grpcServerMetrics.UnaryServerInterceptor(),
		)
		r.streamInterceptors = append(r.streamInterceptors,
			metrics.StreamServerMetricsInterceptor(recorder),
			grpcServerMetrics.StreamServerInterceptor(),
		)
		return nil
	})
}

// WithMetricsPort configures an HTTP metrics endpoint at the given addr.
//
// The returned Option records addr on the run state together with a dedicated
// ServeMux that serves promhttp.Handler() under "/metrics". The endpoint is
// meant to be scraped by a Prometheus-style metrics scraper and is reachable
// at http://{addr}/metrics.
func WithMetricsPort(addr string) Option {
	return optionFunc(func(_ context.Context, r *runState) error {
		r.metricsport = addr

		handler := http.NewServeMux()
		handler.Handle("/metrics", promhttp.Handler())
		r.metricshandler = handler

		return nil
	})
}

// WithOtelTracing adds the OpenTelemetry gRPC interceptors to both stream and unary servers
// The interceptors collect and export tracing data for gRPC requests and responses
func WithOtelTracing(interceptorOpt otelgrpc.Option) Option {
func WithOtelTracing(interceptorOpts ...otelgrpc.Option) Option {
return optionFunc(func(_ context.Context, r *runState) error {
interceptorOpts = append(interceptorOpts,
otelgrpc.WithMeterProvider(otelmetric.NewNoopMeterProvider()), // We don't want otel grpc metrics so discard them
)
r.unaryInterceptors = append(r.unaryInterceptors,
otelgrpc.UnaryServerInterceptor(interceptorOpt),
otelgrpc.UnaryServerInterceptor(interceptorOpts...),
)
r.streamInterceptors = append(r.streamInterceptors,
otelgrpc.StreamServerInterceptor(interceptorOpt),
otelgrpc.StreamServerInterceptor(interceptorOpts...),
)
return nil
})
Expand All @@ -244,6 +294,13 @@ func Run(ctx context.Context, opts ...Option) {
}()
}

// Start metrics endpoint if both metrics port and handler are configured
if rs.metricshandler != nil && rs.metricsport != "" {
go func() {
rs.logger.Error(http.ListenAndServe(rs.metricsport, rs.metricshandler), "Metrics handler unexpectedly exited")
}()
}

creds, err := extractTransportCredentialsFromRunState(ctx, rs)
if err != nil {
rs.logger.Error(err, "unable to extract transport credentials from runstate", "credsource", rs.credSource)
Expand Down

0 comments on commit 254fac1

Please sign in to comment.