Skip to content

Commit

Permalink
support postfilter
Browse files Browse the repository at this point in the history
  • Loading branch information
Gekko0114 committed Oct 29, 2023
1 parent 6708ed4 commit f5b92e2
Show file tree
Hide file tree
Showing 18 changed files with 418 additions and 41 deletions.
9 changes: 9 additions & 0 deletions guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ type FilterPlugin interface {
Filter(state CycleState, pod proto.Pod, nodeInfo NodeInfo) *Status
}

// PostFilterPlugin is a WebAssembly implementation of framework.PostFilterPlugin.
type PostFilterPlugin interface {
Plugin

PostFilter(state CycleState, pod proto.Pod, filteredNodeStatusMap NodeToStatusMap) (nominatedNodeName string, nominatingMode int32, status *Status)
}

// EnqueueExtensions is a WebAssembly implementation of framework.EnqueueExtensions.
type EnqueueExtensions interface {
EventsToRegister() []ClusterEvent
Expand Down Expand Up @@ -97,3 +104,5 @@ type NodeInfo interface {

Node() proto.Node
}

type NodeToStatusMap map[string]*Status
4 changes: 4 additions & 0 deletions guest/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/prefilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/score"
Expand Down Expand Up @@ -51,6 +52,9 @@ func Set(plugin api.Plugin) {
if plugin, ok := plugin.(api.FilterPlugin); ok {
filter.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PostFilterPlugin); ok {
postfilter.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PreScorePlugin); ok {
prescore.SetPlugin(plugin)
}
Expand Down
22 changes: 22 additions & 0 deletions guest/postfilter/imports.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build tinygo.wasm

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package postfilter

//go:wasmimport k8s.io/scheduler result.nominated_node_name
func setNominatedNodeNameResult(ptr, size uint32)
22 changes: 22 additions & 0 deletions guest/postfilter/imports_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//go:build !tinygo.wasm

/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package postfilter

// setNominatedNodeNameResult is stubbed for compilation outside TinyGo.
func setNominatedNodeNameResult(uint32, uint32) {}
87 changes: 87 additions & 0 deletions guest/postfilter/postfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package postfilter exports an api.PostFilterPlugin to the host.
package postfilter

import (
"runtime"
"unsafe"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
)

// postfilter is the current plugin assigned with SetPlugin.
var postfilter api.PostFilterPlugin

// SetPlugin should be called in `main` to assign an api.PostFilterPlugin
// instance.
//
// For example:
//
// func main() {
// plugin := filterPlugin{}
// postfilter.SetPlugin(plugin)
// filter.SetPlugin(plugin)
// }
//
// type filterPlugin struct{}
//
// func (filterPlugin) PostFilter(state api.CycleState, pod proto.Pod, filteredNodeStatusMap internalpostfilter.NodeToStatusMap) (int32, status *api.Status) {
// // Write state you need on Filter
// }
//
// func (filterPlugin) Filter(state api.CycleState, pod api.Pod, nodeInfo api.NodeInfo) (status *api.Status) {
// var Filter int32
// // Derive Filter for the node name using state set on PreFilter!
// return Filter, nil
// }
func SetPlugin(postfilterPlugin api.PostFilterPlugin) {
if postfilterPlugin == nil {
panic("nil postfilterPlugin")
}
postfilter = postfilterPlugin
plugin.MustSet(postfilterPlugin)
}

// prevent unused lint errors (lint is run with normal go).
var _ func() uint64 = _postfilter

// _postfilter is only exported to the host.
//
//export postfilter
func _postfilter() uint64 { //nolint

if postfilter == nil { // Then, the user didn't define one.
// Unlike most plugins we always export postfilter so that we can reset
// the cycle state: return success to avoid no-op overhead.
return 0
}

// The parameters passed are lazy with regard to host functions. This means
// a no-op plugin should not have any unmarshal penalty.
nominatedNodeName, nominatingMode, status := postfilter.PostFilter(CycleState, Pod, nil)

cString := []byte(nominatedNodeName)
if cString != nil {
ptr := uint32(uintptr(unsafe.Pointer(&cString[0])))
size := uint32(len(cString))
setNominatedNodeNameResult(ptr, size)
runtime.KeepAlive(cString) // until ptr is no longer needed.
}

return (uint64(nominatingMode) << uint64(32)) | uint64(imports.StatusToCode(status))
}
2 changes: 1 addition & 1 deletion guest/prefilter/prefilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
//
// type filterPlugin struct{}
//
// func (filterPlugin) PreFilter(state api.CycleState, pod proto.Pod, nodeList proto.NodeList) {
// func (filterPlugin) PreFilter(state api.CycleState, pod proto.Pod) (nodeNames []string, status *Status) {
// // Write state you need on Filter
// }
//
Expand Down
70 changes: 48 additions & 22 deletions scheduler/plugin/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,25 @@ import (
)

const (
guestExportMemory = "memory"
guestExportEnqueue = "enqueue"
guestExportPreFilter = "prefilter"
guestExportFilter = "filter"
guestExportPreScore = "prescore"
guestExportScore = "score"
guestExportMemory = "memory"
guestExportEnqueue = "enqueue"
guestExportPreFilter = "prefilter"
guestExportFilter = "filter"
guestExportPostFilter = "postfilter"
guestExportPreScore = "prescore"
guestExportScore = "score"
)

type guest struct {
guest wazeroapi.Module
out *bytes.Buffer
enqueueFn wazeroapi.Function
prefilterFn wazeroapi.Function
filterFn wazeroapi.Function
prescoreFn wazeroapi.Function
scoreFn wazeroapi.Function
callStack []uint64
guest wazeroapi.Module
out *bytes.Buffer
enqueueFn wazeroapi.Function
prefilterFn wazeroapi.Function
filterFn wazeroapi.Function
postfilterFn wazeroapi.Function
prescoreFn wazeroapi.Function
scoreFn wazeroapi.Function
callStack []uint64
}

func compileGuest(ctx context.Context, runtime wazero.Runtime, guestBin []byte) (guest wazero.CompiledModule, err error) {
Expand Down Expand Up @@ -82,14 +84,15 @@ func (pl *wasmPlugin) newGuest(ctx context.Context) (*guest, error) {
callStack := make([]uint64, 1)

return &guest{
guest: g,
out: &out,
enqueueFn: g.ExportedFunction(guestExportEnqueue),
prefilterFn: g.ExportedFunction(guestExportPreFilter),
filterFn: g.ExportedFunction(guestExportFilter),
prescoreFn: g.ExportedFunction(guestExportPreScore),
scoreFn: g.ExportedFunction(guestExportScore),
callStack: callStack,
guest: g,
out: &out,
enqueueFn: g.ExportedFunction(guestExportEnqueue),
prefilterFn: g.ExportedFunction(guestExportPreFilter),
filterFn: g.ExportedFunction(guestExportFilter),
postfilterFn: g.ExportedFunction(guestExportPostFilter),
prescoreFn: g.ExportedFunction(guestExportPreScore),
scoreFn: g.ExportedFunction(guestExportScore),
callStack: callStack,
}, nil
}

Expand Down Expand Up @@ -131,6 +134,24 @@ func (g *guest) filter(ctx context.Context) *framework.Status {
return framework.NewStatus(framework.Code(statusCode), statusReason)
}

// postFilter calls guestExportPostFilter.
func (g *guest) postFilter(ctx context.Context) (*framework.PostFilterResult, *framework.Status) {
defer g.out.Reset()
callStack := g.callStack

if err := g.postfilterFn.CallWithStack(ctx, callStack); err != nil {
return nil, framework.AsStatus(decorateError(g.out, guestExportPostFilter, err))
}
nominatedNodeName := paramsFromContext(ctx).resultNominatedNodeName
nominatingMode := framework.NominatingMode(int32(callStack[0] >> 32))

statusCode := int32(callStack[0])
statusReason := paramsFromContext(ctx).resultStatusReason

nominatingInfo := &framework.NominatingInfo{NominatedNodeName: nominatedNodeName, NominatingMode: nominatingMode}
return &framework.PostFilterResult{NominatingInfo: nominatingInfo}, framework.NewStatus(framework.Code(statusCode), statusReason)
}

// preScore calls guestExportPreScore.
func (g *guest) preScore(ctx context.Context) *framework.Status {
defer g.out.Reset()
Expand Down Expand Up @@ -188,6 +209,11 @@ func detectInterfaces(exportedFns map[string]wazeroapi.FunctionDefinition) (inte
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i32)", name)
}
e |= iFilterPlugin
case guestExportPostFilter:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{i64}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i64)", name)
}
e |= iPostFilterPlugin
case guestExportPreScore:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{i32}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i32)", name)
Expand Down
54 changes: 38 additions & 16 deletions scheduler/plugin/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,23 @@ import (
)

