Skip to content

Commit

Permalink
Merge pull request #1084 from eclipse-iceoryx/iox-#27-add-client-and-…
Browse files Browse the repository at this point in the history
…server-port-to-port-pool-and-port-manager

iox-#27 Add client and server port to PortPool and PortManager  [stacked PR #2]
  • Loading branch information
elBoberido authored Feb 20, 2022
2 parents d53aba1 + 4a550b8 commit ca6fa0f
Show file tree
Hide file tree
Showing 23 changed files with 1,902 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ namespace iox
error(POSH__RUNTIME_LEADING_SLASH_PROVIDED) \
error(POSH__SERVICE_DISCOVERY_UNKNOWN_EVENT_PROVIDED) \
error(POSH__PORT_MANAGER_PUBLISHERPORT_NOT_UNIQUE) \
error(POSH__PORT_MANAGER_SERVERPORT_NOT_UNIQUE) \
error(POSH__PORT_MANAGER_COULD_NOT_ADD_SERVICE_TO_REGISTRY) \
error(POSH__MEMPOOL_POSSIBLE_DOUBLE_FREE) \
error(POSH__RECEIVERPORT_DELIVERYFIFO_OVERFLOW) \
Expand Down Expand Up @@ -122,6 +123,8 @@ namespace iox
error(MEPOO__MAXIMUM_NUMBER_OF_MEMPOOLS_REACHED) \
error(PORT_POOL__PUBLISHERLIST_OVERFLOW) \
error(PORT_POOL__SUBSCRIBERLIST_OVERFLOW) \
error(PORT_POOL__CLIENTLIST_OVERFLOW) \
error(PORT_POOL__SERVERLIST_OVERFLOW) \
error(PORT_POOL__INTERFACELIST_OVERFLOW) \
error(PORT_POOL__APPLICATIONLIST_OVERFLOW) \
error(PORT_POOL__NODELIST_OVERFLOW) \
Expand All @@ -131,6 +134,8 @@ namespace iox
error(PORT_MANAGER__INTROSPECTION_MEMORY_MANAGER_UNAVAILABLE) \
error(PORT_MANAGER__HANDLE_PUBLISHER_PORTS_INVALID_CAPRO_MESSAGE) \
error(PORT_MANAGER__HANDLE_SUBSCRIBER_PORTS_INVALID_CAPRO_MESSAGE) \
error(PORT_MANAGER__HANDLE_CLIENT_PORTS_INVALID_CAPRO_MESSAGE) \
error(PORT_MANAGER__HANDLE_SERVER_PORTS_INVALID_CAPRO_MESSAGE) \
error(PORT_MANAGER__NO_PUBLISHER_PORT_FOR_INTROSPECTIONPORTSERVICE) \
error(PORT_MANAGER__NO_PUBLISHER_PORT_FOR_INTROSPECTIONPORTTHROUGHPUTSERVICE) \
error(PORT_MANAGER__NO_PUBLISHER_PORT_FOR_INTROSPECTIONCHANGINGDATASERVICE) \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "iceoryx_hoofs/cxx/serialization.hpp"
#include "iceoryx_hoofs/cxx/string.hpp"
#include "iceoryx_hoofs/cxx/vector.hpp"
#include "iceoryx_hoofs/log/logstream.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"

#include <cstdint>
Expand Down Expand Up @@ -182,6 +183,12 @@ bool serviceMatch(const ServiceDescription& first, const ServiceDescription& sec
/// @return the reference to `stream` which was provided as input parameter
std::ostream& operator<<(std::ostream& stream, const ServiceDescription& service) noexcept;

/// @brief Convenience stream operator to easily use the `ServiceDescription` with log::LogStream
/// @param[in] stream output LogStream to write the message to
/// @param[in] service ServiceDescription that shall be converted
/// @return the reference to `stream` which was provided as input parameter
log::LogStream& operator<<(log::LogStream& stream, const ServiceDescription& service) noexcept;

} // namespace capro
} // namespace iox

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class ClientPortRouDi : public BasePort
/// @return the configured responseQueueFullPolicy
QueueFullPolicy getResponseQueueFullPolicy() const noexcept;

/// @brief Access to the configured serverTooSlowPolicy
/// @return the configured serverTooSlowPolicy
ConsumerTooSlowPolicy getServerTooSlowPolicy() const noexcept;

