Skip to content

Commit

Permalink
Merge pull request #600 from nats-io/fix_ordered_cons
Browse files Browse the repository at this point in the history
[FIXED] JetStream: Ordered consumer issues
  • Loading branch information
kozlovic authored Oct 6, 2022
2 parents 96eecca + 1232a3a commit 33b9598
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 59 deletions.
82 changes: 44 additions & 38 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ jsSub_free(jsSub *jsi)
NATS_FREE(jsi->nxtMsgSubj);
NATS_FREE(jsi->cmeta);
NATS_FREE(jsi->fcReply);
NATS_FREE(jsi->fsubj);
js_destroyConsumerConfig(jsi->ocCfg);
NATS_FREE(jsi);

js_release(js);
Expand Down Expand Up @@ -1538,6 +1538,7 @@ jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm)
jsSub *jsi = sub->jsi;
const char *str = NULL;
int64_t val = 0;
struct mismatch *m = &jsi->mismatch;
natsStatus s;

*sm = false;
Expand All @@ -1548,7 +1549,7 @@ jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm)
if (jsi->cmeta == NULL)
return NATS_OK;

s = js_getMetaData(jsi->cmeta, NULL, NULL, NULL, NULL, &jsi->sseq, &jsi->dseq, NULL, NULL, 2);
s = js_getMetaData(jsi->cmeta, NULL, NULL, NULL, NULL, &m->sseq, &m->dseq, NULL, NULL, 2);
if (s != NATS_OK)
{
if (s == NATS_ERR)
Expand All @@ -1568,9 +1569,9 @@ jsSub_processSequenceMismatch(natsSubscription *sub, natsMsg *msg, bool *sm)
if (val == -1)
return nats_setError(NATS_ERR, "invalid last consumer sequence: '%s'", str);

jsi->ldseq = (uint64_t) val;
m->ldseq = (uint64_t) val;
}
if (jsi->ldseq == jsi->dseq)
if (m->ldseq == m->dseq)
{
// Sync subs use this flag to get the NextMsg() to error out and
// return NATS_MISMATCH to indicate that a mismatch was discovered,
Expand Down Expand Up @@ -1636,6 +1637,7 @@ natsStatus
natsSubscription_GetSequenceMismatch(jsConsumerSequenceMismatch *csm, natsSubscription *sub)
{
jsSub *jsi;
struct mismatch *m = NULL;

if ((csm == NULL) || (sub == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);
Expand All @@ -1647,15 +1649,16 @@ natsSubscription_GetSequenceMismatch(jsConsumerSequenceMismatch *csm, natsSubscr
return nats_setError(NATS_INVALID_SUBSCRIPTION, "%s", jsErrNotAJetStreamSubscription);
}
jsi = sub->jsi;
if (jsi->dseq == jsi->ldseq)
m = &jsi->mismatch;
if (m->dseq == m->ldseq)
{
natsSubAndLdw_Unlock(sub);
return NATS_NOT_FOUND;
}
memset(csm, 0, sizeof(jsConsumerSequenceMismatch));
csm->Stream = jsi->sseq;
csm->ConsumerClient = jsi->dseq;
csm->ConsumerServer = jsi->ldseq;
csm->Stream = m->sseq;
csm->ConsumerClient = m->dseq;
csm->ConsumerServer = m->ldseq;
natsSubAndLdw_Unlock(sub);
return NATS_OK;
}
Expand Down Expand Up @@ -2324,6 +2327,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
jsSubOptions o;
jsConsumerConfig cfgStack;
jsConsumerConfig *cfg = NULL;
jsConsumerConfig *ocCfg = NULL;

if ((new_sub == NULL) || (js == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);
Expand Down Expand Up @@ -2485,6 +2489,8 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
cfg->Heartbeat = jsOrderedHBInterval;
cfg->MemoryStorage = true;
cfg->Replicas = 1;

s = js_cloneConsumerConfig(cfg, &ocCfg);
}
else
{
Expand Down Expand Up @@ -2514,14 +2520,13 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
s = nats_setDefaultError(NATS_NO_MEMORY);
}
IF_OK_DUP_STRING(s, jsi->stream, stream);
if ((s == NATS_OK) && (opts->Ordered))
DUP_STRING(s, jsi->fsubj, cfg->FilterSubject);
if (s == NATS_OK)
{
jsi->js = js;
jsi->hbi = hbi;
jsi->pull = isPullMode;
jsi->ordered= opts->Ordered;
jsi->ocCfg = ocCfg;
jsi->dseq = 1;
jsi->ackNone= (opts->Config.AckPolicy == js_AckNone || opts->Ordered);
js_retain(js);
Expand Down Expand Up @@ -2983,7 +2988,7 @@ _recreateOrderedCons(void *closure)
natsThread *t = NULL;
jsSub *jsi = NULL;
jsConsumerInfo *ci = NULL;
jsConsumerConfig cc;
jsConsumerConfig *cc = NULL;
natsStatus s;

// Note: if anything fail here, the reset/recreate of the ordered consumer
Expand All @@ -3010,38 +3015,39 @@ _recreateOrderedCons(void *closure)
natsSub_Lock(sub);
t = oci->thread;
jsi = sub->jsi;
// Reset some items in jsi.
jsi->dseq = 1;
NATS_FREE(jsi->cmeta);
jsi->cmeta = NULL;
NATS_FREE(jsi->fcReply);
jsi->fcReply = NULL;
jsi->fcDelivered = 0;
// Create consumer request for starting policy.
jsConsumerConfig_Init(&cc);
cc.FilterSubject = jsi->fsubj;
cc.FlowControl = true;
cc.AckPolicy = js_AckNone;
cc.MaxDeliver = 1;
cc.AckWait = NATS_SECONDS_TO_NANOS(24*60*60); // Just set to something known, not utilized.
cc.Heartbeat = jsi->hbi * 1000000;
cc.DeliverSubject = sub->subject;
cc.DeliverPolicy = js_DeliverByStartSequence;
cc.OptStartSeq = oci->sseq;
cc.MemoryStorage = true;
cc.Replicas = 1;

NATS_FREE((char*) jsi->ocCfg->DeliverSubject);
jsi->ocCfg->DeliverSubject = NULL;
DUP_STRING(s, jsi->ocCfg->DeliverSubject, sub->subject);
if (s == NATS_OK)
{
// Reset some items in jsi.
jsi->dseq = 1;
NATS_FREE(jsi->cmeta);
jsi->cmeta = NULL;
NATS_FREE(jsi->fcReply);
jsi->fcReply = NULL;
jsi->fcDelivered = 0;
// Create consumer request for starting policy.
cc = jsi->ocCfg;
cc->DeliverPolicy = js_DeliverByStartSequence;
cc->OptStartSeq = oci->sseq;
}
natsSub_Unlock(sub);

s = js_AddConsumer(&ci, jsi->js, jsi->stream, &cc, NULL, NULL);
if (s == NATS_OK)
{
natsSub_Lock(sub);
NATS_FREE(jsi->consumer);
jsi->consumer = NULL;
DUP_STRING(s, jsi->consumer, ci->Name);
natsSub_Unlock(sub);
s = js_AddConsumer(&ci, jsi->js, jsi->stream, cc, NULL, NULL);
if (s == NATS_OK)
{
natsSub_Lock(sub);
NATS_FREE(jsi->consumer);
jsi->consumer = NULL;
DUP_STRING(s, jsi->consumer, ci->Name);
natsSub_Unlock(sub);

jsConsumerInfo_Destroy(ci);
jsConsumerInfo_Destroy(ci);
}
}
}
if (s != NATS_OK)
Expand Down
6 changes: 6 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,9 @@ js_release(jsCtx *js);

natsStatus
js_directGetMsgToJSMsg(const char *stream, natsMsg *msg);

natsStatus
js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone);

void
js_destroyConsumerConfig(jsConsumerConfig *cc);
58 changes: 54 additions & 4 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2654,8 +2654,8 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
return NATS_UPDATE_ERR_STACK(s);
}

