Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

benchmark for SubscribeAsync #774

Merged
merged 13 commits into from
Jul 27, 2024
17 changes: 15 additions & 2 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ on:
default: "OFF"
verbose_make_output:
type: string
default: "ON"
default: "OFF"
benchmark:
type: string
default: "OFF"

secrets:
CODECOV_TOKEN:
description: "Codecov repo token"
Expand Down Expand Up @@ -168,6 +172,7 @@ jobs:
fi

- name: "Test"
if: inputs.benchmark == 'OFF'
working-directory: ./build
run: |
export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH
Expand All @@ -183,4 +188,12 @@ jobs:
with:
fail_ci_if_error: true
token: ${{ secrets.CODECOV_TOKEN }}
# verbose: true
# verbose: true

- name: "Benchmark"
if: inputs.benchmark == 'ON'
working-directory: ./build
run: |
export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH
export NATS_TEST_SERVER_VERSION="$(nats-server -v)"
./bin/bench-sub-async
8 changes: 8 additions & 0 deletions .github/workflows/on-push-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,11 @@ jobs:
with:
sanitize: thread
server_version: main

bench:
name: "Benchmark"
uses: ./.github/workflows/build-test.yml
with:
server_version: main
benchmark: ON
type: Release
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ add_subdirectory(examples/getstarted)
if(NATS_BUILD_STREAMING)
add_subdirectory(examples/stan)
endif()
add_subdirectory(bench)
add_subdirectory(test)
add_subdirectory(test/dylib)
#----------------------------
28 changes: 28 additions & 0 deletions bench/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
if(NOT NATS_BUILD_LIB_STATIC)
MESSAGE(FATAL_ERROR
"Building tests require static library, or run CMake with -DBUILD_TESTING=OFF")
return()
endif()

include_directories(${PROJECT_SOURCE_DIR}/src)
include_directories(${PROJECT_SOURCE_DIR}/bench)

if(NATS_BUILD_WITH_TLS)
include_directories(${OPENSSL_INCLUDE_DIR})
endif(NATS_BUILD_WITH_TLS)

set(LIB_SUFFIX ${CMAKE_STATIC_LIBRARY_SUFFIX})

# Get all the .c files in the examples directory
file(GLOB BENCH_SOURCES RELATIVE ${PROJECT_SOURCE_DIR}/bench *.c)

# For each file...
foreach(bsrc ${BENCH_SOURCES})
# Remove the suffix so that it becomes the executable name
string(REPLACE ".c" "" bname ${bsrc})
set(bexe "bench-${bname}")

add_executable(${bexe} ${PROJECT_SOURCE_DIR}/bench/${bsrc})

target_link_libraries(${bexe} nats_static ${NATS_EXTRA_LIB} ${NATS_ASYNC_IO_LIB})
endforeach()
270 changes: 270 additions & 0 deletions bench/bench.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef __BENCH_H
#define __BENCH_H

#include <limits.h>

#include "nats.h"
#include "comsock.h"

static const char *natsServerExe = "nats-server";

#define CHECK_SERVER_STARTED(p) \
if ((p) == NATS_INVALID_PID) \
{ \
fprintf(stderr, "Unable to start or verify that the server was started!"); \
exit(1); \
}

static natsStatus
_checkStart(const char *url, int orderIP, int maxAttempts)

Check warning on line 32 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L32

