diff --git a/guest/api/types.go b/guest/api/types.go index 26accba4..951cf7c1 100644 --- a/guest/api/types.go +++ b/guest/api/types.go @@ -105,4 +105,6 @@ type NodeInfo interface { Node() proto.Node } -type NodeToStatusMap map[string]*Status +type NodeToStatusMap interface { + NodeToStatusMap() map[string]*Status +} diff --git a/guest/internal/imports/host.go b/guest/internal/imports/host.go index 4ac7794f..e89b6ae3 100644 --- a/guest/internal/imports/host.go +++ b/guest/internal/imports/host.go @@ -66,3 +66,10 @@ func Pod(updater func([]byte) error) error { return k8sApiPod(ptr, limit) }, updater) } + +func NodeToStatusMap(updater func([]byte) error) error { + // Wrap to avoid TinyGo 0.28: cannot use an exported function as value + return mem.Update(func(ptr uint32, limit mem.BufLimit) (len uint32) { + return k8sApiNodeToStatusMap(ptr, limit) + }, updater) +} diff --git a/guest/internal/imports/imports.go b/guest/internal/imports/imports.go index 797ed2f1..eeff64c1 100644 --- a/guest/internal/imports/imports.go +++ b/guest/internal/imports/imports.go @@ -29,5 +29,8 @@ func k8sApiNodeName(ptr uint32, limit mem.BufLimit) (len uint32) //go:wasmimport k8s.io/api pod func k8sApiPod(ptr uint32, limit mem.BufLimit) (len uint32) +//go:wasmimport k8s.io/api nodeToStatusMap +func k8sApiNodeToStatusMap(ptr uint32, limit mem.BufLimit) (len uint32) + //go:wasmimport k8s.io/scheduler result.status_reason func k8sSchedulerResultStatusReason(ptr, size uint32) diff --git a/guest/internal/imports/imports_stub.go b/guest/internal/imports/imports_stub.go index 63ef99b4..fd067e2a 100644 --- a/guest/internal/imports/imports_stub.go +++ b/guest/internal/imports/imports_stub.go @@ -29,5 +29,8 @@ func k8sApiNodeName(uint32, mem.BufLimit) (len uint32) { return } // k8sApiPod is stubbed for compilation outside TinyGo. func k8sApiPod(uint32, mem.BufLimit) (len uint32) { return } +// k8sApiNodeToStatusMap is stubbed for compilation outside TinyGo. +func k8sApiNodeToStatusMap(uint32, mem.BufLimit) (len uint32) { return } + // k8sSchedulerResultStatusReason is stubbed for compilation outside TinyGo. func k8sSchedulerResultStatusReason(uint32, uint32) {} diff --git a/guest/plugin/plugin.go b/guest/plugin/plugin.go index 5b553785..7d8b9af0 100644 --- a/guest/plugin/plugin.go +++ b/guest/plugin/plugin.go @@ -20,8 +20,8 @@ import ( "sigs.k8s.io/kube-scheduler-wasm-extension/guest/api" "sigs.k8s.io/kube-scheduler-wasm-extension/guest/enqueue" "sigs.k8s.io/kube-scheduler-wasm-extension/guest/filter" - "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/postfilter" "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/prefilter" + "sigs.k8s.io/kube-scheduler-wasm-extension/guest/postfilter" "sigs.k8s.io/kube-scheduler-wasm-extension/guest/prescore" "sigs.k8s.io/kube-scheduler-wasm-extension/guest/score" ) diff --git a/guest/postfilter/postfilter.go b/guest/postfilter/postfilter.go index 8c9b0c3a..99620784 100644 --- a/guest/postfilter/postfilter.go +++ b/guest/postfilter/postfilter.go @@ -22,6 +22,10 @@ import ( "unsafe" "sigs.k8s.io/kube-scheduler-wasm-extension/guest/api" + "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/cyclestate" + "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/imports" + "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/plugin" + internalproto "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/proto" ) // postfilter is the current plugin assigned with SetPlugin. @@ -71,9 +75,10 @@ func _postfilter() uint64 { //nolint return 0 } + // TODO: fix PostFilter // The parameters passed are lazy with regard to host functions. This means // a no-op plugin should not have any unmarshal penalty. - nominatedNodeName, nominatingMode, status := postfilter.PostFilter(CycleState, Pod, nil) + nominatedNodeName, nominatingMode, status := postfilter.PostFilter(cyclestate.Values, cyclestate.Pod, nil) cString := []byte(nominatedNodeName) if cString != nil { @@ -85,3 +90,21 @@ func _postfilter() uint64 { //nolint return (uint64(nominatingMode) << uint64(32)) | uint64(imports.StatusToCode(status)) } + +type nodeToStatusMap struct { + statusMap map[string]*api.Status +} + +func (n *nodeToStatusMap) NodeToStatusMap() map[string]*api.Status { + return n.lazyNodeToStatusMap() +} + +// lazyNodeToStatusMap returns NodeToStatusMap from imports.NodeToStatusMap. +func (n *nodeToStatusMap) lazyNodeToStatusMap() map[string]*api.Status { + var msg api.NodeToStatusMap + if err := imports.NodeToStatusMap(msg.UnmarshalVT); err != nil { + panic(err.Error()) + } + n.statusMap = &internalproto.NodeToStatusMap{Msg: &msg} + return n.statusMap +} diff --git a/scheduler/plugin/host.go b/scheduler/plugin/host.go index d294a99c..8f5ad2b6 100644 --- a/scheduler/plugin/host.go +++ b/scheduler/plugin/host.go @@ -34,6 +34,7 @@ const ( k8sApiNodeList = "nodeList" k8sApiNodeName = "nodeName" k8sApiPod = "pod" + k8sApiNodeToStatusMap = "nodeToStatusMap" k8sKlog = "k8s.io/klog" k8sKlogLog = "log" k8sKlogLogs = "logs" @@ -60,6 +61,9 @@ func instantiateHostApi(ctx context.Context, runtime wazero.Runtime) (wazeroapi. NewFunctionBuilder(). WithGoModuleFunction(wazeroapi.GoModuleFunc(k8sApiPodFn), []wazeroapi.ValueType{i32, i32}, []wazeroapi.ValueType{i32}). WithParameterNames("buf", "buf_limit").Export(k8sApiPod). + NewFunctionBuilder(). + WithGoModuleFunction(wazeroapi.GoModuleFunc(k8sApiNodeToStatusMapFn), []wazeroapi.ValueType{i32, i32}, []wazeroapi.ValueType{i32}). + WithParameterNames("buf", "buf_limit").Export(k8sApiNodeToStatusMap). Instantiate(ctx) } @@ -126,6 +130,9 @@ type stack struct { // pod is used by guest.filterFn and guest.scoreFn pod *v1.Pod + // nodeToStatusMap is used by guest.postfilterFn + nodeToStatusMap map[string]*framework.Status + // resultClusterEvents is returned by guest.enqueueFn resultClusterEvents []framework.ClusterEvent @@ -186,6 +193,15 @@ func k8sApiPodFn(ctx context.Context, mod wazeroapi.Module, stack []uint64) { stack[0] = uint64(marshalIfUnderLimit(mod.Memory(), pod, buf, bufLimit)) } +// TODO: fix +func k8sApiNodeToStatusMapFn(ctx context.Context, mod wazeroapi.Module, stack []uint64) { + buf := uint32(stack[0]) + bufLimit := bufLimit(stack[1]) + + // nodeToStatusMap := paramsFromContext(ctx).nodeToStatusMap + stack[0] = uint64(marshalIfUnderLimit(mod.Memory(), nil, buf, bufLimit)) +} + type host struct { guestConfig string logSeverity int32 diff --git a/scheduler/plugin/mask.go b/scheduler/plugin/mask.go index 1a53acbc..17318b7c 100644 --- a/scheduler/plugin/mask.go +++ b/scheduler/plugin/mask.go @@ -49,7 +49,6 @@ func maskInterfaces(plugin *wasmPlugin) (framework.Plugin, error) { iPreBindPlugin | iPostBindPlugin) - // TODO: ask about PostFilter switch i { case iFilterPlugin: type filter interface { diff --git a/scheduler/plugin/plugin_test.go b/scheduler/plugin/plugin_test.go index 1aa9b231..3f47e402 100644 --- a/scheduler/plugin/plugin_test.go +++ b/scheduler/plugin/plugin_test.go @@ -574,7 +574,7 @@ func TestPostFilter(t *testing.T) { expectedStatusMessage string }{ { - name: "scored: nodeName equals spec.NodeName", + name: "success: nodeName equals spec.NodeName", args: []string{"test", "score"}, pod: test.PodSmall, nodeName: test.PodSmall.Spec.NodeName, @@ -588,44 +588,40 @@ func TestPostFilter(t *testing.T) { expectedStatusCode: framework.Success, }, { - name: "most negative score", + name: "min statusCode", guestURL: test.URLTestPostFilterFromGlobal, pod: test.PodSmall, nodeName: test.NodeSmall.Name, - globals: map[string]int32{"score": math.MinInt32}, - expectedStatusCode: framework.Success, + globals: map[string]int32{"status_code": math.MinInt32}, + expectedResult: &framework.PostFilterResult{NominatingInfo: &framework.NominatingInfo{NominatedNodeName: "", NominatingMode: 0}}, + expectedStatusCode: math.MinInt32, }, { - name: "min score", + name: "max statusCode", guestURL: test.URLTestPostFilterFromGlobal, pod: test.PodSmall, nodeName: test.NodeSmall.Name, - globals: map[string]int32{"score": math.MinInt32}, - expectedStatusCode: framework.Success, + globals: map[string]int32{"status_code": math.MaxInt32}, + expectedResult: &framework.PostFilterResult{NominatingInfo: &framework.NominatingInfo{NominatedNodeName: "", NominatingMode: 0}}, + expectedStatusCode: math.MaxInt32, }, { - name: "max score", + name: "min nominatingMode", guestURL: test.URLTestPostFilterFromGlobal, pod: test.PodSmall, nodeName: test.NodeSmall.Name, - globals: map[string]int32{"score": math.MaxInt32}, + globals: map[string]int32{"nominating_mode": math.MinInt32}, + expectedResult: &framework.PostFilterResult{NominatingInfo: &framework.NominatingInfo{NominatedNodeName: "", NominatingMode: math.MinInt32}}, expectedStatusCode: framework.Success, }, { - name: "min statusCode", + name: "max nominatingMode", guestURL: test.URLTestPostFilterFromGlobal, pod: test.PodSmall, nodeName: test.NodeSmall.Name, - globals: map[string]int32{"status_code": math.MinInt32}, - expectedStatusCode: math.MinInt32, - }, - { - name: "max statusCode", - guestURL: test.URLTestPostFilterFromGlobal, - pod: test.PodSmall, - nodeName: test.NodeSmall.Name, - globals: map[string]int32{"status_code": math.MaxInt32}, - expectedStatusCode: math.MaxInt32, + globals: map[string]int32{"nominating_mode": math.MaxInt32}, + expectedResult: &framework.PostFilterResult{NominatingInfo: &framework.NominatingInfo{NominatedNodeName: "", NominatingMode: math.MaxInt32}}, + expectedStatusCode: framework.Success, }, { name: "panic", @@ -633,10 +629,10 @@ func TestPostFilter(t *testing.T) { pod: test.PodSmall, nodeName: test.NodeSmall.Name, expectedStatusCode: framework.Error, - expectedStatusMessage: `wasm: score error: panic! + expectedStatusMessage: `wasm: postfilter error: panic! wasm error: unreachable wasm stack trace: - panic_on_score.$1() i64`, + panic_on_postfilter.$1() i64`, }, } @@ -644,7 +640,7 @@ wasm stack trace: t.Run(tc.name, func(t *testing.T) { guestURL := tc.guestURL if guestURL == "" { - guestURL = test.URLTestPostFilterFromGlobal + guestURL = test.URLTestFilter } p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: guestURL, Args: tc.args}) @@ -659,8 +655,8 @@ wasm stack trace: } result, status := p.(framework.PostFilterPlugin).PostFilter(ctx, nil, tc.pod, nil) - if want, have := tc.expectedResult, result; want != have { - t.Fatalf("unexpected score: want %v, have %v", want, have) + if want, have := tc.expectedResult, result; !reflect.DeepEqual(want, have) { + t.Fatalf("unexpected result: want %v, have %v", want, have) } if want, have := tc.expectedStatusCode, status.Code(); want != have { t.Fatalf("unexpected status code: want %v, have %v", want, have) diff --git a/scheduler/test/testdata/error/panic_on_postfilter.wasm b/scheduler/test/testdata/error/panic_on_postfilter.wasm index fd0b614e..512a3f64 100644 Binary files a/scheduler/test/testdata/error/panic_on_postfilter.wasm and b/scheduler/test/testdata/error/panic_on_postfilter.wasm differ diff --git a/scheduler/test/testdata/error/panic_on_postfilter.wat b/scheduler/test/testdata/error/panic_on_postfilter.wat index 55d2cdbc..069461f4 100644 --- a/scheduler/test/testdata/error/panic_on_postfilter.wat +++ b/scheduler/test/testdata/error/panic_on_postfilter.wat @@ -26,4 +26,7 @@ ;; Issue the unreachable instruction instead of returning a code (unreachable)) + + ;; We require exporting filter with postfilter + (func (export "filter") (result i32) (unreachable)) ) diff --git a/scheduler/test/testdata/test/postfilter_from_global.wasm b/scheduler/test/testdata/test/postfilter_from_global.wasm index 1a9fd4f1..29ee71f2 100644 Binary files a/scheduler/test/testdata/test/postfilter_from_global.wasm and b/scheduler/test/testdata/test/postfilter_from_global.wasm differ diff --git a/scheduler/test/testdata/test/postfilter_from_global.wat b/scheduler/test/testdata/test/postfilter_from_global.wat index b0be3cf0..208cb2f4 100644 --- a/scheduler/test/testdata/test/postfilter_from_global.wat +++ b/scheduler/test/testdata/test/postfilter_from_global.wat @@ -27,4 +27,7 @@ (i64.or (i64.shl (i64.extend_i32_u (local.get $nominating_mode)) (i64.const 32)) (i64.extend_i32_u (local.get $status_code))))) + + ;; We require exporting filter with postfilter + (func (export "filter") (result i32) (unreachable)) )