
The callbacks of subscribers for records are executed in different threads #679

Closed
Barry-Xu-2018 opened this issue Mar 13, 2021 · 11 comments
Labels
enhancement New feature or request

Comments

@Barry-Xu-2018
Contributor

Description

According to the current code, the implementation is as below (if this is not right, please help correct me):
[figure: rosbag2]

Currently, all of these subscription callbacks are executed in turn on a single thread:

void Recorder::record_messages() const
{
  spin(node_);
}

If there are many topics to record, single-threaded processing may hurt performance.
Could we switch to a MultiThreadedExecutor?

Of course, this approach might just move the bottleneck to the single cache buffer.
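
For illustration, a minimal sketch of what the suggested change could look like (assuming rclcpp's MultiThreadedExecutor; untested against the rosbag2 code base):

void Recorder::record_messages() const
{
  // Sketch: a MultiThreadedExecutor runs subscription callbacks on a
  // thread pool instead of in turn on one thread.
  rclcpp::executors::MultiThreadedExecutor executor;
  executor.add_node(node_);
  executor.spin();
}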

Barry-Xu-2018 added the enhancement (New feature or request) label Mar 13, 2021
@adamdbrw
Collaborator

adamdbrw commented Mar 15, 2021

Currently, all processing in each callback is synchronized at the writer level (to protect some STL collections and the splitting logic) and at the storage level (to protect some STL collections and DB access due to async writes).

The writer-level lock means that the operations that can execute in parallel are limited to the body of the lambda in Recorder::create_subscription, up to the writer_->write call. That isn't much, and I don't think it would be a source of a bottleneck.

What is important though is to minimize the synchronized time in Writer::write. This is best done by improving here: #647.

Afterwards it would be good to profile all the operations that happen before a message is written to the cache and see where to optimize next. The spinning itself might benefit from multi-threading; it is worth checking.
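
To make the synchronization point concrete, here is a self-contained illustrative sketch (hypothetical names, not the actual rosbag2 source) of how a writer-level mutex serializes every callback:

#include <mutex>
#include <string>

// Illustrative stand-ins for the real rosbag2 types.
struct SerializedBagMessage { std::string topic_name; std::string data; };
struct Storage { void write(const SerializedBagMessage &) { /* db insert */ } };

class Writer
{
public:
  // Every subscription callback funnels into this method; the mutex
  // serializes the splitting check and the storage access, so only the
  // work done before write() is called can run in parallel.
  void write(const SerializedBagMessage & message)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    maybe_split();            // splitting logic, protected by the lock
    storage_.write(message);  // db access, protected by the lock
  }

private:
  void maybe_split() { /* compare current bag size against max_bag_size */ }
  std::mutex mutex_;
  Storage storage_;
};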

@Barry-Xu-2018
Contributor Author

@adamdbrw

Thanks for your reply.

What is important though is to minimize the synchronized time in Writer::write. This is best done by improving here: #647.

Yeah. should_split_bagfile() performs a disk I/O operation to get the database size on every callback; that is one point to improve.
With the current SingleThreadedExecutor, the lock is effectively taken at the entry of each callback. With a MultiThreadedExecutor, we would instead need one lock around the bag-file size check and another around writing the message to the primary buffer, which mainly shortens the time spent holding a lock.
So even if we consider using a MultiThreadedExecutor, #647 still needs to be resolved, since it causes long synchronized sections; a hedged sketch of one possible direction is shown below.
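
As an illustration of one possible direction for #647, a hedged sketch (hypothetical names) that replaces per-message disk I/O with an approximate atomic byte counter:

#include <atomic>
#include <cstdint>

// Sketch: keep an approximate running byte count that callbacks can
// update and read cheaply, instead of querying the database file size
// on disk for every message.
class BagSizeTracker
{
public:
  explicit BagSizeTracker(std::uint64_t max_bag_size) : max_(max_bag_size) {}

  // Called from a callback after a message is handed to the writer;
  // fetch_add returns the previous total, so add the new bytes back in.
  bool should_split(std::uint64_t message_bytes)
  {
    const std::uint64_t total = bytes_.fetch_add(message_bytes) + message_bytes;
    return max_ != 0 && total >= max_;
  }

  void reset() { bytes_.store(0); }  // called after the bag is split

private:
  std::atomic<std::uint64_t> bytes_{0};
  const std::uint64_t max_;
};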

I will further consider it.

@adamdbrw
Collaborator

@Barry-Xu-2018 when considering changes to the threading model, pay attention to all STL containers (such as the map of topics in the SequentialWriter) and to unsynchronized non-atomics in general. There are various race conditions to avoid. The benchmarking packages are good for testing for race conditions once you have gone through code analysis and feel confident it should be OK.
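
For example, a minimal sketch (hypothetical names) of guarding a topics map against concurrent callback threads:

#include <mutex>
#include <string>
#include <unordered_map>

// Sketch: a map of topics touched from several callback threads must be
// guarded, or concurrent insertions are a data race.
class TopicRegistry
{
public:
  void add_topic(const std::string & name, int topic_id)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    topics_.emplace(name, topic_id);
  }

  bool has_topic(const std::string & name) const
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return topics_.count(name) != 0;
  }

private:
  mutable std::mutex mutex_;
  std::unordered_map<std::string, int> topics_;
};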

@fujitatomoya
Contributor

@adamdbrw

btw, do you already have a plan to support MultiThreadedExecutor in a milestone at this moment?

@adamdbrw
Collaborator

Not at this moment (we finished our project).

@fujitatomoya
Contributor

okay, thanks!

@Barry-Xu-2018
Contributor Author

@fujitatomoya @adamdbrw

FYI.

I modified the code to use a MultiThreadedExecutor, without considering bag splitting yet.

  • benchmark config file

    rosbag2_performance_benchmarking:
      benchmark_node:
        ros__parameters:
          benchmark:
            summary_result_file:  "results.csv"
            db_root_folder:       "rosbag2_performance_test_results"
            repeat_each:          18     # How many times to run each configuration (to average results)
            no_transport:         False  # Whether to run storage-only or end-to-end (including transport) benchmark
            preserve_bags:        False  # Whether to leave bag files after the experiment (and between runs). Some configurations can take lots of space!
            parameters:                  # Each combination of parameters in this section will be benchmarked
              max_cache_size:         [0]
              max_bag_size:           [0]
              compression:            [""]
              compression_queue_size: [1]
              compression_threads:    [0]
              storage_config_file:    [""]

    I tested two max_cache_size settings:

    • 0: the cache mechanism is disabled.
    • 100000000 (100 MB)
  • producer config file

    rosbag2_performance_benchmarking_node:
      ros__parameters:
        publishers:  # publisher_groups parameter needs to include all the subsequent groups
          publisher_groups: [ "10k_200inst" ]
          wait_for_subscriptions: True
          10k_200inst:
            publishers_count:   200
            topic_root:         "benchmarking_10k_200inst"
            msg_size_bytes:     10000
            msg_count_each:     2000
            rate_hz:            100

    There are 200 publisher instances and the message size is 10 KB.

    Configuration                        cache_size: 0   cache_size: 100M
    Original codes                       39.12%          38.57%
    MultiThreadedExecutor (20 threads)   44.29%          45.82%
    MultiThreadedExecutor (50 threads)   46.17%          49.43%

According to the test results, the current cache mechanism has no pronounced effect when recording many instances (topics). Using a MultiThreadedExecutor helps a little.

For the MultiThreadedExecutor modification, refer to Barry-Xu-2018@c1bb25b.

Next, I will change the code to support bag splitting. Of course, #647 is important: disk I/O causes the lock to be held for a long time, and other threads have to wait for it.

@Barry-Xu-2018
Contributor Author

@fujitatomoya @adamdbrw

Split mode is now supported and testing is finished.

When split mode is enabled, much of the code has to run under a lock, so this is not good for multiple threads.

Test conditions:
200 instances (topics), 100 Hz, 10 KB message size, 2000 messages per instance. Values are the average over 20 rounds of testing.

  • max_cache_size:
    • 0: the cache mechanism is disabled.
    • 100 MB
  • max_bag_size (maximum bag file size):
    • 0: no limit; the bag file is not split.
    • 50 MB

Test result:

Configuration                        max_cache_size: 0   max_cache_size: 100M   max_cache_size: 0    max_cache_size: 100M
                                     max_bag_size: 0     max_bag_size: 0        max_bag_size: 50M    max_bag_size: 50M
Original codes (commit b64cf74)      39.12%              38.57%                 29.60%               29.60%
MultiThreadedExecutor (20 threads)   45.14%              43.69%                 38.86%               37.36%

Based on the above results, there is a small performance improvement, and the cache barely affects the result.

Next, I checked the effect with a few instances:

10 instances (topics), 100 KB message size. Other conditions are the same as in the test above.

Test result:

Configuration                        max_cache_size: 0   max_cache_size: 100M   max_cache_size: 0    max_cache_size: 100M
                                     max_bag_size: 0     max_bag_size: 0        max_bag_size: 50M    max_bag_size: 50M
Original codes (commit b64cf74)      100%                100%                   99.93%               99.97%
MultiThreadedExecutor (20 threads)   100%                100%                   100%                 100%

Current implementation is Barry-Xu-2018@18fdeb2.

Currently the maximum number of threads is hard-coded to 20 and cannot be changed by the user. The user interface for changing this still needs to be discussed; one option is sketched below.
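
A hedged sketch of that option (num_threads_ is a hypothetical member that a new CLI option or node parameter would populate; rclcpp's MultiThreadedExecutor constructor does accept a thread count):

void Recorder::record_messages() const
{
  // Sketch: pass the configured thread count instead of hard-coding 20.
  rclcpp::executors::MultiThreadedExecutor executor(
    rclcpp::ExecutorOptions(), num_threads_);
  executor.add_node(node_);
  executor.spin();
}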

@fujitatomoya
Contributor

@Barry-Xu-2018

thanks for the information. i see that multi-threading does not improve things a lot with the current cache mechanism. i am not sure why this cache mechanism cannot be in the plugin part; does it have to be in the generic part? (if anyone knows, please enlighten me 😄) if there is no requirement for that, we could move the buffer/cache mechanism into the sqlite3 implementation, so that the generic part stays simple and the plugin is responsible for performance. i think that would give plugin implementations more room to manage and differentiate between use cases.

on the other hand, this could be kinda surgery, so probably it would take a while...

@Barry-Xu-2018
Contributor Author

@fujitatomoya

i see that multi-threading does not improve things a lot with the current cache mechanism. i am not sure why this cache mechanism cannot be in the plugin part; does it have to be in the generic part? (if anyone knows, please enlighten me 😄) if there is no requirement for that, we could move the buffer/cache mechanism into the sqlite3 implementation, so that the generic part stays simple and the plugin is responsible for performance. i think that would give plugin implementations more room to manage and differentiate between use cases.

Yes. With multi-threading, the current cache mechanism has no effect. The transport layer should only be in charge of calling the write interface implemented by the storage plugin; each storage plugin can then use a cache mechanism suited to its database's characteristics. The current implementation also allows disabling the cache mechanism by setting the cache size to 0. A sketch of a plugin-managed cache is shown below.
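
To illustrate the idea, a hedged sketch (illustrative types, not the actual rosbag2_storage interface) of a plugin that manages its own cache so the generic writer stays a thin pass-through:

#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Illustrative message type standing in for the real one.
struct SerializedBagMessage { /* serialized payload, topic, timestamp */ };

class SqliteStorageWithCache
{
public:
  // The generic writer just forwards messages; batching is the plugin's job.
  void write(std::shared_ptr<const SerializedBagMessage> message)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    cache_.push_back(std::move(message));
    if (cache_.size() >= kBatchSize) {
      flush_locked();  // one db transaction for the whole batch
    }
  }

private:
  // Assumes mutex_ is held: insert all cached rows in a single transaction.
  void flush_locked() { /* BEGIN; INSERT ...; COMMIT; */ cache_.clear(); }

  static constexpr std::size_t kBatchSize = 1000;
  std::mutex mutex_;
  std::vector<std::shared_ptr<const SerializedBagMessage>> cache_;
};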

@MichaelOrlov
Contributor

Closing as won't be done, since it was shown that using a MultiThreadedExecutor does not improve performance much with the current cache mechanism.
