Skip to content

Commit

Permalink
mimir: simplify grpc in-flight limiter
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Varankin <[email protected]>
  • Loading branch information
narqo committed Mar 15, 2024
1 parent 44e3ebc commit 18f3b7c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 27 deletions.
18 changes: 5 additions & 13 deletions pkg/mimir/grpc_push_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,21 @@ import (
"github.com/grafana/mimir/pkg/api"
)

type ingesterPushReceiver interface {
type pushReceiver interface {
StartPushRequest(ctx context.Context, requestSize int64) (context.Context, error)
FinishPushRequest(ctx context.Context)
}

// Interface exposed by Distributor.
type distributorPushReceiver interface {
StartPushRequest(ctx context.Context, httpgrpcRequestSize int64) (context.Context, error)
FinishPushRequest(ctx context.Context)
}

// getPushReceiver function must be constant -- return same value on each call.
// if getIngester or getDistributor functions are nil, those specific checks are not used.
func newGrpcInflightMethodLimiter(getIngester func() ingesterPushReceiver, getDistributor func() distributorPushReceiver) *grpcInflightMethodLimiter {
func newGrpcInflightMethodLimiter(getIngester, getDistributor func() pushReceiver) *grpcInflightMethodLimiter {
return &grpcInflightMethodLimiter{getIngester: getIngester, getDistributor: getDistributor}
}

// grpcInflightMethodLimiter implements gRPC TapHandle and gRPC stats.Handler.
type grpcInflightMethodLimiter struct {
getIngester func() ingesterPushReceiver
getDistributor func() distributorPushReceiver
getIngester func() pushReceiver
getDistributor func() pushReceiver
}

type ctxKey int
Expand All @@ -63,9 +57,7 @@ func (g *grpcInflightMethodLimiter) RPCCallStarting(ctx context.Context, methodN
return ctx, errNoIngester
}

reqSize := getMessageSize(md, grpcutil.MetadataMessageSize)

ctx, err := ing.StartPushRequest(ctx, reqSize)
ctx, err := ing.StartPushRequest(ctx, getMessageSize(md, grpcutil.MetadataMessageSize))
if err != nil {
return ctx, status.Error(codes.Unavailable, err.Error())
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/mimir/grpc_push_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

func TestGrpcInflightMethodLimiter(t *testing.T) {
t.Run("nil ingester and distributor receiver", func(t *testing.T) {
l := newGrpcInflightMethodLimiter(func() ingesterPushReceiver { return nil }, func() distributorPushReceiver { return nil })
l := newGrpcInflightMethodLimiter(func() pushReceiver { return nil }, func() pushReceiver { return nil })

ctx, err := l.RPCCallStarting(context.Background(), "test", nil)
require.NoError(t, err)
Expand All @@ -44,7 +44,7 @@ func TestGrpcInflightMethodLimiter(t *testing.T) {

t.Run("ingester push receiver, wrong method name", func(t *testing.T) {
m := &mockIngesterReceiver{}
l := newGrpcInflightMethodLimiter(func() ingesterPushReceiver { return m }, nil)
l := newGrpcInflightMethodLimiter(func() pushReceiver { return m }, nil)

ctx, err := l.RPCCallStarting(context.Background(), "test", nil)
require.NoError(t, err)
Expand All @@ -57,7 +57,7 @@ func TestGrpcInflightMethodLimiter(t *testing.T) {

t.Run("ingester push receiver, check returns error", func(t *testing.T) {
m := &mockIngesterReceiver{}
l := newGrpcInflightMethodLimiter(func() ingesterPushReceiver { return m }, nil)
l := newGrpcInflightMethodLimiter(func() pushReceiver { return m }, nil)

m.returnError = errors.New("hello there")
ctx, err := l.RPCCallStarting(context.Background(), ingesterPushMethod, nil)
Expand All @@ -69,7 +69,7 @@ func TestGrpcInflightMethodLimiter(t *testing.T) {

t.Run("ingester push receiver, without size", func(t *testing.T) {
m := &mockIngesterReceiver{}
l := newGrpcInflightMethodLimiter(func() ingesterPushReceiver { return m }, nil)
l := newGrpcInflightMethodLimiter(func() pushReceiver { return m }, nil)

ctx, err := l.RPCCallStarting(context.Background(), ingesterPushMethod, nil)
require.NoError(t, err)
Expand All @@ -89,7 +89,7 @@ func TestGrpcInflightMethodLimiter(t *testing.T) {

t.Run("ingester push receiver, with size provided", func(t *testing.T) {
m := &mockIngesterReceiver{}
l := newGrpcInflightMethodLimiter(func() ingesterPushReceiver { return m }, nil)
l := newGrpcInflightMethodLimiter(func() pushReceiver { return m }, nil)

ctx, err := l.RPCCallStarting(context.Background(), ingesterPushMethod, metadata.New(map[string]string{
grpcutil.MetadataMessageSize: "123456",
Expand All @@ -111,7 +111,7 @@ func TestGrpcInflightMethodLimiter(t *testing.T) {

t.Run("ingester push receiver, with wrong size", func(t *testing.T) {
m := &mockIngesterReceiver{}
l := newGrpcInflightMethodLimiter(func() ingesterPushReceiver { return m }, nil)
l := newGrpcInflightMethodLimiter(func() pushReceiver { return m }, nil)

ctx, err := l.RPCCallStarting(context.Background(), ingesterPushMethod, metadata.New(map[string]string{
grpcutil.MetadataMessageSize: "wrong",
Expand All @@ -134,7 +134,7 @@ func TestGrpcInflightMethodLimiter(t *testing.T) {
t.Run("distributor push via httpgrpc", func(t *testing.T) {
m := &mockDistributorReceiver{}

l := newGrpcInflightMethodLimiter(nil, func() distributorPushReceiver { return m })
l := newGrpcInflightMethodLimiter(nil, func() pushReceiver { return m })

ctx, err := l.RPCCallStarting(context.Background(), "test", nil)
require.NoError(t, err)
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestGrpcInflightMethodLimiter(t *testing.T) {
t.Run("distributor push via httpgrpc, GET", func(t *testing.T) {
m := &mockDistributorReceiver{}

l := newGrpcInflightMethodLimiter(nil, func() distributorPushReceiver { return m })
l := newGrpcInflightMethodLimiter(nil, func() pushReceiver { return m })

_, err := l.RPCCallStarting(context.Background(), httpgrpcHandleMethod, metadata.New(map[string]string{
httpgrpc.MetadataMethod: "GET",
Expand All @@ -192,7 +192,7 @@ func TestGrpcInflightMethodLimiter(t *testing.T) {

t.Run("distributor push via httpgrpc, /hello", func(t *testing.T) {
m := &mockDistributorReceiver{}
l := newGrpcInflightMethodLimiter(nil, func() distributorPushReceiver { return m })
l := newGrpcInflightMethodLimiter(nil, func() pushReceiver { return m })

_, err := l.RPCCallStarting(context.Background(), httpgrpcHandleMethod, metadata.New(map[string]string{
httpgrpc.MetadataMethod: "POST",
Expand All @@ -207,7 +207,7 @@ func TestGrpcInflightMethodLimiter(t *testing.T) {

t.Run("distributor push via httpgrpc, wrong message size", func(t *testing.T) {
m := &mockDistributorReceiver{}
l := newGrpcInflightMethodLimiter(nil, func() distributorPushReceiver { return m })
l := newGrpcInflightMethodLimiter(nil, func() pushReceiver { return m })

_, err := l.RPCCallStarting(context.Background(), httpgrpcHandleMethod, metadata.New(map[string]string{
httpgrpc.MetadataMethod: "POST",
Expand Down
8 changes: 4 additions & 4 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,9 @@ func (t *Mimir) initServer() (services.Service, error) {
// t.Ingester or t.Distributor will be available. There's no race condition here, because gRPC server (service returned by this method, ie. initServer)
// is started only after t.Ingester and t.Distributor are set in initIngester or initDistributorService.

var ingFn func() ingesterPushReceiver
var ingFn func() pushReceiver
if t.Cfg.Ingester.LimitInflightRequestsUsingGrpcMethodLimiter {
ingFn = func() ingesterPushReceiver {
ingFn = func() pushReceiver {
// Return explicit nil, if there's no ingester. We don't want to return typed-nil as interface value.
if t.Ingester == nil {
return nil
Expand All @@ -297,9 +297,9 @@ func (t *Mimir) initServer() (services.Service, error) {
}
}

var distFn func() distributorPushReceiver
var distFn func() pushReceiver
if t.Cfg.Distributor.LimitInflightRequestsUsingGrpcMethodLimiter {
distFn = func() distributorPushReceiver {
distFn = func() pushReceiver {
// Return explicit nil, if there's no distributor. We don't want to return typed-nil as interface value.
if t.Distributor == nil {
return nil
Expand Down

0 comments on commit 18f3b7c

Please sign in to comment.