Skip to content

Commit

Permalink
change the interface
Browse files Browse the repository at this point in the history
  • Loading branch information
sanposhiho committed Jul 1, 2024
1 parent d04084d commit 996bc5c
Show file tree
Hide file tree
Showing 24 changed files with 61 additions and 80 deletions.
11 changes: 11 additions & 0 deletions RATIONALE.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,17 @@ result, not a parameter. The only impact would be to new plugins or those
ported to WebAssembly. We do not expect limiting the scores to two billion
above the valid range to be a practical concern for these authors.

### Why does the Permit function return a uint32 representing milliseconds for the timeout, not `time.Duration`?

`framework.PermitPlugin` returns `time.Duration` to represent the timeout.
`time.Duration` is int64 underneath and 1 time.Duration represents 1 nanosecond.

Given the scheduling throughput in the upstream kube-scheduler is around 300 pods/s,
that is, 3+ milliseconds per pod,
we consider a millisecond-level timeout with uint32 to be sufficiently fine-grained.

Also, tha maximum timeout is around 24 days, which also should be large enough.

## Why do we return a non-status, second numeric result as an i32?

Most compilers that target WebAssembly Core Specification 1.0, the only REC
Expand Down
5 changes: 2 additions & 3 deletions guest/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package api

import (
"time"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
)

