Skip to content

Commit

Permalink
Merge pull request #110 from kubernetes-sigs/permit
Browse files Browse the repository at this point in the history
Support permit extension point
  • Loading branch information
k8s-ci-robot authored Jul 10, 2024
2 parents 45428bb + 0bb5a81 commit fb4c8bc
Show file tree
Hide file tree
Showing 19 changed files with 366 additions and 18 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ examples/advanced/main.wasm: examples/advanced/main.go

.PHONY: build-tinygo
build-tinygo: examples/nodenumber/main.wasm examples/advanced/main.wasm guest/testdata/cyclestate/main.wasm guest/testdata/filter/main.wasm guest/testdata/score/main.wasm \
guest/testdata/bind/main.wasm guest/testdata/reserve/main.wasm guest/testdata/handle/main.wasm
guest/testdata/bind/main.wasm guest/testdata/reserve/main.wasm guest/testdata/handle/main.wasm guest/testdata/permit/main.wasm

%/main-debug.wasm: %/main.go
@(cd $(@D); tinygo build -o main-debug.wasm -gc=custom -tags=custommalloc -scheduler=none -target=wasi .)
Expand Down
11 changes: 11 additions & 0 deletions RATIONALE.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,17 @@ result, not a parameter. The only impact would be to new plugins or those
ported to WebAssembly. We do not expect limiting the scores to two billion
above the valid range to be a practical concern for these authors.

### Why does the Permit function return a uint32 representing milliseconds for the timeout, not `time.Duration`?

`framework.PermitPlugin` returns `time.Duration` to represent the timeout.
`time.Duration` is int64 underneath and 1 time.Duration represents 1 nanosecond.

Given the scheduling throughput in the upstream kube-scheduler is around 300 pods/s,
that is, 3+ milliseconds per pod,
we consider a millisecond-level timeout with uint32 to be sufficiently fine-grained.

Also, tha maximum timeout is around 24 days, which also should be large enough.

## Why do we return a non-status, second numeric result as an i32?

Most compilers that target WebAssembly Core Specification 1.0, the only REC
Expand Down
12 changes: 11 additions & 1 deletion guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package api

import "sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
)

// CycleState is a WebAssembly implementation of framework.CycleState.
//
Expand Down Expand Up @@ -122,6 +124,14 @@ type ReservePlugin interface {
Unreserve(state CycleState, p proto.Pod, nodeName string)
}

// PermitPlugin is a WebAssembly implementation of framework.PermitPlugin.
type PermitPlugin interface {
Plugin

// Note: This is uint32, not time.Duration. See /RATIONALE.md for why.
Permit(state CycleState, p proto.Pod, nodeName string) (status *Status, timeoutMilliSeconds uint32)
}

// PreBindPlugin is a WebAssembly implementation of framework.PreBindPlugin.
type PreBindPlugin interface {
Plugin
Expand Down
73 changes: 73 additions & 0 deletions guest/permit/permit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package permit exports an api.PermitPlugin to the host.
package permit

import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/cyclestate"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
)

// permit is the current plugin assigned with SetPlugin.
var permit api.PermitPlugin

// SetPlugin should be called in `main` to assign an api.PermitPlugin
// instance.
//
// For example:
//
// func main() {
// plugin := permitPlugin{}
// permit.SetPlugin(plugin)
// }
//
// type permitPlugin struct{}
//
// func (permitPlugin) Permit(state api.CycleState, p proto.Pod, nodeName string) (status *api.Status, timeout uint32)
// // Write state you need on Permit
// }
func SetPlugin(permitPlugin api.PermitPlugin) {
if permitPlugin == nil {
panic("nil permitPlugin")
}
permit = permitPlugin
plugin.MustSet(permit)
}

// prevent unused lint errors (lint is run with normal go).
var _ func() uint64 = _permit

