Skip to content

Commit

Permalink
Update the description and test codes
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 committed Aug 4, 2021
1 parent b713660 commit fbba53c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 87 deletions.
6 changes: 3 additions & 3 deletions rcl/include/rcl/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher);
/**
* This function waits until all published message data were acknowledged by peer node or timeout.
*
* The unit of timeout is nanoseconds.
* The timeout unit is nanoseconds.
* If the timeout is negative then this function will block indefinitely until all published message
* data were acknowledged.
* If the timeout is 0 then this function will be non-blocking; checking all published message data
Expand All @@ -449,7 +449,7 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher);
* elapsed (return RCL_RET_TIMEOUT) or all published message data were acknowledged (return
* RCL_RET_OK).
*
* This function only works effectively while QOS profile of publisher is set to RELIABLE.
* This function only waits for acknowledgments if the publisher's QOS profile is RELIABLE.
* Otherwise this function will immediately return RCL_RET_OK.
*
* <hr>
Expand All @@ -464,7 +464,7 @@ rcl_publisher_assert_liveliness(const rcl_publisher_t * publisher);
* \param[in] timeout the duration to wait for all published message data were acknowledged, in
* nanoseconds.
* \return #RCL_RET_OK if successful, or
* \return #RCL_RET_TIMEOUT if wait timed out, or
* \return #RCL_RET_TIMEOUT if timed out, or
* \return #RCL_RET_PUBLISHER_INVALID if publisher is invalid, or
* \return #RCL_RET_ERROR if an unspecified error occurs, or
* \return #RCL_RET_UNSUPPORTED if the middleware does not support that feature.
Expand Down
5 changes: 3 additions & 2 deletions rcl/src/rcl/publisher.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,11 @@ rcl_publisher_wait_for_all_acked(const rcl_publisher_t * publisher, rcl_duration

rmw_ret_t ret = rmw_publisher_wait_for_all_acked(publisher->impl->rmw_handle, rmw_timeout);
if (ret != RMW_RET_OK) {
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
if (ret == RMW_RET_TIMEOUT) {
return RCL_RET_TIMEOUT;
} else if (ret == RMW_RET_UNSUPPORTED) {
}
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
if (ret == RMW_RET_UNSUPPORTED) {
return RCL_RET_UNSUPPORTED;
} else {
return RCL_RET_ERROR;
Expand Down
100 changes: 18 additions & 82 deletions rcl/test/rcl/test_publisher_wait_all_ack.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ class CLASSNAME (TestPublisherFixtureSpecial, RMW_IMPLEMENTATION) : public ::tes

void SetUp()
{
is_fastdds = (std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") == 0);
is_cyclonedds = (std::string(rmw_get_implementation_identifier()).find("rmw_cyclonedds") == 0);
is_connextdds = (std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0);
bool is_fastdds = (std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") == 0);

if (is_fastdds) {
// By default, fastdds use intraprocess mode in this scenario. But this leads to high-speed
Expand Down Expand Up @@ -100,13 +98,24 @@ class CLASSNAME (TestPublisherFixtureSpecial, RMW_IMPLEMENTATION) : public ::tes
delete this->context_ptr;
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
}

protected:
bool is_fastdds;
bool is_cyclonedds;
bool is_connextdds;
};

#define INIT_SUBSCRIPTION(idx) \
rcl_subscription_t subscription ## idx = rcl_get_zero_initialized_subscription(); \
ret = rcl_subscription_init( \
&subscription ## idx, \
this->node_ptr, \
ts, \
topic_name, \
&subscription_options); \
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( \
{ \
ret = rcl_subscription_fini(&subscription ## idx, this->node_ptr); \
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \
});

#define ONE_MEGABYTE (1024 * 1024)

TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for_all_acked) {
rcl_ret_t ret;
Expand All @@ -129,29 +138,12 @@ TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for
subscription_options.qos.depth = 1;
subscription_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE;

#define INIT_SUBSCRIPTION(idx) \
rcl_subscription_t subscription ## idx = rcl_get_zero_initialized_subscription(); \
ret = rcl_subscription_init( \
&subscription ## idx, \
this->node_ptr, \
ts, \
topic_name, \
&subscription_options); \
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( \
{ \
ret = rcl_subscription_fini(&subscription ## idx, this->node_ptr); \
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \
});

INIT_SUBSCRIPTION(1)
INIT_SUBSCRIPTION(2)
INIT_SUBSCRIPTION(3)

ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100));

// Prepare 1MB message
#define ONE_MEGABYTE (1024 * 1024)
char test_string[ONE_MEGABYTE];
memset(test_string, 'a', ONE_MEGABYTE);
test_string[ONE_MEGABYTE - 1] = '\0';
Expand Down Expand Up @@ -179,13 +171,8 @@ TEST_F(CLASSNAME(TestPublisherFixtureSpecial, RMW_IMPLEMENTATION), test_wait_for
ret = rcl_publisher_wait_for_all_acked(
&publisher,
RCL_MS_TO_NS(500));
EXPECT_TRUE(ret == RCL_RET_OK || ret == RCL_RET_TIMEOUT);

if (is_cyclonedds) {
// cyclonedds use sync publish, so above scenario cannot lead to timeout.
EXPECT_EQ(RCL_RET_OK, ret);
} else {
EXPECT_EQ(RCL_RET_TIMEOUT, ret);
}
ret = rcl_publisher_wait_for_all_acked(&publisher, -1);
EXPECT_EQ(RCL_RET_OK, ret);
}
Expand All @@ -210,57 +197,6 @@ TEST_F(
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
});

rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options();
subscription_options.qos.depth = 1;
subscription_options.qos.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT;

#define INIT_SUBSCRIPTION(idx) \
rcl_subscription_t subscription ## idx = rcl_get_zero_initialized_subscription(); \
ret = rcl_subscription_init( \
&subscription ## idx, \
this->node_ptr, \
ts, \
topic_name, \
&subscription_options); \
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( \
{ \
ret = rcl_subscription_fini(&subscription ## idx, this->node_ptr); \
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; \
});

INIT_SUBSCRIPTION(1)
INIT_SUBSCRIPTION(2)
INIT_SUBSCRIPTION(3)

ASSERT_TRUE(wait_for_established_subscription(&publisher, 10, 100));

// Prepare 1MB message
#define ONE_MEGABYTE (1024 * 1024)
char test_string[ONE_MEGABYTE];
memset(test_string, 'a', ONE_MEGABYTE);
test_string[ONE_MEGABYTE - 1] = '\0';
test_msgs__msg__Strings msg;
test_msgs__msg__Strings__init(&msg);
ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
test_msgs__msg__Strings__fini(&msg);
});

ret = rcl_publish(&publisher, &msg, nullptr);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;

ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription1, context_ptr, 10, 100));
ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription2, context_ptr, 10, 100));
ASSERT_TRUE(wait_for_subscription_to_be_ready(&subscription3, context_ptr, 10, 100));

int i = 0;
for (; i < 500; i++) {
ret = rcl_publish(&publisher, &msg, nullptr);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
}

ret = rcl_publisher_wait_for_all_acked(
&publisher,
RCL_MS_TO_NS(500));
Expand Down

0 comments on commit fbba53c

Please sign in to comment.