From 8bdde702d69826fc7dfe528dc33151cdca8b05e0 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Wed, 12 Jun 2024 23:16:14 -0700 Subject: [PATCH 1/3] [TEST ONLY] Fixed Test_JetStreamSubscribeIdleHeartbeat --- test/test.c | 46 ++++++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/test/test.c b/test/test.c index ec2d2e888..1b1566eea 100644 --- a/test/test.c +++ b/test/test.c @@ -27912,7 +27912,7 @@ test_JetStreamSubscribeIdleHearbeat(void) natsMutex_Unlock(args.m); testCond(s == NATS_OK); - test("Check HB received: "); + test("Wait and check HB received: "); nats_Sleep(300); natsSubAndLdw_Lock(sub); s = (sub->jsi->mismatch.dseq == 1 ? NATS_OK : NATS_ERR); @@ -27951,18 +27951,35 @@ test_JetStreamSubscribeIdleHearbeat(void) s = js_Publish(NULL, js, "foo", "msg2", 4, NULL, &jerr); testCond((s == NATS_OK) && (jerr == 0)); + // Cheat by pretending that the server sends message seq 3, while client + // received only seq 1. Disable auto-ack for this message, or we mess up the + // server state. +#define PUBLISH_FAKE_JS_MSG_WITH_SEQ(_reply, _msg) \ + { \ + natsSub_Lock(sub); \ + inbox = sub->subject; \ + sub->jsi->ackNone = true; \ + natsSub_Unlock(sub); \ + \ + natsConn_setFilterWithClosure(nc, _setMsgReply, (void *)(_reply)); \ + s = natsConnection_PublishString(nc, inbox, (_msg)); \ + } + +#define PUBLISH_FAKE_RESET() \ + { \ + natsSub_Lock(sub); \ + sub->jsi->ackNone = true; \ + natsSub_Unlock(sub); \ + } + test("Check seq mismatch: "); - natsSub_Lock(sub); - inbox = sub->subject; - natsSub_Unlock(sub); - // Cheat by pretending that the server sends message seq 3, while client received only seq 1. - natsConn_setFilterWithClosure(nc, _setMsgReply, (void*) "$JS.ACK.TEST.dur1.1.3.3.1624472520000000000.0"); - s = natsConnection_PublishString(nc, inbox, "msg3"); + PUBLISH_FAKE_JS_MSG_WITH_SEQ("$JS.ACK.TEST.dur1.1.3.3.1624472520000000000.0", "msg3 fake"); // Wait for past the next HB and we should get an async error natsMutex_Lock(args.m); while ((s != NATS_TIMEOUT) && !args.done) s = natsCondition_TimedWait(args.c, args.m, 300); natsMutex_Unlock(args.m); + PUBLISH_FAKE_RESET(); testCond(s == NATS_OK); test("Check that notification is sent only once: "); @@ -27996,8 +28013,7 @@ test_JetStreamSubscribeIdleHearbeat(void) testCond(s == NATS_OK); test("Skip again: "); - natsConn_setFilterWithClosure(nc, _setMsgReply, (void*) "$JS.ACK.TEST.dur1.1.4.4.1624482520000000000.0"); - s = natsConnection_PublishString(nc, inbox, "msg4"); + PUBLISH_FAKE_JS_MSG_WITH_SEQ("$JS.ACK.TEST.dur1.1.4.4.1624482520000000000.0", "msg4 fake"); testCond(s == NATS_OK); test("Check async cb invoked: "); @@ -28005,6 +28021,7 @@ test_JetStreamSubscribeIdleHearbeat(void) while ((s != NATS_TIMEOUT) && !args.done) s = natsCondition_TimedWait(args.c, args.m, 1000); natsMutex_Unlock(args.m); + PUBLISH_FAKE_RESET(); testCond(s == NATS_OK); test("Check HB timer reports missed HB: "); @@ -28059,11 +28076,7 @@ test_JetStreamSubscribeIdleHearbeat(void) nats_clearLastError(); test("Skip: "); - natsSub_Lock(sub); - inbox = sub->subject; - natsSub_Unlock(sub); - natsConn_setFilterWithClosure(nc, _setMsgReply, (void*) "$JS.ACK.TEST.dur2.1.4.4.1624482520000000000.0"); - s = natsConnection_PublishString(nc, inbox, "msg4"); + PUBLISH_FAKE_JS_MSG_WITH_SEQ("$JS.ACK.TEST.dur2.1.4.4.1624482520000000000.0", "msg4 fake"); testCond(s == NATS_OK); // For sync subs, we should not get async error @@ -28074,6 +28087,7 @@ test_JetStreamSubscribeIdleHearbeat(void) natsMutex_Unlock(args.m); testCond(s == NATS_TIMEOUT); nats_clearLastError(); + PUBLISH_FAKE_RESET(); test("NextMsg reports error: "); s = natsSubscription_NextMsg(&msg, sub, 1000); @@ -28103,8 +28117,7 @@ test_JetStreamSubscribeIdleHearbeat(void) testCond(s == NATS_OK); test("Skip again: "); - natsConn_setFilterWithClosure(nc, _setMsgReply, (void*) "$JS.ACK.TEST.dur1.1.5.5.1624492520000000000.0"); - s = natsConnection_PublishString(nc, inbox, "msg5"); + PUBLISH_FAKE_JS_MSG_WITH_SEQ("$JS.ACK.TEST.dur1.1.5.5.1624492520000000000.0", "msg5 fake"); testCond(s == NATS_OK); test("NextMsg reports error: "); @@ -28112,6 +28125,7 @@ test_JetStreamSubscribeIdleHearbeat(void) s = natsSubscription_NextMsg(&msg, sub, 1000); testCond((s == NATS_MISMATCH) && (msg == NULL)); nats_clearLastError(); + PUBLISH_FAKE_RESET(); test("Check HB timer reports missed HB: "); s = NATS_OK; From 20b1fee9a5834bf77290bc047b0eca1b71de1817 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Wed, 12 Jun 2024 23:28:35 -0700 Subject: [PATCH 2/3] Nit: removed an irrelevant format change --- test/test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test.c b/test/test.c index 1b1566eea..24b2c03d1 100644 --- a/test/test.c +++ b/test/test.c @@ -27912,7 +27912,7 @@ test_JetStreamSubscribeIdleHearbeat(void) natsMutex_Unlock(args.m); testCond(s == NATS_OK); - test("Wait and check HB received: "); + test("Check HB received: "); nats_Sleep(300); natsSubAndLdw_Lock(sub); s = (sub->jsi->mismatch.dseq == 1 ? NATS_OK : NATS_ERR); From 92b9c7d973408d929c90a7ee7b40348f62dbf2d7 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 13 Jun 2024 07:27:08 -0700 Subject: [PATCH 3/3] PR feedback: typo --- test/test.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/test.c b/test/test.c index 24b2c03d1..71f2a5b9a 100644 --- a/test/test.c +++ b/test/test.c @@ -27965,11 +27965,11 @@ test_JetStreamSubscribeIdleHearbeat(void) s = natsConnection_PublishString(nc, inbox, (_msg)); \ } -#define PUBLISH_FAKE_RESET() \ - { \ - natsSub_Lock(sub); \ - sub->jsi->ackNone = true; \ - natsSub_Unlock(sub); \ +#define PUBLISH_FAKE_RESET() \ + { \ + natsSub_Lock(sub); \ + sub->jsi->ackNone = false; \ + natsSub_Unlock(sub); \ } test("Check seq mismatch: ");