From 4da9ede74b4db348688e50df83575463bd81bedb Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Fri, 28 Jul 2023 16:23:40 +0200 Subject: [PATCH 1/3] config: add support to file:// and http(s):// URIs Extends the current plugin config to use instead a URI. In the case of `file://` the behavior is the same as it is currently. In the case of `http(s)://` it will fetch the URI and try to evaluate it as a wasm payload. This PR is based on earlier work on `dapr/component-contrib`. See: https://github.com/dapr/components-contrib/pull/3005 Signed-off-by: Edoardo Vacchi --- internal/e2e/profiler/profiler.go | 2 +- internal/e2e/scheduler/scheduler_test.go | 4 +- scheduler/go.mod | 2 + scheduler/plugin/config.go | 6 +- scheduler/plugin/http.go | 65 +++++++++ scheduler/plugin/http_test.go | 89 +++++++++++++ scheduler/plugin/plugin.go | 22 ++- scheduler/plugin/plugin_test.go | 162 +++++++++++++---------- scheduler/test/testdata.go | 59 +++++---- 9 files changed, 310 insertions(+), 101 deletions(-) create mode 100644 scheduler/plugin/http.go create mode 100644 scheduler/plugin/http_test.go diff --git a/internal/e2e/profiler/profiler.go b/internal/e2e/profiler/profiler.go index a7fb84ed..84a86e1e 100644 --- a/internal/e2e/profiler/profiler.go +++ b/internal/e2e/profiler/profiler.go @@ -67,7 +67,7 @@ func main() { ) // Pass the profiling context to the plugin. - plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath}) + plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: "file://" + guestPath}) if err != nil { log.Panicln("failed to create plugin:", err) } diff --git a/internal/e2e/scheduler/scheduler_test.go b/internal/e2e/scheduler/scheduler_test.go index a11caacf..21081888 100644 --- a/internal/e2e/scheduler/scheduler_test.go +++ b/internal/e2e/scheduler/scheduler_test.go @@ -35,7 +35,7 @@ import ( func TestCycleStateCoherence(t *testing.T) { ctx := context.Background() - plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: test.PathTestCycleState}) + plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: test.URLTestCycleState}) if err != nil { t.Fatalf("failed to create plugin: %v", err) } @@ -116,7 +116,7 @@ func BenchmarkExample_NodeNumber(b *testing.B) { func newNodeNumberPlugin(ctx context.Context, t e2e.Testing, reverse bool) framework.Plugin { plugin, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{ - GuestPath: test.PathExampleNodeNumber, + GuestURL: test.URLExampleNodeNumber, GuestConfig: fmt.Sprintf(`{"reverse": %v}`, reverse), }) if err != nil { diff --git a/scheduler/go.mod b/scheduler/go.mod index 25bd8f44..bee36856 100644 --- a/scheduler/go.mod +++ b/scheduler/go.mod @@ -36,6 +36,7 @@ replace ( require ( github.com/google/uuid v1.3.0 + github.com/stretchr/testify v1.8.1 github.com/tetratelabs/wazero v1.3.1 k8s.io/api v0.27.3 k8s.io/apimachinery v0.27.3 @@ -90,6 +91,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/selinux v1.10.0 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect diff --git a/scheduler/plugin/config.go b/scheduler/plugin/config.go index f33d31b2..581796c1 100644 --- a/scheduler/plugin/config.go +++ b/scheduler/plugin/config.go @@ -17,8 +17,10 @@ package wasm type PluginConfig struct { - // GuestPath is the path to the guest wasm. - GuestPath string `json:"guestPath"` + // GuestURL is the URL to the guest wasm. + // Valid schemes are file:// for a local file or http[s]:// for one + // retrieved via HTTP. + GuestURL string `json:"guestURL"` // GuestConfig is any configuration to give to the guest. GuestConfig string `json:"guestConfig"` diff --git a/scheduler/plugin/http.go b/scheduler/plugin/http.go new file mode 100644 index 00000000..5a61895c --- /dev/null +++ b/scheduler/plugin/http.go @@ -0,0 +1,65 @@ +/* + Copyright 2023 The Kubernetes Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package wasm + +import ( + "context" + "fmt" + "io" + "net/http" + "net/url" +) + +// httpClient decorates an http.Client with convenience methods. +type httpClient struct { + c http.Client +} + +// newHTTPFetcher is a constructor for httpFetcher. +// +// It is possible to plug a custom http.RoundTripper to handle other concerns (e.g. retries) +// Compression is handled transparently and automatically by http.Client. +func newHTTPCLient(transport http.RoundTripper) *httpClient { + return &httpClient{ + c: http.Client{Transport: transport}, + } +} + +// fetch returns a byte slice of the wasm module found at the given URL, or an error otherwise. +func (f *httpClient) get(ctx context.Context, u *url.URL) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return nil, err + } + resp, err := f.c.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + io.Copy(io.Discard, resp.Body) //nolint + resp.Body.Close() + return nil, fmt.Errorf("received %v status code from %q", resp.StatusCode, u) + } + + bytes, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return nil, err + } + return bytes, nil +} diff --git a/scheduler/plugin/http_test.go b/scheduler/plugin/http_test.go new file mode 100644 index 00000000..281af907 --- /dev/null +++ b/scheduler/plugin/http_test.go @@ -0,0 +1,89 @@ +/* + Copyright 2023 The Kubernetes Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package wasm + +import ( + "compress/gzip" + "context" + "net/http" + "net/http/httptest" + "net/url" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +var wasmMagicNumber = []byte{0x00, 0x61, 0x73, 0x6d} + +func TestWasmHTTPFetch(t *testing.T) { + wasmBinary := wasmMagicNumber + wasmBinary = append(wasmBinary, 0x00, 0x00, 0x00, 0x00) + cases := []struct { + name string + handler http.HandlerFunc + expectedError string + }{ + { + name: "plain wasm binary", + handler: func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write(wasmBinary) + }, + }, + // Compressed payloads are handled automatically by http.Client. + { + name: "compressed payload", + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Encoding", "gzip") + + gw := gzip.NewWriter(w) + defer gw.Close() + _, _ = gw.Write(wasmBinary) + }, + }, + { + name: "http error", + handler: func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }, + expectedError: "received 500 status code", + }, + } + + for _, proto := range []string{"http", "https"} { + t.Run(proto, func(t *testing.T) { + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ts := httptest.NewServer(tc.handler) + defer ts.Close() + c := newHTTPCLient(http.DefaultTransport) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + parse, err := url.Parse(ts.URL) + require.NoError(t, err) + _, err = c.get(ctx, parse) + if tc.expectedError != "" { + require.ErrorContains(t, err, tc.expectedError) + return + } + require.NoError(t, err, "Wasm download got an unexpected error: %v", err) + }) + } + }) + } +} diff --git a/scheduler/plugin/plugin.go b/scheduler/plugin/plugin.go index 5f496bd9..58c34a91 100644 --- a/scheduler/plugin/plugin.go +++ b/scheduler/plugin/plugin.go @@ -19,6 +19,8 @@ package wasm import ( "context" "fmt" + "net/http" + "net/url" "os" "sync/atomic" "time" @@ -51,9 +53,9 @@ func New(configuration runtime.Object, frameworkHandle framework.Handle) (framew // NewFromConfig is like New, except it allows us to explicitly provide the // context and configuration of the plugin. This allows flexibility in tests. func NewFromConfig(ctx context.Context, config PluginConfig) (framework.Plugin, error) { - guestBin, err := os.ReadFile(config.GuestPath) + guestBin, err := readFromURI(ctx, config.GuestURL) if err != nil { - return nil, fmt.Errorf("wasm: error reading guest binary at %s: %w", config.GuestPath, err) + return nil, fmt.Errorf("wasm: error reading guest binary at %s: %w", config.GuestURL, err) } runtime, guestModule, err := prepareRuntime(ctx, guestBin, config.GuestConfig) @@ -77,6 +79,22 @@ func NewFromConfig(ctx context.Context, config PluginConfig) (framework.Plugin, } } +func readFromURI(ctx context.Context, u string) ([]byte, error) { + uri, err := url.ParseRequestURI(u) + if err != nil { + return nil, err + } + switch uri.Scheme { + case "file": + return os.ReadFile(uri.Path) + case "http", "https": + c := newHTTPCLient(http.DefaultTransport) + return c.get(ctx, uri) + default: + return nil, fmt.Errorf("unsupported URL scheme: %s", uri.Scheme) + } +} + // newWasmPlugin is extracted to prevent small bugs: The caller must close the // wazero.Runtime to avoid leaking mmapped files. func newWasmPlugin(ctx context.Context, runtime wazero.Runtime, guestModule wazero.CompiledModule, config PluginConfig) (*wasmPlugin, error) { diff --git a/scheduler/plugin/plugin_test.go b/scheduler/plugin/plugin_test.go index c9b46634..2ff2ee33 100644 --- a/scheduler/plugin/plugin_test.go +++ b/scheduler/plugin/plugin_test.go @@ -21,6 +21,11 @@ import ( "fmt" "io" "math" + "net/http" + "net/http/httptest" + "net/url" + "os" + "path" "reflect" "testing" @@ -40,7 +45,7 @@ var ctx = context.Background() // Test_guestPool_bindingCycles tests that the bindingCycles field is set correctly. func Test_guestPool_bindingCycles(t *testing.T) { - p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: test.PathTestAll}) + p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: test.URLTestAll}) if err != nil { t.Fatalf("failed to create plugin: %v", err) } @@ -118,7 +123,7 @@ func Test_guestPool_bindingCycles(t *testing.T) { // Test_guestPool_assignedToSchedulingPod tests that the scheduledPodUID is assigned during PreFilter expectedly. func Test_guestPool_assignedToSchedulingPod(t *testing.T) { - p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: test.PathTestAll}) + p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: test.URLTestAll}) if err != nil { t.Fatalf("failed to create plugin: %v", err) } @@ -163,7 +168,7 @@ func Test_guestPool_assignedToSchedulingPod(t *testing.T) { func TestNew_maskInterfaces(t *testing.T) { tests := []struct { name string - guestPath string + guestURL string expectedFilter bool expectedScore bool expectedReserve bool @@ -173,27 +178,27 @@ func TestNew_maskInterfaces(t *testing.T) { }{ { name: "not plugin", - guestPath: test.PathErrorNotPlugin, + guestURL: test.URLErrorNotPlugin, expectedError: "wasm: guest does not export any plugin functions", // not supported to be only enqueue }, { name: "filter", - guestPath: test.PathErrorPanicOnFilter, + guestURL: test.URLErrorPanicOnFilter, expectedFilter: true, }, { name: "prescore|score", - guestPath: test.PathExampleNodeNumber, + guestURL: test.URLExampleNodeNumber, expectedScore: true, }, { name: "score", - guestPath: test.PathErrorPanicOnScore, + guestURL: test.URLErrorPanicOnScore, expectedScore: true, }, { name: "prefilter|filter|prescore|score", - guestPath: test.PathTestAllNoopWat, + guestURL: test.URLTestAllNoopWat, expectedFilter: true, expectedScore: true, }, @@ -203,7 +208,7 @@ func TestNew_maskInterfaces(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p, err := wasm.New(&runtime.Unknown{ ContentType: runtime.ContentTypeJSON, - Raw: []byte(fmt.Sprintf(`{"guestPath": "%s"}`, tc.guestPath)), + Raw: []byte(fmt.Sprintf(`{"guestURL": "%s"}`, tc.guestURL)), }, nil) if tc.expectedError != "" { requireError(t, err, tc.expectedError) @@ -237,23 +242,24 @@ func TestNew_maskInterfaces(t *testing.T) { } func TestNewFromConfig(t *testing.T) { - tests := []struct { + type testcase struct { name string - guestPath string + guestURL string expectedError string - }{ + } + tests := []testcase{ { - name: "valid wasm", - guestPath: test.PathTestFilter, + name: "valid wasm", + guestURL: test.URLTestFilter, }, { name: "not plugin", - guestPath: test.PathErrorNotPlugin, + guestURL: test.URLErrorNotPlugin, expectedError: `wasm: guest does not export any plugin functions`, }, { - name: "panic on _start", - guestPath: test.PathErrorPanicOnStart, + name: "panic on _start", + guestURL: test.URLErrorPanicOnStart, expectedError: `failed to create a guest pool: wasm: instantiate error: panic! module[1] function[_start] failed: wasm error: unreachable wasm stack trace: @@ -261,9 +267,9 @@ wasm stack trace: }, } - for _, tc := range tests { + testWithURL := func(t *testing.T, tc testcase) { t.Run(tc.name, func(t *testing.T) { - p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: tc.guestPath}) + p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: tc.guestURL}) if err != nil { if want, have := tc.expectedError, err.Error(); want != have { t.Fatalf("unexpected error: want %v, have %v", want, have) @@ -276,14 +282,36 @@ wasm stack trace: } }) } + + t.Run("local", func(t *testing.T) { + for _, tc := range tests { + testWithURL(t, tc) + } + }) + + t.Run("remote (http)", func(t *testing.T) { + for _, tc := range tests { + uri, _ := url.ParseRequestURI(tc.guestURL) + bytes, _ := os.ReadFile(uri.Path) + _, file := path.Split(uri.Path) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/"+file { + _, _ = w.Write(bytes) + } + })) + defer ts.Close() + tc.guestURL = ts.URL + "/" + file + testWithURL(t, tc) + } + }) } func TestEnqueue(t *testing.T) { tests := []struct { - name string - guestPath string - args []string - expected []framework.ClusterEvent + name string + guestURL string + args []string + expected []framework.ClusterEvent }{ { name: "success: 0", @@ -308,12 +336,12 @@ func TestEnqueue(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - guestPath := tc.guestPath - if guestPath == "" { - guestPath = test.PathTestCycleState + guestURL := tc.guestURL + if guestURL == "" { + guestURL = test.URLTestCycleState } - p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath, Args: tc.args}) + p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: guestURL, Args: tc.args}) if err != nil { t.Fatal(err) } @@ -327,9 +355,9 @@ func TestEnqueue(t *testing.T) { } t.Run("panic", func(t *testing.T) { - guestPath := test.PathErrorPanicOnEnqueue + guestURL := test.URLErrorPanicOnEnqueue - p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath}) + p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: guestURL}) if err != nil { t.Fatal(err) } @@ -346,7 +374,7 @@ func TestEnqueue(t *testing.T) { func TestPreFilter(t *testing.T) { tests := []struct { name string - guestPath string + guestURL string guestConfig string args []string globals map[string]int32 @@ -370,21 +398,21 @@ func TestPreFilter(t *testing.T) { }, { name: "min statusCode", - guestPath: test.PathTestPreFilterFromGlobal, + guestURL: test.URLTestPreFilterFromGlobal, pod: test.PodSmall, globals: map[string]int32{"status_code": math.MinInt32}, expectedStatusCode: math.MinInt32, }, { name: "max statusCode", - guestPath: test.PathTestPreFilterFromGlobal, + guestURL: test.URLTestPreFilterFromGlobal, pod: test.PodSmall, globals: map[string]int32{"status_code": math.MaxInt32}, expectedStatusCode: math.MaxInt32, }, { name: "panic", - guestPath: test.PathErrorPanicOnPreFilter, + guestURL: test.URLErrorPanicOnPreFilter, pod: test.PodSmall, expectedStatusCode: framework.Error, expectedStatusMessage: `wasm: prefilter error: panic! @@ -394,7 +422,7 @@ wasm stack trace: }, { name: "panic no guestConfig", - guestPath: test.PathErrorPanicOnGetConfig, + guestURL: test.URLErrorPanicOnGetConfig, pod: test.PodSmall, expectedStatusCode: framework.Error, expectedStatusMessage: `wasm: prefilter error: wasm error: unreachable @@ -403,7 +431,7 @@ wasm stack trace: }, { // This only tests that configuration gets assigned name: "panic guestConfig", - guestPath: test.PathErrorPanicOnGetConfig, + guestURL: test.URLErrorPanicOnGetConfig, guestConfig: "hello", pod: test.PodSmall, expectedStatusCode: framework.Error, @@ -416,12 +444,12 @@ wasm stack trace: for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - guestPath := tc.guestPath - if guestPath == "" { - guestPath = test.PathTestFilter + guestURL := tc.guestURL + if guestURL == "" { + guestURL = test.URLTestFilter } - p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath, Args: tc.args, GuestConfig: tc.guestConfig}) + p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: guestURL, Args: tc.args, GuestConfig: tc.guestConfig}) if err != nil { t.Fatal(err) } @@ -449,7 +477,7 @@ wasm stack trace: func TestFilter(t *testing.T) { tests := []struct { name string - guestPath string + guestURL string args []string globals map[string]int32 pod *v1.Pod @@ -474,7 +502,7 @@ func TestFilter(t *testing.T) { }, { name: "min statusCode", - guestPath: test.PathTestFilterFromGlobal, + guestURL: test.URLTestFilterFromGlobal, pod: test.PodSmall, node: test.NodeSmall, globals: map[string]int32{"status_code": math.MinInt32}, @@ -482,7 +510,7 @@ func TestFilter(t *testing.T) { }, { name: "max statusCode", - guestPath: test.PathTestFilterFromGlobal, + guestURL: test.URLTestFilterFromGlobal, pod: test.PodSmall, node: test.NodeSmall, globals: map[string]int32{"status_code": math.MaxInt32}, @@ -490,7 +518,7 @@ func TestFilter(t *testing.T) { }, { name: "panic", - guestPath: test.PathErrorPanicOnFilter, + guestURL: test.URLErrorPanicOnFilter, pod: test.PodSmall, node: test.NodeSmall, expectedStatusCode: framework.Error, @@ -503,12 +531,12 @@ wasm stack trace: for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - guestPath := tc.guestPath - if guestPath == "" { - guestPath = test.PathTestFilter + guestURL := tc.guestURL + if guestURL == "" { + guestURL = test.URLTestFilter } - p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath, Args: tc.args}) + p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: guestURL, Args: tc.args}) if err != nil { t.Fatal(err) } @@ -535,7 +563,7 @@ wasm stack trace: func TestPreScore(t *testing.T) { tests := []struct { name string - guestPath string + guestURL string args []string globals map[string]int32 pod *v1.Pod @@ -566,21 +594,21 @@ func TestPreScore(t *testing.T) { }, { name: "min statusCode", - guestPath: test.PathTestPreScoreFromGlobal, + guestURL: test.URLTestPreScoreFromGlobal, pod: test.PodSmall, globals: map[string]int32{"status_code": math.MinInt32}, expectedStatusCode: math.MinInt32, }, { name: "max statusCode", - guestPath: test.PathTestPreScoreFromGlobal, + guestURL: test.URLTestPreScoreFromGlobal, pod: test.PodSmall, globals: map[string]int32{"status_code": math.MaxInt32}, expectedStatusCode: math.MaxInt32, }, { name: "panic", - guestPath: test.PathErrorPanicOnPreScore, + guestURL: test.URLErrorPanicOnPreScore, pod: test.PodSmall, expectedStatusCode: framework.Error, expectedStatusMessage: `wasm: prescore error: panic! @@ -590,7 +618,7 @@ wasm stack trace: }, { name: "missing score", - guestPath: test.PathErrorPreScoreWithoutScore, + guestURL: test.URLErrorPreScoreWithoutScore, pod: test.PodSmall, expectedStatusCode: framework.Error, expectedError: `wasm: filter, score, reserve, permit or bind must be exported`, @@ -599,12 +627,12 @@ wasm stack trace: for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - guestPath := tc.guestPath - if guestPath == "" { - guestPath = test.PathTestScore + guestURL := tc.guestURL + if guestURL == "" { + guestURL = test.URLTestScore } - p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath, Args: tc.args}) + p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: guestURL, Args: tc.args}) if tc.expectedError != "" { requireError(t, err, tc.expectedError) return @@ -633,7 +661,7 @@ wasm stack trace: func TestScore(t *testing.T) { tests := []struct { name string - guestPath string + guestURL string args []string globals map[string]int32 pod *v1.Pod @@ -660,7 +688,7 @@ func TestScore(t *testing.T) { }, { name: "most negative score", - guestPath: test.PathTestScoreFromGlobal, + guestURL: test.URLTestScoreFromGlobal, pod: test.PodSmall, nodeName: test.NodeSmall.Name, globals: map[string]int32{"score": math.MinInt32}, @@ -669,7 +697,7 @@ func TestScore(t *testing.T) { }, { name: "min score", - guestPath: test.PathTestScoreFromGlobal, + guestURL: test.URLTestScoreFromGlobal, pod: test.PodSmall, nodeName: test.NodeSmall.Name, globals: map[string]int32{"score": math.MinInt32}, @@ -678,7 +706,7 @@ func TestScore(t *testing.T) { }, { name: "max score", - guestPath: test.PathTestScoreFromGlobal, + guestURL: test.URLTestScoreFromGlobal, pod: test.PodSmall, nodeName: test.NodeSmall.Name, globals: map[string]int32{"score": math.MaxInt32}, @@ -687,7 +715,7 @@ func TestScore(t *testing.T) { }, { name: "min statusCode", - guestPath: test.PathTestScoreFromGlobal, + guestURL: test.URLTestScoreFromGlobal, pod: test.PodSmall, nodeName: test.NodeSmall.Name, globals: map[string]int32{"status_code": math.MinInt32}, @@ -696,7 +724,7 @@ func TestScore(t *testing.T) { }, { name: "max statusCode", - guestPath: test.PathTestScoreFromGlobal, + guestURL: test.URLTestScoreFromGlobal, pod: test.PodSmall, nodeName: test.NodeSmall.Name, globals: map[string]int32{"status_code": math.MaxInt32}, @@ -705,7 +733,7 @@ func TestScore(t *testing.T) { }, { name: "panic", - guestPath: test.PathErrorPanicOnScore, + guestURL: test.URLErrorPanicOnScore, pod: test.PodSmall, nodeName: test.NodeSmall.Name, expectedStatusCode: framework.Error, @@ -718,12 +746,12 @@ wasm stack trace: for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - guestPath := tc.guestPath - if guestPath == "" { - guestPath = test.PathTestScore + guestURL := tc.guestURL + if guestURL == "" { + guestURL = test.URLTestScore } - p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestPath: guestPath, Args: tc.args}) + p, err := wasm.NewFromConfig(ctx, wasm.PluginConfig{GuestURL: guestURL, Args: tc.args}) if err != nil { t.Fatal(err) } diff --git a/scheduler/test/testdata.go b/scheduler/test/testdata.go index 773b9a92..59996d21 100644 --- a/scheduler/test/testdata.go +++ b/scheduler/test/testdata.go @@ -16,43 +16,43 @@ import ( "k8s.io/kubectl/pkg/scheme" ) -var PathErrorNotPlugin = pathWatError("not_plugin") +var URLErrorNotPlugin = localURL(pathWatError("not_plugin")) -var PathErrorPanicOnGetConfig = pathWatError("panic_on_get_config") +var URLErrorPanicOnGetConfig = localURL(pathWatError("panic_on_get_config")) -var PathErrorPanicOnEnqueue = pathWatError("panic_on_enqueue") +var URLErrorPanicOnEnqueue = localURL(pathWatError("panic_on_enqueue")) -var PathErrorPanicOnPreFilter = pathWatError("panic_on_prefilter") +var URLErrorPanicOnPreFilter = localURL(pathWatError("panic_on_prefilter")) -var PathErrorPanicOnFilter = pathWatError("panic_on_filter") +var URLErrorPanicOnFilter = localURL(pathWatError("panic_on_filter")) -var PathErrorPanicOnPreScore = pathWatError("panic_on_prescore") +var URLErrorPanicOnPreScore = localURL(pathWatError("panic_on_prescore")) -var PathErrorPreScoreWithoutScore = pathWatError("prescore_without_score") +var URLErrorPreScoreWithoutScore = localURL(pathWatError("prescore_without_score")) -var PathErrorPanicOnScore = pathWatError("panic_on_score") +var URLErrorPanicOnScore = localURL(pathWatError("panic_on_score")) -var PathErrorPanicOnStart = pathWatError("panic_on_start") +var URLErrorPanicOnStart = localURL(pathWatError("panic_on_start")) -var PathExampleNodeNumber = pathTinyGoExample("nodenumber") +var URLExampleNodeNumber = localURL(pathTinyGoExample("nodenumber")) -var PathTestAll = pathTinyGoTest("all") +var URLTestAll = localURL(pathTinyGoTest("all")) -var PathTestAllNoopWat = pathWatTest("all_noop") +var URLTestAllNoopWat = localURL(pathWatTest("all_noop")) -var PathTestCycleState = pathTinyGoTest("cyclestate") +var URLTestCycleState = localURL(pathTinyGoTest("cyclestate")) -var PathTestPreFilterFromGlobal = pathWatTest("prefilter_from_global") +var URLTestPreFilterFromGlobal = localURL(pathWatTest("prefilter_from_global")) -var PathTestFilter = pathTinyGoTest("filter") +var URLTestFilter = localURL(pathTinyGoTest("filter")) -var PathTestFilterFromGlobal = pathWatTest("filter_from_global") +var URLTestFilterFromGlobal = localURL(pathWatTest("filter_from_global")) -var PathTestPreScoreFromGlobal = pathWatTest("prescore_from_global") +var URLTestPreScoreFromGlobal = localURL(pathWatTest("prescore_from_global")) -var PathTestScore = pathTinyGoTest("score") +var URLTestScore = localURL(pathTinyGoTest("score")) -var PathTestScoreFromGlobal = pathWatTest("score_from_global") +var URLTestScoreFromGlobal = localURL(pathWatTest("score_from_global")) //go:embed testdata/yaml/node.yaml var yamlNodeReal string @@ -64,7 +64,7 @@ var NodeReal = func() *v1.Node { return &node }() -// NodeSmall is the smallest node that works with PathExampleFilterSimple. +// NodeSmall is the smallest node that works with URLExampleFilterSimple. var NodeSmall = &v1.Node{ObjectMeta: apimeta.ObjectMeta{Name: "good-node"}} //go:embed testdata/yaml/pod.yaml @@ -77,7 +77,7 @@ var PodReal = func() *v1.Pod { return &pod }() -// PodSmall is the smallest pod that works with PathExampleFilterSimple. +// PodSmall is the smallest pod that works with URLExampleFilterSimple. var PodSmall = &v1.Pod{ ObjectMeta: apimeta.ObjectMeta{ Name: "good-pod", @@ -102,28 +102,33 @@ func decodeYaml[O apiruntime.Object](yaml string, object O) { } } +// localURL prefixes file:// to a given path. +func localURL(path string) string { + return "file://" + path +} + // pathTinyGoExample gets the absolute path to a given TinyGo example. func pathTinyGoExample(name string) string { - return relativePath(path.Join("..", "..", "examples", name, "main.wasm")) + return relativeURL(path.Join("..", "..", "examples", name, "main.wasm")) } // pathTinyGoTest gets the absolute path to a given TinyGo test. func pathTinyGoTest(name string) string { - return relativePath(path.Join("..", "..", "guest", "testdata", name, "main.wasm")) + return relativeURL(path.Join("..", "..", "guest", "testdata", name, "main.wasm")) } // pathWatError gets the absolute path wasm compiled from a %.wat source. func pathWatError(name string) string { - return relativePath(path.Join("testdata", "error", name+".wasm")) + return relativeURL(path.Join("testdata", "error", name+".wasm")) } // pathWatTest gets the absolute path wasm compiled from a %.wat source. func pathWatTest(name string) string { - return relativePath(path.Join("testdata", "test", name+".wasm")) + return relativeURL(path.Join("testdata", "test", name+".wasm")) } -// relativePath gets the absolute from this file. -func relativePath(fromThisFile string) string { +// relativeURL gets the absolute from this file. +func relativeURL(fromThisFile string) string { _, thisFile, _, ok := runtime.Caller(1) if !ok { panic("cannot determine current path") From af1930a5047dcceccd1674c586219c6f03d52e87 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Sat, 29 Jul 2023 17:37:51 +0200 Subject: [PATCH 2/3] Update scheduler/plugin/http.go Co-authored-by: Kensei Nakada --- scheduler/plugin/http.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scheduler/plugin/http.go b/scheduler/plugin/http.go index 5a61895c..5e7e5cd5 100644 --- a/scheduler/plugin/http.go +++ b/scheduler/plugin/http.go @@ -29,17 +29,17 @@ type httpClient struct { c http.Client } -// newHTTPFetcher is a constructor for httpFetcher. +// newHTTPClient is a constructor for httpFetcher. // // It is possible to plug a custom http.RoundTripper to handle other concerns (e.g. retries) // Compression is handled transparently and automatically by http.Client. -func newHTTPCLient(transport http.RoundTripper) *httpClient { +func newHTTPClient(transport http.RoundTripper) *httpClient { return &httpClient{ c: http.Client{Transport: transport}, } } -// fetch returns a byte slice of the wasm module found at the given URL, or an error otherwise. +// get returns a byte slice of the wasm module found at the given URL, or an error otherwise. func (f *httpClient) get(ctx context.Context, u *url.URL) ([]byte, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { From 00d385ff9b2f54c1c2527d0f3866884cd45fee38 Mon Sep 17 00:00:00 2001 From: Edoardo Vacchi Date: Sat, 29 Jul 2023 17:43:10 +0200 Subject: [PATCH 3/3] Apply suggestions Signed-off-by: Edoardo Vacchi --- scheduler/plugin/http.go | 7 ++++--- scheduler/plugin/http_test.go | 2 +- scheduler/plugin/plugin.go | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/scheduler/plugin/http.go b/scheduler/plugin/http.go index 5e7e5cd5..6f3b7c96 100644 --- a/scheduler/plugin/http.go +++ b/scheduler/plugin/http.go @@ -46,18 +46,19 @@ func (f *httpClient) get(ctx context.Context, u *url.URL) ([]byte, error) { return nil, err } resp, err := f.c.Do(req.WithContext(ctx)) + defer func() { + io.Copy(io.Discard, resp.Body) //nolint + resp.Body.Close() + }() if err != nil { return nil, err } if resp.StatusCode != http.StatusOK { - io.Copy(io.Discard, resp.Body) //nolint - resp.Body.Close() return nil, fmt.Errorf("received %v status code from %q", resp.StatusCode, u) } bytes, err := io.ReadAll(resp.Body) - resp.Body.Close() if err != nil { return nil, err } diff --git a/scheduler/plugin/http_test.go b/scheduler/plugin/http_test.go index 281af907..e2d56365 100644 --- a/scheduler/plugin/http_test.go +++ b/scheduler/plugin/http_test.go @@ -71,7 +71,7 @@ func TestWasmHTTPFetch(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ts := httptest.NewServer(tc.handler) defer ts.Close() - c := newHTTPCLient(http.DefaultTransport) + c := newHTTPClient(http.DefaultTransport) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() parse, err := url.Parse(ts.URL) diff --git a/scheduler/plugin/plugin.go b/scheduler/plugin/plugin.go index 58c34a91..b08b63ae 100644 --- a/scheduler/plugin/plugin.go +++ b/scheduler/plugin/plugin.go @@ -88,7 +88,7 @@ func readFromURI(ctx context.Context, u string) ([]byte, error) { case "file": return os.ReadFile(uri.Path) case "http", "https": - c := newHTTPCLient(http.DefaultTransport) + c := newHTTPClient(http.DefaultTransport) return c.get(ctx, uri) default: return nil, fmt.Errorf("unsupported URL scheme: %s", uri.Scheme)