Skip to content

Commit

Permalink
Support subject transforms on streams
Browse files Browse the repository at this point in the history
Signed-off-by: R.I.Pienaar <[email protected]>
  • Loading branch information
ripienaar committed Sep 28, 2023
1 parent ebe784e commit aef9172
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 10 deletions.
126 changes: 126 additions & 0 deletions ABTaskFile
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Install https://choria-io.github.io/appbuilder/ and run `abt` to use this file

name: build_tasks
description: NATS Developer Commands

commands:
- name: dependencies
type: parent
description: Manage dependencies
aliases: [d]
commands:
- name: update
description: Update dependencies
type: exec
aliases: [up]
flags:
- name: verbose
description: Log verbosely
bool: true
script: |
. "{{ BashHelperPath }}"

ab_announce Updating all dependencies

go get -u -n -a -t {{- if .Flags.verbose }} -d -x {{ end }} ./...

go mod tidy

- name: test
type: parent
aliases: [t]
description: Perform various tests
commands:
- name: unit
type: exec
description: Run unit tests
aliases: [u]
dir: "{{ AppDir }}"
environment:
- "TF_ACC=1"
script: go test -v --failfast -p=1 ./...

- name: lint
type: exec
dir: "{{ AppDir }}"
flags:
- name: vet
description: Perform go vet
bool: true
default: true
- name: staticcheck
description: Perform staticcheck
bool: true
default: true
- name: update
description: Updates lint dependencies
bool: true
script: |
set -e

. "{{ BashHelperPath }}"

{{ if .Flags.update }}
ab_say Updating linting tools
go install github.com/client9/misspell/cmd/misspell@latest
go install honnef.co/go/tools/cmd/staticcheck@latest
{{ else }}
echo ">>> Run with --update to install required commands"
echo
{{ end }}

ab_say Formatting source files
go fmt ./...

ab_say Tidying go mod
go mod tidy

ab_say Checking spelling
find . -type f -name "*.go" | xargs misspell -error -locale US

{{ if .Flags.vet }}
ab_say Performing go vet
go vet ./...
{{ end }}

{{ if .Flags.staticcheck }}
ab_say Running staticcheck
staticcheck ./...
{{ end }}

- name: build
type: parent
aliases: [b]
description: Code build steps
commands:
- name: binary
aliases: [bin]
description: Build a basic test binary
type: exec
dir: "{{ TaskDir }}/nats"
banner: |
>>>
>>> Building 'nats' locally {{ if .Flags.target }}for target '{{ .Flags.target }}'{{ end }}
>>>
flags:
- name: target
description: Target platform to build for
enum: ["linux/amd64"]
short: T
script: |
{{ if eq .Flags.target "linux/amd64" }}
export GOOS=linux
export GOARCH=amd64
{{ end }}

go build -o nats

ls -l nats

- name: snapshot
description: Goreleaser snapshot
aliases: [snap]
type: exec
dir: "{{ TaskDir }}"
script: |
goreleaser release --snapshot --clean
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ go 1.20
require (
github.com/google/go-cmp v0.5.9
github.com/hashicorp/terraform-plugin-sdk v1.17.2
github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8
github.com/nats-io/jsm.go v0.1.1-0.20230926103807-a54fd61d399d
github.com/nats-io/jwt/v2 v2.5.2
github.com/nats-io/nats-server/v2 v2.10.1
github.com/nats-io/nats.go v1.30.0
github.com/nats-io/nats.go v1.30.2
github.com/xeipuuv/gojsonschema v1.2.0
)

Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -494,14 +494,14 @@ github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mitchellh/reflectwalk v1.0.1 h1:FVzMWA5RllMAKIdUSC8mdWo3XtwoecrH79BY70sEEpE=
github.com/mitchellh/reflectwalk v1.0.1/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8 h1:OKm9e1//rlcl4i9zXQ6QQxj7DJaeL+Oe8WBgAKO4cqI=
github.com/nats-io/jsm.go v0.1.1-0.20230921074448-1bbb5650afc8/go.mod h1:hB4Qd+IKoRvAAWTOI1HkCy4wotjFwOIT+codHCFOZqk=
github.com/nats-io/jsm.go v0.1.1-0.20230926103807-a54fd61d399d h1:W1vFAseJ8J2315SXVyIMCePKm1P7ucA2CDBubxSNHxg=
github.com/nats-io/jsm.go v0.1.1-0.20230926103807-a54fd61d399d/go.mod h1:hB4Qd+IKoRvAAWTOI1HkCy4wotjFwOIT+codHCFOZqk=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ=
github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nats.go v1.30.0 h1:bj/rVsRCrFXxmm9mJiDhb74UKl2HhKpDwKRBtvCjZjc=
github.com/nats-io/nats.go v1.30.0/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
42 changes: 40 additions & 2 deletions jetstream/resource_jetstream_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ import (
)

