diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp index 8ecb4546d60..9db11dae0bf 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp @@ -49,7 +49,7 @@ class SharedMemChannelResource : public ChannelResource auto packets_file_consumer = std::unique_ptr( new SHMPacketFileConsumer(dump_file)); - packet_logger_ = std::make_shared >(); + packet_logger_ = std::make_shared>(); packet_logger_->RegisterConsumer(std::move(packets_file_consumer)); } @@ -107,7 +107,7 @@ class SharedMemChannelResource : public ChannelResource { listener_->close(); } - catch(const std::exception& e) + catch (const std::exception& e) { logWarning(RTPS_MSG_IN, e.what()); } @@ -130,7 +130,7 @@ class SharedMemChannelResource : public ChannelResource // Blocking receive. std::shared_ptr message; - if (!(message = Receive(remote_locator)) ) + if (!(message = Receive(remote_locator))) { continue; } @@ -155,6 +155,7 @@ class SharedMemChannelResource : public ChannelResource // Forces message release before waiting for the next message.reset(); + listener_->stop_processing_buffer(); } message_receiver(nullptr); @@ -168,7 +169,6 @@ class SharedMemChannelResource : public ChannelResource this->thread(std::thread(&SharedMemChannelResource::perform_listen_operation, this, locator)); } - /** * Blocking Receive from the specified channel. 
*/ @@ -208,7 +208,7 @@ class SharedMemChannelResource : public ChannelResource SharedMemChannelResource( const SharedMemChannelResource&) = delete; - SharedMemChannelResource& operator=( + SharedMemChannelResource& operator =( const SharedMemChannelResource&) = delete; }; diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp index 8c81304c133..26af25c6f15 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp @@ -72,7 +72,7 @@ class SharedMemGlobal typedef MultiProducerConsumerRingBuffer::Listener Listener; typedef MultiProducerConsumerRingBuffer::Cell PortCell; - static const uint32_t CURRENT_ABI_VERSION = 4; + static const uint32_t CURRENT_ABI_VERSION = 5; struct PortNode { @@ -99,13 +99,31 @@ class SharedMemGlobal SharedMemSegment::condition_variable empty_cv; SharedMemSegment::mutex empty_cv_mutex; + // Number of listeners this port supports static constexpr size_t LISTENERS_STATUS_SIZE = 1024; + + // Status of each listener struct ListenerStatus { - uint8_t is_waiting : 1; - uint8_t counter : 3; - uint8_t last_verified_counter : 3; + // True if this slot is taken by an active listener uint8_t is_in_use : 1; + + // True if the listener is waiting for a notification of new message + uint8_t is_waiting : 1; + + // True if the listener has taken a new message and is still processing it + uint8_t is_processing : 1; + + // Break bit packing, next field will start in a new byte + uint8_t : 0; + + // These are used to detect listeners frozen while waiting for a notification + uint8_t counter : 4; + uint8_t last_verified_counter : 4; + + // Descriptor of the message the listener is processing + // Valid only if is_processing is true. 
+ BufferDescriptor descriptor; }; ListenerStatus listeners_status[LISTENERS_STATUS_SIZE]; @@ -282,8 +300,6 @@ class SharedMemGlobal // Most probably has not, so the check is done without locking empty_cv_mutex. if (timeout_elapsed(now, *(*port_it))) { - std::vector descriptors_enqueued; - try { std::unique_lock lock_port((*port_it)->node->empty_cv_mutex); @@ -651,6 +667,7 @@ class SharedMemGlobal { *listener_index = i; node_->listeners_status[i].is_in_use = true; + node_->listeners_status[i].is_processing = false; node_->num_listeners++; listener = buffer_->register_listener(); } @@ -677,6 +694,7 @@ class SharedMemGlobal (*listener).reset(); node_->num_listeners--; node_->listeners_status[listener_index].is_in_use = false; + node_->listeners_status[listener_index].is_processing = false; } catch (const std::exception&) { @@ -688,6 +706,65 @@ class SharedMemGlobal } } + /** + * @brief Look for a listener processing a buffer and return the buffer descriptor being processed + * + * Iterates over all the listeners, and upon finding the first one that is marked as \c is_processing: + * - Marks it as not processing + * - Copies the buffer descriptor that was being processed in the provided buffer + * - Finishes iterating + * + * The intention of this method is to locate all buffer descriptors that are being processed + * and apply necessary operations on these buffers when the port is found to be zombie. + * This method should be called iteratively until the return value is false + * (there is no processing listener remaining). + * The calling method has the chance to apply necessary actions on the buffer descriptor. + * + * \pre Current port is zombie, as reported by is_zombie() + * + * @param[OUT] buffer_descriptor Descriptor of the buffer that was being processed by the first processing listener + * @return True if there was at least one processing listener. False if not. 
+ */ + bool get_and_remove_blocked_processing( + BufferDescriptor& buffer_descriptor) + { + std::lock_guard lock(node_->empty_cv_mutex); + for (uint32_t i = 0; i < PortNode::LISTENERS_STATUS_SIZE; i++) + { + if (node_->listeners_status[i].is_in_use && + node_->listeners_status[i].is_processing) + { + buffer_descriptor = node_->listeners_status[i].descriptor; + listener_processing_stop(i); + return true; + } + } + return false; + } + + /** + * Marks the listener as processing a buffer + * @param listener_index The index of the listener as returned by create_listener. + * @param buffer_descriptor The descriptor of the buffer that the listener is processing + */ + void listener_processing_start( + uint32_t listener_index, + const BufferDescriptor& buffer_descriptor) + { + node_->listeners_status[listener_index].descriptor = buffer_descriptor; + node_->listeners_status[listener_index].is_processing = true; + } + + /** + * Marks the listener as finished processing a buffer + * @param listener_index The index of the listener as returned by create_listener. + */ + void listener_processing_stop( + uint32_t listener_index) + { + node_->listeners_status[listener_index].is_processing = false; + } + /** * Performs a check on the opened port. * When a process crashes with a port opened the port can be left inoperative. 
diff --git a/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp b/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp index d2ab2dcbae2..58fa1d96129 100644 --- a/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp +++ b/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp @@ -560,7 +560,6 @@ class SharedMemManager : uint32_t required_data_size) { auto it = allocated_buffers_.begin(); - while (it != allocated_buffers_.end()) { // There is enough space to allocate the buffer @@ -597,6 +596,24 @@ class SharedMemManager : } } + // We may have enough memory but no free buffers + it = allocated_buffers_.begin(); + while (free_buffers_.empty() && it != allocated_buffers_.end()) + { + // Buffer is not being processed by any listener + if ((*it)->invalidate_if_not_processing()) + { + release_buffer(*it); + + free_buffers_.push_back(*it); + it = allocated_buffers_.erase(it); + } + else + { + it++; + } + } + return free_bytes_ >= required_data_size; } @@ -705,6 +722,7 @@ class SharedMemManager : if (buffer_ref) { + global_port_->listener_processing_start(listener_index_, buffer_descriptor); if (was_cell_freed) { // Atomically increase processing & decrease enqueued @@ -745,6 +763,11 @@ class SharedMemManager : return buffer_ref; } + void stop_processing_buffer() + { + global_port_->listener_processing_stop(listener_index_); + } + void regenerate_port() { auto new_port = shared_mem_manager_->regenerate_port(global_port_, global_port_->open_mode()); @@ -854,6 +877,34 @@ class SharedMemManager : return ret; } + /** + * @brief Unlock buffers being processed by the port if the port is frozen. 
+ * + * If the port is zombie, finds all the buffers that were being processed by a listener + * and decrements their processing count, so that they are not kept locked forever + */ + void recover_blocked_processing() + { + SharedMemGlobal::BufferDescriptor buffer_descriptor; + if (SharedMemGlobal::Port::is_zombie(global_port_->port_id(), + shared_mem_manager_->global_segment()->domain_name())) + { + while (global_port_->get_and_remove_blocked_processing(buffer_descriptor)) + { + auto segment = shared_mem_manager_->find_segment(buffer_descriptor.source_segment_id); + if (!segment) + { + // If the segment is gone, nothing to do + continue; + } + auto buffer_node = + static_cast(segment->get_address_from_offset(buffer_descriptor. + buffer_node_offset)); + buffer_node->dec_processing_count(buffer_descriptor.validity_id); + } + } + } + std::shared_ptr create_listener() { return std::make_shared(shared_mem_manager_, global_port_); @@ -863,6 +914,7 @@ class SharedMemManager : void regenerate_port() { + recover_blocked_processing(); auto new_port = shared_mem_manager_->regenerate_port(global_port_, open_mode_); *this = std::move(*new_port); diff --git a/test/communication/CMakeLists.txt b/test/communication/CMakeLists.txt index 68fe63b16d8..58e5453df6f 100644 --- a/test/communication/CMakeLists.txt +++ b/test/communication/CMakeLists.txt @@ -68,6 +68,8 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) ${CMAKE_CURRENT_BINARY_DIR}/liveliness_assertion.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/automatic_liveliness_assertion.py ${CMAKE_CURRENT_BINARY_DIR}/automatic_liveliness_assertion.py COPYONLY) + configure_file(${CMAKE_CURRENT_SOURCE_DIR}/shm_communication_subscriber_dies_while_processing_message.py + ${CMAKE_CURRENT_BINARY_DIR}/shm_communication_subscriber_dies_while_processing_message.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/simple_besteffort.xml ${CMAKE_CURRENT_BINARY_DIR}/simple_besteffort.xml COPYONLY) 
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/simple_reliable.xml @@ -98,6 +100,8 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) ${CMAKE_CURRENT_BINARY_DIR}/secure_submsg_crypto_besteffort_pub.xml COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/secure_submsg_crypto_besteffort_sub.xml ${CMAKE_CURRENT_BINARY_DIR}/secure_submsg_crypto_besteffort_sub.xml COPYONLY) + configure_file(${CMAKE_CURRENT_SOURCE_DIR}/shm_communication_subscriber_dies_while_processing_message.xml + ${CMAKE_CURRENT_BINARY_DIR}/shm_communication_subscriber_dies_while_processing_message.xml COPYONLY) if(SECURITY) configure_file(${PROJECT_SOURCE_DIR}/test/certs/maincacert.pem ${CMAKE_CURRENT_BINARY_DIR}/maincacert.pem COPYONLY) @@ -347,5 +351,24 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND) set_property(TEST TwoPublishersCommunicationReliable APPEND PROPERTY ENVIRONMENT "PATH=$\\;$\\;${WIN_PATH}") endif() + + add_test(NAME SHMCommunicationSubscriberDiesWhileProcessingMessage + COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/shm_communication_subscriber_dies_while_processing_message.py) + + # Set test with label NoMemoryCheck + set_property(TEST SHMCommunicationSubscriberDiesWhileProcessingMessage PROPERTY LABELS "NoMemoryCheck") + + set_property(TEST SHMCommunicationSubscriberDiesWhileProcessingMessage PROPERTY ENVIRONMENT + "SIMPLE_COMMUNICATION_PUBLISHER_BIN=$") + set_property(TEST SHMCommunicationSubscriberDiesWhileProcessingMessage APPEND PROPERTY ENVIRONMENT + "SIMPLE_COMMUNICATION_SUBSCRIBER_BIN=$") + set_property(TEST SHMCommunicationSubscriberDiesWhileProcessingMessage APPEND PROPERTY ENVIRONMENT + "XML_FILE=shm_communication_subscriber_dies_while_processing_message.xml") + if(WIN32) + string(REPLACE ";" "\\;" WIN_PATH "$ENV{PATH}") + set_property(TEST SHMCommunicationSubscriberDiesWhileProcessingMessage APPEND PROPERTY ENVIRONMENT + "PATH=$\\;$\\;${WIN_PATH}") + endif() + endif() endif() diff --git 
a/test/communication/PubSubMain.cpp b/test/communication/PubSubMain.cpp index 2f1b4d44fb4..8a3bba8ecf6 100644 --- a/test/communication/PubSubMain.cpp +++ b/test/communication/PubSubMain.cpp @@ -91,7 +91,7 @@ int main( } Publisher publisher(false); - Subscriber subscriber(publishers, samples); + Subscriber subscriber(publishers, samples, false); if (publisher.init(seed, magic)) { diff --git a/test/communication/Subscriber.cpp b/test/communication/Subscriber.cpp index b497bb40a73..96053e8ab41 100644 --- a/test/communication/Subscriber.cpp +++ b/test/communication/Subscriber.cpp @@ -81,6 +81,11 @@ bool Subscriber::init( void Subscriber::onNewDataMessage( eprosima::fastrtps::Subscriber* subscriber) { + if (die_on_data_received_) + { + std::abort(); + } + HelloWorld sample; eprosima::fastrtps::SampleInfo_t info; @@ -121,27 +126,27 @@ bool Subscriber::run_for( { std::unique_lock lock(mutex_); returned_value = cv_.wait_for(lock, timeout, [&] - { - if (publishers_ < number_samples_.size()) - { - // Will fail later. - return true; - } - else if (publishers_ > number_samples_.size()) - { - return false; - } - - for (auto& number_samples : number_samples_) - { - if (max_number_samples_ > number_samples.second) - { - return false; - } - } - - return true; - }); + { + if (publishers_ < number_samples_.size()) + { + // Will fail later. 
+ return true; + } + else if (publishers_ > number_samples_.size()) + { + return false; + } + + for (auto& number_samples : number_samples_) + { + if (max_number_samples_ > number_samples.second) + { + return false; + } + } + + return true; + }); } else { @@ -201,4 +206,4 @@ void Subscriber::onParticipantAuthentication( } } -#endif +#endif // if HAVE_SECURITY diff --git a/test/communication/Subscriber.hpp b/test/communication/Subscriber.hpp index 084dc37f1ed..e17cee4ab38 100644 --- a/test/communication/Subscriber.hpp +++ b/test/communication/Subscriber.hpp @@ -34,8 +34,8 @@ namespace eprosima { namespace fastrtps { class Participant; class Subscriber; -} -} +} // namespace fastrtps +} // namespace eprosima class Subscriber : public eprosima::fastrtps::SubscriberListener @@ -45,9 +45,11 @@ class Subscriber Subscriber( const uint32_t publishers, - const uint32_t max_number_samples) + const uint32_t max_number_samples, + bool die_on_data_received) : publishers_(publishers) , max_number_samples_(max_number_samples) + , die_on_data_received_(die_on_data_received) { } @@ -61,7 +63,7 @@ class Subscriber void onParticipantAuthentication( eprosima::fastrtps::Participant* /*participant*/, eprosima::fastrtps::rtps::ParticipantAuthenticationInfo&& info) override; -#endif +#endif // if HAVE_SECURITY void onSubscriptionMatched( eprosima::fastrtps::Subscriber* /*subscriber*/, @@ -118,5 +120,6 @@ class Subscriber eprosima::fastrtps::Participant* participant_ = nullptr; HelloWorldType type_; eprosima::fastrtps::Subscriber* subscriber_ = nullptr; + bool die_on_data_received_ = false; }; #endif // TEST_COMMUNICATION_SUBSCRIBER_HPP diff --git a/test/communication/SubscriberMain.cpp b/test/communication/SubscriberMain.cpp index 7d430c06a9a..093f5c43a49 100644 --- a/test/communication/SubscriberMain.cpp +++ b/test/communication/SubscriberMain.cpp @@ -8,6 +8,7 @@ int main( { int arg_count = 1; bool notexit = false; + bool die_on_data_received = false; uint32_t seed = 7800; uint32_t 
samples = 4; uint32_t publishers = 1; @@ -70,6 +71,15 @@ int main( publishers = strtol(argv[arg_count], nullptr, 10); } + else if (strcmp(argv[arg_count], "--die_on_data_received") == 0) + { + die_on_data_received = true; + } + else + { + std::cout << "Wrong argument " << argv[arg_count] << std::endl; + return -1; + } ++arg_count; } @@ -79,7 +89,7 @@ int main( eprosima::fastrtps::Domain::loadXMLProfilesFile(xml_file); } - Subscriber subscriber(publishers, samples); + Subscriber subscriber(publishers, samples, die_on_data_received); if (subscriber.init(seed, magic)) { diff --git a/test/communication/shm_communication_subscriber_dies_while_processing_message.py b/test/communication/shm_communication_subscriber_dies_while_processing_message.py new file mode 100644 index 00000000000..0f5a750bd00 --- /dev/null +++ b/test/communication/shm_communication_subscriber_dies_while_processing_message.py @@ -0,0 +1,84 @@ +# Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import sys, os, subprocess, glob, time + +script_dir = os.path.dirname(os.path.realpath(__file__)) + +publisher_command = os.environ.get("SIMPLE_COMMUNICATION_PUBLISHER_BIN") +if not publisher_command: + publisher_files = glob.glob(os.path.join(script_dir, "**/SimpleCommunicationPublisher*"), recursive=True) + pf = iter(publisher_files) + publisher_command = next(pf, None) + while publisher_command and (not os.path.isfile(publisher_command) or not os.access(publisher_command, + os.X_OK)): + publisher_command = next(pf, None) +assert publisher_command +subscriber_command = os.environ.get("SIMPLE_COMMUNICATION_SUBSCRIBER_BIN") +if not subscriber_command: + subscriber_files = glob.glob(os.path.join(script_dir, "**/SimpleCommunicationSubscriber*"), recursive=True) + pf = iter(subscriber_files) + subscriber_command = next(pf, None) + while subscriber_command and (not os.path.isfile(subscriber_command) or not os.access(subscriber_command, + os.X_OK)): + subscriber_command = next(pf, None) +assert subscriber_command + +extra_pub_arg = os.environ.get("EXTRA_PUB_ARG") +if extra_pub_arg: + extra_pub_args = extra_pub_arg.split() +else: + extra_pub_args = [] +extra_pub_arg = os.environ.get("EXTRA_PUB_ARG") + +extra_sub_arg = os.environ.get("EXTRA_SUB_ARG") +if extra_sub_arg: + extra_sub_args = extra_sub_arg.split() +else: + extra_sub_args = [] +extra_sub_arg = os.environ.get("EXTRA_SUB_ARG") + +real_xml_file_pub = None +real_xml_file_sub = None +xml_file = os.environ.get("XML_FILE") +if xml_file: + real_xml_file_pub = os.path.join(script_dir, xml_file) + real_xml_file_sub = os.path.join(script_dir, xml_file) +else: + xml_file_pub = os.environ.get("XML_FILE_PUB") + if xml_file_pub: + real_xml_file_pub = os.path.join(script_dir, xml_file_pub) + xml_file_sub = os.environ.get("XML_FILE_SUB") + if xml_file_sub: + real_xml_file_sub = os.path.join(script_dir, xml_file_sub) + +subscriber1_proc = subprocess.Popen([subscriber_command, "--samples", "20", "--seed", str(os.getpid())] + 
+ (["--xmlfile", real_xml_file_sub] if real_xml_file_sub else []) + + extra_sub_args) +publisher_proc = subprocess.Popen([publisher_command, "--seed", str(os.getpid())] + + (["--xmlfile", real_xml_file_pub] if real_xml_file_pub else []) + + extra_pub_args) +for i in range(2): + subscriber2_proc = subprocess.Popen([subscriber_command, "--die_on_data_received", "--seed", str(os.getpid())] + + (["--xmlfile", real_xml_file_sub] if real_xml_file_sub else []) + + extra_sub_args) + time.sleep(2) + subscriber2_proc.kill() + +subscriber1_proc.communicate() +retvalue = subscriber1_proc.returncode + +publisher_proc.kill() + +sys.exit(retvalue) diff --git a/test/communication/shm_communication_subscriber_dies_while_processing_message.xml b/test/communication/shm_communication_subscriber_dies_while_processing_message.xml new file mode 100644 index 00000000000..ee4875731fb --- /dev/null +++ b/test/communication/shm_communication_subscriber_dies_while_processing_message.xml @@ -0,0 +1,54 @@ + + + + + OFF + + + + + shm_transport + SHM + 10 + 100 + + + + + + false + + shm_transport + + + + + + + + KEEP_LAST + 100 + + + + + RELIABLE + + + TRANSIENT_LOCAL + + + + + + + + RELIABLE + + + TRANSIENT_LOCAL + + + + +