diff --git a/iceoryx_hoofs/include/iceoryx_hoofs/error_handling/error_handling.hpp b/iceoryx_hoofs/include/iceoryx_hoofs/error_handling/error_handling.hpp index 0d0ce8d0b2..3d13434a92 100644 --- a/iceoryx_hoofs/include/iceoryx_hoofs/error_handling/error_handling.hpp +++ b/iceoryx_hoofs/include/iceoryx_hoofs/error_handling/error_handling.hpp @@ -56,6 +56,7 @@ namespace iox error(POSH__RUNTIME_LEADING_SLASH_PROVIDED) \ error(POSH__SERVICE_DISCOVERY_UNKNOWN_EVENT_PROVIDED) \ error(POSH__PORT_MANAGER_PUBLISHERPORT_NOT_UNIQUE) \ + error(POSH__PORT_MANAGER_SERVERPORT_NOT_UNIQUE) \ error(POSH__PORT_MANAGER_COULD_NOT_ADD_SERVICE_TO_REGISTRY) \ error(POSH__MEMPOOL_POSSIBLE_DOUBLE_FREE) \ error(POSH__RECEIVERPORT_DELIVERYFIFO_OVERFLOW) \ @@ -122,6 +123,8 @@ namespace iox error(MEPOO__MAXIMUM_NUMBER_OF_MEMPOOLS_REACHED) \ error(PORT_POOL__PUBLISHERLIST_OVERFLOW) \ error(PORT_POOL__SUBSCRIBERLIST_OVERFLOW) \ + error(PORT_POOL__CLIENTLIST_OVERFLOW) \ + error(PORT_POOL__SERVERLIST_OVERFLOW) \ error(PORT_POOL__INTERFACELIST_OVERFLOW) \ error(PORT_POOL__APPLICATIONLIST_OVERFLOW) \ error(PORT_POOL__NODELIST_OVERFLOW) \ @@ -131,6 +134,8 @@ namespace iox error(PORT_MANAGER__INTROSPECTION_MEMORY_MANAGER_UNAVAILABLE) \ error(PORT_MANAGER__HANDLE_PUBLISHER_PORTS_INVALID_CAPRO_MESSAGE) \ error(PORT_MANAGER__HANDLE_SUBSCRIBER_PORTS_INVALID_CAPRO_MESSAGE) \ + error(PORT_MANAGER__HANDLE_CLIENT_PORTS_INVALID_CAPRO_MESSAGE) \ + error(PORT_MANAGER__HANDLE_SERVER_PORTS_INVALID_CAPRO_MESSAGE) \ error(PORT_MANAGER__NO_PUBLISHER_PORT_FOR_INTROSPECTIONPORTSERVICE) \ error(PORT_MANAGER__NO_PUBLISHER_PORT_FOR_INTROSPECTIONPORTTHROUGHPUTSERVICE) \ error(PORT_MANAGER__NO_PUBLISHER_PORT_FOR_INTROSPECTIONCHANGINGDATASERVICE) \ diff --git a/iceoryx_posh/include/iceoryx_posh/capro/service_description.hpp b/iceoryx_posh/include/iceoryx_posh/capro/service_description.hpp index f7dd9e5bfd..6693409a9b 100644 --- a/iceoryx_posh/include/iceoryx_posh/capro/service_description.hpp +++ b/iceoryx_posh/include/iceoryx_posh/capro/service_description.hpp @@ -20,6 +20,7 @@ #include "iceoryx_hoofs/cxx/serialization.hpp" #include "iceoryx_hoofs/cxx/string.hpp" #include "iceoryx_hoofs/cxx/vector.hpp" +#include "iceoryx_hoofs/log/logstream.hpp" #include "iceoryx_posh/iceoryx_posh_types.hpp" #include @@ -182,6 +183,12 @@ bool serviceMatch(const ServiceDescription& first, const ServiceDescription& sec /// @return the reference to `stream` which was provided as input parameter std::ostream& operator<<(std::ostream& stream, const ServiceDescription& service) noexcept; +/// @brief Convenience stream operator to easily use the `ServiceDescription` with log::LogStream +/// @param[in] stream output LogStream to write the message to +/// @param[in] service ServiceDescription that shall be converted +/// @return the reference to `stream` which was provided as input parameter +log::LogStream& operator<<(log::LogStream& stream, const ServiceDescription& service) noexcept; + } // namespace capro } // namespace iox diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/ports/client_port_roudi.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/ports/client_port_roudi.hpp index 6b94bab6c7..e14ddf6378 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/ports/client_port_roudi.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/ports/client_port_roudi.hpp @@ -52,6 +52,10 @@ class ClientPortRouDi : public BasePort /// @return the configured responseQueueFullPolicy QueueFullPolicy getResponseQueueFullPolicy() const noexcept; + /// @brief Access to the configured serverTooSlowPolicy + /// @return the configured serverTooSlowPolicy + ConsumerTooSlowPolicy getServerTooSlowPolicy() const noexcept; + /// @brief get an optional CaPro message that requests changes to the desired connection state of the client /// @return CaPro message with desired connection state, empty optional if no state change cxx::optional tryGetCaProMessage() noexcept; diff --git a/iceoryx_posh/include/iceoryx_posh/internal/popo/ports/server_port_roudi.hpp b/iceoryx_posh/include/iceoryx_posh/internal/popo/ports/server_port_roudi.hpp index 65f954ff53..f6b87e8234 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/popo/ports/server_port_roudi.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/popo/ports/server_port_roudi.hpp @@ -45,6 +45,12 @@ class ServerPortRouDi : public BasePort ServerPortRouDi& operator=(ServerPortRouDi&& rhs) noexcept = default; ~ServerPortRouDi() = default; + /// @brief Access to the configured requestQueueFullPolicy + /// @return the configured requestQueueFullPolicy + QueueFullPolicy getRequestQueueFullPolicy() const noexcept; + + /// @brief Access to the configured clientTooSlowPolicy + /// @return the configured clientTooSlowPolicy ConsumerTooSlowPolicy getClientTooSlowPolicy() const noexcept; /// @brief get an optional CaPro message that changes the offer state of the server diff --git a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp index 308ae04780..a962d46839 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp @@ -25,9 +25,13 @@ #include "iceoryx_posh/iceoryx_posh_types.hpp" #include "iceoryx_posh/internal/capro/capro_message.hpp" #include "iceoryx_posh/internal/mepoo/memory_manager.hpp" +#include "iceoryx_posh/internal/popo/ports/client_port_roudi.hpp" +#include "iceoryx_posh/internal/popo/ports/client_port_user.hpp" #include "iceoryx_posh/internal/popo/ports/interface_port.hpp" #include "iceoryx_posh/internal/popo/ports/publisher_port_roudi.hpp" #include "iceoryx_posh/internal/popo/ports/publisher_port_user.hpp" +#include "iceoryx_posh/internal/popo/ports/server_port_roudi.hpp" +#include "iceoryx_posh/internal/popo/ports/server_port_user.hpp" #include "iceoryx_posh/internal/popo/ports/subscriber_port_multi_producer.hpp" #include "iceoryx_posh/internal/popo/ports/subscriber_port_single_producer.hpp" #include "iceoryx_posh/internal/popo/ports/subscriber_port_user.hpp" @@ -76,6 +80,34 @@ class PortManager const RuntimeName_t& runtimeName, const PortConfigInfo& portConfigInfo) noexcept; + /// @brief Acquires a ClientPortData for further usage + /// @param[in] service is the ServiceDescription for the new client port + /// @param[in] clientOptions for the new client port + /// @param[in] runtimeName of the runtime the new client port belongs to + /// @param[in] payloadDataSegmentMemoryManager to acquire chunks for the requests + /// @param[in] portConfigInfo for the new client port + /// @return on success a pointer to a ClientPortData; on error a PortPoolError + cxx::expected + acquireClientPortData(const capro::ServiceDescription& service, + const popo::ClientOptions& clientOptions, + const RuntimeName_t& runtimeName, + mepoo::MemoryManager* const payloadDataSegmentMemoryManager, + const PortConfigInfo& portConfigInfo) noexcept; + + /// @brief Acquires a ServerPortData for further usage + /// @param[in] service is the ServiceDescription for the new server port + /// @param[in] serverOptions for the new server port + /// @param[in] runtimeName of the runtime the new server port belongs to + /// @param[in] payloadDataSegmentMemoryManager to acquire chunks for the requests + /// @param[in] portConfigInfo for the new server port + /// @return on success a pointer to a ServerPortData; on error a PortPoolError + cxx::expected + acquireServerPortData(const capro::ServiceDescription& service, + const popo::ServerOptions& serverOptions, + const RuntimeName_t& runtimeName, + mepoo::MemoryManager* const payloadDataSegmentMemoryManager, + const PortConfigInfo& portConfigInfo) noexcept; + popo::InterfacePortData* acquireInterfacePortData(capro::Interfaces interface, const RuntimeName_t& runtimeName, const NodeName_t& nodeName = {""}) noexcept; @@ -110,22 +142,49 @@ class PortManager void doDiscoveryForSubscriberPort(SubscriberPortType& subscriberPort) noexcept; + void destroyClientPort(popo::ClientPortData* const clientPortData) noexcept; + + void handleClientPorts() noexcept; + + void doDiscoveryForClientPort(popo::ClientPortRouDi& clientPort) noexcept; + + void makeAllServerPortsToStopOffer() noexcept; + + void destroyServerPort(popo::ServerPortData* const clientPortData) noexcept; + + void handleServerPorts() noexcept; + + void doDiscoveryForServerPort(popo::ServerPortRouDi& serverPort) noexcept; + void handleInterfaces() noexcept; void handleNodes() noexcept; void handleConditionVariables() noexcept; + bool isCompatiblePubSub(const PublisherPortRouDiType& publisher, + const SubscriberPortType& subscriber) const noexcept; + bool sendToAllMatchingPublisherPorts(const capro::CaproMessage& message, SubscriberPortType& subscriberSource) noexcept; void sendToAllMatchingSubscriberPorts(const capro::CaproMessage& message, PublisherPortRouDiType& publisherSource) noexcept; + bool isCompatibleClientServer(const popo::ServerPortRouDi& server, + const popo::ClientPortRouDi& client) const noexcept; + + void sendToAllMatchingClientPorts(const capro::CaproMessage& message, popo::ServerPortRouDi& serverSource) noexcept; + + bool sendToAllMatchingServerPorts(const capro::CaproMessage& message, popo::ClientPortRouDi& clientSource) noexcept; + void sendToAllMatchingInterfacePorts(const capro::CaproMessage& message) noexcept; - void addEntryToServiceRegistry(const capro::ServiceDescription& service) noexcept; - void removeEntryFromServiceRegistry(const capro::ServiceDescription& service) noexcept; + void addPublisherToServiceRegistry(const capro::ServiceDescription& service) noexcept; + void removePublisherFromServiceRegistry(const capro::ServiceDescription& service) noexcept; + + void addServerToServiceRegistry(const capro::ServiceDescription& service) noexcept; + void removeServerFromServiceRegistry(const capro::ServiceDescription& service) noexcept; template ::value>* = nullptr> cxx::optional doesViolateCommunicationPolicy(const capro::ServiceDescription& service) noexcept; diff --git a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_pool_data.hpp b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_pool_data.hpp index 4864a61987..b66e550926 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_pool_data.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_pool_data.hpp @@ -21,8 +21,10 @@ #include "iceoryx_hoofs/cxx/vector.hpp" #include "iceoryx_posh/iceoryx_posh_types.hpp" #include "iceoryx_posh/internal/popo/building_blocks/condition_variable_data.hpp" +#include "iceoryx_posh/internal/popo/ports/client_port_data.hpp" #include "iceoryx_posh/internal/popo/ports/interface_port.hpp" #include "iceoryx_posh/internal/popo/ports/publisher_port_data.hpp" +#include "iceoryx_posh/internal/popo/ports/server_port_data.hpp" #include "iceoryx_posh/internal/popo/ports/subscriber_port_data.hpp" #include "iceoryx_posh/internal/runtime/node_data.hpp" @@ -42,7 +44,7 @@ class FixedPositionContainer template T* insert(Targs&&... args) noexcept; - void erase(T* const element) noexcept; + void erase(const T* const element) noexcept; cxx::vector content() noexcept; @@ -58,6 +60,9 @@ struct PortPoolData FixedPositionContainer m_publisherPortMembers; FixedPositionContainer m_subscriberPortMembers; + + FixedPositionContainer m_serverPortMembers; + FixedPositionContainer m_clientPortMembers; }; } // namespace roudi diff --git a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_pool_data.inl b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_pool_data.inl index a2999474bf..23a6bec762 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_pool_data.inl +++ b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_pool_data.inl @@ -16,6 +16,8 @@ #ifndef IOX_POSH_ROUDI_PORT_POOL_DATA_INL #define IOX_POSH_ROUDI_PORT_POOL_DATA_INL +#include "iceoryx_posh/internal/roudi/port_pool_data.hpp" + namespace iox { namespace roudi @@ -58,7 +60,7 @@ T* FixedPositionContainer::insert(Targs&&... args) noexcept } template -void FixedPositionContainer::erase(T* const element) noexcept +void FixedPositionContainer::erase(const T* const element) noexcept { for (auto& e : m_data) { diff --git a/iceoryx_posh/include/iceoryx_posh/roudi/port_pool.hpp b/iceoryx_posh/include/iceoryx_posh/roudi/port_pool.hpp index 3545eb8b89..42b8ce1bbe 100644 --- a/iceoryx_posh/include/iceoryx_posh/roudi/port_pool.hpp +++ b/iceoryx_posh/include/iceoryx_posh/roudi/port_pool.hpp @@ -1,5 +1,5 @@ // Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. -// Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved. +// Copyright (c) 2020 - 2022 by Apex.AI Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,15 +20,21 @@ #include "iceoryx_hoofs/cxx/type_traits.hpp" #include "iceoryx_posh/iceoryx_posh_types.hpp" #include "iceoryx_posh/internal/popo/building_blocks/condition_variable_data.hpp" +#include "iceoryx_posh/internal/popo/ports/client_port_data.hpp" +#include "iceoryx_posh/internal/popo/ports/client_port_roudi.hpp" #include "iceoryx_posh/internal/popo/ports/interface_port.hpp" #include "iceoryx_posh/internal/popo/ports/publisher_port_data.hpp" #include "iceoryx_posh/internal/popo/ports/publisher_port_roudi.hpp" +#include "iceoryx_posh/internal/popo/ports/server_port_data.hpp" +#include "iceoryx_posh/internal/popo/ports/server_port_roudi.hpp" #include "iceoryx_posh/internal/popo/ports/subscriber_port_data.hpp" #include "iceoryx_posh/internal/popo/ports/subscriber_port_multi_producer.hpp" #include "iceoryx_posh/internal/popo/ports/subscriber_port_single_producer.hpp" #include "iceoryx_posh/internal/roudi/port_pool_data.hpp" #include "iceoryx_posh/internal/runtime/node_data.hpp" +#include "iceoryx_posh/popo/client_options.hpp" #include "iceoryx_posh/popo/publisher_options.hpp" +#include "iceoryx_posh/popo/server_options.hpp" #include "iceoryx_posh/popo/subscriber_options.hpp" namespace iox @@ -43,6 +49,9 @@ enum class PortPoolError : uint8_t PUBLISHER_PORT_LIST_FULL, SUBSCRIBER_PORT_LIST_FULL, INTERFACE_PORT_LIST_FULL, + CLIENT_PORT_LIST_FULL, + UNIQUE_SERVER_PORT_ALREADY_EXISTS, + SERVER_PORT_LIST_FULL, NODE_DATA_LIST_FULL, CONDITION_VARIABLE_LIST_FULL, EVENT_VARIABLE_LIST_FULL, @@ -60,6 +69,8 @@ class PortPool /// update this member if the publisher ports actually changed cxx::vector getPublisherPortDataList() noexcept; cxx::vector getSubscriberPortDataList() noexcept; + cxx::vector getClientPortDataList() noexcept; + cxx::vector getServerPortDataList() noexcept; cxx::vector getInterfacePortDataList() noexcept; cxx::vector getNodeDataList() noexcept; cxx::vector @@ -90,6 +101,34 @@ class PortPool const popo::SubscriberOptions& subscriberOptions, const mepoo::MemoryInfo& memoryInfo) noexcept; + /// @brief Adds a ClientPortData to the internal pool and returns a pointer for further usage + /// @param[in] serviceDescription for the new client port + /// @param[in] memoryManager to acquire chunks for the requests + /// @param[in] runtimeName of the runtime the new client port belongs to + /// @param[in] clientOptions for the new client port + /// @param[in] memoryInfo for the new client port + /// @return on success a pointer to a ClientPortData; on error a PortPoolError + cxx::expected + addClientPort(const capro::ServiceDescription& serviceDescription, + mepoo::MemoryManager* const memoryManager, + const RuntimeName_t& runtimeName, + const popo::ClientOptions& clientOptions, + const mepoo::MemoryInfo& memoryInfo = mepoo::MemoryInfo()) noexcept; + + /// @brief Adds a ServerPortData to the internal pool and returns a pointer for further usage + /// @param[in] serviceDescription for the new server port + /// @param[in] memoryManager to acquire chunks for the responses + /// @param[in] runtimeName of the runtime the new server port belongs to + /// @param[in] serverOptions for the new server port + /// @param[in] memoryInfo for the new server port + /// @return on success a pointer to a ServerPortData; on error a PortPoolError + cxx::expected + addServerPort(const capro::ServiceDescription& serviceDescription, + mepoo::MemoryManager* const memoryManager, + const RuntimeName_t& runtimeName, + const popo::ServerOptions& serverOptions, + const mepoo::MemoryInfo& memoryInfo = mepoo::MemoryInfo()) noexcept; + cxx::expected addInterfacePort(const RuntimeName_t& runtimeName, const capro::Interfaces interface) noexcept; @@ -100,11 +139,40 @@ class PortPool cxx::expected addConditionVariableData(const RuntimeName_t& runtimeName) noexcept; - void removePublisherPort(PublisherPortRouDiType::MemberType_t* const portData) noexcept; - void removeSubscriberPort(SubscriberPortType::MemberType_t* const portData) noexcept; - void removeInterfacePort(popo::InterfacePortData* const portData) noexcept; - void removeNodeData(runtime::NodeData* const nodeData) noexcept; - void removeConditionVariableData(popo::ConditionVariableData* const conditionVariableData) noexcept; + /// @brief Removes a PublisherPortData from the internal pool + /// @param[in] portData is a pointer to the PublisherPortData to be removed + /// @note after this call the provided PublisherPortData is no longer available for usage + void removePublisherPort(const PublisherPortRouDiType::MemberType_t* const portData) noexcept; + + /// @brief Removes a SubscriberPortData from the internal pool + /// @param[in] portData is a pointer to the SubscriberPortData to be removed + /// @note after this call the provided SubscriberPortData is no longer available for usage + void removeSubscriberPort(const SubscriberPortType::MemberType_t* const portData) noexcept; + + /// @brief Removes a ClientPortData from the internal pool + /// @param[in] portData is a pointer to the ClientPortData to be removed + /// @note after this call the provided ClientPortData is no longer available for usage + void removeClientPort(const popo::ClientPortData* const portData) noexcept; + + /// @brief Removes a ServerPortData from the internal pool + /// @param[in] portData is a pointer to the ServerPortData to be removed + /// @note after this call the provided ServerPortData is no longer available for usage + void removeServerPort(const popo::ServerPortData* const portData) noexcept; + + /// @brief Removes a InterfacePortData from the internal pool + /// @param[in] portData is a pointer to the InterfacePortData to be removed + /// @note after this call the provided InterfacePortData is no longer available for usage + void removeInterfacePort(const popo::InterfacePortData* const portData) noexcept; + + /// @brief Removes a NodeData from the internal pool + /// @param[in] nodeData is a pointer to the NodeData to be removed + /// @note after this call the provided NodeData is no longer available for usage + void removeNodeData(const runtime::NodeData* const nodeData) noexcept; + + /// @brief Removes a ConditionVariableData from the internal pool + /// @param[in] conditionVariableData is a pointer to the ConditionVariableData to be removed + /// @note after this call the provided ConditionVariableData is no longer available for usage + void removeConditionVariableData(const popo::ConditionVariableData* const conditionVariableData) noexcept; private: PortPoolData* m_portPoolData; diff --git a/iceoryx_posh/source/capro/service_description.cpp b/iceoryx_posh/source/capro/service_description.cpp index 8dabdcc5cd..0af7eb07a0 100644 --- a/iceoryx_posh/source/capro/service_description.cpp +++ b/iceoryx_posh/source/capro/service_description.cpp @@ -235,5 +235,13 @@ std::ostream& operator<<(std::ostream& stream, const ServiceDescription& service return stream; } +log::LogStream& operator<<(log::LogStream& stream, const ServiceDescription& service) noexcept +{ + /// @todo #415 Add classHash, scope and interface + stream << "Service: " << service.getServiceIDString() << ", Instance: " << service.getInstanceIDString() + << ", Event: " << service.getEventIDString(); + return stream; +} + } // namespace capro } // namespace iox diff --git a/iceoryx_posh/source/popo/ports/client_port_data.cpp b/iceoryx_posh/source/popo/ports/client_port_data.cpp index 51578bd5e7..fc0e0cfc62 100644 --- a/iceoryx_posh/source/popo/ports/client_port_data.cpp +++ b/iceoryx_posh/source/popo/ports/client_port_data.cpp @@ -27,6 +27,8 @@ cxx::VariantQueueTypes getResponseQueueType(const QueueFullPolicy policy) noexce : cxx::VariantQueueTypes::FiFo_MultiProducerSingleConsumer; } +constexpr uint64_t ClientPortData::HISTORY_CAPACITY_ZERO; + ClientPortData::ClientPortData(const capro::ServiceDescription& serviceDescription, const RuntimeName_t& runtimeName, const ClientOptions& clientOptions, @@ -35,7 +37,8 @@ ClientPortData::ClientPortData(const capro::ServiceDescription& serviceDescripti : BasePortData(serviceDescription, runtimeName, clientOptions.nodeName) , m_chunkSenderData(memoryManager, clientOptions.serverTooSlowPolicy, HISTORY_CAPACITY_ZERO, memoryInfo) , m_chunkReceiverData(getResponseQueueType(clientOptions.responseQueueFullPolicy), - clientOptions.responseQueueFullPolicy) + clientOptions.responseQueueFullPolicy, + memoryInfo) , m_connectRequested(clientOptions.connectOnCreate) { m_chunkReceiverData.m_queue.setCapacity(clientOptions.responseQueueCapacity); diff --git a/iceoryx_posh/source/popo/ports/client_port_roudi.cpp b/iceoryx_posh/source/popo/ports/client_port_roudi.cpp index 6bbfe477ad..bbbc88909b 100644 --- a/iceoryx_posh/source/popo/ports/client_port_roudi.cpp +++ b/iceoryx_posh/source/popo/ports/client_port_roudi.cpp @@ -45,6 +45,11 @@ QueueFullPolicy ClientPortRouDi::getResponseQueueFullPolicy() const noexcept return getMembers()->m_chunkReceiverData.m_queueFullPolicy; } +ConsumerTooSlowPolicy ClientPortRouDi::getServerTooSlowPolicy() const noexcept +{ + return getMembers()->m_chunkSenderData.m_consumerTooSlowPolicy; +} + cxx::optional ClientPortRouDi::tryGetCaProMessage() noexcept { // get connect request from user side diff --git a/iceoryx_posh/source/popo/ports/server_port_data.cpp b/iceoryx_posh/source/popo/ports/server_port_data.cpp index d088661ed6..5f0cbd055e 100644 --- a/iceoryx_posh/source/popo/ports/server_port_data.cpp +++ b/iceoryx_posh/source/popo/ports/server_port_data.cpp @@ -27,6 +27,8 @@ cxx::VariantQueueTypes getRequestQueueType(const QueueFullPolicy policy) noexcep : cxx::VariantQueueTypes::FiFo_MultiProducerSingleConsumer; } +constexpr uint64_t ServerPortData::HISTORY_REQUEST_OF_ZERO; + ServerPortData::ServerPortData(const capro::ServiceDescription& serviceDescription, const RuntimeName_t& runtimeName, const ServerOptions& serverOptions, @@ -34,8 +36,8 @@ ServerPortData::ServerPortData(const capro::ServiceDescription& serviceDescripti const mepoo::MemoryInfo& memoryInfo) noexcept : BasePortData(serviceDescription, runtimeName, serverOptions.nodeName) , m_chunkSenderData(memoryManager, serverOptions.clientTooSlowPolicy, HISTORY_REQUEST_OF_ZERO, memoryInfo) - , m_chunkReceiverData(getRequestQueueType(serverOptions.requestQueueFullPolicy), - serverOptions.requestQueueFullPolicy) + , m_chunkReceiverData( + getRequestQueueType(serverOptions.requestQueueFullPolicy), serverOptions.requestQueueFullPolicy, memoryInfo) , m_offeringRequested(serverOptions.offerOnCreate) { m_chunkReceiverData.m_queue.setCapacity(serverOptions.requestQueueCapacity); diff --git a/iceoryx_posh/source/popo/ports/server_port_roudi.cpp b/iceoryx_posh/source/popo/ports/server_port_roudi.cpp index 82fd76de85..9e8d4894f7 100644 --- a/iceoryx_posh/source/popo/ports/server_port_roudi.cpp +++ b/iceoryx_posh/source/popo/ports/server_port_roudi.cpp @@ -39,6 +39,11 @@ ServerPortRouDi::MemberType_t* ServerPortRouDi::getMembers() noexcept return reinterpret_cast(BasePort::getMembers()); } +QueueFullPolicy ServerPortRouDi::getRequestQueueFullPolicy() const noexcept +{ + return getMembers()->m_chunkReceiverData.m_queueFullPolicy; +} + ConsumerTooSlowPolicy ServerPortRouDi::getClientTooSlowPolicy() const noexcept { return getMembers()->m_chunkSenderData.m_consumerTooSlowPolicy; diff --git a/iceoryx_posh/source/roudi/port_manager.cpp b/iceoryx_posh/source/roudi/port_manager.cpp index 1cc79468d9..2ef0d05d53 100644 --- a/iceoryx_posh/source/roudi/port_manager.cpp +++ b/iceoryx_posh/source/roudi/port_manager.cpp @@ -143,6 +143,10 @@ void PortManager::doDiscovery() noexcept handleSubscriberPorts(); + handleServerPorts(); + + handleClientPorts(); + handleInterfaces(); handleNodes(); @@ -173,17 +177,20 @@ void PortManager::doDiscoveryForPublisherPort(PublisherPortRouDiType& publisherP m_portIntrospection.reportMessage(caproMessage); if (capro::CaproMessageType::OFFER == caproMessage.m_type) { - this->addEntryToServiceRegistry(caproMessage.m_serviceDescription); + this->addPublisherToServiceRegistry(caproMessage.m_serviceDescription); } else if (capro::CaproMessageType::STOP_OFFER == caproMessage.m_type) { - this->removeEntryFromServiceRegistry(caproMessage.m_serviceDescription); + this->removePublisherFromServiceRegistry(caproMessage.m_serviceDescription); } else { - // protocol error + LogWarn() << "CaPro protocol error for publisher from runtime '" << publisherPort.getRuntimeName() + << "' and with service description '" << publisherPort.getCaProServiceDescription() + << "'! Cannot handle CaProMessageType '" << caproMessage.m_type << "'"; errorHandler( Error::kPORT_MANAGER__HANDLE_PUBLISHER_PORTS_INVALID_CAPRO_MESSAGE, nullptr, iox::ErrorLevel::MODERATE); + return; } this->sendToAllMatchingSubscriberPorts(caproMessage, publisherPort); @@ -218,7 +225,9 @@ void PortManager::doDiscoveryForSubscriberPort(SubscriberPortType& subscriberPor m_portIntrospection.reportMessage(caproMessage, subscriberPort.getUniqueID()); if (!this->sendToAllMatchingPublisherPorts(caproMessage, subscriberPort)) { - LogDebug() << "capro::SUB/UNSUB, no matching publisher!!"; + LogDebug() << "capro::SUB/UNSUB, no matching publisher for subscriber from runtime '" + << subscriberPort.getRuntimeName() << "' and with service description '" + << caproMessage.m_serviceDescription << "'!"; capro::CaproMessage nackMessage(capro::CaproMessageType::NACK, subscriberPort.getCaProServiceDescription()); auto returnMessage = subscriberPort.dispatchCaProMessageAndGetPossibleResponse(nackMessage); @@ -228,14 +237,180 @@ void PortManager::doDiscoveryForSubscriberPort(SubscriberPortType& subscriberPor } else { - // protocol error + LogWarn() << "CaPro protocol error for subscriber from runtime '" << subscriberPort.getRuntimeName() + << "' and with service description '" << subscriberPort.getCaProServiceDescription() + << "'! Cannot handle CaProMessageType '" << caproMessage.m_type << "'"; errorHandler(Error::kPORT_MANAGER__HANDLE_SUBSCRIBER_PORTS_INVALID_CAPRO_MESSAGE, nullptr, iox::ErrorLevel::MODERATE); + return; + } + }); +} + +void PortManager::destroyClientPort(popo::ClientPortData* const clientPortData) noexcept +{ + cxx::Ensures(clientPortData != nullptr && "clientPortData must not be a nullptr"); + + // create temporary client ports to orderly shut this client down + popo::ClientPortRouDi clientPortRoudi(*clientPortData); + popo::ClientPortUser clientPortUser(*clientPortData); + + clientPortUser.disconnect(); + + // process DISCONNECT for this client in RouDi and distribute it + clientPortRoudi.tryGetCaProMessage().and_then([this, &clientPortRoudi](auto caproMessage) { + cxx::Ensures(caproMessage.m_type == capro::CaproMessageType::DISCONNECT); + + /// @todo iox-#1128 report to port introspection + this->sendToAllMatchingServerPorts(caproMessage, clientPortRoudi); + }); + + clientPortRoudi.releaseAllChunks(); + + /// @todo iox-#1128 remove from to port introspection + + LogDebug() << "Destroy client port from runtime '" << clientPortData->m_runtimeName + << "' and with service description '" << clientPortData->m_serviceDescription << "'"; + + // delete client port from list after DISCONNECT was processed + m_portPool->removeClientPort(clientPortData); +} + +void PortManager::handleClientPorts() noexcept +{ + // get requests for change of connection state of clients + for (auto clientPortData : m_portPool->getClientPortDataList()) + { + popo::ClientPortRouDi clientPort(*clientPortData); + + doDiscoveryForClientPort(clientPort); + + // check if we have to destroy this clinet port + if (clientPort.toBeDestroyed()) + { + destroyClientPort(clientPortData); + } + } +} + +void PortManager::doDiscoveryForClientPort(popo::ClientPortRouDi& clientPort) noexcept +{ + clientPort.tryGetCaProMessage().and_then([this, &clientPort](auto caproMessage) { + if ((capro::CaproMessageType::CONNECT == caproMessage.m_type) + || (capro::CaproMessageType::DISCONNECT == caproMessage.m_type)) + { + /// @todo iox-#1128 report to port introspection + if (!this->sendToAllMatchingServerPorts(caproMessage, clientPort)) + { + LogDebug() << "capro::CONNECT/DISCONNECT, no matching server for client from runtime '" + << clientPort.getRuntimeName() << "' and with service description '" + << caproMessage.m_serviceDescription << "'!"; + capro::CaproMessage nackMessage(capro::CaproMessageType::NACK, clientPort.getCaProServiceDescription()); + auto returnMessage = clientPort.dispatchCaProMessageAndGetPossibleResponse(nackMessage); + // No response on NACK messages + cxx::Ensures(!returnMessage.has_value()); + } + } + else + { + LogWarn() << "CaPro protocol error for client from runtime '" << clientPort.getRuntimeName() + << "' and with service description '" << clientPort.getCaProServiceDescription() + << "'! Cannot handle CaProMessageType '" << caproMessage.m_type << "'"; + errorHandler( + Error::kPORT_MANAGER__HANDLE_CLIENT_PORTS_INVALID_CAPRO_MESSAGE, nullptr, iox::ErrorLevel::MODERATE); + return; } }); } +void PortManager::makeAllServerPortsToStopOffer() noexcept +{ + for (auto port : m_portPool->getServerPortDataList()) + { + port->m_offeringRequested.store(false, std::memory_order_relaxed); + + popo::ServerPortRouDi serverPort(*port); + doDiscoveryForServerPort(serverPort); + } +} + +void PortManager::destroyServerPort(popo::ServerPortData* const serverPortData) noexcept +{ + cxx::Ensures(serverPortData != nullptr && "serverPortData must not be a nullptr"); + + // create temporary server ports to orderly shut this server down + popo::ServerPortRouDi serverPortRoudi{*serverPortData}; + popo::ServerPortUser serverPortUser{*serverPortData}; + + serverPortUser.stopOffer(); + + // process STOP_OFFER for this server in RouDi and distribute it + serverPortRoudi.tryGetCaProMessage().and_then([this, &serverPortRoudi](auto caproMessage) { + cxx::Ensures(caproMessage.m_type == capro::CaproMessageType::STOP_OFFER); + cxx::Ensures(caproMessage.m_serviceType == capro::CaproServiceType::SERVER); + + /// @todo iox-#1128 report to port introspection + this->removeServerFromServiceRegistry(caproMessage.m_serviceDescription); + this->sendToAllMatchingClientPorts(caproMessage, serverPortRoudi); + this->sendToAllMatchingInterfacePorts(caproMessage); + }); + + serverPortRoudi.releaseAllChunks(); + + /// @todo iox-#1128 remove from port introspection + + LogDebug() << "Destroy server port from runtime '" << serverPortData->m_runtimeName + << "' and with service description '" << serverPortData->m_serviceDescription << "'"; + + // delete server port from list after STOP_OFFER was processed + m_portPool->removeServerPort(serverPortData); +} + +void PortManager::handleServerPorts() noexcept +{ + // get the changes of server port offer state + for (auto serverPortData : m_portPool->getServerPortDataList()) + { + popo::ServerPortRouDi serverPort(*serverPortData); + + doDiscoveryForServerPort(serverPort); + + // check if we have to destroy this server port + if (serverPort.toBeDestroyed()) + { + destroyServerPort(serverPortData); + } + } +} + +void PortManager::doDiscoveryForServerPort(popo::ServerPortRouDi& serverPort) noexcept +{ + serverPort.tryGetCaProMessage().and_then([this, &serverPort](auto caproMessage) { + /// @todo iox-#1128 report to port instrospection + + if (capro::CaproMessageType::OFFER == caproMessage.m_type) + { + this->addServerToServiceRegistry(caproMessage.m_serviceDescription); + } + else if (capro::CaproMessageType::STOP_OFFER == caproMessage.m_type) + { + this->removeServerFromServiceRegistry(caproMessage.m_serviceDescription); + } + else + { + LogWarn() << "CaPro protocol error for server from runtime '" << serverPort.getRuntimeName() + << "' and with service description '" << serverPort.getCaProServiceDescription() + << "'! Cannot handle CaProMessageType '" << caproMessage.m_type << "'"; + errorHandler( + Error::kPORT_MANAGER__HANDLE_SERVER_PORTS_INVALID_CAPRO_MESSAGE, nullptr, iox::ErrorLevel::MODERATE); + return; + } + + this->sendToAllMatchingClientPorts(caproMessage, serverPort); + this->sendToAllMatchingInterfacePorts(caproMessage); + }); +} void PortManager::handleInterfaces() noexcept { @@ -254,8 +429,9 @@ void PortManager::handleInterfaces() noexcept // check if we have to destroy this interface port if (interfacePortData->m_toBeDestroyed.load(std::memory_order_relaxed)) { + LogDebug() << "Destroy interface port from runtime '" << interfacePortData->m_runtimeName + << "' and with service description '" << interfacePortData->m_serviceDescription << "'"; m_portPool->removeInterfacePort(interfacePortData); - LogDebug() << "Destroyed InterfacePortData"; } } @@ -264,6 +440,7 @@ void PortManager::handleInterfaces() noexcept // provide offer information from all active publisher ports to all new interfaces capro::CaproMessage caproMessage; caproMessage.m_type = capro::CaproMessageType::OFFER; + caproMessage.m_serviceType = capro::CaproServiceType::PUBLISHER; for (auto publisherPortData : m_portPool->getPublisherPortDataList()) { PublisherPortUserType publisherPort(publisherPortData); @@ -282,23 +459,24 @@ void PortManager::handleInterfaces() noexcept } } } - // also forward services from service registry - /// @todo #415 do we still need this? yes but return a copy here to be stored in shared memory via new - /// StatusPort's - /// @todo iox-#27 I guess this was necessary since a service could be offered via ServiceDiscovery; - /// this was removed and I somehow have the feeling this breaks the interface ports with the changes from this - /// PR if the CaproServiceType is something different than NON - auto serviceVector = m_serviceRegistry.getServices(); - - caproMessage.m_serviceType = capro::CaproServiceType::NONE; - - for (auto const& element : serviceVector) + // provide offer information from all active server ports to all new interfaces + caproMessage.m_serviceType = capro::CaproServiceType::SERVER; + for (auto serverPortData : m_portPool->getServerPortDataList()) { - caproMessage.m_serviceDescription = element.serviceDescription; - - for (auto& interfacePortData : interfacePortsForInitialForwarding) + popo::ServerPortUser serverPort(*serverPortData); + if (serverPort.isOffered()) { - popo::InterfacePort(interfacePortData).dispatchCaProMessage(caproMessage); + caproMessage.m_serviceDescription = serverPort.getCaProServiceDescription(); + for (auto& interfacePortData : interfacePortsForInitialForwarding) + { + auto interfacePort = popo::InterfacePort(interfacePortData); + // do not offer on same interface + if (serverPort.getCaProServiceDescription().getSourceInterface() + != interfacePort.getCaProServiceDescription().getSourceInterface()) + { + interfacePort.dispatchCaProMessage(caproMessage); + } + } } } } @@ -315,8 +493,9 @@ void PortManager::handleNodes() noexcept { if (nodeData->m_toBeDestroyed.load(std::memory_order_relaxed)) { + LogDebug() << "Destroy NodeData from runtime '" << nodeData->m_runtimeName << "' and node name '" + << nodeData->m_nodeName << "'"; m_portPool->removeNodeData(nodeData); - LogDebug() << "Destroyed NodeData"; } } } @@ -327,14 +506,15 @@ void PortManager::handleConditionVariables() noexcept { if (conditionVariableData->m_toBeDestroyed.load(std::memory_order_relaxed)) { + LogDebug() << "Destroy ConditionVariableData from runtime '" << conditionVariableData->m_runtimeName << "'"; m_portPool->removeConditionVariableData(conditionVariableData); - LogDebug() << "Destroyed ConditionVariableData"; } } } /// @todo consider making the matching function available in some interface -bool isCompatible(const PublisherPortRouDiType& publisher, const SubscriberPortType& subscriber) +bool PortManager::isCompatiblePubSub(const PublisherPortRouDiType& publisher, + const SubscriberPortType& subscriber) const noexcept { const bool servicesMatch = subscriber.getCaProServiceDescription() == publisher.getCaProServiceDescription(); @@ -369,7 +549,7 @@ bool PortManager::sendToAllMatchingPublisherPorts(const capro::CaproMessage& mes break; } - if (isCompatible(publisherPort, subscriberSource)) + if (isCompatiblePubSub(publisherPort, subscriberSource)) { auto publisherResponse = publisherPort.dispatchCaProMessageAndGetPossibleResponse(message); if (publisherResponse.has_value()) @@ -407,7 +587,7 @@ void PortManager::sendToAllMatchingSubscriberPorts(const capro::CaproMessage& me break; } - if (isCompatible(publisherSource, subscriberPort)) + if (isCompatiblePubSub(publisherSource, subscriberPort)) { auto subscriberResponse = subscriberPort.dispatchCaProMessageAndGetPossibleResponse(message); @@ -439,6 +619,85 @@ void PortManager::sendToAllMatchingSubscriberPorts(const capro::CaproMessage& me } } +bool PortManager::isCompatibleClientServer(const popo::ServerPortRouDi& server, + const popo::ClientPortRouDi& client) const noexcept +{ + auto requestMatch = !(client.getServerTooSlowPolicy() == popo::ConsumerTooSlowPolicy::DISCARD_OLDEST_DATA + && server.getRequestQueueFullPolicy() == popo::QueueFullPolicy::BLOCK_PRODUCER); + + auto responseMatch = !(server.getClientTooSlowPolicy() == popo::ConsumerTooSlowPolicy::DISCARD_OLDEST_DATA + && client.getResponseQueueFullPolicy() == popo::QueueFullPolicy::BLOCK_PRODUCER); + + return requestMatch && responseMatch; +} + +void PortManager::sendToAllMatchingClientPorts(const capro::CaproMessage& message, + popo::ServerPortRouDi& serverSource) noexcept +{ + for (auto clientPortData : m_portPool->getClientPortDataList()) + { + popo::ClientPortRouDi clientPort(*clientPortData); + if (clientPort.getCaProServiceDescription() == serverSource.getCaProServiceDescription() + && isCompatibleClientServer(serverSource, clientPort)) + { + // send OFFER/STOP_OFFER to client + auto clientResponse = clientPort.dispatchCaProMessageAndGetPossibleResponse(message); + + // if the clients react on the change, process it immediately on server side + if (clientResponse.has_value()) + { + // we only expect reaction on CONNECT + cxx::Expects(capro::CaproMessageType::CONNECT == clientResponse.value().m_type); + + /// @todo inform port introspection about client + + // send CONNECT to server + auto serverResponse = serverSource.dispatchCaProMessageAndGetPossibleResponse(clientResponse.value()); + if (serverResponse.has_value()) + { + // sende responsee to client port + auto returnMessage = clientPort.dispatchCaProMessageAndGetPossibleResponse(serverResponse.value()); + + // ACK or NACK are sent back to the client port, no further response from this one expected + cxx::Ensures(!returnMessage.has_value()); + + /// @todo iox-#1128 inform port introspection about server + } + } + } + } +} + +bool PortManager::sendToAllMatchingServerPorts(const capro::CaproMessage& message, + popo::ClientPortRouDi& clientSource) noexcept +{ + bool serverFound = false; + for (auto serverPortData : m_portPool->getServerPortDataList()) + { + popo::ServerPortRouDi serverPort(*serverPortData); + if (clientSource.getCaProServiceDescription() == serverPort.getCaProServiceDescription() + && isCompatibleClientServer(serverPort, clientSource)) + { + // send CONNECT/DISCONNECT to server + auto serverResponse = serverPort.dispatchCaProMessageAndGetPossibleResponse(message); + + // if the server react on the change, process it immediately on client side + if (serverResponse.has_value()) + { + // send response to client port + auto returnMessage = clientSource.dispatchCaProMessageAndGetPossibleResponse(serverResponse.value()); + + // ACK or NACK are sent back to the client port, no further response from this one expected + cxx::Ensures(!returnMessage.has_value()); + + /// @todo iox-#1128 inform port introspection about client + } + serverFound = true; + } + } + return serverFound; +} + void PortManager::sendToAllMatchingInterfacePorts(const capro::CaproMessage& message) noexcept { for (auto interfacePortData : m_portPool->getInterfacePortDataList()) @@ -464,11 +723,22 @@ void PortManager::unblockProcessShutdown(const RuntimeName_t& runtimeName) noexc doDiscoveryForPublisherPort(publisherPort); } } + + for (auto port : m_portPool->getServerPortDataList()) + { + popo::ServerPortRouDi serverPort(*port); + if (runtimeName == serverPort.getRuntimeName()) + { + port->m_offeringRequested.store(false, std::memory_order_relaxed); + doDiscoveryForServerPort(serverPort); + } + } } void PortManager::unblockRouDiShutdown() noexcept { makeAllPublisherPortsToStopOffer(); + makeAllServerPortsToStopOffer(); } void PortManager::makeAllPublisherPortsToStopOffer() noexcept @@ -507,6 +777,24 @@ void PortManager::deletePortsOfProcess(const RuntimeName_t& runtimeName) noexcep } } + for (auto port : m_portPool->getServerPortDataList()) + { + popo::ServerPortRouDi server(*port); + if (runtimeName == server.getRuntimeName()) + { + destroyServerPort(port); + } + } + + for (auto port : m_portPool->getClientPortDataList()) + { + popo::ClientPortRouDi client(*port); + if (runtimeName == client.getRuntimeName()) + { + destroyClientPort(port); + } + } + for (auto port : m_portPool->getInterfacePortDataList()) { popo::InterfacePort interface(port); @@ -542,7 +830,6 @@ void PortManager::destroyPublisherPort(PublisherPortRouDiType::MemberType_t* con PublisherPortRouDiType publisherPortRoudi{publisherPortData}; PublisherPortUserType publisherPortUser{publisherPortData}; - publisherPortRoudi.releaseAllChunks(); publisherPortUser.stopOffer(); // process STOP_OFFER for this publisher in RouDi and distribute it @@ -550,17 +837,19 @@ void PortManager::destroyPublisherPort(PublisherPortRouDiType::MemberType_t* con cxx::Ensures(caproMessage.m_type == capro::CaproMessageType::STOP_OFFER); m_portIntrospection.reportMessage(caproMessage); - this->removeEntryFromServiceRegistry(caproMessage.m_serviceDescription); + this->removePublisherFromServiceRegistry(caproMessage.m_serviceDescription); this->sendToAllMatchingSubscriberPorts(caproMessage, publisherPortRoudi); this->sendToAllMatchingInterfacePorts(caproMessage); }); + publisherPortRoudi.releaseAllChunks(); + m_portIntrospection.removePublisher(publisherPortUser); + LogDebug() << "Destroy publisher port from runtime '" << publisherPortData->m_runtimeName + << "' and with service description '" << publisherPortData->m_serviceDescription << "'"; // delete publisher port from list after STOP_OFFER was processed m_portPool->removePublisherPort(publisherPortData); - - LogDebug() << "Destroyed publisher port"; } void PortManager::destroySubscriberPort(SubscriberPortType::MemberType_t* const subscriberPortData) noexcept @@ -569,7 +858,6 @@ void PortManager::destroySubscriberPort(SubscriberPortType::MemberType_t* const SubscriberPortType subscriberPortRoudi(subscriberPortData); SubscriberPortUserType subscriberPortUser(subscriberPortData); - subscriberPortRoudi.releaseAllChunks(); subscriberPortUser.unsubscribe(); // process UNSUB for this subscriber in RouDi and distribute it @@ -580,11 +868,14 @@ void PortManager::destroySubscriberPort(SubscriberPortType::MemberType_t* const this->sendToAllMatchingPublisherPorts(caproMessage, subscriberPortRoudi); }); + subscriberPortRoudi.releaseAllChunks(); + m_portIntrospection.removeSubscriber(subscriberPortUser); + + LogDebug() << "Destroy subscriber port from runtime '" << subscriberPortData->m_runtimeName + << "' and with service description '" << subscriberPortData->m_serviceDescription << "'"; // delete subscriber port from list after UNSUB was processed m_portPool->removeSubscriberPort(subscriberPortData); - - LogDebug() << "Destroyed subscriber port"; } cxx::expected @@ -661,6 +952,63 @@ PortManager::acquireSubscriberPortData(const capro::ServiceDescription& service, return maybeSubscriberPortData; } +cxx::expected +PortManager::acquireClientPortData(const capro::ServiceDescription& service, + const popo::ClientOptions& clientOptions, + const RuntimeName_t& runtimeName, + mepoo::MemoryManager* const payloadDataSegmentMemoryManager, + const PortConfigInfo& portConfigInfo) noexcept +{ + // we can create a new port + return m_portPool + ->addClientPort(service, payloadDataSegmentMemoryManager, runtimeName, clientOptions, portConfigInfo.memoryInfo) + .and_then([this](auto clientPortData) { + /// @todo iox-#1128 add to port introspection + + // we do discovery here for trying to connect the client if offer on create is desired + popo::ClientPortRouDi clientPort(*clientPortData); + this->doDiscoveryForClientPort(clientPort); + }); +} + +cxx::expected +PortManager::acquireServerPortData(const capro::ServiceDescription& service, + const popo::ServerOptions& serverOptions, + const RuntimeName_t& runtimeName, + mepoo::MemoryManager* const payloadDataSegmentMemoryManager, + const PortConfigInfo& portConfigInfo) noexcept +{ + // it is not allowed to have two servers with the same ServiceDescription; + // check if the server is already in the list + for (const auto serverPortData : m_portPool->getServerPortDataList()) + { + if (service == serverPortData->m_serviceDescription) + { + if (serverPortData->m_toBeDestroyed) + { + destroyServerPort(serverPortData); + continue; + } + LogWarn() << "Process '" << runtimeName + << "' violates the communication policy by requesting a ServerPort which is already used by '" + << serverPortData->m_runtimeName << "' with service '" + << service.operator cxx::Serialization().toString() << "'."; + errorHandler(Error::kPOSH__PORT_MANAGER_SERVERPORT_NOT_UNIQUE, nullptr, ErrorLevel::MODERATE); + return cxx::error(PortPoolError::UNIQUE_SERVER_PORT_ALREADY_EXISTS); + } + } + + // we can create a new port + return m_portPool + ->addServerPort(service, payloadDataSegmentMemoryManager, runtimeName, serverOptions, portConfigInfo.memoryInfo) + .and_then([this](auto serverPortData) { + /// @todo iox-#1128 add to port introspection + + // we do discovery here for trying to connect the waiting client if offer on create is desired + popo::ServerPortRouDi serverPort(*serverPortData); + this->doDiscoveryForServerPort(serverPort); + }); +} /// @todo return a cxx::expected popo::InterfacePortData* PortManager::acquireInterfacePortData(capro::Interfaces interface, @@ -703,21 +1051,36 @@ void PortManager::publishServiceRegistry() const noexcept } -void PortManager::addEntryToServiceRegistry(const capro::ServiceDescription& service) noexcept +void PortManager::addPublisherToServiceRegistry(const capro::ServiceDescription& service) noexcept { m_serviceRegistry.addPublisher(service).or_else([&](auto&) { - LogWarn() << "Could not add service " << service.getServiceIDString() << " to service registry!"; + LogWarn() << "Could not add publisher with service description '" << service << "' to service registry!"; errorHandler(Error::kPOSH__PORT_MANAGER_COULD_NOT_ADD_SERVICE_TO_REGISTRY, nullptr, ErrorLevel::MODERATE); }); publishServiceRegistry(); } -void PortManager::removeEntryFromServiceRegistry(const capro::ServiceDescription& service) noexcept +void PortManager::removePublisherFromServiceRegistry(const capro::ServiceDescription& service) noexcept { m_serviceRegistry.removePublisher(service); publishServiceRegistry(); } +void PortManager::addServerToServiceRegistry(const capro::ServiceDescription& service) noexcept +{ + m_serviceRegistry.addServer(service).or_else([&](auto&) { + LogWarn() << "Could not add server with service description '" << service << "' to service registry!"; + errorHandler(Error::kPOSH__PORT_MANAGER_COULD_NOT_ADD_SERVICE_TO_REGISTRY, nullptr, ErrorLevel::MODERATE); + }); + publishServiceRegistry(); +} + +void PortManager::removeServerFromServiceRegistry(const capro::ServiceDescription& service) noexcept +{ + m_serviceRegistry.removeServer(service); + publishServiceRegistry(); +} + cxx::expected PortManager::acquireNodeData(const RuntimeName_t& runtimeName, const NodeName_t& nodeName) noexcept { diff --git a/iceoryx_posh/source/roudi/port_pool.cpp b/iceoryx_posh/source/roudi/port_pool.cpp index b012e0eeb4..7e24528cf5 100644 --- a/iceoryx_posh/source/roudi/port_pool.cpp +++ b/iceoryx_posh/source/roudi/port_pool.cpp @@ -1,5 +1,5 @@ // Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. -// Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved. +// Copyright (c) 2020 - 2022 by Apex.AI Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -54,6 +54,7 @@ PortPool::addInterfacePort(const RuntimeName_t& runtimeName, const capro::Interf } else { + LogWarn() << "Out of interface ports! Requested by runtime '" << runtimeName << "'"; errorHandler(Error::kPORT_POOL__INTERFACELIST_OVERFLOW, nullptr, ErrorLevel::MODERATE); return cxx::error(PortPoolError::INTERFACE_PORT_LIST_FULL); } @@ -70,6 +71,8 @@ cxx::expected PortPool::addNodeData(const Run } else { + LogWarn() << "Out of node data! Requested by runtime '" << runtimeName << "' and node name '" << nodeName + << "'"; errorHandler(Error::kPORT_POOL__NODELIST_OVERFLOW, nullptr, ErrorLevel::MODERATE); return cxx::error(PortPoolError::NODE_DATA_LIST_FULL); } @@ -85,22 +88,23 @@ PortPool::addConditionVariableData(const RuntimeName_t& runtimeName) noexcept } else { + LogWarn() << "Out of condition variables! Requested by runtime '" << runtimeName << "'"; errorHandler(Error::kPORT_POOL__CONDITION_VARIABLE_LIST_OVERFLOW, nullptr, ErrorLevel::MODERATE); return cxx::error(PortPoolError::CONDITION_VARIABLE_LIST_FULL); } } -void PortPool::removeInterfacePort(popo::InterfacePortData* const portData) noexcept +void PortPool::removeInterfacePort(const popo::InterfacePortData* const portData) noexcept { m_portPoolData->m_interfacePortMembers.erase(portData); } -void PortPool::removeNodeData(runtime::NodeData* const nodeData) noexcept +void PortPool::removeNodeData(const runtime::NodeData* const nodeData) noexcept { m_portPoolData->m_nodeMembers.erase(nodeData); } -void PortPool::removeConditionVariableData(popo::ConditionVariableData* const conditionVariableData) noexcept +void PortPool::removeConditionVariableData(const popo::ConditionVariableData* const conditionVariableData) noexcept { m_portPoolData->m_conditionVariableMembers.erase(conditionVariableData); } @@ -130,6 +134,8 @@ PortPool::addPublisherPort(const capro::ServiceDescription& serviceDescription, } else { + LogWarn() << "Out of publisher ports! Requested by runtime '" << runtimeName + << "' and with service description '" << serviceDescription << "'"; errorHandler(Error::kPORT_POOL__PUBLISHERLIST_OVERFLOW, nullptr, ErrorLevel::MODERATE); return cxx::error(PortPoolError::PUBLISHER_PORT_LIST_FULL); } @@ -150,20 +156,81 @@ PortPool::addSubscriberPort(const capro::ServiceDescription& serviceDescription, } else { + LogWarn() << "Out of subscriber ports! Requested by runtime '" << runtimeName + << "' and with service description '" << serviceDescription << "'"; errorHandler(Error::kPORT_POOL__SUBSCRIBERLIST_OVERFLOW, nullptr, ErrorLevel::MODERATE); return cxx::error(PortPoolError::SUBSCRIBER_PORT_LIST_FULL); } } -void PortPool::removePublisherPort(PublisherPortRouDiType::MemberType_t* const portData) noexcept +cxx::vector PortPool::getClientPortDataList() noexcept +{ + return m_portPoolData->m_clientPortMembers.content(); +} + +cxx::vector PortPool::getServerPortDataList() noexcept +{ + return m_portPoolData->m_serverPortMembers.content(); +} + +cxx::expected +PortPool::addClientPort(const capro::ServiceDescription& serviceDescription, + mepoo::MemoryManager* const memoryManager, + const RuntimeName_t& runtimeName, + const popo::ClientOptions& clientOptions, + const mepoo::MemoryInfo& memoryInfo) noexcept +{ + if (!m_portPoolData->m_clientPortMembers.hasFreeSpace()) + { + LogWarn() << "Out of client ports! Requested by runtime '" << runtimeName << "' and with service description '" + << serviceDescription << "'"; + errorHandler(Error::kPORT_POOL__CLIENTLIST_OVERFLOW, nullptr, ErrorLevel::MODERATE); + return cxx::error(PortPoolError::CLIENT_PORT_LIST_FULL); + } + + auto clientPortData = m_portPoolData->m_clientPortMembers.insert( + serviceDescription, runtimeName, clientOptions, memoryManager, memoryInfo); + return cxx::success(clientPortData); +} + +cxx::expected +PortPool::addServerPort(const capro::ServiceDescription& serviceDescription, + mepoo::MemoryManager* const memoryManager, + const RuntimeName_t& runtimeName, + const popo::ServerOptions& serverOptions, + const mepoo::MemoryInfo& memoryInfo) noexcept +{ + if (!m_portPoolData->m_serverPortMembers.hasFreeSpace()) + { + LogWarn() << "Out of server ports! Requested by runtime '" << runtimeName << "' and with service description '" + << serviceDescription << "'"; + errorHandler(Error::kPORT_POOL__SERVERLIST_OVERFLOW, nullptr, ErrorLevel::MODERATE); + return cxx::error(PortPoolError::SERVER_PORT_LIST_FULL); + } + + auto serverPortData = m_portPoolData->m_serverPortMembers.insert( + serviceDescription, runtimeName, serverOptions, memoryManager, memoryInfo); + return cxx::success(serverPortData); +} + +void PortPool::removePublisherPort(const PublisherPortRouDiType::MemberType_t* const portData) noexcept { m_portPoolData->m_publisherPortMembers.erase(portData); } -void PortPool::removeSubscriberPort(SubscriberPortType::MemberType_t* const portData) noexcept +void PortPool::removeSubscriberPort(const SubscriberPortType::MemberType_t* const portData) noexcept { m_portPoolData->m_subscriberPortMembers.erase(portData); } +void PortPool::removeClientPort(const popo::ClientPortData* const portData) noexcept +{ + m_portPoolData->m_clientPortMembers.erase(portData); +} +void PortPool::removeServerPort(const popo::ServerPortData* const portData) noexcept +{ + m_portPoolData->m_serverPortMembers.erase(portData); +} + } // namespace roudi } // namespace iox diff --git a/iceoryx_posh/test/moduletests/test_capro_service.cpp b/iceoryx_posh/test/moduletests/test_capro_service.cpp index 48c2e6e135..d1a0324f48 100644 --- a/iceoryx_posh/test/moduletests/test_capro_service.cpp +++ b/iceoryx_posh/test/moduletests/test_capro_service.cpp @@ -20,6 +20,7 @@ #include "iceoryx_hoofs/cxx/convert.hpp" #include "iceoryx_hoofs/cxx/serialization.hpp" #include "iceoryx_hoofs/cxx/string.hpp" +#include "iceoryx_hoofs/testing/mocks/logger_mock.hpp" #include "iceoryx_posh/capro/service_description.hpp" /// @todo #415 replace the service registry include with the new discovery API header #include "iceoryx_posh/internal/roudi/service_registry.hpp" @@ -428,6 +429,25 @@ TEST_F(ServiceDescription_test, LessThanOperatorReturnsFalseIfEventStringOfFirst EXPECT_FALSE(serviceDescription1 < serviceDescription2); } +TEST_F(ServiceDescription_test, LogStreamConvertsServiceDescriptionToString) +{ + ::testing::Test::RecordProperty("TEST_ID", "42bc3f21-d9f4-4cc3-a37e-6508e1f981c1"); + Logger_Mock loggerMock; + + const IdString_t SERVICE_ID{"all"}; + const IdString_t INSTANCE_ID{"glory"}; + const IdString_t EVENT_ID{"hypnotoad"}; + const std::string SERVICE_DESCRIPTION_AS_STRING{"Service: all, Instance: glory, Event: hypnotoad"}; + auto sut = ServiceDescription{SERVICE_ID, INSTANCE_ID, EVENT_ID}; + + { + auto logstream = iox::log::LogStream(loggerMock); + logstream << sut; + } + + ASSERT_THAT(loggerMock.m_logs.size(), Eq(1U)); + EXPECT_THAT(loggerMock.m_logs[0].message, StrEq(SERVICE_DESCRIPTION_AS_STRING)); +} /// END SERVICEDESCRIPTION TESTS diff --git a/iceoryx_posh/test/moduletests/test_popo_client_port.cpp b/iceoryx_posh/test/moduletests/test_popo_client_port.cpp index fef679eeb6..99f4c9eb70 100644 --- a/iceoryx_posh/test/moduletests/test_popo_client_port.cpp +++ b/iceoryx_posh/test/moduletests/test_popo_client_port.cpp @@ -195,6 +195,13 @@ class ClientPort_test : public Test return options; }(); + ClientOptions m_clientOptionsWithWaitForConsumerServerTooSlowPolicy = [&] { + ClientOptions options; + options.responseQueueCapacity = QUEUE_CAPACITY; + options.serverTooSlowPolicy = ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER; + return options; + }(); + iox::cxx::optional clientPortForStateTransitionTests; public: @@ -211,6 +218,8 @@ class ClientPort_test : public Test m_serviceDescription, m_runtimeName, m_clientOptionsWithoutConnectOnCreate, m_memoryManager}; SutClientPort clientPortWithBlockProducerResponseQueuePolicy{ m_serviceDescription, m_runtimeName, m_clientOptionsWithBlockProducerResponseQueueFullPolicy, m_memoryManager}; + SutClientPort clientPortWithWaitForConsumerServerTooSlowPolicy{ + m_serviceDescription, m_runtimeName, m_clientOptionsWithWaitForConsumerServerTooSlowPolicy, m_memoryManager}; }; constexpr iox::units::Duration ClientPort_test::DEADLOCK_TIMEOUT; @@ -622,6 +631,14 @@ TEST_F(ClientPort_test, GetResponseQueueFullPolicyOnPortWithBlockProducerOptionI EXPECT_THAT(sut.portRouDi.getResponseQueueFullPolicy(), Eq(QueueFullPolicy::BLOCK_PRODUCER)); } +TEST_F(ClientPort_test, GetServerTooSlowPolicyOnPortWithWaitForConsumerOptionIsWaitForConsumer) +{ + ::testing::Test::RecordProperty("TEST_ID", "f0036542-bb93-4975-b70b-ec40b0947d13"); + auto& sut = clientPortWithWaitForConsumerServerTooSlowPolicy; + + EXPECT_THAT(sut.portRouDi.getServerTooSlowPolicy(), Eq(ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER)); +} + TEST_F(ClientPort_test, TryGetCaProMessageOnConnectHasCaProMessageTypeConnect) { ::testing::Test::RecordProperty("TEST_ID", "eac43f13-b486-4e8b-a5b9-4fc274113d08"); diff --git a/iceoryx_posh/test/moduletests/test_popo_server_port_roudi.cpp b/iceoryx_posh/test/moduletests/test_popo_server_port_roudi.cpp index 91ff254dd4..0c9abef55e 100644 --- a/iceoryx_posh/test/moduletests/test_popo_server_port_roudi.cpp +++ b/iceoryx_posh/test/moduletests/test_popo_server_port_roudi.cpp @@ -20,6 +20,18 @@ namespace iox_test_popo_server_port { constexpr iox::units::Duration ServerPort_test::DEADLOCK_TIMEOUT; +TEST_F(ServerPort_test, GetRequestQueueFullPolicyReturnsCorrectValues) +{ + ::testing::Test::RecordProperty("TEST_ID", "4b3dbe4c-6c3d-4129-a4f0-643a801a4803"); + + auto& sutWithDiscardOldestData = serverPortWithOfferOnCreate; + auto& sutWithBlockProducer = serverOptionsWithBlockProducerRequestQueueFullPolicy; + + EXPECT_THAT(sutWithDiscardOldestData.portRouDi.getRequestQueueFullPolicy(), + Eq(QueueFullPolicy::DISCARD_OLDEST_DATA)); + EXPECT_THAT(sutWithBlockProducer.portRouDi.getRequestQueueFullPolicy(), Eq(QueueFullPolicy::BLOCK_PRODUCER)); +} + TEST_F(ServerPort_test, GetClientTooSlowPolicyReturnsCorrectValues) { ::testing::Test::RecordProperty("TEST_ID", "7090916c-57c5-4ef4-9876-87e58ab64058"); diff --git a/iceoryx_posh/test/moduletests/test_posh_runtime.cpp b/iceoryx_posh/test/moduletests/test_posh_runtime.cpp index 700b4a862c..c200e20c39 100644 --- a/iceoryx_posh/test/moduletests/test_posh_runtime.cpp +++ b/iceoryx_posh/test/moduletests/test_posh_runtime.cpp @@ -170,7 +170,6 @@ TEST_F(PoshRuntime_test, GetMiddlewareInterfaceIsSuccessful) ASSERT_NE(nullptr, interfacePortData); EXPECT_EQ(m_runtimeName, interfacePortData->m_runtimeName); EXPECT_EQ(false, interfacePortData->m_toBeDestroyed); - EXPECT_EQ(true, interfacePortData->m_doInitialOfferForward); } TEST_F(PoshRuntime_test, GetMiddlewareInterfaceInterfacelistOverflow) diff --git a/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp b/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp index 87127a180a..d2ad45dc2d 100644 --- a/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp +++ b/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp @@ -15,195 +15,10 @@ // // SPDX-License-Identifier: Apache-2.0 -#include "iceoryx_hoofs/cxx/convert.hpp" -#include "iceoryx_hoofs/cxx/generic_raii.hpp" -#include "iceoryx_hoofs/internal/relocatable_pointer/base_relative_pointer.hpp" -#include "iceoryx_hoofs/posix_wrapper/posix_access_rights.hpp" -#include "iceoryx_hoofs/testing/watch_dog.hpp" -#include "iceoryx_posh/iceoryx_posh_types.hpp" -#include "iceoryx_posh/internal/capro/capro_message.hpp" -#include "iceoryx_posh/internal/popo/ports/publisher_port_user.hpp" -#include "iceoryx_posh/internal/roudi/port_manager.hpp" -#include "iceoryx_posh/roudi/memory/iceoryx_roudi_memory_manager.hpp" - -#include "test.hpp" - -#include -#include // std::numeric_limits - -namespace -{ -using namespace ::testing; -using namespace iox::cxx; - -using iox::popo::PublisherOptions; -using iox::popo::PublisherPortUser; -using iox::popo::QueueFullPolicy; -using iox::popo::SubscriberOptions; -using iox::popo::SubscriberPortUser; -using iox::roudi::IceOryxRouDiMemoryManager; -using iox::roudi::PortManager; -using iox::roudi::PortPoolError; -using iox::runtime::PortConfigInfo; - -class PortManagerTester : public PortManager -{ - public: - PortManagerTester(IceOryxRouDiMemoryManager* roudiMemoryManager) - : PortManager(roudiMemoryManager) - { - } +#include "test_roudi_portmanager_fixture.hpp" - private: - FRIEND_TEST(PortManager_test, CheckDeleteOfPortsFromProcess1); - FRIEND_TEST(PortManager_test, CheckDeleteOfPortsFromProcess2); -}; - -class PortManager_test : public Test +namespace iox_test_roudi_portmanager { - public: - iox::mepoo::MemoryManager* m_payloadDataSegmentMemoryManager{nullptr}; - IceOryxRouDiMemoryManager* m_roudiMemoryManager{nullptr}; - PortManagerTester* m_portManager{nullptr}; - - uint16_t m_instIdCounter, m_eventIdCounter, m_sIdCounter; - - iox::RuntimeName_t m_runtimeName{"TestApp"}; - - void SetUp() override - { - m_instIdCounter = m_sIdCounter = 1U; - m_eventIdCounter = 0; - // starting at {1,1,1} - - auto config = iox::RouDiConfig_t().setDefaults(); - m_roudiMemoryManager = new IceOryxRouDiMemoryManager(config); - EXPECT_FALSE(m_roudiMemoryManager->createAndAnnounceMemory().has_error()); - m_portManager = new PortManagerTester(m_roudiMemoryManager); - - auto user = iox::posix::PosixUser::getUserOfCurrentProcess(); - auto segmentInfo = - m_roudiMemoryManager->segmentManager().value()->getSegmentInformationWithWriteAccessForUser(user); - ASSERT_TRUE(segmentInfo.m_memoryManager.has_value()); - - m_payloadDataSegmentMemoryManager = &segmentInfo.m_memoryManager.value().get(); - - // clearing the introspection, is not in d'tor -> SEGFAULT in delete sporadically - m_portManager->stopPortIntrospection(); - m_portManager->deletePortsOfProcess(iox::roudi::IPC_CHANNEL_ROUDI_NAME); - } - - void TearDown() override - { - delete m_portManager; - delete m_roudiMemoryManager; - iox::rp::BaseRelativePointer::unregisterAll(); - } - iox::capro::ServiceDescription getUniqueSD() - { - m_eventIdCounter++; - if (m_eventIdCounter == std::numeric_limits::max()) - { - m_eventIdCounter = 1U; - m_instIdCounter++; // not using max (wildcard) - if (m_instIdCounter == std::numeric_limits::max()) - { - m_instIdCounter = 1U; - m_sIdCounter++; - if (m_sIdCounter == std::numeric_limits::max()) - { - // ASSERT_TRUE(false); // limits of test reached no more unique ids possible - } - } - } - return {iox::capro::IdString_t(TruncateToCapacity, convert::toString(m_sIdCounter)), - iox::capro::IdString_t(TruncateToCapacity, convert::toString(m_eventIdCounter)), - iox::capro::IdString_t(TruncateToCapacity, convert::toString(m_instIdCounter))}; - } - - void acquireMaxNumberOfInterfaces( - std::string runtimeName, - std::function f = std::function()) - { - for (unsigned int i = 0; i < iox::MAX_INTERFACE_NUMBER; i++) - { - auto newProcessName = runtimeName + iox::cxx::convert::toString(i); - auto interfacePort = m_portManager->acquireInterfacePortData( - iox::capro::Interfaces::INTERNAL, iox::RuntimeName_t(iox::cxx::TruncateToCapacity, newProcessName)); - ASSERT_NE(interfacePort, nullptr); - if (f) - { - f(interfacePort); - } - } - } - - void acquireMaxNumberOfConditionVariables(std::string runtimeName, - std::function f = - std::function()) - { - for (unsigned int i = 0; i < iox::MAX_NUMBER_OF_CONDITION_VARIABLES; i++) - { - auto newProcessName = runtimeName + iox::cxx::convert::toString(i); - auto condVar = m_portManager->acquireConditionVariableData( - iox::RuntimeName_t(iox::cxx::TruncateToCapacity, newProcessName)); - ASSERT_FALSE(condVar.has_error()); - if (f) - { - f(condVar.value()); - } - } - } - - void - acquireMaxNumberOfNodes(std::string nodeName, - std::string runtimeName, - std::function f = - std::function()) - { - for (unsigned int i = 0U; i < iox::MAX_NODE_NUMBER; i++) - { - iox::RuntimeName_t newProcessName(iox::cxx::TruncateToCapacity, - runtimeName + iox::cxx::convert::toString(i)); - iox::NodeName_t newNodeName(iox::cxx::TruncateToCapacity, nodeName + iox::cxx::convert::toString(i)); - auto node = m_portManager->acquireNodeData(newProcessName, newNodeName); - ASSERT_FALSE(node.has_error()); - if (f) - { - f(node.value(), newNodeName, newProcessName); - } - } - } - - void setupAndTestBlockingPublisher(const iox::RuntimeName_t& publisherRuntimeName, - std::function testHook) noexcept; - - PublisherPortUser createPublisher(const PublisherOptions& options) - { - return PublisherPortUser( - m_portManager - ->acquirePublisherPortData( - {"1", "1", "1"}, options, "guiseppe", m_payloadDataSegmentMemoryManager, PortConfigInfo()) - .value()); - } - - SubscriberPortUser createSubscriber(const SubscriberOptions& options) - { - return SubscriberPortUser( - m_portManager->acquireSubscriberPortData({"1", "1", "1"}, options, "schlomo", PortConfigInfo()).value()); - } -}; - -template -void setDestroyFlagAndClearContainer(vector& container) -{ - for (auto& item : container) - { - item->m_toBeDestroyed.store(true, std::memory_order_relaxed); - } - container.clear(); -} - PublisherOptions createTestPubOptions() { return PublisherOptions{0U, iox::NodeName_t("node"), true, iox::popo::ConsumerTooSlowPolicy::DISCARD_OLDEST_DATA}; @@ -1162,4 +977,4 @@ TEST_F(PortManager_test, PortsDestroyInProcess2ChangeStatesOfPortsInProcess1) } } -} // namespace +} // namespace iox_test_roudi_portmanager diff --git a/iceoryx_posh/test/moduletests/test_roudi_portmanager_client_server.cpp b/iceoryx_posh/test/moduletests/test_roudi_portmanager_client_server.cpp new file mode 100644 index 0000000000..c2dfcc2f49 --- /dev/null +++ b/iceoryx_posh/test/moduletests/test_roudi_portmanager_client_server.cpp @@ -0,0 +1,672 @@ +// Copyright (c) 2022 by Apex.AI Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +#include "test_roudi_portmanager_fixture.hpp" + +namespace iox_test_roudi_portmanager +{ +using namespace iox::popo; + +constexpr uint64_t RESPONSE_QUEUE_CAPACITY{2U}; +constexpr uint64_t REQUEST_QUEUE_CAPACITY{2U}; + +ClientOptions createTestClientOptions() +{ + return ClientOptions{RESPONSE_QUEUE_CAPACITY, iox::NodeName_t("node")}; +} + +ServerOptions createTestServerOptions() +{ + return ServerOptions{REQUEST_QUEUE_CAPACITY, iox::NodeName_t("node")}; +} + +// BEGIN aquireClientPortData tests + +TEST_F(PortManager_test, AcquireClientPortDataReturnsPort) +{ + ::testing::Test::RecordProperty("TEST_ID", "92225f2c-619a-425b-bba0-6a014822c4c3"); + const ServiceDescription sd{"hyp", "no", "toad"}; + const RuntimeName_t runtimeName{"hypnotoad"}; + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = false; + clientOptions.responseQueueFullPolicy = QueueFullPolicy::BLOCK_PRODUCER; + clientOptions.serverTooSlowPolicy = ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER; + m_portManager->acquireClientPortData(sd, clientOptions, runtimeName, m_payloadDataSegmentMemoryManager, {}) + .and_then([&](const auto& clientPortData) { + EXPECT_THAT(clientPortData->m_serviceDescription, Eq(sd)); + EXPECT_THAT(clientPortData->m_runtimeName, Eq(runtimeName)); + EXPECT_THAT(clientPortData->m_nodeName, Eq(clientOptions.nodeName)); + EXPECT_THAT(clientPortData->m_toBeDestroyed, Eq(false)); + EXPECT_THAT(clientPortData->m_chunkReceiverData.m_queue.capacity(), + Eq(clientOptions.responseQueueCapacity)); + EXPECT_THAT(clientPortData->m_connectRequested, Eq(clientOptions.connectOnCreate)); + EXPECT_THAT(clientPortData->m_chunkReceiverData.m_queueFullPolicy, + Eq(clientOptions.responseQueueFullPolicy)); + EXPECT_THAT(clientPortData->m_chunkSenderData.m_consumerTooSlowPolicy, + Eq(clientOptions.serverTooSlowPolicy)); + }) + .or_else([&](const auto& error) { + GTEST_FAIL() << "Expected ClientPortData but got PortPoolError: " << static_cast(error); + }); +} + +// END aquireClientPortData tests + +// BEGIN aquireServerPortData tests + +TEST_F(PortManager_test, AcquireServerPortDataReturnsPort) +{ + ::testing::Test::RecordProperty("TEST_ID", "776c51c4-074a-4404-b6a7-ed08f59f05a0"); + const ServiceDescription sd{"hyp", "no", "toad"}; + const RuntimeName_t runtimeName{"hypnotoad"}; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = false; + serverOptions.requestQueueFullPolicy = QueueFullPolicy::BLOCK_PRODUCER; + serverOptions.clientTooSlowPolicy = ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER; + m_portManager->acquireServerPortData(sd, serverOptions, runtimeName, m_payloadDataSegmentMemoryManager, {}) + .and_then([&](const auto& serverPortData) { + EXPECT_THAT(serverPortData->m_serviceDescription, Eq(sd)); + EXPECT_THAT(serverPortData->m_runtimeName, Eq(runtimeName)); + EXPECT_THAT(serverPortData->m_nodeName, Eq(serverOptions.nodeName)); + EXPECT_THAT(serverPortData->m_toBeDestroyed, Eq(false)); + EXPECT_THAT(serverPortData->m_chunkReceiverData.m_queue.capacity(), Eq(serverOptions.requestQueueCapacity)); + EXPECT_THAT(serverPortData->m_offeringRequested, Eq(serverOptions.offerOnCreate)); + EXPECT_THAT(serverPortData->m_chunkReceiverData.m_queueFullPolicy, + Eq(serverOptions.requestQueueFullPolicy)); + EXPECT_THAT(serverPortData->m_chunkSenderData.m_consumerTooSlowPolicy, + Eq(serverOptions.clientTooSlowPolicy)); + }) + .or_else([&](const auto& error) { + GTEST_FAIL() << "Expected ClientPortData but got PortPoolError: " << static_cast(error); + }); +} + +TEST_F(PortManager_test, AcquireServerPortDataWithSameServiceDescriptionTwiceCallsErrorHandlerAndReturnsError) +{ + ::testing::Test::RecordProperty("TEST_ID", "9f2c24ba-192d-4ce8-a61a-fe40b42c655b"); + const ServiceDescription sd{"hyp", "no", "toad"}; + const RuntimeName_t runtimeName{"hypnotoad"}; + auto serverOptions = createTestServerOptions(); + + // first call must be successful + m_portManager->acquireServerPortData(sd, serverOptions, runtimeName, m_payloadDataSegmentMemoryManager, {}) + .or_else([&](const auto& error) { + GTEST_FAIL() << "Expected ClientPortData but got PortPoolError: " << static_cast(error); + }); + + iox::cxx::optional detectedError; + auto errorHandlerGuard = + iox::ErrorHandler::setTemporaryErrorHandler([&](const auto error, const auto, const auto errorLevel) { + EXPECT_THAT(error, Eq(iox::Error::kPOSH__PORT_MANAGER_SERVERPORT_NOT_UNIQUE)); + EXPECT_THAT(errorLevel, Eq(iox::ErrorLevel::MODERATE)); + detectedError.emplace(error); + }); + + // second call must fail + m_portManager->acquireServerPortData(sd, serverOptions, runtimeName, m_payloadDataSegmentMemoryManager, {}) + .and_then([&](const auto&) { + GTEST_FAIL() << "Expected PortPoolError::UNIQUE_SERVER_PORT_ALREADY_EXISTS but got ServerPortData"; + }) + .or_else([&](const auto& error) { EXPECT_THAT(error, Eq(PortPoolError::UNIQUE_SERVER_PORT_ALREADY_EXISTS)); }); + + EXPECT_TRUE(detectedError.has_value()); +} + +TEST_F(PortManager_test, AcquireServerPortDataWithSameServiceDescriptionTwiceAndFirstPortMarkedToBeDestroyedReturnsPort) +{ + ::testing::Test::RecordProperty("TEST_ID", "d7f2815d-f1ea-403d-9355-69470d92a10f"); + const ServiceDescription sd{"hyp", "no", "toad"}; + const RuntimeName_t runtimeName{"hypnotoad"}; + auto serverOptions = createTestServerOptions(); + + // first call must be successful + auto serverPortDataResult = + m_portManager->acquireServerPortData(sd, serverOptions, runtimeName, m_payloadDataSegmentMemoryManager, {}); + + ASSERT_FALSE(serverPortDataResult.has_error()); + + serverPortDataResult.value()->m_toBeDestroyed = true; + + iox::cxx::optional detectedError; + auto errorHandlerGuard = iox::ErrorHandler::setTemporaryErrorHandler( + [&](const auto error, const auto, const auto) { detectedError.emplace(error); }); + + // second call must now also succeed + m_portManager->acquireServerPortData(sd, serverOptions, runtimeName, m_payloadDataSegmentMemoryManager, {}) + .or_else([&](const auto& error) { + GTEST_FAIL() << "Expected ClientPortData but got PortPoolError: " << static_cast(error); + }); + + detectedError.and_then( + [&](const auto& error) { GTEST_FAIL() << "Expected error handler to not be called but got: " << error; }); +} + +// END aquireServerPortData tests + +// BEGIN discovery tests + +TEST_F(PortManager_test, CreateClientWithConnectOnCreateAndNoServerResultsInWaitForOffer) +{ + ::testing::Test::RecordProperty("TEST_ID", "14070d7b-d8e1-4df5-84fc-119e5e126cde"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + + auto clientPortUser = createClient(clientOptions); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::WAIT_FOR_OFFER)); +} + +TEST_F(PortManager_test, DoDiscoveryWithClientConnectOnCreateAndNoServerResultsInClientNotConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "6829e506-9f58-4253-bc42-469f2970a2c7"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + + auto clientPortUser = createClient(clientOptions); + m_portManager->doDiscovery(); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::WAIT_FOR_OFFER)); +} + +TEST_F(PortManager_test, CreateClientWithConnectOnCreateAndNotOfferingServerResultsInWaitForOffer) +{ + ::testing::Test::RecordProperty("TEST_ID", "0f7098d0-2646-4c10-b347-9b57b0f593ce"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = false; + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser = createClient(clientOptions); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::WAIT_FOR_OFFER)); +} + +TEST_F(PortManager_test, CreateClientWithConnectOnCreateAndOfferingServerResultsInClientConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "108170d4-786b-4266-ad2a-ef922188f70b"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser = createClient(clientOptions); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, CreateServerWithOfferOnCreateAndClientWaitingToConnectResultsInClientConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "b5bb10b2-bf9b-400e-ab5c-aa3a1e0e826f"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + auto clientPortUser = createClient(clientOptions); + auto serverPortUser = createServer(serverOptions); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, CreateClientWithNotConnectOnCreateAndNoServerResultsInClientNotConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "fde662f1-f9e1-4302-be41-59a7a0bfa4e7"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = false; + + auto clientPortUser = createClient(clientOptions); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::NOT_CONNECTED)); +} + +TEST_F(PortManager_test, DoDiscoveryWithClientNotConnectOnCreateAndNoServerResultsInClientNotConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "c59b7343-6277-4a4b-8204-506048726be4"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = false; + + auto clientPortUser = createClient(clientOptions); + m_portManager->doDiscovery(); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::NOT_CONNECTED)); +} + +TEST_F(PortManager_test, CreateClientWithNotConnectOnCreateAndOfferingServerResultsInClientNotConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "17cf22ba-066a-418a-8366-1c6b75177b9a"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = false; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser = createClient(clientOptions); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::NOT_CONNECTED)); +} + +TEST_F(PortManager_test, DoDiscoveryWithClientNotConnectOnCreateAndServerResultsInConnectedWhenCallingConnect) +{ + ::testing::Test::RecordProperty("TEST_ID", "87bbb991-4aaf-49c1-b238-d9b0bb18d699"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = false; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser = createClient(clientOptions); + + clientPortUser.connect(); + + m_portManager->doDiscovery(); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, DoDiscoveryWithClientConnectResultsInClientNotConnectedWhenCallingDisconnect) +{ + ::testing::Test::RecordProperty("TEST_ID", "b6826f93-096d-473d-b846-ab824efff1ee"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser = createClient(clientOptions); + + clientPortUser.disconnect(); + + m_portManager->doDiscovery(); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::NOT_CONNECTED)); +} + +TEST_F(PortManager_test, DoDiscoveryWithClientConnectResultsInWaitForOfferWhenCallingStopOffer) +{ + ::testing::Test::RecordProperty("TEST_ID", "45c9cc27-4198-4539-943f-2111ae2d1368"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser = createClient(clientOptions); + + serverPortUser.stopOffer(); + + m_portManager->doDiscovery(); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::WAIT_FOR_OFFER)); +} + +TEST_F(PortManager_test, DoDiscoveryWithClientConnectResultsInWaitForOfferWhenServerIsDestroyed) +{ + ::testing::Test::RecordProperty("TEST_ID", "585ad47d-1a03-4599-a4dc-57ea1fb6eac7"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser = createClient(clientOptions); + + serverPortUser.destroy(); + + m_portManager->doDiscovery(); + + EXPECT_THAT(clientPortUser.getConnectionState(), Eq(ConnectionState::WAIT_FOR_OFFER)); +} + +TEST_F(PortManager_test, DoDiscoveryWithClientConnectResultsInNoClientsWhenClientIsDestroyed) +{ + ::testing::Test::RecordProperty("TEST_ID", "3be2f7b5-7e22-4676-a25b-c8a93a4aaa7d"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser = createClient(clientOptions); + + EXPECT_TRUE(serverPortUser.hasClients()); + + clientPortUser.destroy(); + + m_portManager->doDiscovery(); + + EXPECT_FALSE(serverPortUser.hasClients()); +} + +TEST_F(PortManager_test, CreateMultipleClientsWithConnectOnCreateAndOfferingServerResultsInAllClientsConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "08f9981f-2585-4574-b0fc-c16cf0eef7d4"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser1 = createClient(clientOptions); + auto clientPortUser2 = createClient(clientOptions); + + EXPECT_THAT(clientPortUser1.getConnectionState(), Eq(ConnectionState::CONNECTED)); + EXPECT_THAT(clientPortUser2.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, + DoDiscoveryWithMultipleClientsNotConnectedAndOfferingServerResultsSomeClientsConnectedWhenSomeClientsCallConnect) +{ + ::testing::Test::RecordProperty("TEST_ID", "7d210259-7c50-479e-b108-bf9747ceb0ef"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = false; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser1 = createClient(clientOptions); + auto clientPortUser2 = createClient(clientOptions); + + clientPortUser2.connect(); + m_portManager->doDiscovery(); + + EXPECT_THAT(clientPortUser1.getConnectionState(), Eq(ConnectionState::NOT_CONNECTED)); + EXPECT_THAT(clientPortUser2.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +// END discovery tests + +// BEGIN forwarding to InterfacePort tests + +TEST_F(PortManager_test, ServerStateIsForwardedToInterfacePortWhenOffer) +{ + ::testing::Test::RecordProperty("TEST_ID", "e51d6f8b-55dd-43b6-977a-da08cfed7be1"); + auto interfacePort = m_portManager->acquireInterfacePortData(iox::capro::Interfaces::DDS, "penguin"); + auto serverOptions = createTestServerOptions(); + m_portManager->doDiscovery(); + + serverOptions.offerOnCreate = true; + auto serverPortUser = createServer(serverOptions); + + m_portManager->doDiscovery(); + + interfacePort->m_caproMessageFiFo.pop() + .and_then([&](const auto& caproMessage) { + EXPECT_THAT(caproMessage.m_type, Eq(CaproMessageType::OFFER)); + EXPECT_THAT(caproMessage.m_serviceType, Eq(CaproServiceType::SERVER)); + }) + .or_else([&]() { GTEST_FAIL() << "Expected OFFER message but got none"; }); + EXPECT_FALSE(interfacePort->m_caproMessageFiFo.pop().has_value()); +} + +TEST_F(PortManager_test, ServerStateIsForwardedToInterfacePortWhenStopOffer) +{ + ::testing::Test::RecordProperty("TEST_ID", "70692935-82da-4694-a2b0-8307ab2c167c"); + auto interfacePort = m_portManager->acquireInterfacePortData(iox::capro::Interfaces::DDS, "penguin"); + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + auto serverPortUser = createServer(serverOptions); + m_portManager->doDiscovery(); + + // empty fifo + while (interfacePort->m_caproMessageFiFo.pop().has_value()) + { + } + + serverPortUser.stopOffer(); + m_portManager->doDiscovery(); + + interfacePort->m_caproMessageFiFo.pop() + .and_then([&](const auto& caproMessage) { + EXPECT_THAT(caproMessage.m_type, Eq(CaproMessageType::STOP_OFFER)); + EXPECT_THAT(caproMessage.m_serviceType, Eq(CaproServiceType::SERVER)); + }) + .or_else([&]() { GTEST_FAIL() << "Expected STOP_OFFER message but got none"; }); + EXPECT_FALSE(interfacePort->m_caproMessageFiFo.pop().has_value()); +} + +TEST_F(PortManager_test, ServerStateIsForwardedToInterfacePortWhenDestroyed) +{ + ::testing::Test::RecordProperty("TEST_ID", "3e9660f8-046c-4e3a-acfd-bad33a6f999c"); + auto interfacePort = m_portManager->acquireInterfacePortData(iox::capro::Interfaces::DDS, "penguin"); + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + auto serverPortUser = createServer(serverOptions); + m_portManager->doDiscovery(); + + // empty fifo + while (interfacePort->m_caproMessageFiFo.pop().has_value()) + { + } + + serverPortUser.destroy(); + m_portManager->doDiscovery(); + + interfacePort->m_caproMessageFiFo.pop() + .and_then([&](const auto& caproMessage) { + EXPECT_THAT(caproMessage.m_type, Eq(CaproMessageType::STOP_OFFER)); + EXPECT_THAT(caproMessage.m_serviceType, Eq(CaproServiceType::SERVER)); + }) + .or_else([&]() { GTEST_FAIL() << "Expected STOP_OFFER message but got none"; }); + EXPECT_FALSE(interfacePort->m_caproMessageFiFo.pop().has_value()); +} + +TEST_F(PortManager_test, ServerStateIsForwardedToInterfacePortWhenAlreadyOfferAndInterfacePortIsNewlyCreated) +{ + ::testing::Test::RecordProperty("TEST_ID", "31563bb9-561c-43ee-8e3e-b6676cfc9547"); + auto serverOptions = createTestServerOptions(); + + serverOptions.offerOnCreate = true; + auto serverPortUser = createServer(serverOptions); + + m_portManager->doDiscovery(); + + auto interfacePort = m_portManager->acquireInterfacePortData(iox::capro::Interfaces::DDS, "penguin"); + m_portManager->doDiscovery(); + + interfacePort->m_caproMessageFiFo.pop() + .and_then([&](const auto& caproMessage) { + EXPECT_THAT(caproMessage.m_type, Eq(CaproMessageType::OFFER)); + EXPECT_THAT(caproMessage.m_serviceType, Eq(CaproServiceType::SERVER)); + }) + .or_else([&]() { GTEST_FAIL() << "Expected OFFER message but got none"; }); + EXPECT_FALSE(interfacePort->m_caproMessageFiFo.pop().has_value()); +} + +// END forwarding to InterfacePort tests + +// BEGIN policy based connection tests + +// NOTE: there is a client/server sandwich to test both code paths where the client and +// the server initiates the state machine ping pong + +TEST_F(PortManager_test, ClientWithDiscardOldestDataAndServerWithDiscardOldestDataAreConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "56871f9d-d7c1-4c3c-b86c-9a1e1dc9fd74"); + auto clientOptions = createTestClientOptions(); + clientOptions.responseQueueFullPolicy = QueueFullPolicy::DISCARD_OLDEST_DATA; + auto serverOptions = createTestServerOptions(); + serverOptions.clientTooSlowPolicy = ConsumerTooSlowPolicy::DISCARD_OLDEST_DATA; + + auto clientBeforeServerOffer = createClient(clientOptions); + auto serverPortUser = createServer(serverOptions); + auto clientAfterServerOffer = createClient(clientOptions); + + EXPECT_THAT(clientBeforeServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); + EXPECT_THAT(clientAfterServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, ClientWithDiscardOldestDataAndServerWithWaitForConsumerAreConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "4767b263-1ca4-4e54-b489-5e486f40f4db"); + auto clientOptions = createTestClientOptions(); + clientOptions.responseQueueFullPolicy = QueueFullPolicy::DISCARD_OLDEST_DATA; + auto serverOptions = createTestServerOptions(); + serverOptions.clientTooSlowPolicy = ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER; + + auto clientBeforeServerOffer = createClient(clientOptions); + auto serverPortUser = createServer(serverOptions); + auto clientAfterServerOffer = createClient(clientOptions); + + EXPECT_THAT(clientBeforeServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); + EXPECT_THAT(clientAfterServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, ClientWithBlockProducerAndServerWithWaitForConsumerAreConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "c118ce87-25bf-4f53-b157-7414b9f10193"); + auto clientOptions = createTestClientOptions(); + clientOptions.responseQueueFullPolicy = QueueFullPolicy::BLOCK_PRODUCER; + auto serverOptions = createTestServerOptions(); + serverOptions.clientTooSlowPolicy = ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER; + + auto clientBeforeServerOffer = createClient(clientOptions); + auto serverPortUser = createServer(serverOptions); + auto clientAfterServerOffer = createClient(clientOptions); + + EXPECT_THAT(clientBeforeServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); + EXPECT_THAT(clientAfterServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, ClientWithBlockProducerAndServerWithDiscardOldestDataAreNotConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "f5c6213a-b875-42bd-b55b-17bc04179e6d"); + auto clientOptions = createTestClientOptions(); + clientOptions.responseQueueFullPolicy = QueueFullPolicy::BLOCK_PRODUCER; + auto serverOptions = createTestServerOptions(); + serverOptions.clientTooSlowPolicy = ConsumerTooSlowPolicy::DISCARD_OLDEST_DATA; + + auto clientBeforeServerOffer = createClient(clientOptions); + auto serverPortUser = createServer(serverOptions); + auto clientAfterServerOffer = createClient(clientOptions); + + EXPECT_THAT(clientBeforeServerOffer.getConnectionState(), Ne(ConnectionState::CONNECTED)); + EXPECT_THAT(clientAfterServerOffer.getConnectionState(), Ne(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, ServerWithDiscardOldestDataAndClientWithDiscardOldestDataAreConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "53d4ee50-5799-4405-8505-4b7ac3037310"); + auto clientOptions = createTestClientOptions(); + clientOptions.serverTooSlowPolicy = ConsumerTooSlowPolicy::DISCARD_OLDEST_DATA; + auto serverOptions = createTestServerOptions(); + serverOptions.requestQueueFullPolicy = QueueFullPolicy::DISCARD_OLDEST_DATA; + + auto clientBeforeServerOffer = createClient(clientOptions); + auto serverPortUser = createServer(serverOptions); + auto clientAfterServerOffer = createClient(clientOptions); + + EXPECT_THAT(clientBeforeServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); + EXPECT_THAT(clientAfterServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, ServerWithDiscardOldestDataAndClientWithWaitForConsumerAreConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "0d7a8819-3e33-478e-a13b-844b83fe92ae"); + auto clientOptions = createTestClientOptions(); + clientOptions.serverTooSlowPolicy = ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER; + auto serverOptions = createTestServerOptions(); + serverOptions.requestQueueFullPolicy = QueueFullPolicy::DISCARD_OLDEST_DATA; + + auto clientBeforeServerOffer = createClient(clientOptions); + auto serverPortUser = createServer(serverOptions); + auto clientAfterServerOffer = createClient(clientOptions); + + EXPECT_THAT(clientBeforeServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); + EXPECT_THAT(clientAfterServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, ServerWithBlockProducerAndClientWithWaitForConsumerAreConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "8c3b7770-13e6-4003-aa9f-b04a34df67c9"); + auto clientOptions = createTestClientOptions(); + clientOptions.serverTooSlowPolicy = ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER; + auto serverOptions = createTestServerOptions(); + serverOptions.requestQueueFullPolicy = QueueFullPolicy::BLOCK_PRODUCER; + + auto clientBeforeServerOffer = createClient(clientOptions); + auto serverPortUser = createServer(serverOptions); + auto clientAfterServerOffer = createClient(clientOptions); + + EXPECT_THAT(clientBeforeServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); + EXPECT_THAT(clientAfterServerOffer.getConnectionState(), Eq(ConnectionState::CONNECTED)); +} + +TEST_F(PortManager_test, ServerWithBlockProducerAndClientWithDiscardOldestDataAreNotConnected) +{ + ::testing::Test::RecordProperty("TEST_ID", "1d89fa87-3628-4645-9147-82f4223e878a"); + auto clientOptions = createTestClientOptions(); + clientOptions.serverTooSlowPolicy = ConsumerTooSlowPolicy::DISCARD_OLDEST_DATA; + auto serverOptions = createTestServerOptions(); + serverOptions.requestQueueFullPolicy = QueueFullPolicy::BLOCK_PRODUCER; + + auto clientBeforeServerOffer = createClient(clientOptions); + auto serverPortUser = createServer(serverOptions); + auto clientAfterServerOffer = createClient(clientOptions); + + EXPECT_THAT(clientBeforeServerOffer.getConnectionState(), Ne(ConnectionState::CONNECTED)); + EXPECT_THAT(clientAfterServerOffer.getConnectionState(), Ne(ConnectionState::CONNECTED)); +} + +// END policy based connection tests + +// BEGIN communication tests + +TEST_F(PortManager_test, ConnectedClientCanCommunicateWithServer) +{ + ::testing::Test::RecordProperty("TEST_ID", "6376b58d-a796-4cc4-9c40-0c5a117b53f5"); + auto clientOptions = createTestClientOptions(); + clientOptions.connectOnCreate = true; + auto serverOptions = createTestServerOptions(); + serverOptions.offerOnCreate = true; + + auto serverPortUser = createServer(serverOptions); + auto clientPortUser = createClient(clientOptions); + + using DataType = uint64_t; + constexpr int64_t SEQUENCE_ID{42}; + + auto allocateRequestResult = clientPortUser.allocateRequest(sizeof(DataType), alignof(DataType)); + ASSERT_FALSE(allocateRequestResult.has_error()); + auto requestHeader = allocateRequestResult.value(); + requestHeader->setSequenceId(SEQUENCE_ID); + clientPortUser.sendRequest(requestHeader); + + auto getRequestResult = serverPortUser.getRequest(); + ASSERT_FALSE(getRequestResult.has_error()); + auto receivedRequestHeader = getRequestResult.value(); + EXPECT_THAT(receivedRequestHeader->getSequenceId(), Eq(SEQUENCE_ID)); + + auto allocateResponseResult = + serverPortUser.allocateResponse(receivedRequestHeader, sizeof(DataType), alignof(DataType)); + ASSERT_FALSE(allocateResponseResult.has_error()); + auto responseHeader = allocateResponseResult.value(); + serverPortUser.sendResponse(responseHeader); + + auto getResponseResult = clientPortUser.getResponse(); + ASSERT_FALSE(getResponseResult.has_error()); + auto receivedResponseHeader = getResponseResult.value(); + EXPECT_THAT(receivedResponseHeader->getSequenceId(), Eq(SEQUENCE_ID)); +} + +// END communication tests + +/// @todo iox-#27 add service registry tests once it is possible to query the service registry for server + +} // namespace iox_test_roudi_portmanager diff --git a/iceoryx_posh/test/moduletests/test_roudi_portmanager_fixture.hpp b/iceoryx_posh/test/moduletests/test_roudi_portmanager_fixture.hpp new file mode 100644 index 0000000000..5c76c44397 --- /dev/null +++ b/iceoryx_posh/test/moduletests/test_roudi_portmanager_fixture.hpp @@ -0,0 +1,227 @@ +// Copyright (c) 2019 - 2021 by Robert Bosch GmbH. All rights reserved. +// Copyright (c) 2021 - 2022 by Apex.AI Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +#include "iceoryx_hoofs/cxx/convert.hpp" +#include "iceoryx_hoofs/cxx/generic_raii.hpp" +#include "iceoryx_hoofs/internal/relocatable_pointer/base_relative_pointer.hpp" +#include "iceoryx_hoofs/posix_wrapper/posix_access_rights.hpp" +#include "iceoryx_hoofs/testing/watch_dog.hpp" +#include "iceoryx_posh/iceoryx_posh_types.hpp" +#include "iceoryx_posh/internal/capro/capro_message.hpp" +#include "iceoryx_posh/internal/popo/ports/client_port_user.hpp" +#include "iceoryx_posh/internal/popo/ports/publisher_port_user.hpp" +#include "iceoryx_posh/internal/popo/ports/server_port_user.hpp" +#include "iceoryx_posh/internal/popo/ports/subscriber_port_user.hpp" +#include "iceoryx_posh/internal/roudi/port_manager.hpp" +#include "iceoryx_posh/popo/client_options.hpp" +#include "iceoryx_posh/popo/server_options.hpp" +#include "iceoryx_posh/roudi/memory/iceoryx_roudi_memory_manager.hpp" + +#include "test.hpp" + +#include +#include + +namespace iox_test_roudi_portmanager +{ +using namespace ::testing; +using namespace iox; +using namespace iox::capro; +using namespace iox::cxx; +using namespace iox::popo; +using namespace iox::roudi; + +using iox::runtime::PortConfigInfo; + +class PortManagerTester : public PortManager +{ + public: + PortManagerTester(IceOryxRouDiMemoryManager* roudiMemoryManager) + : PortManager(roudiMemoryManager) + { + } + + private: + FRIEND_TEST(PortManager_test, CheckDeleteOfPortsFromProcess1); + FRIEND_TEST(PortManager_test, CheckDeleteOfPortsFromProcess2); +}; + +class PortManager_test : public Test +{ + public: + iox::mepoo::MemoryManager* m_payloadDataSegmentMemoryManager{nullptr}; + IceOryxRouDiMemoryManager* m_roudiMemoryManager{nullptr}; + PortManagerTester* m_portManager{nullptr}; + + uint16_t m_instIdCounter, m_eventIdCounter, m_sIdCounter; + + iox::RuntimeName_t m_runtimeName{"TestApp"}; + + cxx::GenericRAII suppressLogging = iox::LoggerPosh().SetLogLevelForScope(iox::log::LogLevel::kOff); + + void SetUp() override + { + m_instIdCounter = m_sIdCounter = 1U; + m_eventIdCounter = 0; + // starting at {1,1,1} + + auto config = iox::RouDiConfig_t().setDefaults(); + m_roudiMemoryManager = new IceOryxRouDiMemoryManager(config); + EXPECT_FALSE(m_roudiMemoryManager->createAndAnnounceMemory().has_error()); + m_portManager = new PortManagerTester(m_roudiMemoryManager); + + auto user = iox::posix::PosixUser::getUserOfCurrentProcess(); + auto segmentInfo = + m_roudiMemoryManager->segmentManager().value()->getSegmentInformationWithWriteAccessForUser(user); + ASSERT_TRUE(segmentInfo.m_memoryManager.has_value()); + + m_payloadDataSegmentMemoryManager = &segmentInfo.m_memoryManager.value().get(); + + // clearing the introspection, is not in d'tor -> SEGFAULT in delete sporadically + m_portManager->stopPortIntrospection(); + m_portManager->deletePortsOfProcess(iox::roudi::IPC_CHANNEL_ROUDI_NAME); + } + + void TearDown() override + { + delete m_portManager; + delete m_roudiMemoryManager; + iox::rp::BaseRelativePointer::unregisterAll(); + } + iox::capro::ServiceDescription getUniqueSD() + { + m_eventIdCounter++; + if (m_eventIdCounter == std::numeric_limits::max()) + { + m_eventIdCounter = 1U; + m_instIdCounter++; // not using max (wildcard) + if (m_instIdCounter == std::numeric_limits::max()) + { + m_instIdCounter = 1U; + m_sIdCounter++; + if (m_sIdCounter == std::numeric_limits::max()) + { + // ASSERT_TRUE(false); // limits of test reached no more unique ids possible + } + } + } + return {iox::capro::IdString_t(TruncateToCapacity, convert::toString(m_sIdCounter)), + iox::capro::IdString_t(TruncateToCapacity, convert::toString(m_eventIdCounter)), + iox::capro::IdString_t(TruncateToCapacity, convert::toString(m_instIdCounter))}; + } + + void acquireMaxNumberOfInterfaces( + std::string runtimeName, + std::function f = std::function()) + { + for (unsigned int i = 0; i < iox::MAX_INTERFACE_NUMBER; i++) + { + auto newProcessName = runtimeName + iox::cxx::convert::toString(i); + auto interfacePort = m_portManager->acquireInterfacePortData( + iox::capro::Interfaces::INTERNAL, iox::RuntimeName_t(iox::cxx::TruncateToCapacity, newProcessName)); + ASSERT_NE(interfacePort, nullptr); + if (f) + { + f(interfacePort); + } + } + } + + void acquireMaxNumberOfConditionVariables(std::string runtimeName, + std::function f = + std::function()) + { + for (unsigned int i = 0; i < iox::MAX_NUMBER_OF_CONDITION_VARIABLES; i++) + { + auto newProcessName = runtimeName + iox::cxx::convert::toString(i); + auto condVar = m_portManager->acquireConditionVariableData( + iox::RuntimeName_t(iox::cxx::TruncateToCapacity, newProcessName)); + ASSERT_FALSE(condVar.has_error()); + if (f) + { + f(condVar.value()); + } + } + } + + void + acquireMaxNumberOfNodes(std::string nodeName, + std::string runtimeName, + std::function f = + std::function()) + { + for (unsigned int i = 0U; i < iox::MAX_NODE_NUMBER; i++) + { + iox::RuntimeName_t newProcessName(iox::cxx::TruncateToCapacity, + runtimeName + iox::cxx::convert::toString(i)); + iox::NodeName_t newNodeName(iox::cxx::TruncateToCapacity, nodeName + iox::cxx::convert::toString(i)); + auto node = m_portManager->acquireNodeData(newProcessName, newNodeName); + ASSERT_FALSE(node.has_error()); + if (f) + { + f(node.value(), newNodeName, newProcessName); + } + } + } + + void setupAndTestBlockingPublisher(const iox::RuntimeName_t& publisherRuntimeName, + std::function testHook) noexcept; + + PublisherPortUser createPublisher(const PublisherOptions& options) + { + return PublisherPortUser( + m_portManager + ->acquirePublisherPortData( + {"1", "1", "1"}, options, "guiseppe", m_payloadDataSegmentMemoryManager, PortConfigInfo()) + .value()); + } + + SubscriberPortUser createSubscriber(const SubscriberOptions& options) + { + return SubscriberPortUser( + m_portManager->acquireSubscriberPortData({"1", "1", "1"}, options, "schlomo", PortConfigInfo()).value()); + } + + ClientPortUser createClient(const ClientOptions& options) + { + const ServiceDescription sd{"1", "1", "1"}; + const RuntimeName_t runtimeName{"guiseppe"}; + return ClientPortUser( + *m_portManager->acquireClientPortData(sd, options, runtimeName, m_payloadDataSegmentMemoryManager, {}) + .value()); + } + + ServerPortUser createServer(const ServerOptions& options) + { + const ServiceDescription sd{"1", "1", "1"}; + const RuntimeName_t runtimeName{"schlomo"}; + return ServerPortUser( + *m_portManager->acquireServerPortData(sd, options, runtimeName, m_payloadDataSegmentMemoryManager, {}) + .value()); + } +}; + +template +void setDestroyFlagAndClearContainer(vector& container) +{ + for (auto& item : container) + { + item->m_toBeDestroyed.store(true, std::memory_order_relaxed); + } + container.clear(); +} +} // namespace iox_test_roudi_portmanager diff --git a/iceoryx_posh/test/moduletests/test_roudi_portpool.cpp b/iceoryx_posh/test/moduletests/test_roudi_portpool.cpp index 542eda28ed..fec8ff327c 100644 --- a/iceoryx_posh/test/moduletests/test_roudi_portpool.cpp +++ b/iceoryx_posh/test/moduletests/test_roudi_portpool.cpp @@ -18,6 +18,7 @@ #include "iceoryx_hoofs/cxx/convert.hpp" #include "iceoryx_posh/internal/roudi/port_pool_data.hpp" #include "iceoryx_posh/internal/runtime/node_data.hpp" +#include "iceoryx_posh/popo/client_options.hpp" #include "iceoryx_posh/popo/subscriber_options.hpp" #include "iceoryx_posh/roudi/port_pool.hpp" #include "test.hpp" @@ -30,8 +31,56 @@ using namespace iox; static constexpr uint32_t DEFAULT_DEVICE_ID{20U}; static constexpr uint32_t DEFAULT_MEMORY_TYPE{100U}; +static constexpr uint32_t QUEUE_CAPACITY{10U}; class PortPool_test : public Test { + public: + bool addClientPorts(uint32_t numberOfClientPortsToAdd, + std::function onAdd) + { + for (uint32_t i = 0; i < numberOfClientPortsToAdd; ++i) + { + std::string service = "service" + cxx::convert::toString(i); + IdString_t serviceId{cxx::TruncateToCapacity, service}; + ServiceDescription sd{serviceId, "instance", "event"}; + RuntimeName_t runtimeName{cxx::TruncateToCapacity, "AppName" + cxx::convert::toString(i)}; + + auto clientPortResult = sut.addClientPort(sd, &m_memoryManager, runtimeName, m_clientOptions, m_memoryInfo); + if (clientPortResult.has_error()) + { + return false; + } + onAdd(sd, runtimeName, *clientPortResult.value()); + } + + return true; + } + + bool addServerPorts(uint32_t numberOfServerPortsToAdd, + std::function onAdd) + { + for (uint32_t i = 0; i < numberOfServerPortsToAdd; ++i) + { + std::string service = "service" + cxx::convert::toString(i); + IdString_t serviceId{cxx::TruncateToCapacity, service}; + ServiceDescription sd{serviceId, "instance", "event"}; + RuntimeName_t runtimeName{cxx::TruncateToCapacity, "AppName" + cxx::convert::toString(i)}; + + auto serverPortResult = sut.addServerPort(sd, &m_memoryManager, runtimeName, m_serverOptions, m_memoryInfo); + if (serverPortResult.has_error()) + { + return false; + } + onAdd(sd, runtimeName, *serverPortResult.value()); + } + + return true; + } + public: roudi::PortPoolData m_portPoolData; roudi::PortPool sut{m_portPoolData}; @@ -45,9 +94,13 @@ class PortPool_test : public Test popo::PublisherOptions m_publisherOptions{10U, m_nodeName}; popo::SubscriberOptions m_subscriberOptions{ iox::popo::SubscriberPortData::ChunkQueueData_t::MAX_CAPACITY, 10U, m_nodeName}; + popo::ClientOptions m_clientOptions{QUEUE_CAPACITY, m_nodeName}; + popo::ServerOptions m_serverOptions{QUEUE_CAPACITY, m_nodeName}; iox::mepoo::MemoryInfo m_memoryInfo{DEFAULT_DEVICE_ID, DEFAULT_MEMORY_TYPE}; }; +// BEGIN Node tests + TEST_F(PortPool_test, AddNodeDataIsSuccessful) { ::testing::Test::RecordProperty("TEST_ID", "a917fe3d-08a4-4c8f-83a5-4b99b915c0dd"); @@ -142,6 +195,10 @@ TEST_F(PortPool_test, RemoveNodeDataIsSuccessful) EXPECT_EQ(nodeDataList.size(), 0U); } +// END Node tests + +// BEGIN PublisherPort tests + TEST_F(PortPool_test, AddPublisherPortIsSuccessful) { ::testing::Test::RecordProperty("TEST_ID", "8e2271f5-65a9-41ea-bffa-7f1a55321cc0"); @@ -269,6 +326,10 @@ TEST_F(PortPool_test, RemovePublisherPortIsSuccessful) EXPECT_EQ(publisherPortDataList.size(), 0U); } +// END PublisherPort tests + +// BEGIN SubscriberPort tests + TEST_F(PortPool_test, AddSubscriberPortIsSuccessful) { ::testing::Test::RecordProperty("TEST_ID", "b4703d69-bec1-49cf-8f7b-00e805577d8f"); @@ -391,6 +452,226 @@ TEST_F(PortPool_test, RemoveSubscriberPortIsSuccessful) EXPECT_EQ(subscriberPortDataList.size(), 0U); } +// END SubscriberPort tests + +// BEGIN ClientPort tests + +TEST_F(PortPool_test, AddClientPortIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "47d9cd34-22a6-480a-8595-d4abf46df428"); + constexpr uint32_t NUMBER_OF_CLIENTS_TO_ADD{1U}; + auto addSuccessful = + addClientPorts(NUMBER_OF_CLIENTS_TO_ADD, [&](const auto& sd, const auto& runtimeName, const auto& clientPort) { + EXPECT_EQ(clientPort.m_serviceDescription, sd); + EXPECT_EQ(clientPort.m_runtimeName, runtimeName); + EXPECT_EQ(clientPort.m_nodeName, m_clientOptions.nodeName); + EXPECT_EQ(clientPort.m_connectRequested, m_clientOptions.connectOnCreate); + EXPECT_EQ(clientPort.m_connectionState, ConnectionState::NOT_CONNECTED); + EXPECT_EQ(clientPort.m_chunkReceiverData.m_queue.capacity(), QUEUE_CAPACITY); + EXPECT_EQ(clientPort.m_chunkReceiverData.m_memoryInfo.deviceId, DEFAULT_DEVICE_ID); + EXPECT_EQ(clientPort.m_chunkReceiverData.m_memoryInfo.memoryType, DEFAULT_MEMORY_TYPE); + EXPECT_EQ(clientPort.m_chunkSenderData.m_historyCapacity, popo::ClientPortData::HISTORY_CAPACITY_ZERO); + EXPECT_EQ(clientPort.m_chunkSenderData.m_memoryInfo.deviceId, DEFAULT_DEVICE_ID); + EXPECT_EQ(clientPort.m_chunkSenderData.m_memoryInfo.memoryType, DEFAULT_MEMORY_TYPE); + }); + + EXPECT_TRUE(addSuccessful); +} + +TEST_F(PortPool_test, AddClientPortToMaxCapacityIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "f8ee6f26-fdac-4bfd-9e28-46362e4359e9"); + constexpr uint32_t NUMBER_OF_CLIENTS_TO_ADD{MAX_CLIENTS}; + auto addSuccessful = + addClientPorts(NUMBER_OF_CLIENTS_TO_ADD, [&](const auto& sd, const auto&, const auto& clientPort) { + EXPECT_EQ(clientPort.m_serviceDescription, sd); + }); + + EXPECT_TRUE(addSuccessful); +} + + +TEST_F(PortPool_test, AddClientPortWhenClientListOverflowsReturnsError) +{ + ::testing::Test::RecordProperty("TEST_ID", "98c47d42-5f75-42a3-84b5-b97e72a17992"); + constexpr uint32_t NUMBER_OF_CLIENTS_TO_ADD{MAX_CLIENTS}; + auto addSuccessful = addClientPorts(NUMBER_OF_CLIENTS_TO_ADD, [&](const auto&, const auto&, const auto&) {}); + + EXPECT_TRUE(addSuccessful); + + auto errorHandlerCalled{false}; + auto errorHandlerGuard = ErrorHandler::setTemporaryErrorHandler( + [&](const Error error, const std::function, const ErrorLevel level) { + errorHandlerCalled = true; + EXPECT_THAT(error, Eq(Error::kPORT_POOL__CLIENTLIST_OVERFLOW)); + EXPECT_THAT(level, Eq(ErrorLevel::MODERATE)); + }); + + constexpr uint32_t ONE_MORE_CLIENT{1U}; + auto additionalAddSuccessful = addClientPorts(ONE_MORE_CLIENT, [&](const auto&, const auto&, const auto&) {}); + + EXPECT_FALSE(additionalAddSuccessful); + EXPECT_TRUE(errorHandlerCalled); +} + +TEST_F(PortPool_test, GetClientPortDataListIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "39119f21-ca97-4320-a805-029927a79372"); + constexpr uint32_t NUMBER_OF_CLIENTS_TO_ADD{1U}; + auto addSuccessful = addClientPorts(NUMBER_OF_CLIENTS_TO_ADD, [&](const auto&, const auto&, const auto&) {}); + EXPECT_TRUE(addSuccessful); + + auto clientPortDataList = sut.getClientPortDataList(); + + ASSERT_EQ(clientPortDataList.size(), NUMBER_OF_CLIENTS_TO_ADD); +} + +TEST_F(PortPool_test, GetClientPortDataListWhenEmptyIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "6c08ae7d-1eed-46d6-b363-b2dc294d0e0e"); + auto clientPortDataList = sut.getClientPortDataList(); + + ASSERT_EQ(clientPortDataList.size(), 0U); +} + +TEST_F(PortPool_test, GetClientPortDataListCompletelyFilledIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "cdac1dca-f438-4816-90f9-ca976b6ccd88"); + constexpr uint32_t NUMBER_OF_CLIENTS_TO_ADD{MAX_CLIENTS}; + auto addSuccessful = addClientPorts(NUMBER_OF_CLIENTS_TO_ADD, [&](const auto&, const auto&, const auto&) {}); + EXPECT_TRUE(addSuccessful); + + auto clientPortDataList = sut.getClientPortDataList(); + + ASSERT_EQ(clientPortDataList.size(), MAX_CLIENTS); +} + +TEST_F(PortPool_test, RemoveClientPortIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "d93ecaef-555a-4db4-a49d-390366457f97"); + constexpr uint32_t NUMBER_OF_CLIENTS_TO_ADD{1U}; + auto addSuccessful = + addClientPorts(NUMBER_OF_CLIENTS_TO_ADD, + [&](const auto&, const auto&, const auto& clientPort) { sut.removeClientPort(&clientPort); }); + EXPECT_TRUE(addSuccessful); + + auto clientPortDataList = sut.getClientPortDataList(); + + EXPECT_EQ(clientPortDataList.size(), 0U); +} + +// END ClientPort tests + +// BEGIN ServerPort tests + +TEST_F(PortPool_test, AddServerPortIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "ff0a77a0-5a60-460e-ba3c-f9c5669b7086"); + constexpr uint32_t NUMBER_OF_SERVERS_TO_ADD{1U}; + auto addSuccessful = + addServerPorts(NUMBER_OF_SERVERS_TO_ADD, [&](const auto& sd, const auto& runtimeName, const auto& serverPort) { + EXPECT_EQ(serverPort.m_serviceDescription, sd); + EXPECT_EQ(serverPort.m_runtimeName, runtimeName); + EXPECT_EQ(serverPort.m_nodeName, m_serverOptions.nodeName); + EXPECT_EQ(serverPort.m_offeringRequested, m_serverOptions.offerOnCreate); + EXPECT_EQ(serverPort.m_offered, false); + EXPECT_EQ(serverPort.m_chunkReceiverData.m_queue.capacity(), QUEUE_CAPACITY); + EXPECT_EQ(serverPort.m_chunkReceiverData.m_memoryInfo.deviceId, DEFAULT_DEVICE_ID); + EXPECT_EQ(serverPort.m_chunkReceiverData.m_memoryInfo.memoryType, DEFAULT_MEMORY_TYPE); + EXPECT_EQ(serverPort.m_chunkSenderData.m_historyCapacity, popo::ServerPortData::HISTORY_REQUEST_OF_ZERO); + EXPECT_EQ(serverPort.m_chunkSenderData.m_memoryInfo.deviceId, DEFAULT_DEVICE_ID); + EXPECT_EQ(serverPort.m_chunkSenderData.m_memoryInfo.memoryType, DEFAULT_MEMORY_TYPE); + }); + + EXPECT_TRUE(addSuccessful); +} + +TEST_F(PortPool_test, AddServerPortToMaxCapacityIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "496021f9-5ec3-4b1c-a551-8a0d50d0ac8f"); + constexpr uint32_t NUMBER_OF_SERVERS_TO_ADD{MAX_SERVERS}; + auto addSuccessful = + addServerPorts(NUMBER_OF_SERVERS_TO_ADD, [&](const auto& sd, const auto&, const auto& serverPort) { + EXPECT_EQ(serverPort.m_serviceDescription, sd); + }); + + EXPECT_TRUE(addSuccessful); +} + + +TEST_F(PortPool_test, AddServerPortWhenServerListOverflowsReturnsError) +{ + ::testing::Test::RecordProperty("TEST_ID", "744b3d73-b2d2-49cf-a748-e13dc6f3b06c"); + constexpr uint32_t NUMBER_OF_SERVERS_TO_ADD{MAX_SERVERS}; + auto addSuccessful = addServerPorts(NUMBER_OF_SERVERS_TO_ADD, [&](const auto&, const auto&, const auto&) {}); + + EXPECT_TRUE(addSuccessful); + + auto errorHandlerCalled{false}; + auto errorHandlerGuard = ErrorHandler::setTemporaryErrorHandler( + [&](const Error error, const std::function, const ErrorLevel level) { + errorHandlerCalled = true; + EXPECT_THAT(error, Eq(Error::kPORT_POOL__SERVERLIST_OVERFLOW)); + EXPECT_THAT(level, Eq(ErrorLevel::MODERATE)); + }); + + constexpr uint32_t ONE_MORE_SERVER{1U}; + auto additionalAddSuccessful = addServerPorts(ONE_MORE_SERVER, [&](const auto&, const auto&, const auto&) {}); + + EXPECT_FALSE(additionalAddSuccessful); + EXPECT_TRUE(errorHandlerCalled); +} + +TEST_F(PortPool_test, GetServerPortDataListIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "d30fa67c-7f7d-43f1-a7bc-599e5668ab65"); + constexpr uint32_t NUMBER_OF_SERVERS_TO_ADD{1U}; + auto addSuccessful = addServerPorts(NUMBER_OF_SERVERS_TO_ADD, [&](const auto&, const auto&, const auto&) {}); + EXPECT_TRUE(addSuccessful); + + auto serverPortDataList = sut.getServerPortDataList(); + + ASSERT_EQ(serverPortDataList.size(), NUMBER_OF_SERVERS_TO_ADD); +} + +TEST_F(PortPool_test, GetServerPortDataListWhenEmptyIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "d1b32417-caeb-4a5c-ae40-49d651b418cd"); + auto serverPortDataList = sut.getServerPortDataList(); + + ASSERT_EQ(serverPortDataList.size(), 0U); +} + +TEST_F(PortPool_test, GetServerPortDataListCompletelyFilledIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "2968e43d-6972-4667-82f6-7762d479a729"); + constexpr uint32_t NUMBER_OF_SERVERS_TO_ADD{MAX_SERVERS}; + auto addSuccessful = addServerPorts(NUMBER_OF_SERVERS_TO_ADD, [&](const auto&, const auto&, const auto&) {}); + EXPECT_TRUE(addSuccessful); + + auto serverPortDataList = sut.getServerPortDataList(); + + ASSERT_EQ(serverPortDataList.size(), MAX_SERVERS); +} + +TEST_F(PortPool_test, RemoveServerPortIsSuccessful) +{ + ::testing::Test::RecordProperty("TEST_ID", "b140e3bf-0ddf-4e1a-824b-a4935596f371"); + constexpr uint32_t NUMBER_OF_SERVERS_TO_ADD{1U}; + auto addSuccessful = + addServerPorts(NUMBER_OF_SERVERS_TO_ADD, + [&](const auto&, const auto&, const auto& serverPort) { sut.removeServerPort(&serverPort); }); + EXPECT_TRUE(addSuccessful); + + auto serverPortDataList = sut.getServerPortDataList(); + + EXPECT_EQ(serverPortDataList.size(), 0U); +} + +// END ServerPort tests + +// BEGIN InterfacePort tests + TEST_F(PortPool_test, AddInterfacePortIsSuccessful) { ::testing::Test::RecordProperty("TEST_ID", "28116302-dc19-4927-aab4-6d03c9befd88"); @@ -476,6 +757,10 @@ TEST_F(PortPool_test, RemoveInterfacePortIsSuccessful) ASSERT_EQ(interfacePortDataList.size(), 0U); } +// END InterfacePort tests + +// BEGIN ConditionVariable tests + TEST_F(PortPool_test, AddConditionVariableDataIsSuccessful) { ::testing::Test::RecordProperty("TEST_ID", "08021def-be31-42f2-855f-38cac6120c3f"); @@ -559,4 +844,6 @@ TEST_F(PortPool_test, RemoveConditionVariableDataIsSuccessful) ASSERT_EQ(condtionalVariableData.size(), 0U); } +// END ConditionVariable tests + } // namespace