From b262e647ecaa367e29f3022f0b14e2650e7b0399 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 21 Feb 2024 10:57:49 +0100 Subject: [PATCH] Prevent index overflow and correctly assert the end iterator in DataSharing (#4321) (#4403) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refs #20227: Add DDS communication test Signed-off-by: Mario Dominguez * Refs #20227: Prevent history index to overflow history size Signed-off-by: Mario Dominguez * Refs #20227: Correctly assert the current end Signed-off-by: Mario Dominguez * Refs #20227: Linter Signed-off-by: Mario Dominguez * Refs #20227: Initialize test timeout argument properly Signed-off-by: Mario Dominguez * Refs #20227: Rev suggestion Signed-off-by: Mario Dominguez --------- Signed-off-by: Mario Dominguez (cherry picked from commit 66de89eaccd49b94279f5c5e56a8240257079189) Co-authored-by: Mario Domínguez López <116071334+Mario-DL@users.noreply.github.com> --- .../DataSharing/DataSharingPayloadPool.cpp | 5 ++- src/cpp/rtps/DataSharing/ReaderPool.hpp | 10 +++-- test/dds/communication/CMakeLists.txt | 1 + test/dds/communication/PubSubMain.cpp | 32 +++++++++++-- test/dds/communication/PublisherMain.cpp | 14 +++++- test/dds/communication/PublisherModule.cpp | 17 +++---- test/dds/communication/PublisherModule.hpp | 3 +- test/dds/communication/SubscriberMain.cpp | 22 ++++++++- test/dds/communication/SubscriberModule.cpp | 45 ++++++++++++------- test/dds/communication/SubscriberModule.hpp | 8 +++- .../communication/definitions_example.json | 10 +++-- .../simple_data_sharing_stress.json | 18 ++++++++ test/dds/communication/test_build.py | 9 ++-- 13 files changed, 148 insertions(+), 46 deletions(-) create mode 100644 test/dds/communication/simple_data_sharing_stress.json diff --git a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.cpp b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.cpp index 483ba57b03c..a6292fc23a0 100644 --- a/src/cpp/rtps/DataSharing/DataSharingPayloadPool.cpp +++ b/src/cpp/rtps/DataSharing/DataSharingPayloadPool.cpp @@ -42,7 +42,10 @@ void DataSharingPayloadPool::advance( uint64_t& index) const { // lower part is the index, upper part is the loop counter - ++index; + if (static_cast(index) + 1 <= descriptor_->history_size) + { + ++index; + } if (static_cast(index) % descriptor_->history_size == 0) { index = ((index >> 32) + 1) << 32; diff --git a/src/cpp/rtps/DataSharing/ReaderPool.hpp b/src/cpp/rtps/DataSharing/ReaderPool.hpp index a0aa47acedb..788c49cdbf6 100644 --- a/src/cpp/rtps/DataSharing/ReaderPool.hpp +++ b/src/cpp/rtps/DataSharing/ReaderPool.hpp @@ -132,13 +132,15 @@ class ReaderPool : public DataSharingPayloadPool { CacheChange_t ch; SequenceNumber_t last_sequence = c_SequenceNumber_Unknown; - get_next_unread_payload(ch, last_sequence); - while (ch.sequenceNumber != SequenceNumber_t::unknown()) + uint64_t current_end = end(); + get_next_unread_payload(ch, last_sequence, current_end); + while (ch.sequenceNumber != SequenceNumber_t::unknown() || next_payload_ != current_end) { + current_end = end(); advance(next_payload_); - get_next_unread_payload(ch, last_sequence); + get_next_unread_payload(ch, last_sequence, current_end); } - assert(next_payload_ == end()); + assert(next_payload_ == current_end); } return true; diff --git a/test/dds/communication/CMakeLists.txt b/test/dds/communication/CMakeLists.txt index 61043811e8c..9b2790faa1f 100644 --- a/test/dds/communication/CMakeLists.txt +++ b/test/dds/communication/CMakeLists.txt @@ -111,6 +111,7 @@ list(APPEND TEST_DEFINITIONS zero_copy_sub_communication mix_zero_copy_communication close_TCP_client + simple_data_sharing_stress ) diff --git a/test/dds/communication/PubSubMain.cpp b/test/dds/communication/PubSubMain.cpp index cd09ba1a5b5..46553c9f64b 100644 --- a/test/dds/communication/PubSubMain.cpp +++ b/test/dds/communication/PubSubMain.cpp @@ -37,20 +37,22 @@ using namespace eprosima::fastdds::dds; * --xmlfile * --publishers * --publisher_loops + * --interval */ void publisher_run( PublisherModule* publisher, uint32_t wait, uint32_t samples, - uint32_t loops) + uint32_t loops, + uint32_t interval) { if (wait > 0) { publisher->wait_discovery(wait); } - publisher->run(samples, loops); + publisher->run(samples, loops, interval); } int main( @@ -66,8 +68,10 @@ int main( uint32_t wait = 0; uint32_t samples = 4; uint32_t publishers = 1; + uint32_t timeout = 86400000; // 24 hours in ms // The first loop could be easily ignored by the reader uint32_t publisher_loops = 2; + uint32_t interval = 250; char* xml_file = nullptr; std::string magic; @@ -119,6 +123,26 @@ int main( samples = strtol(argv[arg_count], nullptr, 10); } + else if (strcmp(argv[arg_count], "--interval") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--interval expects a parameter" << std::endl; + return -1; + } + + interval = strtol(argv[arg_count], nullptr, 10); + } + else if (strcmp(argv[arg_count], "--timeout") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--timeout expects a parameter" << std::endl; + return -1; + } + + timeout = strtol(argv[arg_count], nullptr, 10); + } else if (strcmp(argv[arg_count], "--magic") == 0) { if (++arg_count >= argc) @@ -180,11 +204,11 @@ int main( if (publisher.init(seed, magic)) { - std::thread publisher_thread(publisher_run, &publisher, wait, samples, publisher_loops); + std::thread publisher_thread(publisher_run, &publisher, wait, samples, publisher_loops, interval); if (subscriber.init(seed, magic)) { - result = subscriber.run(notexit) ? 0 : -1; + result = subscriber.run(notexit, timeout) ? 0 : -1; } publisher_thread.join(); diff --git a/test/dds/communication/PublisherMain.cpp b/test/dds/communication/PublisherMain.cpp index 829abd0a2db..0d72b20ae5b 100644 --- a/test/dds/communication/PublisherMain.cpp +++ b/test/dds/communication/PublisherMain.cpp @@ -31,6 +31,7 @@ using namespace eprosima::fastdds::dds; * --samples * --magic * --xmlfile + * --interval */ int main( @@ -45,6 +46,7 @@ int main( uint32_t wait = 0; char* xml_file = nullptr; uint32_t samples = 4; + uint32_t interval = 250; std::string magic; while (arg_count < argc) @@ -91,6 +93,16 @@ int main( samples = strtol(argv[arg_count], nullptr, 10); } + else if (strcmp(argv[arg_count], "--interval") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--interval expects a parameter" << std::endl; + return -1; + } + + interval = strtol(argv[arg_count], nullptr, 10); + } else if (strcmp(argv[arg_count], "--magic") == 0) { if (++arg_count >= argc) @@ -134,7 +146,7 @@ int main( publisher.wait_discovery(wait); } - publisher.run(samples); + publisher.run(samples, 0, interval); return 0; } diff --git a/test/dds/communication/PublisherModule.cpp b/test/dds/communication/PublisherModule.cpp index 547b442b9ef..35debd61a9c 100644 --- a/test/dds/communication/PublisherModule.cpp +++ b/test/dds/communication/PublisherModule.cpp @@ -58,14 +58,14 @@ bool PublisherModule::init( uint32_t seed, const std::string& magic) { - std::cout << "Initializing Publisher" << std::endl; + std::cout << "Initializing Publisher" << std::endl; participant_ = DomainParticipantFactory::get_instance()->create_participant(seed % 230, PARTICIPANT_QOS_DEFAULT, this); if (participant_ == nullptr) { - std::cout << "Error creating publisher participant" << std::endl; + EPROSIMA_LOG_ERROR(PUBLISHER_MODULE, "Error creating publisher participant"); return false; } @@ -89,14 +89,14 @@ bool PublisherModule::init( publisher_ = participant_->create_publisher(PUBLISHER_QOS_DEFAULT, this); if (publisher_ == nullptr) { - std::cout << "Error creating publisher" << std::endl; + EPROSIMA_LOG_ERROR(PUBLISHER_MODULE, "Error creating publisher"); return false; } topic_ = participant_->create_topic(topic_name.str(), type_.get_type_name(), TOPIC_QOS_DEFAULT); if (topic_ == nullptr) { - std::cout << "Error creating publisher topic" << std::endl; + EPROSIMA_LOG_ERROR(PUBLISHER_MODULE, "Error creating publisher topic"); return false; } @@ -108,7 +108,7 @@ bool PublisherModule::init( writer_ = publisher_->create_datawriter(topic_, wqos, this); if (writer_ == nullptr) { - std::cout << "Error creating publisher datawriter" << std::endl; + EPROSIMA_LOG_ERROR(PUBLISHER_MODULE, "Error creating publisher datawriter"); return false; } std::cout << "Writer created correctly in topic " << topic_->get_name() @@ -131,7 +131,8 @@ void PublisherModule::wait_discovery( void PublisherModule::run( uint32_t samples, - const uint32_t loops) + const uint32_t loops, + uint32_t interval) { uint32_t current_loop = 0; uint16_t index = 1; @@ -163,7 +164,7 @@ void PublisherModule::run( data->message("HelloWorld"); } } - std::cout << "Publisher writting index " << index << std::endl; + EPROSIMA_LOG_INFO(PUBLISHER_MODULE, "Publisher writting index " << index); writer_->write(sample); if (index == samples) @@ -181,7 +182,7 @@ void PublisherModule::run( type_.delete_data(sample); } - std::this_thread::sleep_for(std::chrono::milliseconds(250)); + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); } } diff --git a/test/dds/communication/PublisherModule.hpp b/test/dds/communication/PublisherModule.hpp index 36af88b10a3..91b04db5e54 100644 --- a/test/dds/communication/PublisherModule.hpp +++ b/test/dds/communication/PublisherModule.hpp @@ -80,7 +80,8 @@ class PublisherModule void run( uint32_t samples, - uint32_t loops = 0); + uint32_t loops = 0, + uint32_t interval = 250); private: diff --git a/test/dds/communication/SubscriberMain.cpp b/test/dds/communication/SubscriberMain.cpp index de864761a6d..e1e578c64c7 100644 --- a/test/dds/communication/SubscriberMain.cpp +++ b/test/dds/communication/SubscriberMain.cpp @@ -31,6 +31,8 @@ using namespace eprosima::fastdds::dds; * --magic * --xmlfile * --publishers + * --succeed_on_timeout + * --timeout */ int main( @@ -41,9 +43,11 @@ int main( bool notexit = false; bool fixed_type = false; bool zero_copy = false; + bool succeed_on_timeout = false; uint32_t seed = 7800; uint32_t samples = 4; uint32_t publishers = 1; + uint32_t timeout = 86400000; // 24 h in ms char* xml_file = nullptr; std::string magic; @@ -61,6 +65,10 @@ int main( { zero_copy = true; } + else if (strcmp(argv[arg_count], "--succeed_on_timeout") == 0) + { + succeed_on_timeout = true; + } else if (strcmp(argv[arg_count], "--seed") == 0) { if (++arg_count >= argc) @@ -91,6 +99,16 @@ int main( magic = argv[arg_count]; } + else if (strcmp(argv[arg_count], "--timeout") == 0) + { + if (++arg_count >= argc) + { + std::cout << "--run-for expects a parameter" << std::endl; + return -1; + } + + timeout = strtol(argv[arg_count], nullptr, 10); + } else if (strcmp(argv[arg_count], "--xmlfile") == 0) { if (++arg_count >= argc) @@ -125,11 +143,11 @@ int main( DomainParticipantFactory::get_instance()->load_XML_profiles_file(xml_file); } - SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy); + SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy, succeed_on_timeout); if (subscriber.init(seed, magic)) { - return subscriber.run(notexit) ? 0 : -1; + return subscriber.run(notexit, timeout) ? 0 : -1; } return -1; diff --git a/test/dds/communication/SubscriberModule.cpp b/test/dds/communication/SubscriberModule.cpp index 9f2c5aece87..a1d2b7b2a24 100644 --- a/test/dds/communication/SubscriberModule.cpp +++ b/test/dds/communication/SubscriberModule.cpp @@ -72,7 +72,7 @@ bool SubscriberModule::init( if (participant_ == nullptr) { - std::cout << "Error creating subscriber participant" << std::endl; + EPROSIMA_LOG_ERROR(SUBSCRIBER_MODULE, "Error creating subscriber participant"); return false; } @@ -95,7 +95,7 @@ bool SubscriberModule::init( subscriber_ = participant_->create_subscriber(SUBSCRIBER_QOS_DEFAULT, nullptr); if (subscriber_ == nullptr) { - std::cout << "Error creating subscriber" << std::endl; + EPROSIMA_LOG_ERROR(SUBSCRIBER_MODULE, "Error creating subscriber"); return false; } @@ -103,7 +103,7 @@ bool SubscriberModule::init( topic_ = participant_->create_topic(topic_name.str(), type_.get_type_name(), TOPIC_QOS_DEFAULT); if (topic_ == nullptr) { - std::cout << "Error creating subscriber topic" << std::endl; + EPROSIMA_LOG_ERROR(SUBSCRIBER_MODULE, "Error creating subscriber topic"); return false; } @@ -116,11 +116,12 @@ bool SubscriberModule::init( reader_ = subscriber_->create_datareader(topic_, rqos); if (reader_ == nullptr) { - std::cout << "Error creating subscriber datareader" << std::endl; + EPROSIMA_LOG_ERROR(SUBSCRIBER_MODULE, "Error creating subscriber datareader"); return false; } std::cout << "Reader created correctly in topic " << topic_->get_name() - << " with type " << type_.get_type_name() << std::endl; + << " with type " << + type_.get_type_name() << std::endl; std::cout << "Subscriber initialized correctly" << std::endl; @@ -128,9 +129,10 @@ bool SubscriberModule::init( } bool SubscriberModule::run( - bool notexit) + bool notexit, + uint32_t timeout) { - return run_for(notexit, std::chrono::hours(24)); + return run_for(notexit, std::chrono::milliseconds(timeout)); } bool SubscriberModule::run_for( @@ -146,9 +148,15 @@ bool SubscriberModule::run_for( if (run_) { + auto t0 = std::chrono::steady_clock::now(); std::unique_lock lock(mutex_); returned_value = cv_.wait_for(lock, timeout, [&] { + if (succeeed_on_timeout_ && (std::chrono::steady_clock::now() - t0) > timeout) + { + return true; + } + if (publishers_ < number_samples_.size()) { // Will fail later. @@ -178,7 +186,7 @@ bool SubscriberModule::run_for( if (publishers_ < number_samples_.size()) { - std::cout << "ERROR: detected more than " << publishers_ << " publishers" << std::endl; + EPROSIMA_LOG_INFO(SUBSCRIBER_MODULE, "ERROR: detected more than " << publishers_ << " publishers"); returned_value = false; } @@ -252,7 +260,7 @@ void SubscriberModule::on_subscription_matched( void SubscriberModule::on_data_available( DataReader* reader) { - std::cout << "Subscriber on_data_available from :" << participant_->guid() << std::endl; + EPROSIMA_LOG_INFO(SUBSCRIBER_MODULE, "Subscriber on_data_available from :" << participant_->guid()); if (zero_copy_) { @@ -265,10 +273,11 @@ void SubscriberModule::on_data_available( if (info.valid_data && info.instance_state == ALIVE_INSTANCE_STATE) { - FixedSized& data = l_sample[0]; - std::cout << "Received sample (" << info.sample_identity.writer_guid() << " - " << - info.sample_identity.sequence_number() << "): index(" << data.index() << ")" << std::endl; + EPROSIMA_LOG_INFO(SUBSCRIBER_MODULE, + "Received sample (" << info.sample_identity.writer_guid() << " - " << + info.sample_identity.sequence_number() << "): index(" << ((FixedSized&)l_sample[0]).index() << + ")"); if (max_number_samples_ <= ++number_samples_[info.sample_identity.writer_guid()]) @@ -291,8 +300,9 @@ void SubscriberModule::on_data_available( if (info.instance_state == ALIVE_INSTANCE_STATE) { std::unique_lock lock(mutex_); - std::cout << "Received sample (" << info.sample_identity.writer_guid() << " - " << - info.sample_identity.sequence_number() << "): index(" << sample.index() << ")" << std::endl; + EPROSIMA_LOG_INFO(SUBSCRIBER_MODULE, + "Received sample (" << info.sample_identity.writer_guid() << " - " << + info.sample_identity.sequence_number() << "): index(" << sample.index() << ")"); if (max_number_samples_ <= ++number_samples_[info.sample_identity.writer_guid()]) { cv_.notify_all(); @@ -308,9 +318,10 @@ void SubscriberModule::on_data_available( if (info.instance_state == ALIVE_INSTANCE_STATE) { std::unique_lock lock(mutex_); - std::cout << "Received sample (" << info.sample_identity.writer_guid() << " - " << - info.sample_identity.sequence_number() << "): index(" << sample.index() << "), message(" - << sample.message() << ")" << std::endl; + EPROSIMA_LOG_INFO(SUBSCRIBER_MODULE, + "Received sample (" << info.sample_identity.writer_guid() << " - " << + info.sample_identity.sequence_number() << "): index(" << sample.index() << "), message(" + << sample.message() << ")"); if (max_number_samples_ <= ++number_samples_[info.sample_identity.writer_guid()]) { cv_.notify_all(); diff --git a/test/dds/communication/SubscriberModule.hpp b/test/dds/communication/SubscriberModule.hpp index f04d4f89c79..9b7751452e1 100644 --- a/test/dds/communication/SubscriberModule.hpp +++ b/test/dds/communication/SubscriberModule.hpp @@ -45,11 +45,13 @@ class SubscriberModule const uint32_t publishers, const uint32_t max_number_samples, bool fixed_type = false, - bool zero_copy = false) + bool zero_copy = false, + bool succeed_on_timeout = false) : publishers_(publishers) , max_number_samples_(max_number_samples) , fixed_type_(zero_copy || fixed_type) // If zero copy active, fixed type is required , zero_copy_(zero_copy) + , succeeed_on_timeout_(succeed_on_timeout) { } @@ -81,7 +83,8 @@ class SubscriberModule const std::string& magic); bool run( - bool notexit); + bool notexit, + uint32_t timeout = 86400000); bool run_for( bool notexit, @@ -97,6 +100,7 @@ class SubscriberModule bool fixed_type_ = false; bool zero_copy_ = false; bool run_ = true; + bool succeeed_on_timeout_ = false; DomainParticipant* participant_ = nullptr; TypeSupport type_; Subscriber* subscriber_ = nullptr; diff --git a/test/dds/communication/definitions_example.json b/test/dds/communication/definitions_example.json index ba54e9886a5..81bbd09f44a 100644 --- a/test/dds/communication/definitions_example.json +++ b/test/dds/communication/definitions_example.json @@ -13,7 +13,8 @@ "wait" : 0, "samples" : 4, "magic" : "str", - "xmlfile" : "path/file.xml" + "xmlfile" : "path/file.xml", + "interval" : 250 }, { "kind" : "subscriber", @@ -24,7 +25,9 @@ "samples" : 4, "magic" : "str", "xmlfile" : "path/file.xml", - "publishers" : 1 + "publishers" : 1, + "timeout" : 20, + "succeed_on_timeout" : false }, { "kind" : "pubsub", @@ -38,7 +41,8 @@ "magic" : "str", "xmlfile" : "path/file.xml", "publishers" : 1, - "publisher_loops" : 2 + "publisher_loops" : 2, + "interval" : 250 } ] } diff --git a/test/dds/communication/simple_data_sharing_stress.json b/test/dds/communication/simple_data_sharing_stress.json new file mode 100644 index 00000000000..8388208c044 --- /dev/null +++ b/test/dds/communication/simple_data_sharing_stress.json @@ -0,0 +1,18 @@ +{ + "description" : "Test to check that Data Sharing correctly behaves under stress", + "participants" : [ + { + "kind" : "subscriber", + "fixed_type" : true, + "samples" : "1000000", + "timeout" : "20000", + "succeed_on_timeout" : true + }, + { + "kind" : "publisher", + "fixed_type" : true, + "interval" : "0", + "sleep_before_exec" : "1" + } + ] +} diff --git a/test/dds/communication/test_build.py b/test/dds/communication/test_build.py index 979f677d3d9..509a7e1c01f 100644 --- a/test/dds/communication/test_build.py +++ b/test/dds/communication/test_build.py @@ -48,7 +48,7 @@ def define_args(tests_definition): if 'kind' not in test.keys(): print('ARGUMENT ERROR : ' - 'Test definition requites field for each participant') + 'Test definition requires field for each participant') continue # All processes has seed argument @@ -58,7 +58,9 @@ def define_args(tests_definition): 'wait', 'magic', 'publishers', - 'sleep_before_exec'] + 'sleep_before_exec', + 'interval', + 'timeout'] for argument in possible_arguments: if argument in test.keys(): @@ -69,7 +71,8 @@ def define_args(tests_definition): possible_flags = ['exit_on_lost_liveliness', 'zero_copy', 'fixed_type', - 'notexit'] + 'notexit', + 'succeed_on_timeout'] for flag in possible_flags: if flag in test.keys():