diff --git a/broker/broker.go b/broker/broker.go index 0562b708..e25b0f2a 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -33,7 +33,7 @@ type Cacheable interface { // //go:generate mockery --name Broker --output "../mocks" --outpkg mocks type Broker interface { - Start() error + Start(done chan (error)) error Shutdown(ctx context.Context) error Announce() string @@ -60,7 +60,7 @@ type Broker interface { // LocalBroker is a single-node broker that can used to store streams data locally type LocalBroker interface { - Start() error + Start(done chan (error)) error Shutdown(ctx context.Context) error SetEpoch(epoch string) GetEpoch() string @@ -131,7 +131,7 @@ func NewLegacyBroker(broadcaster Broadcaster) *LegacyBroker { } } -func (LegacyBroker) Start() error { +func (LegacyBroker) Start(done chan (error)) error { return nil } diff --git a/broker/memory.go b/broker/memory.go index 3d405add..83c69c16 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -223,7 +223,7 @@ func (b *Memory) SetEpoch(v string) { b.epoch = v } -func (b *Memory) Start() error { +func (b *Memory) Start(done chan (error)) error { go b.expireLoop() return nil diff --git a/broker/nats.go b/broker/nats.go index cf3fe78c..22d2b341 100644 --- a/broker/nats.go +++ b/broker/nats.go @@ -44,6 +44,10 @@ type NATS struct { shutdownCtx context.Context shutdownFn func() + readyCtx context.Context + broadcastBacklog []*common.StreamMessage + backlogMu sync.Mutex + log *log.Entry } @@ -53,6 +57,8 @@ const ( epochKey = "_epoch_" sessionsPrefix = "" streamPrefix = "_ac_" + + jetstreamReadyTimeout = 1 * time.Second ) var _ Broker = (*NATS)(nil) @@ -69,16 +75,17 @@ func NewNATSBroker(broadcaster Broadcaster, c *Config, nc *natsconfig.NATSConfig shutdownCtx, shutdownFn := context.WithCancel(context.Background()) n := NATS{ - broadcaster: broadcaster, - conf: c, - nconf: nc, - shutdownCtx: shutdownCtx, - shutdownFn: shutdownFn, - tracker: NewStreamsTracker(), - streamSync: newStreamsSynchronizer(), - jstreams: newLRU[string](time.Duration(c.HistoryTTL * int64(time.Second))), - jconsumers: newLRU[jetstream.Consumer](time.Duration(c.HistoryTTL * int64(time.Second))), - log: log.WithField("context", "broker").WithField("provider", "nats"), + broadcaster: broadcaster, + conf: c, + nconf: nc, + shutdownCtx: shutdownCtx, + shutdownFn: shutdownFn, + tracker: NewStreamsTracker(), + broadcastBacklog: []*common.StreamMessage{}, + streamSync: newStreamsSynchronizer(), + jstreams: newLRU[string](time.Duration(c.HistoryTTL * int64(time.Second))), + jconsumers: newLRU[jetstream.Consumer](time.Duration(c.HistoryTTL * int64(time.Second))), + log: log.WithField("context", "broker").WithField("provider", "nats"), } for _, opt := range opts { @@ -93,7 +100,7 @@ func NewNATSBroker(broadcaster Broadcaster, c *Config, nc *natsconfig.NATSConfig } // Write Broker implementtaion here -func (n *NATS) Start() error { +func (n *NATS) Start(done chan (error)) error { n.clientMu.Lock() defer n.clientMu.Unlock() @@ -120,13 +127,67 @@ func (n *NATS) Start() error { return err } + n.conn = nc + + readyCtx, readyFn := context.WithCancelCause(context.Background()) + + n.readyCtx = readyCtx + + // Initialize JetStream asynchronously, because we may need to wait for JetStream cluster to be ready + go func() { + err := n.initJetStream() + readyFn(err) + if err != nil && done != nil { + done <- err + } + + if err != nil { + n.backlogFlush() + } + }() + + return nil +} + +func (n *NATS) Ready(timeout ...time.Duration) error { + var err error + + if len(timeout) == 0 { + <-n.readyCtx.Done() + } else { + timer := time.After(timeout[0]) + + select { + case <-n.readyCtx.Done(): + case <-timer: + err = fmt.Errorf("timeout waiting for JetStream to be ready") + } + } + + if err != nil { + return err + } + + cause := context.Cause(n.readyCtx) + + if cause == context.Canceled { + return nil + } else { + return cause + } +} + +func (n *NATS) initJetStream() error { + n.clientMu.Lock() + defer n.clientMu.Unlock() + + nc := n.conn js, err := jetstream.New(nc) if err != nil { return errorx.Decorate(err, "Failed to connect to JetStream") } - n.conn = nc n.js = js kv, err := n.fetchBucketWithTTL(kvBucket, time.Duration(n.conf.SessionsTTL*int64(time.Second))) @@ -144,7 +205,7 @@ func (n *NATS) Start() error { } n.writeEpoch(epoch) - err = n.local.Start() + err = n.local.Start(nil) if err != nil { return errorx.Decorate(err, "Failed to start internal memory broker") @@ -156,8 +217,7 @@ func (n *NATS) Start() error { n.log.Warnf("failed to set up epoch watcher: %s", err) } - n.log.Debugf("Current epoch: %s", epoch) - + n.log.Infof("NATS broker is ready (epoch=%s)", epoch) return nil } @@ -223,6 +283,13 @@ func (n *NATS) writeEpoch(val string) { } func (n *NATS) HandleBroadcast(msg *common.StreamMessage) { + err := n.Ready(jetstreamReadyTimeout) + if err != nil { + n.log.Debugf("JetStream is not ready yet to publish messages, add to backlog") + n.backlogAdd(msg) + return + } + offset, err := n.add(msg.Stream, msg.Data) if err != nil { @@ -264,16 +331,31 @@ func (n *NATS) Unsubscribe(stream string) string { } func (n *NATS) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error) { + err := n.Ready(jetstreamReadyTimeout) + if err != nil { + return nil, err + } + n.streamSync.sync(stream) return n.local.HistoryFrom(stream, epoch, offset) } func (n *NATS) HistorySince(stream string, since int64) ([]common.StreamMessage, error) { + err := n.Ready(jetstreamReadyTimeout) + if err != nil { + return nil, err + } + n.streamSync.sync(stream) return n.local.HistorySince(stream, since) } func (n *NATS) CommitSession(sid string, session Cacheable) error { + err := n.Ready(jetstreamReadyTimeout) + if err != nil { + return err + } + ctx := context.Background() key := sessionsPrefix + sid data, err := session.ToCacheEntry() @@ -292,6 +374,11 @@ func (n *NATS) CommitSession(sid string, session Cacheable) error { } func (n *NATS) RestoreSession(sid string) ([]byte, error) { + err := n.Ready(jetstreamReadyTimeout) + if err != nil { + return nil, err + } + key := sessionsPrefix + sid ctx := context.Background() @@ -309,6 +396,11 @@ func (n *NATS) RestoreSession(sid string) ([]byte, error) { } func (n *NATS) FinishSession(sid string) error { + err := n.Ready(jetstreamReadyTimeout) + if err != nil { + return err + } + ctx := context.Background() key := sessionsPrefix + sid @@ -328,6 +420,11 @@ func (n *NATS) FinishSession(sid string) error { } func (n *NATS) Reset() error { + err := n.Ready(jetstreamReadyTimeout) + if err != nil { + return err + } + n.clientMu.Lock() defer n.clientMu.Unlock() @@ -363,6 +460,8 @@ func (n *NATS) add(stream string, data string) (uint64, error) { ctx := context.Background() key := streamPrefix + stream + // Touch on publish to make sure that the subsequent history fetch will return the latest messages + n.streamSync.touch(stream) ack, err := n.js.Publish(ctx, key, []byte(data)) if err != nil { @@ -821,3 +920,21 @@ func (n *NATS) shouldRetryOnError(err error, attempts *int, cooldown time.Durati return false } + +func (n *NATS) backlogAdd(msg *common.StreamMessage) { + n.backlogMu.Lock() + defer n.backlogMu.Unlock() + + n.broadcastBacklog = append(n.broadcastBacklog, msg) +} + +func (n *NATS) backlogFlush() { + n.backlogMu.Lock() + defer n.backlogMu.Unlock() + + for _, msg := range n.broadcastBacklog { + n.HandleBroadcast(msg) + } + + n.broadcastBacklog = []*common.StreamMessage{} +} diff --git a/broker/nats_test.go b/broker/nats_test.go index 4090f77b..d2339bb3 100644 --- a/broker/nats_test.go +++ b/broker/nats_test.go @@ -30,8 +30,7 @@ var _ pubsub.Handler = (*FakeBroadastHandler)(nil) func TestNATSBroker_HistorySince_expiration(t *testing.T) { port := 32 addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) - server := buildNATSServer(t, addr) - err := server.Start() + server, err := startNATSServer(t, addr) require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -45,7 +44,7 @@ func TestNATSBroker_HistorySince_expiration(t *testing.T) { broadcaster := pubsub.NewLegacySubscriber(broadcastHandler) broker := NewNATSBroker(broadcaster, &config, &nconfig) - err = broker.Start() + err = broker.Start(nil) require.NoError(t, err) defer broker.Shutdown(context.Background()) // nolint: errcheck @@ -76,8 +75,8 @@ func TestNATSBroker_HistorySince_expiration(t *testing.T) { assert.EqualValues(t, 4, history[1].Offset) assert.Equal(t, "d", history[1].Data) - // Stream must be expired after 1 second - time.Sleep(2 * time.Second) + // Stream must be expired after 2 seconds + time.Sleep(3 * time.Second) history, err = broker.HistorySince("test", start) require.NoError(t, err) @@ -87,8 +86,7 @@ func TestNATSBroker_HistorySince_expiration(t *testing.T) { func TestNATSBroker_HistorySince_with_limit(t *testing.T) { port := 33 addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) - server := buildNATSServer(t, addr) - err := server.Start() + server, err := startNATSServer(t, addr) require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -102,7 +100,7 @@ func TestNATSBroker_HistorySince_with_limit(t *testing.T) { broadcaster := pubsub.NewLegacySubscriber(broadcastHandler) broker := NewNATSBroker(broadcaster, &config, &nconfig) - err = broker.Start() + err = broker.Start(nil) require.NoError(t, err) defer broker.Shutdown(context.Background()) // nolint: errcheck @@ -130,9 +128,7 @@ func TestNATSBroker_HistorySince_with_limit(t *testing.T) { func TestNATSBroker_HistoryFrom(t *testing.T) { port := 34 addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) - server := buildNATSServer(t, addr) - - err := server.Start() + server, err := startNATSServer(t, addr) require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -145,7 +141,7 @@ func TestNATSBroker_HistoryFrom(t *testing.T) { broadcaster := pubsub.NewLegacySubscriber(broadcastHandler) broker := NewNATSBroker(broadcaster, &config, &nconfig) - err = broker.Start() + err = broker.Start(nil) require.NoError(t, err) defer broker.Shutdown(context.Background()) // nolint: errcheck @@ -217,8 +213,7 @@ func (t *TestCacheable) ToCacheEntry() ([]byte, error) { func TestNATSBroker_Sessions(t *testing.T) { port := 41 addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) - server := buildNATSServer(t, addr) - err := server.Start() + server, err := startNATSServer(t, addr) require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -230,7 +225,7 @@ func TestNATSBroker_Sessions(t *testing.T) { broker := NewNATSBroker(nil, &config, &nconfig) - err = broker.Start() + err = broker.Start(nil) require.NoError(t, err) defer broker.Shutdown(context.Background()) // nolint: errcheck @@ -239,9 +234,11 @@ func TestNATSBroker_Sessions(t *testing.T) { require.NoError(t, err) anotherBroker := NewNATSBroker(nil, &config, &nconfig) - anotherBroker.Start() // nolint: errcheck + require.NoError(t, anotherBroker.Start(nil)) defer anotherBroker.Shutdown(context.Background()) // nolint: errcheck + require.NoError(t, anotherBroker.Ready()) + restored, err := anotherBroker.RestoreSession("test123") require.NoError(t, err) @@ -277,8 +274,7 @@ func TestNATSBroker_SessionsTTLChange(t *testing.T) { port := 43 addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) - server := buildNATSServer(t, addr) - err := server.Start() + server, err := startNATSServer(t, addr) require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -290,11 +286,13 @@ func TestNATSBroker_SessionsTTLChange(t *testing.T) { broker := NewNATSBroker(nil, &config, &nconfig) - err = broker.Start() + err = broker.Start(nil) require.NoError(t, err) defer broker.Shutdown(context.Background()) // nolint: errcheck + require.NoError(t, broker.Ready()) + err = broker.CommitSession("test123", &TestCacheable{"cache-me"}) require.NoError(t, err) @@ -302,9 +300,11 @@ func TestNATSBroker_SessionsTTLChange(t *testing.T) { aConfig.SessionsTTL = 3 anotherBroker := NewNATSBroker(nil, &aConfig, &nconfig) - require.NoError(t, anotherBroker.Start()) // nolint: errcheck + require.NoError(t, anotherBroker.Start(nil)) defer anotherBroker.Shutdown(context.Background()) // nolint: errcheck + require.NoError(t, anotherBroker.Ready()) + // The session must be missing since we recreated the bucket due to TTL change missing, err := anotherBroker.RestoreSession("test123") @@ -342,8 +342,7 @@ func TestNATSBroker_Epoch(t *testing.T) { port := 45 addr := fmt.Sprintf("nats://127.0.0.1:44%d", port) - server := buildNATSServer(t, addr) - err := server.Start() + server, err := startNATSServer(t, addr) require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -354,18 +353,21 @@ func TestNATSBroker_Epoch(t *testing.T) { broker := NewNATSBroker(nil, &config, &nconfig) - err = broker.Start() + err = broker.Start(nil) require.NoError(t, err) defer broker.Shutdown(context.Background()) // nolint: errcheck + require.NoError(t, broker.Ready()) broker.Reset() // nolint: errcheck epoch := broker.Epoch() anotherBroker := NewNATSBroker(nil, &config, &nconfig) - require.NoError(t, anotherBroker.Start()) // nolint: errcheck + require.NoError(t, anotherBroker.Start(nil)) defer anotherBroker.Shutdown(context.Background()) // nolint: errcheck + require.NoError(t, anotherBroker.Ready()) + assert.Equal(t, epoch, anotherBroker.Epoch()) // Now let's test that epoch changes are picked up @@ -392,14 +394,24 @@ wait: } } -func buildNATSServer(t *testing.T, addr string) *enats.Service { +func startNATSServer(t *testing.T, addr string) (*enats.Service, error) { conf := enats.NewConfig() conf.JetStream = true conf.ServiceAddr = addr conf.StoreDir = t.TempDir() service := enats.NewService(&conf) - return service + err := service.Start() + if err != nil { + return nil, err + } + + err = service.WaitJetStreamReady(5) + if err != nil { + return nil, err + } + + return service, nil } type consumerSequenceReader struct { diff --git a/cli/cli.go b/cli/cli.go index d00daa3e..af9f97b6 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -232,6 +232,13 @@ func (r *Runner) Run() error { return errorx.Decorate(err, "!!! Subscriber failed !!!") } + if appBroker != nil { + err = appBroker.Start(r.errChan) + if err != nil { + return errorx.Decorate(err, "!!! Broker failed !!!") + } + } + r.shutdownables = append(r.shutdownables, subscriber) if r.broadcastersFactory != nil { diff --git a/enats/enats.go b/enats/enats.go index 85250488..b14c3ba9 100644 --- a/enats/enats.go +++ b/enats/enats.go @@ -132,7 +132,12 @@ func (s *Service) Start() error { // WaitReady waits while NATS server is starting func (s *Service) WaitReady() error { if s.server.ReadyForConnections(serverStartTimeout) { - return s.WaitJetStreamReady(s.config.JetStreamReadyTimeout) + // We don't want to block the bootstrap process while waiting for JetStream. + // JetStream requires a cluster to be formed before it can be enabled, but when we + // perform a rolling update, the newly created instance may have no network connectivity, + // thus, it won't be able to join the cluster and enable JetStream. + go s.WaitJetStreamReady(s.config.JetStreamReadyTimeout) // nolint:errcheck + return nil } return errorx.TimeoutElapsed.New( diff --git a/features/enats_broker.testfile b/features/enats_broker.testfile index b9b01018..e27e31a5 100644 --- a/features/enats_broker.testfile +++ b/features/enats_broker.testfile @@ -8,22 +8,38 @@ FileUtils.rm_rf(store_path) if File.directory?(store_path) launch :anycable_1, "./dist/anycable-go --port 8080 --broker=nats --pubsub=nats --broadcast_adapter=http --embed_nats --enats_addr=nats://localhost:4242 --enats_cluster=nats://localhost:4342 --enats_gateway=nats://localhost:4442 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/one" -sleep 2 +sleep 1 launch :anycable_2, "./dist/anycable-go --port 8081 --broker=nats --pubsub=nats --broadcast_adapter=nats --embed_nats --enats_addr=nats://localhost:4243 --enats_cluster=nats://localhost:4343 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/two" -sleep 2 +sleep 1 launch :anycable_3, "./dist/anycable-go --port 8082 --broker=nats --pubsub=nats --broadcast_adapter=nats --embed_nats --enats_addr=nats://localhost:4244 --enats_cluster=nats://localhost:4344 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/three" -wait_tcp 8080, timeout: 20 -wait_tcp 8081, timeout: 20 -wait_tcp 8082, timeout: 20 +wait_tcp 8080 +wait_tcp 8081 +wait_tcp 8082 -# We need to obtain epoch first +# Wait for JetStream to be ready; we can do this by checking that the epoch data has been written on disk +timeout = 20 +log(:info) { "Waiting for JetStream to be ready" } +loop do + break if Dir["#{store_path}/**/*"].any? { |f| f =~ /KV__anycable_epoch_/ } + + Kernel.sleep 0.5 + timeout -= 0.5 + if timeout < 0 + fail "JetStream is not ready" + end +end + +# Let's wait for epoch to be propagated to all nodes +sleep 2 + +# We need to obtain epoch first epoch_scenario = [ { client: { @@ -59,12 +75,16 @@ epoch_scenario = [ } ] -run :wsdirector, "bundle exec wsdirector ws://localhost:8080/cable -i #{epoch_scenario.to_json}" +result = nil -result = stdout(:wsdirector) +retrying(delay: 2) do + run :wsdirector, "bundle exec wsdirector ws://localhost:8080/cable -i #{epoch_scenario.to_json}" -if result !~ /1 clients, 0 failures/ - fail "Unexpected scenario result:\n#{result}" + result = stdout(:wsdirector) + + if result !~ /1 clients, 0 failures/ + fail "Unexpected scenario result:\n#{result}" + end end epoch = result.match(/"epoch":"([^"]+)"/)[1] diff --git a/mocks/Broker.go b/mocks/Broker.go index d0c3248a..6ce183e4 100644 --- a/mocks/Broker.go +++ b/mocks/Broker.go @@ -160,13 +160,13 @@ func (_m *Broker) Shutdown(ctx context.Context) error { return r0 } -// Start provides a mock function with given fields: -func (_m *Broker) Start() error { - ret := _m.Called() +// Start provides a mock function with given fields: done +func (_m *Broker) Start(done chan error) error { + ret := _m.Called(done) var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(chan error) error); ok { + r0 = rf(done) } else { r0 = ret.Error(0) } diff --git a/node/broker_integration_test.go b/node/broker_integration_test.go index 584b2972..e5a52226 100644 --- a/node/broker_integration_test.go +++ b/node/broker_integration_test.go @@ -45,6 +45,18 @@ import ( // - Make sure it is not restored (uses controller.Authenticate) func TestIntegrationRestore_Memory(t *testing.T) { node, controller := setupIntegrationNode() + + bconf := broker.NewConfig() + bconf.SessionsTTL = 2 + + subscriber := pubsub.NewLegacySubscriber(node) + + br := broker.NewMemoryBroker(subscriber, &bconf) + br.SetEpoch("2022") + node.SetBroker(br) + + require.NoError(t, br.Start(nil)) + go node.Start() // nolint:errcheck defer node.Shutdown(context.Background()) // nolint:errcheck @@ -55,8 +67,7 @@ func TestIntegrationRestore_NATS(t *testing.T) { port := 32 addr := fmt.Sprintf("nats://127.0.0.1:45%d", port) - server := buildNATSServer(t, addr) - err := server.Start() + server, err := startNATSServer(t, addr) require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -72,7 +83,8 @@ func TestIntegrationRestore_NATS(t *testing.T) { broker := broker.NewNATSBroker(broadcaster, &bconf, &nconfig) node.SetBroker(broker) - require.NoError(t, node.Start()) // nolint:errcheck + require.NoError(t, node.Start()) + require.NoError(t, broker.Start(nil)) defer node.Shutdown(context.Background()) // nolint:errcheck require.NoError(t, broker.Reset()) @@ -242,6 +254,17 @@ func sharedIntegrationRestore(t *testing.T, node *Node, controller *mocks.Contro // - The session MUST receive the messages broadcasted during the unsubsciprtion period. func TestIntegrationHistory_Memory(t *testing.T) { node, controller := setupIntegrationNode() + + bconf := broker.NewConfig() + + subscriber := pubsub.NewLegacySubscriber(node) + + br := broker.NewMemoryBroker(subscriber, &bconf) + br.SetEpoch("2022") + node.SetBroker(br) + + require.NoError(t, br.Start(nil)) + go node.Start() // nolint:errcheck defer node.Shutdown(context.Background()) // nolint:errcheck @@ -252,8 +275,7 @@ func TestIntegrationHistory_NATS(t *testing.T) { port := 33 addr := fmt.Sprintf("nats://127.0.0.1:45%d", port) - server := buildNATSServer(t, addr) - err := server.Start() + server, err := startNATSServer(t, addr) require.NoError(t, err) defer server.Shutdown(context.Background()) // nolint:errcheck @@ -268,7 +290,8 @@ func TestIntegrationHistory_NATS(t *testing.T) { broker := broker.NewNATSBroker(broadcaster, &bconf, &nconfig) node.SetBroker(broker) - require.NoError(t, node.Start()) // nolint:errcheck + require.NoError(t, node.Start()) + require.NoError(t, broker.Start(nil)) defer node.Shutdown(context.Background()) // nolint:errcheck require.NoError(t, broker.Reset()) @@ -378,15 +401,6 @@ func setupIntegrationNode() (*Node, *mocks.Controller) { node := NewNode(controller, metrics.NewMetrics(nil, 10), &config) node.SetDisconnector(NewNoopDisconnector()) - bconf := broker.NewConfig() - bconf.SessionsTTL = 2 - - subscriber := pubsub.NewLegacySubscriber(node) - - br := broker.NewMemoryBroker(subscriber, &bconf) - br.SetEpoch("2022") - node.SetBroker(br) - return node, controller } @@ -437,12 +451,22 @@ func requireAuthenticatedSession(t *testing.T, node *Node, sid string) *Session return session } -func buildNATSServer(t *testing.T, addr string) *enats.Service { +func startNATSServer(t *testing.T, addr string) (*enats.Service, error) { conf := enats.NewConfig() conf.JetStream = true conf.ServiceAddr = addr conf.StoreDir = t.TempDir() service := enats.NewService(&conf) - return service + err := service.Start() + if err != nil { + return nil, err + } + + err = service.WaitJetStreamReady(5) + if err != nil { + return nil, err + } + + return service, nil } diff --git a/node/node.go b/node/node.go index 95074c2b..29cdfd46 100644 --- a/node/node.go +++ b/node/node.go @@ -101,10 +101,6 @@ func (n *Node) Start() error { go n.hub.Run() go n.collectStats() - if err := n.broker.Start(); err != nil { - return err - } - return nil }