Skip to content

Commit

Permalink
Clock has multiple time jump callbacks (#284)
Browse files Browse the repository at this point in the history
* Clock has multiple time jump callbacks

Replaces pre_update and post_update
Adds callbacks when ROS time is activated or deactivated
Add rcl_clock_change_t, rcl_time_jump_t, rcl_jump_threshold_t
Add rcl_clock_add_jump_callback()
Add rcl_clock_remove_jump_callback()

* Fini generic clock after confirming correct clock type

* test_time no ASAN detections
  • Loading branch information
sloretz authored Aug 27, 2018
1 parent ba9f24b commit ceff88b
Show file tree
Hide file tree
Showing 3 changed files with 547 additions and 46 deletions.
109 changes: 101 additions & 8 deletions rcl/include/rcl/time.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,73 @@ typedef enum rcl_clock_type_t
RCL_STEADY_TIME
} rcl_clock_type_t;

/// A duration of time, measured in nanoseconds and its source.
typedef struct rcl_duration_t
{
rcl_duration_value_t nanoseconds;
} rcl_duration_t;

/// Enumeration to describe the type of time jump.
typedef enum rcl_clock_change_t
{
/// The source before and after the jump is ROS_TIME.
RCL_ROS_TIME_NO_CHANGE = 1,
/// The source switched to ROS_TIME from SYSTEM_TIME.
RCL_ROS_TIME_ACTIVATED = 2,
/// The source switched to SYSTEM_TIME from ROS_TIME.
RCL_ROS_TIME_DEACTIVATED = 3,
/// The source before and after the jump is SYSTEM_TIME.
RCL_SYSTEM_TIME_NO_CHANGE = 4
} rcl_clock_change_t;

/// Struct to describe a jump in time.
typedef struct rcl_time_jump_t
{
/// Indicate whether or not the source of time changed.
rcl_clock_change_t clock_change;
/// The new time minus the last time before the jump.
rcl_duration_t delta;
} rcl_time_jump_t;

/// Signature of a time jump callback.
/// \param[in] time_jump A description of the jump in time.
/// \param[in] before_jump Every jump callback is called twice: once before the clock changes and
/// once after. This is true the first call and false the second.
/// \param[in] user_data A pointer given at callback registration which is passed to the callback.
typedef void (* rcl_jump_callback_t)(
const struct rcl_time_jump_t * time_jump,
bool before_jump,
void * user_data);

/// Describe the prerequisites for calling a time jump callback.
typedef struct rcl_jump_threshold_t
{
/// True to call callback when the clock type changes.
bool on_clock_change;
/// A positive duration indicating the minimum jump forwards to be considered exceeded, or zero
/// to disable.
rcl_duration_t min_forward;
/// A negative duration indicating the minimum jump backwards to be considered exceeded, or zero
/// to disable.
rcl_duration_t min_backward;
} rcl_jump_threshold_t;

/// Struct to describe an added callback.
typedef struct rcl_jump_callback_info_t
{
rcl_jump_callback_t callback;
rcl_jump_threshold_t threshold;
void * user_data;
} rcl_jump_callback_info_t;

/// Encapsulation of a time source.
typedef struct rcl_clock_t
{
enum rcl_clock_type_t type;
void (* pre_update)(void);
void (* post_update)(void);
/// An array of added jump callbacks.
rcl_jump_callback_info_t * jump_callbacks;
/// Number of callbacks in jump_callbacks.
size_t num_jump_callbacks;
rcl_ret_t (* get_now)(void * data, rcl_time_point_value_t * now);
// void (*set_now) (rcl_time_point_value_t);
void * data;
Expand All @@ -73,12 +134,6 @@ typedef struct rcl_time_point_t
rcl_clock_type_t clock_type;
} rcl_time_point_t;

/// A duration of time, measured in nanoseconds and its source.
typedef struct rcl_duration_t
{
rcl_duration_value_t nanoseconds;
} rcl_duration_t;

