Skip to content

Commit

Permalink
Merge pull request #75 from Gekko0114/postfilter
Browse files Browse the repository at this point in the history
support postfilter plugin
  • Loading branch information
k8s-ci-robot authored Nov 26, 2023
2 parents 047774e + e3f0e74 commit 3c8342a
Show file tree
Hide file tree
Showing 30 changed files with 574 additions and 59 deletions.
23 changes: 23 additions & 0 deletions guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ type FilterPlugin interface {
Filter(state CycleState, pod proto.Pod, nodeInfo NodeInfo) *Status
}

// PostFilterPlugin is a WebAssembly implementation of framework.PostFilterPlugin.
type PostFilterPlugin interface {
Plugin

PostFilter(state CycleState, pod proto.Pod, filteredNodeStatusMap NodeToStatus) (nominatedNodeName string, nominatingMode NominatingMode, status *Status)
}

// NominatingMode is the Mode which is returned from PostFilter.
type NominatingMode int32

// These are predefined modes
const (
ModeNoop NominatingMode = iota
ModeOverride
)

// EnqueueExtensions is a WebAssembly implementation of framework.EnqueueExtensions.
type EnqueueExtensions interface {
EventsToRegister() []ClusterEvent
Expand Down Expand Up @@ -111,3 +127,10 @@ type NodeInfo interface {

Node() proto.Node
}

// NodeToStatus contains which Node got which status during the scheduling cycle.
type NodeToStatus interface {
// NodeToStatus returns a map
// which is keyed by the node name and valued by the status code.
Map() map[string]StatusCode
}
15 changes: 15 additions & 0 deletions guest/internal/imports/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package imports

import (
"encoding/json"
"runtime"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
Expand Down Expand Up @@ -66,3 +67,17 @@ func Pod(updater func([]byte) error) error {
return k8sApiPod(ptr, limit)
}, updater)
}

func NodeToStatusMap() map[string]api.StatusCode {
// Wrap to avoid TinyGo 0.28: cannot use an exported function as value
jsonStr := mem.GetString(func(ptr uint32, limit mem.BufLimit) (len uint32) {
return k8sSchedulerNodeToStatusMap(ptr, limit)
})
byte := []byte(jsonStr)
var nodeToMap map[string]api.StatusCode
err := json.Unmarshal(byte, &nodeToMap)
if err != nil {
panic(err)
}
return nodeToMap
}
3 changes: 3 additions & 0 deletions guest/internal/imports/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ func k8sApiNodeName(ptr uint32, limit mem.BufLimit) (len uint32)
//go:wasmimport k8s.io/api pod
func k8sApiPod(ptr uint32, limit mem.BufLimit) (len uint32)

//go:wasmimport k8s.io/scheduler nodeToStatusMap
func k8sSchedulerNodeToStatusMap(ptr uint32, limit mem.BufLimit) (len uint32)

//go:wasmimport k8s.io/scheduler result.status_reason
func k8sSchedulerResultStatusReason(ptr, size uint32)
3 changes: 3 additions & 0 deletions guest/internal/imports/imports_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ func k8sApiNodeName(uint32, mem.BufLimit) (len uint32) { return }
// k8sApiPod is stubbed for compilation outside TinyGo.
func k8sApiPod(uint32, mem.BufLimit) (len uint32) { return }

// k8sSchedulerNodeToStatusMap is stubbed for compilation outside TinyGo.
func k8sSchedulerNodeToStatusMap(uint32, mem.BufLimit) (len uint32) { return }

