diff --git a/iceoryx_binding_c/include/iceoryx_binding_c/publisher.h b/iceoryx_binding_c/include/iceoryx_binding_c/publisher.h index 12722ba4ce..c0e6bffecb 100644 --- a/iceoryx_binding_c/include/iceoryx_binding_c/publisher.h +++ b/iceoryx_binding_c/include/iceoryx_binding_c/publisher.h @@ -40,7 +40,7 @@ typedef struct bool offerOnCreate; /// @brief describes whether a publisher blocks when subscriber queue is full - ENUM iox_SubscriberTooSlowPolicy deliveryQueueFullPolicy; + ENUM iox_SubscriberTooSlowPolicy subscriberTooSlowPolicy; /// @brief this value will be set exclusively by `iox_pub_options_init` and is not supposed to be modified otherwise uint64_t initCheck; diff --git a/iceoryx_binding_c/include/iceoryx_binding_c/subscriber.h b/iceoryx_binding_c/include/iceoryx_binding_c/subscriber.h index a74ae20a99..892099c734 100644 --- a/iceoryx_binding_c/include/iceoryx_binding_c/subscriber.h +++ b/iceoryx_binding_c/include/iceoryx_binding_c/subscriber.h @@ -43,7 +43,7 @@ typedef struct bool subscribeOnCreate; /// @brief describes whether a publisher blocks when subscriber queue is full - ENUM iox_QueueFullPolicy receiverQueueFullPolicy; + ENUM iox_QueueFullPolicy queueFullPolicy; /// @brief this value will be set exclusively by iox_sub_options_init and is not supposed to be modified otherwise uint64_t initCheck; diff --git a/iceoryx_binding_c/source/c_publisher.cpp b/iceoryx_binding_c/source/c_publisher.cpp index 6df4d5937e..62d673f2af 100644 --- a/iceoryx_binding_c/source/c_publisher.cpp +++ b/iceoryx_binding_c/source/c_publisher.cpp @@ -47,7 +47,7 @@ void iox_pub_options_init(iox_pub_options_t* options) options->historyCapacity = publisherOptions.historyCapacity; options->nodeName = nullptr; options->offerOnCreate = publisherOptions.offerOnCreate; - options->deliveryQueueFullPolicy = cpp2c::subscriberTooSlowPolicy(publisherOptions.deliveryQueueFullPolicy); + options->subscriberTooSlowPolicy = cpp2c::subscriberTooSlowPolicy(publisherOptions.subscriberTooSlowPolicy); options->initCheck = PUBLISHER_OPTIONS_INIT_CHECK_CONSTANT; } @@ -90,7 +90,7 @@ iox_pub_t iox_pub_init(iox_pub_storage_t* self, publisherOptions.nodeName = NodeName_t(TruncateToCapacity, options->nodeName); } publisherOptions.offerOnCreate = options->offerOnCreate; - publisherOptions.deliveryQueueFullPolicy = c2cpp::subscriberTooSlowPolicy(options->deliveryQueueFullPolicy); + publisherOptions.subscriberTooSlowPolicy = c2cpp::subscriberTooSlowPolicy(options->subscriberTooSlowPolicy); } me->m_portData = PoshRuntime::getInstance().getMiddlewarePublisher( diff --git a/iceoryx_binding_c/source/c_subscriber.cpp b/iceoryx_binding_c/source/c_subscriber.cpp index 9f9536d4ee..f913afdc73 100644 --- a/iceoryx_binding_c/source/c_subscriber.cpp +++ b/iceoryx_binding_c/source/c_subscriber.cpp @@ -53,7 +53,7 @@ void iox_sub_options_init(iox_sub_options_t* options) options->historyRequest = subscriberOptions.historyRequest; options->nodeName = nullptr; options->subscribeOnCreate = subscriberOptions.subscribeOnCreate; - options->receiverQueueFullPolicy = cpp2c::queueFullPolicy(subscriberOptions.receiverQueueFullPolicy); + options->queueFullPolicy = cpp2c::queueFullPolicy(subscriberOptions.queueFullPolicy); options->initCheck = SUBSCRIBER_OPTIONS_INIT_CHECK_CONSTANT; } @@ -97,7 +97,7 @@ iox_sub_t iox_sub_init(iox_sub_storage_t* self, subscriberOptions.nodeName = NodeName_t(TruncateToCapacity, options->nodeName); } subscriberOptions.subscribeOnCreate = options->subscribeOnCreate; - subscriberOptions.receiverQueueFullPolicy = c2cpp::queueFullPolicy(options->receiverQueueFullPolicy); + subscriberOptions.queueFullPolicy = c2cpp::queueFullPolicy(options->queueFullPolicy); } me->m_portData = diff --git a/iceoryx_binding_c/test/moduletests/test_publisher.cpp b/iceoryx_binding_c/test/moduletests/test_publisher.cpp index a2798c5d44..ee82c64186 100644 --- a/iceoryx_binding_c/test/moduletests/test_publisher.cpp +++ b/iceoryx_binding_c/test/moduletests/test_publisher.cpp @@ -353,7 +353,7 @@ TEST(iox_pub_options_test, publisherOptionsAreInitializedCorrectly) sut.historyCapacity = 37; sut.nodeName = "Dr.Gonzo"; sut.offerOnCreate = false; - sut.deliveryQueueFullPolicy = SubscriberTooSlowPolicy_WAIT_FOR_SUBSCRIBER; + sut.subscriberTooSlowPolicy = SubscriberTooSlowPolicy_WAIT_FOR_SUBSCRIBER; PublisherOptions options; // set offerOnCreate to the opposite of the expected default to check if it gets overwritten to default @@ -363,7 +363,7 @@ TEST(iox_pub_options_test, publisherOptionsAreInitializedCorrectly) EXPECT_EQ(sut.historyCapacity, options.historyCapacity); EXPECT_EQ(sut.nodeName, nullptr); EXPECT_EQ(sut.offerOnCreate, options.offerOnCreate); - EXPECT_EQ(sut.deliveryQueueFullPolicy, cpp2c::subscriberTooSlowPolicy(options.deliveryQueueFullPolicy)); + EXPECT_EQ(sut.subscriberTooSlowPolicy, cpp2c::subscriberTooSlowPolicy(options.subscriberTooSlowPolicy)); EXPECT_TRUE(iox_pub_options_is_initialized(&sut)); } diff --git a/iceoryx_binding_c/test/moduletests/test_subscriber.cpp b/iceoryx_binding_c/test/moduletests/test_subscriber.cpp index a454371413..58c70afe9c 100644 --- a/iceoryx_binding_c/test/moduletests/test_subscriber.cpp +++ b/iceoryx_binding_c/test/moduletests/test_subscriber.cpp @@ -408,7 +408,7 @@ TEST(iox_sub_options_test, subscriberOptionsAreInitializedCorrectly) sut.historyRequest = 73; sut.nodeName = "Dr.Gonzo"; sut.subscribeOnCreate = false; - sut.receiverQueueFullPolicy = QueueFullPolicy_BLOCK_PUBLISHER; + sut.queueFullPolicy = QueueFullPolicy_BLOCK_PUBLISHER; SubscriberOptions options; // set subscribeOnCreate to the opposite of the expected default to check if it gets overwritten to default @@ -419,7 +419,7 @@ TEST(iox_sub_options_test, subscriberOptionsAreInitializedCorrectly) EXPECT_EQ(sut.historyRequest, options.historyRequest); EXPECT_EQ(sut.nodeName, nullptr); EXPECT_EQ(sut.subscribeOnCreate, options.subscribeOnCreate); - EXPECT_EQ(sut.receiverQueueFullPolicy, cpp2c::queueFullPolicy(options.receiverQueueFullPolicy)); + EXPECT_EQ(sut.queueFullPolicy, cpp2c::queueFullPolicy(options.queueFullPolicy)); EXPECT_TRUE(iox_sub_options_is_initialized(&sut)); } diff --git a/iceoryx_examples/icedelivery/iox_publisher_with_options.cpp b/iceoryx_examples/icedelivery/iox_publisher_with_options.cpp index e86db36237..acdbc52b7f 100644 --- a/iceoryx_examples/icedelivery/iox_publisher_with_options.cpp +++ b/iceoryx_examples/icedelivery/iox_publisher_with_options.cpp @@ -51,6 +51,9 @@ int main() // grouping of publishers and subscribers within a process publisherOptions.nodeName = "Pub_Node_With_Options"; + // we allow the subscribers to block the publisher if they want to ensure that no samples are lost + publisherOptions.subscriberTooSlowPolicy = iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER; + iox::popo::Publisher publisher({"Radar", "FrontLeft", "Object"}, publisherOptions); // we have to explicitely offer the publisher for making it visible to subscribers diff --git a/iceoryx_examples/icedelivery/iox_subscriber_with_options.cpp b/iceoryx_examples/icedelivery/iox_subscriber_with_options.cpp index 533223c536..245883a1bc 100644 --- a/iceoryx_examples/icedelivery/iox_subscriber_with_options.cpp +++ b/iceoryx_examples/icedelivery/iox_subscriber_with_options.cpp @@ -58,6 +58,9 @@ int main() // grouping of publishers and subscribers within a process subscriberOptions.nodeName = "Sub_Node_With_Options"; + // we request the publisher to wait for space in the queue if it is full. The publisher will be blocked then + subscriberOptions.queueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PUBLISHER; + iox::popo::Subscriber subscriber({"Radar", "FrontLeft", "Object"}, subscriberOptions); // We have to explicitly call subscribe() otherwise the subscriber will not try to connect to publishers @@ -66,19 +69,8 @@ int main() // run until interrupted by Ctrl-C while (!killswitch) { - if (subscriber.getSubscriptionState() == iox::SubscribeState::SUBSCRIBED) - { - bool hasMoreSamples = true; - // Since we are checking only every second but the publisher is sending every - // 400ms a new sample we will receive here more then one sample. - do - { - subscriber.take() - .and_then([](auto& object) { std::cout << APP_NAME << " got value: " << object->x << std::endl; }) - .or_else([&](auto&) { hasMoreSamples = false; }); - } while (hasMoreSamples); - } - std::cout << std::endl; + subscriber.take().and_then( + [](auto& object) { std::cout << APP_NAME << " got value: " << object->x << std::endl; }); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } diff --git a/iceoryx_posh/include/iceoryx_posh/popo/publisher_options.hpp b/iceoryx_posh/include/iceoryx_posh/popo/publisher_options.hpp index a704b6fc3c..2a0ff630e6 100644 --- a/iceoryx_posh/include/iceoryx_posh/popo/publisher_options.hpp +++ b/iceoryx_posh/include/iceoryx_posh/popo/publisher_options.hpp @@ -38,7 +38,7 @@ struct PublisherOptions bool offerOnCreate{true}; /// @brief The option whether the publisher should block when the subscriber queue is full - SubscriberTooSlowPolicy deliveryQueueFullPolicy{SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA}; + SubscriberTooSlowPolicy subscriberTooSlowPolicy{SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA}; }; } // namespace popo diff --git a/iceoryx_posh/include/iceoryx_posh/popo/subscriber_options.hpp b/iceoryx_posh/include/iceoryx_posh/popo/subscriber_options.hpp index c2181d56f1..c0ec890780 100644 --- a/iceoryx_posh/include/iceoryx_posh/popo/subscriber_options.hpp +++ b/iceoryx_posh/include/iceoryx_posh/popo/subscriber_options.hpp @@ -43,7 +43,7 @@ struct SubscriberOptions bool subscribeOnCreate{true}; /// @brief The option whether the publisher should block when the subscriber queue is full - QueueFullPolicy receiverQueueFullPolicy{QueueFullPolicy::DISCARD_OLDEST_DATA}; + QueueFullPolicy queueFullPolicy{QueueFullPolicy::DISCARD_OLDEST_DATA}; }; } // namespace popo diff --git a/iceoryx_posh/include/iceoryx_posh/roudi/port_pool.inl b/iceoryx_posh/include/iceoryx_posh/roudi/port_pool.inl index 5a7daca743..05d9ec1176 100644 --- a/iceoryx_posh/include/iceoryx_posh/roudi/port_pool.inl +++ b/iceoryx_posh/include/iceoryx_posh/roudi/port_pool.inl @@ -30,7 +30,7 @@ inline iox::popo::SubscriberPortData* PortPool::constructSubscriber(const capro: return m_portPoolData->m_subscriberPortMembers.insert( serviceDescription, runtimeName, - (subscriberOptions.receiverQueueFullPolicy == popo::QueueFullPolicy::DISCARD_OLDEST_DATA) + (subscriberOptions.queueFullPolicy == popo::QueueFullPolicy::DISCARD_OLDEST_DATA) ? cxx::VariantQueueTypes::SoFi_MultiProducerSingleConsumer : cxx::VariantQueueTypes::FiFo_MultiProducerSingleConsumer, subscriberOptions, @@ -46,7 +46,7 @@ inline iox::popo::SubscriberPortData* PortPool::constructSubscriber(const capro: return m_portPoolData->m_subscriberPortMembers.insert( serviceDescription, runtimeName, - (subscriberOptions.receiverQueueFullPolicy == popo::QueueFullPolicy::DISCARD_OLDEST_DATA) + (subscriberOptions.queueFullPolicy == popo::QueueFullPolicy::DISCARD_OLDEST_DATA) ? cxx::VariantQueueTypes::SoFi_SingleProducerSingleConsumer : cxx::VariantQueueTypes::FiFo_SingleProducerSingleConsumer, subscriberOptions, diff --git a/iceoryx_posh/source/popo/ports/publisher_port_data.cpp b/iceoryx_posh/source/popo/ports/publisher_port_data.cpp index 483cbe05ea..da66e98599 100644 --- a/iceoryx_posh/source/popo/ports/publisher_port_data.cpp +++ b/iceoryx_posh/source/popo/ports/publisher_port_data.cpp @@ -28,7 +28,7 @@ PublisherPortData::PublisherPortData(const capro::ServiceDescription& serviceDes const mepoo::MemoryInfo& memoryInfo) noexcept : BasePortData(serviceDescription, runtimeName, publisherOptions.nodeName) , m_chunkSenderData( - memoryManager, publisherOptions.deliveryQueueFullPolicy, publisherOptions.historyCapacity, memoryInfo) + memoryManager, publisherOptions.subscriberTooSlowPolicy, publisherOptions.historyCapacity, memoryInfo) , m_offeringRequested(publisherOptions.offerOnCreate) { } diff --git a/iceoryx_posh/source/popo/ports/subscriber_port_data.cpp b/iceoryx_posh/source/popo/ports/subscriber_port_data.cpp index cdddaefd3c..9e53e79440 100644 --- a/iceoryx_posh/source/popo/ports/subscriber_port_data.cpp +++ b/iceoryx_posh/source/popo/ports/subscriber_port_data.cpp @@ -28,7 +28,7 @@ SubscriberPortData::SubscriberPortData(const capro::ServiceDescription& serviceD const SubscriberOptions& subscriberOptions, const mepoo::MemoryInfo& memoryInfo) noexcept : BasePortData(serviceDescription, runtimeName, subscriberOptions.nodeName) - , m_chunkReceiverData(queueType, subscriberOptions.receiverQueueFullPolicy, memoryInfo) + , m_chunkReceiverData(queueType, subscriberOptions.queueFullPolicy, memoryInfo) , m_historyRequest(subscriberOptions.historyRequest) , m_subscribeRequested(subscriberOptions.subscribeOnCreate) { diff --git a/iceoryx_posh/source/roudi/roudi.cpp b/iceoryx_posh/source/roudi/roudi.cpp index eac0243bee..ad5151e362 100644 --- a/iceoryx_posh/source/roudi/roudi.cpp +++ b/iceoryx_posh/source/roudi/roudi.cpp @@ -225,7 +225,7 @@ void RouDi::processMessage(const runtime::IpcMessage& message, options.historyCapacity = std::stoull(message.getElementAtIndex(3)); options.nodeName = NodeName_t(cxx::TruncateToCapacity, message.getElementAtIndex(4)); options.offerOnCreate = (0U == std::stoull(message.getElementAtIndex(5))) ? false : true; - options.deliveryQueueFullPolicy = + options.subscriberTooSlowPolicy = static_cast(std::stoul(message.getElementAtIndex(6))); m_prcMgr->addPublisherForProcess( @@ -251,7 +251,7 @@ void RouDi::processMessage(const runtime::IpcMessage& message, options.queueCapacity = std::stoull(message.getElementAtIndex(4)); options.nodeName = NodeName_t(cxx::TruncateToCapacity, message.getElementAtIndex(5)); options.subscribeOnCreate = (0U == std::stoull(message.getElementAtIndex(6))) ? false : true; - options.receiverQueueFullPolicy = + options.queueFullPolicy = static_cast(std::stoul(message.getElementAtIndex(7))); m_prcMgr->addSubscriberForProcess( diff --git a/iceoryx_posh/source/runtime/posh_runtime.cpp b/iceoryx_posh/source/runtime/posh_runtime.cpp index 5b52d62335..97fb7eaf30 100644 --- a/iceoryx_posh/source/runtime/posh_runtime.cpp +++ b/iceoryx_posh/source/runtime/posh_runtime.cpp @@ -189,7 +189,7 @@ PublisherPortUserType::MemberType_t* PoshRuntime::getMiddlewarePublisher(const c sendBuffer << IpcMessageTypeToString(IpcMessageType::CREATE_PUBLISHER) << m_appName << static_cast(service).toString() << std::to_string(options.historyCapacity) << options.nodeName << std::to_string(options.offerOnCreate) - << std::to_string(static_cast(options.deliveryQueueFullPolicy)) + << std::to_string(static_cast(options.subscriberTooSlowPolicy)) << static_cast(portConfigInfo).toString(); auto maybePublisher = requestPublisherFromRoudi(sendBuffer); @@ -295,7 +295,7 @@ PoshRuntime::getMiddlewareSubscriber(const capro::ServiceDescription& service, sendBuffer << IpcMessageTypeToString(IpcMessageType::CREATE_SUBSCRIBER) << m_appName << static_cast(service).toString() << std::to_string(options.historyRequest) << std::to_string(options.queueCapacity) << options.nodeName << std::to_string(options.subscribeOnCreate) - << std::to_string(static_cast(options.receiverQueueFullPolicy)) + << std::to_string(static_cast(options.queueFullPolicy)) << static_cast(portConfigInfo).toString(); auto maybeSubscriber = requestSubscriberFromRoudi(sendBuffer); diff --git a/iceoryx_posh/test/integrationtests/test_publisher_subscriber_communication.cpp b/iceoryx_posh/test/integrationtests/test_publisher_subscriber_communication.cpp index 05968b8fa8..3b6ef45c95 100644 --- a/iceoryx_posh/test/integrationtests/test_publisher_subscriber_communication.cpp +++ b/iceoryx_posh/test/integrationtests/test_publisher_subscriber_communication.cpp @@ -61,7 +61,7 @@ class PublisherSubscriberCommunication_test : public RouDi_GTest createPublisher(const SubscriberTooSlowPolicy policy = SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA) { iox::popo::PublisherOptions options; - options.deliveryQueueFullPolicy = policy; + options.subscriberTooSlowPolicy = policy; return std::make_unique>(m_serviceDescription, options); } @@ -71,7 +71,7 @@ class PublisherSubscriberCommunication_test : public RouDi_GTest const uint64_t queueCapacity = SubscriberPortData::ChunkQueueData_t::MAX_CAPACITY) { iox::popo::SubscriberOptions options; - options.receiverQueueFullPolicy = policy; + options.queueFullPolicy = policy; options.queueCapacity = queueCapacity; return std::make_unique>(m_serviceDescription, options); } diff --git a/iceoryx_posh/test/moduletests/test_posh_runtime.cpp b/iceoryx_posh/test/moduletests/test_posh_runtime.cpp index 923fb6f3e2..2c4f1f5b74 100644 --- a/iceoryx_posh/test/moduletests/test_posh_runtime.cpp +++ b/iceoryx_posh/test/moduletests/test_posh_runtime.cpp @@ -392,7 +392,7 @@ TEST_F(PoshRuntime_test, GetMiddlewarePublisherWithoutExplicitlySetQueueFullPoli TEST_F(PoshRuntime_test, GetMiddlewarePublisherWithQueueFullPolicySetToDiscardOldestDataLeadsToDiscardOldestData) { iox::popo::PublisherOptions publisherOptions; - publisherOptions.deliveryQueueFullPolicy = iox::popo::SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA; + publisherOptions.subscriberTooSlowPolicy = iox::popo::SubscriberTooSlowPolicy::DISCARD_OLDEST_DATA; const auto publisherPortData = m_runtime->getMiddlewarePublisher(iox::capro::ServiceDescription(90U, 130U, 1550U), publisherOptions, @@ -405,7 +405,7 @@ TEST_F(PoshRuntime_test, GetMiddlewarePublisherWithQueueFullPolicySetToDiscardOl TEST_F(PoshRuntime_test, GetMiddlewarePublisherWithQueueFullPolicySetToWaitForSubscriberLeadsToWaitForSubscriber) { iox::popo::PublisherOptions publisherOptions; - publisherOptions.deliveryQueueFullPolicy = iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER; + publisherOptions.subscriberTooSlowPolicy = iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER; const auto publisherPortData = m_runtime->getMiddlewarePublisher( iox::capro::ServiceDescription(18U, 31U, 400U), publisherOptions, iox::runtime::PortConfigInfo(11U, 22U, 33U)); @@ -521,7 +521,7 @@ TEST_F(PoshRuntime_test, GetMiddlewareSubscriberWithoutExplicitlySetQueueFullPol TEST_F(PoshRuntime_test, GetMiddlewareSubscriberWithQueueFullPolicySetToDiscardOldestDataLeadsToDiscardOldestData) { iox::popo::SubscriberOptions subscriberOptions; - subscriberOptions.receiverQueueFullPolicy = iox::popo::QueueFullPolicy::DISCARD_OLDEST_DATA; + subscriberOptions.queueFullPolicy = iox::popo::QueueFullPolicy::DISCARD_OLDEST_DATA; const auto subscriberPortData = m_runtime->getMiddlewareSubscriber(iox::capro::ServiceDescription(90U, 130U, 1550U), subscriberOptions, @@ -534,7 +534,7 @@ TEST_F(PoshRuntime_test, GetMiddlewareSubscriberWithQueueFullPolicySetToDiscardO TEST_F(PoshRuntime_test, GetMiddlewareSubscriberWithQueueFullPolicySetToBlockPublisherLeadsToBlockPublisher) { iox::popo::SubscriberOptions subscriberOptions; - subscriberOptions.receiverQueueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PUBLISHER; + subscriberOptions.queueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PUBLISHER; const auto subscriberPortData = m_runtime->getMiddlewareSubscriber( iox::capro::ServiceDescription(18U, 31U, 400U), subscriberOptions, iox::runtime::PortConfigInfo(11U, 22U, 33U));