Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Commit

Permalink
fix: allow configuring eNATS store dir and make it writable on Docker
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Nov 1, 2023
1 parent 3fbef4f commit 9bb4a28
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 14 deletions.
5 changes: 5 additions & 0 deletions .docker/Dockerfile.mrb-linux-amd64
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ ADD .docker/${TARGETPLATFORM}/anycable-go /usr/local/bin/anycable-go

USER nobody

RUN mkdir -p /tmp/anycable \
&& chown -R nobody:nogroup /tmp/anycable

ENV ANYCABLE_ENATS_STORE_DIR=/tmp/anycable/nats-data

EXPOSE 8080

ENTRYPOINT ["/usr/local/bin/anycable-go"]
5 changes: 5 additions & 0 deletions .docker/Dockerfile.universal
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ ADD .docker/${TARGETPLATFORM}/anycable-go /usr/local/bin/anycable-go

USER nobody

RUN mkdir -p /tmp/anycable \
&& chown -R nobody:nogroup /tmp/anycable

ENV ANYCABLE_ENATS_STORE_DIR=/tmp/anycable/nats-data

EXPOSE 8080

ENTRYPOINT ["/usr/local/bin/anycable-go"]
12 changes: 7 additions & 5 deletions broker/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,9 +435,10 @@ func (n *NATS) ensureStreamExists(stream string) error {
ctx := context.Background()

_, err := n.js.CreateStream(ctx, jetstream.StreamConfig{
Name: prefixedStream,
MaxMsgs: int64(n.conf.HistoryLimit),
MaxAge: time.Duration(n.conf.HistoryTTL * int64(time.Second)),
Name: prefixedStream,
MaxMsgs: int64(n.conf.HistoryLimit),
MaxAge: time.Duration(n.conf.HistoryTTL * int64(time.Second)),
Replicas: 1,
})

if err != nil {
Expand Down Expand Up @@ -494,8 +495,9 @@ bucketSetup:
n.log.Debugf("no JetStream bucket found, creating a new one: %s", key)
var berr error
bucket, berr = n.js.CreateKeyValue(context.Background(), jetstream.KeyValueConfig{
Bucket: key,
TTL: ttl,
Bucket: key,
TTL: ttl,
Replicas: 1,
})

if berr != nil {
Expand Down
7 changes: 7 additions & 0 deletions cli/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,13 @@ func embeddedNatsCLIFlags(c *config.Config, routes *string, gateways *string) []
Destination: &c.EmbeddedNats.GatewayAdvertise,
},

&cli.StringFlag{
Name: "enats_store_dir",
Usage: "Embedded NATS store directory (for JetStream)",
Value: c.EmbeddedNats.StoreDir,
Destination: &c.EmbeddedNats.StoreDir,
},

&cli.BoolFlag{
Name: "enats_debug",
Usage: "Enable NATS server logs",
Expand Down
2 changes: 2 additions & 0 deletions docs/reliable_streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ The default broker adapter. It stores all data in memory. It can be used **only

### NATS

_🧪 Experimental_

This adapter uses [NATS JetStream](https://nats.io/) as a shared distributed storage for sessions and streams cache and also keeps a local snapshot in memory (using the in-memory adapter described above).

It can be used with both external NATS and [embedded NATS](./embedded_nats.md):
Expand Down
13 changes: 13 additions & 0 deletions enats/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package enats

import "github.com/nats-io/nats.go"

// Config represents NATS service configuration
type Config struct {
Debug bool
Expand All @@ -14,4 +16,15 @@ type Config struct {
Routes []string
JetStream bool
StoreDir string
// Seconds to wait for JetStream to become ready (can take a lot of time when connecting to a cluster)
JetStreamReadyTimeout int
}

// NewConfig returns defaults for NATSServiceConfig
func NewConfig() Config {
return Config{
ServiceAddr: nats.DefaultURL,
ClusterName: "anycable-cluster",
JetStreamReadyTimeout: 30, // seconds
}
}
53 changes: 44 additions & 9 deletions enats/enats.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,6 @@ func (e *LogEntry) Tracef(format string, v ...interface{}) {
e.Debugf(format, v...)
}

// NewConfig returns defaults for NATSServiceConfig
func NewConfig() Config {
return Config{
ServiceAddr: nats.DefaultURL,
ClusterName: "anycable-cluster",
}
}

// NewService returns an instance of NATS service
func NewService(c *Config) *Service {
return &Service{config: c}
Expand Down Expand Up @@ -127,7 +119,7 @@ func (s *Service) Start() error {
// WaitReady waits while NATS server is starting
func (s *Service) WaitReady() error {
if s.server.ReadyForConnections(serverStartTimeout) {
return nil
return s.WaitJetStreamReady(s.config.JetStreamReadyTimeout)
}

return errorx.TimeoutElapsed.New(
Expand Down Expand Up @@ -296,3 +288,46 @@ func (s *Service) serverName() string {
s.name = strings.ReplaceAll(strings.ReplaceAll(s.config.ServiceAddr, ":", "-"), "/", "") + "-" + suf
return s.name
}

func (s *Service) WaitJetStreamReady(maxSeconds int) error {
if !s.config.JetStream {
return nil
}

start := time.Now()
for {
if time.Since(start) > time.Duration(maxSeconds)*time.Second {
return fmt.Errorf("JetStream is not ready after %d seconds", maxSeconds)
}

c, err := nats.Connect("", nats.InProcessServer(s.server))
if err != nil {
log.Debugf("NATS server not accepting connections: %v", err)
continue
}

j, err := c.JetStream()
if err != nil {
return err
}

st, err := j.StreamInfo("__anycable__ready__", nats.MaxWait(1*time.Second))
if err == nats.ErrStreamNotFound || st != nil {
leader := s.server.JetStreamIsLeader()
log.Debugf("JetStream cluster is ready: leader=%v", leader)

// FIXME: This hack is needed to give leader some time to prepare the state (e.g., broker epoch)
if !leader {
time.Sleep(1 * time.Second)
}

return nil
}

c.Close()

log.Debugf("JetStream cluster is not ready yet, waiting for 1 second...")

time.Sleep(1 * time.Second)
}
}
131 changes: 131 additions & 0 deletions features/enats_broker.testfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
launch :rpc, "bundle exec anyt --only-rpc",
env: {"ANYCABLE_BROADCAST_ADAPTER" => "http"}
wait_tcp 50051

store_path = File.expand_path("../tmp/nats-data", __dir__)
FileUtils.rm_rf(store_path) if File.directory?(store_path)

launch :anycable_1,
"./dist/anycable-go --broker=nats --pubsub=nats --broadcast_adapter=http --embed_nats --enats_addr=nats://localhost:4242 --enats_cluster=nats://localhost:4342 --enats_gateway=nats://localhost:4442 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/one"

launch :anycable_2,
"./dist/anycable-go --port 8081 --broker=nats --pubsub=nats --broadcast_adapter=nats --embed_nats --enats_addr=nats://localhost:4243 --enats_cluster=nats://localhost:4343 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/two"

wait_tcp 8080
wait_tcp 8081

# We need to wait a bit for the NATS servers to find each other
sleep 2

scenario = [
{
client: {
protocol: "action_cable",
name: "publisher",
actions: [
{
subscribe: {
channel: "BenchmarkChannel"
}
},
{
perform: {
channel: "BenchmarkChannel",
action: "broadcast",
data: {
message: "hello"
}
}
},
{
receive: {
channel: "BenchmarkChannel",
"data>": {
message: "hello",
action: "broadcast",
},
stream_id: "all"
}
},
{
receive: {
channel: "BenchmarkChannel",
data: {
message: "hello",
action: "broadcastResult"
}
}
}
]
}
},
{
client: {
protocol: "action_cable",
name: "subscriber",
connection_options: {
url: "http://localhost:8080/cable"
},
actions: [
{
subscribe: {
channel: "BenchmarkChannel"
}
},
{
receive: {
channel: "BenchmarkChannel",
"data>": {
message: "hello",
action: "broadcast"
},
stream_id: "all"
}
}
]
}
},
{
client: {
protocol: "action_cable",
name: "another_subscriber",
connection_options: {
url: "http://localhost:8081/cable"
},
actions: [
{
subscribe: {
channel: "BenchmarkChannel"
}
},
{
receive: {
channel: "BenchmarkChannel",
"data>": {
message: "hello",
action: "broadcast"
},
stream_id: "all"
}
}
]
}
}
]

TEST_COMMAND = <<~CMD
bundle exec wsdirector ws://localhost:8080/cable -i #{scenario.to_json}
CMD

# NATS super-cluster may take longer to fully connect
retrying(delay: 2) do
run :wsdirector, TEST_COMMAND

result = stdout(:wsdirector)

unless result.include?("Group publisher: 1 clients, 0 failures") &&
result.include?("Group subscriber: 1 clients, 0 failures") &&
result.include?("Group another_subscriber: 1 clients, 0 failures")
fail "Unexpected scenario result:\n#{result}"
end
end

0 comments on commit 9bb4a28

Please sign in to comment.