Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise sublist matching for gateways, client replies #5736

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4072,7 +4072,7 @@ func (c *client) subForReply(reply []byte) *subscription {
func (c *client) handleGWReplyMap(msg []byte) bool {
// Check for leaf nodes
if c.srv.gwLeafSubs.Count() > 0 {
if r := c.srv.gwLeafSubs.Match(string(c.pa.subject)); len(r.psubs) > 0 {
if r := c.srv.gwLeafSubs.MatchBytes(c.pa.subject); len(r.psubs) > 0 {
c.processMsgResults(c.acc, r, msg, c.pa.deliver, c.pa.subject, c.pa.reply, pmrNoFlag)
}
}
Expand Down Expand Up @@ -5675,7 +5675,7 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) {
sl := acc.sl

// Match against the account sublist.
r = sl.Match(string(c.pa.subject))
r = sl.MatchBytes(c.pa.subject)

// Check if we need to prune.
if len(c.in.pacache) >= maxPerAccountCacheSize {
Expand Down
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1572,7 +1572,7 @@ func (s *Server) hasGatewayInterest(account, subject string) bool {
gw.RLock()
defer gw.RUnlock()
for _, gwc := range gw.outo {
psi, qr := gwc.gatewayInterest(account, subject)
psi, qr := gwc.gatewayInterest(account, stringToBytes(subject))
if psi || qr != nil {
return true
}
Expand Down
8 changes: 4 additions & 4 deletions server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -2143,7 +2143,7 @@ func (c *client) processGatewayRSub(arg []byte) error {
// for queue subscriptions.
// <Outbound connection: invoked when client message is published,
// so from any client connection's readLoop>
func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) {
func (c *client) gatewayInterest(acc string, subj []byte) (bool, *SublistResult) {
ei, accountInMap := c.gw.outsim.Load(acc)
// If there is an entry for this account and ei is nil,
// it means that the remote is not interested at all in
Expand All @@ -2164,14 +2164,14 @@ func (c *client) gatewayInterest(acc, subj string) (bool, *SublistResult) {
// but until e.ni is nil, use it to know if we
// should suppress interest or not.
if !c.gw.interestOnlyMode && e.ni != nil {
if _, inMap := e.ni[subj]; !inMap {
if _, inMap := e.ni[string(subj)]; !inMap {
psi = true
}
}
// If we are in modeInterestOnly (e.ni will be nil)
// or if we have queue subs, we also need to check sl.Match.
if e.ni == nil || e.qsubs > 0 {
r = e.sl.Match(subj)
r = e.sl.MatchBytes(subj)
if len(r.psubs) > 0 {
psi = true
}
Expand Down Expand Up @@ -2588,7 +2588,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
}
} else {
// Plain sub interest and queue sub results for this account/subject
psi, qr := gwc.gatewayInterest(accName, string(subject))
psi, qr := gwc.gatewayInterest(accName, subject)
if !psi && qr == nil {
continue
}
Expand Down
8 changes: 8 additions & 0 deletions server/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,3 +768,11 @@ func stringToBytes(s string) []byte {
b := unsafe.Slice(p, len(s))
return b
}

// Forces a copy of a string, for use in the case that you might have been passed a value when bytesToString was used,
// but now you need a separate copy of it to store for longer-term use.
func copyString(s string) string {
b := make([]byte, len(s))
copy(b, s)
return bytesToString(b)
}
15 changes: 12 additions & 3 deletions server/sublist.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,13 @@ var emptyResult = &SublistResult{}
// Match will match all entries to the literal subject.
// It will return a set of results for both normal and queue subscribers.
func (s *Sublist) Match(subject string) *SublistResult {
return s.match(subject, true)
return s.match(subject, true, false)
}

// MatchBytes will match all entries to the literal subject.
// It will return a set of results for both normal and queue subscribers.
func (s *Sublist) MatchBytes(subject []byte) *SublistResult {
return s.match(bytesToString(subject), true, true)
}

// HasInterest will return whether or not there is any interest in the subject.
Expand All @@ -537,10 +543,10 @@ func (s *Sublist) HasInterest(subject string) bool {
}

func (s *Sublist) matchNoLock(subject string) *SublistResult {
return s.match(subject, false)
return s.match(subject, false, false)
}

func (s *Sublist) match(subject string, doLock bool) *SublistResult {
func (s *Sublist) match(subject string, doLock bool, doCopyOnCache bool) *SublistResult {
atomic.AddUint64(&s.matches, 1)

// Check cache first.
Expand Down Expand Up @@ -595,6 +601,9 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
result = emptyResult
}
if cacheEnabled {
if doCopyOnCache {
subject = copyString(subject)
}
s.cache[subject] = result
n = len(s.cache)
}
Expand Down