Skip to content

Commit

Permalink
simulating skipping requests when store-gateways are overloaded
Browse files Browse the repository at this point in the history
  • Loading branch information
francoposa committed May 27, 2024
1 parent 8cdc73a commit 64bfaaf
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 58 deletions.
82 changes: 41 additions & 41 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ type Request interface{}
// in a fair fashion.
type RequestQueue struct {
services.Service
log log.Logger
Log log.Logger

// settings
maxOutstandingPerTenant int
additionalQueueDimensionsEnabled bool
forgetDelay time.Duration

// metrics for reporting
connectedQuerierWorkers *atomic.Int64
ConnectedQuerierWorkers *atomic.Int64
// metrics are broken out with "user" label for backwards compat, despite update to "tenant" terminology
queueLength *prometheus.GaugeVec // per user
discardedRequests *prometheus.CounterVec // per user
Expand All @@ -106,14 +106,14 @@ type RequestQueue struct {
querierOperations chan querierOperation
requestsToEnqueue chan requestToEnqueue
nextRequestForQuerierCalls chan *nextRequestForQuerierCall
waitingNextRequestForQuerierCalls *list.List
WaitingNextRequestForQuerierCalls *list.List

// QueryComponentUtilization encapsulates tracking requests from the time they are forwarded to a querier
// to the time are completed by the querier or failed due to cancel, timeout, or disconnect.
// Unlike schedulerInflightRequests, tracking begins only when the request is sent to a querier.
QueryComponentUtilization *QueryComponentUtilization

queueBroker *queueBroker
QueueBroker *queueBroker
}

type querierOperation struct {
Expand Down Expand Up @@ -155,13 +155,13 @@ func NewRequestQueue(

q := &RequestQueue{
// settings
log: log,
Log: log,
maxOutstandingPerTenant: maxOutstandingPerTenant,
additionalQueueDimensionsEnabled: additionalQueueDimensionsEnabled,
forgetDelay: forgetDelay,

// metrics for reporting
connectedQuerierWorkers: atomic.NewInt64(0),
ConnectedQuerierWorkers: atomic.NewInt64(0),
queueLength: queueLength,
discardedRequests: discardedRequests,
enqueueDuration: enqueueDuration,
Expand All @@ -172,10 +172,10 @@ func NewRequestQueue(
querierOperations: make(chan querierOperation),
requestsToEnqueue: make(chan requestToEnqueue),
nextRequestForQuerierCalls: make(chan *nextRequestForQuerierCall),
waitingNextRequestForQuerierCalls: list.New(),
WaitingNextRequestForQuerierCalls: list.New(),

QueryComponentUtilization: queryComponentCapacity,
queueBroker: newQueueBroker(maxOutstandingPerTenant, additionalQueueDimensionsEnabled, forgetDelay),
QueueBroker: newQueueBroker(maxOutstandingPerTenant, additionalQueueDimensionsEnabled, forgetDelay),
}

q.Service = services.NewTimerService(forgetCheckPeriod, q.starting, q.forgetDisconnectedQueriers, q.stop).WithName("request queue")
Expand Down Expand Up @@ -213,41 +213,41 @@ func (q *RequestQueue) dispatcherLoop() {
requestSent := q.trySendNextRequestForQuerier(call)
if !requestSent {
// No requests available for this querier; add it to the list to try later.
q.waitingNextRequestForQuerierCalls.PushBack(call)
q.WaitingNextRequestForQuerierCalls.PushBack(call)
}
}

if needToDispatchQueries {
currentElement := q.waitingNextRequestForQuerierCalls.Front()
currentElement := q.WaitingNextRequestForQuerierCalls.Front()

for currentElement != nil && !q.queueBroker.isEmpty() {
for currentElement != nil && !q.QueueBroker.isEmpty() {
call := currentElement.Value.(*nextRequestForQuerierCall)
nextElement := currentElement.Next() // We have to capture the next element before calling Remove(), as Remove() clears it.

if q.trySendNextRequestForQuerier(call) {
q.waitingNextRequestForQuerierCalls.Remove(currentElement)
q.WaitingNextRequestForQuerierCalls.Remove(currentElement)
}

currentElement = nextElement
}
}

if stopping && (q.queueBroker.isEmpty() || q.connectedQuerierWorkers.Load() == 0) {
if stopping && (q.QueueBroker.isEmpty() || q.ConnectedQuerierWorkers.Load() == 0) {
// Tell any waiting GetNextRequestForQuerier calls that nothing is coming.
currentElement := q.waitingNextRequestForQuerierCalls.Front()
currentElement := q.WaitingNextRequestForQuerierCalls.Front()

for currentElement != nil {
call := currentElement.Value.(*nextRequestForQuerierCall)
call.sendError(ErrStopped)
currentElement = currentElement.Next()
}

if !q.queueBroker.isEmpty() {
if !q.QueueBroker.isEmpty() {
// This should never happen: unless all queriers have shut down themselves, they should remain connected
// until the RequestQueue service enters the stopped state (see Scheduler.QuerierLoop()), and so we won't
// stop the RequestQueue until we've drained all enqueued queries.
// But if this does happen, we want to know about it.
level.Warn(q.log).Log("msg", "shutting down dispatcher loop: have no connected querier workers, but request queue is not empty, so these requests will be abandoned")
level.Warn(q.Log).Log("msg", "shutting down dispatcher loop: have no connected querier workers, but request queue is not empty, so these requests will be abandoned")
}

// We are done.
Expand All @@ -268,7 +268,7 @@ func (q *RequestQueue) enqueueRequestToBroker(r requestToEnqueue) error {
tenantID: r.tenantID,
req: r.req,
}
err := q.queueBroker.enqueueRequestBack(&tr, r.maxQueriers)
err := q.QueueBroker.enqueueRequestBack(&tr, r.maxQueriers)
if err != nil {
if errors.Is(err, ErrTooManyRequests) {
q.discardedRequests.WithLabelValues(string(r.tenantID)).Inc()
Expand Down Expand Up @@ -296,7 +296,7 @@ func (q *RequestQueue) enqueueRequestToBroker(r requestToEnqueue) error {
//
// Sending the request to the call's result channel blocks until the result is read or the call is canceled.
func (q *RequestQueue) trySendNextRequestForQuerier(call *nextRequestForQuerierCall) (sent bool) {
req, tenant, idx, err := q.queueBroker.dequeueRequestForQuerier(call.lastTenantIndex.last, call.querierID)
req, tenant, idx, err := q.QueueBroker.dequeueRequestForQuerier(call.lastTenantIndex.last, call.querierID)
if err != nil {
// If this querier has told us it's shutting down, terminate GetNextRequestForQuerier with an error now...
call.sendError(err)
Expand All @@ -315,20 +315,20 @@ func (q *RequestQueue) trySendNextRequestForQuerier(call *nextRequestForQuerierC
schedulerRequest, ok := req.req.(*SchedulerRequest)
if ok {
queryComponentName := schedulerRequest.ExpectedQueryComponentName()
exceedsThreshold, queryComponent := q.QueryComponentUtilization.ExceedsThresholdForComponentName(
queryComponentName,
q.connectedQuerierWorkers.Load(),
q.queueBroker.tenantQueuesTree.ItemCount(),
q.waitingNextRequestForQuerierCalls.Len(),
)

if exceedsThreshold {
level.Info(q.log).Log(
"msg", "experimental: querier worker connections in use by query component exceed utilization threshold. no action taken",
"query_component_name", queryComponentName,
"overloaded_query_component", queryComponent,
)
}
//exceedsThreshold, queryComponent := q.QueryComponentUtilization.ExceedsThresholdForComponentName(
// queryComponentName,
// q.connectedQuerierWorkers.Load(),
// q.queueBroker.tenantQueuesTree.ItemCount(),
// q.waitingNextRequestForQuerierCalls.Len(),
//)
//
//if exceedsThreshold {
// level.Info(q.log).Log(
// "msg", "experimental: querier worker connections in use by query component exceed utilization threshold. no action taken",
// "query_component_name", queryComponentName,
// "overloaded_query_component", queryComponent,
// )
//}
q.QueryComponentUtilization.IncrementForComponentName(queryComponentName)
}

Expand All @@ -345,9 +345,9 @@ func (q *RequestQueue) trySendNextRequestForQuerier(call *nextRequestForQuerierC
q.queueLength.WithLabelValues(string(tenant.tenantID)).Dec()
} else {
// should never error; any item previously in the queue already passed validation
err := q.queueBroker.enqueueRequestFront(req, tenant.maxQueriers)
err := q.QueueBroker.enqueueRequestFront(req, tenant.maxQueriers)
if err != nil {
level.Error(q.log).Log(
level.Error(q.Log).Log(
"msg", "failed to re-enqueue query request after dequeue",
"err", err, "tenant", tenant.tenantID, "querier", call.querierID,
)
Expand Down Expand Up @@ -423,7 +423,7 @@ func (q *RequestQueue) stop(_ error) error {
}

func (q *RequestQueue) GetConnectedQuerierWorkersMetric() float64 {
return float64(q.connectedQuerierWorkers.Load())
return float64(q.ConnectedQuerierWorkers.Load())
}

func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error {
Expand Down Expand Up @@ -469,7 +469,7 @@ func (q *RequestQueue) processQuerierOperation(querierOp querierOperation) (resh
// All subsequent nextRequestForQuerierCalls for the querier will receive an ErrShuttingDown.
// The querier-worker's end of the QuerierLoop will exit once it has received enough errors,
// and the Querier connection counts will be decremented as the workers disconnect.
return q.queueBroker.notifyQuerierShutdown(querierOp.querierID)
return q.QueueBroker.notifyQuerierShutdown(querierOp.querierID)
case forgetDisconnected:
return q.processForgetDisconnectedQueriers()
default:
Expand All @@ -478,17 +478,17 @@ func (q *RequestQueue) processQuerierOperation(querierOp querierOperation) (resh
}

func (q *RequestQueue) processRegisterQuerierConnection(querierID QuerierID) (resharded bool) {
q.connectedQuerierWorkers.Inc()
return q.queueBroker.addQuerierConnection(querierID)
q.ConnectedQuerierWorkers.Inc()
return q.QueueBroker.addQuerierConnection(querierID)
}

func (q *RequestQueue) processUnregisterQuerierConnection(querierID QuerierID) (resharded bool) {
q.connectedQuerierWorkers.Dec()
return q.queueBroker.removeQuerierConnection(querierID, time.Now())
q.ConnectedQuerierWorkers.Dec()
return q.QueueBroker.removeQuerierConnection(querierID, time.Now())
}

func (q *RequestQueue) processForgetDisconnectedQueriers() (resharded bool) {
return q.queueBroker.forgetDisconnectedQueriers(time.Now())
return q.QueueBroker.forgetDisconnectedQueriers(time.Now())
}

// nextRequestForQuerierCall is a "request" indicating that the querier is ready to receive the next query request.
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,9 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
req: req,
}

require.Nil(t, queueBroker.tenantQueuesTree.getNode(QueuePath{"tenant-1"}))
require.Nil(t, queueBroker.TenantQueuesTree.getNode(QueuePath{"tenant-1"}))
require.NoError(t, queueBroker.enqueueRequestBack(&tr, tenantMaxQueriers))
require.False(t, queueBroker.tenantQueuesTree.getNode(QueuePath{"tenant-1"}).IsEmpty())
require.False(t, queueBroker.TenantQueuesTree.getNode(QueuePath{"tenant-1"}).IsEmpty())

ctx, cancel := context.WithCancel(context.Background())
call := &nextRequestForQuerierCall{
Expand All @@ -671,5 +671,5 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
// indicating not to re-submit a request for nextRequestForQuerierCall for the querier
require.True(t, queue.trySendNextRequestForQuerier(call))
// assert request was re-enqueued for tenant after failed send
require.False(t, queueBroker.tenantQueuesTree.getNode(QueuePath{"tenant-1"}).IsEmpty())
require.False(t, queueBroker.TenantQueuesTree.getNode(QueuePath{"tenant-1"}).IsEmpty())
}
16 changes: 8 additions & 8 deletions pkg/scheduler/queue/tenant_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type tenantRequest struct {
// queueBroker encapsulates access to tenant queues for pending requests
// and maintains consistency with the tenant-querier assignments
type queueBroker struct {
tenantQueuesTree *TreeQueue
TenantQueuesTree *TreeQueue

tenantQuerierAssignments tenantQuerierAssignments

Expand All @@ -33,7 +33,7 @@ type queueBroker struct {

func newQueueBroker(maxTenantQueueSize int, additionalQueueDimensionsEnabled bool, forgetDelay time.Duration) *queueBroker {
return &queueBroker{
tenantQueuesTree: NewTreeQueue("root"),
TenantQueuesTree: NewTreeQueue("root"),
tenantQuerierAssignments: tenantQuerierAssignments{
queriersByID: map[QuerierID]*querierConn{},
querierIDsSorted: nil,
Expand All @@ -48,7 +48,7 @@ func newQueueBroker(maxTenantQueueSize int, additionalQueueDimensionsEnabled boo
}

func (qb *queueBroker) isEmpty() bool {
return qb.tenantQueuesTree.IsEmpty()
return qb.TenantQueuesTree.IsEmpty()
}

// enqueueRequestBack is the standard interface to enqueue requests for dispatch to queriers.
Expand All @@ -64,13 +64,13 @@ func (qb *queueBroker) enqueueRequestBack(request *tenantRequest, tenantMaxQueri
if err != nil {
return err
}
if tenantQueueNode := qb.tenantQueuesTree.getNode(queuePath[:1]); tenantQueueNode != nil {
if tenantQueueNode := qb.TenantQueuesTree.getNode(queuePath[:1]); tenantQueueNode != nil {
if tenantQueueNode.ItemCount()+1 > qb.maxTenantQueueSize {
return ErrTooManyRequests
}
}

err = qb.tenantQueuesTree.EnqueueBackByPath(queuePath, request)
err = qb.TenantQueuesTree.EnqueueBackByPath(queuePath, request)
return err
}

Expand All @@ -89,7 +89,7 @@ func (qb *queueBroker) enqueueRequestFront(request *tenantRequest, tenantMaxQuer
if err != nil {
return err
}
return qb.tenantQueuesTree.EnqueueFrontByPath(queuePath, request)
return qb.TenantQueuesTree.EnqueueFrontByPath(queuePath, request)
}

func (qb *queueBroker) makeQueuePath(request *tenantRequest) (QueuePath, error) {
Expand All @@ -110,9 +110,9 @@ func (qb *queueBroker) dequeueRequestForQuerier(lastTenantIndex int, querierID Q
}

queuePath := QueuePath{string(tenant.tenantID)}
queueElement := qb.tenantQueuesTree.DequeueByPath(queuePath)
queueElement := qb.TenantQueuesTree.DequeueByPath(queuePath)

queueNodeAfterDequeue := qb.tenantQueuesTree.getNode(queuePath)
queueNodeAfterDequeue := qb.TenantQueuesTree.getNode(queuePath)
if queueNodeAfterDequeue == nil {
// queue node was deleted due to being empty after dequeue
qb.tenantQuerierAssignments.removeTenant(tenant.tenantID)
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/queue/tenant_queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ func TestQueuesRespectMaxTenantQueueSizeWithSubQueues(t *testing.T) {
}
// assert item count of tenant node and its subnodes
queuePath := QueuePath{"tenant-1"}
assert.Equal(t, maxTenantQueueSize, qb.tenantQueuesTree.getNode(queuePath).ItemCount())
assert.Equal(t, maxTenantQueueSize, qb.TenantQueuesTree.getNode(queuePath).ItemCount())

// assert equal distribution of queue items between tenant node and 3 subnodes
for _, v := range additionalQueueDimensions {
queuePath := append(QueuePath{"tenant-1"}, v...)
assert.Equal(t, maxTenantQueueSize/len(additionalQueueDimensions), qb.tenantQueuesTree.getNode(queuePath).LocalQueueLen())
assert.Equal(t, maxTenantQueueSize/len(additionalQueueDimensions), qb.TenantQueuesTree.getNode(queuePath).LocalQueueLen())
}

// assert error received when hitting a tenant's enqueue limit,
Expand Down Expand Up @@ -485,7 +485,7 @@ func (qb *queueBroker) getOrAddTenantQueue(tenantID TenantID, maxQueriers int) (
}

queuePath := QueuePath{string(tenantID)}
return qb.tenantQueuesTree.getOrAddNode(queuePath)
return qb.TenantQueuesTree.getOrAddNode(queuePath)
}

// getQueue is a test utility, not intended for use by consumers of queueBroker
Expand All @@ -496,15 +496,15 @@ func (qb *queueBroker) getQueue(tenantID TenantID) *TreeQueue {
}

queuePath := QueuePath{string(tenantID)}
tenantQueue := qb.tenantQueuesTree.getNode(queuePath)
tenantQueue := qb.TenantQueuesTree.getNode(queuePath)
return tenantQueue
}

// removeTenantQueue is a test utility, not intended for use by consumers of queueBroker
func (qb *queueBroker) removeTenantQueue(tenantID TenantID) bool {
qb.tenantQuerierAssignments.removeTenant(tenantID)
queuePath := QueuePath{string(tenantID)}
return qb.tenantQueuesTree.deleteNode(queuePath)
return qb.TenantQueuesTree.deleteNode(queuePath)
}

func confirmOrderForQuerier(t *testing.T, qb *queueBroker, querier QuerierID, lastTenantIndex int, queues ...*list.List) int {
Expand Down Expand Up @@ -557,7 +557,7 @@ func isConsistent(qb *queueBroker) error {
}
}

tenantQueueCount := qb.tenantQueuesTree.NodeCount() - 1 // exclude root node
tenantQueueCount := qb.TenantQueuesTree.NodeCount() - 1 // exclude root node
if tenantCount != tenantQueueCount {
return fmt.Errorf("inconsistent number of tenants list and tenant queues")
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,24 @@ func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierL
it's possible that its own queue would perpetually contain only expired requests.
*/

queryComponentName := r.ExpectedQueryComponentName()
exceedsThreshold, queryComponent := s.requestQueue.QueryComponentUtilization.ExceedsThresholdForComponentName(
queryComponentName,
s.requestQueue.ConnectedQuerierWorkers.Load(),
s.requestQueue.QueueBroker.TenantQueuesTree.ItemCount(),
s.requestQueue.WaitingNextRequestForQuerierCalls.Len(),
)

if exceedsThreshold {
level.Info(s.requestQueue.Log).Log(
"msg", "experimental: querier worker connections in use by query component exceed utilization threshold. dropping request",
"query_component_name", queryComponentName,
"overloaded_query_component", queryComponent,
)
s.cancelRequestAndRemoveFromPending(r.FrontendAddress, r.QueryID, "request dropped due to overloaded query component")
continue
}

if r.Ctx.Err() != nil {
// Remove from pending requests.
s.cancelRequestAndRemoveFromPending(r.FrontendAddress, r.QueryID, "request cancelled")
Expand Down
1 change: 1 addition & 0 deletions pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (g *StoreGateway) syncStores(ctx context.Context, reason string) {

// Series implements the storegatewaypb.StoreGatewayServer interface.
func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error {
time.Sleep(180 * time.Second)
ix := g.tracker.Insert(func() string {
return requestActivity(srv.Context(), "StoreGateway/Series", req)
})
Expand Down

0 comments on commit 64bfaaf

Please sign in to comment.