Skip to content

Commit

Permalink
Merge pull request #638 from nats-io/fix_637
Browse files Browse the repository at this point in the history
[FIXED] Crash processing inbound message for destroyed subscription
  • Loading branch information
kozlovic authored Feb 23, 2023
2 parents 835de29 + 0a7dd28 commit 44b956e
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 43 deletions.
78 changes: 35 additions & 43 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -2617,7 +2617,6 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
bool sc = false;
bool sm = false;
nats_MsgList *list = NULL;
natsMutex *mu = NULL;
natsCondition *cond = NULL;
// For JetStream cases
jsSub *jsi = NULL;
Expand All @@ -2626,75 +2625,72 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
int jct = 0;
natsMsgFilter mf = NULL;
void *mfc = NULL;
bool unlock = false;

// Do this outside of locks, even if we end-up having to destroy
// it because we have reached the maxPendingMsgs count or other
// conditions. This reduces lock contention.
s = _createMsg(&msg, nc, buf, bufLen, nc->ps->ma.hdr);
if (s != NATS_OK)
return s;
// bufLen is the total length of headers + data. Since headers become
// more and more prevalent, it makes sense to count them both toward
// the subscription's pending limit. So use bufLen for accounting.

natsMutex_Lock(nc->subsMu);

nc->stats.inMsgs += 1;
nc->stats.inBytes += (uint64_t) bufLen;

sub = natsHash_Get(nc->subs, nc->ps->ma.sid);
if (sub != NULL)
if ((mf = nc->filter) != NULL)
{
mf = nc->filter;
mfc = nc->filterClosure;
natsMutex_Unlock(nc->subsMu);

(*mf)(nc, &msg, mfc);
if (msg == NULL)
return NATS_OK;

natsMutex_Lock(nc->subsMu);
}
natsMutex_Unlock(nc->subsMu);

sub = natsHash_Get(nc->subs, nc->ps->ma.sid);
if (sub == NULL)
{
natsMutex_Unlock(nc->subsMu);
natsMsg_Destroy(msg);
return NATS_OK;
}
// We need to retain the subscription since as soon as we release the
// nc->subsMu lock, the subscription could be destroyed and we would
// reference freed memory.
natsSubAndLdw_LockAndRetain(sub);

// Do this outside of sub's lock, even if we end-up having to destroy
// it because we have reached the maxPendingMsgs count. This reduces
// lock contention.
s = _createMsg(&msg, nc, buf, bufLen, nc->ps->ma.hdr);
if (s != NATS_OK)
return s;
// bufLen is the total length of headers + data. Since headers become
// more and more prevalent, it makes sense to count them both toward
// the subscription's pending limit. So use bufLen for accounting.
natsMutex_Unlock(nc->subsMu);

if (mf != NULL)
if (sub->closed || sub->drainSkip)
{
(*mf)(nc, &msg, mfc);
if (msg == NULL)
return NATS_OK;
natsSubAndLdw_UnlockAndRelease(sub);
natsMsg_Destroy(msg);
return NATS_OK;
}

// Pick mutex, condition variable and list based on if the sub is
// Pick condition variable and list based on if the sub is
// part of a global delivery thread pool or not.
// Note about `list`: this is used only to link messages, but
// sub->msgList needs to be used to update/check number of pending
// messages, since in case of delivery thread pool, `list` will have
// messages from many different subscriptions.
if ((ldw = sub->libDlvWorker) != NULL)
{
mu = ldw->lock;
cond = ldw->cond;
list = &(ldw->msgList);
if (sub->jsi != NULL)
{
natsSub_Lock(sub);
unlock = true;
}
}
else
{
mu = sub->mu;
cond = sub->cond;
list = &(sub->msgList);
}

natsMutex_Lock(mu);
if (sub->closed || sub->drainSkip)
{
natsMutex_Unlock(mu);
if (unlock)
natsSub_Unlock(sub);
natsMsg_Destroy(msg);
return NATS_OK;
}

jsi = sub->jsi;
// For JS subscriptions (but not pull ones), handle hearbeat and flow control here.
if (jsi && !jsi->pull)
Expand All @@ -2714,9 +2710,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
s = jsSub_checkOrderedMsg(sub, msg, &replaced);
if ((s != NATS_OK) || replaced)
{
natsMutex_Unlock(mu);
if (unlock)
natsSub_Unlock(sub);
natsSubAndLdw_UnlockAndRelease(sub);
natsMsg_Destroy(msg);
return s;
}
Expand Down Expand Up @@ -2798,9 +2792,7 @@ natsConn_processMsg(natsConnection *nc, char *buf, int bufLen)
}
}

natsMutex_Unlock(mu);
if (unlock)
natsSub_Unlock(sub);
natsSubAndLdw_UnlockAndRelease(sub);

if ((s == NATS_OK) && fcReply)
s = natsConnection_Publish(nc, fcReply, NULL, 0);
Expand Down
22 changes: 22 additions & 0 deletions src/sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,35 @@ natsSubAndLdw_Lock(natsSubscription *sub)
SUB_DLV_WORKER_LOCK(sub);
}

void
natsSubAndLdw_LockAndRetain(natsSubscription *sub)
{
natsMutex_Lock(sub->mu);
sub->refs++;
SUB_DLV_WORKER_LOCK(sub);
}

void
natsSubAndLdw_Unlock(natsSubscription *sub)
{
SUB_DLV_WORKER_UNLOCK(sub);
natsMutex_Unlock(sub->mu);
}

void
natsSubAndLdw_UnlockAndRelease(natsSubscription *sub)
{
int refs = 0;

SUB_DLV_WORKER_UNLOCK(sub);

refs = --(sub->refs);
natsMutex_Unlock(sub->mu);

if (refs == 0)
_freeSubscription(sub);
}

// Runs under the subscription lock but will release it for a JS subscription
// if the JS consumer needs to be deleted.
static void
Expand Down
6 changes: 6 additions & 0 deletions src/sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ natsSubAndLdw_Lock(natsSubscription *sub);
void
natsSubAndLdw_Unlock(natsSubscription *sub);

void
natsSubAndLdw_LockAndRetain(natsSubscription *sub);

void
natsSubAndLdw_UnlockAndRelease(natsSubscription *sub);

void
natsSub_close(natsSubscription *sub, bool connectionClosed);

Expand Down

0 comments on commit 44b956e

Please sign in to comment.