Skip to content

Commit

Permalink
Add test to illustrate updating tenantQuerierAssignments outside of tree
Browse files Browse the repository at this point in the history
  • Loading branch information
chencs committed Apr 10, 2024
1 parent 9ca053e commit 0d6b1ac
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 17 deletions.
35 changes: 20 additions & 15 deletions pkg/scheduler/queue/tree_queue_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ func (t *Tree) newNode(name string, depth int) TreeNodeIFace {
}
case shuffleShard:
return &ShuffleShardNode{
name: name,
localQueue: list.New(),
queueOrder: make([]string, 0),
queueMap: make(map[string]TreeNodeIFace, 1),
depth: depth,
stateUpdateInfo: t.ShuffleShardState,
name: name,
localQueue: list.New(),
queueOrder: make([]string, 0),
queueMap: make(map[string]TreeNodeIFace, 1),
depth: depth,
ShuffleShardState: t.ShuffleShardState,
}
default:
panic("no defined node type at provided depth")
Expand Down Expand Up @@ -218,13 +218,13 @@ func (rrn *RoundRobinNode) IsEmpty() bool {
}

type ShuffleShardNode struct {
name string
localQueue *list.List // should never be populated
queuePosition int
queueOrder []string // will be a slice of tenants (+ self?)
queueMap map[string]TreeNodeIFace
depth int
stateUpdateInfo *ShuffleShardState
name string
localQueue *list.List // should never be populated
queuePosition int
queueOrder []string // will be a slice of tenants (+ self?)
queueMap map[string]TreeNodeIFace
depth int
*ShuffleShardState
}

func (ssn *ShuffleShardNode) Name() string {
Expand All @@ -242,6 +242,11 @@ func (ssn *ShuffleShardNode) dequeue() (path QueuePath, v any) {
return path, nil
}

// can't get a tenant if no querier set
if ssn.currentQuerier == nil {
return path, nil
}

// no children, but has a non-zero length local queue -- should probably never happen?
if len(ssn.queueOrder) == 0 {
if elt := ssn.localQueue.Front(); elt != nil {
Expand Down Expand Up @@ -274,8 +279,8 @@ func (ssn *ShuffleShardNode) dequeueGetNode() (string, TreeNodeIFace) {
checkIndex = 0
}
tenantName := ssn.queueOrder[checkIndex]
if tenantQuerierSet, ok := ssn.stateUpdateInfo.tenantQuerierMap[TenantID(tenantName)]; ok {
if _, ok := tenantQuerierSet[*ssn.stateUpdateInfo.currentQuerier]; ok {
if tenantQuerierSet, ok := ssn.tenantQuerierMap[TenantID(tenantName)]; ok {
if _, ok := tenantQuerierSet[*ssn.currentQuerier]; ok {
return tenantName, ssn.queueMap[tenantName]
}
}
Expand Down
43 changes: 41 additions & 2 deletions pkg/scheduler/queue/tree_queue_sketch_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package queue

import (
"fmt"
"github.com/stretchr/testify/require"
"testing"
)
Expand Down Expand Up @@ -207,7 +206,6 @@ func Test_RoundRobinDequeue(t *testing.T) {
var finalValue any
for i := 0; i < tt.numDequeuesToExpected; i++ {
_, finalValue = tree.Dequeue()
fmt.Println("v: ", finalValue)
}
v, ok := finalValue.(string)
require.True(t, ok)
Expand Down Expand Up @@ -324,3 +322,44 @@ func Test_ShuffleShardDequeue(t *testing.T) {
}

}

// This test is a little messy; I can clean it up, but it's meant to illustrate that we can update a state
// in tenantQuerierAssignments, and the tree dequeue behavior will adjust accordingly.
func Test_ChangeShuffleShardState(t *testing.T) {
tqa := tenantQuerierAssignments{
tenantQuerierIDs: map[TenantID]map[QuerierID]struct{}{"tenant-1": {"querier-1": {}}, "tenant-2": {"querier-2": {}}},
}

state := &ShuffleShardState{
tenantQuerierMap: tqa.tenantQuerierIDs,
currentQuerier: nil,
}

tree, err := NewTree([]NodeType{shuffleShard, roundRobin, roundRobin}, state)
require.NoError(t, err)
err = tree.EnqueueBackByPath(QueuePath{"tenant-1", "query-component-1"}, "query-1")
err = tree.EnqueueBackByPath(QueuePath{"tenant-2", "query-component-1"}, "query-2")
err = tree.EnqueueBackByPath(QueuePath{"tenant-2", "query-component-1"}, "query-3")
require.NoError(t, err)

querier1 := QuerierID("querier-1")
querier2 := QuerierID("querier-2")
querier3 := QuerierID("querier-3")

// set state to querier-2 should dequeue query-2
state.currentQuerier = &querier2
_, v := tree.Dequeue()
require.Equal(t, "query-2", v)

// set state to querier-1 should dequeue query-1
state.currentQuerier = &querier1
_, v = tree.Dequeue()
require.Equal(t, "query-1", v)

// update tqa map to assign querier-3 to tenant-2, then set state to querier-3 should dequeue query-3
tqa.tenantQuerierIDs["tenant-2"]["querier-3"] = struct{}{}
state.currentQuerier = &querier3
_, v = tree.Dequeue()
require.Equal(t, "query-3", v)

}

0 comments on commit 0d6b1ac

Please sign in to comment.