/// @brief get an optional CaPro message that requests changes to the desired connection state of the client
/// @return CaPro message with desired connection state, empty optional if no state change
cxx::optional<capro::CaproMessage> tryGetCaProMessage() noexcept;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ class ServerPortRouDi : public BasePort
ServerPortRouDi& operator=(ServerPortRouDi&& rhs) noexcept = default;
~ServerPortRouDi() = default;

/// @brief Access to the configured requestQueueFullPolicy
/// @return the configured requestQueueFullPolicy
QueueFullPolicy getRequestQueueFullPolicy() const noexcept;

/// @brief Access to the configured clientTooSlowPolicy
/// @return the configured clientTooSlowPolicy
ConsumerTooSlowPolicy getClientTooSlowPolicy() const noexcept;

/// @brief get an optional CaPro message that changes the offer state of the server
Expand Down
63 changes: 61 additions & 2 deletions iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@
#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/internal/capro/capro_message.hpp"
#include "iceoryx_posh/internal/mepoo/memory_manager.hpp"
#include "iceoryx_posh/internal/popo/ports/client_port_roudi.hpp"
#include "iceoryx_posh/internal/popo/ports/client_port_user.hpp"
#include "iceoryx_posh/internal/popo/ports/interface_port.hpp"
#include "iceoryx_posh/internal/popo/ports/publisher_port_roudi.hpp"
#include "iceoryx_posh/internal/popo/ports/publisher_port_user.hpp"
#include "iceoryx_posh/internal/popo/ports/server_port_roudi.hpp"
#include "iceoryx_posh/internal/popo/ports/server_port_user.hpp"
#include "iceoryx_posh/internal/popo/ports/subscriber_port_multi_producer.hpp"
#include "iceoryx_posh/internal/popo/ports/subscriber_port_single_producer.hpp"
#include "iceoryx_posh/internal/popo/ports/subscriber_port_user.hpp"
Expand Down Expand Up @@ -76,6 +80,34 @@ class PortManager
const RuntimeName_t& runtimeName,
const PortConfigInfo& portConfigInfo) noexcept;

/// @brief Acquires a ClientPortData for further usage
/// @param[in] service is the ServiceDescription for the new client port
/// @param[in] clientOptions for the new client port
/// @param[in] runtimeName of the runtime the new client port belongs to
/// @param[in] payloadDataSegmentMemoryManager to acquire chunks for the requests
/// @param[in] portConfigInfo for the new client port
/// @return on success a pointer to a ClientPortData; on error a PortPoolError
cxx::expected<popo::ClientPortData*, PortPoolError>
acquireClientPortData(const capro::ServiceDescription& service,
const popo::ClientOptions& clientOptions,
const RuntimeName_t& runtimeName,
mepoo::MemoryManager* const payloadDataSegmentMemoryManager,
const PortConfigInfo& portConfigInfo) noexcept;

/// @brief Acquires a ServerPortData for further usage
/// @param[in] service is the ServiceDescription for the new server port
/// @param[in] serverOptions for the new server port
/// @param[in] runtimeName of the runtime the new server port belongs to
/// @param[in] payloadDataSegmentMemoryManager to acquire chunks for the requests
/// @param[in] portConfigInfo for the new server port
/// @return on success a pointer to a ServerPortData; on error a PortPoolError
cxx::expected<popo::ServerPortData*, PortPoolError>
acquireServerPortData(const capro::ServiceDescription& service,
const popo::ServerOptions& serverOptions,
const RuntimeName_t& runtimeName,
mepoo::MemoryManager* const payloadDataSegmentMemoryManager,
const PortConfigInfo& portConfigInfo) noexcept;

popo::InterfacePortData* acquireInterfacePortData(capro::Interfaces interface,
const RuntimeName_t& runtimeName,
const NodeName_t& nodeName = {""}) noexcept;
Expand Down Expand Up @@ -110,22 +142,49 @@ class PortManager

void doDiscoveryForSubscriberPort(SubscriberPortType& subscriberPort) noexcept;

void destroyClientPort(popo::ClientPortData* const clientPortData) noexcept;

void handleClientPorts() noexcept;

