Skip to content

Commit

Permalink
implement GetWaitingPod
Browse files Browse the repository at this point in the history
  • Loading branch information
chansuke committed Sep 23, 2024
1 parent 31d0159 commit 29db872
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 73 deletions.
46 changes: 43 additions & 3 deletions guest/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,39 @@
package handle

import (
"encoding/json"
"runtime"

"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/api/proto"
"sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"
internalproto "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/proto"
protoapi "sigs.k8s.io/kube-scheduler-wasm-extension/kubernetes/proto/api"
)

type wasmWaitingPod struct {
uid string
ptr uint32
pod *protoapi.Pod
}

func (w *wasmWaitingPod) GetPod() proto.Pod {
var msg protoapi.Pod
return &internalproto.Pod{Msg: &msg}
}

func (w *wasmWaitingPod) GetPendingPlugins() []string {
return []string{}
}

func (w *wasmWaitingPod) Allow(pluginName string) {
allowWaitingPod(w.ptr, w.ptr, w.ptr, w.ptr)
}

func (w *wasmWaitingPod) Reject(pluginName, msg string) {
rejectWaitingPod(w.ptr, w.ptr, w.ptr, w.ptr)
}

func RejectWaitingPod(uid string) bool {
ptr, size := mem.StringToPtr(uid)

Expand All @@ -40,11 +67,24 @@ func GetWaitingPod(uid string) api.WaitingPod {
ptr, size := mem.StringToPtr(uid)

// Wrap to avoid TinyGo 0.28: cannot use an exported function as value
mem.SendAndGetString(ptr, size, func(input_ptr, input_size, ptr uint32, limit mem.BufLimit) {
podBytes := mem.SendAndGetPodBytes(ptr, size, func(input_ptr, input_size, ptr uint32, limit mem.BufLimit) {
getWaitingPod(input_ptr, input_size, ptr, limit)
})
runtime.KeepAlive(uid)

waitingPod := make([]api.WaitingPod, size)
return waitingPod[0]
// Deserialize the pod bytes into a proto.Pod
var pod protoapi.Pod
err := json.Unmarshal(podBytes, &pod)
if err != nil {
panic(err)
}

// Create a new instance of wasmWaitingPod with the deserialized pod data
waitingPod := &wasmWaitingPod{
uid: uid,
ptr: ptr, // The pointer that corresponds to this pod's location in memory
pod: &pod,
}

return waitingPod
}
7 changes: 5 additions & 2 deletions guest/handle/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ package handle

import "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"

//go:wasmimport k8s.io/scheduler handle.reject_waiting_pod
func rejectWaitingPod(input_ptr, input_size, ptr uint32, limit mem.BufLimit)
//go:wasmimport k8s.io/scheduler handle.get_waiting_pod
func allowWaitingPod(input_ptr, input_size, ptr uint32, limit mem.BufLimit)

//go:wasmimport k8s.io/scheduler handle.get_waiting_pod
func getWaitingPod(input_ptr, input_size, ptr uint32, limit mem.BufLimit)

//go:wasmimport k8s.io/scheduler handle.reject_waiting_pod
func rejectWaitingPod(input_ptr, input_size, ptr uint32, limit mem.BufLimit)
7 changes: 5 additions & 2 deletions guest/handle/imports_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ package handle

import "sigs.k8s.io/kube-scheduler-wasm-extension/guest/internal/mem"

// rejectWaitingPod is stubbed for compilation outside TinyGo.
func rejectWaitingPod(uint32, uint32, uint32, mem.BufLimit) {}
// allowWaitingPod is stubbed for compilation outside TinyGo.
func allowWaitingPod(uint32, uint32, uint32, mem.BufLimit) {}

// getWaitingPod is stubbed for compilation outside TinyGo.
func getWaitingPod(uint32, uint32, uint32, mem.BufLimit) {}

// rejectWaitingPod is stubbed for compilation outside TinyGo.
func rejectWaitingPod(uint32, uint32, uint32, mem.BufLimit) {}
19 changes: 16 additions & 3 deletions guest/internal/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,21 @@ func SendAndGetUint64(input_ptr uint32, input_size uint32, fn func(input_ptr, in
return binary.LittleEndian.Uint64(readBuf)
}

func SendAndGetString(input_ptr uint32, input_size uint32, fn func(input_ptr, input_size, ptr uint32, limit BufLimit)) string {
// ReadBytes reads a given number of bytes from memory, starting at the provided pointer.
func ReadBytes(ptr uint32, limit BufLimit) []byte {
if limit == 0 {
return nil
}
// Allocate a slice of the desired size.
buf := make([]byte, limit)
// Copy data from the memory buffer to the new slice.
copy(buf, unsafe.Slice((*byte)(unsafe.Pointer(uintptr(ptr))), limit))
return buf
}

// SendAndGetPodBytes is similar to SendAndGetUint64, but it retrieves a byte slice.
func SendAndGetPodBytes(input_ptr uint32, input_size uint32, fn func(input_ptr, input_size, ptr uint32, limit BufLimit)) []byte {
fn(input_ptr, input_size, uint32(readBufPtr), readBufLimit)
size := binary.LittleEndian.Uint32(readBuf)
return string(readBuf[size : size+binary.LittleEndian.Uint32(readBuf[size:])])
// Return the bytes from the buffer.
return ReadBytes(uint32(readBufPtr), readBufLimit)
}
12 changes: 9 additions & 3 deletions guest/testdata/handle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,15 @@ func (pluginForGet) Filter(_ api.CycleState, pod proto.Pod, nodeInfo api.NodeInf
// Call GetWaitingPod first
waitingPod := handle.GetWaitingPod(pod.GetUid())

// This is being skipped, note the reason.
if waitingPod == nil {
// This is being skipped, note the reason.
return &api.Status{
Code: api.StatusCodeError,
Reason: "UID is " + pod.GetUid(),
}
}

return &api.Status{
Code: api.StatusCodeSkip,
Reason: "UID is " + pod.GetUid() + " and waitingPod is " + waitingPod.GetPod().GetName(),
Code: api.StatusCodeSuccess,
}
}
Binary file modified guest/testdata/handle/main.wasm
Binary file not shown.
9 changes: 2 additions & 7 deletions scheduler/plugin/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,15 +551,10 @@ func (h host) k8sHandleGetWaitingPodFn(ctx context.Context, mod wazeroapi.Module
}
uid := types.UID(b)
waitingPod := h.handle.GetWaitingPod(uid)
println("waitingPod: ", waitingPod)
println("pod in host.go: ", waitingPod)
if waitingPod == nil {
print("waitingPod is nil")
stack[0] = 0 // Return 0 to indicate no pod found or an error
return
panic("waitingPod not found")
}

print("waitingPod!!!!!!: ", waitingPod)
print("waitingPod.GetPod() : ", waitingPod.GetPod())

stack[0] = uint64(marshalIfUnderLimit(mod.Memory(), waitingPod.GetPod(), oBuf, oBufLimit))
}
178 changes: 135 additions & 43 deletions scheduler/plugin/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ package wasm
import (
"bytes"
"context"
"encoding/json"
"sync"
"testing"
"time"

"github.com/tetratelabs/wazero/experimental/wazerotest"
v1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
k8stest "k8s.io/klog/v2/test"
"k8s.io/kubernetes/pkg/scheduler/framework"

"sigs.k8s.io/kube-scheduler-wasm-extension/scheduler/test"
)
Expand Down Expand Up @@ -96,71 +100,131 @@ func initKlog(t *testing.T, buf *bytes.Buffer) {
klog.SetOutput(buf)
}

func Test_k8sHandleEventRecorderEventFn(t *testing.T) {
recorder := &test.FakeRecorder{EventMsg: ""}
handle := &test.FakeHandle{Recorder: recorder}
h := host{handle: handle}
type waitingPod struct {
pod *v1.Pod
pendingPlugins map[string]*time.Timer
s chan *framework.Status
mu sync.RWMutex
}

// Create a fake wasm module, which has data the guest should write.
mem := wazerotest.NewMemory(wazerotest.PageSize)
mod := wazerotest.NewModule(mem)
message := EventMessage{
RegardingReference: ObjectReference{},
RelatedReference: ObjectReference{},
Eventtype: "event",
Reason: "reason",
Action: "action",
Note: "note",
}
jsonmsg, err := json.Marshal(message)
if err != nil {
t.Fatalf("error during json.Marshal %v", err)
}
copy(mem.Bytes, jsonmsg)
func (wp *waitingPod) GetPod() *v1.Pod {
return wp.pod
}

// Invoke the host function in the same way the guest would have.
h.k8sHandleEventRecorderEventfFn(context.Background(), mod, []uint64{
0,
uint64(len(jsonmsg)),
})
func (wp *waitingPod) GetPendingPlugins() []string {
wp.mu.RLock()
defer wp.mu.RUnlock()
var plugins []string
for plugin := range wp.pendingPlugins {
plugins = append(plugins, plugin)
}
return plugins
}

have := recorder.EventMsg
want := "event reason action note"
func (wp *waitingPod) Allow(pluginName string) {
wp.mu.Lock()
defer wp.mu.Unlock()
if timer, ok := wp.pendingPlugins[pluginName]; ok {
timer.Stop()
delete(wp.pendingPlugins, pluginName)
}
}

if want != have {
t.Fatalf("unexpected event: %v != %v", want, have)
func (wp *waitingPod) Reject(pluginName, msg string) {
wp.mu.Lock()
defer wp.mu.Unlock()
if timer, ok := wp.pendingPlugins[pluginName]; ok {
timer.Stop()
delete(wp.pendingPlugins, pluginName)
}
}

func Test_k8sHandleRejectWaitingPodFn(t *testing.T) {
func Test_k8sHandleGetWaitingPodFn(t *testing.T) {
recorder := &test.FakeRecorder{EventMsg: ""}
handle := &test.FakeHandle{Recorder: recorder}
uid := types.UID("c6feae3a-7082-42a5-a5ec-6ae2e1603727")

// Create a fake WaitingPod
pod := &v1.Pod{
ObjectMeta: apimeta.ObjectMeta{
Name: "good-pod",
Namespace: "test",
UID: uid,
},
}
wp := &waitingPod{
pod: pod,
pendingPlugins: make(map[string]*time.Timer),
s: make(chan *framework.Status, 1),
}

wp.mu.Lock()

handle := &test.FakeHandle{
Recorder: recorder,
GetWaitingPodValue: wp,
}

h := host{handle: handle}

// Create a fake wasm module, which has data the guest should write.
mem := wazerotest.NewMemory(wazerotest.PageSize)
mod := wazerotest.NewModule(mem)
uid := types.UID("c6feae3a-7082-42a5-a5ec-6ae2e1603727")
copy(mem.Bytes, uid)

// Invoke the host function in the same way the guest would have.
h.k8sHandleRejectWaitingPodFn(context.Background(), mod, []uint64{
h.k8sHandleGetWaitingPodFn(context.Background(), mod, []uint64{
0,
uint64(len(uid)),
0, // Ideally we should define some value, but we don't define it for now.
0, // Ideally we should define some value, but we don't define it for now.
0,
0,
})

// Checking the value stored on handle
have := handle.RejectWaitingPodValue
// Checking the value returned by GetWaitingPod
have := handle.GetWaitingPodValue.GetPod().GetUID()
want := uid

if want != have {
t.Fatalf("unexpected uid: %v != %v", want, have)
}
}

func Test_k8sHandleGetWaitingPodFn(t *testing.T) {
//func Test_k8sHandleEventRecorderEventFn(t *testing.T) {
// recorder := &test.FakeRecorder{EventMsg: ""}
// handle := &test.FakeHandle{Recorder: recorder}
// h := host{handle: handle}
//
// // Create a fake wasm module, which has data the guest should write.
// mem := wazerotest.NewMemory(wazerotest.PageSize)
// mod := wazerotest.NewModule(mem)
// message := EventMessage{
// RegardingReference: ObjectReference{},
// RelatedReference: ObjectReference{},
// Eventtype: "event",
// Reason: "reason",
// Action: "action",
// Note: "note",
// }
// jsonmsg, err := json.Marshal(message)
// if err != nil {
// t.Fatalf("error during json.Marshal %v", err)
// }
// copy(mem.Bytes, jsonmsg)
//
// // Invoke the host function in the same way the guest would have.
// h.k8sHandleEventRecorderEventfFn(context.Background(), mod, []uint64{
// 0,
// uint64(len(jsonmsg)),
// })
//
// have := recorder.EventMsg
// want := "event reason action note"
//
// if want != have {
// t.Fatalf("unexpected event: %v != %v", want, have)
// }
//}

func Test_k8sHandleRejectWaitingPodFn(t *testing.T) {
recorder := &test.FakeRecorder{EventMsg: ""}
handle := &test.FakeHandle{Recorder: recorder}
h := host{handle: handle}
Expand All @@ -172,18 +236,46 @@ func Test_k8sHandleGetWaitingPodFn(t *testing.T) {
copy(mem.Bytes, uid)

// Invoke the host function in the same way the guest would have.
h.k8sHandleGetWaitingPodFn(context.Background(), mod, []uint64{
h.k8sHandleRejectWaitingPodFn(context.Background(), mod, []uint64{
0,
uint64(len(uid)),
0,
0,
0, // Ideally we should define some value, but we don't define it for now.
0, // Ideally we should define some value, but we don't define it for now.
})

// Checking the value stored on handle
have := handle.GetWaitingPodValue
have := handle.RejectWaitingPodValue
want := uid

if want != have {
t.Fatalf("unexpected uid: %v != %v", want, have)
}
}

//func Test_k8sHandleGetWaitingPodFn(t *testing.T) {
// recorder := &test.FakeRecorder{EventMsg: ""}
// handle := &test.FakeHandle{Recorder: recorder}
// h := host{handle: handle}
//
// // Create a fake wasm module, which has data the guest should write.
// mem := wazerotest.NewMemory(wazerotest.PageSize)
// mod := wazerotest.NewModule(mem)
// uid := types.UID("c6feae3a-7082-42a5-a5ec-6ae2e1603727")
// copy(mem.Bytes, uid)
//
// // Invoke the host function in the same way the guest would have.
// h.k8sHandleGetWaitingPodFn(context.Background(), mod, []uint64{
// 0,
// uint64(len(uid)),
// 0,
// 0,
// })
//
// // Checking the value stored on handle
// have := handle.GetWaitingPodValue
// want := uid
//
// if have == nil {
// t.Fatalf("unexpected pod: %v != %v", want, have)
// }
//}
Loading

0 comments on commit 29db872

Please sign in to comment.