Skip to content

Commit

Permalink
support postfilter
Browse files Browse the repository at this point in the history
  • Loading branch information
Gekko0114 committed Nov 23, 2023
1 parent 6de3af8 commit 6830418
Show file tree
Hide file tree
Showing 30 changed files with 566 additions and 50 deletions.
20 changes: 20 additions & 0 deletions guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ type FilterPlugin interface {
Filter(state CycleState, pod proto.Pod, nodeInfo NodeInfo) *Status
}

// PostFilterPlugin is a WebAssembly implementation of framework.PostFilterPlugin.
type PostFilterPlugin interface {
Plugin

PostFilter(state CycleState, pod proto.Pod, filteredNodeStatusMap NodeToStatusMap) (nominatedNodeName string, nominatingMode NominatingMode, status *Status)
}

// NominatingMode is the Mode which is returned from PostFilter.
type NominatingMode int32

// These are predefined modes
const (
ModeNoop NominatingMode = iota
ModeOverride
)

// EnqueueExtensions is a WebAssembly implementation of framework.EnqueueExtensions.
type EnqueueExtensions interface {
EventsToRegister() []ClusterEvent
Expand Down Expand Up @@ -97,3 +113,7 @@ type NodeInfo interface {

Node() proto.Node
}

type NodeToStatusMap interface {
NodeToStatusMap() map[string]int
}
15 changes: 15 additions & 0 deletions guest/internal/imports/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package imports

import (
"encoding/json"
"runtime"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
Expand Down Expand Up @@ -66,3 +67,17 @@ func Pod(updater func([]byte) error) error {
return k8sApiPod(ptr, limit)
}, updater)
}

func NodeToStatusMap() map[string]int {
// Wrap to avoid TinyGo 0.28: cannot use an exported function as value
jsonStr := mem.GetString(func(ptr uint32, limit mem.BufLimit) (len uint32) {
return k8sSchedulerNodeToStatusMap(ptr, limit)
})
byte := []byte(jsonStr)
var nodeToMap map[string]int
err := json.Unmarshal(byte, &nodeToMap)
if err != nil {
panic(err)
}
return nodeToMap
}
3 changes: 3 additions & 0 deletions guest/internal/imports/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ func k8sApiNodeName(ptr uint32, limit mem.BufLimit) (len uint32)
//go:wasmimport k8s.io/api pod
func k8sApiPod(ptr uint32, limit mem.BufLimit) (len uint32)

//go:wasmimport k8s.io/scheduler nodeToStatusMap
func k8sSchedulerNodeToStatusMap(ptr uint32, limit mem.BufLimit) (len uint32)

//go:wasmimport k8s.io/scheduler result.status_reason
func k8sSchedulerResultStatusReason(ptr, size uint32)
3 changes: 3 additions & 0 deletions guest/internal/imports/imports_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ func k8sApiNodeName(uint32, mem.BufLimit) (len uint32) { return }
// k8sApiPod is stubbed for compilation outside TinyGo.
func k8sApiPod(uint32, mem.BufLimit) (len uint32) { return }

// k8sSchedulerNodeToStatusMap is stubbed for compilation outside TinyGo.
func k8sSchedulerNodeToStatusMap(uint32, mem.BufLimit) (len uint32) { return }

