Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Free used buffers in SHM on zombie ports <2.1.x> [11126] #1894

Merged
merged 2 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/cpp/rtps/transport/shared_mem/SharedMemChannelResource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SharedMemChannelResource : public ChannelResource
auto packets_file_consumer = std::unique_ptr<SHMPacketFileConsumer>(
new SHMPacketFileConsumer(dump_file));

packet_logger_ = std::make_shared<PacketsLog<SHMPacketFileConsumer> >();
packet_logger_ = std::make_shared<PacketsLog<SHMPacketFileConsumer>>();
packet_logger_->RegisterConsumer(std::move(packets_file_consumer));
}

Expand Down Expand Up @@ -107,7 +107,7 @@ class SharedMemChannelResource : public ChannelResource
{
listener_->close();
}
catch(const std::exception& e)
catch (const std::exception& e)
{
logWarning(RTPS_MSG_IN, e.what());
}
Expand All @@ -130,7 +130,7 @@ class SharedMemChannelResource : public ChannelResource
// Blocking receive.
std::shared_ptr<SharedMemManager::Buffer> message;

if (!(message = Receive(remote_locator)) )
if (!(message = Receive(remote_locator)))
{
continue;
}
Expand All @@ -155,6 +155,7 @@ class SharedMemChannelResource : public ChannelResource

// Forces message release before waiting for the next
message.reset();
listener_->stop_processing_buffer();
}

message_receiver(nullptr);
Expand All @@ -168,7 +169,6 @@ class SharedMemChannelResource : public ChannelResource
this->thread(std::thread(&SharedMemChannelResource::perform_listen_operation, this, locator));
}


/**
* Blocking Receive from the specified channel.
*/
Expand Down Expand Up @@ -208,7 +208,7 @@ class SharedMemChannelResource : public ChannelResource

SharedMemChannelResource(
const SharedMemChannelResource&) = delete;
SharedMemChannelResource& operator=(
SharedMemChannelResource& operator =(
const SharedMemChannelResource&) = delete;
};

Expand Down
89 changes: 83 additions & 6 deletions src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class SharedMemGlobal
typedef MultiProducerConsumerRingBuffer<BufferDescriptor>::Listener Listener;
typedef MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell PortCell;

static const uint32_t CURRENT_ABI_VERSION = 4;
static const uint32_t CURRENT_ABI_VERSION = 5;

