From aef91728e296ec8690b6fa1a6610f77c304d0ab2 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Thu, 28 Sep 2023 17:17:34 +0200 Subject: [PATCH] Support subject transforms on streams Signed-off-by: R.I.Pienaar --- ABTaskFile | 126 ++++++++++++++++++++ go.mod | 4 +- go.sum | 8 +- jetstream/resource_jetstream_stream.go | 42 ++++++- jetstream/resource_jetstream_stream_test.go | 108 ++++++++++++++++- jetstream/util.go | 11 ++ jetstream/util_test.go | 69 +++++++++++ 7 files changed, 358 insertions(+), 10 deletions(-) create mode 100644 ABTaskFile diff --git a/ABTaskFile b/ABTaskFile new file mode 100644 index 0000000..dc51c00 --- /dev/null +++ b/ABTaskFile @@ -0,0 +1,126 @@ +# Install https://choria-io.github.io/appbuilder/ and run `abt` to use this file + +name: build_tasks +description: NATS Developer Commands + +commands: + - name: dependencies + type: parent + description: Manage dependencies + aliases: [d] + commands: + - name: update + description: Update dependencies + type: exec + aliases: [up] + flags: + - name: verbose + description: Log verbosely + bool: true + script: | + . "{{ BashHelperPath }}" + + ab_announce Updating all dependencies + + go get -u -n -a -t {{- if .Flags.verbose }} -d -x {{ end }} ./... + + go mod tidy + + - name: test + type: parent + aliases: [t] + description: Perform various tests + commands: + - name: unit + type: exec + description: Run unit tests + aliases: [u] + dir: "{{ AppDir }}" + environment: + - "TF_ACC=1" + script: go test -v --failfast -p=1 ./... + + - name: lint + type: exec + dir: "{{ AppDir }}" + flags: + - name: vet + description: Perform go vet + bool: true + default: true + - name: staticcheck + description: Perform staticcheck + bool: true + default: true + - name: update + description: Updates lint dependencies + bool: true + script: | + set -e + + . "{{ BashHelperPath }}" + + {{ if .Flags.update }} + ab_say Updating linting tools + go install github.com/client9/misspell/cmd/misspell@latest + go install honnef.co/go/tools/cmd/staticcheck@latest + {{ else }} + echo ">>> Run with --update to install required commands" + echo + {{ end }} + + ab_say Formatting source files + go fmt ./... + + ab_say Tidying go mod + go mod tidy + + ab_say Checking spelling + find . -type f -name "*.go" | xargs misspell -error -locale US + + {{ if .Flags.vet }} + ab_say Performing go vet + go vet ./... + {{ end }} + + {{ if .Flags.staticcheck }} + ab_say Running staticcheck + staticcheck ./... + {{ end }} + + - name: build + type: parent + aliases: [b] + description: Code build steps + commands: + - name: binary + aliases: [bin] + description: Build a basic test binary + type: exec + dir: "{{ TaskDir }}/nats" + banner: | + >>> + >>> Building 'nats' locally {{ if .Flags.target }}for target '{{ .Flags.target }}'{{ end }} + >>> + flags: + - name: target + description: Target platform to build for + enum: ["linux/amd64"] + short: T + script: | + {{ if eq .Flags.target "linux/amd64" }} + export GOOS=linux + export GOARCH=amd64 + {{ end }} + + go build -o nats + + ls -l nats + + - name: snapshot + description: Goreleaser snapshot + aliases: [snap] + type: exec + dir: "{{ TaskDir }}" + script: | + goreleaser release --snapshot --clean diff --git a/go.mod b/go.mod index ec3c86d..05a482a 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,10 @@ go 1.20 require ( github.com/google/go-cmp v0.5.9 github.com/hashicorp/terraform-plugin-sdk v1.17.2 - github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8 + github.com/nats-io/jsm.go v0.1.1-0.20230926103807-a54fd61d399d github.com/nats-io/jwt/v2 v2.5.2 github.com/nats-io/nats-server/v2 v2.10.1 - github.com/nats-io/nats.go v1.30.0 + github.com/nats-io/nats.go v1.30.2 github.com/xeipuuv/gojsonschema v1.2.0 ) diff --git a/go.sum b/go.sum index 311335f..7cb00d5 100644 --- a/go.sum +++ b/go.sum @@ -494,14 +494,14 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mitchellh/reflectwalk v1.0.1 h1:FVzMWA5RllMAKIdUSC8mdWo3XtwoecrH79BY70sEEpE= github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= -github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8 h1:OKm9e1//rlcl4i9zXQ6QQxj7DJaeL+Oe8WBgAKO4cqI= -github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8/go.mod h1:hB4Qd+IKoRvAAWTOI1HkCy4wotjFwOIT+codHCFOZqk= +github.com/nats-io/jsm.go v0.1.1-0.20230926103807-a54fd61d399d h1:W1vFAseJ8J2315SXVyIMCePKm1P7ucA2CDBubxSNHxg= +github.com/nats-io/jsm.go v0.1.1-0.20230926103807-a54fd61d399d/go.mod h1:hB4Qd+IKoRvAAWTOI1HkCy4wotjFwOIT+codHCFOZqk= github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ= github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto= -github.com/nats-io/nats.go v1.30.0 h1:bj/rVsRCrFXxmm9mJiDhb74UKl2HhKpDwKRBtvCjZjc= -github.com/nats-io/nats.go v1.30.0/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= +github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY= +github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/jetstream/resource_jetstream_stream.go b/jetstream/resource_jetstream_stream.go index 8fa0779..9907cbd 100644 --- a/jetstream/resource_jetstream_stream.go +++ b/jetstream/resource_jetstream_stream.go @@ -12,6 +12,19 @@ import ( ) func resourceStream() *schema.Resource { + subjectTransform := map[string]*schema.Schema{ + "source": { + Type: schema.TypeString, + Description: "The subject transform source", + Required: true, + }, + "destination": { + Type: schema.TypeString, + Description: "The subject transform destination", + Required: true, + }, + } + sourceInfo := map[string]*schema.Schema{ "name": { Type: schema.TypeString, @@ -34,6 +47,14 @@ func resourceStream() *schema.Resource { Description: "Only copy messages matching a specific subject, not usable for mirrors", Optional: true, }, + "subject_transform": { + Type: schema.TypeList, + Description: "The subject filtering sources and associated destination transforms", + Optional: true, + ForceNew: false, + Required: false, + Elem: &schema.Resource{Schema: subjectTransform}, + }, "external": { Type: schema.TypeList, MaxItems: 1, @@ -225,6 +246,15 @@ func resourceStream() *schema.Resource { Type: schema.TypeString, }, }, + "subject_transform": { + Type: schema.TypeList, + Description: "Subject transform to apply to matching messages", + MaxItems: 1, + ForceNew: false, + Required: false, + Optional: true, + Elem: &schema.Resource{Schema: subjectTransform}, + }, "mirror": { Type: schema.TypeList, Description: "Specifies a remote stream to mirror into this one", @@ -382,6 +412,10 @@ func resourceStreamRead(d *schema.ResourceData, m any) error { d.Set("mirror.0.external.api", mirror.External.ApiPrefix) d.Set("mirror.0.external.deliver", mirror.External.DeliverPrefix) } + for c, v := range mirror.SubjectTransforms { + d.Set(fmt.Sprintf("mirror.0.subject_transforms.%d.src", c), v.Source) + d.Set(fmt.Sprintf("mirror.0.subject_transforms.%d.dest", c), v.Destination) + } } if str.IsSourced() { @@ -393,8 +427,12 @@ func resourceStreamRead(d *schema.ResourceData, m any) error { d.Set(fmt.Sprintf("source.%d.start_time", i), source.OptStartTime.Format(time.RFC3339)) } if source.External != nil { - d.Set(fmt.Sprintf("mirror.%d.external.api", i), source.External.ApiPrefix) - d.Set(fmt.Sprintf("mirror.%d.external.deliver", i), source.External.DeliverPrefix) + d.Set(fmt.Sprintf("source.%d.external.api", i), source.External.ApiPrefix) + d.Set(fmt.Sprintf("source.%d.external.deliver", i), source.External.DeliverPrefix) + } + for c, v := range source.SubjectTransforms { + d.Set(fmt.Sprintf("source.%d.subject_transforms.%d.src", i, c), v.Source) + d.Set(fmt.Sprintf("source.%d.subject_transforms.%d.dest", i, c), v.Destination) } } } diff --git a/jetstream/resource_jetstream_stream_test.go b/jetstream/resource_jetstream_stream_test.go index 5574c93..a7b8fc1 100644 --- a/jetstream/resource_jetstream_stream_test.go +++ b/jetstream/resource_jetstream_stream_test.go @@ -6,6 +6,7 @@ import ( "github.com/hashicorp/terraform-plugin-sdk/helper/resource" "github.com/nats-io/jsm.go" + "github.com/nats-io/jsm.go/api" "github.com/nats-io/nats.go" ) @@ -61,6 +62,74 @@ resource "jetstream_stream" "test" { } ` +const typeStreamConfigMirrorTransformed = ` +provider "jetstream" { + servers = "%s" +} + +resource "jetstream_stream" "other" { + name = "OTHER" + subjects = ["js.in.OTHER.>"] +} + +resource "jetstream_stream" "mirror_transform_test" { + name = "MIRROR_TRANSFORM_TEST" + description = "typeStreamConfigMirrorTransformed" + mirror { + name = "OTHER" + start_seq = 11 + + subject_transform { + source = "js.in.OTHER.1.>" + destination = "1.>" + } + + subject_transform { + source = "js.in.OTHER.2.>" + destination = "2.>" + } + } +} +` + +const typeStreamConfigSourcesTransformed = ` +provider "jetstream" { + servers = "%s" +} + +resource "jetstream_stream" "other1" { + name = "OTHER1" + subjects = ["js.in.OTHER1.>"] +} + +resource "jetstream_stream" "other2" { + name = "OTHER2" + subjects = ["js.in.OTHER2.>"] +} + +resource "jetstream_stream" "source_transform_test" { + name = "SOURCE_TRANSFORM_TEST" + description = "typeStreamConfigSourcesTransformed" + source { + name = "OTHER1" + + subject_transform { + source = "js.in.OTHER1.>" + destination = "1.>" + } + } + + source { + name = "OTHER2" + + subject_transform { + source = "js.in.OTHER2.>" + destination = "2.>" + } + } +} +` + const testStreamConfigSources = ` provider "jetstream" { servers = "%s" @@ -106,8 +175,12 @@ func TestResourceStream(t *testing.T) { } resource.Test(t, resource.TestCase{ - Providers: testJsProviders, - CheckDestroy: testStreamDoesNotExist(t, mgr, "TEST"), + Providers: testJsProviders, + CheckDestroy: resource.ComposeTestCheckFunc( + testStreamDoesNotExist(t, mgr, "TEST"), + testStreamDoesNotExist(t, mgr, "OTHER"), + testStreamDoesNotExist(t, mgr, "OTHER1"), + testStreamDoesNotExist(t, mgr, "OTHER2")), Steps: []resource.TestStep{ { Config: fmt.Sprintf(testStreamConfigBasic, nc.ConnectedUrl()), @@ -145,6 +218,37 @@ func TestResourceStream(t *testing.T) { testStreamHasSubjects(t, mgr, "TEST", []string{}), ), }, + { + Config: fmt.Sprintf(typeStreamConfigMirrorTransformed, nc.ConnectedUrl()), + Check: resource.ComposeTestCheckFunc( + testStreamExist(t, mgr, "MIRROR_TRANSFORM_TEST"), + resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.name", "OTHER"), + resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.start_seq", "11"), + resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.0.source", "js.in.OTHER.1.>"), + resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.0.destination", "1.>"), + resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.1.source", "js.in.OTHER.2.>"), + resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.1.destination", "2.>"), + testStreamIsMirrorOf(t, mgr, "MIRROR_TRANSFORM_TEST", "OTHER"), + testStreamIsMirrorTransformed(t, mgr, "MIRROR_TRANSFORM_TEST", api.SubjectTransformConfig{Source: "js.in.OTHER.1.>", Destination: "1.>"}), + testStreamHasSubjects(t, mgr, "MIRROR_TRANSFORM_TEST", []string{}), + ), + }, + { + Config: fmt.Sprintf(typeStreamConfigSourcesTransformed, nc.ConnectedUrl()), + Check: resource.ComposeTestCheckFunc( + testStreamExist(t, mgr, "SOURCE_TRANSFORM_TEST"), + resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.0.name", "OTHER1"), + resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.0.subject_transform.0.source", "js.in.OTHER1.>"), + resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.0.subject_transform.0.destination", "1.>"), + resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.1.name", "OTHER2"), + resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.1.subject_transform.0.source", "js.in.OTHER2.>"), + resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.1.subject_transform.0.destination", "2.>"), + testStreamIsSourceOf(t, mgr, "SOURCE_TRANSFORM_TEST", []string{"OTHER1", "OTHER2"}), + testStreamIsSourceTransformed(t, mgr, "SOURCE_TRANSFORM_TEST", "OTHER1", api.SubjectTransformConfig{Source: "js.in.OTHER1.>", Destination: "1.>"}), + testStreamIsSourceTransformed(t, mgr, "SOURCE_TRANSFORM_TEST", "OTHER2", api.SubjectTransformConfig{Source: "js.in.OTHER2.>", Destination: "2.>"}), + testStreamHasSubjects(t, mgr, "SOURCE_TRANSFORM_TEST", []string{}), + ), + }, { Config: fmt.Sprintf(testStreamConfigSources, nc.ConnectedUrl()), Check: resource.ComposeTestCheckFunc( diff --git a/jetstream/util.go b/jetstream/util.go index 0747b3b..0bff1e2 100644 --- a/jetstream/util.go +++ b/jetstream/util.go @@ -112,6 +112,17 @@ func streamSourceFromResourceData(d any) ([]*api.StreamSource, error) { } } + transforms := s["subject_transform"].([]any) + if len(transforms) > 0 { + for _, transform := range transforms { + st := transform.(map[string]any) + source.SubjectTransforms = append(source.SubjectTransforms, api.SubjectTransformConfig{ + Source: st["source"].(string), + Destination: st["destination"].(string), + }) + } + } + res = append(res, source) } diff --git a/jetstream/util_test.go b/jetstream/util_test.go index 58cfe63..38fa6f3 100644 --- a/jetstream/util_test.go +++ b/jetstream/util_test.go @@ -59,6 +59,75 @@ func testStreamHasSubjects(t *testing.T, mgr *jsm.Manager, stream string, subjec } } +func testStreamIsMirrorTransformed(t *testing.T, mgr *jsm.Manager, stream string, transforms ...api.SubjectTransformConfig) resource.TestCheckFunc { + return func(s *terraform.State) error { + str, err := mgr.LoadStream(stream) + if err != nil { + return err + } + + if !str.IsMirror() { + return fmt.Errorf("stream is not a mirror") + } + + mirror := str.Mirror() + + for i, trans := range transforms { + if len(mirror.SubjectTransforms) == 0 || len(mirror.SubjectTransforms) < i { + return fmt.Errorf("transform %v does not match %v", mirror.SubjectTransforms, trans) + } + if mirror.SubjectTransforms[i].Source != trans.Source { + return fmt.Errorf("transform %v does not match %v", mirror.SubjectTransforms[i], trans) + } + if mirror.SubjectTransforms[i].Destination != trans.Destination { + return fmt.Errorf("transform %v does not match %v", mirror.SubjectTransforms[i], trans) + } + } + + return nil + } +} + +func testStreamIsSourceTransformed(t *testing.T, mgr *jsm.Manager, stream string, sourceName string, transforms ...api.SubjectTransformConfig) resource.TestCheckFunc { + return func(s *terraform.State) error { + str, err := mgr.LoadStream(stream) + if err != nil { + return err + } + + if !str.IsSourced() { + return fmt.Errorf("stream is not sourced") + } + + var source *api.StreamSource + + for _, s := range str.Sources() { + if s.Name == sourceName { + source = s + break + } + } + + if source == nil { + return fmt.Errorf("source not found") + } + + for i, trans := range transforms { + if len(source.SubjectTransforms) == 0 || len(source.SubjectTransforms) < i { + return fmt.Errorf("transform %v does not match %v", source.SubjectTransforms, trans) + } + if source.SubjectTransforms[i].Source != trans.Source { + return fmt.Errorf("transform %v does not match %v", source.SubjectTransforms[i], trans) + } + if source.SubjectTransforms[i].Destination != trans.Destination { + return fmt.Errorf("transform %v does not match %v", source.SubjectTransforms[i], trans) + } + } + + return nil + } +} + func testStreamIsSourceOf(t *testing.T, mgr *jsm.Manager, stream string, sources []string) resource.TestCheckFunc { return func(s *terraform.State) error { str, err := mgr.LoadStream(stream)