Skip to content

Commit

Permalink
Make sure the subscription exists before publishing messages (ros2#804)
Browse files Browse the repository at this point in the history
* Make sure the subscription exists before publishing messages

Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 authored and YXL76 committed Nov 18, 2022
1 parent f302329 commit 3d94338
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ class PublicationManager
/// \param n_subscribers_to_match Number of subscribers publisher should have for match.
/// \return true if find publisher by specified topic_name and publisher has specified number of
/// subscribers, otherwise false.
template<typename Timeout>
template<typename DurationRepT = int64_t, typename DurationT = std::milli>
bool wait_for_matched(
const char * topic_name,
Timeout timeout, size_t n_subscribers_to_match = 1)
std::chrono::duration<DurationRepT, DurationT> timeout = std::chrono::seconds(10),
size_t n_subscribers_to_match = 1)
{
rclcpp::PublisherBase::SharedPtr publisher = nullptr;
for (const auto pub : publishers_) {
Expand Down
24 changes: 12 additions & 12 deletions rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ TEST_F(RecordFixture, record_end_to_end_test_with_zstd_file_compression) {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched(topic_name, 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched(topic_name)) <<
"Expected find rosbag subscription";
wait_for_db();

Expand Down Expand Up @@ -138,7 +138,7 @@ TEST_F(RecordFixture, record_end_to_end_test) {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched("/test_topic", 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched("/test_topic")) <<
"Expected find rosbag subscription";

wait_for_db();
Expand Down Expand Up @@ -224,9 +224,9 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_metadata_contains_all_top
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched("/test_topic0", 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched("/test_topic0")) <<
"Expected find rosbag subscription";
ASSERT_TRUE(pub_manager.wait_for_matched("/test_topic1", 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched("/test_topic1")) <<
"Expected find rosbag subscription";

wait_for_db();
Expand Down Expand Up @@ -278,7 +278,7 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched(topic_name, 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched(topic_name)) <<
"Expected find rosbag subscription";

wait_for_db();
Expand Down Expand Up @@ -352,7 +352,7 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_max_size_not_reached) {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched(topic_name, 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched(topic_name)) <<
"Expected find rosbag subscription";

wait_for_db();
Expand Down Expand Up @@ -416,7 +416,7 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_splits_bagfile) {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched(topic_name, 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched(topic_name)) <<
"Expected find rosbag subscription";

wait_for_db();
Expand Down Expand Up @@ -487,7 +487,7 @@ TEST_F(RecordFixture, record_end_to_end_with_duration_splitting_splits_bagfile)
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched(topic_name, 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched(topic_name)) <<
"Expected find rosbag subscription";

wait_for_db();
Expand Down Expand Up @@ -553,7 +553,7 @@ TEST_F(RecordFixture, record_end_to_end_test_with_zstd_file_compression_compress
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched(topic_name, 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched(topic_name)) <<
"Expected find rosbag subscription";

wait_for_db();
Expand Down Expand Up @@ -667,7 +667,7 @@ TEST_F(RecordFixture, record_end_to_end_test_with_cache) {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched(topic_name, 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched(topic_name)) <<
"Expected find rosbag subscription";

wait_for_db();
Expand Down Expand Up @@ -723,9 +723,9 @@ TEST_F(RecordFixture, rosbag2_record_and_play_multiple_topics_with_filter) {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.wait_for_matched(first_topic_name, 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched(first_topic_name)) <<
"Expected find rosbag subscription";
ASSERT_TRUE(pub_manager.wait_for_matched(second_topic_name, 30s, 1)) <<
ASSERT_TRUE(pub_manager.wait_for_matched(second_topic_name)) <<
"Expected find rosbag subscription";
wait_for_db();

Expand Down
9 changes: 9 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are

start_async_spin(recorder);

ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str()));
ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str()));

pub_manager.run_publishers();

auto & writer = recorder->get_writer_handle();
Expand Down Expand Up @@ -101,6 +104,8 @@ TEST_F(RecordIntegrationTestFixture, qos_is_stored_in_metadata)

start_async_spin(recorder);

ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str()));

pub_manager.run_publishers();

auto & writer = recorder->get_writer_handle();
Expand Down Expand Up @@ -159,6 +164,8 @@ TEST_F(RecordIntegrationTestFixture, records_sensor_data)

start_async_spin(recorder);

ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str()));

pub_manager.run_publishers();

auto & writer = recorder->get_writer_handle();
Expand Down Expand Up @@ -200,6 +207,8 @@ TEST_F(RecordIntegrationTestFixture, receives_latched_messages)

start_async_spin(recorder);

ASSERT_TRUE(pub_manager.wait_for_matched(topic.c_str()));

auto & writer = recorder->get_writer_handle();
MockSequentialWriter & mock_writer =
static_cast<MockSequentialWriter &>(writer.get_implementation_handle());
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_record_all.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <gmock/gmock.h>

#include <chrono>
#include <memory>
#include <string>
#include <utility>
Expand All @@ -29,6 +30,8 @@

#include "record_integration_fixture.hpp"

using namespace std::chrono_literals; // NOLINT

TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are_recorded)
{
auto array_message = get_messages_arrays()[0];
Expand All @@ -51,6 +54,9 @@ TEST_F(RecordIntegrationTestFixture, published_messages_from_multiple_topics_are

start_async_spin(recorder);

ASSERT_TRUE(pub_manager.wait_for_matched(array_topic.c_str()));
ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str()));

pub_manager.run_publishers();

auto & writer = recorder->get_writer_handle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <gmock/gmock.h>

#include <chrono>
#include <memory>
#include <regex>
#include <string>
Expand All @@ -32,6 +33,8 @@

#include "record_integration_fixture.hpp"

using namespace std::chrono_literals; // NOLINT

TEST_F(RecordIntegrationTestFixture, regex_topics_recording)
{
auto test_string_messages = get_messages_strings();
Expand Down Expand Up @@ -72,6 +75,8 @@ TEST_F(RecordIntegrationTestFixture, regex_topics_recording)

start_async_spin(recorder);

ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str()));

pub_manager.run_publishers();

auto & writer = recorder->get_writer_handle();
Expand Down Expand Up @@ -143,6 +148,9 @@ TEST_F(RecordIntegrationTestFixture, regex_and_exclude_recording)

start_async_spin(recorder);

ASSERT_TRUE(pub_manager.wait_for_matched(v1.c_str()));
ASSERT_TRUE(pub_manager.wait_for_matched(v2.c_str()));

pub_manager.run_publishers();

auto & writer = recorder->get_writer_handle();
Expand Down

0 comments on commit 3d94338

Please sign in to comment.