Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport/Humble] Notification of significant events during bag recording and playback #1037

Merged
merged 1 commit into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_CPP__BAG_EVENTS_HPP_
#define ROSBAG2_CPP__BAG_EVENTS_HPP_

#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "rclcpp/function_traits.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

namespace rosbag2_cpp
{

namespace bag_events
{

/**
* \brief The types of bag events available for registering callbacks.
*/
enum class BagEvent
{
/// The output bag file has been split, starting a new file.
WRITE_SPLIT,
/// Reading of the input bag file has gone over a split, opening the next file.
READ_SPLIT,
};

/**
* \brief The information structure passed to callbacks for the WRITE_SPLIT and READ_SPLIT events.
*/
struct BagSplitInfo
{
/// The URI of the file that was closed.
std::string closed_file;
/// The URI of the file that was opened.
std::string opened_file;
};

using BagSplitCallbackType = std::function<void (BagSplitInfo &)>;

/**
* \brief Use this structure to register callbacks with Writers.
*/
struct WriterEventCallbacks
{
/// The callback to call for the WRITE_SPLIT event.
BagSplitCallbackType write_split_callback;
};

/**
* \brief Use this structure to register callbacks with Readers.
*/
struct ReaderEventCallbacks
{
/// The callback to call for the READ_SPLIT event.
BagSplitCallbackType read_split_callback;
};

/**
* \brief Base class for event callbacks.
*
* This class should not be used directly.
*/
class BagEventCallbackBase
{
public:
using SharedPtr = std::shared_ptr<BagEventCallbackBase>;
using InfoPtr = std::shared_ptr<void>;

virtual ~BagEventCallbackBase()
{}

virtual void execute(InfoPtr & info) = 0;

virtual bool is_type(BagEvent event) const = 0;
};

/**
* \brief Templated class for storing an event callback.
*
* This class should not be used directly by users.
*/
template<typename EventCallbackT>
class BagEventCallback : public BagEventCallbackBase
{
public:
BagEventCallback(const EventCallbackT & callback, BagEvent event)
: callback_(callback),
event_(event)
{}

virtual ~BagEventCallback()
{}

void execute(InfoPtr & info) override
{
callback_(*std::static_pointer_cast<EventCallbackInfoT>(info));
}

bool is_type(BagEvent event) const override
{
return event == event_;
}

private:
using EventCallbackInfoT = typename std::remove_reference<typename
rclcpp::function_traits::function_traits<EventCallbackT>::template argument_type<0>>::type;

EventCallbackT callback_;
BagEvent event_;
};

/**
* \brief The class used to manage event callbacks registered with a Writer or Reader.
*
* Each implementation of the Writer and Reader interfaces should store one instance of this type.
* When new callbacks are registered, they should be passed to that instance using \ref
* add_event_callback. When an event occurs, the callbacks registered for it can be called by
* calling the \ref execute_callbacks method, passing in the event information.
*/
class EventCallbackManager
{
public:
/**
* \brief Add an event callback.
*
* \param callback The callback that should be called for the event.
* \param event The event, one of the values of the \ref BagEvent enumeration.
*/
template<typename EventCallbackT>
void add_event_callback(const EventCallbackT & callback, const BagEvent event)
{
auto cb = std::make_shared<BagEventCallback<EventCallbackT>>(callback, event);
callbacks_.push_back(cb);
}

/**
* \brief Check if a callback is registered for the given event.
*
* \param event The event, one of the values of the \ref BagEvent enumeration.
* \return True if a callback is registered for the event, false otherwise.
*/
bool has_callback_for_event(const BagEvent event) const
{
for (auto & cb : callbacks_) {
if (cb->is_type(event)) {
return true;
}
}
return false;
}

/**
* \brief Execute all callbacks registered for the given event.
*
* The provided information value is passed to each callback by copy.
*
* \param event The event, one of the values of the \ref BagEvent enumeration.
* \param info The information relevant to the event that has occurred. The type varies by event.
*/
void execute_callbacks(const BagEvent event, BagEventCallbackBase::InfoPtr info)
{
for (auto & cb : callbacks_) {
if (cb->is_type(event)) {
cb->execute(info);
}
}
}

private:
std::vector<BagEventCallbackBase::SharedPtr> callbacks_;
};

} // namespace bag_events

} // namespace rosbag2_cpp

#endif // ROSBAG2_CPP__BAG_EVENTS_HPP_
7 changes: 7 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "rclcpp/serialization.hpp"
#include "rclcpp/serialized_message.hpp"

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/readers/sequential_reader.hpp"
#include "rosbag2_cpp/visibility_control.hpp"
Expand Down Expand Up @@ -179,6 +180,12 @@ class ROSBAG2_CPP_PUBLIC Reader final
return *reader_impl_;
}

