Skip to content

Commit

Permalink
Merge pull request #360 from marthtz/iox-#252-replacement-of-old-port…
Browse files Browse the repository at this point in the history
…s-with-building-blocks

Iox #252 replacement of old ports with building blocks
  • Loading branch information
marthtz authored Dec 11, 2020
2 parents dd8a786 + 267b7c8 commit 3c43d88
Show file tree
Hide file tree
Showing 101 changed files with 1,976 additions and 6,771 deletions.
101 changes: 51 additions & 50 deletions doc/conceptual-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,34 @@ An iceoryx system consists of:
## The RouDi Daemon
RouDi (''Rou''ting and ''Di''scovery) is the core of the system and is responsible for:

* **Service discovery:**
* **Service discovery:**
RouDi is the central resolution point for publishers and subscribers
* **Shared memory management:**
* **Shared memory management:**
RouDi initializes shared memory segments used by the system and arbitrates memory allocation
* **System introspection:**
RouDi has full knowledge of the existing ports in the system, their connections and their memory usage.
* **System introspection:**
RouDi has full knowledge of the existing ports in the system, their connections and their memory usage.
It provides facilities for applications to query this information.

It can be thought of as the "management server" of the iceoryx system. An instance of RouDi must be running in any
It can be thought of as the "management server" of the iceoryx system. An instance of RouDi must be running in any
iceoryx system.

RouDi uses the modules of the Posh library to fulfill its function.

## The Posh Runtime
A Posh runtime is a running entity with its own isolated memory space that participates in the iceoryx system.
A Posh runtime is a running entity with its own isolated memory space that participates in the iceoryx system.
In a POSIX system, a Posh runtime and a POSIX process have a one-to-one mapping.

A Posh runtime may offer services to the iceoryx system or discover services offered by other runtimes to interface with.

The services offered by Posh runtimes communicate via events and the event flow is reasoned about using
publish-subscribe semantics.
The services offered by Posh runtimes communicate via events and the event flow is reasoned about using
publish-subscribe semantics.
A service must be explicitly registered with RouDi to participate in communication.

# Shared Memory Management
## The Basics
When a process in a POSIX system starts it is given its own virtual address space.

The range that a virtual address space spans may be the same for different processes, however the data that is
The range that a virtual address space spans may be the same for different processes, however the data that is
accessible at a particular address may be different for each process.

A pointer in an application uses the virtual address space of the process it is running in.
Expand All @@ -57,107 +57,108 @@ Some examples of what may be in a memory area are:
* The process's heap
* **Shared memory segments**

A shared memory segment is physical memory that lies somewhere foreign to a process (i.e. in some section of RAM or on
the file system) that is made accessible via a mapping to a memory area in their virtual address space.
A shared memory segment is physical memory that lies somewhere foreign to a process (i.e. in some section of RAM or on
the file system) that is made accessible via a mapping to a memory area in their virtual address space.

A single segment may be mapped to multiple processes, however the addresses to which it is mapped to may be
A single segment may be mapped to multiple processes, however the addresses to which it is mapped to may be
(and probably will be) different between processes.

![](fig/shared-memory-mapping.svg)

