Skip to content

Commit

Permalink
add load tracking to scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
francoposa committed May 16, 2024
1 parent b1a9a63 commit 3b1ab33
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
10 changes: 5 additions & 5 deletions pkg/scheduler/queue/query_component_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ const ingesterQueueDimension = "ingester"
const storeGatewayQueueDimension = "store-gateway"
const ingesterAndStoreGatewayQueueDimension = "ingester-and-store-gateway"

// QueryComponentFlagsForRequest wraps QueryComponentFlags to parse the expected query component from a request.
// QueryComponentForRequest wraps QueryComponentFlags to parse the expected query component from a request.
// nolint: unused
func QueryComponentFlagsForRequest(req *SchedulerRequest) (isIngester, isStoreGateway bool) {
func QueryComponentForRequest(req *SchedulerRequest) string {
var expectedQueryComponent string
if len(req.AdditionalQueueDimensions) > 0 {
expectedQueryComponent = req.AdditionalQueueDimensions[0]
}

return QueryComponentFlags(expectedQueryComponent)
return expectedQueryComponent
}

// QueryComponentFlags interprets annotations by the frontend for the expected query component,
Expand Down Expand Up @@ -66,8 +66,8 @@ type QueryComponentLoad struct {
overloadFactor float64 // nolint: unused
}

// nolint: unused
const defaultOverloadFactor = 2.0 // component is overloaded if it has double the inflight requests as the other
// QueryComponentDefaultOverloadFactor component is overloaded if it has double the inflight requests as the other
const QueryComponentDefaultOverloadFactor = 2.0

func NewQueryComponentLoad(overloadFactor float64) (*QueryComponentLoad, error) {
if overloadFactor <= 1 {
Expand Down
14 changes: 14 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ type Scheduler struct {
// to the time they are completed by the querier or failed due to cancel, timeout, or disconnect.
schedulerInflightRequests map[requestKey]*queue.SchedulerRequest

// queryComponentLoad encapsulates tracking requests from the time they are forwarded to a querier
// to the time are completed by the querier or failed due to cancel, timeout, or disconnect.
// schedulerInflightRequests, tracking begins only when the request is sent to a querier.
queryComponentLoad *queue.QueryComponentLoad

// The ring is used to let other components discover query-scheduler replicas.
// The ring is optional.
schedulerLifecycler *ring.BasicLifecycler
Expand Down Expand Up @@ -122,13 +127,18 @@ func (cfg *Config) Validate() error {
// NewScheduler creates a new Scheduler.
func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Scheduler, error) {
var err error
queryComponentLoad, err := queue.NewQueryComponentLoad(queue.QueryComponentDefaultOverloadFactor)
if err != nil {
return nil, err
}

s := &Scheduler{
cfg: cfg,
log: log,
limits: limits,

schedulerInflightRequests: map[requestKey]*queue.SchedulerRequest{},
queryComponentLoad: queryComponentLoad,
connectedFrontends: map[string]*connectedFrontend{},
subservicesWatcher: services.NewFailureWatcher(),
}
Expand Down Expand Up @@ -452,6 +462,10 @@ func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuer
// Make sure to cancel request at the end to clean up resources.
defer s.cancelRequestAndRemoveFromPending(req.FrontendAddress, req.QueryID, "request complete")

queryComponentName := queue.QueryComponentForRequest(req)
s.queryComponentLoad.IncrementForComponentName(queryComponentName)
defer s.queryComponentLoad.DecrementForComponentName(queryComponentName)

// Handle the stream sending & receiving on a goroutine so we can
// monitor the contexts in a select and cancel things appropriately.
errCh := make(chan error, 1)
Expand Down

0 comments on commit 3b1ab33

Please sign in to comment.