void doDiscoveryForClientPort(popo::ClientPortRouDi& clientPort) noexcept;

void makeAllServerPortsToStopOffer() noexcept;

void destroyServerPort(popo::ServerPortData* const clientPortData) noexcept;

void handleServerPorts() noexcept;

void doDiscoveryForServerPort(popo::ServerPortRouDi& serverPort) noexcept;

void handleInterfaces() noexcept;

void handleNodes() noexcept;

void handleConditionVariables() noexcept;

bool isCompatiblePubSub(const PublisherPortRouDiType& publisher,
const SubscriberPortType& subscriber) const noexcept;

bool sendToAllMatchingPublisherPorts(const capro::CaproMessage& message,
SubscriberPortType& subscriberSource) noexcept;

void sendToAllMatchingSubscriberPorts(const capro::CaproMessage& message,
PublisherPortRouDiType& publisherSource) noexcept;

bool isCompatibleClientServer(const popo::ServerPortRouDi& server,
const popo::ClientPortRouDi& client) const noexcept;

void sendToAllMatchingClientPorts(const capro::CaproMessage& message, popo::ServerPortRouDi& serverSource) noexcept;

bool sendToAllMatchingServerPorts(const capro::CaproMessage& message, popo::ClientPortRouDi& clientSource) noexcept;

void sendToAllMatchingInterfacePorts(const capro::CaproMessage& message) noexcept;

void addEntryToServiceRegistry(const capro::ServiceDescription& service) noexcept;
void removeEntryFromServiceRegistry(const capro::ServiceDescription& service) noexcept;
void addPublisherToServiceRegistry(const capro::ServiceDescription& service) noexcept;
void removePublisherFromServiceRegistry(const capro::ServiceDescription& service) noexcept;

void addServerToServiceRegistry(const capro::ServiceDescription& service) noexcept;
void removeServerFromServiceRegistry(const capro::ServiceDescription& service) noexcept;

template <typename T, std::enable_if_t<std::is_same<T, iox::build::OneToManyPolicy>::value>* = nullptr>
cxx::optional<RuntimeName_t> doesViolateCommunicationPolicy(const capro::ServiceDescription& service) noexcept;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#include "iceoryx_hoofs/cxx/vector.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/condition_variable_data.hpp"
#include "iceoryx_posh/internal/popo/ports/client_port_data.hpp"
#include "iceoryx_posh/internal/popo/ports/interface_port.hpp"
#include "iceoryx_posh/internal/popo/ports/publisher_port_data.hpp"
#include "iceoryx_posh/internal/popo/ports/server_port_data.hpp"
#include "iceoryx_posh/internal/popo/ports/subscriber_port_data.hpp"
#include "iceoryx_posh/internal/runtime/node_data.hpp"

Expand All @@ -42,7 +44,7 @@ class FixedPositionContainer
template <typename... Targs>
T* insert(Targs&&... args) noexcept;

void erase(T* const element) noexcept;
void erase(const T* const element) noexcept;

cxx::vector<T*, Capacity> content() noexcept;

Expand All @@ -58,6 +60,9 @@ struct PortPoolData

FixedPositionContainer<iox::popo::PublisherPortData, MAX_PUBLISHERS> m_publisherPortMembers;
FixedPositionContainer<iox::popo::SubscriberPortData, MAX_SUBSCRIBERS> m_subscriberPortMembers;

FixedPositionContainer<iox::popo::ServerPortData, MAX_SERVERS> m_serverPortMembers;
FixedPositionContainer<iox::popo::ClientPortData, MAX_CLIENTS> m_clientPortMembers;
};

} // namespace roudi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#ifndef IOX_POSH_ROUDI_PORT_POOL_DATA_INL
#define IOX_POSH_ROUDI_PORT_POOL_DATA_INL

#include "iceoryx_posh/internal/roudi/port_pool_data.hpp"

namespace iox
{
namespace roudi
Expand Down Expand Up @@ -58,7 +60,7 @@ T* FixedPositionContainer<T, Capacity>::insert(Targs&&... args) noexcept
}