Added line #L32 was not covered by tests
{
natsStatus s = NATS_OK;
natsUrl *nUrl = NULL;
int attempts = 0;

Check warning on line 36 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L34-L36

Added lines #L34 - L36 were not covered by tests
natsSockCtx ctx;

natsSock_Init(&ctx);
ctx.orderIP = orderIP;

Check warning on line 40 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L39-L40

Added lines #L39 - L40 were not covered by tests

natsDeadline_Init(&(ctx.writeDeadline), 2000);

Check warning on line 42 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L42

Added line #L42 was not covered by tests

s = natsUrl_Create(&nUrl, url);

Check warning on line 44 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L44

Added line #L44 was not covered by tests
if (s == NATS_OK)
{
while (((s = natsSock_ConnectTcp(&ctx,
nUrl->host, nUrl->port)) != NATS_OK) &&
(attempts++ < maxAttempts))

Check warning on line 49 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L47-L49

Added lines #L47 - L49 were not covered by tests
{
nats_Sleep(200);

Check warning on line 51 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L51

Added line #L51 was not covered by tests
}

natsUrl_Destroy(nUrl);

Check warning on line 54 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L54

Added line #L54 was not covered by tests

if (s == NATS_OK)
natsSock_Close(ctx.fd);

Check warning on line 57 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L57

Added line #L57 was not covered by tests
else
s = NATS_NO_SERVER;

Check warning on line 59 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L59

Added line #L59 was not covered by tests
}

nats_clearLastError();

Check warning on line 62 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L62

Added line #L62 was not covered by tests

return s;

Check warning on line 64 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L64

Added line #L64 was not covered by tests
}

#ifdef _WIN32
#define NATS_INVALID_PID (NULL)
#define LOGFILE_NAME "wserver.log"
#else
#define NATS_INVALID_PID (-1)
#define LOGFILE_NAME "server.log"
#endif

#ifdef _WIN32

typedef PROCESS_INFORMATION *natsPid;

static HANDLE logHandle = NULL;

static void
_stopServer(natsPid pid)
{
if (pid == NATS_INVALID_PID)
return;

TerminateProcess(pid->hProcess, 0);
WaitForSingleObject(pid->hProcess, INFINITE);

CloseHandle(pid->hProcess);
CloseHandle(pid->hThread);

free(pid);
}

static natsPid
_startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts, bool checkStart)
{
SECURITY_ATTRIBUTES sa;
STARTUPINFO si;
HANDLE h;
PROCESS_INFORMATION *pid;
DWORD flags = 0;
BOOL createdOk = FALSE;
BOOL hInheritance = FALSE;
char *exeAndCmdLine = NULL;
int ret;

pid = calloc(1, sizeof(PROCESS_INFORMATION));
if (pid == NULL)
return NATS_INVALID_PID;

ZeroMemory(&si, sizeof(si));
si.cb = sizeof(si);

ret = nats_asprintf(&exeAndCmdLine, "%s%s%s", serverExe,
(cmdLineOpts != NULL ? " " : ""),
(cmdLineOpts != NULL ? cmdLineOpts : ""));
if (ret < 0)
{
printf("No memory allocating command line string!\n");
free(pid);
return NATS_INVALID_PID;
}

// Start the child process.
if (!CreateProcess(NULL,
(LPSTR)exeAndCmdLine,
NULL, // Process handle not inheritable
NULL, // Thread handle not inheritable
hInheritance, // Set handle inheritance
flags, // Creation flags
NULL, // Use parent's environment block
NULL, // Use parent's starting directory
&si, // Pointer to STARTUPINFO structure
pid)) // Pointer to PROCESS_INFORMATION structure
{

printf("Unable to start '%s': error (%d).\n",
exeAndCmdLine, GetLastError());
free(exeAndCmdLine);
return NATS_INVALID_PID;
}

free(exeAndCmdLine);

if (checkStart)
{
natsStatus s;

s = _checkStart(url, 46, 10);
if (s != NATS_OK)
{
_stopServer(pid);
return NATS_INVALID_PID;
}
}

return (natsPid)pid;
}

#else

typedef pid_t natsPid;

static void
_stopServer(natsPid pid)

Check warning on line 167 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L167

Added line #L167 was not covered by tests
levb marked this conversation as resolved.
Show resolved Hide resolved
{
int status = 0;

Check warning on line 169 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L169

Added line #L169 was not covered by tests

if (pid == NATS_INVALID_PID)
return;

Check warning on line 172 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L172

Added line #L172 was not covered by tests

if (kill(pid, SIGINT) < 0)
{
perror("kill with SIGINT");

Check warning on line 176 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L176

Added line #L176 was not covered by tests
if (kill(pid, SIGKILL) < 0)
{
perror("kill with SIGKILL");

Check warning on line 179 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L179

Added line #L179 was not covered by tests
}
}

waitpid(pid, &status, 0);

Check warning on line 183 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L183

Added line #L183 was not covered by tests
}