const (
i32 = wazeroapi.ValueTypeI32
i64 = wazeroapi.ValueTypeI64
k8sApi = "k8s.io/api"
k8sApiNode = "node"
k8sApiNodeList = "nodeList"
k8sApiNodeName = "nodeName"
k8sApiPod = "pod"
k8sKlog = "k8s.io/klog"
k8sKlogLog = "log"
k8sKlogLogs = "logs"
k8sKlogSeverity = "severity"
k8sScheduler = "k8s.io/scheduler"
k8sSchedulerGetConfig = "get_config"
k8sSchedulerResultClusterEvents = "result.cluster_events"
k8sSchedulerResultNodeNames = "result.node_names"
k8sSchedulerResultStatusReason = "result.status_reason"
i32 = wazeroapi.ValueTypeI32
i64 = wazeroapi.ValueTypeI64
k8sApi = "k8s.io/api"
k8sApiNode = "node"
k8sApiNodeList = "nodeList"
k8sApiNodeName = "nodeName"
k8sApiPod = "pod"
k8sKlog = "k8s.io/klog"
k8sKlogLog = "log"
k8sKlogLogs = "logs"
k8sKlogSeverity = "severity"
k8sScheduler = "k8s.io/scheduler"
k8sSchedulerGetConfig = "get_config"
k8sSchedulerResultClusterEvents = "result.cluster_events"
k8sSchedulerResultNodeNames = "result.node_names"
k8sSchedulerResultNominatedNodeName = "result.nominating_node_name"
k8sSchedulerResultStatusReason = "result.status_reason"
)

