From c0a008f514687a6856db27f0417b03d91017b53e Mon Sep 17 00:00:00 2001 From: Pieter Loubser Date: Thu, 29 Aug 2024 13:10:18 +0100 Subject: [PATCH] (#122) Enable pedantic mode Here we enable the new ability in jsm.go to use nat's pedantic mode. As a side effect certain stream and consumer fields, as well as tests for the pedantic behaviour, have been added. Streams can now use the `mirror_direct` field which allow mirror streams to specify direct get behaviour. Consumer limits can now be defined in stream definitions with the `max_ack_pending` and `inactive_threshold` fields. --- docs/resources/jetstream_consumer.md | 1 + docs/resources/jetstream_stream.md | 11 ++- go.mod | 7 +- go.sum | 16 ++-- jetstream/provider_test.go | 3 + jetstream/resource_jetstream_consumer.go | 60 +------------ jetstream/resource_jetstream_consumer_test.go | 89 +++++++++++++++++-- jetstream/resource_jetstream_stream.go | 20 +++++ jetstream/resource_jetstream_stream_test.go | 45 ++++++++++ jetstream/util.go | 16 +++- 10 files changed, 191 insertions(+), 77 deletions(-) diff --git a/docs/resources/jetstream_consumer.md b/docs/resources/jetstream_consumer.md index 76668d9..16c25ea 100644 --- a/docs/resources/jetstream_consumer.md +++ b/docs/resources/jetstream_consumer.md @@ -49,6 +49,7 @@ resource "jetstream_consumer" "ORDERS_NEW" { * `max_bytes` - (optional)The maximum bytes value that maybe set when dong a pull on a Pull Consumer * `max_expires` - (optional) Limits the Pull Expires duration to this maximum in seconds * `inactive_threshold` - (optional) Removes the consumer after a idle period, specified as a duration in seconds + * `max_ack_pending` - (optional) Maximum pending Acks before consumers are paused * `replicas` - (optional) How many replicas of the data to keep in a clustered environment * `memory` - (optional) Force the consumer state to be kept in memory rather than inherit the setting from the stream * `backoff` - (optional) List of durations in Go format that represents a retry time scale for NaK'd messages. A list of durations in seconds diff --git a/docs/resources/jetstream_stream.md b/docs/resources/jetstream_stream.md index e21e55c..9b54dd2 100644 --- a/docs/resources/jetstream_stream.md +++ b/docs/resources/jetstream_stream.md @@ -15,9 +15,10 @@ resource "jetstream_stream" "ORDERS" { ```hcl resource "jetstream_stream" "ORDERS_ARCHIVE" { - name = "ORDERS_ARCHIVE" - storage = "file" - max_age = 5 * 60 * 60 * 24 * 365 + name = "ORDERS_ARCHIVE" + storage = "file" + max_age = 5 * 60 * 60 * 24 * 365 + mirror_direct = true mirror { name = "ORDERS" @@ -34,6 +35,8 @@ Above the `ORDERS_ARCHIVE` stream is a mirror of `ORDERS`, valid options for spe * `start_seq` - (optional) Starts the mirror or source at this sequence in the source * `start_time` - (optional) Starts the mirror or source at this time in the source, in RFC3339 format * `external` - (optional) Reference to an external stream with keys `api` and `deliver` + * `mirror_direct` - (optional) If true the mirror will participate in a serving direct get requests for individual messages from the origin stream + ## Attribute Reference @@ -66,3 +69,5 @@ Above the `ORDERS_ARCHIVE` stream is a mirror of `ORDERS`, valid options for spe * `republish_source` - (optional) Republish matching messages to `republish_destination` * `republish_destination` - (optional) The destination to publish messages to * `republish_headers_only` - (optional) Republish only message headers, no bodies + * `inactive_threshold` - (optional) Removes the consumer after a idle period, specified as a duration in seconds + * `max_ack_pending` - (optional) Maximum pending Acks before consumers are paused \ No newline at end of file diff --git a/go.mod b/go.mod index 68f9112..0c651cf 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,15 @@ module github.com/nats-io/terraform-provider-jetstream -go 1.22 +go 1.22.0 toolchain go1.22.5 require ( github.com/google/go-cmp v0.6.0 github.com/hashicorp/go-cty v1.4.1-0.20200414143053-d3edf31b6320 - github.com/nats-io/jsm.go v0.1.2 + github.com/nats-io/jsm.go v0.1.1-0.20240828111101-4d2a0a055996 github.com/nats-io/jwt/v2 v2.5.8 - github.com/nats-io/nats-server/v2 v2.10.18 + github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7 github.com/nats-io/nats.go v1.37.0 github.com/xeipuuv/gojsonschema v1.2.0 ) @@ -19,6 +19,7 @@ require ( github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect github.com/cloudflare/circl v1.3.9 // indirect github.com/expr-lang/expr v1.16.9 // indirect + github.com/google/go-tpm v0.9.1 // indirect github.com/hashicorp/go-retryablehttp v0.7.7 // indirect github.com/hashicorp/hc-install v0.8.0 // indirect github.com/hashicorp/terraform-plugin-go v0.23.0 // indirect diff --git a/go.sum b/go.sum index acdd614..f14fc6c 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-tpm v0.9.1 h1:0pGc4X//bAlmZzMKf8iz6IsDo1nYTbYJ6FZN/rg4zdM= +github.com/google/go-tpm v0.9.1/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -102,8 +104,9 @@ github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ib github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -125,12 +128,12 @@ github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyua github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= -github.com/nats-io/jsm.go v0.1.2 h1:T4Fq88a03sPAPWYwrOLQ85oanYsC2Bs6517rUiWBMpQ= -github.com/nats-io/jsm.go v0.1.2/go.mod h1:tnubE70CAKi5TNfQiq6XHFqWTuSIe1H7X4sDwfq6ZK8= +github.com/nats-io/jsm.go v0.1.1-0.20240828111101-4d2a0a055996 h1:TIA0KkwEqEeSXLf6kb0m8lm+S75wqWttTKjxt6+ySHA= +github.com/nats-io/jsm.go v0.1.1-0.20240828111101-4d2a0a055996/go.mod h1:GqhrbjF7id/JW5arS8TZUbHwWTsxL98ubUAloC2f6QY= github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE= github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.18 h1:tRdZmBuWKVAFYtayqlBB2BuCHNGAQPvoQIXOKwU3WSM= -github.com/nats-io/nats-server/v2 v2.10.18/go.mod h1:97Qyg7YydD8blKlR8yBsUlPlWyZKjA7Bp5cl3MUE9K8= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7 h1:GnAHBiWk/CTiaKuf530XMNZdUevPqgvDaaEx4Dn6vEA= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20240825211147-3fd298ed30c7/go.mod h1:vCoSIPH7pQwl4DhZvT3XiLVjq50ycYEDy/ZbvTzNGrY= github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= @@ -236,9 +239,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= -gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/jetstream/provider_test.go b/jetstream/provider_test.go index ea2535d..aabac9e 100644 --- a/jetstream/provider_test.go +++ b/jetstream/provider_test.go @@ -50,6 +50,9 @@ func createJSServer(t *testing.T) (srv *server.Server) { Port: -1, StoreDir: dir, JetStream: true, + JetStreamLimits: server.JSLimitOpts{ + MaxRequestBatch: 1, + }, }) checkErr(t, err, "could not start js server: %v", err) diff --git a/jetstream/resource_jetstream_consumer.go b/jetstream/resource_jetstream_consumer.go index bb64a8a..4f30d6c 100644 --- a/jetstream/resource_jetstream_consumer.go +++ b/jetstream/resource_jetstream_consumer.go @@ -411,44 +411,14 @@ func resourceConsumerUpdate(d *schema.ResourceData, m any) error { return nil } - cons, err := mgr.LoadConsumer(stream, durable) - if err != nil { - return err - } - cfg, err := consumerConfigFromResourceData(d) if err != nil { return err } - freq := 0 - if len(cons.SampleFrequency()) > 0 { - s := strings.TrimSuffix(cons.SampleFrequency(), "%") - freq, err = strconv.Atoi(s) - if err != nil { - return fmt.Errorf("failed to parse consumer sampling configuration: %v", err) - } - } - - opts := []jsm.ConsumerOption{ - jsm.ConsumerDescription(cfg.Description), - jsm.ConsumerMetadata(cfg.Metadata), - jsm.FilterStreamBySubject(cfg.FilterSubject), - jsm.FilterStreamBySubject(cfg.FilterSubjects...), - jsm.AckWait(cfg.AckWait), - jsm.MaxDeliveryAttempts(cfg.MaxDeliver), - jsm.SamplePercent(freq), - jsm.MaxAckPending(uint(cfg.MaxAckPending)), - jsm.MaxWaiting(uint(cfg.MaxWaiting)), - jsm.MaxRequestExpires(cfg.MaxRequestExpires), - jsm.MaxRequestBatch(uint(cfg.MaxRequestBatch)), - } - - if cfg.HeadersOnly { - opts = append(opts, jsm.DeliverHeadersOnly()) - } - - err = cons.UpdateConfiguration(opts...) + // We call NewconsumerFromDefault because of the idempotent way consumers are created/updated + // If the consumer already exists, it will be updated, and if not we'll exit before we get here + _, err = mgr.NewConsumerFromDefault(stream, cfg) if err != nil { return err } @@ -456,25 +426,6 @@ func resourceConsumerUpdate(d *schema.ResourceData, m any) error { return resourceConsumerRead(d, m) } -func checkConsumerOnLimitsStream(mgr *jsm.Manager, streamName string, cfg *api.ConsumerConfig) error { - stream, err := mgr.LoadStream(streamName) - if err != nil { - return err - } - - if stream.ConsumerLimits().InactiveThreshold > 0 || stream.ConsumerLimits().MaxAckPending > 0 { - if cfg.InactiveThreshold == 0 { - return fmt.Errorf("inactive_threshold is required on streams with consumer limits set") - } - - if cfg.MaxAckPending == 0 { - return fmt.Errorf("max_ack_pending is required on streams with consumer limits set") - } - } - - return nil -} - func resourceConsumerCreate(d *schema.ResourceData, m any) error { cfg, err := consumerConfigFromResourceData(d) if err != nil { @@ -492,11 +443,6 @@ func resourceConsumerCreate(d *schema.ResourceData, m any) error { } defer nc.Close() - err = checkConsumerOnLimitsStream(mgr, stream, &cfg) - if err != nil { - return err - } - _, err = mgr.NewConsumerFromDefault(stream, cfg) if err != nil { return err diff --git a/jetstream/resource_jetstream_consumer_test.go b/jetstream/resource_jetstream_consumer_test.go index adc357c..dd4e7d3 100644 --- a/jetstream/resource_jetstream_consumer_test.go +++ b/jetstream/resource_jetstream_consumer_test.go @@ -2,6 +2,7 @@ package jetstream import ( "fmt" + "regexp" "strings" "testing" @@ -32,6 +33,7 @@ resource "jetstream_consumer" "TEST_C1" { metadata = { foo = "bar" } + max_batch = 1 } ` @@ -51,7 +53,8 @@ resource "jetstream_consumer" "TEST_C2" { stream_sequence = 10 max_ack_pending = 20 filter_subjects = ["TEST.a", "TEST.b"] - max_waiting = 10 + max_waiting = 10 + max_batch = 1 } ` @@ -66,12 +69,74 @@ resource "jetstream_stream" "test" { } resource "jetstream_consumer" "TEST_C3" { - stream_id = jetstream_stream.test.id - durable_name = "C3" - stream_sequence = 10 - max_ack_pending = 20 - filter_subject = "TEST.a" + stream_id = jetstream_stream.test.id + durable_name = "C3" + stream_sequence = 10 + max_ack_pending = 20 + filter_subject = "TEST.a" + delivery_subject = "ORDERS.a" +} +` + +const testBackoffPedantic = ` +provider "jetstream" { + servers = "%s" +} + +resource "jetstream_stream" "test" { + name = "TEST" + subjects = ["TEST.*"] +} + +resource "jetstream_consumer" "TEST_C4" { + stream_id = jetstream_stream.test.id + durable_name = "C4" + stream_sequence = 10 + filter_subject = "TEST.a" delivery_subject = "ORDERS.a" + ack_wait = 10 + backoff = [1,10,20,60] +} +` + +const testConsumerLimitsPedantic = ` +provider "jetstream" { + servers = "%s" +} + +resource "jetstream_stream" "test" { + name = "TEST" + subjects = ["TEST.*"] + max_ack_pending = 1 + inactive_threshold = 1 +} + +resource "jetstream_consumer" "TEST_C5" { + stream_id = jetstream_stream.test.id + durable_name = "C5" + stream_sequence = 10 + filter_subject = "TEST.a" + delivery_subject = "ORDERS.a" +} + +` + +const testConsumerMaxRequestBatchPedantic = ` +provider "jetstream" { + servers = "%s" +} + +resource "jetstream_stream" "test" { + name = "TEST" + subjects = ["TEST.*"] +} + +resource "jetstream_consumer" "TEST_C6" { + stream_id = jetstream_stream.test.id + durable_name = "C5" + deliver_all = true + filter_subject = "TEST.received" + max_batch = 100 } ` @@ -140,6 +205,18 @@ func TestResourceConsumer(t *testing.T) { resource.TestCheckResourceAttr("jetstream_consumer.TEST_C3", "filter_subject", "TEST.a"), ), }, + { + Config: fmt.Sprintf(testBackoffPedantic, nc.ConnectedUrl()), + ExpectError: regexp.MustCompile(`first backoff value has to equal batch AckWait \(10157\)`), + }, + { + Config: fmt.Sprintf(testConsumerLimitsPedantic, nc.ConnectedUrl()), + ExpectError: regexp.MustCompile(`inactive_threshold must be set if it's configured in stream limits \(10157\)`), + }, + { + Config: fmt.Sprintf(testConsumerMaxRequestBatchPedantic, nc.ConnectedUrl()), + ExpectError: regexp.MustCompile(`consumer max request batch exceeds server limit of 1 \(10125\)`), + }, }, }) } diff --git a/jetstream/resource_jetstream_stream.go b/jetstream/resource_jetstream_stream.go index bbace97..4f5be1c 100644 --- a/jetstream/resource_jetstream_stream.go +++ b/jetstream/resource_jetstream_stream.go @@ -232,6 +232,11 @@ func resourceStream() *schema.Resource { Default: true, Optional: true, }, + "mirror_direct": { + Type: schema.TypeBool, + Description: "If true, and the stream is a mirror, the mirror will participate in a serving direct get requests for individual messages from origin stream", + Optional: true, + }, "placement_cluster": { Type: schema.TypeString, Description: "Place the stream in a specific cluster, influenced by placement_tags", @@ -291,6 +296,18 @@ func resourceStream() *schema.Resource { ForceNew: false, Optional: true, }, + "max_ack_pending": { + Type: schema.TypeInt, + Description: "Defines the maximum number of messages, without acknowledgment, that can be outstanding", + ForceNew: false, + Optional: true, + }, + "inactive_threshold": { + Type: schema.TypeInt, + Description: "Duration that instructs the server to clean up consumers inactive for that long", + ForceNew: false, + Optional: true, + }, }, } } @@ -368,6 +385,8 @@ func resourceStreamRead(d *schema.ResourceData, m any) error { d.Set("allow_direct", str.DirectAllowed()) d.Set("discard_new_per_subject", str.DiscardNewPerSubject()) d.Set("compression", compression) + d.Set("max_ack_pending", str.ConsumerLimits().MaxAckPending) + d.Set("inactive_threshold", str.ConsumerLimits().InactiveThreshold) if transform := str.Configuration().SubjectTransform; transform != nil { d.Set("subject_transform", []map[string]string{ @@ -412,6 +431,7 @@ func resourceStreamRead(d *schema.ResourceData, m any) error { mirrors := []map[string]any{ streamSourceConfigRead(mirror), } + d.Set("mirror_direct", str.MirrorDirectAllowed()) d.Set("mirror", mirrors) } diff --git a/jetstream/resource_jetstream_stream_test.go b/jetstream/resource_jetstream_stream_test.go index 9ac171d..5000ce9 100644 --- a/jetstream/resource_jetstream_stream_test.go +++ b/jetstream/resource_jetstream_stream_test.go @@ -2,6 +2,7 @@ package jetstream import ( "fmt" + "regexp" "testing" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" @@ -70,11 +71,13 @@ provider "jetstream" { resource "jetstream_stream" "other" { name = "OTHER" subjects = ["js.in.OTHER.>"] + allow_direct = true } resource "jetstream_stream" "mirror_transform_test" { name = "MIRROR_TRANSFORM_TEST" description = "typeStreamConfigMirrorTransformed" + mirror_direct = true mirror { name = "OTHER" start_seq = 11 @@ -174,6 +177,40 @@ resource "jetstream_stream" "test" { } ` +const pedanticMaxAge = ` +provider "jetstream" { + servers = "%s" +} + +resource "jetstream_stream" "max_age_test" { + name = "MAX_AGE_TEST" + subjects = ["MAX_AGE_TEXT.*"] + duplicate_window = 2 + max_age = 1 +}` + +const pedanticMirrorDirect = ` +provider "jetstream" { + servers = "%s" +} + +resource "jetstream_stream" "pedantic_source" { + name = "PEDANTIC_SOURCE" + subjects = ["PEDANTIC_SOURCE.*"] + allow_direct = false +} + +resource "jetstream_stream" "pedantic_mirror" { + name = "PEDANTIC_MIRROR" + mirror_direct = true + mirror { + name = "PEDANTIC_SOURCE" + } + + depends_on = [ jetstream_stream.pedantic_source ] +} +` + func TestResourceStream(t *testing.T) { srv := createJSServer(t) defer srv.Shutdown() @@ -284,6 +321,14 @@ func TestResourceStream(t *testing.T) { testStreamIsTransformed(t, mgr, "TEST", api.SubjectTransformConfig{Source: "TEST.>", Destination: "1.>"}), ), }, + { + Config: fmt.Sprintf(pedanticMaxAge, nc.ConnectedUrl()), + ExpectError: regexp.MustCompile(`duplicates window can not be larger then max age \(10052\)`), + }, + { + Config: fmt.Sprintf(pedanticMirrorDirect, nc.ConnectedUrl()), + ExpectError: regexp.MustCompile(`origin stream has direct get set, mirror has it disabled \(10157\)`), + }, }, }) } diff --git a/jetstream/util.go b/jetstream/util.go index fb6d51d..8465be7 100644 --- a/jetstream/util.go +++ b/jetstream/util.go @@ -269,6 +269,10 @@ func streamConfigFromResourceData(d *schema.ResourceData) (cfg api.StreamConfig, return api.StreamConfig{}, fmt.Errorf("expected exactly one mirror source") } stream.Mirror = sources[0] + mirrorDirect, ok := d.GetOk("mirror_direct") + if ok { + stream.MirrorDirect = mirrorDirect.(bool) + } } ss, ok := d.GetOk("source") @@ -303,6 +307,16 @@ func streamConfigFromResourceData(d *schema.ResourceData) (cfg api.StreamConfig, } } + max_ack_pending, ok := d.GetOk("max_ack_pending") + if ok { + stream.ConsumerLimits.MaxAckPending = max_ack_pending.(int) + } + + inactive_threshold, ok := d.GetOk("inactive_threshold") + if ok { + stream.ConsumerLimits.InactiveThreshold = time.Second * time.Duration(inactive_threshold.(int)) + } + ok, errs := stream.Validate(new(SchemaValidator)) if !ok { return api.StreamConfig{}, errors.New(strings.Join(errs, ", ")) @@ -528,7 +542,7 @@ func connectMgr(d *schema.ResourceData) (any, error) { return nil, nil, err } - mgr, err := jsm.New(nc, jsm.WithAPIValidation(new(SchemaValidator))) + mgr, err := jsm.New(nc, jsm.WithAPIValidation(new(SchemaValidator)), jsm.WithPedanticRequests()) if err != nil { return nil, nil, err }