Skip to content

Commit

Permalink
Merge pull request #1257 from jphickey/fix-1155-zerocopy-api
Browse files Browse the repository at this point in the history
Fix #1155, clean up zero copy API
  • Loading branch information
astrogeco committed Mar 29, 2021
2 parents 3daa5fa + 410447e commit 7db7c72
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 161 deletions.
52 changes: 20 additions & 32 deletions modules/core_api/fsw/inc/cfe_sb.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,31 +477,26 @@ CFE_Status_t CFE_SB_ReceiveBuffer(CFE_SB_Buffer_t **BufPtr, CFE_SB_PipeId_t Pipe
** This routine can be used to get a pointer to one of the software bus'
** internal memory buffers that are used for sending messages. The caller
** can use this memory buffer to build an SB message, then send it using
** the #CFE_SB_TransmitBuffer function. This interface is more complicated
** than the normal #CFE_SB_TransmitMsg interface, but it avoids an extra
** the CFE_SB_TransmitBuffer() function. This interface avoids an extra
** copy of the message from the user's memory buffer to the software bus
** internal buffer. The "zero copy" interface can be used to improve
** performance in high-rate, high-volume software bus traffic.
** internal buffer.
**
** \par Assumptions, External Events, and Notes:
** -# The pointer returned by #CFE_SB_ZeroCopyGetPtr is only good for one
** call to #CFE_SB_TransmitBuffer.
** -# Applications should be written as if #CFE_SB_ZeroCopyGetPtr is
** equivalent to a \c malloc() and #CFE_SB_TransmitBuffer is equivalent to
** a \c free().
** -# The pointer returned by CFE_SB_AllocateMessageBuffer() is only good for one
** call to CFE_SB_TransmitBuffer().
** -# Once a buffer has been successfully transmitted (as indicated by a successful
** return from CFE_SB_TransmitBuffer()) the buffer becomes owned by the SB application.
** It will automatically be freed by SB once all recipients have finished reading it.
** -# Applications must not de-reference the message pointer (for reading
** or writing) after the call to #CFE_SB_TransmitBuffer.
** or writing) after the call to CFE_SB_TransmitBuffer().
**
** \param[in] MsgSize The size of the SB message buffer the caller wants
** (including the SB message header).
**
** \param[out] BufferHandle A handle that must be supplied when sending or releasing
** in zero copy mode.
**
** \return A pointer to a memory buffer that message data can be written to
** for use with #CFE_SB_TransmitBuffer.
** for use with CFE_SB_TransmitBuffer().
**/
CFE_SB_Buffer_t *CFE_SB_ZeroCopyGetPtr(size_t MsgSize, CFE_SB_ZeroCopyHandle_t *BufferHandle);
CFE_SB_Buffer_t *CFE_SB_AllocateMessageBuffer(size_t MsgSize);

/*****************************************************************************/
/**
Expand All @@ -514,21 +509,18 @@ CFE_SB_Buffer_t *CFE_SB_ZeroCopyGetPtr(size_t MsgSize, CFE_SB_ZeroCopyHandle_t *
** \par Assumptions, External Events, and Notes:
** -# This function is not needed for normal "zero copy" transfers. It
** is needed only for cleanup when an application gets a pointer using
** #CFE_SB_ZeroCopyGetPtr, but (due to some error condition) never uses
** that pointer for a #CFE_SB_TransmitBuffer
** CFE_SB_AllocateMessageBuffer(), but (due to some error condition) never
** uses that pointer in a call to CFE_SB_TransmitBuffer().
**
** \param[in] Ptr2Release A pointer to the SB internal buffer. This must be a
** pointer returned by a call to #CFE_SB_ZeroCopyGetPtr,
** but never used in a call to #CFE_SB_TransmitBuffer.
**
** \param[in] ZeroCopyHandle This must be the handle supplied with the pointer
** when #CFE_SB_ZeroCopyGetPtr was called.
** \param[in] BufPtr A pointer to the SB internal buffer. This must be a
** pointer returned by a call to CFE_SB_AllocateMessageBuffer(),
** but never used in a call to CFE_SB_TransmitBuffer().
**
** \return Execution status, see \ref CFEReturnCodes
** \retval #CFE_SUCCESS \copybrief CFE_SUCCESS
** \retval #CFE_SB_BUFFER_INVALID \copybrief CFE_SB_BUFFER_INVALID
**/
CFE_Status_t CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, CFE_SB_ZeroCopyHandle_t ZeroCopyHandle);
CFE_Status_t CFE_SB_ReleaseMessageBuffer(CFE_SB_Buffer_t *BufPtr);