// k8sSchedulerResultStatusReason is stubbed for compilation outside TinyGo.
func k8sSchedulerResultStatusReason(uint32, uint32) {}
4 changes: 4 additions & 0 deletions guest/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/prefilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
)
Expand Down Expand Up @@ -51,6 +52,9 @@ func Set(plugin api.Plugin) {
if plugin, ok := plugin.(api.FilterPlugin); ok {
filter.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PostFilterPlugin); ok {
postfilter.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PreScorePlugin); ok {
prescore.SetPlugin(plugin)
}
Expand Down
22 changes: 22 additions & 0 deletions guest/postfilter/imports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build tinygo.wasm

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package postfilter

//go:wasmimport k8s.io/scheduler result.nominated_node_name
func setNominatedNodeNameResult(ptr, size uint32)
22 changes: 22 additions & 0 deletions guest/postfilter/imports_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build !tinygo.wasm

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package postfilter

// setNominatedNodeNameResult is stubbed for compilation outside TinyGo.
func setNominatedNodeNameResult(uint32, uint32) {}
100 changes: 100 additions & 0 deletions guest/postfilter/postfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package postfilter exports an api.PostFilterPlugin to the host.
package postfilter

import (
"runtime"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/cyclestate"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
)

// postfilter is the current plugin assigned with SetPlugin.
var postfilter api.PostFilterPlugin

// SetPlugin should be called in `main` to assign an api.PostFilterPlugin
// instance.
//
// For example:
//
// func main() {
// plugin := filterPlugin{}
// postfilter.SetPlugin(plugin)
// filter.SetPlugin(plugin)
// }
//
// type filterPlugin struct{}
//
// func (filterPlugin) PostFilter(state api.CycleState, pod proto.Pod, filteredNodeStatusMap internalpostfilter.NodeToStatusMap) (int32, status *api.Status) {
// // Write state you need on Filter
// }
//
// func (filterPlugin) Filter(state api.CycleState, pod api.Pod, nodeInfo api.NodeInfo) (status *api.Status) {
// var Filter int32
// // Derive Filter for the node name using state set on PreFilter!
// return Filter, nil
// }
func SetPlugin(postfilterPlugin api.PostFilterPlugin) {
if postfilterPlugin == nil {
panic("nil postfilterPlugin")
}
postfilter = postfilterPlugin
plugin.MustSet(postfilterPlugin)
}

// prevent unused lint errors (lint is run with normal go).
var _ func() uint64 = _postfilter

// _postfilter is only exported to the host.
//
//export postfilter
func _postfilter() uint64 { //nolint

if postfilter == nil { // Then, the user didn't define one.
// Unlike most plugins we always export postfilter so that we can reset
// the cycle state: return success to avoid no-op overhead.
return 0
}

// The parameters passed are lazy with regard to host functions. This means
// a no-op plugin should not have any unmarshal penalty.
nominatedNodeName, nominatingMode, status := postfilter.PostFilter(cyclestate.Values, cyclestate.Pod, &nodeToStatusMap{})
ptr, size := mem.StringToPtr(nominatedNodeName)
setNominatedNodeNameResult(ptr, size)
runtime.KeepAlive(nominatedNodeName) // until ptr is no longer needed.

return (uint64(nominatingMode) << uint64(32)) | uint64(imports.StatusToCode(status))
}

type nodeToStatusMap struct {
statusMap map[string]int
}

func (n *nodeToStatusMap) NodeToStatusMap() map[string]int {
return n.lazyNodeToStatusMap()
}

// lazyNodeToStatusMap returns NodeToStatusMap from imports.NodeToStatusMap.
func (n *nodeToStatusMap) lazyNodeToStatusMap() map[string]int {
nodeMap := imports.NodeToStatusMap()
n.statusMap = nodeMap
return n.statusMap
}
2 changes: 1 addition & 1 deletion guest/prefilter/prefilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
//
// type filterPlugin struct{}
//
// func (filterPlugin) PreFilter(state api.CycleState, pod proto.Pod, nodeList proto.NodeList) {
// func (filterPlugin) PreFilter(state api.CycleState, pod proto.Pod) (nodeNames []string, status *Status) {
// // Write state you need on Filter
// }
//
Expand Down
21 changes: 20 additions & 1 deletion guest/testdata/cyclestate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prefilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
Expand Down Expand Up @@ -56,6 +57,7 @@ func main() {
enqueue.SetPlugin(plugin)
prefilter.SetPlugin(plugin)
filter.SetPlugin(plugin)
postfilter.SetPlugin(plugin)
prescore.SetPlugin(plugin)
score.SetPlugin(plugin)
}
Expand Down Expand Up @@ -109,10 +111,25 @@ func (statePlugin) Filter(state api.CycleState, pod proto.Pod, _ api.NodeInfo) (
return
}

func (statePlugin) PreScore(state api.CycleState, pod proto.Pod, _ proto.NodeList) *api.Status {
func (statePlugin) PostFilter(state api.CycleState, pod proto.Pod, _ api.NodeToStatusMap) (nominatedNodeName string, nominatingMode api.NominatingMode, status *api.Status) {
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from filter")
}
mustNotScoreState(state)
if val, ok := state.Read(preFilterStateKey); !ok {
panic("didn't propagate post-filter state from pre-filter")
} else if _, ok = val.(preFilterStateVal)["filter"]; !ok {
panic("filter value lost propagating from post-filter")
} else {
val.(preFilterStateVal)["postfilter"] = struct{}{}
}
return
}

func (statePlugin) PreScore(state api.CycleState, pod proto.Pod, _ proto.NodeList) *api.Status {
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from pre-filter")
}
mustFilterState(state)
if _, ok := state.Read(preScoreStateKey); ok {
panic("didn't reset score state on pre-score")
Expand Down Expand Up @@ -153,5 +170,7 @@ func mustFilterState(state api.CycleState) {
panic("didn't propagate state from pre-filter")
} else if _, ok = val.(preFilterStateVal)["filter"]; !ok {
panic("filter value lost propagating from pre-score")
} else if _, ok = val.(preFilterStateVal)["postfilter"]; !ok {
panic("postfilter value lost propagating from pre-score")
}
}
Binary file modified guest/testdata/cyclestate/main.wasm
Binary file not shown.
37 changes: 37 additions & 0 deletions guest/testdata/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prefilter"
)