template <typename T, uint64_t Capacity>
void FixedPositionContainer<T, Capacity>::erase(T* const element) noexcept
void FixedPositionContainer<T, Capacity>::erase(const T* const element) noexcept
{
for (auto& e : m_data)
{
Expand Down
80 changes: 74 additions & 6 deletions iceoryx_posh/include/iceoryx_posh/roudi/port_pool.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2020 - 2021 by Apex.AI Inc. All rights reserved.
// Copyright (c) 2020 - 2022 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -20,15 +20,21 @@
#include "iceoryx_hoofs/cxx/type_traits.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/condition_variable_data.hpp"
#include "iceoryx_posh/internal/popo/ports/client_port_data.hpp"
#include "iceoryx_posh/internal/popo/ports/client_port_roudi.hpp"
#include "iceoryx_posh/internal/popo/ports/interface_port.hpp"
#include "iceoryx_posh/internal/popo/ports/publisher_port_data.hpp"
#include "iceoryx_posh/internal/popo/ports/publisher_port_roudi.hpp"
#include "iceoryx_posh/internal/popo/ports/server_port_data.hpp"
#include "iceoryx_posh/internal/popo/ports/server_port_roudi.hpp"
#include "iceoryx_posh/internal/popo/ports/subscriber_port_data.hpp"
#include "iceoryx_posh/internal/popo/ports/subscriber_port_multi_producer.hpp"
#include "iceoryx_posh/internal/popo/ports/subscriber_port_single_producer.hpp"
#include "iceoryx_posh/internal/roudi/port_pool_data.hpp"
#include "iceoryx_posh/internal/runtime/node_data.hpp"
#include "iceoryx_posh/popo/client_options.hpp"
#include "iceoryx_posh/popo/publisher_options.hpp"
#include "iceoryx_posh/popo/server_options.hpp"
#include "iceoryx_posh/popo/subscriber_options.hpp"

namespace iox
Expand All @@ -43,6 +49,9 @@ enum class PortPoolError : uint8_t
PUBLISHER_PORT_LIST_FULL,
SUBSCRIBER_PORT_LIST_FULL,
INTERFACE_PORT_LIST_FULL,
CLIENT_PORT_LIST_FULL,
UNIQUE_SERVER_PORT_ALREADY_EXISTS,
SERVER_PORT_LIST_FULL,
NODE_DATA_LIST_FULL,
CONDITION_VARIABLE_LIST_FULL,
EVENT_VARIABLE_LIST_FULL,
Expand All @@ -60,6 +69,8 @@ class PortPool
/// update this member if the publisher ports actually changed
cxx::vector<PublisherPortRouDiType::MemberType_t*, MAX_PUBLISHERS> getPublisherPortDataList() noexcept;
cxx::vector<SubscriberPortType::MemberType_t*, MAX_SUBSCRIBERS> getSubscriberPortDataList() noexcept;
cxx::vector<popo::ClientPortData*, MAX_CLIENTS> getClientPortDataList() noexcept;
cxx::vector<popo::ServerPortData*, MAX_SERVERS> getServerPortDataList() noexcept;
cxx::vector<popo::InterfacePortData*, MAX_INTERFACE_NUMBER> getInterfacePortDataList() noexcept;
cxx::vector<runtime::NodeData*, MAX_NODE_NUMBER> getNodeDataList() noexcept;
cxx::vector<popo::ConditionVariableData*, MAX_NUMBER_OF_CONDITION_VARIABLES>
Expand Down Expand Up @@ -90,6 +101,34 @@ class PortPool
const popo::SubscriberOptions& subscriberOptions,
const mepoo::MemoryInfo& memoryInfo) noexcept;

/// @brief Adds a ClientPortData to the internal pool and returns a pointer for further usage
/// @param[in] serviceDescription for the new client port
/// @param[in] memoryManager to acquire chunks for the requests
/// @param[in] runtimeName of the runtime the new client port belongs to
/// @param[in] clientOptions for the new client port
/// @param[in] memoryInfo for the new client port
/// @return on success a pointer to a ClientPortData; on error a PortPoolError
cxx::expected<popo::ClientPortData*, PortPoolError>
addClientPort(const capro::ServiceDescription& serviceDescription,
mepoo::MemoryManager* const memoryManager,
const RuntimeName_t& runtimeName,
const popo::ClientOptions& clientOptions,
const mepoo::MemoryInfo& memoryInfo = mepoo::MemoryInfo()) noexcept;

/// @brief Adds a ServerPortData to the internal pool and returns a pointer for further usage
/// @param[in] serviceDescription for the new server port
/// @param[in] memoryManager to acquire chunks for the responses
/// @param[in] runtimeName of the runtime the new server port belongs to
/// @param[in] serverOptions for the new server port
/// @param[in] memoryInfo for the new server port
/// @return on success a pointer to a ServerPortData; on error a PortPoolError
cxx::expected<popo::ServerPortData*, PortPoolError>
addServerPort(const capro::ServiceDescription& serviceDescription,
mepoo::MemoryManager* const memoryManager,
const RuntimeName_t& runtimeName,
const popo::ServerOptions& serverOptions,
const mepoo::MemoryInfo& memoryInfo = mepoo::MemoryInfo()) noexcept;

cxx::expected<popo::InterfacePortData*, PortPoolError> addInterfacePort(const RuntimeName_t& runtimeName,
const capro::Interfaces interface) noexcept;

Expand All @@ -100,11 +139,40 @@ class PortPool
cxx::expected<popo::ConditionVariableData*, PortPoolError>
addConditionVariableData(const RuntimeName_t& runtimeName) noexcept;

void removePublisherPort(PublisherPortRouDiType::MemberType_t* const portData) noexcept;
void removeSubscriberPort(SubscriberPortType::MemberType_t* const portData) noexcept;
void removeInterfacePort(popo::InterfacePortData* const portData) noexcept;
void removeNodeData(runtime::NodeData* const nodeData) noexcept;
void removeConditionVariableData(popo::ConditionVariableData* const conditionVariableData) noexcept;
/// @brief Removes a PublisherPortData from the internal pool
/// @param[in] portData is a pointer to the PublisherPortData to be removed
/// @note after this call the provided PublisherPortData is no longer available for usage
void removePublisherPort(const PublisherPortRouDiType::MemberType_t* const portData) noexcept;

/// @brief Removes a SubscriberPortData from the internal pool
/// @param[in] portData is a pointer to the SubscriberPortData to be removed
/// @note after this call the provided SubscriberPortData is no longer available for usage
void removeSubscriberPort(const SubscriberPortType::MemberType_t* const portData) noexcept;

/// @brief Removes a ClientPortData from the internal pool
/// @param[in] portData is a pointer to the ClientPortData to be removed
/// @note after this call the provided ClientPortData is no longer available for usage
void removeClientPort(const popo::ClientPortData* const portData) noexcept;

/// @brief Removes a ServerPortData from the internal pool
/// @param[in] portData is a pointer to the ServerPortData to be removed
/// @note after this call the provided ServerPortData is no longer available for usage
void removeServerPort(const popo::ServerPortData* const portData) noexcept;

/// @brief Removes a InterfacePortData from the internal pool
/// @param[in] portData is a pointer to the InterfacePortData to be removed
/// @note after this call the provided InterfacePortData is no longer available for usage
void removeInterfacePort(const popo::InterfacePortData* const portData) noexcept;

/// @brief Removes a NodeData from the internal pool
/// @param[in] nodeData is a pointer to the NodeData to be removed
/// @note after this call the provided NodeData is no longer available for usage
void removeNodeData(const runtime::NodeData* const nodeData) noexcept;

/// @brief Removes a ConditionVariableData from the internal pool
/// @param[in] conditionVariableData is a pointer to the ConditionVariableData to be removed
/// @note after this call the provided ConditionVariableData is no longer available for usage
void removeConditionVariableData(const popo::ConditionVariableData* const conditionVariableData) noexcept;

private:
PortPoolData* m_portPoolData;
Expand Down
8 changes: 8 additions & 0 deletions iceoryx_posh/source/capro/service_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,13 @@ std::ostream& operator<<(std::ostream& stream, const ServiceDescription& service
return stream;
}

log::LogStream& operator<<(log::LogStream& stream, const ServiceDescription& service) noexcept
{
/// @todo #415 Add classHash, scope and interface
stream << "Service: " << service.getServiceIDString() << ", Instance: " << service.getInstanceIDString()
<< ", Event: " << service.getEventIDString();
return stream;
}

} // namespace capro
} // namespace iox
Loading

0 comments on commit ca6fa0f

Please sign in to comment.