/*****************************************************************************/
/**
Expand All @@ -537,22 +529,22 @@ CFE_Status_t CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, CFE_SB_Zero
** \par Description
** This routine sends a message that has been created directly in an
** internal SB message buffer by an application (after a call to
** #CFE_SB_ZeroCopyGetPtr). This interface is more complicated than
** #CFE_SB_AllocateMessageBuffer). This interface is more complicated than
** the normal #CFE_SB_TransmitMsg interface, but it avoids an extra copy of
** the message from the user's memory buffer to the software bus
** internal buffer. The "zero copy" interface can be used to improve
** performance in high-rate, high-volume software bus traffic.
**
** \par Assumptions, External Events, and Notes:
** -# A handle returned by #CFE_SB_ZeroCopyGetPtr is "consumed" by
** -# A handle returned by #CFE_SB_AllocateMessageBuffer is "consumed" by
** a _successful_ call to #CFE_SB_TransmitBuffer.
** -# If this function returns CFE_SUCCESS, this indicates the zero copy handle is
** now owned by software bus, and is no longer owned by the calling application,
** and should not be re-used.
** -# Howver if this function fails (returns any error status) it does not change
** the state of the buffer at all, meaning the calling application still owns it.
** (a failure means the buffer is left in the same state it was before the call).
** -# Applications should be written as if #CFE_SB_ZeroCopyGetPtr is
** -# Applications should be written as if #CFE_SB_AllocateMessageBuffer is
** equivalent to a \c malloc() and a successful call to #CFE_SB_TransmitBuffer
** is equivalent to a \c free().
** -# Applications must not de-reference the message pointer (for reading
Expand All @@ -561,7 +553,6 @@ CFE_Status_t CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, CFE_SB_Zero
** sequence counter if set to do so.
**
** \param[in] BufPtr A pointer to the buffer to be sent.
** \param[in] ZeroCopyHandle The handle supplied by the #CFE_SB_ZeroCopyGetPtr call
** \param[in] IncrementSequenceCount Boolean to increment the internally tracked
** sequence count and update the message if the
** buffer contains a telemetry message
Expand All @@ -570,11 +561,8 @@ CFE_Status_t CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, CFE_SB_Zero
** \retval #CFE_SUCCESS \copybrief CFE_SUCCESS
** \retval #CFE_SB_BAD_ARGUMENT \copybrief CFE_SB_BAD_ARGUMENT
** \retval #CFE_SB_MSG_TOO_BIG \copybrief CFE_SB_MSG_TOO_BIG
** \retval #CFE_SB_BUF_ALOC_ERR \copybrief CFE_SB_BUF_ALOC_ERR
**/
CFE_Status_t CFE_SB_TransmitBuffer(CFE_SB_Buffer_t *BufPtr, CFE_SB_ZeroCopyHandle_t ZeroCopyHandle,
bool IncrementSequenceCount);
/**@}*/
CFE_Status_t CFE_SB_TransmitBuffer(CFE_SB_Buffer_t *BufPtr, bool IncrementSequenceCount);

/** @defgroup CFEAPISBSetMessage cFE Setting Message Characteristics APIs
* @{
Expand Down
34 changes: 26 additions & 8 deletions modules/core_api/fsw/inc/cfe_sb_api_typedefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,31 @@ typedef union CFE_SB_Msg
long double LongDouble; /**< \brief Align to support Long Double */
} CFE_SB_Buffer_t;

/** \brief CFE_SB_ZeroCopyHandle_t to primitive type definition
**
** Software Zero Copy handle used in many SB APIs
*/
typedef struct
{
struct CFE_SB_BufferD *BufDscPtr; /* abstract descriptor reference (internal use) */
} CFE_SB_ZeroCopyHandle_t;
#ifndef CFE_OMIT_DEPRECATED_6_8

/** \brief Deperecated type to minimize required changes */
typedef CFE_SB_Buffer_t CFE_SB_Msg_t;

/** \brief Deperecated type to minimize required changes */
typedef CFE_MSG_CommandHeader_t CFE_SB_CmdHdr_t;

