Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clock has multiple time jump callbacks #284

Merged
merged 14 commits into from
Aug 27, 2018
109 changes: 101 additions & 8 deletions rcl/include/rcl/time.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,73 @@ typedef enum rcl_clock_type_t
RCL_STEADY_TIME
} rcl_clock_type_t;

/// A duration of time, measured in nanoseconds and its source.
typedef struct rcl_duration_t
{
rcl_duration_value_t nanoseconds;
} rcl_duration_t;

/// Enumeration to describe the type of time jump.
typedef enum rcl_clock_change_t
{
/// The source before and after the jump is ROS_TIME.
RCL_ROS_TIME_NO_CHANGE = 1,
/// The source switched to ROS_TIME from SYSTEM_TIME.
RCL_ROS_TIME_ACTIVATED = 2,
/// The source switched to SYSTEM_TIME from ROS_TIME.
RCL_ROS_TIME_DEACTIVATED = 3,
/// The source before and after the jump is SYSTEM_TIME.
RCL_SYSTEM_TIME_NO_CHANGE = 4
} rcl_clock_change_t;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is following the same enums from rclcpp so my comment doesn't have to be considered now. Currently the callback can determine if the clock stayed the same (1 and 4) or from which clock it has changed to whoch new clock (2 and 3).

I was just wondering if we ever add a third clock beside ROS_TIME and SYSTEM_TIME the naming of the enums would be "problematic". With a third clock the enum would need to cover all 3x3 cases. Maybe it would be better to just put the type of the clock before and after the jump into the rcl_time_jump_t? If they are equal the clock hasn't changed. And if ROS_TIME is either in the "before" value or in the "after" value (but not both) it was "activated" / "deactivated".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point.

I'm not sure about any of the enums in this struct. It doesn't really describe if a clock's type changed. Instead describes if a ROS clock is getting time from system_time or a time source like /clock. Are you saying we might add a 3rd source of time to a ROS clock in the future?

Since jump callbacks are called both before and after the clock update a callback could get that info by querying the clock before and after. How about I remove this struct and add bool time_source_changed in rcl_time_jump_t instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe - sounds reasonable to me.

It would be good if @tfoote could provide some context why this API / approach was chosen in rclcpp. Maybe there is something we are missing.


/// Struct to describe a jump in time.
typedef struct rcl_time_jump_t
{
/// Indicate whether or not the source of time changed.
rcl_clock_change_t clock_change;
/// The new time minus the last time before the jump.
rcl_duration_t delta;
} rcl_time_jump_t;

/// Signature of a time jump callback.
/// \param[in] time_jump A description of the jump in time.
/// \param[in] before_jump Every jump callback is called twice: once before the clock changes and
/// once after. This is true the first call and false the second.
/// \param[in] user_data A pointer given at callback registration which is passed to the callback.
typedef void (* rcl_jump_callback_t)(
const struct rcl_time_jump_t * time_jump,
bool before_jump,
void * user_data);

/// Describe the prerequisites for calling a time jump callback.
typedef struct rcl_jump_threshold_t
{
/// True to call callback when the clock type changes.
bool on_clock_change;
/// A positive duration indicating the minimum jump forwards to be considered exceeded, or zero
/// to disable.
rcl_duration_t min_forward;
/// A negative duration indicating the minimum jump backwards to be considered exceeded, or zero
/// to disable.
rcl_duration_t min_backward;
} rcl_jump_threshold_t;

/// Struct to describe an added callback.
typedef struct rcl_jump_callback_info_t
{
rcl_jump_callback_t callback;
rcl_jump_threshold_t threshold;
void * user_data;
} rcl_jump_callback_info_t;

/// Encapsulation of a time source.
typedef struct rcl_clock_t
{
enum rcl_clock_type_t type;
void (* pre_update)(void);
void (* post_update)(void);
/// An array of added jump callbacks.
rcl_jump_callback_info_t * jump_callbacks;
/// Number of callbacks in jump_callbacks.
size_t num_jump_callbacks;
rcl_ret_t (* get_now)(void * data, rcl_time_point_value_t * now);
// void (*set_now) (rcl_time_point_value_t);
void * data;
Expand All @@ -73,12 +134,6 @@ typedef struct rcl_time_point_t
rcl_clock_type_t clock_type;
} rcl_time_point_t;