// typedef struct rcl_rate_t
// {
// rcl_time_point_value_t trigger_time;
Expand Down Expand Up @@ -358,6 +413,44 @@ rcl_ret_t
rcl_set_ros_time_override(
rcl_clock_t * clock, rcl_time_point_value_t time_value);

/// Add a callback to be called when a time jump exceeds a threshold.
/**
* The callback is called twice when the threshold is exceeded: once before the clock is
* updated, and once after.
* The user_data pointer is passed to the callback as the last argument.
* A callback and user_data pair must be unique among the callbacks added to a clock.
*
* \param[in] clock A clock to add a jump callback to.
* \param[in] threshold Criteria indicating when to call the callback.
* \param[in] callback A callback to call.
* \param[in] user_data A pointer to be passed to the callback.
* \return `RCL_RET_OK` if the callback was added successfully, or
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
* \return `RCL_RET_ERROR` an unspecified error occurs.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_clock_add_jump_callback(
rcl_clock_t * clock, rcl_jump_threshold_t threshold, rcl_jump_callback_t callback,
void * user_data);

/// Remove a previously added time jump callback.
/**
* \param[in] clock The clock to remove a jump callback from.
* \param[in] threshold Criteria indicating when to call callback.
* \param[in] callback The callback to call.
* \param[in] user_data A pointer to be passed to the callback.
* \return `RCL_RET_OK` if the callback was added successfully, or
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
* \return `RCL_RET_ERROR` the callback was not found or an unspecified error occurs.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_clock_remove_jump_callback(
rcl_clock_t * clock, rcl_jump_callback_t callback, void * user_data);

#ifdef __cplusplus
}
#endif
Expand Down
163 changes: 153 additions & 10 deletions rcl/src/rcl/time.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ void
rcl_init_generic_clock(rcl_clock_t * clock)
{
clock->type = RCL_CLOCK_UNINITIALIZED;
clock->pre_update = NULL;
clock->post_update = NULL;
clock->jump_callbacks = NULL;
clock->num_jump_callbacks = 0u;
clock->get_now = NULL;
clock->data = NULL;
}
Expand Down Expand Up @@ -106,6 +106,18 @@ rcl_clock_init(
}
}

void
_rcl_clock_generic_fini(
rcl_clock_t * clock)
{
// Internal function; assume caller has already checked that clock is valid.
if (clock->num_jump_callbacks > 0) {
clock->num_jump_callbacks = 0;
clock->allocator.deallocate(clock->jump_callbacks, clock->allocator.state);
clock->jump_callbacks = NULL;
}
}

rcl_ret_t
rcl_clock_fini(
rcl_clock_t * clock)
Expand Down Expand Up @@ -155,6 +167,7 @@ rcl_ros_clock_fini(
RCL_SET_ERROR_MSG("clock not of type RCL_ROS_TIME", rcl_get_default_allocator());
return RCL_RET_ERROR;
}
_rcl_clock_generic_fini(clock);
if (!clock->data) {
RCL_SET_ERROR_MSG("clock data invalid", rcl_get_default_allocator());
return RCL_RET_ERROR;
Expand Down Expand Up @@ -186,6 +199,7 @@ rcl_steady_clock_fini(
RCL_SET_ERROR_MSG("clock not of type RCL_STEADY_TIME", rcl_get_default_allocator());
return RCL_RET_ERROR;
}
_rcl_clock_generic_fini(clock);
return RCL_RET_OK;
}

Expand All @@ -212,6 +226,7 @@ rcl_system_clock_fini(
RCL_SET_ERROR_MSG("clock not of type RCL_SYSTEM_TIME", rcl_get_default_allocator());
return RCL_RET_ERROR;
}
_rcl_clock_generic_fini(clock);
return RCL_RET_OK;
}

Expand Down Expand Up @@ -249,6 +264,27 @@ rcl_clock_get_now(rcl_clock_t * clock, rcl_time_point_value_t * time_point_value
return RCL_RET_ERROR;
}

void
_rcl_clock_call_callbacks(
rcl_clock_t * clock, const rcl_time_jump_t * time_jump, bool before_jump)
{
// Internal function; assume parameters are valid.
bool is_clock_change = time_jump->clock_change == RCL_ROS_TIME_ACTIVATED ||
time_jump->clock_change == RCL_ROS_TIME_DEACTIVATED;
for (size_t cb_idx = 0; cb_idx < clock->num_jump_callbacks; ++cb_idx) {
rcl_jump_callback_info_t * info = &(clock->jump_callbacks[cb_idx]);
if (
(is_clock_change && info->threshold.on_clock_change) ||
(time_jump->delta.nanoseconds < 0 &&
time_jump->delta.nanoseconds <= info->threshold.min_backward.nanoseconds) ||
(time_jump->delta.nanoseconds > 0 &&
time_jump->delta.nanoseconds >= info->threshold.min_forward.nanoseconds))
{
info->callback(time_jump, before_jump, info->user_data);
}
}
}

rcl_ret_t
rcl_enable_ros_time_override(rcl_clock_t * clock)
{
Expand All @@ -264,7 +300,14 @@ rcl_enable_ros_time_override(rcl_clock_t * clock)
rcl_get_default_allocator())
return RCL_RET_ERROR;
}
storage->active = true;
if (!storage->active) {
rcl_time_jump_t time_jump;
time_jump.delta.nanoseconds = 0;
time_jump.clock_change = RCL_ROS_TIME_ACTIVATED;
_rcl_clock_call_callbacks(clock, &time_jump, true);
storage->active = true;
_rcl_clock_call_callbacks(clock, &time_jump, false);
}
return RCL_RET_OK;
}

Expand All @@ -284,7 +327,14 @@ rcl_disable_ros_time_override(rcl_clock_t * clock)
rcl_get_default_allocator())
return RCL_RET_ERROR;
}
storage->active = false;
if (storage->active) {
rcl_time_jump_t time_jump;
time_jump.delta.nanoseconds = 0;
time_jump.clock_change = RCL_ROS_TIME_DEACTIVATED;
_rcl_clock_call_callbacks(clock, &time_jump, true);
storage->active = false;
_rcl_clock_call_callbacks(clock, &time_jump, false);
}
return RCL_RET_OK;
}

Expand Down Expand Up @@ -323,14 +373,107 @@ rcl_set_ros_time_override(
"Clock is not of type RCL_ROS_TIME, cannot set time override.", rcl_get_default_allocator())
return RCL_RET_ERROR;
}
rcl_ros_clock_storage_t * storage = \
(rcl_ros_clock_storage_t *)clock->data;
if (storage->active && clock->pre_update) {
clock->pre_update();
rcl_time_jump_t time_jump;
rcl_ros_clock_storage_t * storage = (rcl_ros_clock_storage_t *)clock->data;
if (storage->active) {
time_jump.clock_change = RCL_ROS_TIME_NO_CHANGE;
rcl_time_point_value_t current_time;
rcl_ret_t ret = rcl_get_ros_time(storage, &current_time);
if (RCL_RET_OK != ret) {
return ret;
}
time_jump.delta.nanoseconds = time_value - current_time;
_rcl_clock_call_callbacks(clock, &time_jump, true);
}
rcl_atomic_store(&(storage->current_time), time_value);
if (storage->active && clock->post_update) {
clock->post_update();
if (storage->active) {
_rcl_clock_call_callbacks(clock, &time_jump, false);
}
return RCL_RET_OK;
}

rcl_ret_t
rcl_clock_add_jump_callback(
rcl_clock_t * clock, rcl_jump_threshold_t threshold, rcl_jump_callback_t callback,
void * user_data)
{
// Make sure parameters are valid
RCL_CHECK_ARGUMENT_FOR_NULL(clock, RCL_RET_INVALID_ARGUMENT, rcl_get_default_allocator());
RCL_CHECK_ALLOCATOR_WITH_MSG(&(clock->allocator), "invalid allocator",
return RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(callback, RCL_RET_INVALID_ARGUMENT, clock->allocator);
if (threshold.min_forward.nanoseconds < 0) {
RCL_SET_ERROR_MSG("forward jump threshold must be positive or zero", clock->allocator);
return RCL_RET_INVALID_ARGUMENT;
}
if (threshold.min_backward.nanoseconds > 0) {
RCL_SET_ERROR_MSG("backward jump threshold must be negative or zero", clock->allocator);
return RCL_RET_INVALID_ARGUMENT;
}

// Callback/user_data pair must be unique
for (size_t cb_idx = 0; cb_idx < clock->num_jump_callbacks; ++cb_idx) {
const rcl_jump_callback_info_t * info = &(clock->jump_callbacks[cb_idx]);
if (info->callback == callback && info->user_data == user_data) {
RCL_SET_ERROR_MSG("callback/user_data are already added to this clock", clock->allocator);
return RCL_RET_ERROR;
}
}

// Add the new callback, increasing the size of the callback list
rcl_jump_callback_info_t * callbacks = clock->allocator.reallocate(
clock->jump_callbacks, sizeof(rcl_jump_callback_info_t) * (clock->num_jump_callbacks + 1),
clock->allocator.state);
if (NULL == callbacks) {
RCL_SET_ERROR_MSG("Failed to realloc jump callbacks", clock->allocator);
return RCL_RET_BAD_ALLOC;
}
clock->jump_callbacks = callbacks;
clock->jump_callbacks[clock->num_jump_callbacks].callback = callback;
clock->jump_callbacks[clock->num_jump_callbacks].threshold = threshold;
clock->jump_callbacks[clock->num_jump_callbacks].user_data = user_data;
++(clock->num_jump_callbacks);
return RCL_RET_OK;
}

rcl_ret_t
rcl_clock_remove_jump_callback(
rcl_clock_t * clock, rcl_jump_callback_t callback, void * user_data)
{
// Make sure parameters are valid
RCL_CHECK_ARGUMENT_FOR_NULL(clock, RCL_RET_INVALID_ARGUMENT, rcl_get_default_allocator());
RCL_CHECK_ALLOCATOR_WITH_MSG(&(clock->allocator), "invalid allocator",
return RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(callback, RCL_RET_INVALID_ARGUMENT, clock->allocator);

// Delete callback if found, moving all callbacks after back one
bool found_callback = false;
for (size_t cb_idx = 0; cb_idx < clock->num_jump_callbacks; ++cb_idx) {
const rcl_jump_callback_info_t * info = &(clock->jump_callbacks[cb_idx]);
if (found_callback) {
clock->jump_callbacks[cb_idx - 1] = *info;
} else if (info->callback == callback && info->user_data == user_data) {
found_callback = true;
}
}
if (!found_callback) {
RCL_SET_ERROR_MSG("jump callback was not found", clock->allocator);
return RCL_RET_ERROR;
}

// Shrink size of the callback array
if (clock->num_jump_callbacks == 1) {
clock->allocator.deallocate(clock->jump_callbacks, clock->allocator.state);
} else {
rcl_jump_callback_info_t * callbacks = clock->allocator.reallocate(
clock->jump_callbacks, sizeof(rcl_jump_callback_info_t) * (clock->num_jump_callbacks - 1),
clock->allocator.state);
if (NULL == callbacks) {
RCL_SET_ERROR_MSG("Failed to shrink jump callbacks", clock->allocator);
return RCL_RET_BAD_ALLOC;
}
clock->jump_callbacks = callbacks;
}
--(clock->num_jump_callbacks);
return RCL_RET_OK;
}
Loading

0 comments on commit ceff88b

Please sign in to comment.