Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#122) Enable pedantic mode #123

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/resources/jetstream_consumer.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ resource "jetstream_consumer" "ORDERS_NEW" {
* `max_bytes` - (optional)The maximum bytes value that maybe set when dong a pull on a Pull Consumer
* `max_expires` - (optional) Limits the Pull Expires duration to this maximum in seconds
* `inactive_threshold` - (optional) Removes the consumer after a idle period, specified as a duration in seconds
* `max_ack_pending` - (optional) Maximum pending Acks before consumers are paused
* `replicas` - (optional) How many replicas of the data to keep in a clustered environment
* `memory` - (optional) Force the consumer state to be kept in memory rather than inherit the setting from the stream
* `backoff` - (optional) List of durations in Go format that represents a retry time scale for NaK'd messages. A list of durations in seconds
11 changes: 8 additions & 3 deletions docs/resources/jetstream_stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ resource "jetstream_stream" "ORDERS" {

```hcl
resource "jetstream_stream" "ORDERS_ARCHIVE" {
name = "ORDERS_ARCHIVE"
storage = "file"
max_age = 5 * 60 * 60 * 24 * 365
name = "ORDERS_ARCHIVE"
storage = "file"
max_age = 5 * 60 * 60 * 24 * 365
mirror_direct = true

mirror {
name = "ORDERS"
Expand All @@ -34,6 +35,8 @@ Above the `ORDERS_ARCHIVE` stream is a mirror of `ORDERS`, valid options for spe
* `start_seq` - (optional) Starts the mirror or source at this sequence in the source
* `start_time` - (optional) Starts the mirror or source at this time in the source, in RFC3339 format
* `external` - (optional) Reference to an external stream with keys `api` and `deliver`
* `mirror_direct` - (optional) If true the mirror will participate in a serving direct get requests for individual messages from the origin stream


## Attribute Reference

Expand Down Expand Up @@ -66,3 +69,5 @@ Above the `ORDERS_ARCHIVE` stream is a mirror of `ORDERS`, valid options for spe
* `republish_source` - (optional) Republish matching messages to `republish_destination`
* `republish_destination` - (optional) The destination to publish messages to
* `republish_headers_only` - (optional) Republish only message headers, no bodies
* `inactive_threshold` - (optional) Removes the consumer after a idle period, specified as a duration in seconds
* `max_ack_pending` - (optional) Maximum pending Acks before consumers are paused
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
module github.com/nats-io/terraform-provider-jetstream

go 1.22
go 1.22.0

toolchain go1.22.5

require (
github.com/google/go-cmp v0.6.0
github.com/hashicorp/go-cty v1.4.1-0.20200414143053-d3edf31b6320
github.com/nats-io/jsm.go v0.1.2
github.com/nats-io/jsm.go v0.1.1-0.20240828111101-4d2a0a055996
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/nats-server/v2 v2.10.18
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7
github.com/nats-io/nats.go v1.37.0
github.com/xeipuuv/gojsonschema v1.2.0
)
Expand All @@ -19,6 +19,7 @@ require (
github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
github.com/cloudflare/circl v1.3.9 // indirect
github.com/expr-lang/expr v1.16.9 // indirect
github.com/google/go-tpm v0.9.1 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
github.com/hashicorp/hc-install v0.8.0 // indirect
github.com/hashicorp/terraform-plugin-go v0.23.0 // indirect
Expand Down
16 changes: 9 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.1 h1:0pGc4X//bAlmZzMKf8iz6IsDo1nYTbYJ6FZN/rg4zdM=
github.com/google/go-tpm v0.9.1/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -102,8 +104,9 @@ github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ib
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
Expand All @@ -125,12 +128,12 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/nats-io/jsm.go v0.1.2 h1:T4Fq88a03sPAPWYwrOLQ85oanYsC2Bs6517rUiWBMpQ=
github.com/nats-io/jsm.go v0.1.2/go.mod h1:tnubE70CAKi5TNfQiq6XHFqWTuSIe1H7X4sDwfq6ZK8=
github.com/nats-io/jsm.go v0.1.1-0.20240828111101-4d2a0a055996 h1:TIA0KkwEqEeSXLf6kb0m8lm+S75wqWttTKjxt6+ySHA=
github.com/nats-io/jsm.go v0.1.1-0.20240828111101-4d2a0a055996/go.mod h1:GqhrbjF7id/JW5arS8TZUbHwWTsxL98ubUAloC2f6QY=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.18 h1:tRdZmBuWKVAFYtayqlBB2BuCHNGAQPvoQIXOKwU3WSM=
github.com/nats-io/nats-server/v2 v2.10.18/go.mod h1:97Qyg7YydD8blKlR8yBsUlPlWyZKjA7Bp5cl3MUE9K8=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7 h1:GnAHBiWk/CTiaKuf530XMNZdUevPqgvDaaEx4Dn6vEA=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7/go.mod h1:vCoSIPH7pQwl4DhZvT3XiLVjq50ycYEDy/ZbvTzNGrY=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand Down Expand Up @@ -236,9 +239,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
3 changes: 3 additions & 0 deletions jetstream/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func createJSServer(t *testing.T) (srv *server.Server) {
Port: -1,
StoreDir: dir,
JetStream: true,
JetStreamLimits: server.JSLimitOpts{
MaxRequestBatch: 1,
},
})
checkErr(t, err, "could not start js server: %v", err)

Expand Down
60 changes: 3 additions & 57 deletions jetstream/resource_jetstream_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,70 +411,21 @@ func resourceConsumerUpdate(d *schema.ResourceData, m any) error {
return nil
}

cons, err := mgr.LoadConsumer(stream, durable)
if err != nil {
return err
}

cfg, err := consumerConfigFromResourceData(d)
if err != nil {
return err
}

freq := 0
if len(cons.SampleFrequency()) > 0 {
s := strings.TrimSuffix(cons.SampleFrequency(), "%")
freq, err = strconv.Atoi(s)
if err != nil {
return fmt.Errorf("failed to parse consumer sampling configuration: %v", err)
}
}

opts := []jsm.ConsumerOption{
jsm.ConsumerDescription(cfg.Description),
jsm.ConsumerMetadata(cfg.Metadata),
jsm.FilterStreamBySubject(cfg.FilterSubject),
jsm.FilterStreamBySubject(cfg.FilterSubjects...),
jsm.AckWait(cfg.AckWait),
jsm.MaxDeliveryAttempts(cfg.MaxDeliver),
jsm.SamplePercent(freq),
jsm.MaxAckPending(uint(cfg.MaxAckPending)),
jsm.MaxWaiting(uint(cfg.MaxWaiting)),
jsm.MaxRequestExpires(cfg.MaxRequestExpires),
jsm.MaxRequestBatch(uint(cfg.MaxRequestBatch)),
}

if cfg.HeadersOnly {
opts = append(opts, jsm.DeliverHeadersOnly())
}

err = cons.UpdateConfiguration(opts...)
// We call NewconsumerFromDefault because of the idempotent way consumers are created/updated
// If the consumer already exists, it will be updated, and if not we'll exit before we get here
_, err = mgr.NewConsumerFromDefault(stream, cfg)
if err != nil {
return err
}

return resourceConsumerRead(d, m)
}

func checkConsumerOnLimitsStream(mgr *jsm.Manager, streamName string, cfg *api.ConsumerConfig) error {
stream, err := mgr.LoadStream(streamName)
if err != nil {
return err
}

if stream.ConsumerLimits().InactiveThreshold > 0 || stream.ConsumerLimits().MaxAckPending > 0 {
if cfg.InactiveThreshold == 0 {
return fmt.Errorf("inactive_threshold is required on streams with consumer limits set")
}

if cfg.MaxAckPending == 0 {
return fmt.Errorf("max_ack_pending is required on streams with consumer limits set")
}
}

return nil
}

func resourceConsumerCreate(d *schema.ResourceData, m any) error {
cfg, err := consumerConfigFromResourceData(d)
if err != nil {
Expand All @@ -492,11 +443,6 @@ func resourceConsumerCreate(d *schema.ResourceData, m any) error {
}
defer nc.Close()

err = checkConsumerOnLimitsStream(mgr, stream, &cfg)
if err != nil {
return err
}

_, err = mgr.NewConsumerFromDefault(stream, cfg)
if err != nil {
return err
Expand Down
89 changes: 83 additions & 6 deletions jetstream/resource_jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jetstream

import (
"fmt"
"regexp"
"strings"
"testing"

Expand Down Expand Up @@ -32,6 +33,7 @@ resource "jetstream_consumer" "TEST_C1" {
metadata = {
foo = "bar"
}
max_batch = 1
}
`

Expand All @@ -51,7 +53,8 @@ resource "jetstream_consumer" "TEST_C2" {
stream_sequence = 10
max_ack_pending = 20
filter_subjects = ["TEST.a", "TEST.b"]
max_waiting = 10
max_waiting = 10
max_batch = 1
}
`

Expand All @@ -66,12 +69,74 @@ resource "jetstream_stream" "test" {
}

resource "jetstream_consumer" "TEST_C3" {
stream_id = jetstream_stream.test.id
durable_name = "C3"
stream_sequence = 10
max_ack_pending = 20
filter_subject = "TEST.a"
stream_id = jetstream_stream.test.id
durable_name = "C3"
stream_sequence = 10
max_ack_pending = 20
filter_subject = "TEST.a"
delivery_subject = "ORDERS.a"
}
`

const testBackoffPedantic = `
provider "jetstream" {
servers = "%s"
}

resource "jetstream_stream" "test" {
name = "TEST"
subjects = ["TEST.*"]
}

resource "jetstream_consumer" "TEST_C4" {
stream_id = jetstream_stream.test.id
durable_name = "C4"
stream_sequence = 10
filter_subject = "TEST.a"
delivery_subject = "ORDERS.a"
ack_wait = 10
backoff = [1,10,20,60]
}
`

const testConsumerLimitsPedantic = `
provider "jetstream" {
servers = "%s"
}

resource "jetstream_stream" "test" {
name = "TEST"
subjects = ["TEST.*"]
max_ack_pending = 1
inactive_threshold = 1
}

resource "jetstream_consumer" "TEST_C5" {
stream_id = jetstream_stream.test.id
durable_name = "C5"
stream_sequence = 10
filter_subject = "TEST.a"
delivery_subject = "ORDERS.a"
}

`

const testConsumerMaxRequestBatchPedantic = `
provider "jetstream" {
servers = "%s"
}

resource "jetstream_stream" "test" {
name = "TEST"
subjects = ["TEST.*"]
}

resource "jetstream_consumer" "TEST_C6" {
stream_id = jetstream_stream.test.id
durable_name = "C5"
deliver_all = true
filter_subject = "TEST.received"
max_batch = 100
}
`

Expand Down Expand Up @@ -140,6 +205,18 @@ func TestResourceConsumer(t *testing.T) {
resource.TestCheckResourceAttr("jetstream_consumer.TEST_C3", "filter_subject", "TEST.a"),
),
},
{
Config: fmt.Sprintf(testBackoffPedantic, nc.ConnectedUrl()),
ExpectError: regexp.MustCompile(`first backoff value has to equal batch AckWait \(10157\)`),
},
{
Config: fmt.Sprintf(testConsumerLimitsPedantic, nc.ConnectedUrl()),
ExpectError: regexp.MustCompile(`inactive_threshold must be set if it's configured in stream limits \(10157\)`),
},
{
Config: fmt.Sprintf(testConsumerMaxRequestBatchPedantic, nc.ConnectedUrl()),
ExpectError: regexp.MustCompile(`consumer max request batch exceeds server limit of 1 \(10125\)`),
},
},
})
}
20 changes: 20 additions & 0 deletions jetstream/resource_jetstream_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ func resourceStream() *schema.Resource {
Default: true,
Optional: true,
},
"mirror_direct": {
Type: schema.TypeBool,
Description: "If true, and the stream is a mirror, the mirror will participate in a serving direct get requests for individual messages from origin stream",
Optional: true,
},
"placement_cluster": {
Type: schema.TypeString,
Description: "Place the stream in a specific cluster, influenced by placement_tags",
Expand Down Expand Up @@ -291,6 +296,18 @@ func resourceStream() *schema.Resource {
ForceNew: false,
Optional: true,
},
"max_ack_pending": {
Type: schema.TypeInt,
Description: "Defines the maximum number of messages, without acknowledgment, that can be outstanding",
ForceNew: false,
Optional: true,
},
"inactive_threshold": {
Type: schema.TypeInt,
Description: "Duration that instructs the server to clean up consumers inactive for that long",
ForceNew: false,
Optional: true,
},
},
}
}
Expand Down Expand Up @@ -368,6 +385,8 @@ func resourceStreamRead(d *schema.ResourceData, m any) error {
d.Set("allow_direct", str.DirectAllowed())
d.Set("discard_new_per_subject", str.DiscardNewPerSubject())
d.Set("compression", compression)
d.Set("max_ack_pending", str.ConsumerLimits().MaxAckPending)
d.Set("inactive_threshold", str.ConsumerLimits().InactiveThreshold)

if transform := str.Configuration().SubjectTransform; transform != nil {
d.Set("subject_transform", []map[string]string{
Expand Down Expand Up @@ -412,6 +431,7 @@ func resourceStreamRead(d *schema.ResourceData, m any) error {
mirrors := []map[string]any{
streamSourceConfigRead(mirror),
}
d.Set("mirror_direct", str.MirrorDirectAllowed())
d.Set("mirror", mirrors)
}

Expand Down
Loading
Loading