From c1dbf60c1fbab74f92debc7b681cfabd50a7e6e6 Mon Sep 17 00:00:00 2001 From: Mathias Kraus Date: Thu, 10 Feb 2022 00:21:36 +0100 Subject: [PATCH] iox-#27 Add client and server port to PortManager --- .../error_handling/error_handling.hpp | 2 + .../internal/roudi/port_manager.hpp | 37 +++ iceoryx_posh/source/roudi/port_manager.cpp | 293 ++++++++++++++++++ 3 files changed, 332 insertions(+) diff --git a/iceoryx_hoofs/include/iceoryx_hoofs/error_handling/error_handling.hpp b/iceoryx_hoofs/include/iceoryx_hoofs/error_handling/error_handling.hpp index 25032677c56..c82e677399f 100644 --- a/iceoryx_hoofs/include/iceoryx_hoofs/error_handling/error_handling.hpp +++ b/iceoryx_hoofs/include/iceoryx_hoofs/error_handling/error_handling.hpp @@ -133,6 +133,8 @@ namespace iox error(PORT_MANAGER__INTROSPECTION_MEMORY_MANAGER_UNAVAILABLE) \ error(PORT_MANAGER__HANDLE_PUBLISHER_PORTS_INVALID_CAPRO_MESSAGE) \ error(PORT_MANAGER__HANDLE_SUBSCRIBER_PORTS_INVALID_CAPRO_MESSAGE) \ + error(PORT_MANAGER__HANDLE_CLIENT_PORTS_INVALID_CAPRO_MESSAGE) \ + error(PORT_MANAGER__HANDLE_SERVER_PORTS_INVALID_CAPRO_MESSAGE) \ error(PORT_MANAGER__NO_PUBLISHER_PORT_FOR_INTROSPECTIONPORTSERVICE) \ error(PORT_MANAGER__NO_PUBLISHER_PORT_FOR_INTROSPECTIONPORTTHROUGHPUTSERVICE) \ error(PORT_MANAGER__NO_PUBLISHER_PORT_FOR_INTROSPECTIONCHANGINGDATASERVICE) \ diff --git a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp index f6d358b99b8..c6b02e3a18a 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp @@ -25,9 +25,13 @@ #include "iceoryx_posh/iceoryx_posh_types.hpp" #include "iceoryx_posh/internal/capro/capro_message.hpp" #include "iceoryx_posh/internal/mepoo/memory_manager.hpp" +#include "iceoryx_posh/internal/popo/ports/client_port_roudi.hpp" +#include "iceoryx_posh/internal/popo/ports/client_port_user.hpp" #include "iceoryx_posh/internal/popo/ports/interface_port.hpp" #include "iceoryx_posh/internal/popo/ports/publisher_port_roudi.hpp" #include "iceoryx_posh/internal/popo/ports/publisher_port_user.hpp" +#include "iceoryx_posh/internal/popo/ports/server_port_roudi.hpp" +#include "iceoryx_posh/internal/popo/ports/server_port_user.hpp" #include "iceoryx_posh/internal/popo/ports/subscriber_port_multi_producer.hpp" #include "iceoryx_posh/internal/popo/ports/subscriber_port_single_producer.hpp" #include "iceoryx_posh/internal/popo/ports/subscriber_port_user.hpp" @@ -76,6 +80,20 @@ class PortManager const RuntimeName_t& runtimeName, const PortConfigInfo& portConfigInfo) noexcept; + cxx::expected + acquireClientPortData(const capro::ServiceDescription& service, + const popo::ClientOptions& clientOptions, + const RuntimeName_t& runtimeName, + mepoo::MemoryManager* const payloadDataSegmentMemoryManager, + const PortConfigInfo& portConfigInfo) noexcept; + + cxx::expected + acquireServerPortData(const capro::ServiceDescription& service, + const popo::ServerOptions& serverOptions, + const RuntimeName_t& runtimeName, + mepoo::MemoryManager* const payloadDataSegmentMemoryManager, + const PortConfigInfo& portConfigInfo) noexcept; + popo::InterfacePortData* acquireInterfacePortData(capro::Interfaces interface, const RuntimeName_t& runtimeName, const NodeName_t& nodeName = {""}) noexcept; @@ -115,6 +133,20 @@ class PortManager void doDiscoveryForSubscriberPort(SubscriberPortType& subscriberPort) noexcept; + void destroyClientPort(popo::ClientPortRouDi::MemberType_t* const clientPortData) noexcept; + + void handleClientPorts() noexcept; + + void doDiscoveryForClientPort(popo::ClientPortRouDi& clientPort) noexcept; + + void makeAllServerPortsToStopOffer() noexcept; + + void destroyServerPort(popo::ServerPortRouDi::MemberType_t* const clientPortData) noexcept; + + void handleServerPorts() noexcept; + + void doDiscoveryForServerPort(popo::ServerPortRouDi& serverPort) noexcept; + void handleInterfaces() noexcept; void handleNodes() noexcept; @@ -127,6 +159,11 @@ class PortManager void sendToAllMatchingSubscriberPorts(const capro::CaproMessage& message, PublisherPortRouDiType& publisherSource) noexcept; + + void sendToAllMatchingClientPorts(const capro::CaproMessage& message, popo::ServerPortRouDi& serverSource) noexcept; + + bool sendToAllMatchingServerPorts(const capro::CaproMessage& message, popo::ClientPortRouDi& clientSource) noexcept; + void sendToAllMatchingInterfacePorts(const capro::CaproMessage& message) noexcept; void addEntryToServiceRegistry(const capro::ServiceDescription& service) noexcept; diff --git a/iceoryx_posh/source/roudi/port_manager.cpp b/iceoryx_posh/source/roudi/port_manager.cpp index 5af4a2e1a5c..04d21b1ddc6 100644 --- a/iceoryx_posh/source/roudi/port_manager.cpp +++ b/iceoryx_posh/source/roudi/port_manager.cpp @@ -122,6 +122,10 @@ void PortManager::doDiscovery() noexcept handleSubscriberPorts(); + handleServerPorts(); + + handleClientPorts(); + handleInterfaces(); handleNodes(); @@ -215,6 +219,158 @@ void PortManager::doDiscoveryForSubscriberPort(SubscriberPortType& subscriberPor }); } +void PortManager::destroyClientPort(popo::ClientPortRouDi::MemberType_t* const clientPortData) noexcept +{ + cxx::Ensures(clientPortData != nullptr && "clientPortData must not be a nullptr"); + + // create temporary client ports to orderly shut this client down + popo::ClientPortRouDi clientPortRoudi(*clientPortData); + popo::ClientPortUser clientPortUser(*clientPortData); + + clientPortRoudi.releaseAllChunks(); + clientPortUser.disconnect(); + + // process DISCONNECT for this client in RouDi and distribute it + clientPortRoudi.tryGetCaProMessage().and_then([this, &clientPortRoudi](auto caproMessage) { + cxx::Ensures(caproMessage.m_type == capro::CaproMessageType::DISCONNECT); + + m_portIntrospection.reportMessage(caproMessage); + this->sendToAllMatchingServerPorts(caproMessage, clientPortRoudi); + }); + + /// @todo iox-#27 report to port introspection + + // delete client port from list after DISCONNECT was processed + m_portPool->removeClientPort(clientPortData); + + LogDebug() << "Destroyed client port"; +} + +void PortManager::handleClientPorts() noexcept +{ + // get requests for change of connection state of clients + for (auto clientPortData : m_portPool->getClientPortDataList()) + { + popo::ClientPortRouDi clientPort(*clientPortData); + + doDiscoveryForClientPort(clientPort); + + // check if we have to destroy this clinet port + if (clientPort.toBeDestroyed()) + { + destroyClientPort(clientPortData); + } + } +} + +void PortManager::doDiscoveryForClientPort(popo::ClientPortRouDi& clientPort) noexcept +{ + clientPort.tryGetCaProMessage().and_then([this, &clientPort](auto caproMessage) { + if ((capro::CaproMessageType::CONNECT == caproMessage.m_type) + || (capro::CaproMessageType::DISCONNECT == caproMessage.m_type)) + { + /// @todo iox-#27 report to port introspection + if (!this->sendToAllMatchingServerPorts(caproMessage, clientPort)) + { + LogDebug() << "capro::CONNECT/DISCONNECT, no matching server!!"; + capro::CaproMessage nackMessage(capro::CaproMessageType::NACK, clientPort.getCaProServiceDescription()); + auto returnMessage = clientPort.dispatchCaProMessageAndGetPossibleResponse(nackMessage); + // No response on NACK messages + cxx::Ensures(!returnMessage.has_value()); + } + } + else + { + // protocol error + errorHandler( + Error::kPORT_MANAGER__HANDLE_CLIENT_PORTS_INVALID_CAPRO_MESSAGE, nullptr, iox::ErrorLevel::MODERATE); + } + }); +} + +void PortManager::makeAllServerPortsToStopOffer() noexcept +{ + for (auto port : m_portPool->getServerPortDataList()) + { + port->m_offeringRequested.store(false, std::memory_order_relaxed); + + popo::ServerPortRouDi serverPort(*port); + doDiscoveryForServerPort(serverPort); + } +} + +void PortManager::destroyServerPort(popo::ServerPortRouDi::MemberType_t* const serverPortData) noexcept +{ + cxx::Ensures(serverPortData != nullptr && "serverPortData must not be a nullptr"); + + // create temporary server ports to orderly shut this server down + popo::ServerPortRouDi serverPortRoudi{*serverPortData}; + popo::ServerPortUser serverPortUser{*serverPortData}; + + serverPortRoudi.releaseAllChunks(); + serverPortUser.stopOffer(); + + // process STOP_OFFER for this server in RouDi and distribute it + serverPortRoudi.tryGetCaProMessage().and_then([this, &serverPortRoudi](auto caproMessage) { + cxx::Ensures(caproMessage.m_type == capro::CaproMessageType::STOP_OFFER); + + m_portIntrospection.reportMessage(caproMessage); + /// @todo iox-#27 add server to service registry + // this->removeEntryFromServiceRegistry(caproMessage.m_serviceDescription); + this->sendToAllMatchingClientPorts(caproMessage, serverPortRoudi); + /// @todo iox-#27 forward server port to interface ports + // this->sendToAllMatchingInterfacePorts(caproMessage); + }); + + /// @todo iox-#27 report to port introspection + + // delete server port from list after STOP_OFFER was processed + m_portPool->removeServerPort(serverPortData); + + LogDebug() << "Destroyed server port"; +} + +void PortManager::handleServerPorts() noexcept +{ + // get the changes of server port offer state + for (auto serverPortData : m_portPool->getServerPortDataList()) + { + popo::ServerPortRouDi serverPort(*serverPortData); + + doDiscoveryForServerPort(serverPort); + + // check if we have to destroy this server port + if (serverPort.toBeDestroyed()) + { + destroyServerPort(serverPortData); + } + } +} + +void PortManager::doDiscoveryForServerPort(popo::ServerPortRouDi& serverPort) noexcept +{ + serverPort.tryGetCaProMessage().and_then([this, &serverPort](auto caproMessage) { + /// @todo iox-#27 report to port instrospection + + if (capro::CaproMessageType::OFFER == caproMessage.m_type) + { + /// @todo iox-#27 add to service registry? + } + else if (capro::CaproMessageType::STOP_OFFER == caproMessage.m_type) + { + /// @todo iox-#27 remove from service registry + } + else + { + // protocol error + errorHandler( + Error::kPORT_MANAGER__HANDLE_SERVER_PORTS_INVALID_CAPRO_MESSAGE, nullptr, iox::ErrorLevel::MODERATE); + } + + this->sendToAllMatchingClientPorts(caproMessage, serverPort); + /// @todo iox-#27 forward to interfaces? + }); +} void PortManager::handleInterfaces() noexcept { @@ -415,6 +571,75 @@ void PortManager::sendToAllMatchingSubscriberPorts(const capro::CaproMessage& me } } +void PortManager::sendToAllMatchingClientPorts(const capro::CaproMessage& message, + popo::ServerPortRouDi& serverSource) noexcept +{ + for (auto clientPortData : m_portPool->getClientPortDataList()) + { + popo::ClientPortRouDi clientPort(*clientPortData); + if (clientPort.getCaProServiceDescription() == serverSource.getCaProServiceDescription() + && !(serverSource.getClientTooSlowPolicy() == popo::ConsumerTooSlowPolicy::DISCARD_OLDEST_DATA + && clientPort.getResponseQueueFullPolicy() == popo::QueueFullPolicy::BLOCK_PRODUCER)) + { + // send OFFER to client + auto clientResponse = clientPort.dispatchCaProMessageAndGetPossibleResponse(message); + + // if the clients react on the change, process it immediately on server side + if (clientResponse.has_value()) + { + // we only expect reaction on CONNECT + cxx::Expects(capro::CaproMessageType::CONNECT == clientResponse.value().m_type); + + /// @todo inform port introspection about client + + // send CONNECT to server + auto serverResponse = serverSource.dispatchCaProMessageAndGetPossibleResponse(clientResponse.value()); + if (serverResponse.has_value()) + { + // sende responsee to client port + auto returnMessage = clientPort.dispatchCaProMessageAndGetPossibleResponse(serverResponse.value()); + + // ACK or NACK are sent back to the client port, no further response from this one expected + cxx::Ensures(!returnMessage.has_value()); + + /// @todo iox-#27 inform port introspection about server + } + } + } + } +} + +bool PortManager::sendToAllMatchingServerPorts(const capro::CaproMessage& message, + popo::ClientPortRouDi& clientSource) noexcept +{ + bool serverFound = false; + for (auto serverPortData : m_portPool->getServerPortDataList()) + { + popo::ServerPortRouDi serverPort(*serverPortData); + if (clientSource.getCaProServiceDescription() == serverPort.getCaProServiceDescription() + && !(serverPort.getClientTooSlowPolicy() == popo::ConsumerTooSlowPolicy::DISCARD_OLDEST_DATA + && clientSource.getResponseQueueFullPolicy() == popo::QueueFullPolicy::BLOCK_PRODUCER)) + { + // send CONNECT to server + auto serverResponse = serverPort.dispatchCaProMessageAndGetPossibleResponse(message); + + // if the clients react on the change, process it immediately on server side + if (serverResponse.has_value()) + { + // send response to client port + auto returnMessage = clientSource.dispatchCaProMessageAndGetPossibleResponse(serverResponse.value()); + + // ACK or NACK are sent back to the client port, no further response from this one expected + cxx::Ensures(!returnMessage.has_value()); + + /// @todo iox-#27 inform port introspection about server + } + serverFound = true; + } + } + return serverFound; +} + void PortManager::sendToAllMatchingInterfacePorts(const capro::CaproMessage& message) noexcept { for (auto interfacePortData : m_portPool->getInterfacePortDataList()) @@ -440,11 +665,22 @@ void PortManager::unblockProcessShutdown(const RuntimeName_t& runtimeName) noexc doDiscoveryForPublisherPort(publisherPort); } } + + for (auto port : m_portPool->getServerPortDataList()) + { + popo::ServerPortRouDi serverPort(*port); + if (runtimeName == serverPort.getRuntimeName()) + { + port->m_offeringRequested.store(false, std::memory_order_relaxed); + doDiscoveryForServerPort(serverPort); + } + } } void PortManager::unblockRouDiShutdown() noexcept { makeAllPublisherPortsToStopOffer(); + makeAllServerPortsToStopOffer(); } void PortManager::makeAllPublisherPortsToStopOffer() noexcept @@ -478,6 +714,24 @@ void PortManager::deletePortsOfProcess(const RuntimeName_t& runtimeName) noexcep } } + for (auto port : m_portPool->getServerPortDataList()) + { + popo::ServerPortRouDi server(*port); + if (runtimeName == server.getRuntimeName()) + { + destroyServerPort(port); + } + } + + for (auto port : m_portPool->getClientPortDataList()) + { + popo::ClientPortRouDi client(*port); + if (runtimeName == client.getRuntimeName()) + { + destroyClientPort(port); + } + } + for (auto port : m_portPool->getInterfacePortDataList()) { popo::InterfacePort interface(port); @@ -642,6 +896,45 @@ PortManager::acquireSubscriberPortData(const capro::ServiceDescription& service, return maybeSubscriberPortData; } +cxx::expected +PortManager::acquireClientPortData(const capro::ServiceDescription& service, + const popo::ClientOptions& clientOptions, + const RuntimeName_t& runtimeName, + mepoo::MemoryManager* const payloadDataSegmentMemoryManager, + const PortConfigInfo& portConfigInfo) noexcept +{ + // we can create a new port + return m_portPool + ->addClientPort(service, payloadDataSegmentMemoryManager, runtimeName, clientOptions, portConfigInfo.memoryInfo) + .and_then([&](auto clientPortData) { + /// @todo iox-#27 add to port introspection + + // we do discovery here for trying to connect the client if offer on create is desired + popo::ClientPortRouDi clientPort(*clientPortData); + doDiscoveryForClientPort(clientPort); + }); +} + +cxx::expected +PortManager::acquireServerPortData(const capro::ServiceDescription& service, + const popo::ServerOptions& serverOptions, + const RuntimeName_t& runtimeName, + mepoo::MemoryManager* const payloadDataSegmentMemoryManager, + const PortConfigInfo& portConfigInfo) noexcept +{ + /// @todo iox-#27 check for unique server port + + // we can create a new port + return m_portPool + ->addServerPort(service, payloadDataSegmentMemoryManager, runtimeName, serverOptions, portConfigInfo.memoryInfo) + .and_then([&](auto serverPortData) { + /// @todo iox-#27 add to port introspection + + // we do discovery here for trying to connect the waiting client if offer on create is desired + popo::ServerPortRouDi serverPort(*serverPortData); + doDiscoveryForServerPort(serverPort); + }); +} /// @todo return a cxx::expected popo::InterfacePortData* PortManager::acquireInterfacePortData(capro::Interfaces interface,