Skip to content

Commit

Permalink
Notification of significant events during bag recording and playback (
Browse files Browse the repository at this point in the history
…#908) (#1037)

* Simple hacky prototype

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Improved prototype with more flexibility and event information

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Use the full relative file path for event info

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Remove no-longer required dependency

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Remove left-over merge

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Change event names

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add overrides to mocks

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add missing include

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Remove empty file

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Remove unnecessary virtual

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Correct formatting

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add missing doc

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Reformat documentation

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Change read event topic name

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Change read event topic name

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Simplify for loop

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Remove unnecessary blank line

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Change argument type to const

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add events handling to mocks

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add comment explaining 5-message split

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add integration tests for player and recorder

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add ability to check if an event has callbacks

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Shift event topic publishing to a separate thread

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Fix formatting

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Test SequentialReader event callbacks

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Test SequentialWriter event callbacks

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Document bag events

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Check if joinable before joining

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Correct spelling mistake

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Correct spelling mistake

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Reduce time wasted waiting for test to reach intended behaviour

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Address test flakiness

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Improve size check

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Use constant for split size in tests

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add missing header

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Allocate thread on the stack

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add getters for number of messages per 'file'

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Replace magic number

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Address unstable OSX build

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Make path in test cross-platform

Co-authored-by: Emerson Knapp <537409+emersonknapp@users.noreply.github.com>
Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Add missing virtual destructor

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Follow uncrustify suggestion

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Fix rebase errors

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

* Remove double include

Signed-off-by: Geoffrey Biggs <gbiggs@killbots.net>

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Co-authored-by: Emerson Knapp <537409+emersonknapp@users.noreply.github.com>

Co-authored-by: Michael Orlov <morlovmr@gmail.com>
Co-authored-by: Emerson Knapp <537409+emersonknapp@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 2, 2022
1 parent 193caee commit f3a3454
Show file tree
Hide file tree
Showing 25 changed files with 668 additions and 12 deletions.
193 changes: 193 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__BAG_EVENTS_HPP_
#define ROSBAG2_CPP__BAG_EVENTS_HPP_

#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "rclcpp/function_traits.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

namespace rosbag2_cpp
{

namespace bag_events
{

/**
* \brief The types of bag events available for registering callbacks.
*/
enum class BagEvent
{
/// The output bag file has been split, starting a new file.
WRITE_SPLIT,
/// Reading of the input bag file has gone over a split, opening the next file.
READ_SPLIT,
};

/**
* \brief The information structure passed to callbacks for the WRITE_SPLIT and READ_SPLIT events.
*/
struct BagSplitInfo
{
/// The URI of the file that was closed.
std::string closed_file;
/// The URI of the file that was opened.
std::string opened_file;
};

using BagSplitCallbackType = std::function<void (BagSplitInfo &)>;

/**
* \brief Use this structure to register callbacks with Writers.
*/
struct WriterEventCallbacks
{
/// The callback to call for the WRITE_SPLIT event.
BagSplitCallbackType write_split_callback;
};

/**
* \brief Use this structure to register callbacks with Readers.
*/
struct ReaderEventCallbacks
{
/// The callback to call for the READ_SPLIT event.
BagSplitCallbackType read_split_callback;
};

/**
* \brief Base class for event callbacks.
*
* This class should not be used directly.
*/
class BagEventCallbackBase
{
public:
using SharedPtr = std::shared_ptr<BagEventCallbackBase>;
using InfoPtr = std::shared_ptr<void>;

virtual ~BagEventCallbackBase()
{}

virtual void execute(InfoPtr & info) = 0;

virtual bool is_type(BagEvent event) const = 0;
};

/**
* \brief Templated class for storing an event callback.
*
* This class should not be used directly by users.
*/
template<typename EventCallbackT>
class BagEventCallback : public BagEventCallbackBase
{
public:
BagEventCallback(const EventCallbackT & callback, BagEvent event)
: callback_(callback),
event_(event)
{}

virtual ~BagEventCallback()
{}

void execute(InfoPtr & info) override
{
callback_(*std::static_pointer_cast<EventCallbackInfoT>(info));
}

bool is_type(BagEvent event) const override
{
return event == event_;
}

private:
using EventCallbackInfoT = typename std::remove_reference<typename
rclcpp::function_traits::function_traits<EventCallbackT>::template argument_type<0>>::type;

EventCallbackT callback_;
BagEvent event_;
};

/**
* \brief The class used to manage event callbacks registered with a Writer or Reader.
*
* Each implementation of the Writer and Reader interfaces should store one instance of this type.
* When new callbacks are registered, they should be passed to that instance using \ref
* add_event_callback. When an event occurs, the callbacks registered for it can be called by
* calling the \ref execute_callbacks method, passing in the event information.
*/
class EventCallbackManager
{
public:
/**
* \brief Add an event callback.
*
* \param callback The callback that should be called for the event.
* \param event The event, one of the values of the \ref BagEvent enumeration.
*/
template<typename EventCallbackT>
void add_event_callback(const EventCallbackT & callback, const BagEvent event)
{
auto cb = std::make_shared<BagEventCallback<EventCallbackT>>(callback, event);
callbacks_.push_back(cb);
}

/**
* \brief Check if a callback is registered for the given event.
*
* \param event The event, one of the values of the \ref BagEvent enumeration.
* \return True if a callback is registered for the event, false otherwise.
*/
bool has_callback_for_event(const BagEvent event) const
{
for (auto & cb : callbacks_) {
if (cb->is_type(event)) {
return true;
}
}
return false;
}

/**
* \brief Execute all callbacks registered for the given event.
*
* The provided information value is passed to each callback by copy.
*
* \param event The event, one of the values of the \ref BagEvent enumeration.
* \param info The information relevant to the event that has occurred. The type varies by event.
*/
void execute_callbacks(const BagEvent event, BagEventCallbackBase::InfoPtr info)
{
for (auto & cb : callbacks_) {
if (cb->is_type(event)) {
cb->execute(info);
}
}
}

private:
std::vector<BagEventCallbackBase::SharedPtr> callbacks_;
};

} // namespace bag_events

} // namespace rosbag2_cpp

#endif // ROSBAG2_CPP__BAG_EVENTS_HPP_
7 changes: 7 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"
#include "rosbag2_cpp/visibility_control.hpp"
Expand Down Expand Up @@ -179,6 +180,12 @@ class ROSBAG2_CPP_PUBLIC Reader final
return *reader_impl_;
}

