Skip to content

Commit

Permalink
Add stopRecording into rosbag2_py (#854)
Browse files Browse the repository at this point in the history
* Allow rosbag2_py Recorder to cancel recording - does not block Python GIL while running

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
Signed-off-by: Afonso da Fonseca Braga <afonso.braga@engenharia.ufjf.br>
  • Loading branch information
afonsofonbraga authored Sep 14, 2021
1 parent 622b0ad commit 4befbfa
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 10 deletions.
24 changes: 16 additions & 8 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,14 @@ class Player

class Recorder
{
private:
std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_;

public:
Recorder()
{
rclcpp::init(0, nullptr);
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
std::signal(
SIGTERM, [](int /* signal */) {
rclcpp::shutdown();
Expand Down Expand Up @@ -199,15 +203,18 @@ class Recorder
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer), storage_options, record_options);
recorder->record();
rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(recorder);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});

exec.cancel();
spin_thread.join();
exec_->add_node(recorder);
// Release the GIL for long-running record, so that calling Python code can use other threads
{
py::gil_scoped_release release;
exec_->spin();
}
}

void cancel()
{
exec_->cancel();
}
};

Expand Down Expand Up @@ -270,5 +277,6 @@ PYBIND11_MODULE(_transport, m) {
py::class_<rosbag2_py::Recorder>(m, "Recorder")
.def(py::init())
.def("record", &rosbag2_py::Recorder::record)
.def("cancel", &rosbag2_py::Recorder::cancel)
;
}
19 changes: 19 additions & 0 deletions rosbag2_py/test/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from typing import Callable

from rclpy.clock import Clock, ClockType
from rclpy.duration import Duration
import rosbag2_py


Expand All @@ -23,3 +28,17 @@ def get_rosbag_options(path, serialization_format='cdr'):
output_serialization_format=serialization_format)

return storage_options, converter_options


def wait_for(
condition: Callable[[], bool],
timeout: Duration,
sleep_time: float = 0.1,
):
clock = Clock(clock_type=ClockType.STEADY_TIME)
start = clock.now()
while not condition():
if clock.now() - start > timeout:
return False
time.sleep(sleep_time)
return True
49 changes: 47 additions & 2 deletions rosbag2_py/test/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import os
from pathlib import Path
import sys
import threading


from common import get_rosbag_options, wait_for
import rclpy
from rclpy.qos import QoSProfile
import rosbag2_py
from std_msgs.msg import String

if os.environ.get('ROSBAG2_PY_TEST_WITH_RTLD_GLOBAL', None) is not None:
# This is needed on Linux when compiling with clang/libc++.
Expand All @@ -24,8 +32,6 @@
# For the fun RTTI ABI details, see https://whatofhow.wordpress.com/2015/03/17/odr-rtti-dso/.
sys.setdlopenflags(os.RTLD_GLOBAL | os.RTLD_LAZY)

import rosbag2_py # noqa


def test_options_qos_conversion():
# Tests that the to-and-from C++ conversions are working properly in the pybind structs
Expand All @@ -40,3 +46,42 @@ def test_options_qos_conversion():
record_options = rosbag2_py.RecordOptions()
record_options.topic_qos_profile_overrides = simple_overrides
assert record_options.topic_qos_profile_overrides == simple_overrides


def test_record_cancel(tmp_path):
bag_path = str(tmp_path / 'test_record_cancel')
storage_options, converter_options = get_rosbag_options(bag_path)

recorder = rosbag2_py.Recorder()

record_options = rosbag2_py.RecordOptions()
record_options.all = True
record_options.is_discovery_disabled = False
record_options.topic_polling_interval = datetime.timedelta(milliseconds=100)

rclpy.init()
record_thread = threading.Thread(
target=recorder.record,
args=(storage_options, record_options),
daemon=True)
record_thread.start()

node = rclpy.create_node('test_record_cancel')
executor = rclpy.executors.SingleThreadedExecutor()
executor.add_node(node)
pub = node.create_publisher(String, 'chatter', 10)

i = 0
msg = String()

while rclpy.ok() and i < 10:
msg.data = 'Hello World: {0}'.format(i)
i += 1
pub.publish(msg)

recorder.cancel()

metadata_path = Path(bag_path) / 'metadata.yaml'
db3_path = Path(bag_path) / 'test_record_cancel_0.db3'
assert wait_for(lambda: metadata_path.is_file() and db3_path.is_file(),
timeout=rclpy.duration.Duration(seconds=3))

0 comments on commit 4befbfa

Please sign in to comment.