// k8sSchedulerResultStatusReason is stubbed for compilation outside TinyGo.
func k8sSchedulerResultStatusReason(uint32, uint32) {}
4 changes: 4 additions & 0 deletions guest/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/prefilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prebind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
Expand Down Expand Up @@ -53,6 +54,9 @@ func Set(plugin api.Plugin) {
if plugin, ok := plugin.(api.FilterPlugin); ok {
filter.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PostFilterPlugin); ok {
postfilter.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PreScorePlugin); ok {
prescore.SetPlugin(plugin)
}
Expand Down
22 changes: 22 additions & 0 deletions guest/postfilter/imports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build tinygo.wasm

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package postfilter

//go:wasmimport k8s.io/scheduler result.nominated_node_name
func setNominatedNodeNameResult(ptr, size uint32)
22 changes: 22 additions & 0 deletions guest/postfilter/imports_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build !tinygo.wasm

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package postfilter

// setNominatedNodeNameResult is stubbed for compilation outside TinyGo.
func setNominatedNodeNameResult(uint32, uint32) {}
100 changes: 100 additions & 0 deletions guest/postfilter/postfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package postfilter exports an api.PostFilterPlugin to the host.
package postfilter

import (
"runtime"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/cyclestate"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
)

// postfilter is the current plugin assigned with SetPlugin.
var postfilter api.PostFilterPlugin

// SetPlugin should be called in `main` to assign an api.PostFilterPlugin
// instance.
//
// For example:
//
// func main() {
// plugin := filterPlugin{}
// postfilter.SetPlugin(plugin)
// filter.SetPlugin(plugin)
// }
//
// type filterPlugin struct{}
//
// func (filterPlugin) PostFilter(state api.CycleState, pod proto.Pod, filteredNodeStatusMap api.NodeToStatus) (int32, status *api.Status) {
// // Write state you need on Filter
// }
//
// func (filterPlugin) Filter(state api.CycleState, pod api.Pod, nodeInfo api.NodeInfo) (status *api.Status) {
// var Filter int32
// // Derive Filter for the node name using state set on PreFilter!
// return Filter, nil
// }
func SetPlugin(postfilterPlugin api.PostFilterPlugin) {
if postfilterPlugin == nil {
panic("nil postfilterPlugin")
}
postfilter = postfilterPlugin
plugin.MustSet(postfilterPlugin)
}

// prevent unused lint errors (lint is run with normal go).
var _ func() uint64 = _postfilter

// _postfilter is only exported to the host.
//
//export postfilter
func _postfilter() uint64 { //nolint

if postfilter == nil { // Then, the user didn't define one.
// Unlike most plugins we always export postfilter so that we can reset
// the cycle state: return success to avoid no-op overhead.
return 0
}

// The parameters passed are lazy with regard to host functions. This means
// a no-op plugin should not have any unmarshal penalty.
nominatedNodeName, nominatingMode, status := postfilter.PostFilter(cyclestate.Values, cyclestate.Pod, &nodeToStatus{})
ptr, size := mem.StringToPtr(nominatedNodeName)
setNominatedNodeNameResult(ptr, size)
runtime.KeepAlive(nominatedNodeName) // until ptr is no longer needed.

return (uint64(nominatingMode) << uint64(32)) | uint64(imports.StatusToCode(status))
}

type nodeToStatus struct {
statusMap map[string]api.StatusCode
}

func (n *nodeToStatus) Map() map[string]api.StatusCode {
return n.lazyNodeToStatusMap()
}

// lazyNodeToStatusMap returns NodeToStatusMap from imports.NodeToStatusMap.
func (n *nodeToStatus) lazyNodeToStatusMap() map[string]api.StatusCode {
nodeMap := imports.NodeToStatusMap()
n.statusMap = nodeMap
return n.statusMap
}
2 changes: 1 addition & 1 deletion guest/prefilter/prefilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
//
// type filterPlugin struct{}
//
// func (filterPlugin) PreFilter(state api.CycleState, pod proto.Pod, nodeList proto.NodeList) {
// func (filterPlugin) PreFilter(state api.CycleState, pod proto.Pod) (nodeNames []string, status *Status) {
// // Write state you need on Filter
// }
//
Expand Down
23 changes: 19 additions & 4 deletions guest/testdata/cyclestate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/bind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prebind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prefilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
Expand Down Expand Up @@ -58,6 +59,7 @@ func main() {
enqueue.SetPlugin(plugin)
prefilter.SetPlugin(plugin)
filter.SetPlugin(plugin)
postfilter.SetPlugin(plugin)
prescore.SetPlugin(plugin)
score.SetPlugin(plugin)
prebind.SetPlugin(plugin)
Expand Down Expand Up @@ -89,11 +91,11 @@ type preScoreStateVal map[string]any
type preBindStateVal map[string]any

func (statePlugin) PreFilter(state api.CycleState, pod proto.Pod) (nodeNames []string, status *api.Status) {
if nextPodSpec := pod.Spec(); unsafe.Pointer(nextPodSpec) == unsafe.Pointer(podSpec) {
nextPodSpec := pod.Spec()
if unsafe.Pointer(nextPodSpec) == unsafe.Pointer(podSpec) {
panic("didn't reset pod on pre-filter")
} else {
podSpec = nextPodSpec
}
podSpec = nextPodSpec
mustNotScoreState(state)
if _, ok := state.Read(preFilterStateKey); ok {
panic("didn't reset filter state on pre-filter")
Expand All @@ -116,10 +118,23 @@ func (statePlugin) Filter(state api.CycleState, pod proto.Pod, _ api.NodeInfo) (
return
}

func (statePlugin) PreScore(state api.CycleState, pod proto.Pod, _ proto.NodeList) *api.Status {
func (statePlugin) PostFilter(state api.CycleState, pod proto.Pod, _ api.NodeToStatus) (nominatedNodeName string, nominatingMode api.NominatingMode, status *api.Status) {
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from filter")
}
mustNotScoreState(state)
if val, ok := state.Read(preFilterStateKey); !ok {
panic("didn't propagate state from pre-filter")
} else if _, ok = val.(preFilterStateVal)["filter"]; !ok {
panic("filter value lost propagating from filter")
}
return
}

func (statePlugin) PreScore(state api.CycleState, pod proto.Pod, _ proto.NodeList) *api.Status {
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from pre-filter")
}
mustFilterState(state)
if _, ok := state.Read(preScoreStateKey); ok {
panic("didn't reset score state on pre-score")
Expand Down
Binary file modified guest/testdata/cyclestate/main.wasm
Binary file not shown.
37 changes: 37 additions & 0 deletions guest/testdata/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prefilter"
)

type extensionPoints interface {
api.PreFilterPlugin
api.FilterPlugin
api.PostFilterPlugin
}

func main() {
Expand All @@ -40,10 +42,13 @@ func main() {
plugin = filterPlugin{}
case "preFilter":
plugin = preFilterPlugin{}
case "postFilter":
plugin = postFilterPlugin{}
}
}
prefilter.SetPlugin(plugin)
filter.SetPlugin(plugin)
postfilter.SetPlugin(plugin)
}

// noopPlugin doesn't do anything, except evaluate each parameter.
Expand All @@ -62,6 +67,13 @@ func (noopPlugin) Filter(state api.CycleState, pod proto.Pod, nodeInfo api.NodeI
return
}

func (noopPlugin) PostFilter(state api.CycleState, pod proto.Pod, nodeMap api.NodeToStatus) (nominatedNodeName string, nominatingMode api.NominatingMode, status *api.Status) {
_, _ = state.Read("ok")
_ = pod.Spec()
_ = nodeMap.Map()
return
}

// preFilterPlugin schedules a node if its name equals its pod spec.
type preFilterPlugin struct{ noopPlugin }

Expand Down Expand Up @@ -96,3 +108,28 @@ func (filterPlugin) Filter(_ api.CycleState, pod proto.Pod, nodeInfo api.NodeInf
Reason: podSpecNodeName + " != " + nodeName,
}
}

type postFilterPlugin struct{ noopPlugin }

func (postFilterPlugin) PostFilter(_ api.CycleState, pod proto.Pod, nodeMap api.NodeToStatus) (string, api.NominatingMode, *api.Status) {
// First, check if the pod spec node name is empty. If so, pass!
podSpecNodeName := pod.Spec().GetNodeName()
if len(podSpecNodeName) == 0 {
return "", 0, nil
}
m := nodeMap.Map()
if m == nil {
return "", 0, nil
}
// If nominatedNodeName is schedulable, pass!
if val, ok := m[podSpecNodeName]; ok {
if val == 0 {
return podSpecNodeName, api.ModeOverride, nil
}
}
// Otherwise, this is unschedulableAndUnresolvable, so note the reason.
return podSpecNodeName, api.ModeNoop, &api.Status{
Code: api.StatusCodeUnschedulableAndUnresolvable,
Reason: podSpecNodeName + " is unschedulable",
}
}
Binary file modified guest/testdata/filter/main.wasm
Binary file not shown.
5 changes: 5 additions & 0 deletions internal/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func RunAll(ctx context.Context, t Testing, plugin framework.Plugin, pod *v1.Pod
RequireSuccess(t, s)
}

if postfilterP, ok := plugin.(framework.PostFilterPlugin); ok {
_, s = postfilterP.PostFilter(ctx, nil, pod, nil)
RequireSuccess(t, s)
}

if prescoreP, ok := plugin.(framework.PreScorePlugin); ok {
s = prescoreP.PreScore(ctx, nil, pod, []*v1.Node{ni.Node()})
RequireSuccess(t, s)
Expand Down
Loading

0 comments on commit 3c8342a

Please sign in to comment.