diff --git a/src/js.c b/src/js.c index bc5245b99..399b6ddd2 100644 --- a/src/js.c +++ b/src/js.c @@ -1246,7 +1246,7 @@ jsSub_free(jsSub *jsi) NATS_FREE(jsi->nxtMsgSubj); NATS_FREE(jsi->cmeta); NATS_FREE(jsi->fcReply); - NATS_FREE(jsi->fsubj); + js_destroyConsumerConfig(jsi->ocCfg); NATS_FREE(jsi); js_release(js); @@ -1538,6 +1538,7 @@ jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm) jsSub *jsi = sub->jsi; const char *str = NULL; int64_t val = 0; + struct mismatch *m = &jsi->mismatch; natsStatus s; *sm = false; @@ -1548,7 +1549,7 @@ jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm) if (jsi->cmeta == NULL) return NATS_OK; - s = js_getMetaData(jsi->cmeta, NULL, NULL, NULL, NULL, &jsi->sseq, &jsi->dseq, NULL, NULL, 2); + s = js_getMetaData(jsi->cmeta, NULL, NULL, NULL, NULL, &m->sseq, &m->dseq, NULL, NULL, 2); if (s != NATS_OK) { if (s == NATS_ERR) @@ -1568,9 +1569,9 @@ jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm) if (val == -1) return nats_setError(NATS_ERR, "invalid last consumer sequence: '%s'", str); - jsi->ldseq = (uint64_t) val; + m->ldseq = (uint64_t) val; } - if (jsi->ldseq == jsi->dseq) + if (m->ldseq == m->dseq) { // Sync subs use this flag to get the NextMsg() to error out and // return NATS_MISMATCH to indicate that a mismatch was discovered, @@ -1636,6 +1637,7 @@ natsStatus natsSubscription_GetSequenceMismatch(jsConsumerSequenceMismatch *csm, natsSubscription *sub) { jsSub *jsi; + struct mismatch *m = NULL; if ((csm == NULL) || (sub == NULL)) return nats_setDefaultError(NATS_INVALID_ARG); @@ -1647,15 +1649,16 @@ natsSubscription_GetSequenceMismatch(jsConsumerSequenceMismatch *csm, natsSubscr return nats_setError(NATS_INVALID_SUBSCRIPTION, "%s", jsErrNotAJetStreamSubscription); } jsi = sub->jsi; - if (jsi->dseq == jsi->ldseq) + m = &jsi->mismatch; + if (m->dseq == m->ldseq) { natsSubAndLdw_Unlock(sub); return NATS_NOT_FOUND; } memset(csm, 0, sizeof(jsConsumerSequenceMismatch)); - csm->Stream = jsi->sseq; - csm->ConsumerClient = jsi->dseq; - csm->ConsumerServer = jsi->ldseq; + csm->Stream = m->sseq; + csm->ConsumerClient = m->dseq; + csm->ConsumerServer = m->ldseq; natsSubAndLdw_Unlock(sub); return NATS_OK; } @@ -2324,6 +2327,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha jsSubOptions o; jsConsumerConfig cfgStack; jsConsumerConfig *cfg = NULL; + jsConsumerConfig *ocCfg = NULL; if ((new_sub == NULL) || (js == NULL)) return nats_setDefaultError(NATS_INVALID_ARG); @@ -2485,6 +2489,8 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha cfg->Heartbeat = jsOrderedHBInterval; cfg->MemoryStorage = true; cfg->Replicas = 1; + + s = js_cloneConsumerConfig(cfg, &ocCfg); } else { @@ -2514,14 +2520,13 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha s = nats_setDefaultError(NATS_NO_MEMORY); } IF_OK_DUP_STRING(s, jsi->stream, stream); - if ((s == NATS_OK) && (opts->Ordered)) - DUP_STRING(s, jsi->fsubj, cfg->FilterSubject); if (s == NATS_OK) { jsi->js = js; jsi->hbi = hbi; jsi->pull = isPullMode; jsi->ordered= opts->Ordered; + jsi->ocCfg = ocCfg; jsi->dseq = 1; jsi->ackNone= (opts->Config.AckPolicy == js_AckNone || opts->Ordered); js_retain(js); @@ -2983,7 +2988,7 @@ _recreateOrderedCons(void *closure) natsThread *t = NULL; jsSub *jsi = NULL; jsConsumerInfo *ci = NULL; - jsConsumerConfig cc; + jsConsumerConfig *cc = NULL; natsStatus s; // Note: if anything fail here, the reset/recreate of the ordered consumer @@ -3010,38 +3015,39 @@ _recreateOrderedCons(void *closure) natsSub_Lock(sub); t = oci->thread; jsi = sub->jsi; - // Reset some items in jsi. - jsi->dseq = 1; - NATS_FREE(jsi->cmeta); - jsi->cmeta = NULL; - NATS_FREE(jsi->fcReply); - jsi->fcReply = NULL; - jsi->fcDelivered = 0; - // Create consumer request for starting policy. - jsConsumerConfig_Init(&cc); - cc.FilterSubject = jsi->fsubj; - cc.FlowControl = true; - cc.AckPolicy = js_AckNone; - cc.MaxDeliver = 1; - cc.AckWait = NATS_SECONDS_TO_NANOS(24*60*60); // Just set to something known, not utilized. - cc.Heartbeat = jsi->hbi * 1000000; - cc.DeliverSubject = sub->subject; - cc.DeliverPolicy = js_DeliverByStartSequence; - cc.OptStartSeq = oci->sseq; - cc.MemoryStorage = true; - cc.Replicas = 1; + + NATS_FREE((char*) jsi->ocCfg->DeliverSubject); + jsi->ocCfg->DeliverSubject = NULL; + DUP_STRING(s, jsi->ocCfg->DeliverSubject, sub->subject); + if (s == NATS_OK) + { + // Reset some items in jsi. + jsi->dseq = 1; + NATS_FREE(jsi->cmeta); + jsi->cmeta = NULL; + NATS_FREE(jsi->fcReply); + jsi->fcReply = NULL; + jsi->fcDelivered = 0; + // Create consumer request for starting policy. + cc = jsi->ocCfg; + cc->DeliverPolicy = js_DeliverByStartSequence; + cc->OptStartSeq = oci->sseq; + } natsSub_Unlock(sub); - s = js_AddConsumer(&ci, jsi->js, jsi->stream, &cc, NULL, NULL); if (s == NATS_OK) { - natsSub_Lock(sub); - NATS_FREE(jsi->consumer); - jsi->consumer = NULL; - DUP_STRING(s, jsi->consumer, ci->Name); - natsSub_Unlock(sub); + s = js_AddConsumer(&ci, jsi->js, jsi->stream, cc, NULL, NULL); + if (s == NATS_OK) + { + natsSub_Lock(sub); + NATS_FREE(jsi->consumer); + jsi->consumer = NULL; + DUP_STRING(s, jsi->consumer, ci->Name); + natsSub_Unlock(sub); - jsConsumerInfo_Destroy(ci); + jsConsumerInfo_Destroy(ci); + } } } if (s != NATS_OK) diff --git a/src/js.h b/src/js.h index 1872c335f..8f0f26044 100644 --- a/src/js.h +++ b/src/js.h @@ -259,3 +259,9 @@ js_release(jsCtx *js); natsStatus js_directGetMsgToJSMsg(const char *stream, natsMsg *msg); + +natsStatus +js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone); + +void +js_destroyConsumerConfig(jsConsumerConfig *cc); diff --git a/src/jsm.c b/src/jsm.c index f7aaae1ba..1fd25db59 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -2654,8 +2654,8 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo return NATS_UPDATE_ERR_STACK(s); } -static void -_destroyConsumerConfig(jsConsumerConfig *cc) +void +js_destroyConsumerConfig(jsConsumerConfig *cc) { if (cc == NULL) return; @@ -2793,7 +2793,7 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi if (s == NATS_OK) *new_cc = cc; else - _destroyConsumerConfig(cc); + js_destroyConsumerConfig(cc); return NATS_UPDATE_ERR_STACK(s); } @@ -3102,7 +3102,7 @@ jsConsumerInfo_Destroy(jsConsumerInfo *ci) NATS_FREE(ci->Stream); NATS_FREE(ci->Name); - _destroyConsumerConfig(ci->Config); + js_destroyConsumerConfig(ci->Config); _destroyClusterInfo(ci->Cluster); NATS_FREE(ci); } @@ -3451,3 +3451,53 @@ jsConsumerNamesList_Destroy(jsConsumerNamesList *list) NATS_FREE(list->List); NATS_FREE(list); } + +natsStatus +js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone) +{ + natsStatus s = NATS_OK; + jsConsumerConfig *c = NULL; + + *clone = NULL; + if (org == NULL) + return NATS_OK; + + c = (jsConsumerConfig*) NATS_CALLOC(1, sizeof(jsConsumerConfig)); + if (c == NULL) + return nats_setDefaultError(NATS_NO_MEMORY); + + memcpy(c, org, sizeof(jsConsumerConfig)); + + // Need to first set all pointers to NULL in case we fail to dup and then + // do the cleanup. + c->Name = NULL; + c->Durable = NULL; + c->Description = NULL; + c->BackOff = NULL; + c->FilterSubject = NULL; + c->SampleFrequency = NULL; + c->DeliverSubject = NULL; + c->DeliverGroup = NULL; + // Now dup all strings, etc... + IF_OK_DUP_STRING(s, c->Name, org->Name); + IF_OK_DUP_STRING(s, c->Durable, org->Durable); + IF_OK_DUP_STRING(s, c->Description, org->Description); + IF_OK_DUP_STRING(s, c->FilterSubject, org->FilterSubject); + IF_OK_DUP_STRING(s, c->SampleFrequency, org->SampleFrequency); + IF_OK_DUP_STRING(s, c->DeliverSubject, org->DeliverSubject); + IF_OK_DUP_STRING(s, c->DeliverGroup, org->DeliverGroup); + if ((s == NATS_OK) && (org->BackOff != NULL) && (org->BackOffLen > 0)) + { + c->BackOff = (int64_t*) NATS_CALLOC(org->BackOffLen, sizeof(int64_t)); + if (c->BackOff == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + memcpy(c->BackOff, org->BackOff, org->BackOffLen*sizeof(int64_t)); + } + if (s == NATS_OK) + *clone = c; + else + js_destroyConsumerConfig(c); + + return NATS_UPDATE_ERR_STACK(s); +} diff --git a/src/natsp.h b/src/natsp.h index 6b1cc9f01..7153dc80e 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -102,6 +102,8 @@ #define MAX_FRAMES (50) +#define nats_IsStringEmpty(s) ((((s) == NULL) || ((s)[0] == '\0')) ? true : false) + #define DUP_STRING(s, s1, s2) \ { \ (s1) = NATS_STRDUP(s2); \ @@ -110,7 +112,7 @@ } #define IF_OK_DUP_STRING(s, s1, s2) \ - if ((s) == NATS_OK) \ + if (((s) == NATS_OK) && !nats_IsStringEmpty(s2)) \ DUP_STRING((s), (s1), (s2)) @@ -399,6 +401,13 @@ typedef struct __jsSub // the sequence mismatch report. Should the mismatch be // resolved, this will be cleared. bool sm; + // These are the mismatch seq info + struct mismatch + { + uint64_t sseq; + uint64_t dseq; + uint64_t ldseq; + } mismatch; // When in auto-ack mode, we have an internal callback // that will call natsMsg_Ack after the user callback returns. @@ -412,8 +421,8 @@ typedef struct __jsSub uint64_t fciseq; char *fcReply; - // When reseting an OrderedConsumer, need the original filter subject. - char *fsubj; + // When reseting an OrderedConsumer, need the original configuration. + jsConsumerConfig *ocCfg; } jsSub; diff --git a/src/util.h b/src/util.h index 04b226575..d8a3f3b23 100644 --- a/src/util.h +++ b/src/util.h @@ -68,8 +68,6 @@ typedef struct typedef natsStatus (*jsonRangeCB)(void *userInfo, const char *fieldName, nats_JSONField *f); -#define nats_IsStringEmpty(s) (((s == NULL) || (s[0] == '\0')) ? true : false) - #define snprintf_truncate(d, szd, f, ...) if (snprintf((d), (szd), (f), __VA_ARGS__) >= (int) (szd)) { \ int offset = (int) (szd) - 2; \ if (offset > 0) (d)[offset--] = '.'; \ diff --git a/test/test.c b/test/test.c index 9e0e682f1..1d54d3fa6 100644 --- a/test/test.c +++ b/test/test.c @@ -23337,6 +23337,7 @@ test_JetStreamMgtConsumers(void) jsConsumerNamesList *cnList = NULL; int count = 0; natsMsg *msg = NULL; + jsConsumerConfig *cloneCfg = NULL; JS_SETUP(2, 9, 0); @@ -24118,6 +24119,32 @@ test_JetStreamMgtConsumers(void) jsConsumerNamesList_Destroy(cnList); testCond(true); + test("Check clone: "); + jsConsumerConfig_Init(&cfg); + cfg.Name = "A"; + cfg.Durable = "B"; + cfg.Description = "C"; + cfg.FilterSubject = "D"; + cfg.SampleFrequency = "E"; + cfg.DeliverSubject = "F"; + cfg.DeliverGroup = "G"; + cfg.BackOff = (int64_t[]){NATS_MILLIS_TO_NANOS(50), NATS_MILLIS_TO_NANOS(250)}; + cfg.BackOffLen = 2; + s = js_cloneConsumerConfig(&cfg, &cloneCfg); + testCond((s == NATS_OK) && (cloneCfg != NULL) + && (cloneCfg->Name != NULL) && (strcmp(cloneCfg->Name, "A") == 0) + && (cloneCfg->Durable != NULL) && (strcmp(cloneCfg->Durable, "B") == 0) + && (cloneCfg->Description != NULL) && (strcmp(cloneCfg->Description, "C") == 0) + && (cloneCfg->FilterSubject != NULL) && (strcmp(cloneCfg->FilterSubject, "D") == 0) + && (cloneCfg->SampleFrequency != NULL) && (strcmp(cloneCfg->SampleFrequency, "E") == 0) + && (cloneCfg->DeliverSubject != NULL) && (strcmp(cloneCfg->DeliverSubject, "F") == 0) + && (cloneCfg->DeliverGroup != NULL) && (strcmp(cloneCfg->DeliverGroup, "G") == 0) + && (cloneCfg->BackOffLen == 2) + && (cloneCfg->BackOff != NULL) + && (cloneCfg->BackOff[0] == NATS_MILLIS_TO_NANOS(50)) + && (cloneCfg->BackOff[1] == NATS_MILLIS_TO_NANOS(250))); + js_destroyConsumerConfig(cloneCfg); + JS_TEARDOWN; } @@ -26852,7 +26879,7 @@ test_JetStreamSubscribeIdleHearbeat(void) test("Check HB received: "); nats_Sleep(300); natsSubAndLdw_Lock(sub); - s = (sub->jsi->dseq == 1 ? NATS_OK : NATS_ERR); + s = (sub->jsi->mismatch.dseq == 1 ? NATS_OK : NATS_ERR); natsSubAndLdw_Unlock(sub); testCond(s == NATS_OK); @@ -26986,7 +27013,7 @@ test_JetStreamSubscribeIdleHearbeat(void) test("Check HB received: "); nats_Sleep(300); natsMutex_Lock(sub->mu); - s = (sub->jsi->dseq == 3 ? NATS_OK : NATS_ERR); + s = (sub->jsi->mismatch.dseq == 3 ? NATS_OK : NATS_ERR); natsMutex_Unlock(sub->mu); testCond(s == NATS_OK); @@ -27344,7 +27371,7 @@ test_JetStreamSubscribePull(void) natsSubscription *sub2 = NULL; natsSubscription *sub3 = NULL; - JS_SETUP(2, 9, 0); + JS_SETUP(2, 9, 2); s = _createDefaultThreadArgsForCbTests(&args); if (s != NATS_OK) @@ -28090,6 +28117,8 @@ test_JetStreamOrderedConsumer(void) int assetLen = 1024*1024; const int chunkSize = 1024; jsStreamInfo *si = NULL; + jsConsumerInfo *ci1 = NULL; + jsConsumerInfo *ci2 = NULL; JS_SETUP(2, 3, 3); @@ -28253,6 +28282,48 @@ test_JetStreamOrderedConsumer(void) s = _testOrderedCons(js, si, asset, assetLen, &args, false); testCond(s == NATS_OK); + // A bug was causing the ordered consumer to be recreated at + // each hearbeat. Use a low hearbeat and check that name + // does not change after some HB intervals. + test("Create stream: "); + jsStreamConfig_Init(&sc); + sc.Name = "TESTNAME"; + sc.Subjects = (const char*[1]){"b"}; + sc.SubjectsLen = 1; + sc.Storage = js_MemoryStorage; + s = js_AddStream(NULL, js, &sc, NULL, &jerr); + testCond((s == NATS_OK) && (jerr == 0)); + + test("Send 1 message: "); + s = js_Publish(NULL, js, "b", "hello", 5, NULL, NULL); + testCond(s == NATS_OK); + + test("Create ordered cons with small HB: "); + jsSubOptions_Init(&so); + so.Ordered = true; + so.Config.Heartbeat = NATS_MILLIS_TO_NANOS(100); + s = js_SubscribeSync(&sub, js, "b", NULL, &so, &jerr); + testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0)); + + test("Get name: "); + s = natsSubscription_GetConsumerInfo(&ci1, sub, NULL, NULL); + testCond((s == NATS_OK) && (ci1 != NULL)); + + nats_Sleep(300); + + test("Send 1 message: "); + s = js_Publish(NULL, js, "b", "hello", 5, NULL, NULL); + testCond(s == NATS_OK); + + nats_Sleep(300); + + test("Check name does not change: "); + s = natsSubscription_GetConsumerInfo(&ci2, sub, NULL, NULL); + testCond((s == NATS_OK) && (ci2 != NULL) + && (strcmp(ci1->Name, ci2->Name) == 0)); + jsConsumerInfo_Destroy(ci1); + jsConsumerInfo_Destroy(ci2); + free(asset); jsStreamInfo_Destroy(si); natsSubscription_Destroy(sub); @@ -28348,7 +28419,7 @@ test_JetStreamOrderedConsumerWithErrors(void) test("Create ordered sub: "); jsSubOptions_Init(&so); so.Ordered = true; - so.Config.Heartbeat = 200*1000000; + so.Config.Heartbeat = NATS_MILLIS_TO_NANOS(200); s = js_SubscribeSync(&sub, js, "a", NULL, &so, &jerr); testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0)); @@ -28532,7 +28603,6 @@ test_JetStreamOrderedConsSrvRestart(void) natsSubscription *sub = NULL; natsMsg *msg = NULL; natsOptions *opts = NULL; - const char *cons = NULL; jsConsumerInfo *ci = NULL; jsErrCode jerr= 0; jsStreamConfig sc; @@ -28570,6 +28640,7 @@ test_JetStreamOrderedConsSrvRestart(void) jsSubOptions_Init(&so); so.Ordered = true; so.Config.Heartbeat = NATS_MILLIS_TO_NANOS(250); + so.Config.HeadersOnly = true; s = js_SubscribeSync(&sub, js, "foo", NULL, &so, &jerr); testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0)); @@ -28614,13 +28685,12 @@ test_JetStreamOrderedConsSrvRestart(void) natsMsg_Destroy(msg); msg = NULL; - test("Check still memory storage: "); - natsSub_Lock(sub); - if (sub->jsi != NULL) - cons = sub->jsi->consumer; - natsSub_Unlock(sub); - s = js_GetConsumerInfo(&ci, js, "OCRESTART", cons, NULL, NULL); - testCond((s == NATS_OK) && (ci->Config->MemoryStorage) && (ci->Config->Replicas == 1)) + test("Check configuration is similar: "); + s = natsSubscription_GetConsumerInfo(&ci, sub, NULL, NULL); + testCond((s == NATS_OK) + && (ci->Config->MemoryStorage) + && (ci->Config->Replicas == 1) + && (ci->Config->HeadersOnly)); jsConsumerInfo_Destroy(ci); natsSubscription_Destroy(sub);