Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query scheduler multi dimension queueing data structures #6519

Closed
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b8c0da4
add tree queue data structures
francoposa Oct 24, 2023
c75ea64
add child queue deletion and max queue len to tree queue data structures
francoposa Oct 24, 2023
ec4ba0b
convert queuebroker len check to isEmpty
francoposa Oct 24, 2023
0dae2ba
messy fix that appears to pass previously failed integration tests
francoposa Oct 25, 2023
ee3bc36
remove old methods and now passing integration tests; TODO clean up u…
francoposa Oct 27, 2023
4581e1f
WIP on code cleanup; re-introducing commented tests with fixes for ne…
francoposa Oct 27, 2023
1abe62f
Merge branch 'main' into francoposa/query-scheduler-multi-dimension-q…
francoposa Oct 30, 2023
d24f6e7
re-fix merge resolution
francoposa Oct 30, 2023
12c047c
address linting
francoposa Oct 30, 2023
5123936
address linting
francoposa Oct 30, 2023
7e4130d
Update mimir-prometheus (#6520)
charleskorn Oct 30, 2023
264fb50
Renovate: Ignore opentracing-contrib/go-grpc fork (#6512)
aknuds1 Oct 31, 2023
0ac75f2
chore(deps): update memcached docker tag to v1.6.22 (main) (#6455)
renovate[bot] Oct 31, 2023
6825f10
chore(deps): update helm release grafana-agent-operator to v0.3.8 (ma…
renovate[bot] Oct 31, 2023
f2986cf
Update dskit, add `-server.http-read-header-timeout` option (#6517)
pstibrany Oct 31, 2023
a8f8641
Add tracing span to Ruler.getLocalRules() (#6515)
pracucci Oct 31, 2023
c742c4b
chore(deps): update registry.k8s.io/kustomize/kustomize docker tag to…
renovate[bot] Oct 31, 2023
f0e55a5
chore(deps): update grafana/agent docker tag to v0.37.3 (#6453)
renovate[bot] Oct 31, 2023
311f572
Ensure that DoNotLogErrors set DoNotLogError header in the HTTP respo…
duricanikolic Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (q *RequestQueue) dispatcherLoop() {
if needToDispatchQueries {
currentElement := waitingGetNextRequestForQuerierCalls.Front()

for currentElement != nil && queueBroker.len() > 0 {
for currentElement != nil && !queueBroker.isEmpty() {
call := currentElement.Value.(*nextRequestForQuerierCall)
nextElement := currentElement.Next() // We have to capture the next element before calling Remove(), as Remove() clears it.

Expand All @@ -205,7 +205,7 @@ func (q *RequestQueue) dispatcherLoop() {
}
}

if stopping && (queueBroker.len() == 0 || q.connectedQuerierWorkers.Load() == 0) {
if stopping && (queueBroker.isEmpty() || q.connectedQuerierWorkers.Load() == 0) {
// Tell any waiting GetNextRequestForQuerier calls that nothing is coming.
currentElement := waitingGetNextRequestForQuerierCalls.Front()

Expand Down Expand Up @@ -241,7 +241,6 @@ func (q *RequestQueue) enqueueRequestToBroker(broker *queueBroker, r requestToEn
}
return err
}

q.queueLength.WithLabelValues(string(r.tenantID)).Inc()

// Call the successFn here to ensure we call it before sending this request to a waiting querier.
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,9 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
maxQueriers: 0, // no sharding
}

require.Nil(t, queueBroker.tenantQueues["tenant-1"])
require.Nil(t, queueBroker.tenantQueuesTree.getNode(QueuePath{"root", "tenant-1"}))
require.NoError(t, queueBroker.enqueueRequestBack(&tr))
require.Equal(t, queueBroker.tenantQueues["tenant-1"].requests.Len(), 1)
require.False(t, queueBroker.tenantQueuesTree.getNode(QueuePath{"root", "tenant-1"}).isEmpty())

ctx, cancel := context.WithCancel(context.Background())
call := &nextRequestForQuerierCall{
Expand All @@ -278,5 +278,5 @@ func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSend
// indicating not to re-submit a request for nextRequestForQuerierCall for the querier
require.True(t, queue.tryDispatchRequestToQuerier(queueBroker, call))
// assert request was re-enqueued for tenant after failed send
require.Equal(t, queueBroker.tenantQueues["tenant-1"].requests.Len(), 1)
require.False(t, queueBroker.tenantQueuesTree.getNode(QueuePath{"root", "tenant-1"}).isEmpty())
}
94 changes: 31 additions & 63 deletions pkg/scheduler/queue/tenant_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package queue

import (
"container/list"
"math/rand"
"sort"
"time"
Expand Down Expand Up @@ -101,20 +100,16 @@ type queueTenant struct {
// queueBroker encapsulates access to tenant queues for pending requests
// and maintains consistency with the tenant-querier assignments
type queueBroker struct {
tenantQueues map[TenantID]*tenantQueue
tenantQueuesTree *TreeQueue

tenantQuerierAssignments tenantQuerierAssignments

maxTenantQueueSize int
}

type tenantQueue struct {
requests *list.List
}

func newQueueBroker(maxTenantQueueSize int, forgetDelay time.Duration) *queueBroker {
return &queueBroker{
tenantQueues: map[TenantID]*tenantQueue{},
tenantQueuesTree: NewTreeQueue("root", maxTenantQueueSize),
tenantQuerierAssignments: tenantQuerierAssignments{
queriersByID: map[QuerierID]*querierConn{},
querierIDsSorted: nil,
Expand All @@ -127,22 +122,18 @@ func newQueueBroker(maxTenantQueueSize int, forgetDelay time.Duration) *queueBro
}
}

func (qb *queueBroker) len() int {
return len(qb.tenantQueues)
func (qb *queueBroker) isEmpty() bool {
return qb.tenantQueuesTree.isEmpty()
}

func (qb *queueBroker) enqueueRequestBack(request *tenantRequest) error {
queue, err := qb.getOrAddTenantQueue(request.tenantID, request.maxQueriers)
_, err := qb.tenantQuerierAssignments.getOrAddTenant(request.tenantID, request.maxQueriers)
if err != nil {
return err
}

if queue.Len()+1 > qb.maxTenantQueueSize {
return ErrTooManyRequests
}

queue.PushBack(request)
return nil
queuePath := QueuePath{qb.tenantQueuesTree.name, string(request.tenantID)}
return qb.tenantQueuesTree.EnqueueBackByPath(queuePath, request)
}

// enqueueRequestFront should only be used for re-enqueueing previously dequeued requests
Expand All @@ -151,34 +142,13 @@ func (qb *queueBroker) enqueueRequestBack(request *tenantRequest) error {
// max tenant queue size checks are skipped even though queue size violations
// are not expected to occur when re-enqueuing a previously dequeued request.
func (qb *queueBroker) enqueueRequestFront(request *tenantRequest) error {
queue, err := qb.getOrAddTenantQueue(request.tenantID, request.maxQueriers)
_, err := qb.tenantQuerierAssignments.getOrAddTenant(request.tenantID, request.maxQueriers)
if err != nil {
return err
}

queue.PushFront(request)
return nil
}

// getOrAddTenantQueue returns existing or new queue for tenant.
// maxQueriers is used to compute which queriers should handle requests for this tenant.
// If maxQueriers is <= 0, all queriers can handle this tenant's requests.
// If maxQueriers has changed since the last call, queriers for this are recomputed.
func (qb *queueBroker) getOrAddTenantQueue(tenantID TenantID, maxQueriers int) (*list.List, error) {
_, err := qb.tenantQuerierAssignments.getOrAddTenant(tenantID, maxQueriers)
if err != nil {
return nil, err
}
queue := qb.tenantQueues[tenantID]

if queue == nil {
queue = &tenantQueue{
requests: list.New(),
}
qb.tenantQueues[tenantID] = queue
}

return queue.requests, nil
queuePath := QueuePath{qb.tenantQueuesTree.name, string(request.tenantID)}
return qb.tenantQueuesTree.EnqueueFrontByPath(queuePath, request)
}

func (qb *queueBroker) dequeueRequestForQuerier(lastTenantIndex int, querierID QuerierID) (*tenantRequest, TenantID, int, error) {
Expand All @@ -187,24 +157,22 @@ func (qb *queueBroker) dequeueRequestForQuerier(lastTenantIndex int, querierID Q
return nil, tenantID, tenantIndex, err
}

tenantQueue := qb.tenantQueues[tenantID]
if tenantQueue == nil {
return nil, tenantID, tenantIndex, nil
}

// queue will be nonempty as empty queues are deleted
queueElement := tenantQueue.requests.Front()
tenantQueue.requests.Remove(queueElement)
queuePath := QueuePath{qb.tenantQueuesTree.name, string(tenantID)}
_, queueElement := qb.tenantQueuesTree.DequeueByPath(queuePath)

if tenantQueue.requests.Len() == 0 {
qb.deleteQueue(tenantID)
queueNodeAfterDequeue := qb.tenantQueuesTree.getNode(queuePath)
if queueNodeAfterDequeue == nil {
// queue node was deleted due to being empty after dequeue
qb.tenantQuerierAssignments.removeTenant(tenantID)
}

// re-casting to same type it was enqueued as; panic would indicate a bug
request := queueElement.Value.(*tenantRequest)
var request *tenantRequest
if queueElement != nil {
// re-casting to same type it was enqueued as; panic would indicate a bug
request = queueElement.(*tenantRequest)
}

return request, tenantID, tenantIndex, nil

}

func (qb *queueBroker) addQuerierConnection(querierID QuerierID) {
Expand All @@ -220,16 +188,8 @@ func (qb *queueBroker) notifyQuerierShutdown(querierID QuerierID) {
}

func (qb *queueBroker) forgetDisconnectedQueriers(now time.Time) int {
return qb.tenantQuerierAssignments.forgetDisconnectedQueriers(now)
}

func (qb *queueBroker) deleteQueue(tenantID TenantID) {
tenantQueue := qb.tenantQueues[tenantID]
if tenantQueue == nil {
return
}
delete(qb.tenantQueues, tenantID)
qb.tenantQuerierAssignments.removeTenant(tenantID)
numDisconnected := qb.tenantQuerierAssignments.forgetDisconnectedQueriers(now)
return numDisconnected
}

func (tqa *tenantQuerierAssignments) getNextTenantIDForQuerier(lastTenantIndex int, querierID QuerierID) (TenantID, int, error) {
Expand Down Expand Up @@ -264,6 +224,14 @@ func (tqa *tenantQuerierAssignments) getNextTenantIDForQuerier(lastTenantIndex i
return emptyTenantID, lastTenantIndex, nil
}

// getTenant looks up the queueTenant registered under tenantID.
//
// It returns ErrInvalidTenantID when tenantID is the empty tenant ID.
// Otherwise it returns whatever tenantsByID holds for that ID with a nil
// error; unlike getOrAddTenant, it never creates an entry, so the returned
// tenant is presumably nil for an unknown tenant (assuming tenantsByID is a
// map of pointers; confirm against the type declaration).
func (tqa *tenantQuerierAssignments) getTenant(tenantID TenantID) (*queueTenant, error) {
	if tenantID == emptyTenantID {
		return nil, ErrInvalidTenantID
	}
	return tqa.tenantsByID[tenantID], nil
}

func (tqa *tenantQuerierAssignments) getOrAddTenant(tenantID TenantID, maxQueriers int) (*queueTenant, error) {
if tenantID == emptyTenantID {
// empty tenantID is not allowed; "" is used for free spot
Expand Down
62 changes: 45 additions & 17 deletions pkg/scheduler/queue/tenant_queues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestQueues(t *testing.T) {
lastTenantIndex = confirmOrderForQuerier(t, qb, "querier-1", lastTenantIndex, qTwo, qThree, qOne)

// Remove one: ["" two three]
qb.deleteQueue("one")
qb.removeTenantQueue("one")
assert.NoError(t, isConsistent(qb))

lastTenantIndex = confirmOrderForQuerier(t, qb, "querier-1", lastTenantIndex, qTwo, qThree, qTwo)
Expand All @@ -60,17 +60,17 @@ func TestQueues(t *testing.T) {
lastTenantIndex = confirmOrderForQuerier(t, qb, "querier-1", lastTenantIndex, qThree, qFour, qTwo, qThree)

// Remove two: [four "" three]
qb.deleteQueue("two")
qb.removeTenantQueue("two")
assert.NoError(t, isConsistent(qb))

lastTenantIndex = confirmOrderForQuerier(t, qb, "querier-1", lastTenantIndex, qFour, qThree, qFour)

// Remove three: [four]
qb.deleteQueue("three")
qb.removeTenantQueue("three")
assert.NoError(t, isConsistent(qb))

// Remove four: []
qb.deleteQueue("four")
qb.removeTenantQueue("four")
assert.NoError(t, isConsistent(qb))

req, _, _, err = qb.dequeueRequestForQuerier(lastTenantIndex, "querier-1")
Expand All @@ -95,7 +95,7 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) {
// After notify shutdown for querier-2, it's expected to own no queue.
qb.notifyQuerierShutdown("querier-2")
tenantID, _, err := qb.tenantQuerierAssignments.getNextTenantIDForQuerier(-1, "querier-2")
tenantQueue := qb.tenantQueues[tenantID]
tenantQueue := qb.getQueue(tenantID)
assert.Nil(t, tenantQueue)
assert.Equal(t, emptyTenantID, tenantID)
assert.Equal(t, ErrQuerierShuttingDown, err)
Expand All @@ -106,7 +106,7 @@ func TestQueuesOnTerminatingQuerier(t *testing.T) {
// After disconnecting querier-2, it's expected to own no queue.
qb.tenantQuerierAssignments.removeQuerier("querier-2")
tenantID, _, err = qb.tenantQuerierAssignments.getNextTenantIDForQuerier(-1, "querier-2")
tenantQueue = qb.tenantQueues[tenantID]
tenantQueue = qb.getQueue(tenantID)
assert.Nil(t, tenantQueue)
assert.Equal(t, emptyTenantID, tenantID)
assert.Equal(t, ErrQuerierShuttingDown, err)
Expand Down Expand Up @@ -206,14 +206,15 @@ func TestQueuesConsistency(t *testing.T) {
switch r.Int() % 6 {
case 0:
queue, err := qb.getOrAddTenantQueue(generateTenant(r), 3)
//queueTenant, err := qb.tenantQuerierAssignments.getOrAddTenant(generateTenant(r), 3)
assert.Nil(t, err)
assert.NotNil(t, queue)
case 1:
querierID := generateQuerier(r)
_, tenantIndex, _ := qb.tenantQuerierAssignments.getNextTenantIDForQuerier(lastTenantIndexes[querierID], querierID)
lastTenantIndexes[querierID] = tenantIndex
case 2:
qb.deleteQueue(generateTenant(r))
qb.removeTenantQueue(generateTenant(r))
case 3:
q := generateQuerier(r)
qb.addQuerierConnection(q)
Expand Down Expand Up @@ -416,15 +417,42 @@ func getOrAdd(t *testing.T, qb *queueBroker, tenantID TenantID, maxQueriers int)
reAddedQueue, err := qb.getOrAddTenantQueue(tenantID, maxQueriers)
assert.Nil(t, err)
assert.Equal(t, addedQueue, reAddedQueue)
return addedQueue
return addedQueue.localQueue
}

// getOrAddTenantQueue is a test helper that registers the tenant with the
// tenant-querier assignments and then fetches or creates the tenant's node
// directly below the root of the tree queue.
//
// An error from getOrAddTenant is returned as-is with a nil node; otherwise
// the result of getOrAddNode for the path {root name, tenantID} is returned.
func (qb *queueBroker) getOrAddTenantQueue(tenantID TenantID, maxQueriers int) (*TreeQueue, error) {
	if _, err := qb.tenantQuerierAssignments.getOrAddTenant(tenantID, maxQueriers); err != nil {
		return nil, err
	}

	tree := qb.tenantQueuesTree
	return tree.getOrAddNode(QueuePath{tree.name, string(tenantID)})
}

// getQueue is a test helper that returns the tree queue node for tenantID.
//
// It returns nil when getTenant reports an error or returns no tenant.
// Otherwise it returns the result of getNode for the path
// {root name, tenantID}.
func (qb *queueBroker) getQueue(tenantID TenantID) *TreeQueue {
	if tenant, err := qb.tenantQuerierAssignments.getTenant(tenantID); tenant == nil || err != nil {
		return nil
	}

	tree := qb.tenantQueuesTree
	return tree.getNode(QueuePath{tree.name, string(tenantID)})
}

// removeTenantQueue is a test helper that drops the tenant from the
// tenant-querier assignments and then deletes the tenant's node at
// {root name, tenantID} from the tree queue.
//
// The returned bool is the result of deleteNode (presumably whether a node
// was actually removed; confirm against TreeQueue.deleteNode).
func (qb *queueBroker) removeTenantQueue(tenantID TenantID) bool {
	// Remove the tenant assignment first, matching the order of the production dequeue path's bookkeeping.
	qb.tenantQuerierAssignments.removeTenant(tenantID)

	tree := qb.tenantQueuesTree
	return tree.deleteNode(QueuePath{tree.name, string(tenantID)})
}

func confirmOrderForQuerier(t *testing.T, qb *queueBroker, querier QuerierID, lastTenantIndex int, queues ...*list.List) int {
for _, queue := range queues {
var err error
tenantID, _, err := qb.tenantQuerierAssignments.getNextTenantIDForQuerier(lastTenantIndex, querier)
tenantQueue := qb.tenantQueues[tenantID]
assert.Equal(t, queue, tenantQueue.requests)
tenantQueue := qb.getQueue(tenantID)
assert.Equal(t, queue, tenantQueue.localQueue)
assert.NoError(t, isConsistent(qb))
assert.NoError(t, err)
}
Expand All @@ -438,11 +466,11 @@ func isConsistent(qb *queueBroker) error {

tenantCount := 0
for ix, tenantID := range qb.tenantQuerierAssignments.tenantIDOrder {
tq := qb.tenantQueues[tenantID]
if tenantID != "" && tq == nil {
//tq := qb.tenantQueues[tenantID]
if tenantID != "" && qb.getQueue(tenantID) == nil {
return fmt.Errorf("tenant %s doesn't have queue", tenantID)
}
if tenantID == "" && tq != nil {
if tenantID == "" && qb.getQueue(tenantID) != nil {
return fmt.Errorf("tenant %s shouldn't have queue", tenantID)
}
if tenantID == "" {
Expand Down Expand Up @@ -470,17 +498,17 @@ func isConsistent(qb *queueBroker) error {
}
}

if tenantCount != len(qb.tenantQueues) {
return fmt.Errorf("inconsistent number of tenants list and tenant queues")
}
//if tenantCount != len(qb.tenantQueues) {
// return fmt.Errorf("inconsistent number of tenants list and tenant queues")
//}
Comment on lines +501 to +503
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: implement a recursive node count on the tree queue so that this tenant-count consistency check can be re-enabled.


return nil
}

// getTenantsByQuerier returns the list of tenants handled by the provided QuerierID.
func getTenantsByQuerier(broker *queueBroker, querierID QuerierID) []TenantID {
var tenantIDs []TenantID
for tenantID := range broker.tenantQueues {
for _, tenantID := range broker.tenantQuerierAssignments.tenantIDOrder {
querierSet := broker.tenantQuerierAssignments.tenantQuerierIDs[tenantID]
if querierSet == nil {
// If it's nil then all queriers can handle this tenant.
Expand Down
Loading
Loading