func resourceStream() *schema.Resource {
subjectTransform := map[string]*schema.Schema{
"source": {
Type: schema.TypeString,
Description: "The subject transform source",
Required: true,
},
"destination": {
Type: schema.TypeString,
Description: "The subject transform destination",
Required: true,
},
}

sourceInfo := map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Expand All @@ -34,6 +47,14 @@ func resourceStream() *schema.Resource {
Description: "Only copy messages matching a specific subject, not usable for mirrors",
Optional: true,
},
"subject_transform": {
Type: schema.TypeList,
Description: "The subject filtering sources and associated destination transforms",
Optional: true,
ForceNew: false,
Required: false,
Elem: &schema.Resource{Schema: subjectTransform},
},
"external": {
Type: schema.TypeList,
MaxItems: 1,
Expand Down Expand Up @@ -225,6 +246,15 @@ func resourceStream() *schema.Resource {
Type: schema.TypeString,
},
},
"subject_transform": {
Type: schema.TypeList,
Description: "Subject transform to apply to matching messages",
MaxItems: 1,
ForceNew: false,
Required: false,
Optional: true,
Elem: &schema.Resource{Schema: subjectTransform},
},
"mirror": {
Type: schema.TypeList,
Description: "Specifies a remote stream to mirror into this one",
Expand Down Expand Up @@ -382,6 +412,10 @@ func resourceStreamRead(d *schema.ResourceData, m any) error {
d.Set("mirror.0.external.api", mirror.External.ApiPrefix)
d.Set("mirror.0.external.deliver", mirror.External.DeliverPrefix)
}
for c, v := range mirror.SubjectTransforms {
d.Set(fmt.Sprintf("mirror.0.subject_transforms.%d.src", c), v.Source)
d.Set(fmt.Sprintf("mirror.0.subject_transforms.%d.dest", c), v.Destination)
}
}

if str.IsSourced() {
Expand All @@ -393,8 +427,12 @@ func resourceStreamRead(d *schema.ResourceData, m any) error {
d.Set(fmt.Sprintf("source.%d.start_time", i), source.OptStartTime.Format(time.RFC3339))
}
if source.External != nil {
d.Set(fmt.Sprintf("mirror.%d.external.api", i), source.External.ApiPrefix)
d.Set(fmt.Sprintf("mirror.%d.external.deliver", i), source.External.DeliverPrefix)
d.Set(fmt.Sprintf("source.%d.external.api", i), source.External.ApiPrefix)
d.Set(fmt.Sprintf("source.%d.external.deliver", i), source.External.DeliverPrefix)
}
for c, v := range source.SubjectTransforms {
d.Set(fmt.Sprintf("source.%d.subject_transforms.%d.src", i, c), v.Source)
d.Set(fmt.Sprintf("source.%d.subject_transforms.%d.dest", i, c), v.Destination)
}
}
}
Expand Down
108 changes: 106 additions & 2 deletions jetstream/resource_jetstream_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
"github.com/nats-io/jsm.go"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/nats.go"
)