/**
* \brief Add callbacks for events that may occur during bag reading.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(bag_events::ReaderEventCallbacks & callbacks);

private:
std::unique_ptr<reader_interfaces::BaseReaderInterface> reader_impl_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <vector>

#include "rcutils/types.h"
#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

Expand Down Expand Up @@ -57,6 +58,8 @@ class ROSBAG2_CPP_PUBLIC BaseReaderInterface
virtual void reset_filter() = 0;

virtual void seek(const rcutils_time_point_value_t & timestamp) = 0;

virtual void add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks) = 0;
};

} // namespace reader_interfaces
Expand Down
15 changes: 15 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unordered_set>
#include <vector>

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
Expand Down Expand Up @@ -101,6 +102,18 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
*/
virtual std::string get_current_uri() const;

/**
* \brief Add callbacks for events that may occur during bag reading.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks) override;

/**
* \brief Check if a callback is registered for the given event.
* \return True if there is any callback registered for the event, false otherwise.
*/
bool has_callback_for_event(const bag_events::BagEvent event) const;

protected:
/**
* Opens the current file and sets up the filters in the new storage.
Expand Down Expand Up @@ -172,6 +185,8 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
private:
rosbag2_storage::StorageOptions storage_options_;
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_{};

bag_events::EventCallbackManager callback_manager_;
};

} // namespace readers
Expand Down
7 changes: 7 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/time.hpp"

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"
Expand Down Expand Up @@ -204,6 +205,12 @@ class ROSBAG2_CPP_PUBLIC Writer final
return *writer_impl_;
}

/*
* \brief Add callbacks for events that may occur during bag writing.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(bag_events::WriterEventCallbacks & callbacks);

private:
std::mutex writer_mutex_;
std::unique_ptr<rosbag2_cpp::writer_interfaces::BaseWriterInterface> writer_impl_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <memory>

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

Expand Down Expand Up @@ -51,6 +52,8 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface
* \returns true if snapshot is successful, false if snapshot fails or is not supported
*/
virtual bool take_snapshot() = 0;

virtual void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) = 0;
};

} // namespace writer_interfaces
Expand Down
10 changes: 10 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <unordered_map>
#include <vector>

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/cache/cache_consumer.hpp"
#include "rosbag2_cpp/cache/circular_message_cache.hpp"
#include "rosbag2_cpp/cache/message_cache.hpp"
Expand Down Expand Up @@ -114,6 +115,12 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
*/
bool take_snapshot() override;

/**
* \brief Add callbacks for events that may occur during bag writing.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) override;

protected:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
Expand Down Expand Up @@ -163,6 +170,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
/// Helper method to write messages while also updating tracked metadata.
void write_messages(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages);
bool is_first_message_ {true};

bag_events::EventCallbackManager callback_manager_;
};

} // namespace writers
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,9 @@ void Reader::seek(const rcutils_time_point_value_t & timestamp)
reader_impl_->seek(timestamp);
}

void Reader::add_event_callbacks(bag_events::ReaderEventCallbacks & callbacks)
{
reader_impl_->add_event_callbacks(callbacks);
}

} // namespace rosbag2_cpp
18 changes: 18 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,12 @@ void SequentialReader::load_current_file()
void SequentialReader::load_next_file()
{
assert(current_file_iterator_ != file_paths_.end());
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = get_current_file();
current_file_iterator_++;
info->opened_file = get_current_file();
load_current_file();
callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info);
}

std::string SequentialReader::get_current_file() const
Expand Down Expand Up @@ -292,5 +296,19 @@ void SequentialReader::fill_topics_metadata()
}
}

void SequentialReader::add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks)
{
if (callbacks.read_split_callback) {
callback_manager_.add_event_callback(
callbacks.read_split_callback,
bag_events::BagEvent::READ_SPLIT);
}
}

bool SequentialReader::has_callback_for_event(const bag_events::BagEvent event) const
{
return callback_manager_.has_callback_for_event(event);
}

} // namespace readers
} // namespace rosbag2_cpp
5 changes: 5 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,9 @@ void Writer::write(
return write(serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}

void Writer::add_event_callbacks(bag_events::WriterEventCallbacks & callbacks)
{
writer_impl_->add_event_callbacks(callbacks);
}

} // namespace rosbag2_cpp
Loading

0 comments on commit f3a3454

Please sign in to comment.