fix: initialize jetstream cluster asynchronously
The problem is that JetStream requires network connectivity to finalize cluster setup, and since we rely on health checks to activate instances, we must make the WS server ready independently of the embedded NATS status.

This commit moves WaitJetStreamReady to a goroutine and refactors the NATS broker to handle a possibly not-yet-ready JetStream cluster by deferring some operations.
palkan committed Nov 2, 2023
1 parent 967a28a commit db10f81
Showing 10 changed files with 263 additions and 82 deletions.
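The commit message above describes the new startup contract: Start(done) returns right away, synchronous setup failures come back as its return value, and errors from the deferred JetStream initialization arrive later on the done channel, so the WS server can pass health checks before the embedded NATS cluster has formed. Below is a minimal caller-side sketch of that contract; the runBroker helper and its logging are illustrative assumptions, not code from this commit.

// Sketch only: driving the new asynchronous Start contract from a caller.
package main

import "log"

// broker mirrors the Start signature change introduced in broker/broker.go.
type broker interface {
	Start(done chan error) error
}

// runBroker starts the broker without blocking on JetStream readiness,
// so the WS server can be reported healthy immediately.
func runBroker(b broker) error {
	// Buffered so the initialization goroutine never blocks on send.
	done := make(chan error, 1)

	// Only synchronous errors (e.g., misconfiguration) are returned here.
	if err := b.Start(done); err != nil {
		return err
	}

	// Asynchronous initialization errors (e.g., the JetStream cluster never
	// becoming ready) are handled whenever they arrive.
	go func() {
		if err := <-done; err != nil {
			log.Printf("broker initialization failed: %v", err)
		}
	}()

	return nil
}

func main() {}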
6 changes: 3 additions & 3 deletions broker/broker.go
@@ -33,7 +33,7 @@ type Cacheable interface {
//
//go:generate mockery --name Broker --output "../mocks" --outpkg mocks
type Broker interface {
Start() error
Start(done chan (error)) error
Shutdown(ctx context.Context) error

Announce() string
@@ -60,7 +60,7 @@ type Broker interface {

// LocalBroker is a single-node broker that can be used to store streams data locally
type LocalBroker interface {
Start() error
Start(done chan (error)) error
Shutdown(ctx context.Context) error
SetEpoch(epoch string)
GetEpoch() string
@@ -131,7 +131,7 @@ func NewLegacyBroker(broadcaster Broadcaster) *LegacyBroker {
}
}

func (LegacyBroker) Start() error {
func (LegacyBroker) Start(done chan (error)) error {
return nil
}

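On the implementation side, the contract sketched here appears to be: brokers that finish setup synchronously (LegacyBroker above, Memory below) may simply ignore done, while asynchronous implementations should report deferred failures to done and guard against a nil channel — the NATS broker itself passes nil when starting its embedded local broker. A hypothetical sketch under those assumptions (asyncBroker and initialize are illustrative names, not part of the codebase):

// Hypothetical async implementation of the revised Start contract; not part of this commit.
package broker

import "errors"

type asyncBroker struct{}

// Start returns immediately; deferred initialization reports its outcome
// through done, guarding against a nil channel (callers may pass nil when
// they do not care about asynchronous errors).
func (b *asyncBroker) Start(done chan (error)) error {
	go func() {
		if err := b.initialize(); err != nil && done != nil {
			done <- err
		}
	}()

	return nil
}

func (b *asyncBroker) initialize() error {
	// Placeholder for work that may take a while (e.g., waiting for a cluster to form).
	return errors.New("not implemented")
}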
2 changes: 1 addition & 1 deletion broker/memory.go
@@ -223,7 +223,7 @@ func (b *Memory) SetEpoch(v string) {
b.epoch = v
}

func (b *Memory) Start() error {
func (b *Memory) Start(done chan (error)) error {
go b.expireLoop()

return nil
147 changes: 132 additions & 15 deletions broker/nats.go
@@ -44,6 +44,10 @@ type NATS struct {
shutdownCtx context.Context
shutdownFn func()

readyCtx context.Context
broadcastBacklog []*common.StreamMessage
backlogMu sync.Mutex

log *log.Entry
}

@@ -53,6 +57,8 @@ const (
epochKey = "_epoch_"
sessionsPrefix = ""
streamPrefix = "_ac_"

jetstreamReadyTimeout = 1 * time.Second
)

var _ Broker = (*NATS)(nil)
@@ -69,16 +75,17 @@ func NewNATSBroker(broadcaster Broadcaster, c *Config, nc *natsconfig.NATSConfig
shutdownCtx, shutdownFn := context.WithCancel(context.Background())

n := NATS{
broadcaster: broadcaster,
conf: c,
nconf: nc,
shutdownCtx: shutdownCtx,
shutdownFn: shutdownFn,
tracker: NewStreamsTracker(),
streamSync: newStreamsSynchronizer(),
jstreams: newLRU[string](time.Duration(c.HistoryTTL * int64(time.Second))),
jconsumers: newLRU[jetstream.Consumer](time.Duration(c.HistoryTTL * int64(time.Second))),
log: log.WithField("context", "broker").WithField("provider", "nats"),
broadcaster: broadcaster,
conf: c,
nconf: nc,
shutdownCtx: shutdownCtx,
shutdownFn: shutdownFn,
tracker: NewStreamsTracker(),
broadcastBacklog: []*common.StreamMessage{},
streamSync: newStreamsSynchronizer(),
jstreams: newLRU[string](time.Duration(c.HistoryTTL * int64(time.Second))),
jconsumers: newLRU[jetstream.Consumer](time.Duration(c.HistoryTTL * int64(time.Second))),
log: log.WithField("context", "broker").WithField("provider", "nats"),
}

for _, opt := range opts {
@@ -93,7 +100,7 @@ func NewNATSBroker(broadcaster Broadcaster, c *Config, nc *natsconfig.NATSConfig
}

// Write Broker implementation here
func (n *NATS) Start() error {
func (n *NATS) Start(done chan (error)) error {
n.clientMu.Lock()
defer n.clientMu.Unlock()

@@ -120,13 +127,67 @@ func (n *NATS) Start() error {
return err
}

n.conn = nc

readyCtx, readyFn := context.WithCancelCause(context.Background())

n.readyCtx = readyCtx

// Initialize JetStream asynchronously, because we may need to wait for JetStream cluster to be ready
go func() {
err := n.initJetStream()
readyFn(err)
if err != nil && done != nil {
done <- err
}

if err == nil {
n.backlogFlush()
}
}()

return nil
}

func (n *NATS) Ready(timeout ...time.Duration) error {
var err error

if len(timeout) == 0 {
<-n.readyCtx.Done()
} else {
timer := time.After(timeout[0])

select {
case <-n.readyCtx.Done():
case <-timer:
err = fmt.Errorf("timeout waiting for JetStream to be ready")
}
}

if err != nil {
return err
}

cause := context.Cause(n.readyCtx)

if cause == context.Canceled {
return nil
} else {
return cause
}
}

func (n *NATS) initJetStream() error {
n.clientMu.Lock()
defer n.clientMu.Unlock()

nc := n.conn
js, err := jetstream.New(nc)

if err != nil {
return errorx.Decorate(err, "Failed to connect to JetStream")
}

n.conn = nc
n.js = js

kv, err := n.fetchBucketWithTTL(kvBucket, time.Duration(n.conf.SessionsTTL*int64(time.Second)))
@@ -144,7 +205,7 @@ func (n *NATS) Start() error {
}

n.writeEpoch(epoch)
err = n.local.Start()
err = n.local.Start(nil)

if err != nil {
return errorx.Decorate(err, "Failed to start internal memory broker")
@@ -156,8 +217,7 @@ func (n *NATS) Start() error {
n.log.Warnf("failed to set up epoch watcher: %s", err)
}

n.log.Debugf("Current epoch: %s", epoch)

n.log.Infof("NATS broker is ready (epoch=%s)", epoch)
return nil
}

@@ -223,6 +283,13 @@ func (n *NATS) writeEpoch(val string) {
}

func (n *NATS) HandleBroadcast(msg *common.StreamMessage) {
err := n.Ready(jetstreamReadyTimeout)
if err != nil {
n.log.Debugf("JetStream is not ready yet to publish messages, add to backlog")
n.backlogAdd(msg)
return
}

offset, err := n.add(msg.Stream, msg.Data)

if err != nil {
@@ -264,16 +331,31 @@ func (n *NATS) Unsubscribe(stream string) string {
}

func (n *NATS) HistoryFrom(stream string, epoch string, offset uint64) ([]common.StreamMessage, error) {
err := n.Ready(jetstreamReadyTimeout)
if err != nil {
return nil, err
}

n.streamSync.sync(stream)
return n.local.HistoryFrom(stream, epoch, offset)
}

func (n *NATS) HistorySince(stream string, since int64) ([]common.StreamMessage, error) {
err := n.Ready(jetstreamReadyTimeout)
if err != nil {
return nil, err
}

n.streamSync.sync(stream)
return n.local.HistorySince(stream, since)
}

func (n *NATS) CommitSession(sid string, session Cacheable) error {
err := n.Ready(jetstreamReadyTimeout)
if err != nil {
return err
}

ctx := context.Background()
key := sessionsPrefix + sid
data, err := session.ToCacheEntry()
@@ -292,6 +374,11 @@ func (n *NATS) CommitSession(sid string, session Cacheable) error {
}

func (n *NATS) RestoreSession(sid string) ([]byte, error) {
err := n.Ready(jetstreamReadyTimeout)
if err != nil {
return nil, err
}

key := sessionsPrefix + sid
ctx := context.Background()

@@ -309,6 +396,11 @@ func (n *NATS) RestoreSession(sid string) ([]byte, error) {
}

func (n *NATS) FinishSession(sid string) error {
err := n.Ready(jetstreamReadyTimeout)
if err != nil {
return err
}

ctx := context.Background()
key := sessionsPrefix + sid

@@ -328,6 +420,11 @@ func (n *NATS) FinishSession(sid string) error {
}

func (n *NATS) Reset() error {
err := n.Ready(jetstreamReadyTimeout)
if err != nil {
return err
}

n.clientMu.Lock()
defer n.clientMu.Unlock()

@@ -363,6 +460,8 @@ func (n *NATS) add(stream string, data string) (uint64, error) {
ctx := context.Background()
key := streamPrefix + stream

// Touch on publish to make sure that the subsequent history fetch will return the latest messages
n.streamSync.touch(stream)
ack, err := n.js.Publish(ctx, key, []byte(data))

if err != nil {
@@ -821,3 +920,21 @@ func (n *NATS) shouldRetryOnError(err error, attempts *int, cooldown time.Durati

return false
}

func (n *NATS) backlogAdd(msg *common.StreamMessage) {
n.backlogMu.Lock()
defer n.backlogMu.Unlock()

n.broadcastBacklog = append(n.broadcastBacklog, msg)
}

func (n *NATS) backlogFlush() {
n.backlogMu.Lock()
defer n.backlogMu.Unlock()

for _, msg := range n.broadcastBacklog {
n.HandleBroadcast(msg)
}

n.broadcastBacklog = []*common.StreamMessage{}
}
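The readiness gate in Start/Ready above relies on context.WithCancelCause: cancelling with a nil cause records context.Canceled (interpreted as "became ready"), while cancelling with a real error preserves it for context.Cause. A stripped-down, self-contained sketch of the same pattern follows; the gate type and its method names are illustrative and not taken from the codebase.

// Standalone illustration of the readiness-signaling pattern used by the NATS broker; not part of this commit.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type gate struct {
	readyCtx context.Context
	readyFn  context.CancelCauseFunc
}

func newGate() *gate {
	ctx, cancel := context.WithCancelCause(context.Background())
	return &gate{readyCtx: ctx, readyFn: cancel}
}

// markReady finishes initialization; a nil err means success.
func (g *gate) markReady(err error) { g.readyFn(err) }

// Ready blocks until initialization has finished or the timeout expires.
func (g *gate) Ready(timeout time.Duration) error {
	select {
	case <-g.readyCtx.Done():
	case <-time.After(timeout):
		return errors.New("timeout waiting for readiness")
	}

	// Cancelling with a nil cause stores context.Canceled, which we treat as
	// success; any other cause is the real initialization error.
	if cause := context.Cause(g.readyCtx); !errors.Is(cause, context.Canceled) {
		return cause
	}

	return nil
}

func main() {
	g := newGate()

	go func() {
		time.Sleep(100 * time.Millisecond) // simulate slow cluster formation
		g.markReady(nil)
	}()

	fmt.Println(g.Ready(time.Second)) // <nil>
}

In the broker above, HandleBroadcast uses the timeout variant of this check and falls back to the broadcast backlog, which is flushed once initialization succeeds.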