static void
_destroyConsumerConfig(jsConsumerConfig *cc)
void
js_destroyConsumerConfig(jsConsumerConfig *cc)
{
if (cc == NULL)
return;
Expand Down Expand Up @@ -2793,7 +2793,7 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi
if (s == NATS_OK)
*new_cc = cc;
else
_destroyConsumerConfig(cc);
js_destroyConsumerConfig(cc);

return NATS_UPDATE_ERR_STACK(s);
}
Expand Down Expand Up @@ -3102,7 +3102,7 @@ jsConsumerInfo_Destroy(jsConsumerInfo *ci)

NATS_FREE(ci->Stream);
NATS_FREE(ci->Name);
_destroyConsumerConfig(ci->Config);
js_destroyConsumerConfig(ci->Config);
_destroyClusterInfo(ci->Cluster);
NATS_FREE(ci);
}
Expand Down Expand Up @@ -3451,3 +3451,53 @@ jsConsumerNamesList_Destroy(jsConsumerNamesList *list)
NATS_FREE(list->List);
NATS_FREE(list);
}

natsStatus
js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone)
{
natsStatus s = NATS_OK;
jsConsumerConfig *c = NULL;

*clone = NULL;
if (org == NULL)
return NATS_OK;

c = (jsConsumerConfig*) NATS_CALLOC(1, sizeof(jsConsumerConfig));
if (c == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

memcpy(c, org, sizeof(jsConsumerConfig));

// Need to first set all pointers to NULL in case we fail to dup and then
// do the cleanup.
c->Name = NULL;
c->Durable = NULL;
c->Description = NULL;
c->BackOff = NULL;
c->FilterSubject = NULL;
c->SampleFrequency = NULL;
c->DeliverSubject = NULL;
c->DeliverGroup = NULL;
// Now dup all strings, etc...
IF_OK_DUP_STRING(s, c->Name, org->Name);
IF_OK_DUP_STRING(s, c->Durable, org->Durable);
IF_OK_DUP_STRING(s, c->Description, org->Description);
IF_OK_DUP_STRING(s, c->FilterSubject, org->FilterSubject);
IF_OK_DUP_STRING(s, c->SampleFrequency, org->SampleFrequency);
IF_OK_DUP_STRING(s, c->DeliverSubject, org->DeliverSubject);
IF_OK_DUP_STRING(s, c->DeliverGroup, org->DeliverGroup);
if ((s == NATS_OK) && (org->BackOff != NULL) && (org->BackOffLen > 0))
{
c->BackOff = (int64_t*) NATS_CALLOC(org->BackOffLen, sizeof(int64_t));
if (c->BackOff == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);
else
memcpy(c->BackOff, org->BackOff, org->BackOffLen*sizeof(int64_t));
}
if (s == NATS_OK)
*clone = c;
else
js_destroyConsumerConfig(c);

return NATS_UPDATE_ERR_STACK(s);
}
15 changes: 12 additions & 3 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@

#define MAX_FRAMES (50)

#define nats_IsStringEmpty(s) ((((s) == NULL) || ((s)[0] == '\0')) ? true : false)

#define DUP_STRING(s, s1, s2) \
{ \
(s1) = NATS_STRDUP(s2); \
Expand All @@ -110,7 +112,7 @@
}

#define IF_OK_DUP_STRING(s, s1, s2) \
if ((s) == NATS_OK) \
if (((s) == NATS_OK) && !nats_IsStringEmpty(s2)) \
DUP_STRING((s), (s1), (s2))


Expand Down Expand Up @@ -399,6 +401,13 @@ typedef struct __jsSub
// the sequence mismatch report. Should the mismatch be
// resolved, this will be cleared.
bool sm;
// These are the mismatch seq info
struct mismatch
{
uint64_t sseq;
uint64_t dseq;
uint64_t ldseq;
} mismatch;

// When in auto-ack mode, we have an internal callback
// that will call natsMsg_Ack after the user callback returns.
Expand All @@ -412,8 +421,8 @@ typedef struct __jsSub
uint64_t fciseq;
char *fcReply;

// When reseting an OrderedConsumer, need the original filter subject.
char *fsubj;
// When reseting an OrderedConsumer, need the original configuration.
jsConsumerConfig *ocCfg;

} jsSub;

Expand Down
2 changes: 0 additions & 2 deletions src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ typedef struct

typedef natsStatus (*jsonRangeCB)(void *userInfo, const char *fieldName, nats_JSONField *f);

#define nats_IsStringEmpty(s) (((s == NULL) || (s[0] == '\0')) ? true : false)

#define snprintf_truncate(d, szd, f, ...) if (snprintf((d), (szd), (f), __VA_ARGS__) >= (int) (szd)) { \
int offset = (int) (szd) - 2; \
if (offset > 0) (d)[offset--] = '.'; \
Expand Down
Loading

0 comments on commit 33b9598

Please sign in to comment.