Skip to content

Commit

Permalink
Fixed flappers, refactored 'lifetime'
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Aug 21, 2024
1 parent 4f04996 commit 74f7853
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 156 deletions.
18 changes: 6 additions & 12 deletions examples/js-sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ static const char *usage = ""\
"-fc enable flow control\n" \
"-count number of expected messages\n";

#define SECOND_NANO (int64_t)1E9
#define MINUTE_NANO (SECOND_NANO * 60)
#define HOUR_NANO (MINUTE_NANO * 60)

static bool fetchCompleteCalled = false;
static bool subCompleteCalled = false;

Expand Down Expand Up @@ -177,26 +173,24 @@ int main(int argc, char **argv)
{
if (pull && async)
{
jsOpts.PullSubscribeAsync.MaxMessages = (int) total;
jsOpts.PullSubscribeAsync.NoWait = true;
jsOpts.PullSubscribeAsync.TimeoutMillis = 3600 * 1000; // 1 hour
jsOpts.PullSubscribeAsync.FetchSize = 17;
jsOpts.PullSubscribeAsync.KeepAhead = 7;
jsOpts.PullSubscribeAsync.CompleteHandler = _completeFetchCb;

jsFetchRequest lifetime;
jsFetchRequest_Init(&lifetime);
lifetime.NoWait = true;
lifetime.Expires = 1 * HOUR_NANO;
lifetime.Batch = (int) total;
jsOpts.PullSubscribeAsync.CompleteHandlerClosure = NULL;

// Uncomment to use a 1 second heartbeat.
// lifetime.Heartbeat = 1 * SECOND_NANO;
// jsOpts.PullSubscribeAsync.HeartbeatMillis = 1000; // 1 second

// Uncomment to provide custom control over next fetch size.
// jsOpts.PullSubscribeAsync.NextHandler = nextFetchCb;

// Uncomment to turn off AutoACK on delivered messages.
// so.ManualAck = true;

s = js_PullSubscribeAsync(&sub, js, subj, durable, onMsg, NULL, &lifetime, &jsOpts, &so, &jerr);
s = js_PullSubscribeAsync(&sub, js, subj, durable, onMsg, NULL, &jsOpts, &so, &jerr);
}
else if (pull)
s = js_PullSubscribe(&sub, js, subj, durable, &jsOpts, &so, &jerr);
Expand Down
8 changes: 4 additions & 4 deletions src/dispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ _preProcessUserMessage(

if (fetch)
{
bool overMaxBytes = ((fetch->lifetime.MaxBytes > 0) && ((fetch->deliveredBytes) > fetch->lifetime.MaxBytes));
bool overMaxFetch = ((fetch->deliveredMsgs >= fetch->lifetime.Batch) || overMaxBytes);
bool overMaxBytes = ((fetch->maxBytes > 0) && ((fetch->deliveredBytes) > fetch->maxBytes));
bool overMaxFetch = ((fetch->deliveredMsgs >= fetch->maxMessages) || overMaxBytes);

*lastMessageInFetch = (fetch->deliveredMsgs == (fetch->lifetime.Batch - 1) || overMaxBytes);
*lastMessageInFetch = (fetch->deliveredMsgs == (fetch->maxMessages - 1) || overMaxBytes);

// See if we want to override fetch status based on our own data.
if (fetchStatus == NATS_OK)
Expand All @@ -142,7 +142,7 @@ _preProcessUserMessage(
{
fetchStatus = NATS_MAX_DELIVERED_MSGS;
}
if (overMaxBytes)
else if (overMaxBytes)
{
fetchStatus = NATS_LIMIT_REACHED;
}
Expand Down
79 changes: 30 additions & 49 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -2917,9 +2917,10 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch)

// These are not changeable by the callback, only Batch and MaxBytes can be updated.
int64_t now = nats_Now();
req.Heartbeat = fetch->lifetime.Heartbeat;
req.Expires = fetch->lifetime.Expires - (now - fetch->startTimeMilli) * 10*1000*1000;
req.NoWait = fetch->lifetime.NoWait;
if (fetch->timeoutMillis != 0)
req.Expires = (fetch->timeoutMillis - (now - fetch->startTimeMillis)) * 1000 * 1000; // ns, go time.Duration
req.NoWait = fetch->noWait;
req.Heartbeat = fetch->heartbeatMillis * 1000 * 1000; // ns, go time.Duration

char buffer[128];
natsBuffer buf;
Expand All @@ -2945,6 +2946,7 @@ js_maybeFetchMore(natsSubscription *sub, jsFetch *fetch)
return NATS_UPDATE_ERR_STACK(s);
}

// Sets Batch and MaxBytes for the next fetch request.
static bool
_autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure)
{
Expand All @@ -2963,15 +2965,15 @@ _autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure)

if (maybeMore)
{
// fetch->lifetime.Batch is always > 0
remainingUnrequested = fetch->lifetime.Batch - fetch->requestedMsgs;
// fetch->maxMessages is always > 0
remainingUnrequested = fetch->maxMessages - fetch->requestedMsgs;
if (remainingUnrequested <= 0)
maybeMore = false;
}

if (maybeMore && (fetch->lifetime.MaxBytes > 0))
if (maybeMore && (fetch->maxBytes > 0))
{
remainingBytes = fetch->lifetime.MaxBytes - fetch->receivedBytes;
remainingBytes = fetch->maxBytes - fetch->receivedBytes;
if (remainingBytes <= 0)
maybeMore = false;
}
Expand All @@ -2990,7 +2992,6 @@ _autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure)
if (!maybeMore)
return false;

