Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] refactored nats.c, prep for js_PullSubscribeAsync #778

Merged
merged 4 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 32 additions & 21 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
# verbose: true

- name: "Download benchmark results from ${{ github.event.pull_request.base.ref }}"
- name: "Cache for base benchmark for ${{ github.event.pull_request.base.ref }}"
id: cache-base-bench
if: inputs.benchmark == 'ON' && github.event.pull_request.base.ref
uses: actions/download-artifact@v2
uses: actions/cache@v4
with:
name: benchmark_results_${{ github.event.pull_request.base.ref }}
path: ./build/prev_bench.log
continue-on-error: true
key: bench-${{ github.event.pull_request.base.ref }}
path: ./build/bench-${{ github.event.pull_request.base.ref }}.log

- name: "Benchmark"
if: inputs.benchmark == 'ON'
Expand All @@ -197,24 +197,35 @@ jobs:
export NATS_TEST_SERVER_VERSION="$(nats-server -v)"
flags=""
ctest -L 'bench' --timeout 600 -VV | tee bench.log
#
# ...coming: compare to base branch

- name: "Upload benchmark result for PR ${{ github.event.pull_request.head.ref }}"
if: inputs.benchmark == 'ON' && github.event.pull_request.head.ref
uses: actions/upload-artifact@v4
- name: "Checkout nats.c for ${{ github.event.pull_request.base.ref }}"
if: inputs.benchmark == 'ON' && github.event.pull_request.base.ref && steps.cache-base-bench.outputs.cache-hit != 'true'
uses: actions/checkout@v4
with:
name: benchmark_results_${{ github.event.pull_request.head.ref }}
path: ./build/bench.log
ref: ${{ github.event.pull_request.base.ref }}
clean: false

- name: Extract branch name
if: inputs.benchmark == 'ON' && !github.event.pull_request.head.ref
id: extract_branch
run: echo "BRANCH_NAME=${GITHUB_REF#refs/heads/}" >> $GITHUB_ENV
- name: "Benchmark ${{ github.event.pull_request.base.ref }} for comparison"
if: inputs.benchmark == 'ON' && github.event.pull_request.base.ref && steps.cache-base-bench.outputs.cache-hit != 'true'
run: |
mkdir -p build
cd build
rm -rf CMakeFiles CMakeCache.txt
cmake .. ${{ steps.cmake-flags.outputs.flags }}
make rebuild_cache && make
export PATH=../deps/nats-server:../deps/nats-streaming-server:$PATH
export NATS_TEST_SERVER_VERSION="$(nats-server -v)"
flags=""
ctest -L 'bench' --timeout 600 -VV | tee bench-${{ github.event.pull_request.base.ref }}.log

- name: "Upload benchmark result for branch ${{ env.BRANCH_NAME }}"
if: inputs.benchmark == 'ON' && !github.event.pull_request.head.ref
uses: actions/upload-artifact@v4
- name: "Checkout HEAD ${{ github.event.pull_request.head.ref }}"
if: inputs.benchmark == 'ON' && github.event.pull_request.head.ref && steps.cache-base-bench.outputs.cache-hit != 'true'
uses: actions/checkout@v4
with:
name: benchmark_results_${{ env.BRANCH_NAME }}
path: ./build/bench.log
clean: false

- name: "Compare benchmark to ${{ github.event.pull_request.base.ref }}"
if: inputs.benchmark == 'ON' && github.event.pull_request.base.ref
run: |
cd build
go run ../test/diffstat_sub_async.go bench-${{ github.event.pull_request.base.ref }}.log bench.log
23 changes: 0 additions & 23 deletions .github/workflows/on-push-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,3 @@ jobs:
server_version: main
ubuntu_version: ${{ matrix.ubuntu_version }}
compiler: ${{ matrix.compiler }}

levb marked this conversation as resolved.
Show resolved Hide resolved
dev-mode:
name: "DEV_MODE"
uses: ./.github/workflows/build-test.yml
with:
dev_mode: ON
server_version: main
verbose_test_output: ON
verbose_make_output: ON

sanitize-addr:
name: "Sanitize address"
uses: ./.github/workflows/build-test.yml
with:
sanitize: address
server_version: main

san-thread:
name: "Sanitize thread"
uses: ./.github/workflows/build-test.yml
with:
sanitize: thread
server_version: main
6 changes: 4 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#------------------------
include_directories(include)
include_directories(${NATS_PLATFORM_INCLUDE})
include_directories(glib)