func instantiateHostApi(ctx context.Context, runtime wazero.Runtime) (wazeroapi.Module, error) {
Expand Down Expand Up @@ -90,6 +91,9 @@ func instantiateHostScheduler(ctx context.Context, runtime wazero.Runtime, guest
WithGoModuleFunction(wazeroapi.GoModuleFunc(k8sSchedulerResultNodeNamesFn), []wazeroapi.ValueType{i32, i32}, []wazeroapi.ValueType{}).
WithParameterNames("buf", "buf_len").Export(k8sSchedulerResultNodeNames).
NewFunctionBuilder().
WithGoModuleFunction(wazeroapi.GoModuleFunc(k8sSchedulerResultNominatedNodeNameFn), []wazeroapi.ValueType{i32, i32}, []wazeroapi.ValueType{}).
WithParameterNames("buf", "buf_len").Export(k8sSchedulerResultNominatedNodeName).
NewFunctionBuilder().
WithGoModuleFunction(wazeroapi.GoModuleFunc(k8sSchedulerResultStatusReasonFn), []wazeroapi.ValueType{i32, i32}, []wazeroapi.ValueType{}).
WithParameterNames("buf", "buf_len").Export(k8sSchedulerResultStatusReason).
Instantiate(ctx)
Expand Down Expand Up @@ -128,6 +132,9 @@ type stack struct {
// resultNodeNames is returned by guest.prefilterFn
resultNodeNames []string

// resultNominatedNodeName is returned by guest.postfilterFn
resultNominatedNodeName string

// reason returned by all guest exports except guest.enqueueFn
//
// It is a field to avoid compiler-specific malloc/free functions, and to
Expand Down Expand Up @@ -303,6 +310,21 @@ func k8sSchedulerResultNodeNamesFn(ctx context.Context, mod wazeroapi.Module, st
paramsFromContext(ctx).resultNodeNames = nodeNames
}

// k8sSchedulerResultNominatedNodeNameFn is a function used by the wasm guest to set the
// nominated node name result from guestExportPostFilter.
func k8sSchedulerResultNominatedNodeNameFn(ctx context.Context, mod wazeroapi.Module, stack []uint64) {
buf := uint32(stack[0])
bufLen := uint32(stack[1])

var nominatedNodeName string
if b, ok := mod.Memory().Read(buf, bufLen); !ok {
panic("out of memory reading nominatedNodeName")
} else {
nominatedNodeName = string(b)
}
paramsFromContext(ctx).resultNominatedNodeName = nominatedNodeName
}

// k8sSchedulerResultStatusReasonFn is a function used by the wasm guest to set the
// framework.Status reason result from all functions.
func k8sSchedulerResultStatusReasonFn(ctx context.Context, mod wazeroapi.Module, stack []uint64) {
Expand Down
1 change: 1 addition & 0 deletions scheduler/plugin/mask.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func maskInterfaces(plugin *wasmPlugin) (framework.Plugin, error) {
iPreBindPlugin |
iPostBindPlugin)

// TODO: ask about PostFilter
switch i {
case iFilterPlugin:
type filter interface {
Expand Down
13 changes: 11 additions & 2 deletions scheduler/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,22 @@ func (pl *wasmPlugin) Filter(ctx context.Context, _ *framework.CycleState, pod *
var _ framework.PostFilterPlugin = (*wasmPlugin)(nil)

// PostFilter implements the same method as documented on framework.PostFilterPlugin.
func (pl *wasmPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
func (pl *wasmPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (result *framework.PostFilterResult, status *framework.Status) {
// We implement PostFilterPlugin with FilterPlugin, even when the guest doesn't.
if pl.guestInterfaces&iPostFilterPlugin == 0 {
return nil, nil // unimplemented
}

panic("TODO: scheduling: PostFilter")
// Add the stack to the go context so that the corresponding host function
// can look them up.
params := &stack{pod: pod}
ctx = context.WithValue(ctx, stackKey{}, params)
if err := pl.pool.doWithSchedulingGuest(ctx, pod.UID, func(g *guest) {
result, status = g.postFilter(ctx)
}); err != nil {
status = framework.AsStatus(err)
}
return
}

var _ framework.PreScorePlugin = (*wasmPlugin)(nil)
Expand Down
Loading

0 comments on commit f5b92e2

Please sign in to comment.