Skip to content

Commit

Permalink
Add logging and fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Aug 1, 2024
1 parent 70e06d5 commit d648121
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 16 deletions.
1 change: 1 addition & 0 deletions include/up-transport-zenoh-cpp/ZenohUTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ struct ZenohUTransport : public UTransport {
static zenoh::Priority mapZenohPriority(v1::UPriority upriority);

static v1::UMessage sampleToUMessage(const zenoh::Sample& sample);
static v1::UMessage queryToUMessage(const zenoh::Query& query);

v1::UStatus registerRequestListener_(const std::string& zenoh_key,
CallableConn listener);
Expand Down
91 changes: 75 additions & 16 deletions src/ZenohUTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

#include <stdexcept>

#define LOG_LEVEL 0
#include "log.hpp"

namespace uprotocol::transport {

const char UATTRIBUTE_VERSION = 1;
Expand Down Expand Up @@ -152,41 +155,65 @@ v1::UMessage ZenohUTransport::sampleToUMessage(const zenoh::Sample& sample) {
return message;
}

v1::UMessage ZenohUTransport::queryToUMessage(const zenoh::Query& query) {
v1::UMessage message;
*message.mutable_attributes() =
attachmentToUAttributes(query.get_attachment());
std::string payload(query.get_payload().deserialize<std::string>());
message.set_payload(payload);

return message;
}

ZenohUTransport::ZenohUTransport(const v1::UUri& defaultUri,
const std::filesystem::path& configFile)
: UTransport(defaultUri),
session_(zenoh::Session::open(
std::move(zenoh::Config::from_file(configFile.string().c_str())))) {}
std::move(zenoh::Config::from_file(configFile.string().c_str())))) {
I("ZenohUTransport init");
}

v1::UStatus ZenohUTransport::registerRequestListener_(
const std::string& zenoh_key, CallableConn listener) {
I("registerRequestListener_: " << zenoh_key);

// NOTE: listener is captured by copy here so that it does not go out
// of scope when this function returns.
auto on_query = [this, listener](const zenoh::Query& query) {
auto on_query = [this, listener](const zenoh::Query& query) mutable {
auto attributes = attachmentToUAttributes(query.get_attachment());
auto id_str =
datamodel::serializer::uuid::AsString().serialize(attributes.id());
std::unique_lock<std::mutex> lock(query_map_mutex_);
{
std::unique_lock<std::mutex> lock(query_map_mutex_);

// TODO(sashacmc): Replace this workaround with `query.clone()`
// after zenohcpp 1.0.0-rc6 release
zenoh::Query cloned_query(nullptr);
z_query_clone(zenoh::detail::as_owned_c_ptr(cloned_query),
zenoh::detail::loan(query));
// TODO(sashacmc): Replace this workaround with `query.clone()`
// after zenohcpp 1.0.0-rc6 release
zenoh::Query cloned_query(nullptr);
z_query_clone(zenoh::detail::as_owned_c_ptr(cloned_query),
zenoh::detail::loan(query));

query_map_.emplace(std::move(id_str), std::move(cloned_query));
query_map_.emplace(std::move(id_str), std::move(cloned_query));
}
listener(queryToUMessage(query));
};

auto on_drop = []() {};

auto queryable = session_.declare_queryable(zenoh_key, std::move(on_query),
std::move(on_drop));

{
std::unique_lock<std::mutex> lock(queryable_map_mutex_);
queryable_map_.emplace(listener, std::move(queryable));
}

return v1::UStatus();
}

v1::UStatus ZenohUTransport::registerResponseListener_(
const std::string& zenoh_key, CallableConn listener) {
I("registerResponseListener_: " << zenoh_key);

std::unique_lock<std::mutex> lock(rpc_callback_map_mutex_);
rpc_callback_map_.insert(std::make_pair(zenoh_key, listener));

Expand All @@ -195,6 +222,8 @@ v1::UStatus ZenohUTransport::registerResponseListener_(

v1::UStatus ZenohUTransport::registerPublishNotificationListener_(
const std::string& zenoh_key, CallableConn listener) {
I("registerPublishNotificationListener_: " << zenoh_key);

// NOTE: listener is captured by copy here so that it does not go out
// of scope when this function returns.
auto on_sample = [this, listener](const zenoh::Sample& sample) mutable {
Expand All @@ -212,30 +241,56 @@ v1::UStatus ZenohUTransport::registerPublishNotificationListener_(
return v1::UStatus();
}

// Define a custom search function
template <typename K, typename V, typename Predicate>
typename std::map<K, V>::iterator find_in_map_if(std::map<K, V>& m,
Predicate pred) {
return std::find_if(m.begin(), m.end(), [&](const std::pair<K, V>& pair) {
return pred(pair);
});
}

v1::UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key,
const std::string& payload,
const v1::UAttributes& attributes) {
auto source_str =
datamodel::serializer::uri::AsString().serialize(attributes.source());
D("sendRequest_: " << zenoh_key << ": " << payload);
zenoh::KeyExpr ke(zenoh_key);
auto ke_search = [&](const std::pair<std::string, CallableConn>& pair) {
std::cout << "!!! serach: " << pair.first << " <> " << zenoh_key
<< std::endl;
return zenoh::KeyExpr(pair.first).intersects(ke);
};

CallableConn resp_callback;
{
std::unique_lock<std::mutex> lock(rpc_callback_map_mutex_);

if (auto resp_callback_it = rpc_callback_map_.find(source_str);
if (auto resp_callback_it =
find_in_map_if(rpc_callback_map_, ke_search);
resp_callback_it == rpc_callback_map_.end()) {
return uError(v1::UCode::UNAVAILABLE, "failed to find UUID");
E("sendRequest_: failed to find response callback for: "
<< zenoh_key);
return uError(v1::UCode::UNAVAILABLE,
"failed to find response callback");
} else {
D("sendRequest_: found callback for: " << zenoh_key);
resp_callback = resp_callback_it->second;
}
}
auto on_reply = [&](const zenoh::Reply& reply) {
auto on_reply = [=](const zenoh::Reply& reply) mutable {
D("on_reply for " << zenoh_key);
if (reply.is_ok()) {
const auto& sample = reply.get_ok();
D("resp_callback: "
<< sample.get_payload().deserialize<std::string>());
resp_callback(sampleToUMessage(sample));
D("resp_callback: done");
} else {
E("on_reply got en error: "
<< reply.get_err().get_payload().deserialize<std::string>());
// TODO: error report
// std::cout << "Received an error :" <<
// reply.get_err().get_payload().deserialize<std::string>() << "\n";
// << "\n";
}
};

Expand All @@ -259,6 +314,7 @@ v1::UStatus ZenohUTransport::sendResponse_(const std::string& payload,
const v1::UAttributes& attributes) {
auto reqid_str =
datamodel::serializer::uuid::AsString().serialize(attributes.reqid());
D("sendResponse_: " << reqid_str << ": " << payload);
zenoh::Query* query;
{
std::unique_lock<std::mutex> lock(query_map_mutex_);
Expand All @@ -270,7 +326,10 @@ v1::UStatus ZenohUTransport::sendResponse_(const std::string& payload,
}
}

query->reply(query->get_keyexpr(), payload);
D("sendResponse_ to query: " << query->get_keyexpr().as_string_view());
auto attachment = uattributesToAttachment(attributes);
query->reply(query->get_keyexpr(), payload,
{.attachment = zenoh::Bytes::serialize(attachment)});

return v1::UStatus();
}
Expand Down

0 comments on commit d648121

Please sign in to comment.