The POSIX API provides the [utilities](http://man7.org/linux/man-pages/man7/shm_overview.7.html) for working with
shared memory segments.
The POSIX API provides the [utilities](http://man7.org/linux/man-pages/man7/shm_overview.7.html) for working with
shared memory segments.

## Organization
An iceoryx system utilizes one "management" segment for administration purposes and any number of "user" segments for
An iceoryx system utilizes one "management" segment for administration purposes and any number of "user" segments for
event communication between services.

These segments are logically partitioned into "mempools". Mempools contain a number of equally sized "memory chunks".
These segments are logically partitioned into "mempools". Mempools contain a number of equally sized "memory chunks".

Memory chunks are the basic unit used for shared memory access in an iceoryx system.

![](fig/memory-segment-visualization.svg)

The number of segments used by an iceoryx system, along with the configuration of the mempools they contain, are
The number of segments used by an iceoryx system, along with the configuration of the mempools they contain, are
provided to the system via configuration.

The configuration can be provided at compile time (as a header) or at runtime (as a toml-formatted text file).
The configuration can be provided at compile time (as a header) or at runtime (as a toml-formatted text file).
See the [usage guide](usage-guide.md) for more details.

# Communication Mechanisms
In this section we will have a look at the concepts employed to structure the communication between
In this section we will have a look at the concepts employed to structure the communication between
services in an iceoryx system.

## Ports
A port is an entity that represents data flow. There are different types implemented in iceoryx which differ based on
A port is an entity that represents data flow. There are different types implemented in iceoryx which differ based on
the information that they carry and how they are used by iceoryx.

Existing ports include:
* `SenderPort` - used by services to output arbitrary data required for their function
* `ReceiverPort` - used by services to receive arbitrary data from other services
Existing ports include:
* `PublisherPort` - used by services to output arbitrary data required for their function
* `SubscriberPort` - used by services to receive arbitrary data from other services
* `InterfacePort` - used by gateways to receive information about a local iceoryx system that is required to interface
with remote iceoryx systems (see below for more on gateways)

Data flow between services in a local iceoryx system is described using connections between sender and receiver ports.
Data flow between services in a local iceoryx system is described using connections between publisher and subscriber
ports.

A `Publisher` in an iceoryx system publishes data via a `SenderPort`, and likewise, a `Subscriber` receives data
via a `ReceiverPort`.
A `Publisher` in an iceoryx system publishes data via a `PublisherPort`, and likewise, a `Subscriber` receives data
via a `SubscriberPort`.

## Service Discovery / Port Wiring
Matching `Publisher`s with `Subscriber`s in iceoryx is achieved by connecting their underlying `SenderPort`s and
`ReceiverPort`s.
Matching `Publisher`s with `Subscriber`s in iceoryx is achieved by connecting their underlying `PublisherPort`s and
`SubscriberPort`s.

Connections between `SenderPort`s and `ReceiverPort`s are established using service descriptions which are composed of:
Connections between `PublisherPort`s and `SubscriberPort`s are established using service descriptions which are composed of:
* A service id - identifies the type of service
* A service instance id - identifies an instance of a service
* An event id - identifies an output from a service

All `SenderPort`s and `ReceiverPort`s are created with a service description.
All `PublisherPort`s and `SubscriberPort`s are created with a service description.
The system will automatically connect ports with matching service descriptions.

The order that ports appear in is not a factor.
Existing `ReceiverPort`s will automatically connect to `SenderPort`s that appear at a later time if their service
The order that ports appear in is not a factor.
Existing `SubscriberPort`s will automatically connect to `PublisherPort`s that appear at a later time if their service
descriptions match (and vice versa).

Additionally, information about the existing `SenderPort`s in the system are relayed on `InterfacePort`s. This allows
for the entities using these ports (i.e. Gateways) to hook into the data streams of a local iceoryx system and create a
Additionally, information about the existing `PublisherPort`s in the system are relayed on `InterfacePort`s. This allows
for the entities using these ports (i.e. Gateways) to hook into the data streams of a local iceoryx system and create a
bridge to foreign iceoryx systems.

## Zero-copy Interservice Communication
`SenderPort`s and `ReceiverPort`s which are wired together can communicate via shared memory resulting in zero-copy
`PublisherPort`s and `SubscriberPort`s which are wired together can communicate via shared memory resulting in zero-copy
communication.

A `SenderPort` has an assigned shared memory segment to which it may write its data to. In a POSIX system,
A `PublisherPort` has an assigned shared memory segment to which it may write its data to. In a POSIX system,
this is decided purely based on file access permissions as memory segments are represented as virtual files.

To output data, a `SenderPort` reserves a memory chunk in its assigned memory segment.
The iceoryx system will intelligently choose the smallest chunk size that can fit the output data structure.
To output data, a `PublisherPort` reserves a memory chunk in its assigned memory segment.
The iceoryx system will intelligently choose the smallest chunk size that can fit the output data structure.
Note that an entire chunk is reserved even if the data type it contains is smaller than its size.

A `SenderPort` chooses explicitly when to deliver data written in a memory chunk to all of its attached `ReceiverPort`s
(established via discovery). When this occurs, a pointer to the memory chunk is placed on a receive queue at the
`ReceiverPort`.
The `ReceiverPort` can then access the data at its own convenience by following the pointer.
A `PublisherPort` chooses explicitly when to deliver data written in a memory chunk to all of its attached `SubscriberPort`s
(established via discovery). When this occurs, a pointer to the memory chunk is placed on a receive queue at the
`SubscriberPort`.
The `SubscriberPort` can then access the data at its own convenience by following the pointer.

A `ReceiverPort` must explicitly indicate when it has finished processing a particular memory chunk it has received.
Memory chunks are returned to the pool once all attached `ReceiverPort`s indicate they have finished.
A `SubscriberPort` must explicitly indicate when it has finished processing a particular memory chunk it has received.
Memory chunks are returned to the pool once all attached `SubscriberPort`s indicate they have finished.

### A Note on Pointers
As already discussed, shared memory segments may be mapped to different memory areas in the virtual address space of a
process.
To deal with this, iceoryx utilizes specialized pointer types: the `iox::RelativePointer` and
As already discussed, shared memory segments may be mapped to different memory areas in the virtual address space of a
process.
To deal with this, iceoryx utilizes specialized pointer types: the `iox::RelativePointer` and
the `iox::RelocatablePointer`.

Using these types, the difference in memory mapping is not a factor when it comes to locating a memory chunk.

A more detailed discussion about how these types work can be found
A more detailed discussion about how these types work can be found
[here](../iceoryx_utils/doc/relocatable_pointer/relocatable_pointer.md).

## Internode Communication
Separate iceoryx systems residing on different hosts can be networked together via "Gateways". Gateways are responsible
for synchronizing data published on `SenderPort`s between iceoryx systems residing on different hosts that are networked
for synchronizing data published on `PublisherPort`s between iceoryx systems residing on different hosts that are networked
together.

## Logging and Error Handling
Iceoryx uses its own logger which is based on the Autosar **ara::log** API. For safety reasons it defines its own error handler to deal with errors (instead of using e.g. exceptions).
Details of the error handling concept can be found in [error-handling.md](./error-handling.md).
Details of the error handling concept can be found in [error-handling.md](./error-handling.md).
2 changes: 1 addition & 1 deletion doc/installation-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ The `CMakeLists.txt` from `iceoryx_meta` can be used to easily develop iceoryx w
| `IOX_MAX_PUBLISHERS` | the maximum number of publishers one `RouDi` instance can manage |
| `IOX_MAX_SUBSCRIBERS_PER_PUBLISHER` | the maximum number of subscriber a publisher can deliver chunks to|
| `IOX_MAX_PUBLISHER_HISTORY` | the maximum number chunks available for the publisher history |
| `IOX_MAX_CHUNKS_ALLOCATED_PER_PUBLISHER_SIMULTANEOUSLY` | the maximum number of chunks a sender can allocate at a given time |
| `IOX_MAX_CHUNKS_ALLOCATED_PER_PUBLISHER_SIMULTANEOUSLY` | the maximum number of chunks a publisher can allocate at a given time |
| `IOX_MAX_SUBSCRIBERS` | the maximum number of subscribers one `RouDi` instance can manage |
| `IOX_MAX_CHUNKS_HELD_PER_SUBSCRIBER_SIMULTANEOUSLY` | the maximum number of chunks a subscriber can hold at a given time |
| `IOX_MAX_INTERFACE_NUMBER` | the maximum number for interface ports, which are used for e.g. gateways |
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_dds/include/iceoryx_dds/dds/cyclone_data_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class CycloneDataReader : public DataReader

void connect() noexcept override;

iox::cxx::optional<uint64_t> peekNextSize() override;

iox::cxx::optional<uint32_t> peekNextSize() override;
bool hasNewSamples() override;
iox::cxx::expected<DataReaderError> takeNext(uint8_t* const buffer, const uint64_t& bufferSize) override;

iox::cxx::expected<uint64_t, DataReaderError>
Expand Down
8 changes: 7 additions & 1 deletion iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ class DataReader
/// @brief peekNextSize Get the size of the next sample if one is available.
/// @return The size of the next sample if one is available.
///
virtual iox::cxx::optional<uint64_t> peekNextSize() = 0;
virtual iox::cxx::optional<uint32_t> peekNextSize() = 0;

///
/// @brief hasNewSamples Checks if new samples ready to take.
/// @return True if new samples available.
///
virtual bool hasNewSamples() = 0;

///
/// @brief take Take the next available sample from the DDS data space.
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_dds/include/iceoryx_dds/gateway/dds_to_iox.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "iceoryx_posh/gateway/channel.hpp"
#include "iceoryx_posh/gateway/gateway_config.hpp"
#include "iceoryx_posh/gateway/gateway_generic.hpp"
#include "iceoryx_posh/popo/publisher.hpp"
#include "iceoryx_posh/popo/modern_api/untyped_publisher.hpp"

namespace iox
{
Expand All @@ -28,7 +28,7 @@ namespace dds
///
/// @brief DDS Gateway implementation for the DDS to iceoryx direction.
///
template <typename channel_t = gw::Channel<popo::Publisher, dds::data_reader_t>,
template <typename channel_t = gw::Channel<popo::UntypedPublisher, dds::data_reader_t>,
typename gateway_t = gw::GatewayGeneric<channel_t>>
class DDS2IceoryxGateway : public gateway_t
{
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_dds/include/iceoryx_dds/gateway/iox_to_dds.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "iceoryx_dds/dds/dds_types.hpp"
#include "iceoryx_posh/gateway/channel.hpp"
#include "iceoryx_posh/gateway/gateway_generic.hpp"
#include "iceoryx_posh/popo/subscriber.hpp"
#include "iceoryx_posh/popo/modern_api/untyped_subscriber.hpp"

namespace iox
{
Expand All @@ -27,7 +27,7 @@ namespace dds
///
/// @brief DDS Gateway implementation for the iceoryx to DDS direction.
///
template <typename channel_t = gw::Channel<popo::Subscriber, dds::data_writer_t>,
template <typename channel_t = gw::Channel<popo::UntypedSubscriber, dds::data_writer_t>,
typename gateway_t = gw::GatewayGeneric<channel_t>>
class Iceoryx2DDSGateway : public gateway_t
{
Expand Down
28 changes: 13 additions & 15 deletions iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl
Original file line number Diff line number Diff line change
Expand Up @@ -59,29 +59,27 @@ inline void DDS2IceoryxGateway<channel_t, gateway_t>::forward(const channel_t& c
auto publisher = channel.getIceoryxTerminal();
auto reader = channel.getExternalTerminal();

reader->peekNextSize().and_then([&](uint64_t size) {
// reserve a chunk for the sample
m_reservedChunk = publisher->allocateChunk(static_cast<uint32_t>(size));
// read sample into reserved chunk
auto buffer = static_cast<uint8_t*>(m_reservedChunk);
reader->takeNext(buffer, size)
.and_then([&]() {
// publish chunk
publisher->sendChunk(buffer);
})
.or_else([&](DataReaderError err) {
LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: "
<< dds::DataReaderErrorString[static_cast<uint8_t>(err)];
while (reader->hasNewSamples())
{
reader->peekNextSize().and_then([&](auto size) {
publisher->loan(size).and_then([&](auto& sample) {
reader->takeNext(static_cast<uint8_t*>(sample.get()), size)
.and_then([&]() { sample.publish(); })
.or_else([&](DataReaderError err) {
LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: "
<< dds::DataReaderErrorString[static_cast<uint8_t>(err)];
});
});
});
});
}
}

// ======================================== Private ======================================== //
template <typename channel_t, typename gateway_t>
cxx::expected<channel_t, gw::GatewayError>
DDS2IceoryxGateway<channel_t, gateway_t>::setupChannel(const capro::ServiceDescription& service) noexcept
{
return this->addChannel(service).and_then([&service](channel_t channel) {
return this->addChannel(service).and_then([&service](auto channel) {
auto publisher = channel.getIceoryxTerminal();
auto reader = channel.getExternalTerminal();
publisher->offer();
Expand Down
14 changes: 5 additions & 9 deletions iceoryx_dds/include/iceoryx_dds/internal/gateway/iox_to_dds.inl
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,12 @@ template <typename channel_t, typename gateway_t>
inline void Iceoryx2DDSGateway<channel_t, gateway_t>::forward(const channel_t& channel) noexcept
{
auto subscriber = channel.getIceoryxTerminal();
while (subscriber->hasNewChunks())
while (subscriber->hasNewSamples())
{
const mepoo::ChunkHeader* header;
subscriber->getChunk(&header);
if (header->m_info.m_payloadSize > 0)
{
subscriber->take().and_then([&channel](popo::Sample<const void>& sample) {
auto dataWriter = channel.getExternalTerminal();
dataWriter->write(static_cast<uint8_t*>(header->payload()), header->m_info.m_payloadSize);
}
subscriber->releaseChunk(header);
dataWriter->write(static_cast<const uint8_t*>(sample.get()), sample.getHeader()->m_info.m_payloadSize);
});
}
}

Expand All @@ -118,7 +114,7 @@ template <typename channel_t, typename gateway_t>
cxx::expected<channel_t, gw::GatewayError>
Iceoryx2DDSGateway<channel_t, gateway_t>::setupChannel(const capro::ServiceDescription& service) noexcept
{
return this->addChannel(service).and_then([](channel_t channel) {
return this->addChannel(service).and_then([](auto channel) {
auto subscriber = channel.getIceoryxTerminal();
auto dataWriter = channel.getExternalTerminal();
subscriber->subscribe(SUBSCRIBER_CACHE_SIZE);
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_dds/source/dds2iceoryx_app/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ int main()
iox::dds::DDS2IceoryxGateway<> gw;

iox::config::TomlGatewayConfigParser::parse()
.and_then([&](iox::config::GatewayConfig config) { gw.loadConfiguration(config); })
.or_else([&](iox::config::TomlGatewayConfigParseError err) {
.and_then([&](auto config) { gw.loadConfiguration(config); })
.or_else([&](auto err) {
iox::dds::LogWarn() << "[Main] Failed to parse gateway config with error: "
<< iox::config::TomlGatewayConfigParseErrorString[err];
iox::dds::LogWarn() << "[Main] Using default configuration.";
Expand Down
Loading

0 comments on commit 3c43d88

Please sign in to comment.