Skip to content

Commit

Permalink
Merge pull request #123 from ploubser/issue_122
Browse files Browse the repository at this point in the history
(#122) Enable pedantic mode
  • Loading branch information
ripienaar authored Sep 4, 2024
2 parents b03bda9 + c0a008f commit 03348d2
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 77 deletions.
1 change: 1 addition & 0 deletions docs/resources/jetstream_consumer.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ resource "jetstream_consumer" "ORDERS_NEW" {
* `max_bytes` - (optional)The maximum bytes value that maybe set when dong a pull on a Pull Consumer
* `max_expires` - (optional) Limits the Pull Expires duration to this maximum in seconds
* `inactive_threshold` - (optional) Removes the consumer after a idle period, specified as a duration in seconds
* `max_ack_pending` - (optional) Maximum pending Acks before consumers are paused
* `replicas` - (optional) How many replicas of the data to keep in a clustered environment
* `memory` - (optional) Force the consumer state to be kept in memory rather than inherit the setting from the stream
* `backoff` - (optional) List of durations in Go format that represents a retry time scale for NaK'd messages. A list of durations in seconds
11 changes: 8 additions & 3 deletions docs/resources/jetstream_stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ resource "jetstream_stream" "ORDERS" {

```hcl
resource "jetstream_stream" "ORDERS_ARCHIVE" {
name = "ORDERS_ARCHIVE"
storage = "file"
max_age = 5 * 60 * 60 * 24 * 365
name = "ORDERS_ARCHIVE"
storage = "file"
max_age = 5 * 60 * 60 * 24 * 365
mirror_direct = true
mirror {
name = "ORDERS"
Expand All @@ -34,6 +35,8 @@ Above the `ORDERS_ARCHIVE` stream is a mirror of `ORDERS`, valid options for spe
* `start_seq` - (optional) Starts the mirror or source at this sequence in the source
* `start_time` - (optional) Starts the mirror or source at this time in the source, in RFC3339 format
* `external` - (optional) Reference to an external stream with keys `api` and `deliver`
* `mirror_direct` - (optional) If true the mirror will participate in a serving direct get requests for individual messages from the origin stream


## Attribute Reference

Expand Down Expand Up @@ -66,3 +69,5 @@ Above the `ORDERS_ARCHIVE` stream is a mirror of `ORDERS`, valid options for spe
* `republish_source` - (optional) Republish matching messages to `republish_destination`
* `republish_destination` - (optional) The destination to publish messages to
* `republish_headers_only` - (optional) Republish only message headers, no bodies
* `inactive_threshold` - (optional) Removes the consumer after a idle period, specified as a duration in seconds
* `max_ack_pending` - (optional) Maximum pending Acks before consumers are paused
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
module github.com/nats-io/terraform-provider-jetstream

go 1.22
go 1.22.0

toolchain go1.22.5

require (
github.com/google/go-cmp v0.6.0
github.com/hashicorp/go-cty v1.4.1-0.20200414143053-d3edf31b6320
github.com/nats-io/jsm.go v0.1.2
github.com/nats-io/jsm.go v0.1.1-0.20240828111101-4d2a0a055996
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/nats-server/v2 v2.10.18
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7
github.com/nats-io/nats.go v1.37.0
github.com/xeipuuv/gojsonschema v1.2.0
)
Expand All @@ -19,6 +19,7 @@ require (
github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect
github.com/cloudflare/circl v1.3.9 // indirect
github.com/expr-lang/expr v1.16.9 // indirect
github.com/google/go-tpm v0.9.1 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
github.com/hashicorp/hc-install v0.8.0 // indirect
github.com/hashicorp/terraform-plugin-go v0.23.0 // indirect
Expand Down
16 changes: 9 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.1 h1:0pGc4X//bAlmZzMKf8iz6IsDo1nYTbYJ6FZN/rg4zdM=
github.com/google/go-tpm v0.9.1/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -102,8 +104,9 @@ github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ib
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
Expand All @@ -125,12 +128,12 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/nats-io/jsm.go v0.1.2 h1:T4Fq88a03sPAPWYwrOLQ85oanYsC2Bs6517rUiWBMpQ=
github.com/nats-io/jsm.go v0.1.2/go.mod h1:tnubE70CAKi5TNfQiq6XHFqWTuSIe1H7X4sDwfq6ZK8=
github.com/nats-io/jsm.go v0.1.1-0.20240828111101-4d2a0a055996 h1:TIA0KkwEqEeSXLf6kb0m8lm+S75wqWttTKjxt6+ySHA=
github.com/nats-io/jsm.go v0.1.1-0.20240828111101-4d2a0a055996/go.mod h1:GqhrbjF7id/JW5arS8TZUbHwWTsxL98ubUAloC2f6QY=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.18 h1:tRdZmBuWKVAFYtayqlBB2BuCHNGAQPvoQIXOKwU3WSM=
github.com/nats-io/nats-server/v2 v2.10.18/go.mod h1:97Qyg7YydD8blKlR8yBsUlPlWyZKjA7Bp5cl3MUE9K8=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7 h1:GnAHBiWk/CTiaKuf530XMNZdUevPqgvDaaEx4Dn6vEA=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7/go.mod h1:vCoSIPH7pQwl4DhZvT3XiLVjq50ycYEDy/ZbvTzNGrY=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
Expand Down Expand Up @@ -236,9 +239,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
3 changes: 3 additions & 0 deletions jetstream/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func createJSServer(t *testing.T) (srv *server.Server) {
Port: -1,
StoreDir: dir,
JetStream: true,
JetStreamLimits: server.JSLimitOpts{
MaxRequestBatch: 1,
},
})
checkErr(t, err, "could not start js server: %v", err)

Expand Down
60 changes: 3 additions & 57 deletions jetstream/resource_jetstream_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,70 +411,21 @@ func resourceConsumerUpdate(d *schema.ResourceData, m any) error {
return nil
}

cons, err := mgr.LoadConsumer(stream, durable)
if err != nil {
return err
}

cfg, err := consumerConfigFromResourceData(d)
if err != nil {
return err
}

freq := 0
if len(cons.SampleFrequency()) > 0 {
s := strings.TrimSuffix(cons.SampleFrequency(), "%")
freq, err = strconv.Atoi(s)
if err != nil {
return fmt.Errorf("failed to parse consumer sampling configuration: %v", err)
}
}

opts := []jsm.ConsumerOption{
jsm.ConsumerDescription(cfg.Description),
jsm.ConsumerMetadata(cfg.Metadata),
jsm.FilterStreamBySubject(cfg.FilterSubject),
jsm.FilterStreamBySubject(cfg.FilterSubjects...),
jsm.AckWait(cfg.AckWait),
jsm.MaxDeliveryAttempts(cfg.MaxDeliver),
jsm.SamplePercent(freq),
jsm.MaxAckPending(uint(cfg.MaxAckPending)),
jsm.MaxWaiting(uint(cfg.MaxWaiting)),
jsm.MaxRequestExpires(cfg.MaxRequestExpires),
jsm.MaxRequestBatch(uint(cfg.MaxRequestBatch)),
}

if cfg.HeadersOnly {
opts = append(opts, jsm.DeliverHeadersOnly())
}

err = cons.UpdateConfiguration(opts...)
// We call NewconsumerFromDefault because of the idempotent way consumers are created/updated
// If the consumer already exists, it will be updated, and if not we'll exit before we get here
_, err = mgr.NewConsumerFromDefault(stream, cfg)
if err != nil {
return err
}

return resourceConsumerRead(d, m)
}

func checkConsumerOnLimitsStream(mgr *jsm.Manager, streamName string, cfg *api.ConsumerConfig) error {
stream, err := mgr.LoadStream(streamName)
if err != nil {
return err
}

if stream.ConsumerLimits().InactiveThreshold > 0 || stream.ConsumerLimits().MaxAckPending > 0 {
if cfg.InactiveThreshold == 0 {
return fmt.Errorf("inactive_threshold is required on streams with consumer limits set")
}

if cfg.MaxAckPending == 0 {
return fmt.Errorf("max_ack_pending is required on streams with consumer limits set")
}
}

return nil
}

func resourceConsumerCreate(d *schema.ResourceData, m any) error {
cfg, err := consumerConfigFromResourceData(d)
if err != nil {
Expand All @@ -492,11 +443,6 @@ func resourceConsumerCreate(d *schema.ResourceData, m any) error {
}
defer nc.Close()

err = checkConsumerOnLimitsStream(mgr, stream, &cfg)
if err != nil {
return err
}

_, err = mgr.NewConsumerFromDefault(stream, cfg)
if err != nil {
return err
Expand Down
89 changes: 83 additions & 6 deletions jetstream/resource_jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jetstream

import (
"fmt"
"regexp"
"strings"
"testing"

Expand Down Expand Up @@ -32,6 +33,7 @@ resource "jetstream_consumer" "TEST_C1" {
metadata = {
foo = "bar"
}
max_batch = 1
}
`

Expand All @@ -51,7 +53,8 @@ resource "jetstream_consumer" "TEST_C2" {
stream_sequence = 10
max_ack_pending = 20
filter_subjects = ["TEST.a", "TEST.b"]
max_waiting = 10
max_waiting = 10
max_batch = 1
}
`

Expand All @@ -66,12 +69,74 @@ resource "jetstream_stream" "test" {
}
resource "jetstream_consumer" "TEST_C3" {
stream_id = jetstream_stream.test.id
durable_name = "C3"
stream_sequence = 10
max_ack_pending = 20
filter_subject = "TEST.a"
stream_id = jetstream_stream.test.id
durable_name = "C3"
stream_sequence = 10
max_ack_pending = 20
filter_subject = "TEST.a"
delivery_subject = "ORDERS.a"
}
`

const testBackoffPedantic = `
provider "jetstream" {
servers = "%s"
}
resource "jetstream_stream" "test" {
name = "TEST"
subjects = ["TEST.*"]
}
resource "jetstream_consumer" "TEST_C4" {
stream_id = jetstream_stream.test.id
durable_name = "C4"
stream_sequence = 10
filter_subject = "TEST.a"
delivery_subject = "ORDERS.a"
ack_wait = 10
backoff = [1,10,20,60]
}
`

const testConsumerLimitsPedantic = `
provider "jetstream" {
servers = "%s"
}
resource "jetstream_stream" "test" {
name = "TEST"
subjects = ["TEST.*"]
max_ack_pending = 1
inactive_threshold = 1
}
resource "jetstream_consumer" "TEST_C5" {
stream_id = jetstream_stream.test.id
durable_name = "C5"
stream_sequence = 10
filter_subject = "TEST.a"
delivery_subject = "ORDERS.a"
}
`

const testConsumerMaxRequestBatchPedantic = `
provider "jetstream" {
servers = "%s"
}
resource "jetstream_stream" "test" {
name = "TEST"
subjects = ["TEST.*"]
}
resource "jetstream_consumer" "TEST_C6" {
stream_id = jetstream_stream.test.id
durable_name = "C5"
deliver_all = true
filter_subject = "TEST.received"
max_batch = 100
}
`

Expand Down Expand Up @@ -140,6 +205,18 @@ func TestResourceConsumer(t *testing.T) {
resource.TestCheckResourceAttr("jetstream_consumer.TEST_C3", "filter_subject", "TEST.a"),
),
},
{
Config: fmt.Sprintf(testBackoffPedantic, nc.ConnectedUrl()),
ExpectError: regexp.MustCompile(`first backoff value has to equal batch AckWait \(10157\)`),
},
{
Config: fmt.Sprintf(testConsumerLimitsPedantic, nc.ConnectedUrl()),
ExpectError: regexp.MustCompile(`inactive_threshold must be set if it's configured in stream limits \(10157\)`),
},
{
Config: fmt.Sprintf(testConsumerMaxRequestBatchPedantic, nc.ConnectedUrl()),
ExpectError: regexp.MustCompile(`consumer max request batch exceeds server limit of 1 \(10125\)`),
},
},
})
}
20 changes: 20 additions & 0 deletions jetstream/resource_jetstream_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ func resourceStream() *schema.Resource {
Default: true,
Optional: true,
},
"mirror_direct": {
Type: schema.TypeBool,
Description: "If true, and the stream is a mirror, the mirror will participate in a serving direct get requests for individual messages from origin stream",
Optional: true,
},
"placement_cluster": {
Type: schema.TypeString,
Description: "Place the stream in a specific cluster, influenced by placement_tags",
Expand Down Expand Up @@ -291,6 +296,18 @@ func resourceStream() *schema.Resource {
ForceNew: false,
Optional: true,
},
"max_ack_pending": {
Type: schema.TypeInt,
Description: "Defines the maximum number of messages, without acknowledgment, that can be outstanding",
ForceNew: false,
Optional: true,
},
"inactive_threshold": {
Type: schema.TypeInt,
Description: "Duration that instructs the server to clean up consumers inactive for that long",
ForceNew: false,
Optional: true,
},
},
}
}
Expand Down Expand Up @@ -368,6 +385,8 @@ func resourceStreamRead(d *schema.ResourceData, m any) error {
d.Set("allow_direct", str.DirectAllowed())
d.Set("discard_new_per_subject", str.DiscardNewPerSubject())
d.Set("compression", compression)
d.Set("max_ack_pending", str.ConsumerLimits().MaxAckPending)
d.Set("inactive_threshold", str.ConsumerLimits().InactiveThreshold)

if transform := str.Configuration().SubjectTransform; transform != nil {
d.Set("subject_transform", []map[string]string{
Expand Down Expand Up @@ -412,6 +431,7 @@ func resourceStreamRead(d *schema.ResourceData, m any) error {
mirrors := []map[string]any{
streamSourceConfigRead(mirror),
}
d.Set("mirror_direct", str.MirrorDirectAllowed())
d.Set("mirror", mirrors)
}

Expand Down
Loading

0 comments on commit 03348d2

Please sign in to comment.