struct PortNode
{
Expand All @@ -99,13 +99,31 @@ class SharedMemGlobal
SharedMemSegment::condition_variable empty_cv;
SharedMemSegment::mutex empty_cv_mutex;

// Number of listeners this port supports
static constexpr size_t LISTENERS_STATUS_SIZE = 1024;

// Status of each listener
struct ListenerStatus
{
uint8_t is_waiting : 1;
uint8_t counter : 3;
uint8_t last_verified_counter : 3;
// True if this slot is taken by an active listener
uint8_t is_in_use : 1;

// True if the listener is waiting for a notification of new message
uint8_t is_waiting : 1;

// True if the listener has taken a new message and is still processing it
uint8_t is_processing : 1;

// Break bit packing, next field will start in a new byte
uint8_t : 0;

// These are used to detect listeners frozen while waiting for a notification
uint8_t counter : 4;
uint8_t last_verified_counter : 4;

// Descriptor of the message the listener is processing
// Valid only if is_processing is true.
BufferDescriptor descriptor;
};
ListenerStatus listeners_status[LISTENERS_STATUS_SIZE];

Expand Down Expand Up @@ -282,8 +300,6 @@ class SharedMemGlobal
// Most probably has not, so the check is done without locking empty_cv_mutex.
if (timeout_elapsed(now, *(*port_it)))
{
std::vector<const BufferDescriptor*> descriptors_enqueued;

try
{
std::unique_lock<SharedMemSegment::mutex> lock_port((*port_it)->node->empty_cv_mutex);
Expand Down Expand Up @@ -651,6 +667,7 @@ class SharedMemGlobal
{
*listener_index = i;
node_->listeners_status[i].is_in_use = true;
node_->listeners_status[i].is_processing = false;
node_->num_listeners++;
listener = buffer_->register_listener();
}
Expand All @@ -677,6 +694,7 @@ class SharedMemGlobal
(*listener).reset();
node_->num_listeners--;
node_->listeners_status[listener_index].is_in_use = false;
node_->listeners_status[listener_index].is_processing = false;
}
catch (const std::exception&)
{
Expand All @@ -688,6 +706,65 @@ class SharedMemGlobal
}
}

/**
 * @brief Find one listener that is still processing a buffer and hand back that buffer's descriptor.
 *
 * The listener status table is scanned while holding \c empty_cv_mutex. For the first slot that is
 * both in use and flagged as \c is_processing:
 * - its descriptor is copied into \p buffer_descriptor
 * - its processing flag is cleared
 * - the scan ends
 *
 * This is meant for cleaning up after a port found to be zombie: the caller invokes the method
 * repeatedly, acting on each descriptor it receives, until false is returned
 * (no processing listener is left).
 *
 * \pre Current port is zombie, as reported by is_zombie()
 *
 * @param[out] buffer_descriptor Receives the descriptor of the buffer the found listener was processing.
 *                               Not written when the method returns false.
 * @return True if a processing listener was found. False if not.
 */
bool get_and_remove_blocked_processing(
        BufferDescriptor& buffer_descriptor)
{
    std::lock_guard<SharedMemSegment::mutex> guard(node_->empty_cv_mutex);

    uint32_t slot = 0;
    while (slot < PortNode::LISTENERS_STATUS_SIZE)
    {
        const auto& status = node_->listeners_status[slot];

        // Skip slots that are free or whose listener is not holding a buffer
        if (!status.is_in_use || !status.is_processing)
        {
            ++slot;
            continue;
        }

        buffer_descriptor = status.descriptor;
        listener_processing_stop(slot);
        return true;
    }

    return false;
}

/**
 * Records that a listener has started processing a buffer.
 * The descriptor is stored in the listener's status slot first, then the slot's
 * is_processing flag is raised.
 * @param listener_index The index of the listener as returned by create_listener.
 * @param buffer_descriptor The descriptor of the buffer that the listener is processing
 */
void listener_processing_start(
        uint32_t listener_index,
        const BufferDescriptor& buffer_descriptor)
{
    auto& status = node_->listeners_status[listener_index];
    status.descriptor = buffer_descriptor;
    status.is_processing = true;
}

/**
 * Clears the is_processing flag of a listener's status slot.
 * The stored descriptor is left as it is.
 * @param listener_index The index of the listener as returned by create_listener.
 */
void listener_processing_stop(
        uint32_t listener_index)
{
    auto& status = node_->listeners_status[listener_index];
    status.is_processing = false;
}

/**
* Performs a check on the opened port.
* When a process crashes with a port opened the port can be left inoperative.
Expand Down
54 changes: 53 additions & 1 deletion src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,6 @@ class SharedMemManager :
uint32_t required_data_size)
{
auto it = allocated_buffers_.begin();

while (it != allocated_buffers_.end())
{
// There is enough space to allocate the buffer
Expand Down Expand Up @@ -597,6 +596,24 @@ class SharedMemManager :
}
}

// We may have enough memory but no free buffers
it = allocated_buffers_.begin();
while (free_buffers_.empty() && it != allocated_buffers_.end())
{
// Buffer is not being processed by any listener
if ((*it)->invalidate_if_not_processing())
{
release_buffer(*it);

free_buffers_.push_back(*it);
it = allocated_buffers_.erase(it);
}
else
{
it++;
}
}

return free_bytes_ >= required_data_size;
}

Expand Down Expand Up @@ -705,6 +722,7 @@ class SharedMemManager :

if (buffer_ref)
{
global_port_->listener_processing_start(listener_index_, buffer_descriptor);
if (was_cell_freed)
{
// Atomically increase processing & decrease enqueued
Expand Down Expand Up @@ -745,6 +763,11 @@ class SharedMemManager :
return buffer_ref;
}

/**
 * Reports to the global port that this listener (identified by listener_index_)
 * is no longer processing a buffer.
 */
void stop_processing_buffer()
{
    auto& port = *global_port_;
    port.listener_processing_stop(listener_index_);
}

void regenerate_port()
{
auto new_port = shared_mem_manager_->regenerate_port(global_port_, global_port_->open_mode());
Expand Down Expand Up @@ -854,6 +877,34 @@ class SharedMemManager :
return ret;
}

/**
* @brief Unlock buffers being processed by the port if the port is frozen.
*
* If the port is zombie, finds all the buffers that were being processed by a listener
* and decrements their processing count, so that they are not kept locked forever
*/
void recover_blocked_processing()
{
SharedMemGlobal::BufferDescriptor buffer_descriptor;
if (SharedMemGlobal::Port::is_zombie(global_port_->port_id(),
shared_mem_manager_->global_segment()->domain_name()))
{
while (global_port_->get_and_remove_blocked_processing(buffer_descriptor))
{
auto segment = shared_mem_manager_->find_segment(buffer_descriptor.source_segment_id);
if (!segment)
{
// If the segment is gone, nothing to do
continue;
}
auto buffer_node =
static_cast<BufferNode*>(segment->get_address_from_offset(buffer_descriptor.
buffer_node_offset));
buffer_node->dec_processing_count(buffer_descriptor.validity_id);
}
}
}

std::shared_ptr<Listener> create_listener()
{
return std::make_shared<Listener>(shared_mem_manager_, global_port_);
Expand All @@ -863,6 +914,7 @@ class SharedMemManager :

void regenerate_port()
{
recover_blocked_processing();
auto new_port = shared_mem_manager_->regenerate_port(global_port_, open_mode_);

*this = std::move(*new_port);
Expand Down
23 changes: 23 additions & 0 deletions test/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND)
${CMAKE_CURRENT_BINARY_DIR}/liveliness_assertion.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/automatic_liveliness_assertion.py
${CMAKE_CURRENT_BINARY_DIR}/automatic_liveliness_assertion.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/shm_communication_subscriber_dies_while_processing_message.py
${CMAKE_CURRENT_BINARY_DIR}/shm_communication_subscriber_dies_while_processing_message.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/simple_besteffort.xml
${CMAKE_CURRENT_BINARY_DIR}/simple_besteffort.xml COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/simple_reliable.xml
Expand Down Expand Up @@ -98,6 +100,8 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND)
${CMAKE_CURRENT_BINARY_DIR}/secure_submsg_crypto_besteffort_pub.xml COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/secure_submsg_crypto_besteffort_sub.xml
${CMAKE_CURRENT_BINARY_DIR}/secure_submsg_crypto_besteffort_sub.xml COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/shm_communication_subscriber_dies_while_processing_message.xml
${CMAKE_CURRENT_BINARY_DIR}/shm_communication_subscriber_dies_while_processing_message.xml COPYONLY)
if(SECURITY)
configure_file(${PROJECT_SOURCE_DIR}/test/certs/maincacert.pem
${CMAKE_CURRENT_BINARY_DIR}/maincacert.pem COPYONLY)
Expand Down Expand Up @@ -347,5 +351,24 @@ if(NOT ((MSVC OR MSVC_IDE) AND EPROSIMA_INSTALLER) AND fastcdr_FOUND)
set_property(TEST TwoPublishersCommunicationReliable APPEND PROPERTY ENVIRONMENT
"PATH=$<TARGET_FILE_DIR:${PROJECT_NAME}>\\;$<TARGET_FILE_DIR:fastcdr>\\;${WIN_PATH}")
endif()