// _permit is only exported to the host.
//
//export permit
func _permit() uint64 {
if permit == nil { // Then, the user didn't define one.
// Unlike most plugins we always export permit so that we can reset
// the cycle state: return success to avoid no-op overhead.
return 0
}

pod := cyclestate.Pod
nodeName := imports.NodeName()
status, timeout := permit.Permit(cyclestate.Values, pod, nodeName)

// Pack the score and status code into a single WebAssembly 1.0 compatible
// result
return (uint64(imports.StatusToCode(status)) << uint64(32)) | uint64(timeout)
}
4 changes: 4 additions & 0 deletions guest/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/prefilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/permit"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postbind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prebind"
Expand Down Expand Up @@ -72,6 +73,9 @@ func Set(plugin api.Plugin) {
if plugin, ok := plugin.(api.ReservePlugin); ok {
reserve.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PermitPlugin); ok {
permit.SetPlugin(plugin)
}
if plugin, ok := plugin.(api.PreBindPlugin); ok {
prebind.SetPlugin(plugin)
}
Expand Down
7 changes: 7 additions & 0 deletions guest/testdata/cyclestate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/bind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/permit"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postbind"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/prebind"
Expand Down Expand Up @@ -67,6 +68,7 @@ func main() {
score.SetPlugin(plugin)
scoreextensions.SetPlugin(plugin)
reserve.SetPlugin(plugin)
permit.SetPlugin(plugin)
prebind.SetPlugin(plugin)
bind.SetPlugin(plugin)
postbind.SetPlugin(plugin)
Expand Down Expand Up @@ -187,6 +189,11 @@ func (statePlugin) Unreserve(state api.CycleState, pod proto.Pod, nodeName strin
mustFilterState(state)
}

func (statePlugin) Permit(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status, timeout uint32) {
mustFilterState(state)
return
}

func (statePlugin) PreBind(state api.CycleState, pod proto.Pod, _ string) (status *api.Status) {
if unsafe.Pointer(pod.Spec()) != unsafe.Pointer(podSpec) {
panic("didn't cache pod from pre-filter")
Expand Down
Binary file modified guest/testdata/cyclestate/main.wasm
Binary file not shown.
47 changes: 47 additions & 0 deletions guest/testdata/permit/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"time"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/permit"
)

type extensionPoints interface {
api.PermitPlugin
}

func main() {
var plugin extensionPoints = permitPlugin{}
permit.SetPlugin(plugin)
}

type permitPlugin struct{}

func (permitPlugin) Permit(state api.CycleState, pod proto.Pod, nodeName string) (*api.Status, uint32) {
status, timeout := api.StatusCodeSuccess, time.Duration(0)
if nodeName == "bad" {
status = api.StatusCodeError
} else if nodeName == "wait" {
status = api.StatusCodeWait
timeout = 10 * time.Second
}
return &api.Status{Code: status, Reason: "name is " + nodeName}, uint32(timeout.Milliseconds())
}
Binary file added guest/testdata/permit/main.wasm
Binary file not shown.
5 changes: 5 additions & 0 deletions internal/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ func RunAll(ctx context.Context, t Testing, plugin framework.Plugin, pod *v1.Pod
reserveP.Unreserve(ctx, nil, pod, ni.Node().Name)
}

if permitP, ok := plugin.(framework.PermitPlugin); ok {
s, _ = permitP.Permit(ctx, nil, pod, ni.Node().Name)
RequireSuccess(t, s)
}

if prebindP, ok := plugin.(framework.PreBindPlugin); ok {
s = prebindP.PreBind(ctx, nil, pod, "")
RequireSuccess(t, s)
Expand Down
2 changes: 1 addition & 1 deletion internal/e2e/scheduler_perf/scheduler_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ var (
Metrics: map[string]*labelValues{
"scheduler_framework_extension_point_duration_seconds": {
label: extensionPointsLabelName,
values: []string{"PreFilter", "Filter", "PostFilter", "PreScore", "Score", "Reserve", "PreBind", "Bind", "PostBind"},
values: []string{"PreFilter", "Filter", "PostFilter", "PreScore", "Score", "Reserve", "Permit", "PreBind", "Bind", "PostBind"},
},
"scheduler_scheduling_attempt_duration_seconds": nil,
"scheduler_pod_scheduling_duration_seconds": nil,
Expand Down
24 changes: 24 additions & 0 deletions scheduler/plugin/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"strconv"
"time"

"github.com/tetratelabs/wazero"
wazeroapi "github.com/tetratelabs/wazero/api"
Expand All @@ -39,6 +40,7 @@ const (
guestExportNormalizeScore = "normalizescore"
guestExportReserve = "reserve"
guestExportUnreserve = "unreserve"
guestExportPermit = "permit"
guestExportPreBind = "prebind"
guestExportBind = "bind"
guestExportPostBind = "postbind"
Expand All @@ -56,6 +58,7 @@ type guest struct {
normalizescoreFn wazeroapi.Function
reserveFn wazeroapi.Function
unreserveFn wazeroapi.Function
permitFn wazeroapi.Function
prebindFn wazeroapi.Function
bindFn wazeroapi.Function
postbindFn wazeroapi.Function
Expand Down Expand Up @@ -108,6 +111,7 @@ func (pl *wasmPlugin) newGuest(ctx context.Context) (*guest, error) {
normalizescoreFn: g.ExportedFunction(guestExportNormalizeScore),
reserveFn: g.ExportedFunction(guestExportReserve),
unreserveFn: g.ExportedFunction(guestExportUnreserve),
permitFn: g.ExportedFunction(guestExportPermit),
prebindFn: g.ExportedFunction(guestExportPreBind),
bindFn: g.ExportedFunction(guestExportBind),
postbindFn: g.ExportedFunction(guestExportPostBind),
Expand Down Expand Up @@ -238,6 +242,21 @@ func (g *guest) unreserve(ctx context.Context) {
}
}

// permit calls guestExportPermit.
func (g *guest) permit(ctx context.Context) (*framework.Status, time.Duration) {
defer g.out.Reset()
callStack := g.callStack

if err := g.permitFn.CallWithStack(ctx, callStack); err != nil {
return framework.AsStatus(decorateError(g.out, guestExportPermit, err)), 0
}

statusCode := int32(callStack[0] >> 32)
timeoutMilliSeconds := uint32(callStack[0])
statusReason := paramsFromContext(ctx).resultStatusReason
return framework.NewStatus(framework.Code(statusCode), statusReason), time.Duration(timeoutMilliSeconds) * time.Millisecond
}

// preBind calls guestExportPreBind.
func (g *guest) preBind(ctx context.Context) *framework.Status {
defer g.out.Reset()
Expand Down Expand Up @@ -335,6 +354,11 @@ func detectInterfaces(exportedFns map[string]wazeroapi.FunctionDefinition) (inte
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> ()", name)
}
e |= iReservePlugin
case guestExportPermit:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{i64}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i64)", name)
}
e |= iPermitPlugin
case guestExportPreBind:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{i32}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i32)", name)
Expand Down
14 changes: 9 additions & 5 deletions scheduler/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,16 @@ func (pl *wasmPlugin) PostBind(ctx context.Context, state *framework.CycleState,
var _ framework.PermitPlugin = (*wasmPlugin)(nil)

// Permit implements the same method as documented on framework.PermitPlugin.
func (pl *wasmPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
func (pl *wasmPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status, timeout time.Duration) {
params := &stack{pod: pod, nodeName: nodeName}
ctx = context.WithValue(ctx, stackKey{}, params)
if err := pl.pool.doWithSchedulingGuest(ctx, pod.UID, func(g *guest) {
status, timeout = g.permit(ctx)
}); err != nil {
status = framework.AsStatus(err)
}
_ = pl.pool.getForBinding(pod.UID)

// TODO: partially implemented for testing

return nil, 0
return
}

var _ framework.BindPlugin = (*wasmPlugin)(nil)
Expand Down
Loading

0 comments on commit fb4c8bc

Please sign in to comment.