diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 1e67bd43d..14ed168c8 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -133,15 +133,6 @@ jobs: cmake .. ${{ steps.cmake-flags.outputs.flags }} make rebuild_cache && make - - name: "Rebuild the list of tests to match the compile flags" - working-directory: ./build - run: | - ./bin/testsuite - if [[ $(diff list.txt ../test/list.txt; echo $?) != 0 ]]; then - mv list.txt ../test/list.txt - make rebuild_cache - fi - # testing - name: "Download nats-server version ${{ inputs.server_version }}" @@ -178,7 +169,7 @@ jobs: export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" flags="" - ctest --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} + ctest -L 'test' --timeout 60 --output-on-failure --repeat-until-fail ${{ inputs.repeat }} - name: Upload coverage reports to Codecov # PRs from external contributors fail: https://github.com/codecov/feedback/issues/301 @@ -190,10 +181,40 @@ jobs: token: ${{ secrets.CODECOV_TOKEN }} # verbose: true + - name: "Download benchmark results from ${{ github.event.pull_request.base.ref }}" + if: inputs.benchmark == 'ON' && github.event.pull_request.base.ref + uses: actions/download-artifact@v2 + with: + name: benchmark_results_${{ github.event.pull_request.base.ref }} + path: ./build/prev_bench.log + continue-on-error: true + - name: "Benchmark" if: inputs.benchmark == 'ON' working-directory: ./build run: | export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH export NATS_TEST_SERVER_VERSION="$(nats-server -v)" - ./bin/bench-sub-async + flags="" + ctest -L 'bench' --timeout 600 -VV | tee bench.log + # + # ...coming: compare to base branch + + - name: "Upload benchmark result for PR ${{ github.event.pull_request.head.ref }}" + if: inputs.benchmark == 'ON' && github.event.pull_request.head.ref + uses: actions/upload-artifact@v4 + with: + name: benchmark_results_${{ github.event.pull_request.head.ref }} + path: ./build/bench.log + + - name: Extract branch name + if: inputs.benchmark == 'ON' && !github.event.pull_request.head.ref + id: extract_branch + run: echo "BRANCH_NAME=${GITHUB_REF#refs/heads/}" >> $GITHUB_ENV + + - name: "Upload benchmark result for branch ${{ env.BRANCH_NAME }}" + if: inputs.benchmark == 'ON' && !github.event.pull_request.head.ref + uses: actions/upload-artifact@v4 + with: + name: benchmark_results_${{ env.BRANCH_NAME }} + path: ./build/bench.log diff --git a/.github/workflows/on-pr-debug.yml b/.github/workflows/on-pr-debug.yml index 089cd7f58..852a7475f 100644 --- a/.github/workflows/on-pr-debug.yml +++ b/.github/workflows/on-pr-debug.yml @@ -1,4 +1,4 @@ -name: "Debug" +name: "PR" on: pull_request: @@ -29,7 +29,7 @@ jobs: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} coverage-pooled: - name: "Coverage" + name: "Coverage (pooled delivery)" uses: ./.github/workflows/build-test.yml with: coverage: ON @@ -74,6 +74,14 @@ jobs: server_version: main lib_write_deadline: ON + bench: + name: "Benchmark" + uses: ./.github/workflows/build-test.yml + with: + server_version: main + benchmark: ON + type: Release + Windows: name: "Windows" runs-on: windows-latest @@ -98,4 +106,4 @@ jobs: - name: Test run: | cd build - ./bin/Debug/testsuite + ctest -L 'test' --timeout 60 --output-on-failure diff --git a/.github/workflows/on-push-release-extra.yml b/.github/workflows/on-push-release-extra.yml index f11aaf4fd..487f5d362 100644 --- a/.github/workflows/on-push-release-extra.yml +++ b/.github/workflows/on-push-release-extra.yml @@ -5,7 +5,6 @@ on: - main - release_* - permissions: contents: write # required by build-test to comment on coverage but not used here. @@ -67,3 +66,11 @@ jobs: type: Debug secrets: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + + bench: + name: "Benchmark" + uses: ./.github/workflows/build-test.yml + with: + server_version: main + benchmark: ON + type: Release diff --git a/.github/workflows/on-push-release.yml b/.github/workflows/on-push-release.yml index a277c6a2a..50513575c 100644 --- a/.github/workflows/on-push-release.yml +++ b/.github/workflows/on-push-release.yml @@ -45,11 +45,3 @@ jobs: with: sanitize: thread server_version: main - - bench: - name: "Benchmark" - uses: ./.github/workflows/build-test.yml - with: - server_version: main - benchmark: ON - type: Release diff --git a/CMakeLists.txt b/CMakeLists.txt index c6bdc26cc..213314243 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -283,7 +283,6 @@ add_subdirectory(examples/getstarted) if(NATS_BUILD_STREAMING) add_subdirectory(examples/stan) endif() -add_subdirectory(bench) add_subdirectory(test) add_subdirectory(test/dylib) #---------------------------- diff --git a/README.md b/README.md index 83badc0c3..fa081026d 100644 --- a/README.md +++ b/README.md @@ -241,38 +241,15 @@ ctest -T memcheck -V -I 1,4 ``` The above command would run the tests with `valgrind` (`-T memcheck`), with verbose output (`-V`), and run the tests from 1 to 4 (`-I 1,4`). -If you add a test to `test/test.c`, you need to add it into the `allTests` array. Each entry contains a name, and the test function. You can add it anywhere into this array. -Build you changes: +If you add a test to `test/test.c`, you need to add it into the `list_test.txt` +file. Each entry contains just the test name, the function must be named +identically, with a `test_` prefix. The list is in alphabetical order, but it +does not need to be, you can add anywhere. -``` -make -[ 44%] Built target nats -[ 88%] Built target nats_static -[ 90%] Built target nats-publisher -[ 92%] Built target nats-queuegroup -[ 94%] Built target nats-replier -[ 96%] Built target nats-requestor -[ 98%] Built target nats-subscriber -Scanning dependencies of target testsuite -[100%] Building C object test/CMakeFiles/testsuite.dir/test.c.o -Linking C executable testsuite -[100%] Built target testsuite -``` - -Now regenerate the list by invoking the test suite without any argument: - -``` -./test/testsuite -Number of tests: 77 -``` - -This list the number of tests added to the file `list.txt`. Move this file to the source's test directory. - -``` -mv list.txt ../test/ -``` +If you are adding a benchmark, it should be added to the `list_bench.txt`. These +tests are labeled differently (`-L 'bench'`) and executed separately on CI. -Then, refresh the build: +You need to re-run `cmake` for the changes to take effect: ``` cmake .. diff --git a/bench/CMakeLists.txt b/bench/CMakeLists.txt deleted file mode 100644 index 47ef9a3dc..000000000 --- a/bench/CMakeLists.txt +++ /dev/null @@ -1,28 +0,0 @@ -if(NOT NATS_BUILD_LIB_STATIC) - MESSAGE(FATAL_ERROR - "Building tests require static library, or run CMake with -DBUILD_TESTING=OFF") - return() -endif() - -include_directories(${PROJECT_SOURCE_DIR}/src) -include_directories(${PROJECT_SOURCE_DIR}/bench) - -if(NATS_BUILD_WITH_TLS) - include_directories(${OPENSSL_INCLUDE_DIR}) -endif(NATS_BUILD_WITH_TLS) - -set(LIB_SUFFIX ${CMAKE_STATIC_LIBRARY_SUFFIX}) - -# Get all the .c files in the examples directory -file(GLOB BENCH_SOURCES RELATIVE ${PROJECT_SOURCE_DIR}/bench *.c) - -# For each file... -foreach(bsrc ${BENCH_SOURCES}) - # Remove the suffix so that it becomes the executable name - string(REPLACE ".c" "" bname ${bsrc}) - set(bexe "bench-${bname}") - - add_executable(${bexe} ${PROJECT_SOURCE_DIR}/bench/${bsrc}) - - target_link_libraries(${bexe} nats_static ${NATS_EXTRA_LIB} ${NATS_ASYNC_IO_LIB}) -endforeach() diff --git a/bench/diffstat.go b/bench/diffstat.go deleted file mode 100644 index 45b732ec4..000000000 --- a/bench/diffstat.go +++ /dev/null @@ -1,51 +0,0 @@ -package main - -import ( - "encoding/csv" - "flag" - "fmt" - "log" - "os" - "strconv" -) - -func main() { - flag.Parse() - - if len(os.Args) != 3 { - log.Fatalf("usage: %s
", os.Args[0]) - } - - fmain, err := os.Open(os.Args[1]) - if err != nil { - log.Fatal(err) - } - main, err := csv.NewReader(fmain).ReadAll() - if err != nil { - log.Fatal(err) - } - - fbench, err := os.Open(os.Args[2]) - if err != nil { - log.Fatal(err) - } - bench, err := csv.NewReader(fbench).ReadAll() - if err != nil { - log.Fatal(err) - } - - if len(main) != len(bench) { - log.Fatalf("length mismatch: %d != %d", len(main), len(bench)) - } - - for i := range main { - if main[i][0] != bench[i][0] || main[i][1] != bench[i][1] { - log.Fatalf("key mismatch: %s %s != %s %s", main[i][0], main[i][1], bench[i][0], bench[i][1]) - } - - vmain, _ := strconv.Atoi(main[i][2]) - vbranch, _ := strconv.Atoi(bench[i][2]) - - fmt.Printf("%s,%s,%d,%02f%%\n", main[i][0], main[i][1], vbranch-vmain, float64(vbranch-vmain)/float64(vmain)*100) - } -} diff --git a/bench/sub-async.c b/bench/sub-async.c deleted file mode 100644 index df7037003..000000000 --- a/bench/sub-async.c +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright 2024 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "bench.h" - -#define MAX_SUBS 500 -int numSubs[] = {1, 2, 3, 5, 7, 11, 23, 43, 83, 163, 317, 499}; - -#define TOTAL_MESSAGES (500 * 1000) -#define REPEAT 5 - -typedef struct -{ - bool useGlobalDelivery; - int maxThreads; -} benchConfig; - -benchConfig configs[] = { - {false, 1}, // 1 is not used in this case, just to quiet nats_SetMessageDeliveryPoolSize - {true, 1}, - {true, 2}, - {true, 3}, - {true, 4}, - {true, 5}, - {true, 7}, - {true, 11}, - {true, 19}, - {true, 41}, - {true, 79}, - {true, 157}, - {true, 307}, -}; - -typedef struct -{ - natsSubscription *sub; - uint64_t sum; - uint64_t xor ; - uint64_t count; - int64_t closedTimestamp; -} subState; - -subState state[MAX_SUBS]; - -static uint64_t _expectedSum(int N) -{ - uint64_t sum = 0; - for (int64_t i = 0; i < N; i++) - sum += i; - return sum; -} - -static uint64_t _expectedXOR(int N) -{ - uint64_t xor = 0; - for (int64_t i = 0; i < N; i++) - xor ^= i; - return xor; -} - -static void -_onMessage(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) -{ - subState *s = (subState *)closure; - char buf[32]; - int len = natsMsg_GetDataLength(msg); - if (len > 31) - len = 31; - - strncpy(buf, natsMsg_GetData(msg), len); - buf[len] = '\0'; - int64_t val = atoi(buf); - - s->sum += val; - s->xor ^= val; - s->count++; - - natsMsg_Destroy(msg); -} - -static void -_onComplete(void *closure) -{ - subState *s = (subState *)closure; - s->closedTimestamp = nats_Now(); -} - -static natsStatus -_bench(benchConfig *c, int nSubs, int nMessages, int flushAfter, int *durMillis) -{ - natsConnection *conn = NULL; - natsOptions *opts = NULL; - uint64_t expectedSum = _expectedSum(nMessages); - uint64_t expectedXOR = _expectedXOR(nMessages); - char buf[16]; - int64_t start, end; - - memset(state, 0, sizeof(state)); - start = nats_Now(); - - natsStatus s = nats_Open(-1); - IFOK(s, natsOptions_Create(&opts)); - IFOK(s, nats_SetMessageDeliveryPoolSize(c->maxThreads)); - IFOK(s, natsOptions_SetErrorHandler(opts, asyncCb, NULL)); - IFOK(s, natsOptions_UseGlobalMessageDelivery(opts, c->useGlobalDelivery)); - - IFOK(s, natsConnection_Connect(&conn, opts)); - - for (int i = 0; i < nSubs; i++) - { - IFOK(s, natsConnection_Subscribe(&state[i].sub, conn, "foo", _onMessage, &state[i])); - IFOK(s, natsSubscription_SetPendingLimits(state[i].sub, -1, -1)); - IFOK(s, natsSubscription_AutoUnsubscribe(state[i].sub, nMessages)); - IFOK(s, natsSubscription_SetOnCompleteCB(state[i].sub, _onComplete, &state[i])); - } - - for (int i = 0; i < nMessages; i++) - { - snprintf(buf, sizeof(buf), "%d", i); - IFOK(s, natsConnection_PublishString(conn, "foo", buf)); - if ((i % flushAfter) == 0) - IFOK(s, natsConnection_Flush(conn)); - } - - while (s == NATS_OK) - { - bool done = true; - for (int i = 0; i < nSubs; i++) - { - // threads don't touch this, should be safe - if (natsSubscription_IsValid(state[i].sub)) - { - done = false; - break; - } - } - - nats_Sleep(10); - if (done) - break; - } - - end = 0; - if (s == NATS_OK) - { - for (int i = 0; i < nSubs; i++) - { - if (state[i].sum != expectedSum) - { - s = NATS_ERR; - fprintf(stderr, "Error: sum is %" PRId64 " for sub %d, expected %" PRId64 "\n", state[i].sum, i, expectedSum); - break; - } - if (state[i].xor != expectedXOR) - { - fprintf(stderr, "Error: xor is %" PRId64 " for sub %d, expected %" PRId64 "\n", state[i].xor, i, expectedXOR); - s = NATS_ERR; - break; - } - if ((int)(state[i].count) != nMessages) - { - fprintf(stderr, "Error: count is %" PRId64 " for sub %d, expected %d\n", state[i].count, i, nMessages); - s = NATS_ERR; - break; - } - - if (state[i].closedTimestamp > end) - end = state[i].closedTimestamp; - } - } - - // cleanup - for (int i = 0; i < nSubs; i++) - natsSubscription_Destroy(state[i].sub); - natsConnection_Destroy(conn); - natsOptions_Destroy(opts); - nats_CloseAndWait(0); - - *durMillis = (int)(end - start); - - return s; -} - -int main(void) -{ - int pid = _startServer("nats://127.0.0.1:4222", NULL, true); - CHECK_SERVER_STARTED(pid); - - for (benchConfig *c = configs; c < configs + sizeof(configs) / sizeof(configs[0]); c++) - { - for (int *n = numSubs; n < numSubs + sizeof(numSubs) / sizeof(numSubs[0]); n++) - { - natsStatus s = NATS_OK; - int nMessages = TOTAL_MESSAGES / *n; - int flushAfter = nMessages / *n / 10; - if (flushAfter < 5) - flushAfter = 5; - int durMillis = 0; - - for (int i = 0; i < REPEAT; i++) - { - s = _bench(c, *n, nMessages, flushAfter, &durMillis); - if (s != NATS_OK) - { - fprintf(stderr, "Error: %s\n", natsStatus_GetText(s)); - nats_PrintLastErrorStack(stderr); - exit(1); - } - } - - printf("%d,%d,%d\n", c->useGlobalDelivery ? c->maxThreads : -1, *n, durMillis / REPEAT); - fflush(stdout); - } - } - - _stopServer(pid); -} diff --git a/buildOnTravis.sh b/buildOnTravis.sh index f5af110a8..8bc7da678 100755 --- a/buildOnTravis.sh +++ b/buildOnTravis.sh @@ -75,7 +75,7 @@ fi export NATS_TEST_TRAVIS=yes echo "Using NATS server version: $NATS_TEST_SERVER_VERSION" -ctest --timeout 60 --output-on-failure $4 +ctest -L 'test' --timeout 60 --output-on-failure $4 res=$? if [ $res -ne 0 ]; then exit $res diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b8cf0f41d..11c2971e0 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -27,7 +27,7 @@ if(NATS_BUILD_STREAMING) endif(NATS_BUILD_STREAMING) # Build the test program -add_executable(testsuite test.c) +add_executable(testsuite test.c bench_sub_async.c) # Link statically with the library target_link_libraries(testsuite nats_static ${NATS_EXTRA_LIB}) @@ -45,19 +45,16 @@ endif() set(TEST_NAMES) foreach(LIST_FILE ${ALL_LISTS}) - # Read the file 'list.txt' to get all the test names + # Get all the test names if(EXISTS ${LIST_FILE}) file(STRINGS ${LIST_FILE} TEST_NAMES) else() set(TEST_NAMES) endif() - message("Test Names: ${TEST_NAMES}") - foreach(name ${TEST_NAMES}) # Remove the _test() prefix string(REGEX REPLACE "_test\\(([^)]+)\\)" "\\1" TEST_NAME ${name}) - message("Adding test: ${TEST_NAME}") add_test(NAME ${TEST_NAME} WORKING_DIRECTORY ${CMAKE_CURRENT_LIST_DIR} COMMAND testsuite ${TEST_NAME}) @@ -70,7 +67,7 @@ foreach(LIST_FILE ${ALL_LISTS}) # Set TSAN_OPTIONS for the test if(NATS_SANITIZE) - set_tests_properties(${name} PROPERTIES + set_tests_properties(${TEST_NAME} PROPERTIES ENVIRONMENT "TSAN_OPTIONS=detect_deadlocks=1:second_deadlock_stack=1:halt_on_error=1:report_signal_unsafe=1") endif(NATS_SANITIZE) diff --git a/test/bench_sub_async.c b/test/bench_sub_async.c new file mode 100644 index 000000000..b7c9f7584 --- /dev/null +++ b/test/bench_sub_async.c @@ -0,0 +1,500 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "test.h" +#include "sub.h" + +#define REPEAT 5 + +typedef struct __env ENV; + +typedef struct +{ + bool useGlobalDelivery; + int max; +} threadConfig; + +typedef struct +{ + natsSubscription *sub; + uint64_t sum; + uint64_t xor ; + uint64_t count; + int64_t closedTimestamp; + + ENV *env; +} subState; + +typedef natsStatus (*publishFunc)(natsConnection *nc, const char *subject, ENV *env); + +struct __env +{ + int numSubs; + threadConfig threads; + int numPubMessages; + + bool progressiveFlush; + publishFunc pubf; + int64_t delayNano; + + subState subs[1000]; // magic number is always enough. +}; + +static void _onMessage(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure); +static void _onError(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure); +static void _onComplete(void *closure); +static void _benchMatrix(threadConfig *threadsVector, int lent, int *subsVector, int lens, int numMessages, ENV *env); +static natsStatus _bench(ENV *env, int *best, int *avg, int *worst); +static natsStatus _publish(natsConnection *nc, const char *subject, ENV *env); +static natsStatus _inject(natsConnection *nc, const char *subject, ENV *env); +static natsStatus _enqueueToSub(natsSubscription *sub, natsMsg *m); +static uint64_t _expectedSum(int N); +static uint64_t _expectedXOR(int N); + +#define RUN_MATRIX(_threads, _subs, _messages, _env) _benchMatrix(_threads, sizeof(_threads) / sizeof(*_threads), _subs, sizeof(_subs) / sizeof(*_subs), _messages, _env) + +// This benchmark publishes messages ASAP (no rate limiting) and measures +// message delivery to a few subscribers. This approach does not work well for a +// large number of subscribers because the server will be overwhelmed having to +// send too many messages at once. +void test_BenchSubscribeAsync_Small(void) +{ + threadConfig threads[] = { + {false, 1}, // 1 is not used in this case, just to quiet nats_SetMessageDeliveryPoolSize + {true, 1}, + {true, 5}, + // These should show no material difference since no extra threads will be spun up + {true, 7}, + }; + + int subs[] = {1, 2, 3, 5}; + + ENV env = { + .pubf = _publish, + .progressiveFlush = false, + }; + RUN_MATRIX(threads, subs, 500 * 1000, &env); +} + +// This benchmark publishes messages ASAP (no rate limiting) and measures +// message delivery to a few subscribers. This approach does not work well for a +// large number of subscribers because the server will be overwhelmed having to +// send too many messages at once. +void test_BenchSubscribeAsync_Large(void) +{ + threadConfig threads[] = { + {false, 1}, // 1 is not used in this case, just to quiet nats_SetMessageDeliveryPoolSize + {true, 1}, + {true, 2}, + {true, 11}, + {true, 163}, // to compare to non-pooled + }; + + int subs[] = {23, 83, 163}; + + ENV env = { + .pubf = _publish, + .progressiveFlush = true, + }; + + RUN_MATRIX(threads, subs, 500 * 1000, &env); +} + +// This benchmark injects the messages directly into the relevant queue for +// delivery, bypassing the publish step. +void test_BenchSubscribeAsync_Inject(void) +{ + threadConfig threads[] = { + {false, 1}, // 1 is not used in this case, just to quiet nats_SetMessageDeliveryPoolSize + {true, 1}, + {true, 2}, + {true, 3}, + {true, 7}, + {true, 11}, + {true, 19}, + {true, 163}, + }; + + int subs[] = {1, 8, 23, 83, 163, 499}; + + ENV env = { + .pubf = _inject, + }; + + RUN_MATRIX(threads, subs, 1000 * 1000, &env); +} + +// This benchmark injects the messages directly into the relevant queue for +// delivery, bypassing the publish step. It uses a delay to simulate a slow-ish +// callback. +void test_BenchSubscribeAsync_InjectSlow(void) +{ +#ifdef _WIN32 + // This test relies on nanosleep, not sure what the Windows equivalent is. Skip fr now. + printf("Skipping BenchSubscribeAsync_InjectSlow on Windows\n"); + return; +#endif + + threadConfig threads[] = { + {false, 1}, // 1 is not used in this case, just to quiet nats_SetMessageDeliveryPoolSize + {true, 1}, + {true, 2}, + {true, 3}, + {true, 7}, + {true, 11}, + {true, 79}, + {true, 499}, + }; + + int subs[] = {1, 8, 12, 83, 163, 499}; + + ENV env = { + .pubf = _inject, + .delayNano = 10 * 1000, // 10µs + }; + + RUN_MATRIX(threads, subs, 20 * 1000, &env); +} + +static void _benchMatrix(threadConfig *threadsVector, int lent, int *subsVector, int lens, int NMessages, ENV *env) +{ + printf("[\n"); + for (int *sv = subsVector; sv < subsVector + lens; sv++) + { + int numSubs = *sv; + bool uselessFromHere = false; + int numPubMessages = NMessages / numSubs; + if (numPubMessages == 0) + numPubMessages = 1; + + for (threadConfig *tv = threadsVector; tv < threadsVector + lent; tv++) + { + natsStatus s = NATS_OK; + threadConfig threads = *tv; + int best = 0, average = 0, worst = 0; + + if (threads.useGlobalDelivery) + { + if (uselessFromHere) + continue; + if (threads.max > numSubs) + uselessFromHere = true; // execute this test, but a larger MaxThreads will not make a difference. + } + + env->numSubs = numSubs; + env->numPubMessages = numPubMessages; + env->threads = threads; + for (int i = 0; i < REPEAT; i++) + { + int b = 0, a = 0, w = 0; + s = _bench(env, &b, &a, &w); + if (s != NATS_OK) + { + fprintf(stderr, "Error: %s\n", natsStatus_GetText(s)); + nats_PrintLastErrorStack(stderr); + exit(1); + } + + if ((b < best) || (best == 0)) + best = b; + if (w > worst) + worst = w; + average += a; + } + average /= REPEAT; + + const char *comma = (sv == subsVector + lens - 1) && (tv == threadsVector + lent - 1) ? "" : ","; + printf("\t{\"subs\":%d, \"threads\":%d, \"messages\":%d, \"best\":%d, \"average\":%d, \"worst\":%d}%s\n", + numSubs, env->threads.useGlobalDelivery ? env->threads.max : 0, numPubMessages * numSubs, best, average, worst, comma); + fflush(stdout); + } + } + printf("]\n"); +} + +static natsStatus _bench(ENV *env, int *best, int *avg, int *worst) +{ + natsConnection *nc = NULL; + natsOptions *opts = NULL; + uint64_t expectedSum = _expectedSum(env->numPubMessages); + uint64_t expectedXOR = _expectedXOR(env->numPubMessages); + char subject[256]; + int64_t start, b, w, a; + + if (env->numSubs > 1000) // magic number check. + return NATS_INVALID_ARG; + memset(env->subs, 0, sizeof(subState) * 1000); + for (int i = 0; i < env->numSubs; i++) + env->subs[i].env = env; // set the environment to access it in the callbacks. + + int pid = _startServer("nats://127.0.0.1:4222", NULL, true); + if (pid == NATS_INVALID_PID) + return NATS_ERR; + + natsStatus s = nats_Open(-1); + IFOK(s, natsNUID_Next(subject, NUID_BUFFER_LEN + 1)); + IFOK(s, natsOptions_Create(&opts)); + IFOK(s, nats_SetMessageDeliveryPoolSize(env->threads.max)); + IFOK(s, natsOptions_SetErrorHandler(opts, _onError, NULL)); + IFOK(s, natsOptions_UseGlobalMessageDelivery(opts, env->threads.useGlobalDelivery)); + + IFOK(s, natsConnection_Connect(&nc, opts)); + + for (int i = 0; i < env->numSubs; i++) + { + IFOK(s, natsConnection_Subscribe(&(env->subs[i].sub), nc, subject, _onMessage, &env->subs[i])); + IFOK(s, natsSubscription_SetPendingLimits(env->subs[i].sub, -1, -1)); + IFOK(s, natsSubscription_AutoUnsubscribe(env->subs[i].sub, env->numPubMessages)); + IFOK(s, natsSubscription_SetOnCompleteCB(env->subs[i].sub, _onComplete, &env->subs[i])); + } + + start = nats_Now(); + + // Publish or inject the messages! + IFOK(s, env->pubf(nc, subject, env)); + + while (s == NATS_OK) + { + bool done = true; + for (int i = 0; i < env->numSubs; i++) + { + // threads don't touch this, should be safe + if (natsSubscription_IsValid(env->subs[i].sub)) + { + done = false; + break; + } + } + + nats_Sleep(10); + if (done) + break; + } + + b = w = a = 0; + if (s == NATS_OK) + { + for (int i = 0; i < env->numSubs; i++) + { + if (env->subs[i].sum != expectedSum) + { + s = NATS_ERR; + fprintf(stderr, "Error: sum is %" PRId64 " for sub %d, expected %" PRId64 "\n", env->subs[i].sum, i, expectedSum); + break; + } + if (env->subs[i].xor != expectedXOR) + { + fprintf(stderr, "Error: xor is %" PRId64 " for sub %d, expected %" PRId64 "\n", env->subs[i].xor, i, expectedXOR); + s = NATS_ERR; + break; + } + if ((int)(env->subs[i].count) != env->numPubMessages) + { + fprintf(stderr, "Error: count is %" PRId64 " for sub %d, expected %d\n", env->subs[i].count, i, env->numPubMessages); + s = NATS_ERR; + break; + } + + int64_t dur = env->subs[i].closedTimestamp - start; + if (dur > w) + w = dur; + if ((dur < b) || (b == 0)) + b = dur; + a += dur; + } + } + + // cleanup + for (int i = 0; i < env->numSubs; i++) + natsSubscription_Destroy(env->subs[i].sub); + natsConnection_Destroy(nc); + natsOptions_Destroy(opts); + nats_CloseAndWait(0); + + *best = (int)b; + *avg = (int)(a / env->numSubs); + *worst = (int)w; + + _stopServer(pid); + + return s; +} + +static void _onMessage(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) +{ + subState *ss = (subState *)closure; + +#ifndef _WIN32 + if (ss->env->delayNano > 0) + { + struct timespec wait = {0, ss->env->delayNano}; + nanosleep(&wait, NULL); + } +#endif + + char buf[32]; + int len = natsMsg_GetDataLength(msg); + if (len > 31) + len = 31; + + strncpy(buf, natsMsg_GetData(msg), len); + buf[len] = '\0'; + int64_t val = atoi(buf); + + ss->sum += val; + ss->xor ^= val; + ss->count++; + + natsMsg_Destroy(msg); +} + +static void _onComplete(void *closure) +{ + subState *ss = (subState *)closure; + ss->closedTimestamp = nats_Now(); +} + +static natsStatus _publish(natsConnection *nc, const char *subject, ENV *env) +{ + natsStatus s = NATS_OK; + char buf[16]; + + int flushAfter = env->progressiveFlush ? env->numPubMessages / (env->numSubs * 2) : // trigger + env->numPubMessages + 1; // do not trigger + for (int i = 0; i < env->numPubMessages; i++) + { + snprintf(buf, sizeof(buf), "%d", i); + IFOK(s, natsConnection_PublishString(nc, subject, buf)); + + if (((i != 0) && ((i % flushAfter) == 0)) || // progressive flush + (i == (env->numPubMessages - 1))) // last message in batch + { + IFOK(s, natsConnection_Flush(nc)); + } + } + + return s; +} + +static natsStatus _inject(natsConnection *nc, const char *subject, ENV *env) +{ + natsStatus s = NATS_OK; + natsMsg *m = NULL; + char buf[16]; + + for (int i = 0; i < env->numPubMessages; i++) + { + for (int n = 0; n < env->numSubs; n++) + { + snprintf(buf, sizeof(buf), "%d", i); + + s = natsMsg_Create(&m, subject, NULL, buf, strlen(buf)); + IFOK(s, _enqueueToSub(env->subs[n].sub, m)); + } + } + + return s; +} + +static natsStatus _enqueueToSub(natsSubscription *sub, natsMsg *m) +{ + natsStatus s = NATS_OK; + natsCondition *cond = NULL; + nats_MsgList *list = NULL; + natsMsgDlvWorker *ldw = NULL; + + // Pick condition variable and list based on if the sub is + // part of a global delivery thread pool or not. + // Note about `list`: this is used only to link messages, but + // sub->msgList needs to be used to update/check number of pending + // messages, since in case of delivery thread pool, `list` will have + // messages from many different subscriptions. + if ((ldw = sub->libDlvWorker) != NULL) + { + cond = ldw->cond; + list = &(ldw->msgList); + } + else + { + cond = sub->cond; + list = &(sub->msgList); + } + + natsSubAndLdw_LockAndRetain(sub); + + sub->msgList.msgs++; + sub->msgList.bytes += natsMsg_GetDataLength(m); + if (((sub->msgsLimit > 0) && (sub->msgList.msgs > sub->msgsLimit)) || ((sub->bytesLimit > 0) && (sub->msgList.bytes > sub->bytesLimit))) + { + natsMsg_Destroy(m); + + sub->dropped++; + sub->slowConsumer = true; + + // Undo stats from above. + sub->msgList.msgs--; + sub->msgList.bytes -= natsMsg_GetDataLength(m); + } + else + { + bool signal = false; + + if (sub->msgList.msgs > sub->msgsMax) + sub->msgsMax = sub->msgList.msgs; + + if (sub->msgList.bytes > sub->bytesMax) + sub->bytesMax = sub->msgList.bytes; + + sub->slowConsumer = false; + + m->sub = sub; + if (list->head == NULL) + { + list->head = m; + signal = true; + } + else + list->tail->next = m; + + list->tail = m; + + if (signal) + natsCondition_Signal(cond); + } + + natsSubAndLdw_UnlockAndRelease(sub); + return s; +} + +static uint64_t _expectedSum(int N) +{ + uint64_t sum = 0; + for (int64_t i = 0; i < N; i++) + sum += i; + return sum; +} + +static uint64_t _expectedXOR(int N) +{ + uint64_t xor = 0; + for (int64_t i = 0; i < N; i++) + xor ^= i; + return xor; +} + +static void _onError(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure) +{ + int64_t dropped = 0; + natsSubscription_GetDropped(sub, (int64_t *)&dropped); + printf("Async error: sid:%" PRId64 ", dropped:%" PRId64 ": %u - %s\n", sub->sid, dropped, err, natsStatus_GetText(err)); +} diff --git a/test/list.txt b/test/list.txt deleted file mode 100644 index 60be59288..000000000 --- a/test/list.txt +++ /dev/null @@ -1,301 +0,0 @@ -Version -VersionMatchesTag -OpenCloseAndWait -natsNowAndSleep -natsAllocSprintf -natsStrCaseStr -natsSnprintf -natsBuffer -natsParseInt64 -natsParseControl -natsNormalizeErr -natsMutex -natsThread -natsCondition -natsTimer -natsUrl -natsCreateStringFromBuffer -natsHash -natsHashing -natsStrHash -natsInbox -natsOptions -natsSock_ConnectTcp -natsSock_ShuffleIPs -natsSock_IPOrder -natsSock_ReadLine -natsJSON -natsEncodeTimeUTC -natsErrWithLongText -natsErrStackMoreThanMaxFrames -natsMsg -natsBase32 -natsBase64 -natsCRC16 -natsKeys -natsReadFile -natsGetJWTOrSeed -natsHostIsIP -natsWaitReady -natsSign -HeadersLift -HeadersAPIs -MsgIsJSControl -SrvVersionAtLeast -FormatStringArray -ReconnectServerStats -ParseStateReconnectFunctionality -ServersRandomize -SelectNextServer -ParserPing -ParserErr -ParserOK -ParseINFO -ParserShouldFail -ParserSplitMsg -ProcessMsgArgs -LibMsgDelivery -AsyncINFO -RequestPool -NoFlusherIfSendAsapOption -HeadersAndSubPendingBytes -DefaultConnection -SimplifiedURLs -IPResolutionOrder -UseDefaultURLIfNoServerSpecified -ConnectToWithMultipleURLs -ConnectionWithNULLOptions -ConnectionToWithNullURLs -ConnectionStatus -ConnClosedCB -CloseDisconnectedCB -ServerStopDisconnectedCB -ClosedConnections -ConnectVerboseOption -ReconnectThreadLeak -ReconnectTotalTime -ReconnectDisallowedFlags -ReconnectAllowedFlags -ConnCloseBreaksReconnectLoop -BasicReconnectFunctionality -ExtendedReconnectFunctionality -QueueSubsOnReconnect -IsClosed -IsReconnectingAndStatus -ReconnectBufSize -RetryOnFailedConnect -NoPartialOnReconnect -ReconnectFailsPendingRequests -ForcedReconnect -ErrOnConnectAndDeadlock -ErrOnMaxPayloadLimit -Auth -AuthFailNoDisconnectCB -AuthToken -AuthTokenHandler -PermViolation -AuthViolation -AuthenticationExpired -AuthenticationExpiredReconnect -ConnectedServer -MultipleClose -SimplePublish -SimplePublishNoData -PublishMsg -InvalidSubsArgs -AsyncSubscribe -AsyncSubscribeTimeout -SyncSubscribe -PubSubWithReply -NoResponders -Flush -ConnCloseDoesFlush -QueueSubscriber -ReplyArg -SyncReplyArg -Unsubscribe -DoubleUnsubscribe -SubRemovedWhileProcessingMsg -RequestTimeout -Request -RequestNoBody -RequestMuxWithMappedSubject -OldRequest -SimultaneousRequests -RequestClose -CustomInbox -MessagePadding -FlushInCb -ReleaseFlush -FlushErrOnDisconnect -Inbox -Stats -BadSubject -SubBadSubjectAndQueueNames -ClientAsyncAutoUnsub -ClientSyncAutoUnsub -ClientAutoUnsubAndReconnect -AutoUnsubNoUnsubOnDestroy -NextMsgOnClosedSub -CloseSubRelease -IsValidSubscriber -SlowSubscriber -SlowAsyncSubscriber -SlowConsumerCb -PendingLimitsDeliveredAndDropped -PendingLimitsWithSyncSub -AsyncSubscriptionPending -AsyncSubscriptionPendingDrain -SyncSubscriptionPending -SyncSubscriptionPendingDrain -AsyncErrHandlerMaxPendingMsgs -AsyncErrHandlerMaxPendingBytes -AsyncErrHandlerSubDestroyed -AsyncSubscriberStarvation -AsyncSubscriberOnClose -NextMsgCallOnAsyncSub -SubOnComplete -GetLastError -StaleConnection -ServerErrorClosesConnection -NoEcho -NoEchoOldServer -DrainSub -DrainSubStops -DrainSubRaceOnAutoUnsub -DrainSubNotResentOnReconnect -DrainConn -NoDoubleCloseCbOnDrain -GetClientID -GetClientIP -GetRTT -GetLocalIPAndPort -UserCredsCallbacks -UserCredsFromFiles -UserCredsFromMemory -NKey -NKeyFromSeed -ConnSign -WriteDeadline -HeadersNotSupported -HeadersBasic -MsgsFilter -EventLoop -EventLoopRetryOnFailedConnect -EventLoopTLS -SSLBasic -SSLVerify -SSLCAFromMemory -SSLCertAndKeyFromMemory -SSLVerifyHostname -SSLSkipServerVerification -SSLCiphers -SSLMultithreads -SSLConnectVerboseOption -SSLSocketLeakEventLoop -SSLReconnectWithAuthError -SSLAvailable -ServersOption -AuthServers -AuthFailToReconnect -ReconnectWithTokenHandler -BasicClusterReconnect -HotSpotReconnect -ProperReconnectDelay -ProperFalloutAfterMaxAttempts -StopReconnectAfterTwoAuthErr -TimeoutOnNoServer -PingReconnect -GetServers -GetDiscoveredServers -DiscoveredServersCb -IgnoreDiscoveredServers -INFOAfterFirstPONGisProcessedOK -ServerPoolUpdatedOnClusterUpdate -ReconnectJitter -CustomReconnectDelay -LameDuckMode -ReconnectImplicitUserInfo -JetStreamUnmarshalAccInfo -JetStreamUnmarshalStreamState -JetStreamUnmarshalStreamCfg -JetStreamUnmarshalStreamInfo -JetStreamMarshalStreamCfg -JetStreamUnmarshalConsumerInfo -JetStreamContext -JetStreamDomain -JetStreamMgtStreams -JetStreamMgtConsumers -JetStreamPublish -JetStreamPublishAsync -JetStreamPublishAckHandler -JetStreamSubscribe -JetStreamSubscribeSync -JetStreamSubscribeConfigCheck -JetStreamSubscribeIdleHeartbeat -JetStreamSubscribeFlowControl -JetStreamSubscribePull -JetStreamSubscribeHeadersOnly -JetStreamOrderedCons -JetStreamOrderedConsWithErrors -JetStreamOrderedConsAutoUnsub -JetStreamOrderedConsSrvRestart -JetStreamSubscribeWithFWC -JetStreamStreamsSealAndRollup -JetStreamGetMsgAndLastMsg -JetStreamConvertDirectMsg -JetStreamDirectGetMsg -JetStreamNakWithDelay -JetStreamBackOffRedeliveries -JetStreamInfoWithSubjects -JetStreamInfoAlternates -KeyValueManager -KeyValueBasics -KeyValueWatch -KeyValueWatchMulti -KeyValueHistory -KeyValueKeys -KeyValueDeleteVsPurge -KeyValueDeleteTombstones -KeyValueDeleteMarkerThreshold -KeyValueCrossAccount -KeyValueDiscardOldToNew -KeyValueRePublish -KeyValueMirrorDirectGet -KeyValueMirrorCrossDomains -MicroMatchEndpointSubject -MicroAddService -MicroGroups -MicroBasics -MicroStartStop -MicroServiceStopsOnClosedConn -MicroServiceStopsWhenServerStops -MicroAsyncErrorHandlerMaxPendingMsgs -MicroAsyncErrorHandlerMaxPendingBytes -StanPBufAllocator -StanConnOptions -StanSubOptions -StanMsg -StanServerNotReachable -StanBasicConnect -StanConnectError -StanBasicPublish -StanBasicPublishAsync -StanPublishTimeout -StanPublishMaxAcksInflight -StanBasicSubscription -StanSubscriptionCloseAndUnsub -StanDurableSubscription -StanBasicQueueSubscription -StanDurableQueueSubscription -StanCheckReceivedMsg -StanSubscriptionAckMsg -StanPings -StanPingsNoResponder -StanConnectionLostHandlerNotSet -StanPingsUnblockPublishCalls -StanGetNATSConnection -StanNoRetryOnFailedConnect -StanInternalSubsNotPooled -StanSubOnComplete -StanSubTimeout diff --git a/test/list_bench.txt b/test/list_bench.txt index e69de29bb..2c51bb260 100644 --- a/test/list_bench.txt +++ b/test/list_bench.txt @@ -0,0 +1,4 @@ +_test(BenchSubscribeAsync_Large) +_test(BenchSubscribeAsync_Small) +_test(BenchSubscribeAsync_Inject) +_test(BenchSubscribeAsync_InjectSlow) diff --git a/test/test.c b/test/test.c index d784f1f9d..af1e35842 100644 --- a/test/test.c +++ b/test/test.c @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "natsp.h" +#include "test.h" #include #include @@ -33,7 +33,6 @@ #include "sub.h" #include "msg.h" #include "stats.h" -#include "comsock.h" #include "crypto.h" #include "nkeys.h" #include "parser.h" @@ -69,35 +68,27 @@ testInfo allTests[] = static int tests = 0; -static bool failed = false; +bool failed = false; -static bool keepServerOutput = false; +bool keepServerOutput = false; static bool valgrind = false; static bool runOnTravis = false; -static const char *natsServerExe = "nats-server"; static const char *serverVersion = NULL; static const char *natsStreamingServerExe = "nats-streaming-server"; -static natsMutex *slMu = NULL; -static natsHash *slMap = NULL; +natsMutex *slMu = NULL; +natsHash *slMap = NULL; #define test(s) { printf("#%02d ", ++tests); printf("%s", (s)); fflush(stdout); } #ifdef _WIN32 -#define NATS_INVALID_PID (NULL) #define testCond(c) if(c) { printf("PASSED\n"); fflush(stdout); } else { printf("FAILED\n"); nats_PrintLastErrorStack(stdout); fflush(stdout); failed=true; return; } #define testCondNoReturn(c) if(c) { printf("PASSED\n"); fflush(stdout); } else { printf("FAILED\n"); nats_PrintLastErrorStack(stdout); fflush(stdout); failed=true; } -#define LOGFILE_NAME "wserver.log" #else -#define NATS_INVALID_PID (-1) #define testCond(c) if(c) { printf("\033[0;32mPASSED\033[0;0m\n"); fflush(stdout); } else { printf("\033[0;31mFAILED\033[0;0m\n"); nats_PrintLastErrorStack(stdout); fflush(stdout); failed=true; return; } #define testCondNoReturn(c) if(c) { printf("\033[0;32mPASSED\033[0;0m\n"); fflush(stdout); } else { printf("\033[0;31mFAILED\033[0;0m\n"); nats_PrintLastErrorStack(stdout); fflush(stdout); failed=true; } -#define LOGFILE_NAME "server.log" #endif -#define FAIL(m) { printf("@@ %s @@\n", (m)); failed=true; return; } - -#define CHECK_SERVER_STARTED(p) if ((p) == NATS_INVALID_PID) FAIL("Unable to start or verify that the server was started!") static const char *testServers[] = {"nats://127.0.0.1:1222", "nats://127.0.0.1:1223", @@ -107,11 +98,6 @@ static const char *testServers[] = {"nats://127.0.0.1:1222", "nats://127.0.0.1:1227", "nats://127.0.0.1:1228"}; -#if defined(NATS_HAS_STREAMING) -static const char *clusterName = "test-cluster"; -static const char *clientName = "client"; -#endif - // Forward declaration static void _startMockupServerThread(void *closure); static void _createConfFile(char *buf, int bufLen, const char *content); @@ -5514,323 +5500,6 @@ void test_natsFormatStringArray(void) NATS_FREE_STRINGS(out, N); } -static natsStatus -_checkStart(const char *url, int orderIP, int maxAttempts) -{ - natsStatus s = NATS_OK; - natsUrl *nUrl = NULL; - int attempts = 0; - natsSockCtx ctx; - - natsSock_Init(&ctx); - ctx.orderIP = orderIP; - - natsDeadline_Init(&(ctx.writeDeadline), 2000); - - s = natsUrl_Create(&nUrl, url); - if (s == NATS_OK) - { - while (((s = natsSock_ConnectTcp(&ctx, - nUrl->host, nUrl->port)) != NATS_OK) - && (attempts++ < maxAttempts)) - { - nats_Sleep(200); - } - - natsUrl_Destroy(nUrl); - - if (s == NATS_OK) - natsSock_Close(ctx.fd); - else - s = NATS_NO_SERVER; - } - - nats_clearLastError(); - - return s; -} - -static natsStatus -_checkStreamingStart(const char *url, int maxAttempts) -{ - natsStatus s = NATS_NOT_PERMITTED; - -#if defined(NATS_HAS_STREAMING) - - stanConnOptions *opts = NULL; - stanConnection *sc = NULL; - int attempts = 0; - - s = stanConnOptions_Create(&opts); - IFOK(s, stanConnOptions_SetURL(opts, url)); - IFOK(s, stanConnOptions_SetConnectionWait(opts, 250)); - if (s == NATS_OK) - { - while (((s = stanConnection_Connect(&sc, clusterName, "checkStart", opts)) != NATS_OK) - && (attempts++ < maxAttempts)) - { - nats_Sleep(200); - } - } - - stanConnection_Destroy(sc); - stanConnOptions_Destroy(opts); - - if (s != NATS_OK) - nats_clearLastError(); -#else -#endif - return s; -} - -#ifdef _WIN32 - -typedef PROCESS_INFORMATION *natsPid; - -static HANDLE logHandle = NULL; - -static void -_stopServer(natsPid pid) -{ - if (pid == NATS_INVALID_PID) - return; - - TerminateProcess(pid->hProcess, 0); - WaitForSingleObject(pid->hProcess, INFINITE); - - CloseHandle(pid->hProcess); - CloseHandle(pid->hThread); - - natsMutex_Lock(slMu); - if (slMap != NULL) - natsHash_Remove(slMap, (int64_t) pid); - natsMutex_Unlock(slMu); - - free(pid); -} - -static natsPid -_startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts, bool checkStart) -{ - SECURITY_ATTRIBUTES sa; - STARTUPINFO si; - HANDLE h; - PROCESS_INFORMATION *pid; - DWORD flags = 0; - BOOL createdOk = FALSE; - BOOL hInheritance = FALSE; - char *exeAndCmdLine = NULL; - int ret; - - pid = calloc(1, sizeof(PROCESS_INFORMATION)); - if (pid == NULL) - return NATS_INVALID_PID; - - ZeroMemory(&si, sizeof(si)); - si.cb = sizeof(si); - - ret = nats_asprintf(&exeAndCmdLine, "%s%s%s", serverExe, - (cmdLineOpts != NULL ? " " : ""), - (cmdLineOpts != NULL ? cmdLineOpts : "")); - if (ret < 0) - { - printf("No memory allocating command line string!\n"); - free(pid); - return NATS_INVALID_PID; - } - - if (!keepServerOutput) - { - ZeroMemory(&sa, sizeof(sa)); - sa.nLength = sizeof(sa); - sa.lpSecurityDescriptor = NULL; - sa.bInheritHandle = TRUE; - - h = logHandle; - if (h == NULL) - { - h = CreateFile(LOGFILE_NAME, - GENERIC_WRITE, - FILE_SHARE_WRITE | FILE_SHARE_READ, - &sa, - CREATE_ALWAYS, - FILE_ATTRIBUTE_NORMAL, - NULL); - } - - si.dwFlags |= STARTF_USESTDHANDLES; - si.hStdInput = NULL; - si.hStdError = h; - si.hStdOutput = h; - - hInheritance = TRUE; - flags = CREATE_NO_WINDOW; - - if (logHandle == NULL) - logHandle = h; - } - - // Start the child process. - if (!CreateProcess(NULL, - (LPSTR) exeAndCmdLine, - NULL, // Process handle not inheritable - NULL, // Thread handle not inheritable - hInheritance, // Set handle inheritance - flags, // Creation flags - NULL, // Use parent's environment block - NULL, // Use parent's starting directory - &si, // Pointer to STARTUPINFO structure - pid)) // Pointer to PROCESS_INFORMATION structure - { - - printf("Unable to start '%s': error (%d).\n", - exeAndCmdLine, GetLastError()); - free(exeAndCmdLine); - return NATS_INVALID_PID; - } - - free(exeAndCmdLine); - - if (checkStart) - { - natsStatus s; - - if (strcmp(serverExe, natsServerExe) == 0) - s = _checkStart(url, 46, 10); - else - s = _checkStreamingStart(url, 10); - - if (s != NATS_OK) - { - _stopServer(pid); - return NATS_INVALID_PID; - } - } - - natsMutex_Lock(slMu); - if (slMap != NULL) - natsHash_Set(slMap, (int64_t) pid, NULL, NULL); - natsMutex_Unlock(slMu); - - return (natsPid) pid; -} - -#else - -typedef pid_t natsPid; - -static void -_stopServer(natsPid pid) -{ - int status = 0; - - if (pid == NATS_INVALID_PID) - return; - - if (kill(pid, SIGINT) < 0) - { - perror("kill with SIGINT"); - if (kill(pid, SIGKILL) < 0) - { - perror("kill with SIGKILL"); - } - } - - waitpid(pid, &status, 0); - - natsMutex_Lock(slMu); - if (slMap != NULL) - natsHash_Remove(slMap, (int64_t) pid); - natsMutex_Unlock(slMu); -} - -static natsPid -_startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts, bool checkStart) -{ - natsPid pid = fork(); - if (pid == -1) - { - perror("fork"); - return NATS_INVALID_PID; - } - - if (pid == 0) - { - char *exeAndCmdLine = NULL; - char *argvPtrs[64]; - char *line = NULL; - int index = 0; - int ret = 0; - bool overrideAddr = false; - - if ((cmdLineOpts == NULL) || (strstr(cmdLineOpts, "-a ") == NULL)) - overrideAddr = true; - - ret = nats_asprintf(&exeAndCmdLine, "%s%s%s%s%s", serverExe, - (cmdLineOpts != NULL ? " " : ""), - (cmdLineOpts != NULL ? cmdLineOpts : ""), - (overrideAddr ? " -a 127.0.0.1" : ""), - (keepServerOutput ? "" : " -l " LOGFILE_NAME)); - if (ret < 0) - { - perror("No memory allocating command line string!\n"); - exit(1); - } - - memset(argvPtrs, 0, sizeof(argvPtrs)); - line = exeAndCmdLine; - - while (*line != '\0') - { - while ((*line == ' ') || (*line == '\t') || (*line == '\n')) - *line++ = '\0'; - - argvPtrs[index++] = line; - while ((*line != '\0') && (*line != ' ') - && (*line != '\t') && (*line != '\n')) - { - line++; - } - } - argvPtrs[index++] = NULL; - - // Child process. Replace with NATS server - execvp(argvPtrs[0], argvPtrs); - perror("Exec failed: "); - exit(1); - } - else if (checkStart) - { - natsStatus s; - - if (strcmp(serverExe, natsServerExe) == 0) - s = _checkStart(url, 46, 10); - else - s = _checkStreamingStart(url, 10); - - if (s != NATS_OK) - { - _stopServer(pid); - return NATS_INVALID_PID; - } - } - - natsMutex_Lock(slMu); - if (slMap != NULL) - natsHash_Set(slMap, (int64_t) pid, NULL, NULL); - natsMutex_Unlock(slMu); - - // parent, return the child's PID back. - return pid; -} -#endif - -static natsPid -_startServer(const char *url, const char *cmdLineOpts, bool checkStart) -{ - return _startServerImpl(natsServerExe, url, cmdLineOpts, checkStart); -} - static natsPid _startStreamingServer(const char* url, const char *cmdLineOpts, bool checkStart) { @@ -19423,7 +19092,7 @@ void test_UserCredsFromFiles(void) // Use a file that contains no userJWT.. test("UserOrChainedFile has no JWT: "); - s = natsOptions_SetUserCredentialsFromFiles(opts, "list.txt", NULL); + s = natsOptions_SetUserCredentialsFromFiles(opts, "list_test.txt", NULL); IFOK(s, natsConnection_Connect(&nc, opts)); // Since we return the whole content of the file when we don't find // the key for the user, but we don't for seed, the error we'll get @@ -19439,7 +19108,7 @@ void test_UserCredsFromFiles(void) // Use a seed file that contains no seed.. test("SeedFile has no seed: "); - s = natsOptions_SetUserCredentialsFromFiles(opts, ucfn, "list.txt"); + s = natsOptions_SetUserCredentialsFromFiles(opts, ucfn, "list_test.txt"); IFOK(s, natsConnection_Connect(&nc, opts)); testCond((s == NATS_ERR) && (strstr(nats_GetLastError(NULL), "no nkey user seed found") != NULL)); diff --git a/bench/bench.h b/test/test.h similarity index 66% rename from bench/bench.h rename to test/test.h index e7afcc824..f2cd60213 100644 --- a/bench/bench.h +++ b/test/test.h @@ -11,21 +11,72 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef __BENCH_H -#define __BENCH_H - -#include "nats.h" +#include "natsp.h" #include "comsock.h" +#if defined(NATS_HAS_STREAMING) +static const char *clusterName = "test-cluster"; +static const char *clientName = "client"; +#endif + +#ifdef _WIN32 +#define NATS_INVALID_PID (NULL) +#define LOGFILE_NAME "wserver.log" +#else +#define NATS_INVALID_PID (-1) +#define LOGFILE_NAME "server.log" +#endif + +#define FAIL(m) \ + { \ + printf("@@ %s @@\n", (m)); \ + failed = true; \ + return; \ + } + +#define CHECK_SERVER_STARTED(p) \ + if ((p) == NATS_INVALID_PID) \ + FAIL("Unable to start or verify that the server was started!") + +extern natsMutex *slMu; +extern natsHash *slMap; +extern bool keepServerOutput; +extern bool failed; + static const char *natsServerExe = "nats-server"; -#define CHECK_SERVER_STARTED(p) \ - if ((p) == NATS_INVALID_PID) \ - { \ - fprintf(stderr, "Unable to start or verify that the server was started!"); \ - exit(1); \ +static natsStatus +_checkStreamingStart(const char *url, int maxAttempts) +{ + natsStatus s = NATS_NOT_PERMITTED; + +#if defined(NATS_HAS_STREAMING) + + stanConnOptions *opts = NULL; + stanConnection *sc = NULL; + int attempts = 0; + + s = stanConnOptions_Create(&opts); + IFOK(s, stanConnOptions_SetURL(opts, url)); + IFOK(s, stanConnOptions_SetConnectionWait(opts, 250)); + if (s == NATS_OK) + { + while (((s = stanConnection_Connect(&sc, clusterName, "checkStart", opts)) != NATS_OK) && (attempts++ < maxAttempts)) + { + nats_Sleep(200); + } } + stanConnection_Destroy(sc); + stanConnOptions_Destroy(opts); + + if (s != NATS_OK) + nats_clearLastError(); +#else +#endif + return s; +} + static natsStatus _checkStart(const char *url, int orderIP, int maxAttempts) { @@ -62,14 +113,6 @@ _checkStart(const char *url, int orderIP, int maxAttempts) return s; } -#ifdef _WIN32 -#define NATS_INVALID_PID (NULL) -#define LOGFILE_NAME "wserver.log" -#else -#define NATS_INVALID_PID (-1) -#define LOGFILE_NAME "server.log" -#endif - #ifdef _WIN32 typedef PROCESS_INFORMATION *natsPid; @@ -88,6 +131,11 @@ _stopServer(natsPid pid) CloseHandle(pid->hProcess); CloseHandle(pid->hThread); + natsMutex_Lock(slMu); + if (slMap != NULL) + natsHash_Remove(slMap, (int64_t)pid); + natsMutex_Unlock(slMu); + free(pid); } @@ -121,6 +169,37 @@ _startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts return NATS_INVALID_PID; } + if (!keepServerOutput) + { + ZeroMemory(&sa, sizeof(sa)); + sa.nLength = sizeof(sa); + sa.lpSecurityDescriptor = NULL; + sa.bInheritHandle = TRUE; + + h = logHandle; + if (h == NULL) + { + h = CreateFile(LOGFILE_NAME, + GENERIC_WRITE, + FILE_SHARE_WRITE | FILE_SHARE_READ, + &sa, + CREATE_ALWAYS, + FILE_ATTRIBUTE_NORMAL, + NULL); + } + + si.dwFlags |= STARTF_USESTDHANDLES; + si.hStdInput = NULL; + si.hStdError = h; + si.hStdOutput = h; + + hInheritance = TRUE; + flags = CREATE_NO_WINDOW; + + if (logHandle == NULL) + logHandle = h; + } + // Start the child process. if (!CreateProcess(NULL, (LPSTR)exeAndCmdLine, @@ -146,7 +225,11 @@ _startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts { natsStatus s; - s = _checkStart(url, 46, 10); + if (strcmp(serverExe, natsServerExe) == 0) + s = _checkStart(url, 46, 10); + else + s = _checkStreamingStart(url, 10); + if (s != NATS_OK) { _stopServer(pid); @@ -154,6 +237,11 @@ _startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts } } + natsMutex_Lock(slMu); + if (slMap != NULL) + natsHash_Set(slMap, (int64_t)pid, NULL, NULL); + natsMutex_Unlock(slMu); + return (natsPid)pid; } @@ -179,6 +267,11 @@ _stopServer(natsPid pid) } waitpid(pid, &status, 0); + + natsMutex_Lock(slMu); + if (slMap != NULL) + natsHash_Remove(slMap, (int64_t)pid); + natsMutex_Unlock(slMu); } static natsPid @@ -203,10 +296,11 @@ _startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts if ((cmdLineOpts == NULL) || (strstr(cmdLineOpts, "-a ") == NULL)) overrideAddr = true; - ret = nats_asprintf(&exeAndCmdLine, "%s%s%s%s", serverExe, + ret = nats_asprintf(&exeAndCmdLine, "%s%s%s%s%s", serverExe, (cmdLineOpts != NULL ? " " : ""), (cmdLineOpts != NULL ? cmdLineOpts : ""), - (overrideAddr ? " -a 127.0.0.1" : "")); + (overrideAddr ? " -a 127.0.0.1" : ""), + (keepServerOutput ? "" : " -l " LOGFILE_NAME)); if (ret < 0) { perror("No memory allocating command line string!\n"); @@ -238,7 +332,11 @@ _startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts { natsStatus s; - s = _checkStart(url, 46, 10); + if (strcmp(serverExe, natsServerExe) == 0) + s = _checkStart(url, 46, 10); + else + s = _checkStreamingStart(url, 10); + if (s != NATS_OK) { _stopServer(pid); @@ -246,6 +344,11 @@ _startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts } } + natsMutex_Lock(slMu); + if (slMap != NULL) + natsHash_Set(slMap, (int64_t)pid, NULL, NULL); + natsMutex_Unlock(slMu); + // parent, return the child's PID back. return pid; } @@ -257,12 +360,3 @@ _startServer(const char *url, const char *cmdLineOpts, bool checkStart) return _startServerImpl(natsServerExe, url, cmdLineOpts, checkStart); } -static void -asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure) -{ - int64_t dropped = 0; - natsSubscription_GetDropped(sub, (int64_t *)&dropped); - printf("Async error: sid:%" PRId64 ", dropped:%" PRId64 ": %u - %s\n", sub->sid, dropped, err, natsStatus_GetText(err)); -} - -#endif // __BENCH_H