/** \brief Deperecated type to minimize required changes */
typedef CFE_MSG_TelemetryHeader_t CFE_SB_TlmHdr_t;

#define CFE_SB_CMD_HDR_SIZE (sizeof(CFE_MSG_CommandHeader_t)) /**< \brief Size of command header */
#define CFE_SB_TLM_HDR_SIZE (sizeof(CFE_MSG_TelemetryHeader_t)) /**< \brief Size of telemetry header */

/** \brief Pointer to an SB Message */
typedef CFE_MSG_Message_t *CFE_SB_MsgPtr_t;

/** \brief CFE_SB_MsgPayloadPtr_t defined as an opaque pointer to a message Payload portion */
typedef uint8 *CFE_SB_MsgPayloadPtr_t;

#define CFE_SB_Default_Qos CFE_SB_DEFAULT_QOS /**< \deprecated use CFE_SB_DEFAULT_QOS */

#define CFE_SB_CMD_HDR_SIZE (sizeof(CFE_MSG_CommandHeader_t)) /**< \brief Size of command header */
#define CFE_SB_TLM_HDR_SIZE (sizeof(CFE_MSG_TelemetryHeader_t)) /**< \brief Size of telemetry header */

#endif /* CFE_OMIT_DEPRECATED_6_8 */

#endif /* CFE_SB_API_TYPEDEFS_H */
22 changes: 9 additions & 13 deletions modules/core_api/ut-stubs/src/ut_sb_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,9 @@ int32 CFE_SB_TransmitMsg(CFE_MSG_Message_t *MsgPtr, bool IncrementSequenceCount)
** Returns CFE_SUCCESS or overridden unit test value
**
******************************************************************************/
int32 CFE_SB_TransmitBuffer(CFE_SB_Buffer_t *BufPtr, CFE_SB_ZeroCopyHandle_t ZeroCopyHandle,
bool IncrementSequenceCount)
int32 CFE_SB_TransmitBuffer(CFE_SB_Buffer_t *BufPtr, bool IncrementSequenceCount)
{
UT_Stub_RegisterContext(UT_KEY(CFE_SB_TransmitBuffer), BufPtr);
UT_Stub_RegisterContextGenericArg(UT_KEY(CFE_SB_TransmitBuffer), ZeroCopyHandle);
UT_Stub_RegisterContextGenericArg(UT_KEY(CFE_SB_TransmitBuffer), BufPtr);
UT_Stub_RegisterContextGenericArg(UT_KEY(CFE_SB_TransmitBuffer), IncrementSequenceCount);

int32 status = CFE_SUCCESS;
Expand Down Expand Up @@ -747,32 +745,30 @@ int32 CFE_SB_UnsubscribeLocal(CFE_SB_MsgId_t MsgId, CFE_SB_PipeId_t PipeId)
return status;
}

CFE_SB_Buffer_t *CFE_SB_ZeroCopyGetPtr(size_t MsgSize, CFE_SB_ZeroCopyHandle_t *BufferHandle)
CFE_SB_Buffer_t *CFE_SB_AllocateMessageBuffer(size_t MsgSize)
{
UT_Stub_RegisterContextGenericArg(UT_KEY(CFE_SB_ZeroCopyGetPtr), MsgSize);
UT_Stub_RegisterContext(UT_KEY(CFE_SB_ZeroCopyGetPtr), BufferHandle);
UT_Stub_RegisterContextGenericArg(UT_KEY(CFE_SB_AllocateMessageBuffer), MsgSize);

int32 status;
CFE_SB_Buffer_t *SBBufPtr = NULL;

status = UT_DEFAULT_IMPL(CFE_SB_ZeroCopyGetPtr);
status = UT_DEFAULT_IMPL(CFE_SB_AllocateMessageBuffer);

if (status == CFE_SUCCESS)
{
UT_Stub_CopyToLocal(UT_KEY(CFE_SB_ZeroCopyGetPtr), &SBBufPtr, sizeof(SBBufPtr));
UT_Stub_CopyToLocal(UT_KEY(CFE_SB_AllocateMessageBuffer), &SBBufPtr, sizeof(SBBufPtr));
}

return SBBufPtr;
}