if(NATS_BUILD_WITH_TLS)
include_directories(${OPENSSL_INCLUDE_DIR})
Expand All @@ -24,6 +25,7 @@ endif(NATS_BUILD_USE_SODIUM)
#---------------------------------------
file(GLOB SOURCES "*.c")
file(GLOB PS_SOURCES "${NATS_PLATFORM_INCLUDE}/*.c")
file(GLOB GLIB_SOURCES "glib/*.c")

# Add stan directory if building with Streaming support
if(NATS_BUILD_STREAMING)
Expand All @@ -34,15 +36,15 @@ endif(NATS_BUILD_STREAMING)
# Create the shared and static libraries
# --------------------------------------
if(NATS_BUILD_LIB_SHARED)
add_library(nats SHARED ${SOURCES} ${PS_SOURCES} ${S_SOURCES})
add_library(nats SHARED ${SOURCES} ${GLIB_SOURCES} ${PS_SOURCES} ${S_SOURCES})
target_link_libraries(nats ${NATS_OPENSSL_LIBS} ${NATS_EXTRA_LIB} ${NATS_PROTOBUF_LIBRARIES} ${NATS_SODIUM_LIBRARIES})
set_target_properties(nats PROPERTIES
VERSION ${NATS_VERSION_MAJOR}.${NATS_VERSION_MINOR}.${NATS_VERSION_PATCH}
SOVERSION ${NATS_VERSION_MAJOR}.${NATS_VERSION_MINOR})
endif(NATS_BUILD_LIB_SHARED)

if(NATS_BUILD_LIB_STATIC)
add_library(nats_static STATIC ${SOURCES} ${PS_SOURCES} ${S_SOURCES})
add_library(nats_static STATIC ${SOURCES} ${GLIB_SOURCES} ${PS_SOURCES} ${S_SOURCES})
target_link_libraries(nats_static ${NATS_OPENSSL_LIBS} ${NATS_PROTOBUF_LIBRARIES} ${NATS_SODIUM_LIBRARIES})
set_target_properties(nats_static PROPERTIES
VERSION ${NATS_VERSION_MAJOR}.${NATS_VERSION_MINOR}.${NATS_VERSION_PATCH}
Expand Down
3 changes: 2 additions & 1 deletion src/asynccb.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2018 The NATS Authors
// Copyright 2015-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -18,6 +18,7 @@
#if defined(NATS_HAS_STREAMING)
#include "stan/conn.h"
#endif
#include "glib/glib.h"

static void
_freeAsyncCbInfo(natsAsyncCbInfo *info)
Expand Down
120 changes: 25 additions & 95 deletions src/conn.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015-2023 The NATS Authors
// Copyright 2015-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -35,6 +35,7 @@
#include "nkeys.h"
#include "crypto.h"
#include "js.h"
#include "glib/glib.h"

#define DEFAULT_SCRATCH_SIZE (512)
#define MAX_INFO_MESSAGE_SIZE (32768)
Expand Down Expand Up @@ -1136,31 +1137,19 @@
sub = subs[i];

adjustedMax = 0;
natsSub_Lock(sub);
if (sub->libDlvWorker != NULL)
{
natsMutex_Lock(sub->libDlvWorker->lock);
}
nats_lockSubAndDispatcher(sub);
// If JS ordered consumer, trigger a reset. Don't check the error
// condition here. If there is a failure, it will be retried
// at the next HB interval.
if ((sub->jsi != NULL) && (sub->jsi->ordered))
{
jsSub_resetOrderedConsumer(sub, sub->jsi->sseq+1);
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
nats_unlockSubAndDispatcher(sub);
continue;
}
if (natsSub_drainStarted(sub))
{
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
nats_unlockSubAndDispatcher(sub);
continue;
}
if (sub->max > 0)
Expand All @@ -1172,11 +1161,7 @@
// messages have reached the max, if so, unsubscribe.
if (adjustedMax == 0)
{
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
nats_unlockSubAndDispatcher(sub);

Check warning on line 1164 in src/conn.c

View check run for this annotation

Codecov / codecov/patch

src/conn.c#L1164

Added line #L1164 was not covered by tests
s = natsConn_sendUnsubProto(nc, sub->sid, 0);
continue;
}
Expand All @@ -1188,11 +1173,7 @@

// Hold the lock up to that point so we are sure not to resend
// any SUB/UNSUB for a subscription that is in draining mode.
if (sub->libDlvWorker != NULL)
{
natsMutex_Unlock(sub->libDlvWorker->lock);
}
natsSub_Unlock(sub);
nats_unlockSubAndDispatcher(sub);
}