*req = fetch->lifetime; // copy bytes, reading immutable data
req->Batch = want;
// FIXME discuss in PR - this seems wrong, we don't know how many bytes we will have
// received from what is already requested. Still, can serve as a safe
Expand All @@ -3002,31 +3003,19 @@ _autoNextFetchRequest(jsFetchRequest *req, natsSubscription *sub, void *closure)
natsStatus
js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject, const char *durable,
natsMsgHandler msgCB, void *msgCBClosure,
jsFetchRequest *lifetime,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;
natsSubscription *sub = NULL;
jsSub *jsi = NULL;
jsFetch *fetch = NULL;

jsFetchRequest defaultLifetime = {
.Batch = INT_MAX, // no limit
.Expires = INT64_MAX, // never
.Heartbeat = 0, // none
.MaxBytes = 0, // no limit
.NoWait = false, // wait forever
};

if ((newsub == NULL) || (msgCB == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

if (errCode != NULL)
*errCode = 0;

if (lifetime == NULL)
lifetime = &defaultLifetime;

// Do a basic pull subscribe first, but with a callback so it is treated as
// "async" and assigned to a dispatcher. Since we don't fetch anything, it
// will not be active yet.
Expand All @@ -3041,30 +3030,22 @@ js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject,
// Initialize fetch parameters.
if (s == NATS_OK)
{
fetch->startTimeMilli = nats_Now();
fetch->lifetime = *lifetime;

if (fetch->lifetime.Batch == 0)
fetch->lifetime.Batch = INT_MAX;

fetch->nextf = _autoNextFetchRequest;
fetch->nextClosure = (void *)fetch;
if (jsOpts != NULL)
{
fetch->fetchSize = jsOpts->PullSubscribeAsync.FetchSize;
fetch->keepAhead = jsOpts->PullSubscribeAsync.KeepAhead;
fetch->completeCB = jsOpts->PullSubscribeAsync.CompleteHandler;
fetch->completeCBClosure = jsOpts->PullSubscribeAsync.CompleteHandlerClosure;

if (jsOpts->PullSubscribeAsync.NextHandler != NULL)
{
fetch->nextf = jsOpts->PullSubscribeAsync.NextHandler;
fetch->nextClosure = jsOpts->PullSubscribeAsync.NextHandlerClosure;
}
}
fetch->status = NATS_OK;
fetch->startTimeMillis = nats_Now();

if (fetch->fetchSize == 0)
fetch->fetchSize = NATS_DEFAULT_ASYNC_FETCH_SIZE;
#define _set(_f, _v, _nil, _def) fetch->_f = ((jsOpts != NULL) && (jsOpts->PullSubscribeAsync._v != _nil)) ? jsOpts->PullSubscribeAsync._v : _def
_set(completeCB, CompleteHandler, NULL, NULL);
_set(completeCBClosure, CompleteHandlerClosure, NULL, NULL);
_set(fetchSize, FetchSize, 0, NATS_DEFAULT_ASYNC_FETCH_SIZE);
_set(heartbeatMillis, HeartbeatMillis, 0, 0);
_set(keepAhead, KeepAhead, 0, 0);
_set(maxBytes, MaxBytes, 0, 0);
_set(maxMessages, MaxMessages, 0, INT_MAX);
_set(nextClosure, NextHandlerClosure, NULL, fetch);
_set(nextf, NextHandler, NULL, _autoNextFetchRequest);
_set(noWait, NoWait, false, false);
_set(timeoutMillis, TimeoutMillis, 0, INT64_MAX);
#undef _set
}

// Set up the sub to process fetch results.
Expand All @@ -3079,27 +3060,27 @@ js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject,

// Start the timers. They will live for the entire length of the
// subscription (the missed heartbeat timer may be reset as needed).
if (lifetime->Expires > 0)
if (fetch->timeoutMillis > 0)
{
sub->refs++;
s = natsTimer_Create(&fetch->expiresTimer, _fetchExpiredFired, _releaseSubWhenStopped,
lifetime->Expires, (void *)sub);
fetch->timeoutMillis, (void *)sub);
if (s != NATS_OK)
sub->refs--;
}

if ((s == NATS_OK) && (lifetime->Heartbeat > 0))
if ((s == NATS_OK) && (fetch->heartbeatMillis > 0))
{
int64_t milli = (lifetime->Heartbeat / 1000000) * 2;
int64_t dur = fetch->heartbeatMillis * 2;
sub->refs++;
if (jsi->hbTimer == NULL)
{
s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _releaseSubWhenStopped, milli, (void *)sub);
s = natsTimer_Create(&jsi->hbTimer, _hbTimerFired, _releaseSubWhenStopped, dur, (void *)sub);
if (s != NATS_OK)
sub->refs--;
}
else
natsTimer_Reset(jsi->hbTimer, milli);
natsTimer_Reset(jsi->hbTimer, dur);
}

