From db10f817bb2c244c53ebee72b615c7dea2d3a89d Mon Sep 17 00:00:00 2001
From: Vladimir Dementyev
Date: Thu, 2 Nov 2023 14:41:29 -0700
Subject: [PATCH] fix: initialize jetstream cluster asynchronously

The problem is that JetStream requires network connectivity to finalize
cluster setup, and since we rely on health checks to activate instances,
we must make the WS server ready independently of the embedded NATS status.

This commit moves WaitJetStreamReady to a goroutine and refactors the NATS
broker to handle a not-yet-ready JetStream cluster by deferring some operations.
---
 broker/broker.go                |   6 +-
 broker/memory.go                |   2 +-
 broker/nats.go                  | 147 ++++++++++++++++++++++++++++----
 broker/nats_test.go             |  64 ++++++++------
 cli/cli.go                      |   7 ++
 enats/enats.go                  |   7 +-
 features/enats_broker.testfile  |  40 ++++++---
 mocks/Broker.go                 |  10 +--
 node/broker_integration_test.go |  58 +++++++++----
 node/node.go                    |   4 -
 10 files changed, 263 insertions(+), 82 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index 0562b708..e25b0f2a 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -33,7 +33,7 @@ type Cacheable interface {
 //
 //go:generate mockery --name Broker --output "../mocks" --outpkg mocks
 type Broker interface {
-	Start() error
+	Start(done chan (error)) error
 	Shutdown(ctx context.Context) error
 
 	Announce() string
@@ -60,7 +60,7 @@ type Broker interface {
 
 // LocalBroker is a single-node broker that can used to store streams data locally
 type LocalBroker interface {
-	Start() error
+	Start(done chan (error)) error
 	Shutdown(ctx context.Context) error
 	SetEpoch(epoch string)
 	GetEpoch() string
@@ -131,7 +131,7 @@ func NewLegacyBroker(broadcaster Broadcaster) *LegacyBroker {
 	}
 }
 
-func (LegacyBroker) Start() error {
+func (LegacyBroker) Start(done chan (error)) error {
 	return nil
 }
 
diff --git a/broker/memory.go b/broker/memory.go
index 3d405add..83c69c16 100644
--- a/broker/memory.go
+++ b/broker/memory.go
@@ -223,7 +223,7 @@ func (b *Memory) SetEpoch(v string) {
 	b.epoch = v
 }
 
-func (b *Memory) Start() error {
+func (b *Memory) Start(done chan (error)) error {
 	go b.expireLoop()
 
 	return nil
diff --git a/broker/nats.go b/broker/nats.go
index cf3fe78c..22d2b341 100644
--- a/broker/nats.go
+++ b/broker/nats.go
@@ -44,6 +44,10 @@ type NATS struct {
 	shutdownCtx context.Context
 	shutdownFn  func()
 
+	readyCtx         context.Context
+	broadcastBacklog []*common.StreamMessage
+	backlogMu        sync.Mutex
+
 	log *log.Entry
 }
 
@@ -53,6 +57,8 @@ const (
 	epochKey       = "_epoch_"
 	sessionsPrefix = ""
 	streamPrefix   = "_ac_"
+
+	jetstreamReadyTimeout = 1 * time.Second
 )
 
 var _ Broker = (*NATS)(nil)
@@ -69,16 +75,17 @@ func NewNATSBroker(broadcaster Broadcaster, c *Config, nc *natsconfig.NATSConfig
 	shutdownCtx, shutdownFn := context.WithCancel(context.Background())
 
 	n := NATS{
-		broadcaster: broadcaster,
-		conf:        c,
-		nconf:       nc,
-		shutdownCtx: shutdownCtx,
-		shutdownFn:  shutdownFn,
-		tracker:     NewStreamsTracker(),
-		streamSync:  newStreamsSynchronizer(),
-		jstreams:    newLRU[string](time.Duration(c.HistoryTTL * int64(time.Second))),
-		jconsumers:  newLRU[jetstream.Consumer](time.Duration(c.HistoryTTL * int64(time.Second))),
-		log:         log.WithField("context", "broker").WithField("provider", "nats"),
+		broadcaster:      broadcaster,
+		conf:             c,
+		nconf:            nc,
+		shutdownCtx:      shutdownCtx,
+		shutdownFn:       shutdownFn,
+		tracker:          NewStreamsTracker(),
+		broadcastBacklog: []*common.StreamMessage{},
+		streamSync:       newStreamsSynchronizer(),
+		jstreams:         newLRU[string](time.Duration(c.HistoryTTL * int64(time.Second))),
+		jconsumers:       newLRU[jetstream.Consumer](time.Duration(c.HistoryTTL * int64(time.Second))),
+		log:              log.WithField("context", "broker").WithField("provider", "nats"),
 	}
 
 	for _, opt := range opts {
@@ -93,7 +100,7 @@
 }
 
 // Write Broker implementtaion here
-func (n *NATS) Start() error {
+func (n *NATS) Start(done chan (error)) error {
 	n.clientMu.Lock()
 	defer n.clientMu.Unlock()
 
@@ -120,13 +127,67 @@ func (n *NATS) Start() error {
 		return err
 	}
 
+	n.conn = nc
+
+	readyCtx, readyFn := context.WithCancelCause(context.Background())
+
+	n.readyCtx = readyCtx
+
+	// Initialize JetStream asynchronously, because we may need to wait for the JetStream cluster to be ready
+	go func() {
+		err := n.initJetStream()
+		readyFn(err)
+		if err != nil && done != nil {
+			done <- err
+		}
+
+		if err == nil {
+			n.backlogFlush()
+		}
+	}()
+
+	return nil
+}
+
+func (n *NATS) Ready(timeout ...time.Duration) error {
+	var err error
+
+	if len(timeout) == 0 {
+		<-n.readyCtx.Done()
+	} else {
+		timer := time.After(timeout[0])
+
+		select {
+		case <-n.readyCtx.Done():
+		case <-timer:
+			err = fmt.Errorf("timeout waiting for JetStream to be ready")
+		}
+	}
+
+	if err != nil {
+		return err
+	}
+
+	cause := context.Cause(n.readyCtx)
+
+	if cause == context.Canceled {
+		return nil
+	} else {
+		return cause
+	}
+}
+
+func (n *NATS) initJetStream() error {
+	n.clientMu.Lock()
+	defer n.clientMu.Unlock()
+
+	nc := n.conn
 	js, err := jetstream.New(nc)
 
 	if err != nil {
 		return errorx.Decorate(err, "Failed to connect to JetStream")
 	}
 
-	n.conn = nc
 	n.js = js
 
 	kv, err := n.fetchBucketWithTTL(kvBucket, time.Duration(n.conf.SessionsTTL*int64(time.Second)))
@@ -144,7 +205,7 @@ func (n *NATS) Start() error {
 	}
 	n.writeEpoch(epoch)
 
-	err = n.local.Start()
+	err = n.local.Start(nil)
 
 	if err != nil {
 		return errorx.Decorate(err, "Failed to start internal memory broker")
@@ -156,8 +217,7 @@ func (n *NATS) Start() error {
 		n.log.Warnf("failed to set up epoch watcher: %s", err)
 	}
 
-	n.log.Debugf("Current epoch: %s", epoch)
-
+	n.log.Infof("NATS broker is ready (epoch=%s)", epoch)
 	return nil
 }
 
@@ -223,6 +283,13 @@ func (n *NATS) writeEpoch(val string) {
 }
 
 func (n *NATS) HandleBroadcast(msg *common.StreamMessage) {
+	err := n.Ready(jetstreamReadyTimeout)
+	if err != nil {
+		n.log.Debugf("JetStream is not ready yet to publish messages, add to backlog")
+		n.backlogAdd(msg)
+		return
+	}
+
 	offset, err := n.add(msg.Stream, msg.Data)
 
 	if err != nil {
@@ -264,16 +331,31 @@ func (n *NATS) Unsubscribe(stream string) string {
 }
 
 func (n *NATS) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error) {
+	err := n.Ready(jetstreamReadyTimeout)
+	if err != nil {
+		return nil, err
+	}
+
 	n.streamSync.sync(stream)
 	return n.local.HistoryFrom(stream, epoch, offset)
 }
 
 func (n *NATS) HistorySince(stream string, since int64) ([]common.StreamMessage, error) {
+	err := n.Ready(jetstreamReadyTimeout)
+	if err != nil {
+		return nil, err
+	}
+
 	n.streamSync.sync(stream)
 	return n.local.HistorySince(stream, since)
 }
 
 func (n *NATS) CommitSession(sid string, session Cacheable) error {
+	err := n.Ready(jetstreamReadyTimeout)
+	if err != nil {
+		return err
+	}
+
 	ctx := context.Background()
 	key := sessionsPrefix + sid
 	data, err := session.ToCacheEntry()
@@ -292,6 +374,11 @@ func (n *NATS) CommitSession(sid string, session Cacheable) error {
 }
 
 func (n *NATS) RestoreSession(sid string) ([]byte, error) {
+	err := n.Ready(jetstreamReadyTimeout)
+	if err != nil {
+		return nil, err
+	}
+
 	key := sessionsPrefix + sid
 	ctx := context.Background()
 
@@ -309,6 +396,11 @@ func (n *NATS) RestoreSession(sid string) ([]byte, error) {
 }
 
 func (n *NATS) FinishSession(sid string) error {
+	err := n.Ready(jetstreamReadyTimeout)
+	if err != nil {
+		return err
+	}
+
 	ctx := context.Background()
 	key := sessionsPrefix + sid
 
@@ -328,6 +420,11 @@ func (n *NATS) FinishSession(sid string) error {
 }
 
 func (n *NATS) Reset() error {
+	err := n.Ready(jetstreamReadyTimeout)
+	if err != nil {
+		return err
+	}
+
 	n.clientMu.Lock()
 	defer n.clientMu.Unlock()
 
@@ -363,6 +460,8 @@ func (n *NATS) add(stream string, data string) (uint64, error) {
 	ctx := context.Background()
 	key := streamPrefix + stream
 
+	// Touch on publish to make sure that the subsequent history fetch will return the latest messages
+	n.streamSync.touch(stream)
 	ack, err := n.js.Publish(ctx, key, []byte(data))
 
 	if err != nil {
@@ -821,3 +920,21 @@ func (n *NATS) shouldRetryOnError(err error, attempts *int, cooldown time.Durati
 
 	return false
 }
+
+func (n *NATS) backlogAdd(msg *common.StreamMessage) {
+	n.backlogMu.Lock()
+	defer n.backlogMu.Unlock()
+
+	n.broadcastBacklog = append(n.broadcastBacklog, msg)
+}
+
+func (n *NATS) backlogFlush() {
+	n.backlogMu.Lock()
+	defer n.backlogMu.Unlock()
+
+	for _, msg := range n.broadcastBacklog {
+		n.HandleBroadcast(msg)
+	}
+
+	n.broadcastBacklog = []*common.StreamMessage{}
+}
diff --git a/broker/nats_test.go b/broker/nats_test.go
index 4090f77b..d2339bb3 100644
--- a/broker/nats_test.go
+++ b/broker/nats_test.go
@@ -30,8 +30,7 @@ var _ pubsub.Handler = (*FakeBroadastHandler)(nil)
 
 func TestNATSBroker_HistorySince_expiration(t *testing.T) {
 	port := 32
 	addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)
-	server := buildNATSServer(t, addr)
-	err := server.Start()
+	server, err := startNATSServer(t, addr)
 	require.NoError(t, err)
 	defer server.Shutdown(context.Background()) // nolint:errcheck
@@ -45,7 +44,7 @@ func TestNATSBroker_HistorySince_expiration(t *testing.T) {
 	broadcaster := pubsub.NewLegacySubscriber(broadcastHandler)
 
 	broker := NewNATSBroker(broadcaster, &config, &nconfig)
-	err = broker.Start()
+	err = broker.Start(nil)
 	require.NoError(t, err)
 
 	defer broker.Shutdown(context.Background()) // nolint: errcheck
@@ -76,8 +75,8 @@ func TestNATSBroker_HistorySince_expiration(t *testing.T) {
 	assert.EqualValues(t, 4, history[1].Offset)
 	assert.Equal(t, "d", history[1].Data)
 
-	// Stream must be expired after 1 second
-	time.Sleep(2 * time.Second)
+	// Stream must be expired after 2 seconds
+	time.Sleep(3 * time.Second)
 
 	history, err = broker.HistorySince("test", start)
 	require.NoError(t, err)
@@ -87,8 +86,7 @@ func TestNATSBroker_HistorySince_expiration(t *testing.T) {
 
 func TestNATSBroker_HistorySince_with_limit(t *testing.T) {
 	port := 33
 	addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)
-	server := buildNATSServer(t, addr)
-	err := server.Start()
+	server, err := startNATSServer(t, addr)
 	require.NoError(t, err)
 	defer server.Shutdown(context.Background()) // nolint:errcheck
@@ -102,7 +100,7 @@ func TestNATSBroker_HistorySince_with_limit(t *testing.T) {
 	broadcaster := pubsub.NewLegacySubscriber(broadcastHandler)
 
 	broker := NewNATSBroker(broadcaster, &config, &nconfig)
-	err = broker.Start()
+	err = broker.Start(nil)
 	require.NoError(t, err)
 
 	defer broker.Shutdown(context.Background()) // nolint: errcheck
@@ -130,9 +128,7 @@ func TestNATSBroker_HistorySince_with_limit(t *testing.T) {
 
 func TestNATSBroker_HistoryFrom(t *testing.T) {
 	port := 34
 	addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)
-	server := buildNATSServer(t, addr)
-
-	err := server.Start()
+	server, err := startNATSServer(t, addr)
 	require.NoError(t, err)
 	defer server.Shutdown(context.Background()) // nolint:errcheck
@@ -145,7 +141,7 @@ func TestNATSBroker_HistoryFrom(t *testing.T) {
 	broadcaster := pubsub.NewLegacySubscriber(broadcastHandler)
 
 	broker := NewNATSBroker(broadcaster, &config, &nconfig)
-	err = broker.Start()
+	err = broker.Start(nil)
 	require.NoError(t, err)
 
 	defer broker.Shutdown(context.Background()) // nolint: errcheck
@@ -217,8 +213,7 @@ func (t *TestCacheable) ToCacheEntry() ([]byte, error) {
 
 func TestNATSBroker_Sessions(t *testing.T) {
 	port := 41
 	addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)
-	server := buildNATSServer(t, addr)
-	err := server.Start()
+	server, err := startNATSServer(t, addr)
 	require.NoError(t, err)
 	defer server.Shutdown(context.Background()) // nolint:errcheck
@@ -230,7 +225,7 @@ func TestNATSBroker_Sessions(t *testing.T) {
 
 	broker := NewNATSBroker(nil, &config, &nconfig)
 
-	err = broker.Start()
+	err = broker.Start(nil)
 	require.NoError(t, err)
 
 	defer broker.Shutdown(context.Background()) // nolint: errcheck
@@ -239,9 +234,11 @@ func TestNATSBroker_Sessions(t *testing.T) {
 	require.NoError(t, err)
 
 	anotherBroker := NewNATSBroker(nil, &config, &nconfig)
-	anotherBroker.Start() // nolint: errcheck
+	require.NoError(t, anotherBroker.Start(nil))
 	defer anotherBroker.Shutdown(context.Background()) // nolint: errcheck
 
+	require.NoError(t, anotherBroker.Ready())
+
 	restored, err := anotherBroker.RestoreSession("test123")
 	require.NoError(t, err)
 
@@ -277,8 +274,7 @@ func TestNATSBroker_SessionsTTLChange(t *testing.T) {
 	port := 43
 	addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)
-	server := buildNATSServer(t, addr)
-	err := server.Start()
+	server, err := startNATSServer(t, addr)
 	require.NoError(t, err)
 
 	defer server.Shutdown(context.Background()) // nolint:errcheck
 
@@ -290,11 +286,13 @@ func TestNATSBroker_SessionsTTLChange(t *testing.T) {
 
 	broker := NewNATSBroker(nil, &config, &nconfig)
 
-	err = broker.Start()
+	err = broker.Start(nil)
 	require.NoError(t, err)
 
 	defer broker.Shutdown(context.Background()) // nolint: errcheck
 
+	require.NoError(t, broker.Ready())
+
 	err = broker.CommitSession("test123", &TestCacheable{"cache-me"})
 	require.NoError(t, err)
 
@@ -302,9 +300,11 @@ func TestNATSBroker_SessionsTTLChange(t *testing.T) {
 	aConfig.SessionsTTL = 3
 
 	anotherBroker := NewNATSBroker(nil, &aConfig, &nconfig)
-	require.NoError(t, anotherBroker.Start()) // nolint: errcheck
+	require.NoError(t, anotherBroker.Start(nil))
 	defer anotherBroker.Shutdown(context.Background()) // nolint: errcheck
 
+	require.NoError(t, anotherBroker.Ready())
+
 	// The session must be missing since we recreated the bucket due to TTL change
 	missing, err := anotherBroker.RestoreSession("test123")
 
@@ -342,8 +342,7 @@ func TestNATSBroker_Epoch(t *testing.T) {
 	port := 45
 	addr := fmt.Sprintf("nats://127.0.0.1:44%d", port)
-	server := buildNATSServer(t, addr)
-	err := server.Start()
+	server, err := startNATSServer(t, addr)
 	require.NoError(t, err)
 
 	defer server.Shutdown(context.Background()) // nolint:errcheck
 
@@ -354,18 +353,21 @@ func TestNATSBroker_Epoch(t *testing.T) {
 	broker := NewNATSBroker(nil, &config, &nconfig)
 
-	err = broker.Start()
+	err = broker.Start(nil)
 	require.NoError(t, err)
 
 	defer broker.Shutdown(context.Background()) // nolint: errcheck
+	require.NoError(t, broker.Ready())
 
 	broker.Reset() // nolint: errcheck
 
 	epoch := broker.Epoch()
 
 	anotherBroker := NewNATSBroker(nil, &config, &nconfig)
-	require.NoError(t, anotherBroker.Start()) // nolint: errcheck
+	require.NoError(t, anotherBroker.Start(nil))
 	defer anotherBroker.Shutdown(context.Background()) // nolint: errcheck
 
+	require.NoError(t, anotherBroker.Ready())
+
 	assert.Equal(t, epoch, anotherBroker.Epoch())
 
 	// Now let's test that epoch changes are picked up
@@ -392,14 +394,24 @@ wait:
 	}
 }
 
-func buildNATSServer(t *testing.T, addr string) *enats.Service {
+func startNATSServer(t *testing.T, addr string) (*enats.Service, error) {
 	conf := enats.NewConfig()
 	conf.JetStream = true
 	conf.ServiceAddr = addr
 	conf.StoreDir = t.TempDir()
 
 	service := enats.NewService(&conf)
-	return service
+	err := service.Start()
+	if err != nil {
+		return nil, err
+	}
+
+	err = service.WaitJetStreamReady(5)
+	if err != nil {
+		return nil, err
+	}
+
+	return service, nil
 }
 
 type consumerSequenceReader struct {
diff --git a/cli/cli.go b/cli/cli.go
index d00daa3e..af9f97b6 100644
--- a/cli/cli.go
+++ b/cli/cli.go
@@ -232,6 +232,13 @@ func (r *Runner) Run() error {
 		return errorx.Decorate(err, "!!! Subscriber failed !!!")
 	}
 
+	if appBroker != nil {
+		err = appBroker.Start(r.errChan)
+		if err != nil {
+			return errorx.Decorate(err, "!!! Broker failed !!!")
+		}
+	}
+
 	r.shutdownables = append(r.shutdownables, subscriber)
 
 	if r.broadcastersFactory != nil {
diff --git a/enats/enats.go b/enats/enats.go
index 85250488..b14c3ba9 100644
--- a/enats/enats.go
+++ b/enats/enats.go
@@ -132,7 +132,12 @@ func (s *Service) Start() error {
 // WaitReady waits while NATS server is starting
 func (s *Service) WaitReady() error {
 	if s.server.ReadyForConnections(serverStartTimeout) {
-		return s.WaitJetStreamReady(s.config.JetStreamReadyTimeout)
+		// We don't want to block the bootstrap process while waiting for JetStream.
+		// JetStream requires a cluster to be formed before it can be enabled, but when we
+		// perform a rolling update, the newly created instance may have no network connectivity,
+		// thus, it won't be able to join the cluster and enable JetStream.
+		go s.WaitJetStreamReady(s.config.JetStreamReadyTimeout) // nolint:errcheck
+		return nil
 	}
 
 	return errorx.TimeoutElapsed.New(
diff --git a/features/enats_broker.testfile b/features/enats_broker.testfile
index b9b01018..e27e31a5 100644
--- a/features/enats_broker.testfile
+++ b/features/enats_broker.testfile
@@ -8,22 +8,38 @@ FileUtils.rm_rf(store_path) if File.directory?(store_path)
 
 launch :anycable_1, "./dist/anycable-go --port 8080 --broker=nats --pubsub=nats --broadcast_adapter=http --embed_nats --enats_addr=nats://localhost:4242 --enats_cluster=nats://localhost:4342 --enats_gateway=nats://localhost:4442 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/one"
 
-sleep 2
+sleep 1
 
 launch :anycable_2, "./dist/anycable-go --port 8081 --broker=nats --pubsub=nats --broadcast_adapter=nats --embed_nats --enats_addr=nats://localhost:4243 --enats_cluster=nats://localhost:4343 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/two"
 
-sleep 2
+sleep 1
 
 launch :anycable_3, "./dist/anycable-go --port 8082 --broker=nats --pubsub=nats --broadcast_adapter=nats --embed_nats --enats_addr=nats://localhost:4244 --enats_cluster=nats://localhost:4344 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/three"
 
-wait_tcp 8080, timeout: 20
-wait_tcp 8081, timeout: 20
-wait_tcp 8082, timeout: 20
+wait_tcp 8080
+wait_tcp 8081
+wait_tcp 8082
 
-# We need to obtain epoch first
+# Wait for JetStream to be ready; we can do this by checking that the epoch data has been written on disk
+timeout = 20
+log(:info) { "Waiting for JetStream to be ready" }
+loop do
+  break if Dir["#{store_path}/**/*"].any? { |f| f =~ /KV__anycable_epoch_/ }
+
+  Kernel.sleep 0.5
+  timeout -= 0.5
+  if timeout < 0
+    fail "JetStream is not ready"
+  end
+end
+
+# Let's wait for epoch to be propagated to all nodes
+sleep 2
+
+# We need to obtain epoch first
 epoch_scenario = [
   {
     client: {
@@ -59,12 +75,16 @@ epoch_scenario = [
   }
 ]
 
-run :wsdirector, "bundle exec wsdirector ws://localhost:8080/cable -i #{epoch_scenario.to_json}"
+result = nil
 
-result = stdout(:wsdirector)
+retrying(delay: 2) do
+  run :wsdirector, "bundle exec wsdirector ws://localhost:8080/cable -i #{epoch_scenario.to_json}"
 
-if result !~ /1 clients, 0 failures/
-  fail "Unexpected scenario result:\n#{result}"
+  result = stdout(:wsdirector)
+
+  if result !~ /1 clients, 0 failures/
+    fail "Unexpected scenario result:\n#{result}"
+  end
 end
 
 epoch = result.match(/"epoch":"([^"]+)"/)[1]
diff --git a/mocks/Broker.go b/mocks/Broker.go
index d0c3248a..6ce183e4 100644
--- a/mocks/Broker.go
+++ b/mocks/Broker.go
@@ -160,13 +160,13 @@ func (_m *Broker) Shutdown(ctx context.Context) error {
 	return r0
 }
 
-// Start provides a mock function with given fields:
-func (_m *Broker) Start() error {
-	ret := _m.Called()
+// Start provides a mock function with given fields: done
+func (_m *Broker) Start(done chan error) error {
+	ret := _m.Called(done)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func() error); ok {
-		r0 = rf()
+	if rf, ok := ret.Get(0).(func(chan error) error); ok {
+		r0 = rf(done)
 	} else {
 		r0 = ret.Error(0)
 	}
diff --git a/node/broker_integration_test.go b/node/broker_integration_test.go
index 584b2972..e5a52226 100644
--- a/node/broker_integration_test.go
+++ b/node/broker_integration_test.go
@@ -45,6 +45,18 @@ import (
 // - Make sure it is not restored (uses controller.Authenticate)
 func TestIntegrationRestore_Memory(t *testing.T) {
 	node, controller := setupIntegrationNode()
+
+	bconf := broker.NewConfig()
+	bconf.SessionsTTL = 2
+
+	subscriber := pubsub.NewLegacySubscriber(node)
+
+	br := broker.NewMemoryBroker(subscriber, &bconf)
+	br.SetEpoch("2022")
+	node.SetBroker(br)
+
+	require.NoError(t, br.Start(nil))
+
 	go node.Start() // nolint:errcheck
 	defer node.Shutdown(context.Background()) // nolint:errcheck
 
@@ -55,8 +67,7 @@ func TestIntegrationRestore_NATS(t *testing.T) {
 	port := 32
 	addr := fmt.Sprintf("nats://127.0.0.1:45%d", port)
-	server := buildNATSServer(t, addr)
-	err := server.Start()
+	server, err := startNATSServer(t, addr)
 	require.NoError(t, err)
 
 	defer server.Shutdown(context.Background()) // nolint:errcheck
 
@@ -72,7 +83,8 @@ func TestIntegrationRestore_NATS(t *testing.T) {
 	broker := broker.NewNATSBroker(broadcaster, &bconf, &nconfig)
 	node.SetBroker(broker)
 
-	require.NoError(t, node.Start()) // nolint:errcheck
+	require.NoError(t, node.Start())
+	require.NoError(t, broker.Start(nil))
 	defer node.Shutdown(context.Background()) // nolint:errcheck
 
 	require.NoError(t, broker.Reset())
@@ -242,6 +254,17 @@ func sharedIntegrationRestore(t *testing.T, node *Node, controller *mocks.Contro
 // - The session MUST receive the messages broadcasted during the unsubsciprtion period.
 func TestIntegrationHistory_Memory(t *testing.T) {
 	node, controller := setupIntegrationNode()
+
+	bconf := broker.NewConfig()
+
+	subscriber := pubsub.NewLegacySubscriber(node)
+
+	br := broker.NewMemoryBroker(subscriber, &bconf)
+	br.SetEpoch("2022")
+	node.SetBroker(br)
+
+	require.NoError(t, br.Start(nil))
+
 	go node.Start() // nolint:errcheck
 	defer node.Shutdown(context.Background()) // nolint:errcheck
 
@@ -252,8 +275,7 @@ func TestIntegrationHistory_NATS(t *testing.T) {
 	port := 33
 	addr := fmt.Sprintf("nats://127.0.0.1:45%d", port)
-	server := buildNATSServer(t, addr)
-	err := server.Start()
+	server, err := startNATSServer(t, addr)
 	require.NoError(t, err)
 
 	defer server.Shutdown(context.Background()) // nolint:errcheck
 
@@ -268,7 +290,8 @@ func TestIntegrationHistory_NATS(t *testing.T) {
 	broker := broker.NewNATSBroker(broadcaster, &bconf, &nconfig)
 	node.SetBroker(broker)
 
-	require.NoError(t, node.Start()) // nolint:errcheck
+	require.NoError(t, node.Start())
+	require.NoError(t, broker.Start(nil))
 	defer node.Shutdown(context.Background()) // nolint:errcheck
 
 	require.NoError(t, broker.Reset())
@@ -378,15 +401,6 @@ func setupIntegrationNode() (*Node, *mocks.Controller) {
 	node := NewNode(controller, metrics.NewMetrics(nil, 10), &config)
 	node.SetDisconnector(NewNoopDisconnector())
 
-	bconf := broker.NewConfig()
-	bconf.SessionsTTL = 2
-
-	subscriber := pubsub.NewLegacySubscriber(node)
-
-	br := broker.NewMemoryBroker(subscriber, &bconf)
-	br.SetEpoch("2022")
-	node.SetBroker(br)
-
 	return node, controller
 }
 
@@ -437,12 +451,22 @@ func requireAuthenticatedSession(t *testing.T, node *Node, sid string) *Session
 	return session
 }
 
-func buildNATSServer(t *testing.T, addr string) *enats.Service {
+func startNATSServer(t *testing.T, addr string) (*enats.Service, error) {
 	conf := enats.NewConfig()
 	conf.JetStream = true
 	conf.ServiceAddr = addr
 	conf.StoreDir = t.TempDir()
 
 	service := enats.NewService(&conf)
-	return service
+	err := service.Start()
+	if err != nil {
+		return nil, err
+	}
+
+	err = service.WaitJetStreamReady(5)
+	if err != nil {
+		return nil, err
+	}
+
+	return service, nil
 }
diff --git a/node/node.go b/node/node.go
index 95074c2b..29cdfd46 100644
--- a/node/node.go
+++ b/node/node.go
@@ -101,10 +101,6 @@ func (n *Node) Start() error {
 	go n.hub.Run()
 	go n.collectStats()
 
-	if err := n.broker.Start(); err != nil {
-		return err
-	}
-
 	return nil
 }
 
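Usage note (illustrative, not part of the patch): with this change, Start returns immediately, JetStream is initialized in a background goroutine, and a fatal initialization error is delivered to the channel passed to Start, while Ready (with an optional timeout) lets a caller block until the cluster is usable. The sketch below shows that contract under stated assumptions: the asyncBroker interface and runBroker helper are hypothetical names introduced here, with *broker.NATS being the concrete type that satisfies them in this patch.

package example

import (
	"fmt"
	"log"
	"time"
)

// asyncBroker captures the subset of the Broker API touched by this patch;
// in anycable-go the concrete implementation is *broker.NATS.
type asyncBroker interface {
	Start(done chan (error)) error
	Ready(timeout ...time.Duration) error
}

func runBroker(b asyncBroker) error {
	// The runner owns a channel for fatal background errors
	// (cli.Runner passes r.errChan in this patch).
	errChan := make(chan error, 1)

	// Start returns right away; a failed JetStream initialization
	// is reported asynchronously via errChan.
	if err := b.Start(errChan); err != nil {
		return fmt.Errorf("broker failed to start: %w", err)
	}

	// Broker operations already call Ready(jetstreamReadyTimeout) internally
	// (waiting briefly or buffering broadcasts); callers that need a hard
	// readiness guarantee can block explicitly with a longer timeout.
	if err := b.Ready(5 * time.Second); err != nil {
		log.Printf("JetStream is not ready yet: %v", err)
	}

	return nil
}

This mirrors the wiring added to cli.Runner above, where appBroker.Start(r.errChan) is invoked right after the subscriber starts, so the WebSocket server becomes ready without waiting for JetStream.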