From 3f9d6b5ef8912a05314cb086f799d72afc871591 Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Thu, 26 Oct 2023 22:51:23 -0700 Subject: [PATCH] fix(tests): isolate embedded nats in tests --- broker/nats.go | 3 ++- broker/nats_test.go | 45 ++++++++++++++++++++++++++++----- enats/config.go | 1 + enats/enats.go | 4 +++ node/broker_integration_test.go | 18 ++++++++++--- 5 files changed, 60 insertions(+), 11 deletions(-) diff --git a/broker/nats.go b/broker/nats.go index 02348bc5..44d287eb 100644 --- a/broker/nats.go +++ b/broker/nats.go @@ -491,10 +491,11 @@ bucketSetup: bucket, err := n.js.KeyValue(context.Background(), key) if err == jetstream.ErrBucketNotFound { + n.log.Debugf("no JetStream bucket found, creating a new one: %s", key) var berr error bucket, berr = n.js.CreateKeyValue(context.Background(), jetstream.KeyValueConfig{ Bucket: key, - TTL: time.Duration(n.conf.SessionsTTL * int64(time.Second)), + TTL: ttl, }) if berr != nil { diff --git a/broker/nats_test.go b/broker/nats_test.go index fda98b1e..241b41c0 100644 --- a/broker/nats_test.go +++ b/broker/nats_test.go @@ -28,7 +28,9 @@ func (FakeBroadastHandler) ExecuteRemoteCommand(cmd *common.RemoteCommandMessage var _ pubsub.Handler = (*FakeBroadastHandler)(nil) func TestNATSBroker_HistorySince_expiration(t *testing.T) { - server := buildNATSServer() + port := 32 + addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) + server := buildNATSServer(t, addr) err := server.Start() require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -37,6 +39,8 @@ func TestNATSBroker_HistorySince_expiration(t *testing.T) { config.HistoryTTL = 1 nconfig := natsconfig.NewNATSConfig() + nconfig.Servers = addr + broadcastHandler := FakeBroadastHandler{} broadcaster := pubsub.NewLegacySubscriber(broadcastHandler) broker := NewNATSBroker(broadcaster, &config, &nconfig) @@ -81,7 +85,9 @@ func TestNATSBroker_HistorySince_expiration(t *testing.T) { } func TestNATSBroker_HistorySince_with_limit(t *testing.T) { - server := buildNATSServer() + port := 33 + addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) + server := buildNATSServer(t, addr) err := server.Start() require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -90,6 +96,8 @@ func TestNATSBroker_HistorySince_with_limit(t *testing.T) { config.HistoryLimit = 2 nconfig := natsconfig.NewNATSConfig() + nconfig.Servers = addr + broadcastHandler := FakeBroadastHandler{} broadcaster := pubsub.NewLegacySubscriber(broadcastHandler) broker := NewNATSBroker(broadcaster, &config, &nconfig) @@ -120,7 +128,10 @@ func TestNATSBroker_HistorySince_with_limit(t *testing.T) { } func TestNATSBroker_HistoryFrom(t *testing.T) { - server := buildNATSServer() + port := 34 + addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) + server := buildNATSServer(t, addr) + err := server.Start() require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -128,6 +139,8 @@ func TestNATSBroker_HistoryFrom(t *testing.T) { config := NewConfig() nconfig := natsconfig.NewNATSConfig() + nconfig.Servers = addr + broadcastHandler := FakeBroadastHandler{} broadcaster := pubsub.NewLegacySubscriber(broadcastHandler) broker := NewNATSBroker(broadcaster, &config, &nconfig) @@ -202,7 +215,9 @@ func (t *TestCacheable) ToCacheEntry() ([]byte, error) { } func TestNATSBroker_Sessions(t *testing.T) { - server := buildNATSServer() + port := 41 + addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) + server := buildNATSServer(t, addr) err := server.Start() require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -211,6 +226,8 @@ func TestNATSBroker_Sessions(t *testing.T) { config.SessionsTTL = 1 nconfig := natsconfig.NewNATSConfig() + nconfig.Servers = addr + broker := NewNATSBroker(nil, &config, &nconfig) err = broker.Start() @@ -257,7 +274,10 @@ func TestNATSBroker_Sessions(t *testing.T) { } func TestNATSBroker_SessionsTTLChange(t *testing.T) { - server := buildNATSServer() + port := 43 + addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) + + server := buildNATSServer(t, addr) err := server.Start() require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -266,6 +286,8 @@ func TestNATSBroker_SessionsTTLChange(t *testing.T) { config.SessionsTTL = 1 nconfig := natsconfig.NewNATSConfig() + nconfig.Servers = addr + broker := NewNATSBroker(nil, &config, &nconfig) err = broker.Start() @@ -317,7 +339,10 @@ func TestNATSBroker_SessionsTTLChange(t *testing.T) { } func TestNATSBroker_Epoch(t *testing.T) { - server := buildNATSServer() + port := 45 + addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) + + server := buildNATSServer(t, addr) err := server.Start() require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -325,12 +350,16 @@ func TestNATSBroker_Epoch(t *testing.T) { config := NewConfig() nconfig := natsconfig.NewNATSConfig() + nconfig.Servers = addr + broker := NewNATSBroker(nil, &config, &nconfig) err = broker.Start() require.NoError(t, err) defer broker.Shutdown(context.Background()) // nolint: errcheck + broker.Reset() // nolint: errcheck + epoch := broker.Epoch() anotherBroker := NewNATSBroker(nil, &config, &nconfig) @@ -340,9 +369,11 @@ func TestNATSBroker_Epoch(t *testing.T) { assert.Equal(t, epoch, anotherBroker.Epoch()) } -func buildNATSServer() *enats.Service { +func buildNATSServer(t *testing.T, addr string) *enats.Service { conf := enats.NewConfig() conf.JetStream = true + conf.ServiceAddr = addr + conf.StoreDir = t.TempDir() service := enats.NewService(&conf) return service diff --git a/enats/config.go b/enats/config.go index 7544368e..2068a675 100644 --- a/enats/config.go +++ b/enats/config.go @@ -12,4 +12,5 @@ type Config struct { Gateways []string Routes []string JetStream bool + StoreDir string } diff --git a/enats/enats.go b/enats/enats.go index 0a0cdcf8..1c93f988 100644 --- a/enats/enats.go +++ b/enats/enats.go @@ -99,6 +99,10 @@ func (s *Service) Start() error { JetStream: s.config.JetStream, } + if s.config.StoreDir != "" { + opts.StoreDir = s.config.StoreDir + } + s.server, err = server.NewServer(opts) if err != nil { return errorx.Decorate(err, "Failed to start NATS server") diff --git a/node/broker_integration_test.go b/node/broker_integration_test.go index 5c435b5b..584b2972 100644 --- a/node/broker_integration_test.go +++ b/node/broker_integration_test.go @@ -52,7 +52,10 @@ func TestIntegrationRestore_Memory(t *testing.T) { } func TestIntegrationRestore_NATS(t *testing.T) { - server := buildNATSServer() + port := 32 + addr := fmt.Sprintf("nats://127.0.0.1:45%d", port) + + server := buildNATSServer(t, addr) err := server.Start() require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -63,6 +66,8 @@ func TestIntegrationRestore_NATS(t *testing.T) { bconf.SessionsTTL = 2 nconfig := natsconfig.NewNATSConfig() + nconfig.Servers = addr + broadcaster := pubsub.NewLegacySubscriber(node) broker := broker.NewNATSBroker(broadcaster, &bconf, &nconfig) node.SetBroker(broker) @@ -244,7 +249,10 @@ func TestIntegrationHistory_Memory(t *testing.T) { } func TestIntegrationHistory_NATS(t *testing.T) { - server := buildNATSServer() + port := 33 + addr := fmt.Sprintf("nats://127.0.0.1:45%d", port) + + server := buildNATSServer(t, addr) err := server.Start() require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -254,6 +262,8 @@ func TestIntegrationHistory_NATS(t *testing.T) { bconf := broker.NewConfig() nconfig := natsconfig.NewNATSConfig() + nconfig.Servers = addr + broadcaster := pubsub.NewLegacySubscriber(node) broker := broker.NewNATSBroker(broadcaster, &bconf, &nconfig) node.SetBroker(broker) @@ -427,9 +437,11 @@ func requireAuthenticatedSession(t *testing.T, node *Node, sid string) *Session return session } -func buildNATSServer() *enats.Service { +func buildNATSServer(t *testing.T, addr string) *enats.Service { conf := enats.NewConfig() conf.JetStream = true + conf.ServiceAddr = addr + conf.StoreDir = t.TempDir() service := enats.NewService(&conf) return service