Skip to content

Commit

Permalink
feat: Support for multiple subjects in consumers
Browse files Browse the repository at this point in the history
This is a NATS 2.10+ feature, specifying multiple subjects for will fail
for older versions.
  • Loading branch information
aholstenson committed Oct 6, 2023
1 parent e3a0134 commit 0a87f20
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 43 deletions.
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ require (
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.5
github.com/levelfourab/sprout-go v0.10.1
github.com/nats-io/nats-server/v2 v2.9.17
github.com/nats-io/nats.go v1.28.0
github.com/nats-io/nats-server/v2 v2.10.1
github.com/nats-io/nats.go v1.30.2
github.com/onsi/ginkgo/v2 v2.9.5
github.com/onsi/gomega v1.27.7
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0
Expand Down Expand Up @@ -45,12 +45,12 @@ require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.4.1 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
Expand All @@ -73,13 +73,13 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.39.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/automaxprocs v1.5.2 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.9.1 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
Expand Down
37 changes: 19 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
Expand Down Expand Up @@ -285,16 +285,16 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.17 h1:gFpUQ3hqIDJrnqog+Bl5vaXg+RhhYEZIElasEuRn2tw=
github.com/nats-io/nats-server/v2 v2.9.17/go.mod h1:eQysm3xDZmIjfkjr7DuD9DjRFpnxQc2vKVxtEg0Dp6s=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ=
github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
Expand Down Expand Up @@ -423,8 +423,8 @@ go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJP
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/automaxprocs v1.5.2 h1:2LxUOGiR3O6tw8ui5sZa2LAaHnsviZdVOUZw4fvbnME=
go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/dig v1.16.1 h1:+alNIBsl0qfY0j6epRubp/9obgtrObRAc5aD+6jbWY8=
go.uber.org/dig v1.16.1/go.mod h1:557JTAUZT5bUK0SvCwikmLPPtdQhfvLYtO5tJgQSbnk=
go.uber.org/fx v1.19.3 h1:YqMRE4+2IepTYCMOvXqQpRa+QAVdiSTnsHU4XNWBceA=
Expand All @@ -446,8 +446,8 @@ golang.org/x/crypto v0.0.0-20191227163750-53104e6ec876/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand All @@ -458,6 +458,7 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -578,8 +579,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -589,8 +590,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down
20 changes: 15 additions & 5 deletions internal/events/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (m *Manager) EnsureConsumer(ctx context.Context, config *ConsumerConfig) (*
return nil, errors.New("stream must be specified")
}

if len(config.Subjects) != 1 {
return nil, errors.New("only one subject can be specified")
if len(config.Subjects) == 0 {
return nil, errors.New("one or more subjects must be specified")
}

var name string
Expand Down Expand Up @@ -157,8 +157,12 @@ func (m *Manager) declareDurableConsumer(ctx context.Context, config *ConsumerCo
// consumers.
func (m *Manager) setConsumerSettings(c *jetstream.ConsumerConfig, qc *ConsumerConfig, update bool) {
c.AckPolicy = jetstream.AckExplicitPolicy
// TODO: With NATS 2.10 multiple subjects can be specified
c.FilterSubject = qc.Subjects[0]
if len(qc.Subjects) == 1 {
c.FilterSubject = qc.Subjects[0]
} else {
c.FilterSubject = ""
c.FilterSubjects = qc.Subjects
}

// If a timeout is specified set it or use the default
if qc.Timeout > 0 {
Expand Down Expand Up @@ -193,7 +197,13 @@ type ZapConsumerConfig jetstream.ConsumerConfig

func (c *ZapConsumerConfig) MarshalLogObject(enc zapcore.ObjectEncoder) error {
err := enc.AddArray("subjects", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
enc.AppendString(c.FilterSubject)
if c.FilterSubjects == nil {
enc.AppendString(c.FilterSubject)
} else {
for _, subject := range c.FilterSubjects {
enc.AppendString(subject)
}
}
return nil
}))
if err != nil {
Expand Down
82 changes: 71 additions & 11 deletions internal/events/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,78 @@ var _ = Describe("Consumers", func() {
})

It("consumer with zero subjects fails", func(ctx context.Context) {
_, err := manager.EnsureConsumer(ctx, &events.ConsumerConfig{
_, err := manager.EnsureStream(ctx, &events.StreamConfig{
Name: "test",
Subjects: []string{
"test",
},
})
Expect(err).ToNot(HaveOccurred())

_, err = manager.EnsureConsumer(ctx, &events.ConsumerConfig{
Stream: "test",
})
Expect(err).To(HaveOccurred())
})
})

It("consumer with multiple subjects fails", func(ctx context.Context) {
_, err := manager.EnsureConsumer(ctx, &events.ConsumerConfig{
Describe("Ephemeral", func() {
It("can create a subscription", func(ctx context.Context) {
_, err := manager.EnsureStream(ctx, &events.StreamConfig{
Name: "test",
Subjects: []string{
"test",
},
})
Expect(err).ToNot(HaveOccurred())

_, err = js.Consumer(ctx, "test", "test")
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, jetstream.ErrConsumerNotFound)).To(BeTrue())

s, err := manager.EnsureConsumer(ctx, &events.ConsumerConfig{
Stream: "test",
Subjects: []string{
"test",
},
})
Expect(err).ToNot(HaveOccurred())

_, err = js.Consumer(ctx, "test", s.ID)
Expect(err).ToNot(HaveOccurred())
})

It("can create consumer with multiple subjects", func(ctx context.Context) {
_, err := manager.EnsureStream(ctx, &events.StreamConfig{
Name: "test",
Subjects: []string{
"test",
"test.>",
},
})
Expect(err).ToNot(HaveOccurred())

_, err = js.Consumer(ctx, "test", "test")
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, jetstream.ErrConsumerNotFound)).To(BeTrue())

s, err := manager.EnsureConsumer(ctx, &events.ConsumerConfig{
Stream: "test",
Subjects: []string{
"test",
"test.2",
},
})
Expect(err).ToNot(HaveOccurred())

c, err := js.Consumer(ctx, "test", s.ID)
Expect(err).ToNot(HaveOccurred())
Expect(c.CachedInfo().Config.FilterSubject).To(BeEmpty())
Expect(c.CachedInfo().Config.FilterSubjects).To(ConsistOf("test", "test.2"))
})
})

Describe("Ephemeral", func() {
Describe("Durable", func() {
It("can create a subscription", func(ctx context.Context) {
_, err := manager.EnsureStream(ctx, &events.StreamConfig{
Name: "test",
Expand All @@ -73,6 +126,7 @@ var _ = Describe("Consumers", func() {

s, err := manager.EnsureConsumer(ctx, &events.ConsumerConfig{
Stream: "test",
Name: "test",
Subjects: []string{
"test",
},
Expand All @@ -95,6 +149,7 @@ var _ = Describe("Consumers", func() {

s, err := manager.EnsureConsumer(ctx, &events.ConsumerConfig{
Stream: "test",
Name: "test",
Subjects: []string{
"test",
},
Expand All @@ -106,6 +161,7 @@ var _ = Describe("Consumers", func() {

s, err = manager.EnsureConsumer(ctx, &events.ConsumerConfig{
Stream: "test",
Name: "test",
Subjects: []string{
"test.2",
},
Expand All @@ -116,14 +172,13 @@ var _ = Describe("Consumers", func() {
Expect(err).ToNot(HaveOccurred())
Expect(c.CachedInfo().Config.FilterSubject).To(Equal("test.2"))
})
})

Describe("Durable", func() {
It("can create a subscription", func(ctx context.Context) {
It("can create consumer with multiple subjects", func(ctx context.Context) {
_, err := manager.EnsureStream(ctx, &events.StreamConfig{
Name: "test",
Subjects: []string{
"test",
"test.>",
},
})
Expect(err).ToNot(HaveOccurred())
Expand All @@ -137,15 +192,18 @@ var _ = Describe("Consumers", func() {
Name: "test",
Subjects: []string{
"test",
"test.2",
},
})
Expect(err).ToNot(HaveOccurred())

_, err = js.Consumer(ctx, "test", s.ID)
c, err := js.Consumer(ctx, "test", s.ID)
Expect(err).ToNot(HaveOccurred())
Expect(c.CachedInfo().Config.FilterSubject).To(BeEmpty())
Expect(c.CachedInfo().Config.FilterSubjects).To(ConsistOf("test", "test.2"))
})

It("can update subject of subscription", func(ctx context.Context) {
It("can update from one subject to multiple", func(ctx context.Context) {
_, err := manager.EnsureStream(ctx, &events.StreamConfig{
Name: "test",
Subjects: []string{
Expand All @@ -172,13 +230,15 @@ var _ = Describe("Consumers", func() {
Name: "test",
Subjects: []string{
"test.2",
"test.3",
},
})
Expect(err).ToNot(HaveOccurred())

ci, err := js.Consumer(ctx, "test", s.ID)
c, err := js.Consumer(ctx, "test", s.ID)
Expect(err).ToNot(HaveOccurred())
Expect(ci.CachedInfo().Config.FilterSubject).To(Equal("test.2"))
Expect(c.CachedInfo().Config.FilterSubject).To(BeEmpty())
Expect(c.CachedInfo().Config.FilterSubjects).To(ConsistOf("test.2", "test.3"))
})
})
})

0 comments on commit 0a87f20

Please sign in to comment.