type extensionPoints interface {
api.PreFilterPlugin
api.FilterPlugin
api.PostFilterPlugin
}

func main() {
Expand All @@ -40,10 +42,13 @@ func main() {
plugin = filterPlugin{}
case "preFilter":
plugin = preFilterPlugin{}
case "postFilter":
plugin = postFilterPlugin{}
}
}
prefilter.SetPlugin(plugin)
filter.SetPlugin(plugin)
postfilter.SetPlugin(plugin)
}

// noopPlugin doesn't do anything, except evaluate each parameter.
Expand All @@ -62,6 +67,13 @@ func (noopPlugin) Filter(state api.CycleState, pod proto.Pod, nodeInfo api.NodeI
return
}

func (noopPlugin) PostFilter(state api.CycleState, pod proto.Pod, nodeMap api.NodeToStatusMap) (nominatedNodeName string, nominatingMode api.NominatingMode, status *api.Status) {
_, _ = state.Read("ok")
_ = pod.Spec()
_ = nodeMap.NodeToStatusMap()
return
}

// preFilterPlugin schedules a node if its name equals its pod spec.
type preFilterPlugin struct{ noopPlugin }

Expand Down Expand Up @@ -96,3 +108,28 @@ func (filterPlugin) Filter(_ api.CycleState, pod proto.Pod, nodeInfo api.NodeInf
Reason: podSpecNodeName + " != " + nodeName,
}
}

type postFilterPlugin struct{ noopPlugin }

func (postFilterPlugin) PostFilter(_ api.CycleState, pod proto.Pod, nodeMap api.NodeToStatusMap) (string, api.NominatingMode, *api.Status) {
// First, check if the pod spec node name is empty. If so, pass!
podSpecNodeName := pod.Spec().GetNodeName()
if len(podSpecNodeName) == 0 {
return "", 0, nil
}
m := nodeMap.NodeToStatusMap()
if m == nil {
return "", 0, nil
}
// If nominatedNodeName is schedulable, pass!
if val, ok := m[podSpecNodeName]; ok {
if val == 0 {
return podSpecNodeName, api.ModeOverride, nil
}
}
// Otherwise, this is unschedulableAndUnresolvable, so note the reason.
return podSpecNodeName, api.ModeNoop, &api.Status{
Code: api.StatusCodeUnschedulableAndUnresolvable,
Reason: podSpecNodeName + " is unschedulable",
}
}
Binary file modified guest/testdata/filter/main.wasm
Binary file not shown.
5 changes: 5 additions & 0 deletions internal/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func RunAll(ctx context.Context, t Testing, plugin framework.Plugin, pod *v1.Pod
RequireSuccess(t, s)
}

if postfilterP, ok := plugin.(framework.PostFilterPlugin); ok {
_, s = postfilterP.PostFilter(ctx, nil, pod, nil)
RequireSuccess(t, s)
}

if prescoreP, ok := plugin.(framework.PreScorePlugin); ok {
s = prescoreP.PreScore(ctx, nil, pod, []*v1.Node{ni.Node()})
RequireSuccess(t, s)
Expand Down
2 changes: 1 addition & 1 deletion internal/e2e/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/gcfg.v1 v1.2.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
Expand Down
Loading

0 comments on commit 6830418

Please sign in to comment.