From 8bd7e07a852a4d0727e518563d300bdb5d92a34f Mon Sep 17 00:00:00 2001 From: Josh van Leeuwen Date: Thu, 11 Apr 2024 19:58:30 +0100 Subject: [PATCH] Subscriptions: Declarative Go Generic Loader (#7582) * Subscriptions: Declarative Go Generic Loader Following Components and HTTPEndpoints, creates a generic disk & kubernetes loader for Subscriptions. Removes the adhok loaders from existing runtime pubsub packages. This is required for adding Subscriptions to the hot reloader reconciler and making a more consistent & testable manifest loader package. The disk loader loads both `v1alpha1` and `v2alpha1` Subscriptions, converting `v1alpha1` to `v2alpha1` on successful load. Since the operator returns only `v2alpha1` Subscriptions (Kubernetes API converts `v1alpha1` to `v2alpha1`), the Kubernetes loader only loads `v2alpha1`. `APIVersion() string` func has been added to the generic `meta.Resource` interface type to allow for the generic loader to determine between resource versions, supporting differentiating `v1alpha1` and `v2alpha1` Subscription resource versions. To ensure backwards compatibility of the previous disk Subscription loader, the generic disk loader now tracks the order in which manifests are loaded. This ensures that, even though `v1alpha1` and `v2alpha1` Subscriptions use separate loaders, their file position order is preserved once `v1alpha1` Subscriptions are converted to `v2alpha1`. Subscription backwards compatibility of parsing & ingestion priority is covered by existing extensive integration tests. Notice that _ZERO_ of the comprehensive subscription integration tests have been modified, proving no behaviour change has occurred for loading and actuating Subscriptions. Explanation of integration test changes: - daprd/hotreload/selfhosted/crypto: loader now mandates `apiVersion` on disk manifests which is more correct. Previous behaviour ignoring this field should be considered a bug. Since we use Kubernetes resource API schema, `apiVersion` is (and has been) _always_ required. - framework/process/grpc/operator/server.go: returning non-nil gRPC object for mocked operator Subscription RPC call prevents underlying gRPC library unmarshal nil errors- more correct than previous implementation. Part of [Subscription hot-reloading](https://github.com/dapr/dapr/issues/7139). Signed-off-by: joshvanl * Update errS to patherrs to avoid confusion Signed-off-by: joshvanl * Updates sub import to use bare version string Signed-off-by: joshvanl * Remove unneeded code comment Signed-off-by: joshvanl * Change test manifests to have unique names Signed-off-by: joshvanl --------- Signed-off-by: joshvanl Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com> --- pkg/apis/components/v1alpha1/types.go | 4 + pkg/apis/httpEndpoint/v1alpha1/types.go | 4 + pkg/apis/subscriptions/v1alpha1/register.go | 4 +- pkg/apis/subscriptions/v1alpha1/types.go | 48 ++ pkg/apis/subscriptions/v2alpha1/register.go | 4 +- pkg/apis/subscriptions/v2alpha1/types.go | 48 ++ pkg/internal/apis/namevaluepair.go | 15 +- pkg/internal/loader/disk/components.go | 23 + pkg/internal/loader/disk/disk.go | 177 ++------ pkg/internal/loader/disk/disk_test.go | 135 +++++- pkg/internal/loader/disk/httpendpoints.go | 23 + pkg/internal/loader/disk/manifest.go | 202 +++++++++ pkg/internal/loader/disk/subscriptions.go | 63 +++ .../loader/kubernetes/components_test.go | 17 +- .../loader/kubernetes/subscriptions.go | 65 +++ pkg/operator/api/api.go | 2 +- pkg/runtime/hotreload/differ/differ.go | 4 +- pkg/runtime/hotreload/loader/disk/disk.go | 2 +- .../hotreload/loader/disk/resource_test.go | 6 +- pkg/runtime/meta/resource.go | 1 + pkg/runtime/processor/pubsub/pubsub_test.go | 50 +- pkg/runtime/processor/pubsub/subscribe.go | 62 ++- pkg/runtime/processor/wfbackend/wfbackend.go | 9 + pkg/runtime/pubsub/subscriptions.go | 216 +-------- pkg/runtime/pubsub/subscriptions_test.go | 426 +----------------- pkg/runtime/runtime.go | 9 +- pkg/runtime/wfengine/component.go | 10 - .../framework/process/grpc/operator/server.go | 4 +- .../daprd/hotreload/selfhosted/crypto.go | 3 + 29 files changed, 762 insertions(+), 874 deletions(-) create mode 100644 pkg/internal/loader/disk/components.go create mode 100644 pkg/internal/loader/disk/httpendpoints.go create mode 100644 pkg/internal/loader/disk/manifest.go create mode 100644 pkg/internal/loader/disk/subscriptions.go create mode 100644 pkg/internal/loader/kubernetes/subscriptions.go diff --git a/pkg/apis/components/v1alpha1/types.go b/pkg/apis/components/v1alpha1/types.go index 8f4c8c11d6c..23852a8ce4b 100644 --- a/pkg/apis/components/v1alpha1/types.go +++ b/pkg/apis/components/v1alpha1/types.go @@ -47,6 +47,10 @@ func (Component) Kind() string { return "Component" } +func (Component) APIVersion() string { + return components.GroupName + "/" + Version +} + // GetName returns the component name. func (c Component) GetName() string { return c.Name diff --git a/pkg/apis/httpEndpoint/v1alpha1/types.go b/pkg/apis/httpEndpoint/v1alpha1/types.go index f04979cf085..cfbb54d01e9 100644 --- a/pkg/apis/httpEndpoint/v1alpha1/types.go +++ b/pkg/apis/httpEndpoint/v1alpha1/types.go @@ -49,6 +49,10 @@ func (HTTPEndpoint) Kind() string { return kind } +func (HTTPEndpoint) APIVersion() string { + return httpendpoint.GroupName + "/" + Version +} + // GetName returns the component name. func (h HTTPEndpoint) GetName() string { return h.Name diff --git a/pkg/apis/subscriptions/v1alpha1/register.go b/pkg/apis/subscriptions/v1alpha1/register.go index 1375c50534d..a9c58e6732b 100644 --- a/pkg/apis/subscriptions/v1alpha1/register.go +++ b/pkg/apis/subscriptions/v1alpha1/register.go @@ -24,8 +24,8 @@ import ( // SchemeGroupVersion is group version used to register these objects. var SchemeGroupVersion = schema.GroupVersion{Group: subscriptions.GroupName, Version: "v1alpha1"} -// Kind takes an unqualified kind and returns back a Group qualified GroupKind. -func Kind(kind string) schema.GroupKind { +// GroupKindFromKind takes an unqualified kind and returns back a Group qualified GroupKind. +func GroupKindFromKind(kind string) schema.GroupKind { return SchemeGroupVersion.WithKind(kind).GroupKind() } diff --git a/pkg/apis/subscriptions/v1alpha1/types.go b/pkg/apis/subscriptions/v1alpha1/types.go index d683ff59346..1a67e7609c2 100644 --- a/pkg/apis/subscriptions/v1alpha1/types.go +++ b/pkg/apis/subscriptions/v1alpha1/types.go @@ -15,6 +15,14 @@ package v1alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/dapr/dapr/pkg/apis/common" + "github.com/dapr/dapr/pkg/apis/subscriptions" +) + +const ( + Kind = "Subscription" + Version = "v1alpha1" ) // +genclient @@ -59,3 +67,43 @@ type SubscriptionList struct { Items []Subscription `json:"items"` } + +func (Subscription) Kind() string { + return Kind +} + +func (Subscription) APIVersion() string { + return subscriptions.GroupName + "/" + Version +} + +// EmptyMetaDeepCopy returns a new instance of the subscription type with the +// TypeMeta's Kind and APIVersion fields set. +func (s Subscription) EmptyMetaDeepCopy() metav1.Object { + n := s.DeepCopy() + n.TypeMeta = metav1.TypeMeta{ + Kind: Kind, + APIVersion: subscriptions.GroupName + "/" + Version, + } + n.ObjectMeta = metav1.ObjectMeta{Name: s.Name} + return n +} + +func (s Subscription) GetName() string { + return s.Name +} + +func (s Subscription) GetNamespace() string { + return s.Namespace +} + +func (s Subscription) GetSecretStore() string { + return "" +} + +func (s Subscription) LogName() string { + return s.GetName() +} + +func (s Subscription) NameValuePairs() []common.NameValuePair { + return nil +} diff --git a/pkg/apis/subscriptions/v2alpha1/register.go b/pkg/apis/subscriptions/v2alpha1/register.go index 07f872b5e80..c50e171621d 100644 --- a/pkg/apis/subscriptions/v2alpha1/register.go +++ b/pkg/apis/subscriptions/v2alpha1/register.go @@ -24,8 +24,8 @@ import ( // SchemeGroupVersion is group version used to register these objects. var SchemeGroupVersion = schema.GroupVersion{Group: subscriptions.GroupName, Version: "v2alpha1"} -// Kind takes an unqualified kind and returns back a Group qualified GroupKind. -func Kind(kind string) schema.GroupKind { +// GroupKindFromKind takes an unqualified kind and returns back a Group qualified GroupKind. +func GroupKindFromKind(kind string) schema.GroupKind { return SchemeGroupVersion.WithKind(kind).GroupKind() } diff --git a/pkg/apis/subscriptions/v2alpha1/types.go b/pkg/apis/subscriptions/v2alpha1/types.go index 91c4c3de149..01fd674a47d 100644 --- a/pkg/apis/subscriptions/v2alpha1/types.go +++ b/pkg/apis/subscriptions/v2alpha1/types.go @@ -15,6 +15,14 @@ package v2alpha1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/dapr/dapr/pkg/apis/common" + "github.com/dapr/dapr/pkg/apis/subscriptions" +) + +const ( + Kind = "Subscription" + Version = "v2alpha1" ) // +genclient @@ -90,3 +98,43 @@ type SubscriptionList struct { Items []Subscription `json:"items"` } + +func (Subscription) Kind() string { + return Kind +} + +func (Subscription) APIVersion() string { + return subscriptions.GroupName + "/" + Version +} + +// EmptyMetaDeepCopy returns a new instance of the subscription type with the +// TypeMeta's Kind and APIVersion fields set. +func (s Subscription) EmptyMetaDeepCopy() metav1.Object { + n := s.DeepCopy() + n.TypeMeta = metav1.TypeMeta{ + Kind: Kind, + APIVersion: subscriptions.GroupName + "/" + Version, + } + n.ObjectMeta = metav1.ObjectMeta{Name: s.Name} + return n +} + +func (s Subscription) GetName() string { + return s.Name +} + +func (s Subscription) GetNamespace() string { + return s.Namespace +} + +func (s Subscription) GetSecretStore() string { + return "" +} + +func (s Subscription) LogName() string { + return s.GetName() +} + +func (s Subscription) NameValuePairs() []common.NameValuePair { + return nil +} diff --git a/pkg/internal/apis/namevaluepair.go b/pkg/internal/apis/namevaluepair.go index 80b82f1a093..7c0bc94a438 100644 --- a/pkg/internal/apis/namevaluepair.go +++ b/pkg/internal/apis/namevaluepair.go @@ -22,11 +22,12 @@ import ( ) type GenericNameValueResource struct { - Name string - Namespace string - SecretStore string - ResourceKind string - Pairs []common.NameValuePair + Name string + Namespace string + SecretStore string + ResourceKind string + ResourceAPIVersion string + Pairs []common.NameValuePair } func (g GenericNameValueResource) Kind() string { @@ -37,6 +38,10 @@ func (g GenericNameValueResource) GetName() string { return g.Name } +func (g GenericNameValueResource) APIVersion() string { + return g.ResourceAPIVersion +} + func (g GenericNameValueResource) GetNamespace() string { return g.Namespace } diff --git a/pkg/internal/loader/disk/components.go b/pkg/internal/loader/disk/components.go new file mode 100644 index 00000000000..5b3bbdef76d --- /dev/null +++ b/pkg/internal/loader/disk/components.go @@ -0,0 +1,23 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disk + +import ( + compapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1" + "github.com/dapr/dapr/pkg/internal/loader" +) + +func NewComponents(paths ...string) loader.Loader[compapi.Component] { + return new[compapi.Component](paths...) +} diff --git a/pkg/internal/loader/disk/disk.go b/pkg/internal/loader/disk/disk.go index 8525cf1762b..87058920840 100644 --- a/pkg/internal/loader/disk/disk.go +++ b/pkg/internal/loader/disk/disk.go @@ -14,58 +14,39 @@ limitations under the License. package disk import ( - "bufio" - "bytes" "context" "errors" "fmt" "os" - "path/filepath" - "strings" - "k8s.io/apimachinery/pkg/api/validation/path" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/yaml" - - "github.com/dapr/dapr/pkg/internal/loader" "github.com/dapr/dapr/pkg/runtime/meta" "github.com/dapr/dapr/pkg/security" - "github.com/dapr/kit/logger" - "github.com/dapr/kit/utils" ) -var log = logger.NewLogger("dapr.runtime.loader.disk") - -const yamlSeparator = "\n---" - // disk loads a specific manifest kind from a folder. type disk[T meta.Resource] struct { - kind string - paths []string - namespace string + kind string + apiVersion string + dirs []string + namespace string } -// New creates a new manifest loader for the given paths and kind. -func New[T meta.Resource](paths ...string) loader.Loader[T] { +// new creates a new manifest loader for the given paths and kind. +func new[T meta.Resource](dirs ...string) *disk[T] { var zero T return &disk[T]{ - paths: paths, - kind: zero.Kind(), - namespace: security.CurrentNamespace(), + dirs: dirs, + kind: zero.Kind(), + apiVersion: zero.APIVersion(), + namespace: security.CurrentNamespace(), } } // load loads manifests for the given directory. func (d *disk[T]) Load(context.Context) ([]T, error) { - var manifests []T - for _, path := range d.paths { - loaded, err := d.loadManifestsFromPath(path) - if err != nil { - return nil, err - } - if len(loaded) > 0 { - manifests = append(manifests, loaded...) - } + set, err := d.loadWithOrder() + if err != nil { + return nil, err } nsDefined := len(os.Getenv("NAMESPACE")) != 0 @@ -73,24 +54,24 @@ func (d *disk[T]) Load(context.Context) ([]T, error) { names := make(map[string]string) goodManifests := make([]T, 0) var errs []error - for i := range manifests { + for i := range set.ts { // If the process or manifest namespace are not defined, ignore the // manifest namespace. - ignoreNamespace := !nsDefined || len(manifests[i].GetNamespace()) == 0 + ignoreNamespace := !nsDefined || len(set.ts[i].GetNamespace()) == 0 // Ignore manifests that are not in the process security namespace. - if !ignoreNamespace && manifests[i].GetNamespace() != d.namespace { + if !ignoreNamespace && set.ts[i].GetNamespace() != d.namespace { continue } - if existing, ok := names[manifests[i].GetName()]; ok { + if existing, ok := names[set.ts[i].GetName()]; ok { errs = append(errs, fmt.Errorf("duplicate definition of %s name %s with existing %s", - manifests[i].Kind(), manifests[i].LogName(), existing)) + set.ts[i].Kind(), set.ts[i].LogName(), existing)) continue } - names[manifests[i].GetName()] = manifests[i].LogName() - goodManifests = append(goodManifests, manifests[i]) + names[set.ts[i].GetName()] = set.ts[i].LogName() + goodManifests = append(goodManifests, set.ts[i]) } if len(errs) > 0 { @@ -100,120 +81,14 @@ func (d *disk[T]) Load(context.Context) ([]T, error) { return goodManifests, nil } -func (d *disk[T]) loadManifestsFromPath(path string) ([]T, error) { - files, err := os.ReadDir(path) - if err != nil { - return nil, err - } - - manifests := make([]T, 0) - - for _, file := range files { - if !file.IsDir() { - fileName := file.Name() - if !utils.IsYaml(fileName) { - log.Warnf("A non-YAML %s file %s was detected, it will not be loaded", d.kind, fileName) - continue - } - fileManifests := d.loadManifestsFromFile(filepath.Join(path, fileName)) - manifests = append(manifests, fileManifests...) - } - } - - return manifests, nil -} - -func (d *disk[T]) loadManifestsFromFile(manifestPath string) []T { - var errors []error - - manifests := make([]T, 0) - b, err := os.ReadFile(manifestPath) - if err != nil { - log.Warnf("daprd load %s error when reading file %s: %v", d.kind, manifestPath, err) - return manifests - } - manifests, errors = d.decodeYaml(b) - for _, err := range errors { - log.Warnf("daprd load %s error when parsing manifests yaml resource in %s: %v", d.kind, manifestPath, err) - } - return manifests -} - -type typeInfo struct { - metav1.TypeMeta `json:",inline"` - metav1.ObjectMeta `json:"metadata,omitempty"` -} - -// decodeYaml decodes the yaml document. -func (d *disk[T]) decodeYaml(b []byte) ([]T, []error) { - list := make([]T, 0) - errors := []error{} - scanner := bufio.NewScanner(bytes.NewReader(b)) - scanner.Split(splitYamlDoc) - - for { - if !scanner.Scan() { - err := scanner.Err() - if err != nil { - errors = append(errors, err) - continue - } - - break - } - - scannerBytes := scanner.Bytes() - var ti typeInfo - if err := yaml.Unmarshal(scannerBytes, &ti); err != nil { - errors = append(errors, err) - continue - } - - if ti.Kind != d.kind { - continue - } - - if errs := path.IsValidPathSegmentName(ti.Name); len(errs) > 0 { - errors = append(errors, fmt.Errorf("invalid name %q for %q: %s", ti.Name, d.kind, strings.Join(errs, "; "))) - continue - } +func (d *disk[T]) loadWithOrder() (*manifestSet[T], error) { + set := &manifestSet[T]{d: d} - var manifest T - if err := yaml.Unmarshal(scannerBytes, &manifest); err != nil { - errors = append(errors, err) - continue + for _, dir := range d.dirs { + if err := set.loadManifestsFromDirectory(dir); err != nil { + return nil, err } - list = append(list, manifest) } - return list, errors -} - -// splitYamlDoc - splits the yaml docs. -func splitYamlDoc(data []byte, atEOF bool) (advance int, token []byte, err error) { - if atEOF && len(data) == 0 { - return 0, nil, nil - } - sep := len([]byte(yamlSeparator)) - if i := bytes.Index(data, []byte(yamlSeparator)); i >= 0 { - i += sep - after := data[i:] - - if len(after) == 0 { - if atEOF { - return len(data), data[:len(data)-sep], nil - } - return 0, nil, nil - } - if j := bytes.IndexByte(after, '\n'); j >= 0 { - return i + j + 1, data[0 : i-sep], nil - } - return 0, nil, nil - } - // If we're at EOF, we have a final, non-terminated line. Return it. - if atEOF { - return len(data), data, nil - } - // Request more data. - return 0, nil, nil + return set, nil } diff --git a/pkg/internal/loader/disk/disk_test.go b/pkg/internal/loader/disk/disk_test.go index 2d819deab52..f92426db99f 100644 --- a/pkg/internal/loader/disk/disk_test.go +++ b/pkg/internal/loader/disk/disk_test.go @@ -33,7 +33,7 @@ import ( func TestLoad(t *testing.T) { t.Run("valid yaml content", func(t *testing.T) { tmp := t.TempDir() - request := New[compapi.Component](tmp) + request := NewComponents(tmp) filename := "test-component-valid.yaml" yaml := ` apiVersion: dapr.io/v1alpha1 @@ -56,7 +56,7 @@ spec: t.Run("invalid yaml head", func(t *testing.T) { tmp := t.TempDir() - request := New[compapi.Component](tmp) + request := NewComponents(tmp) filename := "test-component-invalid.yaml" yaml := ` @@ -72,7 +72,7 @@ name: statestore` }) t.Run("load components file not exist", func(t *testing.T) { - request := New[compapi.Component]("test-path-no-exists") + request := NewComponents("test-path-no-exists") components, err := request.Load(context.Background()) require.Error(t, err) @@ -195,13 +195,136 @@ metadata: if test.namespace != nil { t.Setenv("NAMESPACE", *test.namespace) } - - loader := New[compapi.Component](tmp) + loader := NewComponents(tmp) components, err := loader.Load(context.Background()) - assert.Equal(t, test.expErr, err != nil, "%v", err) assert.Equal(t, test.expComps, components) }) } }) } + +func Test_loadWithOrder(t *testing.T) { + t.Run("no file should return empty set", func(t *testing.T) { + tmp := t.TempDir() + d := NewComponents(tmp).(*disk[compapi.Component]) + set, err := d.loadWithOrder() + require.NoError(t, err) + assert.Empty(t, set.order) + assert.Empty(t, set.ts) + }) + + t.Run("single manifest file should return", func(t *testing.T) { + tmp := t.TempDir() + require.NoError(t, os.WriteFile(filepath.Join(tmp, "test-component.yaml"), []byte(` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.couchbase +`), fs.FileMode(0o600))) + + d := NewComponents(tmp).(*disk[compapi.Component]) + set, err := d.loadWithOrder() + require.NoError(t, err) + assert.Equal(t, []manifestOrder{ + {dirIndex: 0, fileIndex: 0, manifestIndex: 0}, + }, set.order) + assert.Len(t, set.ts, 1) + }) + + t.Run("3 manifest file should have order set", func(t *testing.T) { + tmp := t.TempDir() + require.NoError(t, os.WriteFile(filepath.Join(tmp, "test-component.yaml"), []byte(` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo1 +spec: + type: state.couchbase +--- +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo2 +spec: + type: state.couchbase +--- +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo3 +spec: + type: state.couchbase +`), fs.FileMode(0o600))) + + d := NewComponents(tmp).(*disk[compapi.Component]) + set, err := d.loadWithOrder() + require.NoError(t, err) + assert.Equal(t, []manifestOrder{ + {dirIndex: 0, fileIndex: 0, manifestIndex: 0}, + {dirIndex: 0, fileIndex: 0, manifestIndex: 1}, + {dirIndex: 0, fileIndex: 0, manifestIndex: 2}, + }, set.order) + assert.Len(t, set.ts, 3) + }) + + t.Run("3 dirs, 3 files, 3 manifests should return order. Skips manifests of different type", func(t *testing.T) { + tmp1, tmp2, tmp3 := t.TempDir(), t.TempDir(), t.TempDir() + + for _, dir := range []string{tmp1, tmp2, tmp3} { + for _, file := range []string{"1.yaml", "2.yaml", "3.yaml"} { + require.NoError(t, os.WriteFile(filepath.Join(dir, file), []byte(` +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo1 +spec: + type: state.couchbase +--- +apiVersion: dapr.io/v1alpha1 +kind: Subscription +metadata: + name: foo2 +spec: + type: state.couchbase +--- +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: foo3 +spec: + type: state.couchbase +`), fs.FileMode(0o600))) + } + } + + d := NewComponents(tmp1, tmp2, tmp3).(*disk[compapi.Component]) + set, err := d.loadWithOrder() + require.NoError(t, err) + assert.Equal(t, []manifestOrder{ + {dirIndex: 0, fileIndex: 0, manifestIndex: 0}, + {dirIndex: 0, fileIndex: 0, manifestIndex: 2}, + {dirIndex: 0, fileIndex: 1, manifestIndex: 0}, + {dirIndex: 0, fileIndex: 1, manifestIndex: 2}, + {dirIndex: 0, fileIndex: 2, manifestIndex: 0}, + {dirIndex: 0, fileIndex: 2, manifestIndex: 2}, + + {dirIndex: 1, fileIndex: 0, manifestIndex: 0}, + {dirIndex: 1, fileIndex: 0, manifestIndex: 2}, + {dirIndex: 1, fileIndex: 1, manifestIndex: 0}, + {dirIndex: 1, fileIndex: 1, manifestIndex: 2}, + {dirIndex: 1, fileIndex: 2, manifestIndex: 0}, + {dirIndex: 1, fileIndex: 2, manifestIndex: 2}, + + {dirIndex: 2, fileIndex: 0, manifestIndex: 0}, + {dirIndex: 2, fileIndex: 0, manifestIndex: 2}, + {dirIndex: 2, fileIndex: 1, manifestIndex: 0}, + {dirIndex: 2, fileIndex: 1, manifestIndex: 2}, + {dirIndex: 2, fileIndex: 2, manifestIndex: 0}, + {dirIndex: 2, fileIndex: 2, manifestIndex: 2}, + }, set.order) + assert.Len(t, set.ts, 18) + }) +} diff --git a/pkg/internal/loader/disk/httpendpoints.go b/pkg/internal/loader/disk/httpendpoints.go new file mode 100644 index 00000000000..b722e28347c --- /dev/null +++ b/pkg/internal/loader/disk/httpendpoints.go @@ -0,0 +1,23 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disk + +import ( + endpointapi "github.com/dapr/dapr/pkg/apis/httpEndpoint/v1alpha1" + "github.com/dapr/dapr/pkg/internal/loader" +) + +func NewHTTPEndpoints(paths ...string) loader.Loader[endpointapi.HTTPEndpoint] { + return new[endpointapi.HTTPEndpoint](paths...) +} diff --git a/pkg/internal/loader/disk/manifest.go b/pkg/internal/loader/disk/manifest.go new file mode 100644 index 00000000000..7255a3906d1 --- /dev/null +++ b/pkg/internal/loader/disk/manifest.go @@ -0,0 +1,202 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disk + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "k8s.io/apimachinery/pkg/api/validation/path" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/yaml" + + "github.com/dapr/dapr/pkg/runtime/meta" + "github.com/dapr/kit/logger" + "github.com/dapr/kit/utils" +) + +const yamlSeparator = "\n---" + +var log = logger.NewLogger("dapr.runtime.loader.disk") + +type manifestSet[T meta.Resource] struct { + d *disk[T] + + dirIndex int + fileIndex int + manifestIndex int + + order []manifestOrder + ts []T +} + +type manifestOrder struct { + dirIndex int + fileIndex int + manifestIndex int +} + +func (m *manifestSet[T]) loadManifestsFromDirectory(dir string) error { + defer func() { + m.dirIndex++ + }() + + files, err := os.ReadDir(dir) + if err != nil { + return err + } + + m.fileIndex = 0 + for _, file := range files { + if file.IsDir() { + continue + } + + fileName := file.Name() + if !utils.IsYaml(fileName) { + log.Warnf("A non-YAML %s file %s was detected, it will not be loaded", m.d.kind, fileName) + continue + } + + m.loadManifestsFromFile(filepath.Join(dir, fileName)) + } + + return nil +} + +func (m *manifestSet[T]) loadManifestsFromFile(path string) { + defer func() { + m.fileIndex++ + }() + + f, err := os.Open(path) + if err != nil { + log.Warnf("daprd load %s error when opening file %s: %v", m.d.kind, path, err) + return + } + defer f.Close() + + if err := m.decodeYaml(f); err != nil { + log.Warnf("daprd load %s error when parsing manifests yaml resource in %s: %v", m.d.kind, path, err) + } +} + +func (m *manifestSet[T]) decodeYaml(f io.Reader) error { + var errs []error + scanner := bufio.NewScanner(f) + scanner.Split(splitYamlDoc) + + m.manifestIndex = 0 + for { + m.manifestIndex++ + + if !scanner.Scan() { + err := scanner.Err() + if err != nil { + errs = append(errs, err) + continue + } + + break + } + + scannerBytes := scanner.Bytes() + var ti struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + } + if err := yaml.Unmarshal(scannerBytes, &ti); err != nil { + errs = append(errs, err) + continue + } + + if ti.APIVersion != m.d.apiVersion || ti.Kind != m.d.kind { + continue + } + + if patherrs := path.IsValidPathSegmentName(ti.Name); len(patherrs) > 0 { + errs = append(errs, fmt.Errorf("invalid name %q for %q: %s", ti.Name, m.d.kind, strings.Join(patherrs, "; "))) + continue + } + + var manifest T + if err := yaml.Unmarshal(scannerBytes, &manifest); err != nil { + errs = append(errs, err) + continue + } + + m.order = append(m.order, manifestOrder{ + dirIndex: m.dirIndex, + fileIndex: m.fileIndex, + manifestIndex: m.manifestIndex - 1, + }) + m.ts = append(m.ts, manifest) + } + + if len(errs) > 0 { + return errors.Join(errs...) + } + + return nil +} + +// splitYamlDoc - splits the yaml docs. +func splitYamlDoc(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + sep := len([]byte(yamlSeparator)) + if i := bytes.Index(data, []byte(yamlSeparator)); i >= 0 { + i += sep + after := data[i:] + + if len(after) == 0 { + if atEOF { + return len(data), data[:len(data)-sep], nil + } + return 0, nil, nil + } + if j := bytes.IndexByte(after, '\n'); j >= 0 { + return i + j + 1, data[0 : i-sep], nil + } + return 0, nil, nil + } + // If we're at EOF, we have a final, non-terminated line. Return it. + if atEOF { + return len(data), data, nil + } + // Request more data. + return 0, nil, nil +} + +func (m *manifestSet[T]) Len() int { + return len(m.order) +} + +func (m *manifestSet[T]) Swap(i, j int) { + m.order[i], m.order[j] = m.order[j], m.order[i] + m.ts[i], m.ts[j] = m.ts[j], m.ts[i] +} + +func (m *manifestSet[T]) Less(i, j int) bool { + return m.order[i].dirIndex < m.order[j].dirIndex || + m.order[i].fileIndex < m.order[j].fileIndex || + m.order[i].manifestIndex < m.order[j].manifestIndex +} diff --git a/pkg/internal/loader/disk/subscriptions.go b/pkg/internal/loader/disk/subscriptions.go new file mode 100644 index 00000000000..0a01829fdba --- /dev/null +++ b/pkg/internal/loader/disk/subscriptions.go @@ -0,0 +1,63 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package disk + +import ( + "context" + "sort" + + "github.com/dapr/dapr/pkg/apis/subscriptions/v1alpha1" + "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1" + "github.com/dapr/dapr/pkg/internal/loader" +) + +type subscriptions struct { + v1 *disk[v1alpha1.Subscription] + v2 *disk[v2alpha1.Subscription] +} + +func NewSubscriptions(paths ...string) loader.Loader[v2alpha1.Subscription] { + return &subscriptions{ + v1: new[v1alpha1.Subscription](paths...), + v2: new[v2alpha1.Subscription](paths...), + } +} + +func (s *subscriptions) Load(context.Context) ([]v2alpha1.Subscription, error) { + v1, err := s.v1.loadWithOrder() + if err != nil { + return nil, err + } + + v2, err := s.v2.loadWithOrder() + if err != nil { + return nil, err + } + + v2.order = append(v2.order, v1.order...) + + for _, s := range v1.ts { + var subv2 v2alpha1.Subscription + if err := subv2.ConvertFrom(s.DeepCopy()); err != nil { + return nil, err + } + + v2.ts = append(v2.ts, subv2) + } + + // Preserve manifest load order between v1 and v2. + sort.Sort(v2) + + return v2.ts, nil +} diff --git a/pkg/internal/loader/kubernetes/components_test.go b/pkg/internal/loader/kubernetes/components_test.go index 6247bc2bbe8..2a7bac25636 100644 --- a/pkg/internal/loader/kubernetes/components_test.go +++ b/pkg/internal/loader/kubernetes/components_test.go @@ -27,7 +27,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/dapr/dapr/pkg/apis/components/v1alpha1" - subscriptions "github.com/dapr/dapr/pkg/apis/subscriptions/v1alpha1" + subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1" config "github.com/dapr/dapr/pkg/config/modes" operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1" ) @@ -57,11 +57,11 @@ func (o *mockOperator) ListComponents(ctx context.Context, in *operatorv1pb.List } func (o *mockOperator) ListSubscriptionsV2(ctx context.Context, in *operatorv1pb.ListSubscriptionsRequest) (*operatorv1pb.ListSubscriptionsResponse, error) { - subscription := subscriptions.Subscription{} + subscription := subapi.Subscription{} subscription.ObjectMeta.Name = "test" - subscription.Spec = subscriptions.SubscriptionSpec{ + subscription.Spec = subapi.SubscriptionSpec{ Topic: "topic", - Route: "route", + Routes: subapi.Routes{Default: "route"}, Pubsubname: "pubsub", } b, _ := json.Marshal(&subscription) @@ -87,10 +87,15 @@ func TestLoadComponents(t *testing.T) { s := grpc.NewServer() operatorv1pb.RegisterOperatorServer(s, &mockOperator{}) - t.Cleanup(s.Stop) + errCh := make(chan error) + + t.Cleanup(func() { + s.Stop() + require.NoError(t, <-errCh) + }) go func() { - s.Serve(lis) + errCh <- s.Serve(lis) }() request := &components{ diff --git a/pkg/internal/loader/kubernetes/subscriptions.go b/pkg/internal/loader/kubernetes/subscriptions.go new file mode 100644 index 00000000000..298b8e32f74 --- /dev/null +++ b/pkg/internal/loader/kubernetes/subscriptions.go @@ -0,0 +1,65 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "context" + "encoding/json" + "fmt" + + grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry" + + subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1" + "github.com/dapr/dapr/pkg/internal/loader" + operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1" +) + +// subscriptions loads subscriptions from a Kubernetes environment. +type subscriptions struct { + client operatorv1pb.OperatorClient + namespace string + podName string +} + +// NewSubscriptions returns a new Kubernetes loader. +func NewSubscriptions(opts Options) loader.Loader[subapi.Subscription] { + return &subscriptions{ + client: opts.Client, + namespace: opts.Namespace, + podName: opts.PodName, + } +} + +func (s *subscriptions) Load(ctx context.Context) ([]subapi.Subscription, error) { + resp, err := s.client.ListSubscriptionsV2(ctx, &operatorv1pb.ListSubscriptionsRequest{ + Namespace: s.namespace, + PodName: s.podName, + }, grpcretry.WithMax(operatorMaxRetries), grpcretry.WithPerRetryTimeout(operatorCallTimeout)) + if err != nil { + return nil, err + } + subs := resp.GetSubscriptions() + + subscriptions := make([]subapi.Subscription, len(subs)) + for i, s := range subs { + var subscription subapi.Subscription + if err := json.Unmarshal(s, &subscription); err != nil { + return nil, fmt.Errorf("error deserializing subscription: %s", err) + } + + subscriptions[i] = subscription + } + + return subscriptions, nil +} diff --git a/pkg/operator/api/api.go b/pkg/operator/api/api.go index 477a8a84fab..5d17bf2ef08 100644 --- a/pkg/operator/api/api.go +++ b/pkg/operator/api/api.go @@ -382,7 +382,7 @@ func (a *apiServer) ListSubscriptionsV2(ctx context.Context, in *operatorv1pb.Li } for i := range subsV2alpha1.Items { s := subsV2alpha1.Items[i] // Make a copy since we will refer to this as a reference in this loop. - if s.APIVersion != APIVersionV2alpha1 { + if s.APIVersion() != APIVersionV2alpha1 { continue } b, err := json.Marshal(&s) diff --git a/pkg/runtime/hotreload/differ/differ.go b/pkg/runtime/hotreload/differ/differ.go index 67a9f8af409..927271e8620 100644 --- a/pkg/runtime/hotreload/differ/differ.go +++ b/pkg/runtime/hotreload/differ/differ.go @@ -21,10 +21,10 @@ import ( componentsapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1" "github.com/dapr/dapr/pkg/components/secretstores" "github.com/dapr/dapr/pkg/runtime/meta" - "github.com/dapr/dapr/pkg/runtime/wfengine" + "github.com/dapr/dapr/pkg/runtime/processor/wfbackend" ) -var wfengineComp = wfengine.ComponentDefinition() +var wfengineComp = wfbackend.ComponentDefinition() // Resource is a generic type constraint. type Resource interface { diff --git a/pkg/runtime/hotreload/loader/disk/disk.go b/pkg/runtime/hotreload/loader/disk/disk.go index 332a830f6c9..585768035c4 100644 --- a/pkg/runtime/hotreload/loader/disk/disk.go +++ b/pkg/runtime/hotreload/loader/disk/disk.go @@ -59,7 +59,7 @@ func New(opts Options) (loader.Interface, error) { return &disk{ fs: fs, component: newResource[componentsapi.Component]( - loaderdisk.New[componentsapi.Component](opts.Dirs...), + loaderdisk.NewComponents(opts.Dirs...), store.NewComponent(opts.ComponentStore), updateCh, ), diff --git a/pkg/runtime/hotreload/loader/disk/resource_test.go b/pkg/runtime/hotreload/loader/disk/resource_test.go index db8da5a7dcc..c5221c5f949 100644 --- a/pkg/runtime/hotreload/loader/disk/resource_test.go +++ b/pkg/runtime/hotreload/loader/disk/resource_test.go @@ -151,7 +151,7 @@ func Test_Stream(t *testing.T) { updateCh := make(chan struct{}) r := newResource[componentsapi.Component]( - loaderdisk.New[componentsapi.Component](dir), + loaderdisk.NewComponents(dir), loadercompstore.NewComponent(store), updateCh, ) @@ -226,7 +226,7 @@ func Test_Stream(t *testing.T) { updateCh := make(chan struct{}) r := newResource[componentsapi.Component]( - loaderdisk.New[componentsapi.Component](dir), + loaderdisk.NewComponents(dir), loadercompstore.NewComponent(store), updateCh, ) @@ -302,7 +302,7 @@ func Test_Stream(t *testing.T) { updateCh := make(chan struct{}) r := newResource[componentsapi.Component]( - loaderdisk.New[componentsapi.Component](dir), + loaderdisk.NewComponents(dir), loadercompstore.NewComponent(store), updateCh, ) diff --git a/pkg/runtime/meta/resource.go b/pkg/runtime/meta/resource.go index 6e7836f6d00..502829c9bd1 100644 --- a/pkg/runtime/meta/resource.go +++ b/pkg/runtime/meta/resource.go @@ -23,6 +23,7 @@ import ( // resources. type Resource interface { Kind() string + APIVersion() string GetName() string GetNamespace() string LogName() string diff --git a/pkg/runtime/processor/pubsub/pubsub_test.go b/pkg/runtime/processor/pubsub/pubsub_test.go index 0d2512d4bff..f5bf837f38d 100644 --- a/pkg/runtime/processor/pubsub/pubsub_test.go +++ b/pkg/runtime/processor/pubsub/pubsub_test.go @@ -17,9 +17,9 @@ import ( "context" "encoding/base64" "encoding/json" - "fmt" "io" "os" + "path/filepath" "reflect" "sort" "strings" @@ -57,7 +57,6 @@ import ( const ( TestPubsubName = "testpubsub" TestSecondPubsubName = "testpubsub2" - resourcesDir = "./components" ) func TestInitPubSub(t *testing.T) { @@ -463,17 +462,13 @@ func TestInitPubSub(t *testing.T) { Channels: new(channels.Channels), }) - require.NoError(t, os.Mkdir(resourcesDir, 0o777)) - defer os.RemoveAll(resourcesDir) - s := testDeclarativeSubscription() - cleanup, err := writeComponentToDisk(s, "sub.yaml") - require.NoError(t, err) - defer cleanup() + resourcesDir := writeComponentToDisk(t, s) pst.resourcesPath = []string{resourcesDir} - subs := pst.declarativeSubscriptions(context.Background()) + subs, err := pst.declarativeSubscriptions(context.Background()) + require.NoError(t, err) if assert.Len(t, subs, 1) { assert.Equal(t, "topic1", subs[0].Topic) if assert.Len(t, subs[0].Rules, 1) { @@ -495,18 +490,14 @@ func TestInitPubSub(t *testing.T) { Channels: new(channels.Channels), }) - require.NoError(t, os.Mkdir(resourcesDir, 0o777)) - defer os.RemoveAll(resourcesDir) - s := testDeclarativeSubscription() s.Scopes = []string{TestRuntimeConfigID} - cleanup, err := writeComponentToDisk(s, "sub.yaml") - require.NoError(t, err) - defer cleanup() + resourcesDir := writeComponentToDisk(t, s) pst.resourcesPath = []string{resourcesDir} - subs := pst.declarativeSubscriptions(context.Background()) + subs, err := pst.declarativeSubscriptions(context.Background()) + require.NoError(t, err) if assert.Len(t, subs, 1) { assert.Equal(t, "topic1", subs[0].Topic) if assert.Len(t, subs[0].Rules, 1) { @@ -528,18 +519,14 @@ func TestInitPubSub(t *testing.T) { Channels: new(channels.Channels), }) - require.NoError(t, os.Mkdir(resourcesDir, 0o777)) - defer os.RemoveAll(resourcesDir) - s := testDeclarativeSubscription() s.Scopes = []string{"scope1"} - cleanup, err := writeComponentToDisk(s, "sub.yaml") - require.NoError(t, err) - defer cleanup() + resourcesDir := writeComponentToDisk(t, s) pst.resourcesPath = []string{resourcesDir} - subs := pst.declarativeSubscriptions(context.Background()) + subs, err := pst.declarativeSubscriptions(context.Background()) + require.NoError(t, err) assert.Empty(t, subs) }) @@ -1414,7 +1401,7 @@ func testDeclarativeSubscription() subscriptionsapi.Subscription { return subscriptionsapi.Subscription{ TypeMeta: metav1.TypeMeta{ Kind: "Subscription", - APIVersion: "v1alpha1", + APIVersion: "dapr.io/v1alpha1", }, Spec: subscriptionsapi.SubscriptionSpec{ Topic: "topic1", @@ -1424,16 +1411,13 @@ func testDeclarativeSubscription() subscriptionsapi.Subscription { } } -// writeComponentToDisk the given content into a file inside components directory. -func writeComponentToDisk(content any, fileName string) (cleanup func(), error error) { - filePath := fmt.Sprintf("%s/%s", resourcesDir, fileName) +func writeComponentToDisk(t *testing.T, content any) string { + dir := t.TempDir() + filePath := filepath.Join(dir, "comp.yaml") b, err := yaml.Marshal(content) - if err != nil { - return nil, err - } - return func() { - os.Remove(filePath) - }, os.WriteFile(filePath, b, 0o600) + require.NoError(t, err) + require.NoError(t, os.WriteFile(filePath, b, 0o600)) + return dir } func TestNamespacedPublisher(t *testing.T) { diff --git a/pkg/runtime/processor/pubsub/subscribe.go b/pkg/runtime/processor/pubsub/subscribe.go index c087aaae85f..cbbad6578a5 100644 --- a/pkg/runtime/processor/pubsub/subscribe.go +++ b/pkg/runtime/processor/pubsub/subscribe.go @@ -20,6 +20,10 @@ import ( "google.golang.org/grpc" + subapi "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1" + "github.com/dapr/dapr/pkg/internal/loader" + "github.com/dapr/dapr/pkg/internal/loader/disk" + "github.com/dapr/dapr/pkg/internal/loader/kubernetes" "github.com/dapr/dapr/pkg/modes" runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/dapr/pkg/runtime/compstore" @@ -174,7 +178,10 @@ func (p *pubsub) subscriptions(ctx context.Context) ([]rtpubsub.Subscription, er } // handle declarative subscriptions - ds := p.declarativeSubscriptions(ctx) + ds, err := p.declarativeSubscriptions(ctx) + if err != nil { + return nil, err + } for _, s := range ds { skip := false @@ -204,14 +211,53 @@ func (p *pubsub) subscriptions(ctx context.Context) ([]rtpubsub.Subscription, er // Refer for state store api decision // https://github.com/dapr/dapr/blob/master/docs/decision_records/api/API-008-multi-state-store-api-design.md -func (p *pubsub) declarativeSubscriptions(ctx context.Context) []rtpubsub.Subscription { - var subs []rtpubsub.Subscription - +func (p *pubsub) declarativeSubscriptions(ctx context.Context) ([]rtpubsub.Subscription, error) { + var loader loader.Loader[subapi.Subscription] switch p.mode { case modes.KubernetesMode: - subs = rtpubsub.DeclarativeKubernetes(ctx, p.operatorClient, p.podName, p.namespace, log) - case modes.StandaloneMode: - subs = rtpubsub.DeclarativeLocal(p.resourcesPath, p.namespace, log) + loader = kubernetes.NewSubscriptions(kubernetes.Options{ + Client: p.operatorClient, + Namespace: p.namespace, + PodName: p.podName, + }) + default: + loader = disk.NewSubscriptions(p.resourcesPath...) + } + + subsv2, err := loader.Load(ctx) + if err != nil { + return nil, err + } + + subs := make([]rtpubsub.Subscription, len(subsv2)) + + for i, subv2 := range subsv2 { + sub := rtpubsub.Subscription{ + PubsubName: subv2.Spec.Pubsubname, + Topic: subv2.Spec.Topic, + DeadLetterTopic: subv2.Spec.DeadLetterTopic, + Metadata: subv2.Spec.Metadata, + Scopes: subv2.Scopes, + BulkSubscribe: &rtpubsub.BulkSubscribe{ + Enabled: subv2.Spec.BulkSubscribe.Enabled, + MaxMessagesCount: subv2.Spec.BulkSubscribe.MaxMessagesCount, + MaxAwaitDurationMs: subv2.Spec.BulkSubscribe.MaxAwaitDurationMs, + }, + } + for _, rule := range subv2.Spec.Routes.Rules { + erule, err := rtpubsub.CreateRoutingRule(rule.Match, rule.Path) + if err != nil { + return nil, err + } + sub.Rules = append(sub.Rules, erule) + } + if len(subv2.Spec.Routes.Default) > 0 { + sub.Rules = append(sub.Rules, &rtpubsub.Rule{ + Path: subv2.Spec.Routes.Default, + }) + } + + subs[i] = sub } // only return valid subscriptions for this app id @@ -234,5 +280,5 @@ func (p *pubsub) declarativeSubscriptions(ctx context.Context) []rtpubsub.Subscr i++ } } - return subs[:i] + return subs[:i], nil } diff --git a/pkg/runtime/processor/wfbackend/wfbackend.go b/pkg/runtime/processor/wfbackend/wfbackend.go index 786c2778cd9..3ffff2fc974 100644 --- a/pkg/runtime/processor/wfbackend/wfbackend.go +++ b/pkg/runtime/processor/wfbackend/wfbackend.go @@ -20,6 +20,7 @@ import ( "time" "github.com/microsoft/durabletask-go/backend" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" compapi "github.com/dapr/dapr/pkg/apis/components/v1alpha1" wfbeComp "github.com/dapr/dapr/pkg/components/wfbackend" @@ -129,3 +130,11 @@ func (w *workflowBackend) Backend() (backend.Backend, bool) { } return w.backend, true } + +func ComponentDefinition() compapi.Component { + return compapi.Component{ + TypeMeta: metav1.TypeMeta{Kind: "Component"}, + ObjectMeta: metav1.ObjectMeta{Name: "dapr"}, + Spec: compapi.ComponentSpec{Type: "workflow.dapr", Version: "v1"}, + } +} diff --git a/pkg/runtime/pubsub/subscriptions.go b/pkg/runtime/pubsub/subscriptions.go index 6db1b7d4415..18391e12f5a 100644 --- a/pkg/runtime/pubsub/subscriptions.go +++ b/pkg/runtime/pubsub/subscriptions.go @@ -1,31 +1,22 @@ package pubsub import ( - "bytes" "context" "encoding/json" "fmt" "net/http" - "os" - "path/filepath" "strings" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/yaml" - subscriptionsapiV1alpha1 "github.com/dapr/dapr/pkg/apis/subscriptions/v1alpha1" - subscriptionsapiV2alpha1 "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1" "github.com/dapr/dapr/pkg/channel" "github.com/dapr/dapr/pkg/expr" invokev1 "github.com/dapr/dapr/pkg/messaging/v1" - operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1" runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/dapr/pkg/resiliency" "github.com/dapr/kit/logger" - "github.com/dapr/kit/utils" ) const ( @@ -111,7 +102,7 @@ func GetSubscriptionsHTTP(ctx context.Context, channel channel.AppChannel, log l rules := make([]*Rule, len(si.Routes.Rules)+1) n := 0 for _, r := range si.Routes.Rules { - rule, err := createRoutingRule(r.Match, r.Path) + rule, err := CreateRoutingRule(r.Match, r.Path) if err != nil { return nil, err } @@ -229,170 +220,6 @@ func GetSubscriptionsGRPC(ctx context.Context, channel runtimev1pb.AppCallbackCl return subscriptions, nil } -// DeclarativeLocal loads subscriptions from the given local resources path. -func DeclarativeLocal(resourcesPaths []string, namespace string, log logger.Logger) (subs []Subscription) { - for _, path := range resourcesPaths { - res := declarativeFile(path, namespace, log) - if len(res) > 0 { - subs = append(subs, res...) - } - } - return subs -} - -// Used by DeclarativeLocal to load a single path. -func declarativeFile(resourcesPath string, namespace string, log logger.Logger) (subs []Subscription) { - if _, err := os.Stat(resourcesPath); os.IsNotExist(err) { - return subs - } - - files, err := os.ReadDir(resourcesPath) - if err != nil { - log.Errorf("Failed to read subscriptions from path %s: %s", resourcesPath, err) - return subs - } - - for _, f := range files { - if f.IsDir() { - continue - } - - if !utils.IsYaml(f.Name()) { - log.Warnf("A non-YAML pubsub file %s was detected, it will not be loaded", f.Name()) - continue - } - - filePath := filepath.Join(resourcesPath, f.Name()) - b, err := os.ReadFile(filePath) - if err != nil { - log.Warnf("Failed to read file %s: %v", f.Name(), err) - continue - } - - bytesArray := bytes.Split(b, []byte("\n---")) - for _, item := range bytesArray { - // Skip empty items - item = bytes.TrimSpace(item) - if len(item) == 0 { - continue - } - - subs, err = appendSubscription(subs, item, namespace) - if err != nil { - log.Warnf("Failed to add subscription from file %s: %v", f.Name(), err) - continue - } - } - } - - return subs -} - -func unmarshalSubscription(b []byte, namespace string) (*Subscription, error) { - // Parse only the type metadata first in order - // to filter out non-Subscriptions without other errors. - type typeInfo struct { - metav1.TypeMeta `json:",inline"` - } - - var ti typeInfo - if err := yaml.Unmarshal(b, &ti); err != nil { - return nil, err - } - - if ti.Kind != subscriptionKind { - return nil, nil - } - - switch ti.APIVersion { - case APIVersionV2alpha1: - // "v2alpha1" is the CRD that introduces pubsub routing. - var sub subscriptionsapiV2alpha1.Subscription - if err := yaml.Unmarshal(b, &sub); err != nil { - return nil, err - } - if namespace != "" && sub.Namespace != "" && sub.Namespace != namespace { - return nil, nil - } - - rules, err := parseRoutingRulesYAML(sub.Spec.Routes) - if err != nil { - return nil, err - } - - return &Subscription{ - Topic: sub.Spec.Topic, - PubsubName: sub.Spec.Pubsubname, - Rules: rules, - Metadata: sub.Spec.Metadata, - Scopes: sub.Scopes, - DeadLetterTopic: sub.Spec.DeadLetterTopic, - BulkSubscribe: &BulkSubscribe{ - Enabled: sub.Spec.BulkSubscribe.Enabled, - MaxMessagesCount: sub.Spec.BulkSubscribe.MaxMessagesCount, - MaxAwaitDurationMs: sub.Spec.BulkSubscribe.MaxAwaitDurationMs, - }, - }, nil - - default: - // assume "v1alpha1" for backward compatibility as this was - // not checked before the introduction of "v2alpha". - var sub subscriptionsapiV1alpha1.Subscription - if err := yaml.Unmarshal(b, &sub); err != nil { - return nil, err - } - if namespace != "" && sub.Namespace != "" && sub.Namespace != namespace { - return nil, nil - } - - return &Subscription{ - Topic: sub.Spec.Topic, - PubsubName: sub.Spec.Pubsubname, - Rules: []*Rule{ - { - Path: sub.Spec.Route, - }, - }, - Metadata: sub.Spec.Metadata, - Scopes: sub.Scopes, - DeadLetterTopic: sub.Spec.DeadLetterTopic, - BulkSubscribe: &BulkSubscribe{ - Enabled: sub.Spec.BulkSubscribe.Enabled, - MaxMessagesCount: sub.Spec.BulkSubscribe.MaxMessagesCount, - MaxAwaitDurationMs: sub.Spec.BulkSubscribe.MaxAwaitDurationMs, - }, - }, nil - } -} - -func parseRoutingRulesYAML(routes subscriptionsapiV2alpha1.Routes) ([]*Rule, error) { - r := make([]*Rule, len(routes.Rules)+1) - - var ( - n int - err error - ) - for _, rule := range routes.Rules { - r[n], err = createRoutingRule(rule.Match, rule.Path) - if err != nil { - return nil, err - } - n++ - } - - // If a default path is set, add a rule with a nil `Match`, - // which is treated as `true` and always selected if - // no previous rules match. - if routes.Default != "" { - r[n] = &Rule{ - Path: routes.Default, - } - n++ - } - - return r[:n], nil -} - func parseRoutingRulesGRPC(routes *runtimev1pb.TopicRoutes) ([]*Rule, error) { if routes == nil { return []*Rule{{ @@ -402,7 +229,7 @@ func parseRoutingRulesGRPC(routes *runtimev1pb.TopicRoutes) ([]*Rule, error) { r := make([]*Rule, 0, len(routes.GetRules())+1) for _, rule := range routes.GetRules() { - rr, err := createRoutingRule(rule.GetMatch(), rule.GetPath()) + rr, err := CreateRoutingRule(rule.GetMatch(), rule.GetPath()) if err != nil { return nil, err } @@ -429,7 +256,7 @@ func parseRoutingRulesGRPC(routes *runtimev1pb.TopicRoutes) ([]*Rule, error) { return r, nil } -func createRoutingRule(match, path string) (*Rule, error) { +func CreateRoutingRule(match, path string) (*Rule, error) { var e *expr.Expr matchTrimmed := strings.TrimSpace(match) if matchTrimmed != "" { @@ -444,40 +271,3 @@ func createRoutingRule(match, path string) (*Rule, error) { Path: path, }, nil } - -// DeclarativeKubernetes loads subscriptions from the operator when running in Kubernetes. -func DeclarativeKubernetes(ctx context.Context, client operatorv1pb.OperatorClient, podName string, namespace string, log logger.Logger) []Subscription { - var subs []Subscription - resp, err := client.ListSubscriptionsV2(ctx, &operatorv1pb.ListSubscriptionsRequest{ - PodName: podName, - Namespace: namespace, - }) - if err != nil { - log.Errorf("Failed to list subscriptions from operator: %s", err) - return subs - } - - for _, s := range resp.GetSubscriptions() { - // No namespace filtering here as it's been already filtered by the operator - subs, err = appendSubscription(subs, s, "") - if err != nil { - log.Warnf("Failed to add subscription from operator: %s", err) - continue - } - } - - return subs -} - -func appendSubscription(list []Subscription, subBytes []byte, namespace string) ([]Subscription, error) { - sub, err := unmarshalSubscription(subBytes, namespace) - if err != nil { - return nil, err - } - - if sub != nil { - list = append(list, *sub) - } - - return list, nil -} diff --git a/pkg/runtime/pubsub/subscriptions_test.go b/pkg/runtime/pubsub/subscriptions_test.go index a74088206ca..9a41c712770 100644 --- a/pkg/runtime/pubsub/subscriptions_test.go +++ b/pkg/runtime/pubsub/subscriptions_test.go @@ -1,31 +1,21 @@ package pubsub import ( - "bytes" "context" "encoding/json" "errors" - "fmt" - "os" - "path/filepath" - "strconv" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/yaml" - subscriptionsapiV1alpha1 "github.com/dapr/dapr/pkg/apis/subscriptions/v1alpha1" subscriptionsapiV2alpha1 "github.com/dapr/dapr/pkg/apis/subscriptions/v2alpha1" "github.com/dapr/dapr/pkg/channel" invokev1 "github.com/dapr/dapr/pkg/messaging/v1" - operatorv1pb "github.com/dapr/dapr/pkg/proto/operator/v1" runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1" "github.com/dapr/dapr/pkg/resiliency" "github.com/dapr/kit/logger" @@ -66,390 +56,6 @@ func TestFilterSubscriptions(t *testing.T) { } } -func testDeclarativeSubscriptionV1() subscriptionsapiV1alpha1.Subscription { - return subscriptionsapiV1alpha1.Subscription{ - TypeMeta: v1.TypeMeta{ - Kind: "Subscription", - APIVersion: APIVersionV1alpha1, - }, - Spec: subscriptionsapiV1alpha1.SubscriptionSpec{ - Pubsubname: "pubsub", - Topic: "topic1", - Metadata: map[string]string{ - "testName": "testValue", - }, - Route: "myroute", - }, - } -} - -func testDeclarativeSubscriptionV2() subscriptionsapiV2alpha1.Subscription { - return subscriptionsapiV2alpha1.Subscription{ - TypeMeta: v1.TypeMeta{ - Kind: "Subscription", - APIVersion: APIVersionV2alpha1, - }, - Spec: subscriptionsapiV2alpha1.SubscriptionSpec{ - Pubsubname: "pubsub", - Topic: "topic1", - Metadata: map[string]string{ - "testName": "testValue", - }, - Routes: subscriptionsapiV2alpha1.Routes{ - Rules: []subscriptionsapiV2alpha1.Rule{ - { - Match: `event.type == "myevent.v3"`, - Path: "myroute.v3", - }, - { - Match: `event.type == "myevent.v2"`, - Path: "myroute.v2", - }, - }, - Default: "myroute", - }, - }, - } -} - -func writeSubscriptionToDisk(subscription any, filePath string) { - b, _ := yaml.Marshal(subscription) - os.WriteFile(filePath, b, 0o600) -} - -func writeSubscriptionsToDisk(subscriptions []any, filePath string) { - byteArray := make([][]byte, len(subscriptions)) - for i, sub := range subscriptions { - if sub != nil { - byteArray[i], _ = yaml.Marshal(sub) - } else { - byteArray[i] = []byte{} - } - } - - b := bytes.Join(byteArray, []byte("\n---\n")) - os.WriteFile(filePath, b, 0o600) -} - -func TestDeclarativeSubscriptionsV1(t *testing.T) { - dir := filepath.Join(".", "components") - os.Mkdir(dir, 0o777) - defer os.RemoveAll(dir) - subscriptionCount := 5 - - t.Run("load single valid subscription", func(t *testing.T) { - s := testDeclarativeSubscriptionV1() - s.Scopes = []string{"scope1"} - - filePath := filepath.Join(dir, "sub.yaml") - writeSubscriptionToDisk(s, filePath) - defer os.RemoveAll(filePath) - - subs := DeclarativeLocal([]string{dir}, "", log) - if assert.Len(t, subs, 1) { - assert.Equal(t, "topic1", subs[0].Topic) - if assert.Len(t, subs[0].Rules, 1) { - assert.Equal(t, "myroute", subs[0].Rules[0].Path) - } - assert.Equal(t, "pubsub", subs[0].PubsubName) - assert.Equal(t, "scope1", subs[0].Scopes[0]) - assert.Equal(t, "testValue", subs[0].Metadata["testName"]) - } - }) - - t.Run("load multiple subscriptions in different files", func(t *testing.T) { - for i := 0; i < subscriptionCount; i++ { - s := testDeclarativeSubscriptionV1() - s.Spec.Topic = strconv.Itoa(i) - s.Spec.Route = strconv.Itoa(i) - s.Spec.Pubsubname = strconv.Itoa(i) - s.Spec.Metadata = map[string]string{ - "testName": strconv.Itoa(i), - } - s.Scopes = []string{strconv.Itoa(i)} - - filepath := fmt.Sprintf("%s/%v.yaml", dir, i) - writeSubscriptionToDisk(s, filepath) - defer os.RemoveAll(filepath) - } - - subs := DeclarativeLocal([]string{dir}, "", log) - if assert.Len(t, subs, subscriptionCount) { - for i := 0; i < subscriptionCount; i++ { - assert.Equal(t, strconv.Itoa(i), subs[i].Topic) - if assert.Len(t, subs[i].Rules, 1) { - assert.Equal(t, strconv.Itoa(i), subs[i].Rules[0].Path) - } - assert.Equal(t, strconv.Itoa(i), subs[i].PubsubName) - assert.Equal(t, strconv.Itoa(i), subs[i].Scopes[0]) - assert.Equal(t, strconv.Itoa(i), subs[i].Metadata["testName"]) - } - } - }) - - t.Run("load multiple subscriptions in single file", func(t *testing.T) { - subscriptions := []any{} - // Add an empty block at the beginning which will be ignored - subscriptions = append(subscriptions, nil) - - for i := 0; i < subscriptionCount; i++ { - s := testDeclarativeSubscriptionV1() - s.Spec.Topic = strconv.Itoa(i) - s.Spec.Route = strconv.Itoa(i) - s.Spec.Pubsubname = strconv.Itoa(i) - s.Spec.Metadata = map[string]string{ - "testName": strconv.Itoa(i), - } - s.Scopes = []string{strconv.Itoa(i)} - - subscriptions = append(subscriptions, s) - } - - filepath := filepath.Join(dir, "sub.yaml") - writeSubscriptionsToDisk(subscriptions, filepath) - defer os.RemoveAll(filepath) - - subs := DeclarativeLocal([]string{dir}, "", log) - if assert.Len(t, subs, subscriptionCount) { - for i := 0; i < subscriptionCount; i++ { - assert.Equal(t, strconv.Itoa(i), subs[i].Topic) - if assert.Len(t, subs[i].Rules, 1) { - assert.Equal(t, strconv.Itoa(i), subs[i].Rules[0].Path) - } - assert.Equal(t, strconv.Itoa(i), subs[i].PubsubName) - assert.Equal(t, strconv.Itoa(i), subs[i].Scopes[0]) - assert.Equal(t, strconv.Itoa(i), subs[i].Metadata["testName"]) - } - } - }) - - t.Run("filter subscriptions by namespace v1", func(t *testing.T) { - // Subscription v1 in namespace dev - s := testDeclarativeSubscriptionV1() - s.ObjectMeta.Namespace = "dev" - s.Spec.Topic = "dev" - path := filepath.Join(dir, "dev.yaml") - writeSubscriptionToDisk(s, path) - defer os.RemoveAll(path) - - // Subscription v1 in namespace prod - s = testDeclarativeSubscriptionV1() - s.ObjectMeta.Namespace = "prod" - s.Spec.Topic = "prod" - path = filepath.Join(dir, "prod.yaml") - writeSubscriptionToDisk(s, path) - defer os.RemoveAll(path) - - // Subscription v1 doesn't have a namespace - s = testDeclarativeSubscriptionV1() - s.ObjectMeta.Namespace = "" - s.Spec.Topic = "all" - path = filepath.Join(dir, "all.yaml") - writeSubscriptionToDisk(s, path) - defer os.RemoveAll(path) - - // Test function - loadAndReturnTopics := func(namespace string, expect []string) func(t *testing.T) { - return func(t *testing.T) { - res := []string{} - subs := DeclarativeLocal([]string{dir}, namespace, log) - for _, sub := range subs { - res = append(res, sub.Topic) - } - slices.Sort(res) - - require.Equal(t, expect, res) - } - } - - t.Run("load all subscriptions without a namespace specified", loadAndReturnTopics("", []string{"all", "dev", "prod"})) - t.Run("load subscriptions for dev namespace only", loadAndReturnTopics("dev", []string{"all", "dev"})) - t.Run("load subscriptions for prod namespace only", loadAndReturnTopics("prod", []string{"all", "prod"})) - }) - - t.Run("filter subscriptions by namespace v2", func(t *testing.T) { - // Subscription v2 in namespace dev - s := testDeclarativeSubscriptionV2() - s.ObjectMeta.Namespace = "dev" - s.Spec.Topic = "dev" - path := filepath.Join(dir, "dev.yaml") - writeSubscriptionToDisk(s, path) - defer os.RemoveAll(path) - - // Subscription v2 in namespace prod - s = testDeclarativeSubscriptionV2() - s.ObjectMeta.Namespace = "prod" - s.Spec.Topic = "prod" - path = filepath.Join(dir, "prod.yaml") - writeSubscriptionToDisk(s, path) - defer os.RemoveAll(path) - - // Subscription v2 doesn't have a namespace - s = testDeclarativeSubscriptionV2() - s.ObjectMeta.Namespace = "" - s.Spec.Topic = "all" - path = filepath.Join(dir, "all.yaml") - writeSubscriptionToDisk(s, path) - defer os.RemoveAll(path) - - // Test function - loadAndReturnTopics := func(namespace string, expect []string) func(t *testing.T) { - return func(t *testing.T) { - res := []string{} - subs := DeclarativeLocal([]string{dir}, namespace, log) - for _, sub := range subs { - res = append(res, sub.Topic) - } - slices.Sort(res) - - require.Equal(t, expect, res) - } - } - - t.Run("load all subscriptions without a namespace specified", loadAndReturnTopics("", []string{"all", "dev", "prod"})) - t.Run("load subscriptions for dev namespace only", loadAndReturnTopics("dev", []string{"all", "dev"})) - t.Run("load subscriptions for prod namespace only", loadAndReturnTopics("prod", []string{"all", "prod"})) - }) - - t.Run("will not load non yaml file", func(t *testing.T) { - s := testDeclarativeSubscriptionV1() - s.Scopes = []string{"scope1"} - - filePath := filepath.Join(dir, "sub.txt") - writeSubscriptionToDisk(s, filePath) - defer os.RemoveAll(filePath) - - subs := DeclarativeLocal([]string{dir}, "", log) - assert.Empty(t, subs) - }) - - t.Run("no subscriptions loaded", func(t *testing.T) { - os.RemoveAll(dir) - - s := testDeclarativeSubscriptionV1() - s.Scopes = []string{"scope1"} - - writeSubscriptionToDisk(s, dir) - - subs := DeclarativeLocal([]string{dir}, "", log) - assert.Empty(t, subs) - }) -} - -func TestDeclarativeSubscriptionsV2(t *testing.T) { - dir := filepath.Join(".", "componentsV2") - os.Mkdir(dir, 0o777) - defer os.RemoveAll(dir) - subscriptionCount := 5 - - t.Run("load single valid subscription", func(t *testing.T) { - s := testDeclarativeSubscriptionV2() - s.Scopes = []string{"scope1"} - - filePath := filepath.Join(dir, "sub.yaml") - writeSubscriptionToDisk(s, filePath) - defer os.RemoveAll(filePath) - - subs := DeclarativeLocal([]string{dir}, "", log) - if assert.Len(t, subs, 1) { - assert.Equal(t, "topic1", subs[0].Topic) - if assert.Len(t, subs[0].Rules, 3) { - assert.Equal(t, "myroute.v3", subs[0].Rules[0].Path) - assert.Equal(t, "myroute.v2", subs[0].Rules[1].Path) - assert.Equal(t, "myroute", subs[0].Rules[2].Path) - } - assert.Equal(t, "pubsub", subs[0].PubsubName) - assert.Equal(t, "scope1", subs[0].Scopes[0]) - assert.Equal(t, "testValue", subs[0].Metadata["testName"]) - } - }) - - t.Run("load multiple subscriptions in different files", func(t *testing.T) { - for i := 0; i < subscriptionCount; i++ { - iStr := strconv.Itoa(i) - s := testDeclarativeSubscriptionV2() - s.Spec.Topic = iStr - for j := range s.Spec.Routes.Rules { - s.Spec.Routes.Rules[j].Path = iStr - } - s.Spec.Routes.Default = iStr - s.Spec.Pubsubname = iStr - s.Spec.Metadata = map[string]string{ - "testName": iStr, - } - s.Scopes = []string{iStr} - - filePath := fmt.Sprintf("%s/%v.yaml", dir, i) - writeSubscriptionToDisk(s, filePath) - defer os.RemoveAll(filePath) - } - - subs := DeclarativeLocal([]string{dir}, "", log) - if assert.Len(t, subs, subscriptionCount) { - for i := 0; i < subscriptionCount; i++ { - iStr := strconv.Itoa(i) - assert.Equal(t, iStr, subs[i].Topic) - if assert.Len(t, subs[i].Rules, 3) { - assert.Equal(t, iStr, subs[i].Rules[0].Path) - } - assert.Equal(t, iStr, subs[i].PubsubName) - assert.Equal(t, iStr, subs[i].Scopes[0]) - assert.Equal(t, iStr, subs[i].Metadata["testName"]) - } - } - }) - - t.Run("load multiple subscriptions in single file", func(t *testing.T) { - subscriptions := []any{} - for i := 0; i < subscriptionCount; i++ { - iStr := strconv.Itoa(i) - s := testDeclarativeSubscriptionV2() - s.Spec.Topic = iStr - for j := range s.Spec.Routes.Rules { - s.Spec.Routes.Rules[j].Path = iStr - } - s.Spec.Routes.Default = iStr - s.Spec.Pubsubname = iStr - s.Spec.Metadata = map[string]string{ - "testName": iStr, - } - s.Scopes = []string{iStr} - - subscriptions = append(subscriptions, s) - } - - filepath := filepath.Join(dir, "sub.yaml") - writeSubscriptionsToDisk(subscriptions, filepath) - defer os.RemoveAll(filepath) - - subs := DeclarativeLocal([]string{dir}, "", log) - if assert.Len(t, subs, subscriptionCount) { - for i := 0; i < subscriptionCount; i++ { - iStr := strconv.Itoa(i) - assert.Equal(t, iStr, subs[i].Topic) - if assert.Len(t, subs[i].Rules, 3) { - assert.Equal(t, iStr, subs[i].Rules[0].Path) - } - assert.Equal(t, iStr, subs[i].PubsubName) - assert.Equal(t, iStr, subs[i].Scopes[0]) - assert.Equal(t, iStr, subs[i].Metadata["testName"]) - } - } - }) - - t.Run("no subscriptions loaded", func(t *testing.T) { - os.RemoveAll(dir) - - s := testDeclarativeSubscriptionV2() - s.Scopes = []string{"scope1"} - - writeSubscriptionToDisk(s, dir) - - subs := DeclarativeLocal([]string{dir}, "", log) - assert.Empty(t, subs) - }) -} - type mockUnstableHTTPSubscriptions struct { channel.AppChannel callCount int @@ -770,36 +376,6 @@ func TestGRPCSubscriptions(t *testing.T) { }) } -type mockK8sSubscriptions struct { - operatorv1pb.OperatorClient -} - -func (m *mockK8sSubscriptions) ListSubscriptionsV2(ctx context.Context, in *operatorv1pb.ListSubscriptionsRequest, opts ...grpc.CallOption) (*operatorv1pb.ListSubscriptionsResponse, error) { - v2 := testDeclarativeSubscriptionV2() - v2Bytes, err := yaml.Marshal(v2) - if err != nil { - return nil, err - } - return &operatorv1pb.ListSubscriptionsResponse{ - Subscriptions: [][]byte{v2Bytes}, - }, nil -} - -func TestK8sSubscriptions(t *testing.T) { - m := mockK8sSubscriptions{} - subs := DeclarativeKubernetes(context.TODO(), &m, "testPodName", "testNamespace", log) - if assert.Len(t, subs, 1) { - assert.Equal(t, "topic1", subs[0].Topic) - if assert.Len(t, subs[0].Rules, 3) { - assert.Equal(t, "myroute.v3", subs[0].Rules[0].Path) - assert.Equal(t, "myroute.v2", subs[0].Rules[1].Path) - assert.Equal(t, "myroute", subs[0].Rules[2].Path) - } - assert.Equal(t, "pubsub", subs[0].PubsubName) - assert.Equal(t, "testValue", subs[0].Metadata["testName"]) - } -} - func TestGetRuleMatchString(t *testing.T) { cases := []subscriptionsapiV2alpha1.Rule{ { @@ -817,7 +393,7 @@ func TestGetRuleMatchString(t *testing.T) { } for _, v := range cases { - rule, err := createRoutingRule(v.Match, v.Path) + rule, err := CreateRoutingRule(v.Match, v.Path) require.NoError(t, err) assert.Equal(t, v.Match, rule.Match.String()) } diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index b36d4713ea4..d80052ba95a 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -72,6 +72,7 @@ import ( "github.com/dapr/dapr/pkg/runtime/hotreload" "github.com/dapr/dapr/pkg/runtime/meta" "github.com/dapr/dapr/pkg/runtime/processor" + "github.com/dapr/dapr/pkg/runtime/processor/wfbackend" "github.com/dapr/dapr/pkg/runtime/processor/workflow" "github.com/dapr/dapr/pkg/runtime/registry" "github.com/dapr/dapr/pkg/runtime/wfengine" @@ -656,14 +657,14 @@ func (a *DaprRuntime) initWorkflowEngine(ctx context.Context) error { ComponentStore: a.compStore, Meta: a.meta, }) - if err := wfe.Init(ctx, wfengine.ComponentDefinition()); err != nil { + if err := wfe.Init(ctx, wfbackend.ComponentDefinition()); err != nil { return fmt.Errorf("failed to initialize Dapr workflow component: %w", err) } log.Info("Workflow engine initialized.") return a.runnerCloser.AddCloser(func() error { - return wfe.Close(wfengine.ComponentDefinition()) + return wfe.Close(wfbackend.ComponentDefinition()) }) } @@ -999,7 +1000,7 @@ func (a *DaprRuntime) loadComponents(ctx context.Context) error { PodName: a.podName, }) case modes.StandaloneMode: - loader = disk.New[compapi.Component](a.runtimeConfig.standalone.ResourcesPath...) + loader = disk.NewComponents(a.runtimeConfig.standalone.ResourcesPath...) default: return nil } @@ -1063,7 +1064,7 @@ func (a *DaprRuntime) loadHTTPEndpoints(ctx context.Context) error { PodName: a.podName, }) case modes.StandaloneMode: - loader = disk.New[endpointapi.HTTPEndpoint](a.runtimeConfig.standalone.ResourcesPath...) + loader = disk.NewHTTPEndpoints(a.runtimeConfig.standalone.ResourcesPath...) default: return nil } diff --git a/pkg/runtime/wfengine/component.go b/pkg/runtime/wfengine/component.go index 9d0ec5f2f0d..0d6b2c145bf 100644 --- a/pkg/runtime/wfengine/component.go +++ b/pkg/runtime/wfengine/component.go @@ -22,10 +22,8 @@ import ( "github.com/microsoft/durabletask-go/api" "github.com/microsoft/durabletask-go/backend" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/dapr/components-contrib/workflows" - componentsV1alpha1 "github.com/dapr/dapr/pkg/apis/components/v1alpha1" // This will be removed "github.com/dapr/kit/logger" ) @@ -246,11 +244,3 @@ func getStatusString(status int32) string { } return "UNKNOWN" } - -func ComponentDefinition() componentsV1alpha1.Component { - return componentsV1alpha1.Component{ - TypeMeta: metav1.TypeMeta{Kind: "Component"}, - ObjectMeta: metav1.ObjectMeta{Name: "dapr"}, - Spec: componentsV1alpha1.ComponentSpec{Type: "workflow.dapr", Version: "v1"}, - } -} diff --git a/tests/integration/framework/process/grpc/operator/server.go b/tests/integration/framework/process/grpc/operator/server.go index a484b18aed9..270199a1f44 100644 --- a/tests/integration/framework/process/grpc/operator/server.go +++ b/tests/integration/framework/process/grpc/operator/server.go @@ -86,12 +86,12 @@ func (s *server) ListSubscriptions(ctx context.Context, in *emptypb.Empty) (*ope if s.listSubscriptionsFn != nil { return s.listSubscriptionsFn(ctx, in) } - return nil, nil + return new(operatorv1.ListSubscriptionsResponse), nil } func (s *server) ListSubscriptionsV2(ctx context.Context, in *operatorv1.ListSubscriptionsRequest) (*operatorv1.ListSubscriptionsResponse, error) { if s.listSubscriptionsV2Fn != nil { return s.listSubscriptionsV2Fn(ctx, in) } - return nil, nil + return new(operatorv1.ListSubscriptionsResponse), nil } diff --git a/tests/integration/suite/daprd/hotreload/selfhosted/crypto.go b/tests/integration/suite/daprd/hotreload/selfhosted/crypto.go index 5341cb47313..6559d0e5a92 100644 --- a/tests/integration/suite/daprd/hotreload/selfhosted/crypto.go +++ b/tests/integration/suite/daprd/hotreload/selfhosted/crypto.go @@ -160,6 +160,7 @@ spec: - name: path value: '%[1]s' --- +apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: crypto2 @@ -191,6 +192,7 @@ spec: type: state.in-memory version: v1 --- +apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: crypto3 @@ -255,6 +257,7 @@ spec: t.Run("recreating crypto component should make it available again", func(t *testing.T) { require.NoError(t, os.WriteFile(filepath.Join(c.resDir, "1.yaml"), []byte(fmt.Sprintf(` +apiVersion: dapr.io/v1alpha1 kind: Component metadata: name: crypto2