Skip to content

Commit

Permalink
add-e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
Gekko0114 committed Nov 7, 2023
1 parent 3efa4ed commit 323abb6
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 39 deletions.
11 changes: 10 additions & 1 deletion guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,18 @@ type FilterPlugin interface {
type PostFilterPlugin interface {
Plugin

PostFilter(state CycleState, pod proto.Pod, filteredNodeStatusMap NodeToStatusMap) (nominatedNodeName string, nominatingMode int32, status *Status)
PostFilter(state CycleState, pod proto.Pod, filteredNodeStatusMap NodeToStatusMap) (nominatedNodeName string, nominatingMode NominatingMode, status *Status)
}

// NominatingMode is the Mode which is returned from PostFilter.
type NominatingMode int32

// These are predefined modes
const (
ModeNoop NominatingMode = iota
ModeOverride
)

// EnqueueExtensions is a WebAssembly implementation of framework.EnqueueExtensions.
type EnqueueExtensions interface {
EventsToRegister() []ClusterEvent
Expand Down
14 changes: 4 additions & 10 deletions guest/postfilter/postfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ package postfilter

import (
"runtime"
"unsafe"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/cyclestate"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"

"sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto/nodetostatus"
)

Expand Down Expand Up @@ -79,14 +78,9 @@ func _postfilter() uint64 { //nolint
// The parameters passed are lazy with regard to host functions. This means
// a no-op plugin should not have any unmarshal penalty.
nominatedNodeName, nominatingMode, status := postfilter.PostFilter(cyclestate.Values, cyclestate.Pod, &nodeToStatusMap{})

cString := []byte(nominatedNodeName)
if cString != nil {
ptr := uint32(uintptr(unsafe.Pointer(&cString[0])))
size := uint32(len(cString))
setNominatedNodeNameResult(ptr, size)
runtime.KeepAlive(cString) // until ptr is no longer needed.
}
ptr, size := mem.StringToPtr(nominatedNodeName)
setNominatedNodeNameResult(ptr, size)
runtime.KeepAlive(nominatedNodeName) // until ptr is no longer needed.

return (uint64(nominatingMode) << uint64(32)) | uint64(imports.StatusToCode(status))
}
Expand Down
21 changes: 20 additions & 1 deletion guest/testdata/cyclestate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prefilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
Expand Down Expand Up @@ -56,6 +57,7 @@ func main() {
enqueue.SetPlugin(plugin)
prefilter.SetPlugin(plugin)
filter.SetPlugin(plugin)
postfilter.SetPlugin(plugin)
prescore.SetPlugin(plugin)
score.SetPlugin(plugin)
}
Expand Down Expand Up @@ -109,10 +111,25 @@ func (statePlugin) Filter(state api.CycleState, pod proto.Pod, _ api.NodeInfo) (
return
}

func (statePlugin) PreScore(state api.CycleState, pod proto.Pod, _ proto.NodeList) *api.Status {
func (statePlugin) PostFilter(state api.CycleState, pod proto.Pod, _ api.NodeToStatusMap) (nominatedNodeName string, nominatingMode api.NominatingMode, status *api.Status) {
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from filter")
}
mustNotScoreState(state)
if val, ok := state.Read(preFilterStateKey); !ok {
panic("didn't propagate state from pre-filter")
} else if _, ok = val.(preFilterStateVal)["filter"]; !ok {
panic("filter value lost propagating from post-filter")
} else {
val.(preFilterStateVal)["postfilter"] = struct{}{}
}
return
}

func (statePlugin) PreScore(state api.CycleState, pod proto.Pod, _ proto.NodeList) *api.Status {
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from post-filter")
}
mustFilterState(state)
if _, ok := state.Read(preScoreStateKey); ok {
panic("didn't reset score state on pre-score")
Expand Down Expand Up @@ -153,5 +170,7 @@ func mustFilterState(state api.CycleState) {
panic("didn't propagate state from pre-filter")
} else if _, ok = val.(preFilterStateVal)["filter"]; !ok {
panic("filter value lost propagating from pre-score")
} else if _, ok = val.(preFilterStateVal)["postfilter"]; !ok {
panic("postfilter value lost propagating from pre-score")
}
}
Binary file modified guest/testdata/cyclestate/main.wasm
Binary file not shown.
37 changes: 37 additions & 0 deletions guest/testdata/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prefilter"
)

type extensionPoints interface {
api.PreFilterPlugin
api.FilterPlugin
api.PostFilterPlugin
}

func main() {
Expand All @@ -40,10 +42,13 @@ func main() {
plugin = filterPlugin{}
case "preFilter":
plugin = preFilterPlugin{}
case "postFilter":
plugin = postFilterPlugin{}
}
}
prefilter.SetPlugin(plugin)
filter.SetPlugin(plugin)
postfilter.SetPlugin(plugin)
}