NATS_FREE(subs);
Expand Down Expand Up @@ -2667,11 +2648,8 @@
natsStatus s = NATS_OK;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsMsgDlvWorker *ldw = NULL;
bool sc = false;
bool sm = false;
nats_MsgList *list = NULL;
natsCondition *cond = NULL;
// For JetStream cases
jsSub *jsi = NULL;
bool ctrlMsg = false;
Expand Down Expand Up @@ -2717,34 +2695,17 @@
// We need to retain the subscription since as soon as we release the
// nc->subsMu lock, the subscription could be destroyed and we would
// reference freed memory.
natsSubAndLdw_LockAndRetain(sub);
nats_lockRetainSubAndDispatcher(sub);

natsMutex_Unlock(nc->subsMu);

if (sub->closed || sub->drainSkip)
{
natsSubAndLdw_UnlockAndRelease(sub);
nats_unlockReleaseSubAndDispatcher(sub);
natsMsg_Destroy(msg);
return NATS_OK;
}

// Pick condition variable and list based on if the sub is
// part of a global delivery thread pool or not.
// Note about `list`: this is used only to link messages, but
// sub->msgList needs to be used to update/check number of pending
// messages, since in case of delivery thread pool, `list` will have
// messages from many different subscriptions.
if ((ldw = sub->libDlvWorker) != NULL)
{
cond = ldw->cond;
list = &(ldw->msgList);
}
else
{
cond = sub->cond;
list = &(sub->msgList);
}

jsi = sub->jsi;
// For JS subscriptions (but not pull ones), handle hearbeat and flow control here.
if (jsi && !jsi->pull)
Expand All @@ -2764,7 +2725,7 @@
s = jsSub_checkOrderedMsg(sub, msg, &replaced);
if ((s != NATS_OK) || replaced)
{
natsSubAndLdw_UnlockAndRelease(sub);
nats_unlockReleaseSubAndDispatcher(sub);
natsMsg_Destroy(msg);
return s;
}
Expand All @@ -2773,58 +2734,27 @@

if (!ctrlMsg)
{
sub->msgList.msgs++;
sub->msgList.bytes += bufLen;

if (((sub->msgsLimit > 0) && (sub->msgList.msgs > sub->msgsLimit))
|| ((sub->bytesLimit > 0) && (sub->msgList.bytes > sub->bytesLimit)))
{
natsMsg_Destroy(msg);

sub->dropped++;

sc = !sub->slowConsumer;
sub->slowConsumer = true;

// Undo stats from above.
sub->msgList.msgs--;
sub->msgList.bytes -= bufLen;
}
else
s = natsSub_enqueueUserMessage(sub, msg);
if (s == NATS_OK)
{
bool signal= false;

if ((jsi != NULL) && jsi->ackNone)
natsMsg_setAcked(msg);

if (sub->msgList.msgs > sub->msgsMax)
sub->msgsMax = sub->msgList.msgs;

if (sub->msgList.bytes > sub->bytesMax)
sub->bytesMax = sub->msgList.bytes;

sub->slowConsumer = false;

msg->sub = sub;

if (list->head == NULL)
{
list->head = msg;
signal = true;
}
else
list->tail->next = msg;

list->tail = msg;

if (signal)
natsCondition_Signal(cond);

// Store the ACK metadata from the message to
// compare later on with the received heartbeat.
if (jsi != NULL)
s = jsSub_trackSequences(jsi, msg->reply);
}
else
{
// Slow consumer is the only reason to fail here. Handle it and
// reset the status, so we continue.
natsMsg_Destroy(msg);
sub->dropped++;
sc = !sub->slowConsumer;
sub->slowConsumer = true;

s = NATS_OK;
}
}
else if ((jct == jsCtrlHeartbeat) && (msg->reply == NULL))
{
Expand All @@ -2848,9 +2778,9 @@

// If we are going to post to the error handler, do not release yet.
if (sc || sm)
natsSubAndLdw_Unlock(sub);
nats_unlockSubAndDispatcher(sub);
else
natsSubAndLdw_UnlockAndRelease(sub);
nats_unlockReleaseSubAndDispatcher(sub);

if ((s == NATS_OK) && fcReply)
s = natsConnection_Publish(nc, fcReply, NULL, 0);
Expand Down
Loading