int32 CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, CFE_SB_ZeroCopyHandle_t BufferHandle)
int32 CFE_SB_ReleaseMessageBuffer(CFE_SB_Buffer_t *BufPtr)
{
UT_Stub_RegisterContext(UT_KEY(CFE_SB_ZeroCopyReleasePtr), Ptr2Release);
UT_Stub_RegisterContextGenericArg(UT_KEY(CFE_SB_ZeroCopyReleasePtr), BufferHandle);
UT_Stub_RegisterContextGenericArg(UT_KEY(CFE_SB_ReleaseMessageBuffer), BufPtr);

int32 status;

status = UT_DEFAULT_IMPL(CFE_SB_ZeroCopyReleasePtr);
status = UT_DEFAULT_IMPL(CFE_SB_ReleaseMessageBuffer);

return status;
}
61 changes: 28 additions & 33 deletions modules/sb/fsw/src/cfe_sb_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -2026,9 +2026,9 @@ int32 CFE_SB_ReceiveBuffer(CFE_SB_Buffer_t **BufPtr, CFE_SB_PipeId_t PipeId, int
}

/*
* Function: CFE_SB_ZeroCopyGetPtr - See API and header file for details
* Function: CFE_SB_AllocateMessageBuffer - See API and header file for details
*/
CFE_SB_Buffer_t *CFE_SB_ZeroCopyGetPtr(size_t MsgSize, CFE_SB_ZeroCopyHandle_t *BufferHandle)
CFE_SB_Buffer_t *CFE_SB_AllocateMessageBuffer(size_t MsgSize)
{
CFE_ES_AppId_t AppId;
CFE_SB_BufferD_t *BufDscPtr;
Expand All @@ -2037,18 +2037,13 @@ CFE_SB_Buffer_t *CFE_SB_ZeroCopyGetPtr(size_t MsgSize, CFE_SB_ZeroCopyHandle_t *
AppId = CFE_ES_APPID_UNDEFINED;
BufDscPtr = NULL;
BufPtr = NULL;

if (MsgSize > CFE_MISSION_SB_MAX_SB_MSG_SIZE)
{
CFE_ES_WriteToSysLog(" CFE_SB:ZeroCopyGetPtr-Failed, MsgSize is too large\n");
return NULL;
}

if (BufferHandle == NULL)
{
CFE_ES_WriteToSysLog(" CFE_SB:ZeroCopyGetPtr-BufferHandle is NULL\n");
return NULL;
}

/* get callers AppId */
if (CFE_ES_GetAppID(&AppId) == CFE_SUCCESS)
{
Expand Down Expand Up @@ -2082,34 +2077,37 @@ CFE_SB_Buffer_t *CFE_SB_ZeroCopyGetPtr(size_t MsgSize, CFE_SB_ZeroCopyHandle_t *
memset(BufPtr, 0, MsgSize);
}

/* Export both items (descriptor + msg buffer) to caller */
BufferHandle->BufDscPtr = BufDscPtr;
return BufPtr;

} /* CFE_SB_ZeroCopyGetPtr */
} /* CFE_SB_AllocateMessageBuffer */

/*
* Helper functions to do sanity checks on the Zero Copy handle + Buffer combo.
*
* Note in a future CFE version the API can be simplified -
* only one of these pointers is strictly needed, since they
* should refer to the same buffer descriptor object.
* Helper function to do sanity checks on the Zero Copy Buffer and
* outputs the encapsulating descriptor if successful
*/
int32 CFE_SB_ZeroCopyHandleValidate(CFE_SB_Buffer_t *BufPtr, CFE_SB_ZeroCopyHandle_t ZeroCopyHandle)
int32 CFE_SB_ZeroCopyBufferValidate(CFE_SB_Buffer_t *BufPtr, CFE_SB_BufferD_t **BufDscPtr)
{
cpuaddr BufDscAddr;

/*
* Sanity Check that the pointers are not NULL
*/
if (BufPtr == NULL || ZeroCopyHandle.BufDscPtr == NULL)
if (BufPtr == NULL)
{
return CFE_SB_BAD_ARGUMENT;
}

/*
* Calculate descriptor pointer from buffer pointer -
* The buffer is just a member (offset) in the descriptor
*/
BufDscAddr = (cpuaddr)BufPtr - offsetof(CFE_SB_BufferD_t, Content);
*BufDscPtr = (CFE_SB_BufferD_t *)BufDscAddr;

/*
* Check that the descriptor is actually a "zero copy" type,
* and that it refers to same actual message buffer.
*/
if (!CFE_RESOURCEID_TEST_DEFINED(ZeroCopyHandle.BufDscPtr->AppId) || (&ZeroCopyHandle.BufDscPtr->Content != BufPtr))
if (!CFE_RESOURCEID_TEST_DEFINED((*BufDscPtr)->AppId))
{
return CFE_SB_BUFFER_INVALID;
}
Expand All @@ -2119,46 +2117,43 @@ int32 CFE_SB_ZeroCopyHandleValidate(CFE_SB_Buffer_t *BufPtr, CFE_SB_ZeroCopyHand
}

/*
* Function: CFE_SB_ZeroCopyReleasePtr - See API and header file for details
* Function: CFE_SB_ReleaseMessageBuffer - See API and header file for details
*/
int32 CFE_SB_ZeroCopyReleasePtr(CFE_SB_Buffer_t *Ptr2Release, CFE_SB_ZeroCopyHandle_t ZeroCopyHandle)
CFE_Status_t CFE_SB_ReleaseMessageBuffer(CFE_SB_Buffer_t *BufPtr)
{
int32 Status;
CFE_SB_BufferD_t *BufDscPtr;
int32 Status;

Status = CFE_SB_ZeroCopyHandleValidate(Ptr2Release, ZeroCopyHandle);
Status = CFE_SB_ZeroCopyBufferValidate(BufPtr, &BufDscPtr);

CFE_SB_LockSharedData(__func__, __LINE__);

if (Status == CFE_SUCCESS)
{
/* Clear the ownership app ID and decrement use count (may also free) */
ZeroCopyHandle.BufDscPtr->AppId = CFE_ES_APPID_UNDEFINED;
CFE_SB_DecrBufUseCnt(ZeroCopyHandle.BufDscPtr);
BufDscPtr->AppId = CFE_ES_APPID_UNDEFINED;
CFE_SB_DecrBufUseCnt(BufDscPtr);
}

CFE_SB_UnlockSharedData(__func__, __LINE__);

return Status;

} /* end CFE_SB_ZeroCopyReleasePtr */
} /* end CFE_SB_ReleaseMessageBuffer */

/*
* Function CFE_SB_TransmitBuffer - See API and header file for details
*/
int32 CFE_SB_TransmitBuffer(CFE_SB_Buffer_t *BufPtr, CFE_SB_ZeroCopyHandle_t ZeroCopyHandle,
bool IncrementSequenceCount)
int32 CFE_SB_TransmitBuffer(CFE_SB_Buffer_t *BufPtr, bool IncrementSequenceCount)
{
int32 Status;
CFE_SB_BufferD_t *BufDscPtr;
CFE_SBR_RouteId_t RouteId;

Status = CFE_SB_ZeroCopyHandleValidate(BufPtr, ZeroCopyHandle);
Status = CFE_SB_ZeroCopyBufferValidate(BufPtr, &BufDscPtr);

if (Status == CFE_SUCCESS)
{
/* Get actual buffer descriptor pointer from zero copy handle */
BufDscPtr = ZeroCopyHandle.BufDscPtr;

/* Validate the content and get the MsgId, store it in the descriptor */
Status = CFE_SB_TransmitMsgValidate(&BufPtr->Msg, &BufDscPtr->MsgId, &BufDscPtr->ContentSize, &RouteId);

Expand Down
6 changes: 3 additions & 3 deletions modules/sb/fsw/src/cfe_sb_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,12 @@ void CFE_SB_BroadcastBufferToRoute(CFE_SB_BufferD_t *BufDscPtr, CFE_SBR_RouteId_
/**
* \brief Perform basic sanity check on the Zero Copy handle
*
* \param[in] BufPtr pointer to the content buffer
* \param[in] ZeroCopyHandle Zero copy handle to check
* \param[in] BufPtr pointer to the content buffer
* \param[out] BufDscPtr Will be set to actual buffer descriptor
*
* \returns CFE_SUCCESS if validation passed, or error code.
*/
int32 CFE_SB_ZeroCopyHandleValidate(CFE_SB_Buffer_t *BufPtr, CFE_SB_ZeroCopyHandle_t ZeroCopyHandle);
int32 CFE_SB_ZeroCopyBufferValidate(CFE_SB_Buffer_t *BufPtr, CFE_SB_BufferD_t **BufDscPtr);

/**
* \brief Add a destination node
Expand Down
Loading

0 comments on commit 7db7c72

Please sign in to comment.