# Register the test: it runs the Python driver script that was copied into the build directory.
add_test(NAME SHMCommunicationSubscriberDiesWhileProcessingMessage
COMMAND ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_BINARY_DIR}/shm_communication_subscriber_dies_while_processing_message.py)

# Set test with label NoMemoryCheck
set_property(TEST SHMCommunicationSubscriberDiesWhileProcessingMessage PROPERTY LABELS "NoMemoryCheck")

# Environment handed to the script: full paths of the publisher and subscriber executables
# (resolved through generator expressions) and the name of the XML file to use.
# The first call sets ENVIRONMENT; the following ones APPEND to it.
set_property(TEST SHMCommunicationSubscriberDiesWhileProcessingMessage PROPERTY ENVIRONMENT
"SIMPLE_COMMUNICATION_PUBLISHER_BIN=$<TARGET_FILE:SimpleCommunicationPublisher>")
set_property(TEST SHMCommunicationSubscriberDiesWhileProcessingMessage APPEND PROPERTY ENVIRONMENT
"SIMPLE_COMMUNICATION_SUBSCRIBER_BIN=$<TARGET_FILE:SimpleCommunicationSubscriber>")
set_property(TEST SHMCommunicationSubscriberDiesWhileProcessingMessage APPEND PROPERTY ENVIRONMENT
"XML_FILE=shm_communication_subscriber_dies_while_processing_message.xml")
# On Windows, put the directories of the ${PROJECT_NAME} and fastcdr targets in front of the
# current PATH. Semicolons in the existing PATH are escaped so it stays a single list entry.
if(WIN32)
string(REPLACE ";" "\\;" WIN_PATH "$ENV{PATH}")
set_property(TEST SHMCommunicationSubscriberDiesWhileProcessingMessage APPEND PROPERTY ENVIRONMENT
"PATH=$<TARGET_FILE_DIR:${PROJECT_NAME}>\\;$<TARGET_FILE_DIR:fastcdr>\\;${WIN_PATH}")
endif()

endif()
endif()
2 changes: 1 addition & 1 deletion test/communication/PubSubMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ int main(
}

Publisher publisher(false);
Subscriber subscriber(publishers, samples);
Subscriber subscriber(publishers, samples, false);

if (publisher.init(seed, magic))
{
Expand Down
Loading