From bb45ca69e52a9877db591690343e0a41ddee163e Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 1 Aug 2024 17:24:13 +0100 Subject: [PATCH] Optimise sublist matching for gateways, client replies Signed-off-by: Neil Twigg --- server/client.go | 4 ++-- server/consumer.go | 2 +- server/gateway.go | 8 ++++---- server/store.go | 8 ++++++++ server/sublist.go | 15 ++++++++++++--- 5 files changed, 27 insertions(+), 10 deletions(-) diff --git a/server/client.go b/server/client.go index dea218c371..e5b08c3f42 100644 --- a/server/client.go +++ b/server/client.go @@ -4072,7 +4072,7 @@ func (c *client) subForReply(reply []byte) *subscription { func (c *client) handleGWReplyMap(msg []byte) bool { // Check for leaf nodes if c.srv.gwLeafSubs.Count() > 0 { - if r := c.srv.gwLeafSubs.Match(string(c.pa.subject)); len(r.psubs) > 0 { + if r := c.srv.gwLeafSubs.MatchBytes(c.pa.subject); len(r.psubs) > 0 { c.processMsgResults(c.acc, r, msg, c.pa.deliver, c.pa.subject, c.pa.reply, pmrNoFlag) } } @@ -5675,7 +5675,7 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) { sl := acc.sl // Match against the account sublist. - r = sl.Match(string(c.pa.subject)) + r = sl.MatchBytes(c.pa.subject) // Check if we need to prune. if len(c.in.pacache) >= maxPerAccountCacheSize { diff --git a/server/consumer.go b/server/consumer.go index bc09a79036..2393907c47 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1572,7 +1572,7 @@ func (s *Server) hasGatewayInterest(account, subject string) bool { gw.RLock() defer gw.RUnlock() for _, gwc := range gw.outo { - psi, qr := gwc.gatewayInterest(account, subject) + psi, qr := gwc.gatewayInterest(account, stringToBytes(subject)) if psi || qr != nil { return true } diff --git a/server/gateway.go b/server/gateway.go index be175467b3..e01fbb70ed 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -2143,7 +2143,7 @@ func (c *client) processGatewayRSub(arg []byte) error { // for queue subscriptions. // -func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) { +func (c *client) gatewayInterest(acc string, subj []byte) (bool, *SublistResult) { ei, accountInMap := c.gw.outsim.Load(acc) // If there is an entry for this account and ei is nil, // it means that the remote is not interested at all in @@ -2164,14 +2164,14 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) { // but until e.ni is nil, use it to know if we // should suppress interest or not. if !c.gw.interestOnlyMode && e.ni != nil { - if _, inMap := e.ni[subj]; !inMap { + if _, inMap := e.ni[string(subj)]; !inMap { psi = true } } // If we are in modeInterestOnly (e.ni will be nil) // or if we have queue subs, we also need to check sl.Match. if e.ni == nil || e.qsubs > 0 { - r = e.sl.Match(subj) + r = e.sl.MatchBytes(subj) if len(r.psubs) > 0 { psi = true } @@ -2588,7 +2588,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr } } else { // Plain sub interest and queue sub results for this account/subject - psi, qr := gwc.gatewayInterest(accName, string(subject)) + psi, qr := gwc.gatewayInterest(accName, subject) if !psi && qr == nil { continue } diff --git a/server/store.go b/server/store.go index 6948d022e8..867dc2c358 100644 --- a/server/store.go +++ b/server/store.go @@ -768,3 +768,11 @@ func stringToBytes(s string) []byte { b := unsafe.Slice(p, len(s)) return b } + +// Forces a copy of a string, for use in the case that you might have been passed a value when bytesToString was used, +// but now you need a separate copy of it to store for longer-term use. +func copyString(s string) string { + b := make([]byte, len(s)) + copy(b, s) + return bytesToString(b) +} diff --git a/server/sublist.go b/server/sublist.go index c0e7a1810d..1ec878a19e 100644 --- a/server/sublist.go +++ b/server/sublist.go @@ -527,7 +527,13 @@ var emptyResult = &SublistResult{} // Match will match all entries to the literal subject. // It will return a set of results for both normal and queue subscribers. func (s *Sublist) Match(subject string) *SublistResult { - return s.match(subject, true) + return s.match(subject, true, false) +} + +// MatchBytes will match all entries to the literal subject. +// It will return a set of results for both normal and queue subscribers. +func (s *Sublist) MatchBytes(subject []byte) *SublistResult { + return s.match(bytesToString(subject), true, true) } // HasInterest will return whether or not there is any interest in the subject. @@ -537,10 +543,10 @@ func (s *Sublist) HasInterest(subject string) bool { } func (s *Sublist) matchNoLock(subject string) *SublistResult { - return s.match(subject, false) + return s.match(subject, false, false) } -func (s *Sublist) match(subject string, doLock bool) *SublistResult { +func (s *Sublist) match(subject string, doLock bool, doCopyOnCache bool) *SublistResult { atomic.AddUint64(&s.matches, 1) // Check cache first. @@ -595,6 +601,9 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult { result = emptyResult } if cacheEnabled { + if doCopyOnCache { + subject = copyString(subject) + } s.cache[subject] = result n = len(s.cache) }