-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature/1.5.8/zenoh utransport impl #65
Changes from all commits
0a12a8d
0b2e638
7388a7e
f82c1d1
ab53e69
db76fd6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -15,7 +15,34 @@ | |||||
#include <up-cpp/transport/UTransport.h> | ||||||
|
||||||
#include <filesystem> | ||||||
#include <mutex> | ||||||
#include <optional> | ||||||
#include <unordered_map> | ||||||
|
||||||
#define ZENOHCXX_ZENOHC | ||||||
#include <zenoh.hxx> | ||||||
|
||||||
namespace zenohc { | ||||||
|
||||||
class OwnedQuery { | ||||||
public: | ||||||
OwnedQuery(const z_query_t& query) : _query(z_query_clone(&query)) {} | ||||||
|
||||||
OwnedQuery(const OwnedQuery&) = delete; | ||||||
OwnedQuery& operator=(const OwnedQuery&) = delete; | ||||||
|
||||||
~OwnedQuery() { z_drop(&_query); } | ||||||
|
||||||
Query loan() const { return z_loan(_query); } | ||||||
bool check() const { return z_check(_query); } | ||||||
|
||||||
private: | ||||||
z_owned_query_t _query; | ||||||
}; | ||||||
|
||||||
using OwnedQueryPtr = std::shared_ptr<OwnedQuery>; | ||||||
|
||||||
} // namespace zenohc | ||||||
|
||||||
namespace uprotocol::transport { | ||||||
|
||||||
|
@@ -61,6 +88,31 @@ struct ZenohUTransport : public UTransport { | |||||
|
||||||
/// @brief Represents the callable end of a callback connection. | ||||||
using CallableConn = typename UTransport::CallableConn; | ||||||
using UuriKey = std::string; | ||||||
|
||||||
struct ListenerKey { | ||||||
CallableConn listener; | ||||||
std::string zenoh_key; | ||||||
|
||||||
ListenerKey(CallableConn listener, const std::string& zenoh_key) | ||||||
: listener(listener), zenoh_key(zenoh_key) {} | ||||||
|
||||||
bool operator==(const ListenerKey& other) const { | ||||||
return listener == other.listener && zenoh_key == other.zenoh_key; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
This There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or maybe that's not the intent here - the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class needs only if the cleanupListener will receive sink/source filters like in rust, but currently it receives only listener, and I still haven't received an answer why this is so. |
||||||
} | ||||||
|
||||||
bool operator<(const ListenerKey& other) const { | ||||||
if (listener == other.listener) { | ||||||
return zenoh_key < other.zenoh_key; | ||||||
} | ||||||
return listener < other.listener; | ||||||
} | ||||||
}; | ||||||
|
||||||
using RpcCallbackMap = std::map<UuriKey, CallableConn>; | ||||||
using SubscriberMap = std::map<ListenerKey, zenoh::Subscriber>; | ||||||
using QueryableMap = std::map<ListenerKey, zenoh::Queryable>; | ||||||
using QueryMap = std::map<std::string, zenoh::OwnedQueryPtr>; | ||||||
Comment on lines
+112
to
+115
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some clarification on what these are would be helpful. The types are also only used once and not particularly long, so the |
||||||
|
||||||
/// @brief Register listener to be called when UMessage is received | ||||||
/// for the given URI. | ||||||
|
@@ -69,20 +121,13 @@ struct ZenohUTransport : public UTransport { | |||||
/// version of registerListener() will reset the connection | ||||||
/// handle before returning it to the caller. | ||||||
/// | ||||||
/// @param sink_filter UUri for where messages are expected to arrive via | ||||||
/// the underlying transport technology. The callback | ||||||
/// will be called when a message with a matching sink | ||||||
/// @param listener shared_ptr to a connected callback object, to be | ||||||
/// called when a message is received. | ||||||
/// @param source_filter (Optional) UUri for where messages are expected to | ||||||
/// have been sent from. The callback will only be | ||||||
/// called for messages where the source matches. | ||||||
/// @see up-cpp for additional details | ||||||
/// | ||||||
/// @returns * OKSTATUS if the listener was registered successfully. | ||||||
/// * FAILSTATUS with the appropriate failure otherwise. | ||||||
[[nodiscard]] virtual v1::UStatus registerListenerImpl( | ||||||
const v1::UUri& sink_filter, CallableConn&& listener, | ||||||
std::optional<v1::UUri>&& source_filter) override; | ||||||
CallableConn&& listener, const v1::UUri& source_filter, | ||||||
std::optional<v1::UUri>&& sink_filter) override; | ||||||
|
||||||
/// @brief Clean up on listener disconnect. | ||||||
/// | ||||||
|
@@ -94,7 +139,56 @@ struct ZenohUTransport : public UTransport { | |||||
/// @param listener shared_ptr of the Connection that has been broken. | ||||||
virtual void cleanupListener(CallableConn listener) override; | ||||||
|
||||||
static std::string toZenohKeyString( | ||||||
const std::string& default_authority_name, const v1::UUri& source, | ||||||
const std::optional<v1::UUri>& sink); | ||||||
Comment on lines
+142
to
+144
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be documented. |
||||||
|
||||||
private: | ||||||
static v1::UStatus uError(v1::UCode code, std::string_view message); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this should be in up-cpp so it can be reused other places |
||||||
|
||||||
static std::vector<std::pair<std::string, std::string>> | ||||||
uattributesToAttachment(const v1::UAttributes& attributes); | ||||||
|
||||||
static v1::UAttributes attachmentToUAttributes( | ||||||
const zenoh::AttachmentView& attachment); | ||||||
|
||||||
static zenoh::Priority mapZenohPriority(v1::UPriority upriority); | ||||||
|
||||||
static v1::UMessage sampleToUMessage(const zenoh::Sample& sample); | ||||||
Comment on lines
+147
to
+157
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could probably be in the anonymous namespace of the .cpp file. |
||||||
|
||||||
v1::UStatus registerRequestListener_(const std::string& zenoh_key, | ||||||
CallableConn listener); | ||||||
|
||||||
v1::UStatus registerResponseListener_(const std::string& zenoh_key, | ||||||
CallableConn listener); | ||||||
|
||||||
v1::UStatus registerPublishNotificationListener_( | ||||||
const std::string& zenoh_key, CallableConn listener); | ||||||
|
||||||
v1::UStatus sendRequest_(const std::string& zenoh_key, | ||||||
const std::string& payload, | ||||||
const v1::UAttributes& attributes); | ||||||
|
||||||
v1::UStatus sendResponse_(const std::string& payload, | ||||||
const v1::UAttributes& attributes); | ||||||
|
||||||
v1::UStatus sendPublishNotification_(const std::string& zenoh_key, | ||||||
const std::string& payload, | ||||||
const v1::UAttributes& attributes); | ||||||
|
||||||
zenoh::Session session_; | ||||||
|
||||||
RpcCallbackMap rpc_callback_map_; | ||||||
std::mutex rpc_callback_map_mutex_; | ||||||
|
||||||
SubscriberMap subscriber_map_; | ||||||
std::mutex subscriber_map_mutex_; | ||||||
|
||||||
QueryableMap queryable_map_; | ||||||
std::mutex queryable_map_mutex_; | ||||||
|
||||||
QueryMap query_map_; | ||||||
std::mutex query_map_mutex_; | ||||||
Comment on lines
+179
to
+191
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These should probably be hidden in the .cpp file since they depend on zenoh types. We would like not to expose those directly to uEs through the transport headers. |
||||||
}; | ||||||
|
||||||
} // namespace uprotocol::transport | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be relocated into the .cpp file. It's a detail specific to this implementation and does not need to be exposed externally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is temporary workaround and it should be removed after switching to zenohcpp 1.0