/// A duration of time, measured in nanoseconds and its source.
typedef struct rcl_duration_t
{
rcl_duration_value_t nanoseconds;
} rcl_duration_t;

// typedef struct rcl_rate_t
// {
// rcl_time_point_value_t trigger_time;
Expand Down Expand Up @@ -358,6 +413,44 @@ rcl_ret_t
rcl_set_ros_time_override(
rcl_clock_t * clock, rcl_time_point_value_t time_value);

/// Add a callback to be called when a time jump exceeds a threshold.
/**
* The callback is called twice when the threshold is exceeded: once before the clock is
* updated, and once after.
* The user_data pointer is passed to the callback as the last argument.
* A callback and user_data pair must be unique among the callbacks added to a clock.
*
* \param[in] clock A clock to add a jump callback to.
* \param[in] threshold Criteria indicating when to call the callback.
* \param[in] callback A callback to call.
* \param[in] user_data A pointer to be passed to the callback.
* \return `RCL_RET_OK` if the callback was added successfully, or
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
* \return `RCL_RET_ERROR` an unspecified error occurs.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_clock_add_jump_callback(
rcl_clock_t * clock, rcl_jump_threshold_t threshold, rcl_jump_callback_t callback,
void * user_data);

/// Remove a previously added time jump callback.
/**
* \param[in] clock The clock to remove a jump callback from.
* \param[in] threshold Criteria indicating when to call callback.
* \param[in] callback The callback to call.
* \param[in] user_data A pointer to be passed to the callback.
* \return `RCL_RET_OK` if the callback was added successfully, or
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
* \return `RCL_RET_ERROR` the callback was not found or an unspecified error occurs.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_clock_remove_jump_callback(
rcl_clock_t * clock, rcl_jump_callback_t callback, void * user_data);

#ifdef __cplusplus
}
#endif
Expand Down
163 changes: 153 additions & 10 deletions rcl/src/rcl/time.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ void
rcl_init_generic_clock(rcl_clock_t * clock)
{
clock->type = RCL_CLOCK_UNINITIALIZED;
clock->pre_update = NULL;
clock->post_update = NULL;
clock->jump_callbacks = NULL;
clock->num_jump_callbacks = 0u;
clock->get_now = NULL;
clock->data = NULL;
}
Expand Down Expand Up @@ -106,6 +106,18 @@ rcl_clock_init(
}
}

void
_rcl_clock_generic_fini(
rcl_clock_t * clock)
{
// Internal function; assume caller has already checked that clock is valid.
if (clock->num_jump_callbacks > 0) {
clock->num_jump_callbacks = 0;
clock->allocator.deallocate(clock->jump_callbacks, clock->allocator.state);
clock->jump_callbacks = NULL;
}
}

rcl_ret_t
rcl_clock_fini(
rcl_clock_t * clock)
Expand Down Expand Up @@ -155,6 +167,7 @@ rcl_ros_clock_fini(
RCL_SET_ERROR_MSG("clock not of type RCL_ROS_TIME", rcl_get_default_allocator());
return RCL_RET_ERROR;
}
_rcl_clock_generic_fini(clock);
if (!clock->data) {
RCL_SET_ERROR_MSG("clock data invalid", rcl_get_default_allocator());
return RCL_RET_ERROR;
Expand Down Expand Up @@ -186,6 +199,7 @@ rcl_steady_clock_fini(
RCL_SET_ERROR_MSG("clock not of type RCL_STEADY_TIME", rcl_get_default_allocator());
return RCL_RET_ERROR;
}
_rcl_clock_generic_fini(clock);
return RCL_RET_OK;
}

Expand All @@ -212,6 +226,7 @@ rcl_system_clock_fini(
RCL_SET_ERROR_MSG("clock not of type RCL_SYSTEM_TIME", rcl_get_default_allocator());
return RCL_RET_ERROR;
}
_rcl_clock_generic_fini(clock);
return RCL_RET_OK;
}

Expand Down Expand Up @@ -249,6 +264,27 @@ rcl_clock_get_now(rcl_clock_t * clock, rcl_time_point_value_t * time_point_value
return RCL_RET_ERROR;
}

void
_rcl_clock_call_callbacks(
rcl_clock_t * clock, const rcl_time_jump_t * time_jump, bool before_jump)
{
// Internal function; assume parameters are valid.
bool is_clock_change = time_jump->clock_change == RCL_ROS_TIME_ACTIVATED ||
time_jump->clock_change == RCL_ROS_TIME_DEACTIVATED;
for (size_t cb_idx = 0; cb_idx < clock->num_jump_callbacks; ++cb_idx) {
rcl_jump_callback_info_t * info = &(clock->jump_callbacks[cb_idx]);
if (
(is_clock_change && info->threshold.on_clock_change) ||
(time_jump->delta.nanoseconds < 0 &&
time_jump->delta.nanoseconds <= info->threshold.min_backward.nanoseconds) ||
(time_jump->delta.nanoseconds > 0 &&
time_jump->delta.nanoseconds >= info->threshold.min_forward.nanoseconds))
{
info->callback(time_jump, before_jump, info->user_data);
}
}
}

rcl_ret_t
rcl_enable_ros_time_override(rcl_clock_t * clock)
{
Expand All @@ -264,7 +300,14 @@ rcl_enable_ros_time_override(rcl_clock_t * clock)
rcl_get_default_allocator())
return RCL_RET_ERROR;
}
storage->active = true;
if (!storage->active) {
rcl_time_jump_t time_jump;
time_jump.delta.nanoseconds = 0;
time_jump.clock_change = RCL_ROS_TIME_ACTIVATED;
_rcl_clock_call_callbacks(clock, &time_jump, true);
storage->active = true;
_rcl_clock_call_callbacks(clock, &time_jump, false);
}
return RCL_RET_OK;
}

Expand All @@ -284,7 +327,14 @@ rcl_disable_ros_time_override(rcl_clock_t * clock)
rcl_get_default_allocator())
return RCL_RET_ERROR;
}
storage->active = false;
if (storage->active) {
rcl_time_jump_t time_jump;
time_jump.delta.nanoseconds = 0;
time_jump.clock_change = RCL_ROS_TIME_DEACTIVATED;
_rcl_clock_call_callbacks(clock, &time_jump, true);
storage->active = false;
_rcl_clock_call_callbacks(clock, &time_jump, false);
}
return RCL_RET_OK;
}

Expand Down Expand Up @@ -323,14 +373,107 @@ rcl_set_ros_time_override(
"Clock is not of type RCL_ROS_TIME, cannot set time override.", rcl_get_default_allocator())
return RCL_RET_ERROR;
}
rcl_ros_clock_storage_t * storage = \
(rcl_ros_clock_storage_t *)clock->data;
if (storage->active && clock->pre_update) {
clock->pre_update();
rcl_time_jump_t time_jump;
rcl_ros_clock_storage_t * storage = (rcl_ros_clock_storage_t *)clock->data;
if (storage->active) {
time_jump.clock_change = RCL_ROS_TIME_NO_CHANGE;
rcl_time_point_value_t current_time;
rcl_ret_t ret = rcl_get_ros_time(storage, &current_time);
if (RCL_RET_OK != ret) {
return ret;
}
time_jump.delta.nanoseconds = time_value - current_time;
_rcl_clock_call_callbacks(clock, &time_jump, true);
}
rcl_atomic_store(&(storage->current_time), time_value);
if (storage->active && clock->post_update) {
clock->post_update();
if (storage->active) {
_rcl_clock_call_callbacks(clock, &time_jump, false);
}
return RCL_RET_OK;
}

rcl_ret_t
rcl_clock_add_jump_callback(
rcl_clock_t * clock, rcl_jump_threshold_t threshold, rcl_jump_callback_t callback,
void * user_data)
{
// Make sure parameters are valid
RCL_CHECK_ARGUMENT_FOR_NULL(clock, RCL_RET_INVALID_ARGUMENT, rcl_get_default_allocator());
RCL_CHECK_ALLOCATOR_WITH_MSG(&(clock->allocator), "invalid allocator",
return RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(callback, RCL_RET_INVALID_ARGUMENT, clock->allocator);
if (threshold.min_forward.nanoseconds < 0) {
RCL_SET_ERROR_MSG("forward jump threshold must be positive or zero", clock->allocator);
return RCL_RET_INVALID_ARGUMENT;
}
if (threshold.min_backward.nanoseconds > 0) {
RCL_SET_ERROR_MSG("backward jump threshold must be negative or zero", clock->allocator);
return RCL_RET_INVALID_ARGUMENT;
}

// Callback/user_data pair must be unique
for (size_t cb_idx = 0; cb_idx < clock->num_jump_callbacks; ++cb_idx) {
const rcl_jump_callback_info_t * info = &(clock->jump_callbacks[cb_idx]);
if (info->callback == callback && info->user_data == user_data) {
RCL_SET_ERROR_MSG("callback/user_data are already added to this clock", clock->allocator);
return RCL_RET_ERROR;
}
}

// Add the new callback, increasing the size of the callback list
rcl_jump_callback_info_t * callbacks = clock->allocator.reallocate(
clock->jump_callbacks, sizeof(rcl_jump_callback_info_t) * (clock->num_jump_callbacks + 1),
clock->allocator.state);
if (NULL == callbacks) {
RCL_SET_ERROR_MSG("Failed to realloc jump callbacks", clock->allocator);
return RCL_RET_BAD_ALLOC;
}
clock->jump_callbacks = callbacks;
clock->jump_callbacks[clock->num_jump_callbacks].callback = callback;
clock->jump_callbacks[clock->num_jump_callbacks].threshold = threshold;
clock->jump_callbacks[clock->num_jump_callbacks].user_data = user_data;
++(clock->num_jump_callbacks);
return RCL_RET_OK;
}

rcl_ret_t
rcl_clock_remove_jump_callback(
rcl_clock_t * clock, rcl_jump_callback_t callback, void * user_data)
{
// Make sure parameters are valid
RCL_CHECK_ARGUMENT_FOR_NULL(clock, RCL_RET_INVALID_ARGUMENT, rcl_get_default_allocator());
RCL_CHECK_ALLOCATOR_WITH_MSG(&(clock->allocator), "invalid allocator",
return RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(callback, RCL_RET_INVALID_ARGUMENT, clock->allocator);

// Delete callback if found, moving all callbacks after back one
bool found_callback = false;
for (size_t cb_idx = 0; cb_idx < clock->num_jump_callbacks; ++cb_idx) {
const rcl_jump_callback_info_t * info = &(clock->jump_callbacks[cb_idx]);
if (found_callback) {
clock->jump_callbacks[cb_idx - 1] = *info;
} else if (info->callback == callback && info->user_data == user_data) {
found_callback = true;
}
}
if (!found_callback) {
RCL_SET_ERROR_MSG("jump callback was not found", clock->allocator);
return RCL_RET_ERROR;
}

// Shrink size of the callback array
if (clock->num_jump_callbacks == 1) {
clock->allocator.deallocate(clock->jump_callbacks, clock->allocator.state);
} else {
rcl_jump_callback_info_t * callbacks = clock->allocator.reallocate(
clock->jump_callbacks, sizeof(rcl_jump_callback_info_t) * (clock->num_jump_callbacks - 1),
clock->allocator.state);
if (NULL == callbacks) {
RCL_SET_ERROR_MSG("Failed to shrink jump callbacks", clock->allocator);
return RCL_RET_BAD_ALLOC;
}
clock->jump_callbacks = callbacks;
}
--(clock->num_jump_callbacks);
return RCL_RET_OK;
}
Loading