diff --git a/src/conn.c b/src/conn.c index 0dddffc41..76d444cac 100644 --- a/src/conn.c +++ b/src/conn.c @@ -4432,3 +4432,59 @@ natsConn_defaultErrHandler(natsConnection *nc, natsSubscription *sub, natsStatus } fflush(stderr); } + +natsStatus +natsConn_getErrorCallback(natsErrHandler *cb, void **closure, natsConnection *nc) +{ + if ((nc == NULL) || (cb == NULL) || (closure == NULL)) + return nats_setDefaultError(NATS_INVALID_ARG); + + natsConn_Lock(nc); + *cb = nc->opts->asyncErrCb; + *closure = nc->opts->asyncErrCbClosure; + natsConn_Unlock(nc); + + return NATS_OK; +} + +natsStatus +natsConn_setErrorCallback(natsConnection *nc, natsErrHandler cb, void *closure) +{ + if ((nc == NULL) || (cb == NULL)) + return nats_setDefaultError(NATS_INVALID_ARG); + + natsConn_Lock(nc); + nc->opts->asyncErrCb = cb; + nc->opts->asyncErrCbClosure = closure; + natsConn_Unlock(nc); + + return NATS_OK; +} + +natsStatus +natsConn_getClosedCallback(natsConnectionHandler *cb, void **closure, natsConnection *nc) +{ + if ((nc == NULL) || (cb == NULL) || (closure == NULL)) + return nats_setDefaultError(NATS_INVALID_ARG); + + natsConn_Lock(nc); + *cb = nc->opts->closedCb; + *closure = nc->opts->closedCbClosure; + natsConn_Unlock(nc); + + return NATS_OK; +} + +natsStatus +natsConn_setClosedCallback(natsConnection *nc, natsConnectionHandler cb, void *closure) +{ + if ((nc == NULL) || (cb == NULL)) + return nats_setDefaultError(NATS_INVALID_ARG); + + natsConn_Lock(nc); + nc->opts->closedCb = cb; + nc->opts->closedCbClosure = closure; + natsConn_Unlock(nc); + + return NATS_OK; +} diff --git a/src/conn.h b/src/conn.h index 41e93f402..e357cc9c7 100644 --- a/src/conn.h +++ b/src/conn.h @@ -160,4 +160,17 @@ natsConn_close(natsConnection *nc); void natsConn_destroy(natsConnection *nc, bool fromPublicDestroy); +natsStatus +natsConn_setErrorCallback(natsConnection *nc, natsErrHandler cb, void *closure); + +natsStatus +natsConn_getErrorCallback(natsErrHandler *cb, void **closure, natsConnection *nc); + +natsStatus +natsConn_setClosedCallback(natsConnection *nc, natsConnectionHandler cb, void *closure); + +natsStatus +natsConn_getClosedCallback(natsConnectionHandler *cb, void **closure, natsConnection *nc); + + #endif /* CONN_H_ */ diff --git a/src/nats.c b/src/nats.c index fcaaf0e14..f613a1c46 100644 --- a/src/nats.c +++ b/src/nats.c @@ -726,11 +726,14 @@ _timerThread(void *arg) static void _asyncCbsThread(void *arg) { - natsLibAsyncCbs *asyncCbs = &(gLib.asyncCbs); - natsAsyncCbInfo *cb = NULL; - natsConnection *nc = NULL; + natsLibAsyncCbs *asyncCbs = &(gLib.asyncCbs); + natsAsyncCbInfo *cb = NULL; + natsConnection *nc = NULL; + natsConnectionHandler cbHandler = NULL; + natsErrHandler errHandler = NULL; + void *cbClosure = NULL; #if defined(NATS_HAS_STREAMING) - stanConnection *sc = NULL; + stanConnection *sc = NULL; #endif WAIT_LIB_INITIALIZED; @@ -760,42 +763,65 @@ _asyncCbsThread(void *arg) sc = cb->sc; #endif + // callback handlers can be updated on a live connection, so we need to + // lock. + cbHandler = NULL; + errHandler = NULL; + cbClosure = NULL; + +#define __set_handler(_h, _cb, _cl) \ + { \ + natsMutex_Lock(nc->mu); \ + _h = nc->opts->_cb; \ + cbClosure = nc->opts->_cl; \ + natsMutex_Unlock(nc->mu); \ + } + switch (cb->type) { case ASYNC_CLOSED: - (*(nc->opts->closedCb))(nc, nc->opts->closedCbClosure); - break; + __set_handler(cbHandler, closedCb, closedCbClosure); + break; case ASYNC_DISCONNECTED: - (*(nc->opts->disconnectedCb))(nc, nc->opts->disconnectedCbClosure); + __set_handler(cbHandler, disconnectedCb, disconnectedCbClosure); break; case ASYNC_RECONNECTED: - (*(nc->opts->reconnectedCb))(nc, nc->opts->reconnectedCbClosure); + __set_handler(cbHandler, reconnectedCb, reconnectedCbClosure); break; case ASYNC_CONNECTED: - (*(nc->opts->connectedCb))(nc, nc->opts->connectedCbClosure); + __set_handler(cbHandler, connectedCb, connectedCbClosure); break; case ASYNC_DISCOVERED_SERVERS: - (*(nc->opts->discoveredServersCb))(nc, nc->opts->discoveredServersClosure); + __set_handler(cbHandler, discoveredServersCb, discoveredServersClosure); break; case ASYNC_LAME_DUCK_MODE: - (*(nc->opts->lameDuckCb))(nc, nc->opts->lameDuckClosure); + __set_handler(cbHandler, lameDuckCb, lameDuckClosure); break; case ASYNC_ERROR: - { - if (cb->errTxt != NULL) - nats_setErrStatusAndTxt(cb->err, cb->errTxt); - (*(nc->opts->asyncErrCb))(nc, cb->sub, cb->err, nc->opts->asyncErrCbClosure); - break; - } -#if defined(NATS_HAS_STREAMING) - case ASYNC_STAN_CONN_LOST: - (*(sc->opts->connectionLostCB))(sc, sc->connLostErrTxt, sc->opts->connectionLostCBClosure); + __set_handler(errHandler, asyncErrCb, asyncErrCbClosure); break; -#endif default: break; } + // Invoke the callback + if (cbHandler != NULL) + { + (*(cbHandler))(nc, cbClosure); + } + else if (errHandler != NULL) + { + if (cb->errTxt != NULL) + nats_setErrStatusAndTxt(cb->err, cb->errTxt); + (*(errHandler))(nc, cb->sub, cb->err, cbClosure); + } +#if defined(NATS_HAS_STREAMING) + else if (cb->type == ASYNC_STAN_CONN_LOST) + { + (*(sc->opts->connectionLostCB))(sc, sc->connLostErrTxt, sc->opts->connectionLostCBClosure); + } +#endif + natsAsyncCb_Destroy(cb); nats_clearLastError(); diff --git a/test/list.txt b/test/list.txt index 0429b024c..bf9e3d0db 100644 --- a/test/list.txt +++ b/test/list.txt @@ -67,6 +67,7 @@ ConnectionWithNULLOptions ConnectionToWithNullURLs ConnectionStatus ConnClosedCB +SetConnClosedCB CloseDisconnectedCB ServerStopDisconnectedCB ClosedConnections @@ -147,6 +148,7 @@ AsyncSubscriptionPendingDrain SyncSubscriptionPending SyncSubscriptionPendingDrain AsyncErrHandler +SetAsyncErrHandler AsyncSubscriberStarvation AsyncSubscriberOnClose NextMsgCallOnAsyncSub diff --git a/test/test.c b/test/test.c index a7cf40ed7..bcfe50cf8 100644 --- a/test/test.c +++ b/test/test.c @@ -8532,6 +8532,53 @@ test_ConnClosedCB(void) _stopServer(serverPid); } +static void +test_SetConnClosedCB(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsOptions *opts = NULL; + natsPid serverPid = NATS_INVALID_PID; + struct threadArg arg; + + s = _createDefaultThreadArgsForCbTests(&arg); + if (s == NATS_OK) + opts = _createReconnectOptions(); + + if ((opts == NULL) + || (natsOptions_SetURL(opts, NATS_DEFAULT_URL) != NATS_OK)) + { + FAIL("Unable to setup test for SetConnClosedCB!"); + } + + serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); + CHECK_SERVER_STARTED(serverPid); + + s = natsConnection_Connect(&nc, opts); + + // Set the connection closed handler in-flight + IFOK(s, natsConn_setClosedCallback(nc, _closedCb, (void*) &arg)); + if (s == NATS_OK) + natsConnection_Close(nc); + + test("Test connection closed CB invoked: "); + + natsMutex_Lock(arg.m); + s = NATS_OK; + while ((s != NATS_TIMEOUT) && !arg.closed) + s = natsCondition_TimedWait(arg.c, arg.m, 1000); + natsMutex_Unlock(arg.m); + + testCond((s == NATS_OK) && arg.closed); + + natsOptions_Destroy(opts); + natsConnection_Destroy(nc); + + _destroyDefaultThreadArgs(&arg); + + _stopServer(serverPid); +} + static void test_CloseDisconnectedCB(void) { @@ -14373,6 +14420,79 @@ test_AsyncErrHandler(void) _stopServer(serverPid); } +static void +test_SetAsyncErrHandler(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsOptions *opts = NULL; + natsSubscription *sub = NULL; + natsPid serverPid = NATS_INVALID_PID; + struct threadArg arg; + + s = _createDefaultThreadArgsForCbTests(&arg); + if (s != NATS_OK) + FAIL("Unable to setup test!"); + + arg.status = NATS_OK; + arg.control= 7; + + s = natsOptions_Create(&opts); + IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL)); + IFOK(s, natsOptions_SetMaxPendingMsgs(opts, 10)); + + if (s != NATS_OK) + FAIL("Unable to create options for test AsyncErrHandler"); + + serverPid = _startServer("nats://127.0.0.1:4222", NULL, true); + CHECK_SERVER_STARTED(serverPid); + + s = natsConnection_Connect(&nc, opts); + IFOK(s, natsConnection_Subscribe(&sub, nc, "async_test", _recvTestString, (void*) &arg)); + + natsMutex_Lock(arg.m); + arg.sub = sub; + natsMutex_Unlock(arg.m); + + // Start sending messages + for (int i=0; + (s == NATS_OK) && (i < (opts->maxPendingMsgs)); i++) + { + s = natsConnection_PublishString(nc, "async_test", "hello"); + } + + // Set the error handler in-flight + IFOK(s, natsConn_setErrorCallback(nc, _asyncErrCb, (void*) &arg)); + + for (int i=0; + (s == NATS_OK) && (i < 100); i++) + { + s = natsConnection_PublishString(nc, "async_test", "hello"); + } + IFOK(s, natsConnection_Flush(nc)); + + // Wait for async err callback + natsMutex_Lock(arg.m); + while ((s != NATS_TIMEOUT) && !arg.done) + s = natsCondition_TimedWait(arg.c, arg.m, 2000); + natsMutex_Unlock(arg.m); + + test("Aync fired properly, and all checks are good: "); + testCond((s == NATS_OK) + && arg.done + && arg.closed + && (arg.status == NATS_OK)); + + natsOptions_Destroy(opts); + natsSubscription_Destroy(sub); + natsConnection_Destroy(nc); + + _destroyDefaultThreadArgs(&arg); + + _stopServer(serverPid); +} + + static void _responseCb(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { @@ -34358,6 +34478,7 @@ static testInfo allTests[] = {"ConnectionToWithNullURLs", test_ConnectionToWithNullURLs}, {"ConnectionStatus", test_ConnectionStatus}, {"ConnClosedCB", test_ConnClosedCB}, + {"SetConnClosedCB", test_SetConnClosedCB}, {"CloseDisconnectedCB", test_CloseDisconnectedCB}, {"ServerStopDisconnectedCB", test_ServerStopDisconnectedCB}, {"ClosedConnections", test_ClosedConnections}, @@ -34440,6 +34561,7 @@ static testInfo allTests[] = {"SyncSubscriptionPending", test_SyncSubscriptionPending}, {"SyncSubscriptionPendingDrain", test_SyncSubscriptionPendingDrain}, {"AsyncErrHandler", test_AsyncErrHandler}, + {"SetAsyncErrHandler", test_SetAsyncErrHandler}, {"AsyncSubscriberStarvation", test_AsyncSubscriberStarvation}, {"AsyncSubscriberOnClose", test_AsyncSubscriberOnClose}, {"NextMsgCallOnAsyncSub", test_NextMsgCallOnAsyncSub},