static natsPid
_startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts, bool checkStart)

Check warning on line 187 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L187

Added line #L187 was not covered by tests
{
natsPid pid = fork();

Check warning on line 189 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L189

Added line #L189 was not covered by tests
if (pid == -1)
{
perror("fork");
return NATS_INVALID_PID;

Check warning on line 193 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L192-L193

Added lines #L192 - L193 were not covered by tests
}

if (pid == 0)
{
char *exeAndCmdLine = NULL;

Check warning on line 198 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L198

Added line #L198 was not covered by tests
char *argvPtrs[64];
char *line = NULL;
int index = 0;
int ret = 0;
bool overrideAddr = false;

Check warning on line 203 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L200-L203

Added lines #L200 - L203 were not covered by tests

if ((cmdLineOpts == NULL) || (strstr(cmdLineOpts, "-a ") == NULL))
overrideAddr = true;

Check warning on line 206 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L206

Added line #L206 was not covered by tests

ret = nats_asprintf(&exeAndCmdLine, "%s%s%s%s", serverExe,

Check warning on line 208 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L208

Added line #L208 was not covered by tests
(cmdLineOpts != NULL ? " " : ""),
(cmdLineOpts != NULL ? cmdLineOpts : ""),
(overrideAddr ? " -a 127.0.0.1" : ""));
if (ret < 0)
{
perror("No memory allocating command line string!\n");
exit(1);

Check warning on line 215 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L214-L215

Added lines #L214 - L215 were not covered by tests
}

memset(argvPtrs, 0, sizeof(argvPtrs));
line = exeAndCmdLine;

Check warning on line 219 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L218-L219

Added lines #L218 - L219 were not covered by tests

while (*line != '\0')
{
while ((*line == ' ') || (*line == '\t') || (*line == '\n'))
*line++ = '\0';

Check warning on line 224 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L224

Added line #L224 was not covered by tests

argvPtrs[index++] = line;

Check warning on line 226 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L226

Added line #L226 was not covered by tests
while ((*line != '\0') && (*line != ' ') && (*line != '\t') && (*line != '\n'))
{
line++;

Check warning on line 229 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L229

Added line #L229 was not covered by tests
}
}
argvPtrs[index++] = NULL;

Check warning on line 232 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L232

Added line #L232 was not covered by tests

// Child process. Replace with NATS server
execvp(argvPtrs[0], argvPtrs);
perror("Exec failed: ");
exit(1);

Check warning on line 237 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L235-L237

Added lines #L235 - L237 were not covered by tests
}
else if (checkStart)

Check warning on line 239 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L239

Added line #L239 was not covered by tests
{
natsStatus s;

s = _checkStart(url, 46, 10);

Check warning on line 243 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L243

Added line #L243 was not covered by tests
if (s != NATS_OK)
{
_stopServer(pid);
return NATS_INVALID_PID;

Check warning on line 247 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L246-L247

Added lines #L246 - L247 were not covered by tests
}
}

// parent, return the child's PID back.
return pid;

Check warning on line 252 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L252

Added line #L252 was not covered by tests
}
#endif

static natsPid
_startServer(const char *url, const char *cmdLineOpts, bool checkStart)

Check warning on line 257 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L257

Added line #L257 was not covered by tests
{
return _startServerImpl(natsServerExe, url, cmdLineOpts, checkStart);

Check warning on line 259 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L259

Added line #L259 was not covered by tests
}

static void
asyncCb(natsConnection *nc, natsSubscription *sub, natsStatus err, void *closure)

Check warning on line 263 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L263

Added line #L263 was not covered by tests
{
int64_t dropped = 0;
natsSubscription_GetDropped(sub, (int64_t *)&dropped);
printf("Async error: sid:%" PRId64 ", dropped:%" PRId64 ": %u - %s\n", sub->sid, dropped, err, natsStatus_GetText(err));

Check warning on line 267 in bench/bench.h

View check run for this annotation

Codecov / codecov/patch

bench/bench.h#L265-L267

Added lines #L265 - L267 were not covered by tests
}

#endif // __BENCH_H
Loading
Loading