// noopPlugin doesn't do anything, except evaluate each parameter.
Expand All @@ -62,6 +67,13 @@ func (noopPlugin) Filter(state api.CycleState, pod proto.Pod, nodeInfo api.NodeI
return
}

func (noopPlugin) PostFilter(state api.CycleState, pod proto.Pod, nodeMap api.NodeToStatusMap) (nominatedNodeName string, nominatingMode api.NominatingMode, status *api.Status) {
_, _ = state.Read("ok")
_ = pod.Spec()
_ = nodeMap.NodeToStatusMap()
return
}

// preFilterPlugin schedules a node if its name equals its pod spec.
type preFilterPlugin struct{ noopPlugin }

Expand Down Expand Up @@ -96,3 +108,28 @@ func (filterPlugin) Filter(_ api.CycleState, pod proto.Pod, nodeInfo api.NodeInf
Reason: podSpecNodeName + " != " + nodeName,
}
}

type postFilterPlugin struct{ noopPlugin }

func (postFilterPlugin) PostFilter(_ api.CycleState, pod proto.Pod, nodeMap api.NodeToStatusMap) (string, api.NominatingMode, *api.Status) {
// First, check if the pod spec node name is empty. If so, pass!
podSpecNodeName := pod.Spec().GetNodeName()
if len(podSpecNodeName) == 0 {
return "", 0, nil
}
m := nodeMap.NodeToStatusMap()
if m == nil {
return "", 0, nil
}
// If nominatedNodeName is schedulable, pass!
if val, ok := m[podSpecNodeName]; ok {
if val.Code == api.StatusCodeSuccess {
return podSpecNodeName, api.ModeOverride, nil
}
}
// Otherwise, this is unschedulableAndUnresolvable, so note the reason.
return podSpecNodeName, api.ModeNoop, &api.Status{
Code: api.StatusCodeUnschedulableAndUnresolvable,
Reason: podSpecNodeName + " is unschedulable",
}
}
Binary file modified guest/testdata/filter/main.wasm
Binary file not shown.
3 changes: 2 additions & 1 deletion kubernetes/proto/nodetostatus/api_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion scheduler/plugin/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ func (g *guest) filter(ctx context.Context) *framework.Status {
func (g *guest) postFilter(ctx context.Context) (*framework.PostFilterResult, *framework.Status) {
defer g.out.Reset()
callStack := g.callStack

if err := g.postfilterFn.CallWithStack(ctx, callStack); err != nil {
return nil, framework.AsStatus(decorateError(g.out, guestExportPostFilter, err))
}
Expand Down
5 changes: 2 additions & 3 deletions scheduler/plugin/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const (
k8sSchedulerGetConfig = "get_config"
k8sSchedulerResultClusterEvents = "result.cluster_events"
k8sSchedulerResultNodeNames = "result.node_names"
k8sSchedulerResultNominatedNodeName = "result.nominating_node_name"
k8sSchedulerResultNominatedNodeName = "result.nominated_node_name"
k8sSchedulerResultStatusReason = "result.status_reason"
)

Expand Down Expand Up @@ -196,7 +196,6 @@ func k8sApiPodFn(ctx context.Context, mod wazeroapi.Module, stack []uint64) {
stack[0] = uint64(marshalIfUnderLimit(mod.Memory(), pod, buf, bufLimit))
}

// TODO: fix
func k8sApiNodeToStatusMapFn(ctx context.Context, mod wazeroapi.Module, stack []uint64) {
buf := uint32(stack[0])
bufLimit := bufLimit(stack[1])
Expand Down Expand Up @@ -364,6 +363,7 @@ func k8sSchedulerResultStatusReasonFn(ctx context.Context, mod wazeroapi.Module,
func ConvertNodeToStatusMapToProtoMap(statusMap map[string]*framework.Status) *proto.NodeToStatusMap {
var protoMap proto.NodeToStatusMap
separator := ","
protoMap.NodeStatus = make(map[string]*proto.Status)

for key, status := range statusMap {
code := proto.StatusCode(int32(status.Code()))
Expand All @@ -374,6 +374,5 @@ func ConvertNodeToStatusMapToProtoMap(statusMap map[string]*framework.Status) *p
}
protoMap.NodeStatus[key] = protobufStatus
}

return &protoMap
}
1 change: 0 additions & 1 deletion scheduler/plugin/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package wasm

import (
wazeroapi "github.com/tetratelabs/wazero/api"

proto "sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto/nodetostatus"
)

Expand Down
2 changes: 1 addition & 1 deletion scheduler/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (pl *wasmPlugin) PostFilter(ctx context.Context, state *framework.CycleStat

// Add the stack to the go context so that the corresponding host function
// can look them up.
params := &stack{pod: pod}
params := &stack{pod: pod, nodeToStatusMap: filteredNodeStatusMap}
ctx = context.WithValue(ctx, stackKey{}, params)
if err := pl.pool.doWithSchedulingGuest(ctx, pod.UID, func(g *guest) {
result, status = g.postFilter(ctx)
Expand Down
36 changes: 17 additions & 19 deletions scheduler/plugin/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,30 +568,32 @@ func TestPostFilter(t *testing.T) {
args []string
globals map[string]int32
pod *v1.Pod
nodeName string
nodeToStatusMap map[string]*framework.Status
expectedResult *framework.PostFilterResult
expectedStatusCode framework.Code
expectedStatusMessage string
}{
{
name: "success: nodeName equals spec.NodeName",
args: []string{"test", "score"},
name: "success",
args: []string{"test", "postFilter"},
pod: test.PodSmall,
nodeName: test.PodSmall.Spec.NodeName,
nodeToStatusMap: map[string]*framework.Status{test.NodeSmallName: framework.NewStatus(framework.Success, "")},
expectedResult: &framework.PostFilterResult{NominatingInfo: &framework.NominatingInfo{NominatedNodeName: "good-node", NominatingMode: framework.ModeOverride}},
expectedStatusCode: framework.Success,
},
{
name: "skipped: bad-node",
args: []string{"test", "score"},
pod: test.PodSmall,
nodeName: "bad-node",
expectedStatusCode: framework.Success,
name: "unschedulable",
args: []string{"test", "postFilter"},
pod: test.PodSmall,
nodeToStatusMap: map[string]*framework.Status{test.NodeSmallName: framework.NewStatus(framework.Unschedulable, "")},
expectedResult: &framework.PostFilterResult{NominatingInfo: &framework.NominatingInfo{NominatedNodeName: "good-node", NominatingMode: framework.ModeNoop}},
expectedStatusMessage: "good-node is unschedulable",
expectedStatusCode: framework.UnschedulableAndUnresolvable,
},
{
name: "min statusCode",
guestURL: test.URLTestPostFilterFromGlobal,
pod: test.PodSmall,
nodeName: test.NodeSmall.Name,
globals: map[string]int32{"status_code": math.MinInt32},
expectedResult: &framework.PostFilterResult{NominatingInfo: &framework.NominatingInfo{NominatedNodeName: "", NominatingMode: 0}},
expectedStatusCode: math.MinInt32,
Expand All @@ -600,7 +602,6 @@ func TestPostFilter(t *testing.T) {
name: "max statusCode",
guestURL: test.URLTestPostFilterFromGlobal,
pod: test.PodSmall,
nodeName: test.NodeSmall.Name,
globals: map[string]int32{"status_code": math.MaxInt32},
expectedResult: &framework.PostFilterResult{NominatingInfo: &framework.NominatingInfo{NominatedNodeName: "", NominatingMode: 0}},
expectedStatusCode: math.MaxInt32,
Expand All @@ -609,7 +610,6 @@ func TestPostFilter(t *testing.T) {
name: "min nominatingMode",
guestURL: test.URLTestPostFilterFromGlobal,
pod: test.PodSmall,
nodeName: test.NodeSmall.Name,
globals: map[string]int32{"nominating_mode": math.MinInt32},
expectedResult: &framework.PostFilterResult{NominatingInfo: &framework.NominatingInfo{NominatedNodeName: "", NominatingMode: math.MinInt32}},
expectedStatusCode: framework.Success,
Expand All @@ -618,7 +618,6 @@ func TestPostFilter(t *testing.T) {
name: "max nominatingMode",
guestURL: test.URLTestPostFilterFromGlobal,
pod: test.PodSmall,
nodeName: test.NodeSmall.Name,
globals: map[string]int32{"nominating_mode": math.MaxInt32},
expectedResult: &framework.PostFilterResult{NominatingInfo: &framework.NominatingInfo{NominatedNodeName: "", NominatingMode: math.MaxInt32}},
expectedStatusCode: framework.Success,
Expand All @@ -627,7 +626,6 @@ func TestPostFilter(t *testing.T) {
name: "panic",
guestURL: test.URLErrorPanicOnPostFilter,
pod: test.PodSmall,
nodeName: test.NodeSmall.Name,
expectedStatusCode: framework.Error,
expectedStatusMessage: `wasm: postfilter error: panic!
wasm error: unreachable
Expand All @@ -654,16 +652,16 @@ wasm stack trace:
pl.SetGlobals(tc.globals)
}

result, status := p.(framework.PostFilterPlugin).PostFilter(ctx, nil, tc.pod, nil)
result, status := p.(framework.PostFilterPlugin).PostFilter(ctx, nil, tc.pod, tc.nodeToStatusMap)
if want, have := tc.expectedResult, result; !reflect.DeepEqual(want, have) {
t.Fatalf("unexpected result: want %v, have %v", want, have)
}
if want, have := tc.expectedStatusCode, status.Code(); want != have {
t.Fatalf("unexpected status code: want %v, have %v", want, have)
t.Fatalf("unexpected result: want %#v, have %#v", want.NominatingInfo, have.NominatingInfo)
}
if want, have := tc.expectedStatusMessage, status.Message(); want != have {
t.Fatalf("unexpected status message: want %v, have %v", want, have)
}
if want, have := tc.expectedStatusCode, status.Code(); want != have {
t.Fatalf("unexpected status code: want %v, have %v", want, have)
}
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion scheduler/test/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ var NodeReal = func() *v1.Node {
return &node
}()

var NodeSmallName = "good-node"

// NodeSmall is the smallest node that works with URLExampleFilterSimple.
var NodeSmall = &v1.Node{ObjectMeta: apimeta.ObjectMeta{Name: "good-node"}}
var NodeSmall = &v1.Node{ObjectMeta: apimeta.ObjectMeta{Name: NodeSmallName}}

//go:embed testdata/yaml/pod.yaml
var yamlPodReal string
Expand Down

0 comments on commit 323abb6

Please sign in to comment.