Expand Down Expand Up @@ -130,7 +128,8 @@ type ReservePlugin interface {
type PermitPlugin interface {
Plugin

Permit(state CycleState, p proto.Pod, nodeName string) (*Status, time.Duration)
// Note: This is uint32, not time.Duration. See /RATIONALE.md for why.
Permit(state CycleState, p proto.Pod, nodeName string) (status *Status, timeoutMilliSeconds uint32)
}

// PreBindPlugin is a WebAssembly implementation of framework.PreBindPlugin.
Expand Down
9 changes: 0 additions & 9 deletions guest/internal/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,6 @@ func BytesToPtr(b []byte) (uint32, uint32) {
return uint32(uintptr(ptr)), uint32(len(b))
}

// Int64ToPtr returns a pointer for the given int64 number in a way compatible
// with WebAssembly numeric types.
// The returned pointer aliases the number hence it must be kept alive until ptr
// is no longer needed.
func Int64ToPtr(n int64) uint32 {
ptr := unsafe.Pointer(&n)
return uint32(uintptr(ptr))
}

// Update is for decoding values from memory. The updater doesn't keep a
// reference to the underlying bytes, so we don't need to copy them.
func Update(
Expand Down
6 changes: 0 additions & 6 deletions guest/permit/imports.go

This file was deleted.

6 changes: 0 additions & 6 deletions guest/permit/imports_stub.go

This file was deleted.

17 changes: 6 additions & 11 deletions guest/permit/permit.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@
package permit

import (
"runtime"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/cyclestate"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin"
)

Expand All @@ -42,7 +39,7 @@ var permit api.PermitPlugin
//
// type permitPlugin struct{}
//
// func (permitPlugin) Permit(state api.CycleState, p proto.Pod, nodeName string) (status *api.Status, timeout time.Duration)
// func (permitPlugin) Permit(state api.CycleState, p proto.Pod, nodeName string) (status *api.Status, timeout uint32)
// // Write state you need on Permit
// }
func SetPlugin(permitPlugin api.PermitPlugin) {
Expand All @@ -54,12 +51,12 @@ func SetPlugin(permitPlugin api.PermitPlugin) {
}

// prevent unused lint errors (lint is run with normal go).
var _ func() uint32 = _permit
var _ func() uint64 = _permit

// _permit is only exported to the host.
//
//export permit
func _permit() uint32 {
func _permit() uint64 {
if permit == nil { // Then, the user didn't define one.
// Unlike most plugins we always export permit so that we can reset
// the cycle state: return success to avoid no-op overhead.
Expand All @@ -70,9 +67,7 @@ func _permit() uint32 {
nodeName := imports.NodeName()
status, timeout := permit.Permit(cyclestate.Values, pod, nodeName)

ptr := mem.Int64ToPtr(int64(timeout))
setTimeoutResult(ptr)
runtime.KeepAlive(timeout) // untir ptr is no longer needed.

return imports.StatusToCode(status)
// Pack the score and status code into a single WebAssembly 1.0 compatible
// result
return (uint64(imports.StatusToCode(status)) << uint64(32)) | uint64(timeout)
}
3 changes: 1 addition & 2 deletions guest/testdata/cyclestate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package main

import (
"os"
"time"
"unsafe"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
Expand Down Expand Up @@ -190,7 +189,7 @@ func (statePlugin) Unreserve(state api.CycleState, pod proto.Pod, nodeName strin
mustFilterState(state)
}

func (statePlugin) Permit(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status, timeout time.Duration) {
func (statePlugin) Permit(state api.CycleState, pod proto.Pod, nodeName string) (status *api.Status, timeout uint32) {
mustFilterState(state)
return
}
Expand Down
Binary file modified guest/testdata/cyclestate/main.wasm
Binary file not shown.
4 changes: 2 additions & 2 deletions guest/testdata/permit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func main() {

type permitPlugin struct{}

func (permitPlugin) Permit(state api.CycleState, pod proto.Pod, nodeName string) (*api.Status, time.Duration) {
func (permitPlugin) Permit(state api.CycleState, pod proto.Pod, nodeName string) (*api.Status, uint32) {
status, timeout := api.StatusCodeSuccess, time.Duration(0)
if nodeName == "bad" {
status = api.StatusCodeError
} else if nodeName == "wait" {
status = api.StatusCodeWait
timeout = 10 * time.Second
}
return &api.Status{Code: status, Reason: "name is " + nodeName}, timeout
return &api.Status{Code: status, Reason: "name is " + nodeName}, uint32(timeout.Milliseconds())
}
Binary file modified guest/testdata/permit/main.wasm
Binary file not shown.
10 changes: 5 additions & 5 deletions scheduler/plugin/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,10 @@ func (g *guest) permit(ctx context.Context) (*framework.Status, time.Duration) {
return framework.AsStatus(decorateError(g.out, guestExportPermit, err)), 0
}

timeout := paramsFromContext(ctx).resultTimeout
statusCode := int32(callStack[0])
statusCode := int32(callStack[0] >> 32)
timeoutMilliSeconds := int32(callStack[0])
statusReason := paramsFromContext(ctx).resultStatusReason
return framework.NewStatus(framework.Code(statusCode), statusReason), timeout
return framework.NewStatus(framework.Code(statusCode), statusReason), time.Duration(timeoutMilliSeconds) * time.Millisecond
}

// preBind calls guestExportPreBind.
Expand Down Expand Up @@ -355,8 +355,8 @@ func detectInterfaces(exportedFns map[string]wazeroapi.FunctionDefinition) (inte
}
e |= iReservePlugin
case guestExportPermit:
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{i32}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i32)", name)
if len(f.ParamTypes()) != 0 || !bytes.Equal(f.ResultTypes(), []wazeroapi.ValueType{i64}) {
return 0, fmt.Errorf("wasm: guest exports the wrong signature for func[%s]. should be () -> (i64)", name)
}
e |= iPermitPlugin
case guestExportPreBind:
Expand Down
20 changes: 0 additions & 20 deletions scheduler/plugin/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package wasm
import (
"context"
"encoding/json"
"time"

"github.com/tetratelabs/wazero"
wazeroapi "github.com/tetratelabs/wazero/api"
Expand Down Expand Up @@ -50,7 +49,6 @@ const (
k8sSchedulerResultNominatedNodeName = "result.nominated_node_name"
k8sSchedulerResultStatusReason = "result.status_reason"
k8sSchedulerResultNormalizedScoreList = "result.normalized_score_list"
k8sSchedulerResultTimeout = "result.timeout"
k8sSchedulerHandleEventRecorderEventf = "handle.eventrecorder.eventf"
k8sSchedulerHandleRejectWaitingPod = "handle.reject_waiting_pod"
)
Expand Down Expand Up @@ -120,9 +118,6 @@ func instantiateHostScheduler(ctx context.Context, runtime wazero.Runtime, guest
NewFunctionBuilder().
WithGoModuleFunction(wazeroapi.GoModuleFunc(host.k8sHandleRejectWaitingPodFn), []wazeroapi.ValueType{i32, i32, i32, i32}, []wazeroapi.ValueType{}).
WithParameterNames("buf", "buf_len").Export(k8sSchedulerHandleRejectWaitingPod).
NewFunctionBuilder().
WithGoModuleFunction(wazeroapi.GoModuleFunc(k8sSchedulerResultTimeoutFn), []wazeroapi.ValueType{i32}, []wazeroapi.ValueType{}).
WithParameterNames("ptr").Export(k8sSchedulerResultTimeout).
Instantiate(ctx)
}

Expand Down Expand Up @@ -177,9 +172,6 @@ type stack struct {

// resultNormalizedScoreList is returned by guest.normalizedscoreFn
resultNormalizedScoreList framework.NodeScoreList

// resultTimeout is returned by guest.permitFn
resultTimeout time.Duration
}

func paramsFromContext(ctx context.Context) *stack {
Expand Down Expand Up @@ -437,18 +429,6 @@ func k8sSchedulerResultNormalizedScoreListFn(ctx context.Context, mod wazeroapi.
paramsFromContext(ctx).resultNormalizedScoreList = MapToNodeScoreList(nodeScoreList)
}

// k8sSchedulerResultTimeoutFn is a function used by the wasm guest to set the
// timeout result from guestExportPermit.
func k8sSchedulerResultTimeoutFn(ctx context.Context, mod wazeroapi.Module, stack []uint64) {
ptr := uint32(stack[0])

n, ok := mod.Memory().ReadUint64Le(ptr)
if !ok {
panic("out of memory reading timeout")
}
paramsFromContext(ctx).resultTimeout = time.Duration(int64(n))
}

// Converts a list of framework.NodeScore to a map with node names as keys and their scores as integer values.
func NodeScoreListToMap(nodeScoreList []framework.NodeScore) map[string]int {
scoreMap := make(map[string]int)
Expand Down
16 changes: 8 additions & 8 deletions scheduler/plugin/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func Test_guestPool_bindingCycles(t *testing.T) {

_, status := pl.PreFilter(ctx, nil, pod)
if !status.IsSuccess() {
t.Fatalf("prefilter failed: %v", status)
t.Fatalf("prefilter failed: %v", status.Reasons())
}

if pl.GetScheduledPodUID() != pod.UID {
Expand All @@ -71,7 +71,7 @@ func Test_guestPool_bindingCycles(t *testing.T) {
// pod is going to the binding cycle.
status, _ = pl.Permit(ctx, nil, pod, "node")
if !status.IsSuccess() {
t.Fatalf("filter failed: %v", status)
t.Fatalf("permit failed: %v", status.Reasons())
}

if len(pl.GetBindingCycles()) != 1 {
Expand All @@ -82,7 +82,7 @@ func Test_guestPool_bindingCycles(t *testing.T) {

_, status = pl.PreFilter(ctx, nil, nextPod)
if !status.IsSuccess() {
t.Fatalf("PreFilter failed: %v", status)
t.Fatalf("PreFilter failed: %v", status.Reasons())
}

if want, have := nextPod.UID, pl.GetScheduledPodUID(); want != have {
Expand All @@ -91,7 +91,7 @@ func Test_guestPool_bindingCycles(t *testing.T) {

status, _ = pl.Permit(ctx, nil, nextPod, "node")
if !status.IsSuccess() {
t.Fatalf("filter failed: %v", status)
t.Fatalf("filter failed: %v", status.Reasons())
}

if len(pl.GetBindingCycles()) != 2 {
Expand Down Expand Up @@ -121,13 +121,13 @@ func Test_guestPool_bindingCycles(t *testing.T) {
// nextPod is going to PreBind process.
status = pl.PreBind(ctx, nil, nextPod, "node")
if !status.IsSuccess() {
t.Fatalf("prebind failed: %v", status)
t.Fatalf("prebind failed: %v", status.Reasons())
}

// nextPod is going to Bind process.
status = pl.Bind(ctx, nil, nextPod, "node")
if !status.IsSuccess() {
t.Fatalf("bind failed: %v", status)
t.Fatalf("bind failed: %v", status.Reasons())
}

// nextPod is rejected in the binding cycle.
Expand Down Expand Up @@ -1109,7 +1109,7 @@ func TestPermit(t *testing.T) {
expectedStatusMessage: "name is bad",
},
{
name: "Error",
name: "Wait",
pod: test.PodSmall,
nodeName: "wait",
expectedStatusCode: framework.Wait,
Expand Down Expand Up @@ -1137,7 +1137,7 @@ func TestPermit(t *testing.T) {
guestURL: test.URLErrorPanicOnPermit,
pod: test.PodSmall,
expectedStatusCode: framework.Error,
expectedStatusMessage: "wasm: permit error: panic!\nwasm error: unreachable\nwasm stack trace:\n\tpanic_on_permit.$1() i32",
expectedStatusMessage: "wasm: permit error: panic!\nwasm error: unreachable\nwasm stack trace:\n\tpanic_on_permit.$1() i64",
},
}

Expand Down
Binary file modified scheduler/test/testdata/error/panic_on_permit.wasm
Binary file not shown.
5 changes: 2 additions & 3 deletions scheduler/test/testdata/error/panic_on_permit.wat
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
(data (i32.const 8) "panic!") ;; iovs[0]

;; On permit, write "panic!" to stdout and crash.
(func (export "permit") (result i32)
(func (export "permit") (result i64)
;; Write the panic to stdout via its iovec [offset, len].
(call $wasi.fd_write
(i32.const 1) ;; stdout
Expand All @@ -25,6 +25,5 @@
drop ;; ignore the errno returned

;; Issue the unreachable instruction instead of returning a code
(unreachable)
)
(unreachable))
)
Binary file modified scheduler/test/testdata/test/bind_from_global.wasm
Binary file not shown.
Binary file modified scheduler/test/testdata/test/permit_from_global.wasm
Binary file not shown.
25 changes: 22 additions & 3 deletions scheduler/test/testdata/test/permit_from_global.wat
Original file line number Diff line number Diff line change
@@ -1,11 +1,30 @@
;; permit_from_global lets us test the value range of status_code
(module $permit_from_global

;; Allocate the minimum amount of memory, 1 page (64KB).
(memory (export "memory") 1 1)

;; status_code is set by the host.
(global $status_code (export "status_code_global") (mut i32) (i32.const 0))
;; timeout is set by the host.
(global $timeout (export "timeout_global") (mut i32) (i32.const 0))

;; Allocate the minimum amount of memory, 1 page (64KB).
(memory (export "memory") 1 1)
(func (export "permit") (result i64)
;; var status_code int32
(local $status_code i32)

;; var timeout int32
(local $timeout i32)

;; status_code = global.status_code
(local.set $status_code (global.get $status_code))

;; timeout = global.timeout
(local.set $timeout (global.get $timeout))

(func (export "permit") (result i32) (return (global.get $status_code)))
;; return uint64(timeout) << 32 | uint64(status_code)
(return
(i64.or
(i64.shl (i64.extend_i32_u (local.get $status_code)) (i64.const 32))
(i64.extend_i32_u (local.get $timeout)))))
)
Binary file modified scheduler/test/testdata/test/postbind_from_global.wasm
Binary file not shown.
2 changes: 1 addition & 1 deletion scheduler/test/testdata/test/postbind_from_global.wat
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

(func (export "postbind")
(if (i32.eq (global.get $flag) (i32.const 1))
(unreachable)
(then unreachable)
)
)

Expand Down
Binary file modified scheduler/test/testdata/test/postfilter_from_global.wasm
Binary file not shown.
Binary file modified scheduler/test/testdata/test/prebind_from_global.wasm
Binary file not shown.
2 changes: 1 addition & 1 deletion scheduler/test/testdata/test/reserve_from_global.wat
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

(func (export "unreserve")
(if (i32.eq (global.get $flag) (i32.const 1))
(unreachable)
(then unreachable)
)
)
)
Binary file modified scheduler/test/testdata/test/scoreextensions_from_global.wasm
Binary file not shown.

0 comments on commit 996bc5c

Please sign in to comment.