/**
* \brief Add callbacks for events that may occur during bag reading.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(bag_events::ReaderEventCallbacks & callbacks);

private:
std::unique_ptr<reader_interfaces::BaseReaderInterface> reader_impl_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <vector>

#include "rcutils/types.h"
#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

Expand Down Expand Up @@ -57,6 +58,8 @@ class ROSBAG2_CPP_PUBLIC BaseReaderInterface
virtual void reset_filter() = 0;

virtual void seek(const rcutils_time_point_value_t & timestamp) = 0;

virtual void add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks) = 0;
};

} // namespace reader_interfaces
Expand Down
15 changes: 15 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unordered_set>
#include <vector>

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
Expand Down Expand Up @@ -101,6 +102,18 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
*/
virtual std::string get_current_uri() const;

/**
* \brief Add callbacks for events that may occur during bag reading.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks) override;

/**
* \brief Check if a callback is registered for the given event.
* \return True if there is any callback registered for the event, false otherwise.
*/
bool has_callback_for_event(const bag_events::BagEvent event) const;

protected:
/**
* Opens the current file and sets up the filters in the new storage.
Expand Down Expand Up @@ -172,6 +185,8 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
private:
rosbag2_storage::StorageOptions storage_options_;
std::shared_ptr<SerializationFormatConverterFactoryInterface> converter_factory_{};

bag_events::EventCallbackManager callback_manager_;
};

} // namespace readers
Expand Down
7 changes: 7 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/time.hpp"

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"
Expand Down Expand Up @@ -204,6 +205,12 @@ class ROSBAG2_CPP_PUBLIC Writer final
return *writer_impl_;
}

/*
* \brief Add callbacks for events that may occur during bag writing.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(bag_events::WriterEventCallbacks & callbacks);

private:
std::mutex writer_mutex_;
std::unique_ptr<rosbag2_cpp::writer_interfaces::BaseWriterInterface> writer_impl_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <memory>

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/visibility_control.hpp"

Expand Down Expand Up @@ -51,6 +52,8 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface
* \returns true if snapshot is successful, false if snapshot fails or is not supported
*/
virtual bool take_snapshot() = 0;

virtual void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) = 0;
};

} // namespace writer_interfaces
Expand Down
10 changes: 10 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <unordered_map>
#include <vector>

#include "rosbag2_cpp/bag_events.hpp"
#include "rosbag2_cpp/cache/cache_consumer.hpp"
#include "rosbag2_cpp/cache/circular_message_cache.hpp"
#include "rosbag2_cpp/cache/message_cache.hpp"
Expand Down Expand Up @@ -114,6 +115,12 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
*/
bool take_snapshot() override;

/**
* \brief Add callbacks for events that may occur during bag writing.
* \param callbacks the structure containing the callback to add for each event.
*/
void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) override;

protected:
std::string base_folder_;
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory_;
Expand Down Expand Up @@ -162,6 +169,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
/// Helper method to write messages while also updating tracked metadata.
void write_messages(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages);
bool is_first_message_ {true};

bag_events::EventCallbackManager callback_manager_;
};

} // namespace writers
Expand Down
5 changes: 5 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,9 @@ void Reader::seek(const rcutils_time_point_value_t & timestamp)
reader_impl_->seek(timestamp);
}

void Reader::add_event_callbacks(bag_events::ReaderEventCallbacks & callbacks)
{
reader_impl_->add_event_callbacks(callbacks);
}

} // namespace rosbag2_cpp
18 changes: 18 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,12 @@ void SequentialReader::load_current_file()
void SequentialReader::load_next_file()
{
assert(current_file_iterator_ != file_paths_.end());
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = get_current_file();
current_file_iterator_++;
info->opened_file = get_current_file();
load_current_file();
callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info);
}

std::string SequentialReader::get_current_file() const
Expand Down Expand Up @@ -292,5 +296,19 @@ void SequentialReader::fill_topics_metadata()
}
}

void SequentialReader::add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks)
{
if (callbacks.read_split_callback) {
callback_manager_.add_event_callback(
callbacks.read_split_callback,
bag_events::BagEvent::READ_SPLIT);
}
}

bool SequentialReader::has_callback_for_event(const bag_events::BagEvent event) const
{
return callback_manager_.has_callback_for_event(event);
}

} // namespace readers
} // namespace rosbag2_cpp
5 changes: 5 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,9 @@ void Writer::write(
return write(serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}

void Writer::add_event_callbacks(bag_events::WriterEventCallbacks & callbacks)
{
writer_impl_->add_event_callbacks(callbacks);
}

} // namespace rosbag2_cpp
Loading