natsSub_Unlock(sub);
Expand Down
47 changes: 28 additions & 19 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -1281,26 +1281,39 @@ typedef struct jsOptions

struct jsOptionsSubscribePullAsync
{
// Options to control automatic Fetch flow control.
//
// The number of messages to ask for in a single request, and if
// we should try to fetch ahead, KeepAhead more than we need to
// finish the current request. Fetch this many messages ahead of
// time.
int FetchSize;
int KeepAhead;
// Lifetime of the subscription (completes when any one of the
// targets is reached).
int64_t TimeoutMillis;
int MaxMessages;
int64_t MaxBytes;

// Manual fetch flow control. If provided gets called before
// each message is deliverered to msgCB, and overrides the
// default algorithm for sending Next requests.
natsFetchNextHandler NextHandler;
void *NextHandlerClosure;
// If NoWait is set, the subscription will receive the messages
// already stored on the server subject to the limits, but will
// not wait for more messages.
bool NoWait;

// Fetch complete handler that receives the exit status code,
// the subscription's Complete handler is also invoked, but does
// not have the status code.
natsFetchCompleteHandler CompleteHandler;
void *CompleteHandlerClosure;
void *CompleteHandlerClosure;

// Have server sends heartbeats to help detect communication failures.
int64_t HeartbeatMillis;

// Options to control automatic Fetch flow control. The number
// of messages to ask for in a single request, and if we should
// try to fetch ahead, KeepAhead more than we need to finish the
// current request. Fetch this many messages ahead of time.
int FetchSize;
int KeepAhead;

// Manual fetch flow control. If provided gets called before
// each message is deliverered to msgCB, and overrides the
// default algorithm for sending Next requests.
natsFetchNextHandler NextHandler;
void *NextHandlerClosure;

} PullSubscribeAsync;

/**
Expand Down Expand Up @@ -6597,18 +6610,14 @@ jsFetchRequest_Init(jsFetchRequest *request);
* @param durable the optional durable name.
* @param msgCB the #natsMsgHandler callback.
* @param msgCBClosure a pointer to an user defined object (can be `NULL`).
* @param lifetime the pointer to the #jsFetchRequest configuration used to set
* the sub's lifetime limits on messages, bytes, and elapsed time. It also
* allows to specify the heartbeat frequency. The default behavior would be to
* terminate the subscription if it fails. #jsOpts provides finer control.
* @param jsOpts the pointer to the #jsOptions object, possibly `NULL`.
* @param opts the subscribe options, possibly `NULL`.
* @param errCode the location where to store the JetStream specific error code,
* or `NULL` if not needed.
*/
NATS_EXTERN natsStatus
js_PullSubscribeAsync(natsSubscription **newsub, jsCtx *js, const char *subject, const char *durable,
natsMsgHandler msgCB, void *msgCBClosure, jsFetchRequest *lifetime,
natsMsgHandler msgCB, void *msgCBClosure,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode);

/** \brief Fetches messages for a pull subscription with a complete request configuration
Expand Down
39 changes: 23 additions & 16 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,31 +374,38 @@ struct __jsCtx

typedef struct __jsFetch
{
natsFetchCompleteHandler completeCB;
void *completeCBClosure;
natsStatus status;

// Lifetime control
jsFetchRequest lifetime;
int64_t startTimeMilli;
int receivedMsgs;
int64_t receivedBytes;
int deliveredMsgs;
int64_t deliveredBytes;
int requestedMsgs;
int64_t timeoutMillis;
int maxMessages;
int64_t maxBytes;
bool noWait;

int keepAhead;
int fetchSize;
// On complete
natsFetchCompleteHandler completeCB;
void *completeCBClosure;
natsStatus status;

// Flow control
natsFetchNextHandler nextf;
void *nextClosure;
void *nextClosure;
int keepAhead;
int fetchSize;
int64_t heartbeatMillis;

// Stats
int64_t startTimeMillis;
int receivedMsgs;
int64_t receivedBytes;
int deliveredMsgs;
int64_t deliveredBytes;
int requestedMsgs;

// Timer for the fetch expiration. We leverage the existing jsi->hbTimer for
// checking missed heartbeats.
natsTimer *expiresTimer;
natsTimer *expiresTimer;

// Matches jsi->fetchID
char replySubject[NATS_DEFAULT_INBOX_PRE_LEN + NUID_BUFFER_LEN + 32]; // big enough for {INBOX}.number
char replySubject[NATS_DEFAULT_INBOX_PRE_LEN + NUID_BUFFER_LEN + 32]; // big enough for {INBOX}.number
} jsFetch;

typedef struct __jsSub
Expand Down
Loading

0 comments on commit 74f7853

Please sign in to comment.