From f5d181d6f5682e9e241c50f6431c268bcb2678b9 Mon Sep 17 00:00:00 2001 From: Caleb Champlin Date: Sun, 7 Apr 2024 09:56:38 -0600 Subject: [PATCH] [ADDED] Publish header "Nats-Expected-Last-Subject-Sequence-Subject" This change adds a new header "Nats-Expected-Last-Subject-Sequence-Subject" when when paired with "Nats-Expected-Last-Subject-Sequence" allows publishers to customize the subject used when the server enforces "Nats-Expected-Last-Subject-Sequence". Publishers can specify a alternative subject to be used that includes wildcards. Resolves #5280 Signed-off-by: Caleb Champlin --- server/jetstream_cluster.go | 8 +++- server/jetstream_test.go | 94 +++++++++++++++++++++++++++++++++++++ server/stream.go | 38 ++++++++++----- 3 files changed, 126 insertions(+), 14 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 3bb10d14972..6837e06ef81 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -7657,9 +7657,15 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [ // Expected last sequence per subject. // We can check for last sequence per subject but only if the expected seq <= lseq. if seq, exists := getExpectedLastSeqPerSubject(hdr); exists && store != nil && seq > 0 && seq <= lseq { + // Allow override of the subject used for the check. + seqSubj := subject + if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ { + seqSubj = optSubj + } + var smv StoreMsg var fseq uint64 - sm, err := store.LoadLastMsg(subject, &smv) + sm, err := store.LoadLastMsg(seqSubj, &smv) if sm != nil { fseq = sm.seq } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index b11fa18c888..be84142d45b 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -11269,6 +11269,100 @@ func TestJetStreamLastSequenceBySubject(t *testing.T) { } } +func TestJetStreamLastSequenceBySubjectWithSubject(t *testing.T) { + for _, st := range []StorageType{FileStorage, MemoryStorage} { + t.Run(st.String(), func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "JSC", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + cfg := StreamConfig{ + Name: "KV", + Subjects: []string{"kv.>"}, + Storage: st, + Replicas: 3, + MaxMsgsPer: 1, + } + + req, err := json.Marshal(cfg) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // Do manually for now. + m, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + require_NoError(t, err) + si, err := js.StreamInfo("KV") + if err != nil { + t.Fatalf("Unexpected error: %v, respmsg: %q", err, string(m.Data)) + } + if si == nil || si.Config.Name != "KV" { + t.Fatalf("StreamInfo is not correct %+v", si) + } + + js.PublishAsync("kv.1.foo", []byte("1:1")) // Last is 1 for kv.1.foo; 1 for kv.1.*; + js.PublishAsync("kv.1.bar", []byte("1:2")) // Last is 2 for kv.1.bar; 2 for kv.1.*; + js.PublishAsync("kv.2.foo", []byte("2:1")) // Last is 3 for kv.2.foo; 3 for kv.2.*; + js.PublishAsync("kv.3.bar", []byte("3:1")) // Last is 4 for kv.3.bar; 4 for kv.3.*; + js.PublishAsync("kv.1.baz", []byte("1:3")) // Last is 5 for kv.1.baz; 5 for kv.1.*; + js.PublishAsync("kv.1.bar", []byte("1:4")) // Last is 6 for kv.1.baz; 6 for kv.1.*; + js.PublishAsync("kv.2.baz", []byte("2:2")) // Last is 7 for kv.2.baz; 7 for kv.2.*; + + select { + case <-js.PublishAsyncComplete(): + case <-time.After(time.Second): + t.Fatalf("Did not receive completion signal") + } + + // Now make sure we get an error if the last sequence is not correct per subject. + pubAndCheck := func(subj, seq string, ok bool) { + t.Helper() + m := nats.NewMsg(subj) + m.Data = []byte("HELLO") + + // Expect last to be seq. + m.Header.Set(JSExpectedLastSubjSeq, seq) + + // Constrain the sequence restriction to a specific subject + // e.g. "kv.1.*" for kv.1.foo, kv.1.bar, kv.1.baz; kv.2.* for kv.2.foo, kv.2.baz; kv.3.* for kv.3.bar + filterSubject := fmt.Sprintf("%s.*", subj[:strings.LastIndex(subj, ".")]) + m.Header.Set(JSExpectedLastSubjSeqSubj, filterSubject) + _, err := js.PublishMsg(m) + if ok && err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !ok && err == nil { + t.Fatalf("Expected to get an error and got none") + } + } + + pubAndCheck("kv.1.foo", "0", false) + pubAndCheck("kv.1.bar", "0", false) + pubAndCheck("kv.1.xxx", "0", false) + pubAndCheck("kv.1.foo", "1", false) + pubAndCheck("kv.1.bar", "1", false) + pubAndCheck("kv.1.xxx", "1", false) + pubAndCheck("kv.2.foo", "1", false) + pubAndCheck("kv.2.bar", "1", false) + pubAndCheck("kv.2.xxx", "1", false) + pubAndCheck("kv.1.bar", "2", false) + pubAndCheck("kv.1.bar", "3", false) + pubAndCheck("kv.1.bar", "4", false) + pubAndCheck("kv.1.bar", "5", false) + pubAndCheck("kv.1.bar", "6", true) // Last is 8 for kv.1.bar; 8 for kv.1.*; + pubAndCheck("kv.1.baz", "2", false) + pubAndCheck("kv.1.bar", "7", false) + pubAndCheck("kv.1.xxx", "8", true) // Last is 9 for kv.1.xxx; 9 for kv.1.*; + pubAndCheck("kv.2.foo", "2", false) + pubAndCheck("kv.2.foo", "7", true) // Last is 10 for kv.2.foo; 10 for kv.2.*; + pubAndCheck("kv.xxx", "0", true) // Last is 11 for kv.xxx; 11 for kv.*; + pubAndCheck("kv.3.xxx", "4", true) // Last is 12 for kv.3.xxx; 12 for kv.3.*; + pubAndCheck("kv.3.xyz", "12", true) // Last is 13 for kv.3.xyz; 13 for kv.3.*; + }) + } +} + func TestJetStreamFilteredConsumersWithWiderFilter(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() diff --git a/server/stream.go b/server/stream.go index 73694d2ed59..661ba55d6bd 100644 --- a/server/stream.go +++ b/server/stream.go @@ -334,18 +334,19 @@ const ( // Headers for published messages. const ( - JSMsgId = "Nats-Msg-Id" - JSExpectedStream = "Nats-Expected-Stream" - JSExpectedLastSeq = "Nats-Expected-Last-Sequence" - JSExpectedLastSubjSeq = "Nats-Expected-Last-Subject-Sequence" - JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id" - JSStreamSource = "Nats-Stream-Source" - JSLastConsumerSeq = "Nats-Last-Consumer" - JSLastStreamSeq = "Nats-Last-Stream" - JSConsumerStalled = "Nats-Consumer-Stalled" - JSMsgRollup = "Nats-Rollup" - JSMsgSize = "Nats-Msg-Size" - JSResponseType = "Nats-Response-Type" + JSMsgId = "Nats-Msg-Id" + JSExpectedStream = "Nats-Expected-Stream" + JSExpectedLastSeq = "Nats-Expected-Last-Sequence" + JSExpectedLastSubjSeq = "Nats-Expected-Last-Subject-Sequence" + JSExpectedLastSubjSeqSubj = "Nats-Expected-Last-Subject-Sequence-Subject" + JSExpectedLastMsgId = "Nats-Expected-Last-Msg-Id" + JSStreamSource = "Nats-Stream-Source" + JSLastConsumerSeq = "Nats-Last-Consumer" + JSLastStreamSeq = "Nats-Last-Stream" + JSConsumerStalled = "Nats-Consumer-Stalled" + JSMsgRollup = "Nats-Rollup" + JSMsgSize = "Nats-Msg-Size" + JSResponseType = "Nats-Response-Type" ) // Headers for republished messages and direct gets. @@ -3978,6 +3979,11 @@ func getExpectedLastSeqPerSubject(hdr []byte) (uint64, bool) { return uint64(parseInt64(bseq)), true } +// Fast lookup of expected subject for the expected stream sequence per subject. +func getExpectedLastSeqPerSubjectForSubject(hdr []byte) string { + return string(getHeader(JSExpectedLastSubjSeqSubj, hdr)) +} + // Signal if we are clustered. Will acquire rlock. func (mset *stream) IsClustered() bool { mset.mu.RLock() @@ -4538,10 +4544,16 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, // Expected last sequence per subject. // If we are clustered we have prechecked seq > 0. if seq, exists := getExpectedLastSeqPerSubject(hdr); exists { + // Allow override of the subject used for the check. + seqSubj := subject + if optSubj := getExpectedLastSeqPerSubjectForSubject(hdr); optSubj != _EMPTY_ { + seqSubj = optSubj + } + // TODO(dlc) - We could make a new store func that does this all in one. var smv StoreMsg var fseq uint64 - sm, err := store.LoadLastMsg(subject, &smv) + sm, err := store.LoadLastMsg(seqSubj, &smv) if sm != nil { fseq = sm.seq }