[improve][broker] Reduce the CPU pressure from the transaction buffer in rolling restarts #23062

Conversation

BewareMyPower
Contributor

@BewareMyPower BewareMyPower commented Jul 22, 2024

Motivation

During rolling restarts, namespace bundle ownership changes. Assume a producer is created on a single topic whose bundle ownership has been transferred to a new broker, that the namespace bundle has N topics, and that the namespace is tenant/ns. Then:

  1. All N topics in the same bundle as that topic will be loaded.
  2. For each topic, the managed ledger will be initialized. When the transaction coordinator is enabled, a TopicTransactionBuffer will be created, and a Pulsar reader will be created on tenant/ns/__transaction_buffer_snapshot concurrently.
  3. Once all N readers are created, the owner of the snapshot topic will start dispatching messages to the N readers. Each dispatcher will read messages from BookKeeper concurrently and might fail with a "too many requests" error, because BK can only have maxPendingReadRequestsPerThread pending read requests (default: 10000).

We have a numTransactionReplayThreadPoolSize config to limit the concurrency of transaction snapshot readers, but it only limits the read loop. For example, if it is set to 1, only 1 reader can read messages at a time, yet N readers are still created concurrently. Each time one of these readers explicitly calls readNext, all N dispatchers on the broker side will dispatch messages to the N readers.

The behavior above puts heavy CPU pressure on the owner broker, especially in a small cluster with only two brokers.
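
The gap between "limited read loops" and "unlimited reader creation" can be illustrated with a small standalone model. This is only a sketch and not Pulsar code: the class name ReplayPoolModel and its simulate method are hypothetical, and a plain counter stands in for reader creation. It shows that a fixed-size pool caps how many read loops run at once while doing nothing to cap how many readers exist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model (not Pulsar code): a fixed-size pool bounds the read
// loops, but "readers" are created eagerly outside of that pool.
final class ReplayPoolModel {

    /** Returns {readers created up front, max read loops running at once}. */
    static int[] simulate(int topics, int poolSize) {
        ExecutorService replayPool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger created = new AtomicInteger();
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        List<CompletableFuture<Void>> loops = new ArrayList<>();

        for (int i = 0; i < topics; i++) {
            // Reader creation happens on the caller side; the pool does not gate it.
            created.incrementAndGet();
            // Only the read loop itself is submitted to the bounded pool.
            loops.add(CompletableFuture.runAsync(() -> {
                int now = running.incrementAndGet();
                maxRunning.accumulateAndGet(now, Math::max);
                running.decrementAndGet();
            }, replayPool));
        }

        int createdUpFront = created.get();
        CompletableFuture.allOf(loops.toArray(new CompletableFuture[0])).join();
        replayPool.shutdown();
        return new int[] {createdUpFront, maxRunning.get()};
    }
}
```

With simulate(5, 1), all 5 readers exist before the read loops finish, even though at most 1 read loop runs at any moment.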

Modifications

Synchronize the reader creation, the read loop, and the subsequent processing of its result. Maintain only one reader for each namespace. The reader is now closed only after no snapshot read request has been made for 1 minute.
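
The "one shared reader per namespace, closed after 1 minute without requests" idea can be sketched as a small idle-expiring cache. This is a minimal illustration under stated assumptions, not the implementation in this PR: the class IdleExpiringCache, its constructor parameters, and the injected clock are all hypothetical names chosen for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch: keeps one shared value per key (e.g. one reader per
// namespace) and releases it once it has been idle for the given timeout.
final class IdleExpiringCache<K, V> {

    private static final class Entry<V> {
        final V value;
        long lastUsedMs;

        Entry(V value) {
            this.value = value;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long idleTimeoutMs;
    private final LongSupplier clockMs;  // injected so the sketch is testable
    private final Consumer<V> onExpire;  // e.g. closes the expired reader

    IdleExpiringCache(long idleTimeoutMs, LongSupplier clockMs, Consumer<V> onExpire) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.clockMs = clockMs;
        this.onExpire = onExpire;
    }

    /** Returns the shared value for the key, creating it only if none is cached. */
    synchronized V get(K key, Function<K, V> factory) {
        long now = clockMs.getAsLong();
        evictIdle(now);
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            entry = new Entry<>(factory.apply(key));
            entries.put(key, entry);
        }
        entry.lastUsedMs = now;
        return entry.value;
    }

    // Drops every entry that has not been used within the idle timeout.
    private void evictIdle(long now) {
        entries.values().removeIf(entry -> {
            if (now - entry.lastUsedMs >= idleTimeoutMs) {
                onExpire.accept(entry.value);
                return true;
            }
            return false;
        });
    }
}
```

In this sketch, eviction is checked lazily on each get call; a production version would more likely run the expiry check on a scheduled task so that an idle reader is closed even when no further request arrives.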

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: BewareMyPower#32

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jul 22, 2024
@BewareMyPower BewareMyPower self-assigned this Jul 22, 2024
@BewareMyPower BewareMyPower added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/broker release/3.3.1 release/3.0.6 labels Jul 22, 2024
@BewareMyPower BewareMyPower added this to the 3.4.0 milestone Jul 22, 2024
@BewareMyPower BewareMyPower changed the title [improve][broker] Reduce the pressure from the transaction buffer readers and writers in rolling restarts [improve][broker] Reduce the CPU pressure from the transaction buffer in rolling restarts Jul 22, 2024
@BewareMyPower BewareMyPower force-pushed the bewaremypower/transaction-buffer-optimize branch from 31d847c to 79430fc Compare July 22, 2024 13:29
@BewareMyPower BewareMyPower marked this pull request as draft July 23, 2024 04:41
@BewareMyPower BewareMyPower marked this pull request as ready for review July 23, 2024 09:08
@BewareMyPower
Contributor Author

BewareMyPower commented Jul 23, 2024

This improvement for the reader side can be verified by the following example.

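import lombok.Cleanup;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class LoadTopicTest extends ProducerConsumerBase {

    @BeforeClass
    @Override
    protected void setup() throws Exception {
        // A single bundle ensures that all partitions are loaded together.
        conf.setDefaultNumberOfNamespaceBundles(1);
        // Limit the snapshot read loop to one thread.
        conf.setNumTransactionReplayThreadPoolSize(1);
        conf.setTransactionCoordinatorEnabled(true);
        super.isTcpLookup = true;
        super.internalSetup();
        super.producerBaseSetup();
    }

    @AfterClass
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void test() throws Exception {
        final var topic = "test";
        admin.topics().createPartitionedTopic(topic, 5);
        // Creating a producer on one partition triggers loading of the whole bundle.
        @Cleanup final var producer = pulsarClient.newProducer().topic(topic + "-partition-0").create();
        // Wait for the snapshot readers to subscribe so the logs are complete.
        Thread.sleep(1000);
    }
}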

Save the output to 1.log and run grep -E "Subscribing on|Subscribed to|Closed consumer" 1.log | grep snapshot. The result looks like this:

2024-07-23T17:46:50,142 - INFO  - [pulsar-io-9-4:ServerCnx] - [[id: 0x0134c04e, L:/127.0.0.1:57954 - R:/127.0.0.1:57960] [SR:127.0.0.1, state:Connected]] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / reader-8d442bc96f. consumerId: 1
2024-07-23T17:46:50,145 - INFO  - [pulsar-io-9-4:ServerCnx] - [[id: 0x0134c04e, L:/127.0.0.1:57954 - R:/127.0.0.1:57960] [SR:127.0.0.1, state:Connected]] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / reader-beb7e469f7. consumerId: 3
2024-07-23T17:46:50,146 - INFO  - [pulsar-io-9-4:ServerCnx] - [[id: 0x0134c04e, L:/127.0.0.1:57954 - R:/127.0.0.1:57960] [SR:127.0.0.1, state:Connected]] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / reader-e9f0ce9149. consumerId: 2
2024-07-23T17:46:50,149 - INFO  - [pulsar-io-9-4:ServerCnx] - [[id: 0x0134c04e, L:/127.0.0.1:57954 - R:/127.0.0.1:57960] [SR:127.0.0.1, state:Connected]] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / reader-d27782ccd4. consumerId: 5
2024-07-23T17:46:50,150 - INFO  - [pulsar-io-9-4:ServerCnx] - [[id: 0x0134c04e, L:/127.0.0.1:57954 - R:/127.0.0.1:57960] [SR:127.0.0.1, state:Connected]] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / reader-ff5b692579. consumerId: 4
2024-07-23T17:46:50,174 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot][reader-ff5b692579] Subscribed to topic on localhost/127.0.0.1:57954 -- consumer: 4
2024-07-23T17:46:50,177 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot] [reader-ff5b692579] Closed consumer
2024-07-23T17:46:50,186 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot][reader-d27782ccd4] Subscribed to topic on localhost/127.0.0.1:57954 -- consumer: 5
2024-07-23T17:46:50,187 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot][reader-e9f0ce9149] Subscribed to topic on localhost/127.0.0.1:57954 -- consumer: 2
2024-07-23T17:46:50,187 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot] [reader-d27782ccd4] Closed consumer
2024-07-23T17:46:50,188 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot] [reader-e9f0ce9149] Closed consumer
2024-07-23T17:46:50,188 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot][reader-beb7e469f7] Subscribed to topic on localhost/127.0.0.1:57954 -- consumer: 3
2024-07-23T17:46:50,190 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot] [reader-beb7e469f7] Closed consumer
2024-07-23T17:46:50,190 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot][reader-8d442bc96f] Subscribed to topic on localhost/127.0.0.1:57954 -- consumer: 1
2024-07-23T17:46:50,191 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot] [reader-8d442bc96f] Closed consumer

Even though numTransactionReplayThreadPoolSize is 1, the "Subscribing on" logs show that 5 readers are pending at the same time.
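
The same count can be checked programmatically instead of by eye. The helper below is a hypothetical sketch (the class SnapshotReaderLogCounter is not part of Pulsar) that assumes the log line layout shown above and collects the distinct reader names from the "Subscribing on" lines.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the distinct snapshot reader names from
// broker log lines that follow the "Subscribing on topic ..." layout above.
final class SnapshotReaderLogCounter {

    private static final Pattern SUBSCRIBING = Pattern.compile(
            "Subscribing on topic \\S*__transaction_buffer_snapshot / (reader-[0-9a-f]+)");

    static Set<String> readersSubscribing(List<String> logLines) {
        Set<String> readers = new TreeSet<>();
        for (String line : logLines) {
            Matcher matcher = SUBSCRIBING.matcher(line);
            if (matcher.find()) {
                readers.add(matcher.group(1));
            }
        }
        return readers;
    }
}
```

Passing the lines of 1.log to readersSubscribing and taking the size of the result gives the number of readers that were created on the snapshot topic.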

After this change, only one reader is created and the logs become:

2024-07-23T20:47:33,101 - INFO  - [pulsar-io-9-4:ServerCnx] - [[id: 0xd393d83a, L:/127.0.0.1:60438 - R:/127.0.0.1:60444] [SR:127.0.0.1, state:Connected]] Subscribing on topic persistent://public/default/__transaction_buffer_snapshot / reader-205040e5a5. consumerId: 1
2024-07-23T20:47:33,134 - INFO  - [pulsar-io-9-3:ConsumerImpl] - [persistent://public/default/__transaction_buffer_snapshot][reader-205040e5a5] Subscribed to topic on localhost/127.0.0.1:60438 -- consumer: 1

@BewareMyPower BewareMyPower marked this pull request as draft July 23, 2024 10:08
@BewareMyPower
Contributor Author

Marking this as a draft to avoid creating the reader repeatedly. It is better to maintain a long-lived reader.

…ders and writers in rolling restarts

### Motivation

During rolling restarts, namespace bundle ownership changes. Assume a
producer is created on a single topic whose bundle ownership has been
transferred to a new broker, that the namespace bundle has N topics, and
that the namespace is `tenant/ns`. Then:
1. All N topics in the same bundle as that topic will be loaded.
2. For each topic, the managed ledger will be initialized. When the
   transaction coordinator is enabled, a `TopicTransactionBuffer` will
   be created.
   2.1 A Pulsar producer will be created on
     `tenant/ns/__transaction_buffer_snapshot` concurrently.
   2.2 A Pulsar reader will be created on
     `tenant/ns/__transaction_buffer_snapshot` concurrently.
3. Once all N readers are created, the owner of the snapshot topic will
   start dispatching messages to the N readers. Each dispatcher will read
   messages from BookKeeper concurrently and might fail with a "too many
   requests" error, because BK can only have
   `maxPendingReadRequestsPerThread` pending read requests (default: 10000).

We have a `numTransactionReplayThreadPoolSize` config to limit the
concurrency of transaction snapshot readers, but it only limits the
read loop. For example, if it is set to 1, only 1 reader can read
messages at a time, yet N readers are still created concurrently. Each
time one of these readers explicitly calls `readNext`, all N dispatchers
on the broker side will dispatch messages to the N readers.

The behavior above puts heavy CPU pressure on the owner broker,
especially in a small cluster with only two brokers.

### Modifications

- Synchronize the reader creation, the read loop, and the subsequent
  processing of its result. Maintain only one reader for each namespace.
@BewareMyPower BewareMyPower force-pushed the bewaremypower/transaction-buffer-optimize branch from 9378fa9 to ffcc578 Compare July 23, 2024 13:43
@BewareMyPower BewareMyPower marked this pull request as ready for review July 23, 2024 13:55
@BewareMyPower BewareMyPower marked this pull request as draft July 24, 2024 02:25
@BewareMyPower BewareMyPower marked this pull request as ready for review July 24, 2024 09:34
@BewareMyPower BewareMyPower marked this pull request as draft July 24, 2024 13:01
@BewareMyPower BewareMyPower marked this pull request as ready for review July 25, 2024 03:57
@poorbarcode
Contributor

@liangyepianzhou @congbobo184 @gaoran10 Please take a look

@BewareMyPower BewareMyPower marked this pull request as draft July 26, 2024 04:52
@BewareMyPower BewareMyPower marked this pull request as ready for review July 26, 2024 07:13
@BewareMyPower BewareMyPower force-pushed the bewaremypower/transaction-buffer-optimize branch from ef22430 to 3b192a5 Compare July 26, 2024 07:13
Member

@Demogorgon314 Demogorgon314 left a comment

LGTM.

@codecov-commenter

Codecov Report

Attention: Patch coverage is 83.78378% with 30 lines in your changes missing coverage. Please review.

Project coverage is 73.47%. Comparing base (bbc6224) to head (acc95de).
Report is 475 commits behind head on master.

Additional details and impacted files

@@             Coverage Diff              @@
##             master   #23062      +/-   ##
============================================
- Coverage     73.57%   73.47%   -0.11%     
- Complexity    32624    33530     +906     
============================================
  Files          1877     1919      +42     
  Lines        139502   144086    +4584     
  Branches      15299    15741     +442     
============================================
+ Hits         102638   105860    +3222     
- Misses        28908    30101    +1193     
- Partials       7956     8125     +169     
Flag Coverage Δ
inttests 27.52% <36.75%> (+2.94%) ⬆️
systests 24.72% <0.00%> (+0.40%) ⬆️
unittests 72.54% <83.78%> (-0.31%) ⬇️

Files Coverage Δ
...n/java/org/apache/pulsar/broker/PulsarService.java 84.25% <100.00%> (+1.88%) ⬆️
...r/service/SystemTopicTxnBufferSnapshotService.java 80.55% <100.00%> (-2.47%) ⬇️
...rvice/TransactionBufferSnapshotServiceFactory.java 83.33% <100.00%> (ø)
...er/impl/SingleSnapshotAbortedTxnProcessorImpl.java 83.13% <100.00%> (-0.04%) ⬇️
...ransaction/buffer/impl/TopicTransactionBuffer.java 87.63% <100.00%> (-0.12%) ⬇️
...main/java/org/apache/pulsar/utils/SimpleCache.java 100.00% <100.00%> (ø)
...lsar/broker/transaction/buffer/impl/TableView.java 74.07% <74.07%> (ø)
...r/impl/SnapshotSegmentAbortedTxnProcessorImpl.java 81.96% <77.00%> (+3.44%) ⬆️

... and 514 files with indirect coverage changes

@BewareMyPower
Contributor Author

Merge it first.

@BewareMyPower BewareMyPower merged commit 40c8c23 into apache:master Jul 29, 2024
51 checks passed
Demogorgon314 pushed a commit that referenced this pull request Jul 29, 2024
lhotari pushed a commit that referenced this pull request Aug 6, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 8, 2024
… in rolling restarts (apache#23062)

(cherry picked from commit 40c8c23)
(cherry picked from commit 53fb549)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 12, 2024
… in rolling restarts (apache#23062)

(cherry picked from commit 40c8c23)
(cherry picked from commit 53fb549)
Labels
area/broker cherry-picked/branch-3.0 cherry-picked/branch-3.3 doc-not-needed Your PR changes do not impact docs ready-to-test release/3.0.7 release/3.3.1 type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
5 participants