Expand Down Expand Up @@ -61,6 +62,74 @@ resource "jetstream_stream" "test" {
}
`

const typeStreamConfigMirrorTransformed = `
provider "jetstream" {
servers = "%s"
}
resource "jetstream_stream" "other" {
name = "OTHER"
subjects = ["js.in.OTHER.>"]
}
resource "jetstream_stream" "mirror_transform_test" {
name = "MIRROR_TRANSFORM_TEST"
description = "typeStreamConfigMirrorTransformed"
mirror {
name = "OTHER"
start_seq = 11
subject_transform {
source = "js.in.OTHER.1.>"
destination = "1.>"
}
subject_transform {
source = "js.in.OTHER.2.>"
destination = "2.>"
}
}
}
`

const typeStreamConfigSourcesTransformed = `
provider "jetstream" {
servers = "%s"
}
resource "jetstream_stream" "other1" {
name = "OTHER1"
subjects = ["js.in.OTHER1.>"]
}
resource "jetstream_stream" "other2" {
name = "OTHER2"
subjects = ["js.in.OTHER2.>"]
}
resource "jetstream_stream" "source_transform_test" {
name = "SOURCE_TRANSFORM_TEST"
description = "typeStreamConfigSourcesTransformed"
source {
name = "OTHER1"
subject_transform {
source = "js.in.OTHER1.>"
destination = "1.>"
}
}
source {
name = "OTHER2"
subject_transform {
source = "js.in.OTHER2.>"
destination = "2.>"
}
}
}
`

const testStreamConfigSources = `
provider "jetstream" {
servers = "%s"
Expand Down Expand Up @@ -106,8 +175,12 @@ func TestResourceStream(t *testing.T) {
}

resource.Test(t, resource.TestCase{
Providers: testJsProviders,
CheckDestroy: testStreamDoesNotExist(t, mgr, "TEST"),
Providers: testJsProviders,
CheckDestroy: resource.ComposeTestCheckFunc(
testStreamDoesNotExist(t, mgr, "TEST"),
testStreamDoesNotExist(t, mgr, "OTHER"),
testStreamDoesNotExist(t, mgr, "OTHER1"),
testStreamDoesNotExist(t, mgr, "OTHER2")),
Steps: []resource.TestStep{
{
Config: fmt.Sprintf(testStreamConfigBasic, nc.ConnectedUrl()),
Expand Down Expand Up @@ -145,6 +218,37 @@ func TestResourceStream(t *testing.T) {
testStreamHasSubjects(t, mgr, "TEST", []string{}),
),
},
{
Config: fmt.Sprintf(typeStreamConfigMirrorTransformed, nc.ConnectedUrl()),
Check: resource.ComposeTestCheckFunc(
testStreamExist(t, mgr, "MIRROR_TRANSFORM_TEST"),
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.name", "OTHER"),
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.start_seq", "11"),
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.0.source", "js.in.OTHER.1.>"),
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.0.destination", "1.>"),
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.1.source", "js.in.OTHER.2.>"),
resource.TestCheckResourceAttr("jetstream_stream.mirror_transform_test", "mirror.0.subject_transform.1.destination", "2.>"),
testStreamIsMirrorOf(t, mgr, "MIRROR_TRANSFORM_TEST", "OTHER"),
testStreamIsMirrorTransformed(t, mgr, "MIRROR_TRANSFORM_TEST", api.SubjectTransformConfig{Source: "js.in.OTHER.1.>", Destination: "1.>"}),
testStreamHasSubjects(t, mgr, "MIRROR_TRANSFORM_TEST", []string{}),
),
},
{
Config: fmt.Sprintf(typeStreamConfigSourcesTransformed, nc.ConnectedUrl()),
Check: resource.ComposeTestCheckFunc(
testStreamExist(t, mgr, "SOURCE_TRANSFORM_TEST"),
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.0.name", "OTHER1"),
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.0.subject_transform.0.source", "js.in.OTHER1.>"),
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.0.subject_transform.0.destination", "1.>"),
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.1.name", "OTHER2"),
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.1.subject_transform.0.source", "js.in.OTHER2.>"),
resource.TestCheckResourceAttr("jetstream_stream.source_transform_test", "source.1.subject_transform.0.destination", "2.>"),
testStreamIsSourceOf(t, mgr, "SOURCE_TRANSFORM_TEST", []string{"OTHER1", "OTHER2"}),
testStreamIsSourceTransformed(t, mgr, "SOURCE_TRANSFORM_TEST", "OTHER1", api.SubjectTransformConfig{Source: "js.in.OTHER1.>", Destination: "1.>"}),
testStreamIsSourceTransformed(t, mgr, "SOURCE_TRANSFORM_TEST", "OTHER2", api.SubjectTransformConfig{Source: "js.in.OTHER2.>", Destination: "2.>"}),
testStreamHasSubjects(t, mgr, "SOURCE_TRANSFORM_TEST", []string{}),
),
},
{
Config: fmt.Sprintf(testStreamConfigSources, nc.ConnectedUrl()),
Check: resource.ComposeTestCheckFunc(
Expand Down
11 changes: 11 additions & 0 deletions jetstream/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ func streamSourceFromResourceData(d any) ([]*api.StreamSource, error) {
}
}

transforms := s["subject_transform"].([]any)
if len(transforms) > 0 {
for _, transform := range transforms {
st := transform.(map[string]any)
source.SubjectTransforms = append(source.SubjectTransforms, api.SubjectTransformConfig{
Source: st["source"].(string),
Destination: st["destination"].(string),
})
}
}

res = append(res, source)
}

Expand Down
Loading

0 comments on commit aef9172

Please sign in to comment.