Skip to content

Commit

Permalink
Rework error processing
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Aug 5, 2024
1 parent 9b9e00e commit 7c959d6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 36 deletions.
3 changes: 0 additions & 3 deletions include/up-transport-zenoh-cpp/ZenohUTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ struct ZenohUTransport : public UTransport {

static v1::UMessage sampleToUMessage(const zenoh::Sample& sample);
static v1::UMessage queryToUMessage(const zenoh::Query& query);
static v1::UMessage errorToUMessage(v1::UCode err_code,
const std::string& err_message);

v1::UStatus registerRequestListener_(const std::string& zenoh_key,
CallableConn listener);
Expand Down Expand Up @@ -149,4 +147,3 @@ struct ZenohUTransport : public UTransport {
} // namespace uprotocol::transport

#endif // UP_TRANSPORT_ZENOH_CPP_ZENOHUTRANSPORT_H

58 changes: 25 additions & 33 deletions src/ZenohUTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,38 +146,20 @@ zenoh::Priority ZenohUTransport::mapZenohPriority(v1::UPriority upriority) {
}

v1::UMessage ZenohUTransport::sampleToUMessage(const zenoh::Sample& sample) {
try {
v1::UMessage message;
*message.mutable_attributes() =
attachmentToUAttributes(sample.get_attachment());
std::string payload(sample.get_payload().deserialize<std::string>());
message.set_payload(payload);
return message;
} catch (const std::exception& e) {
spdlog::error("sampleToUMessage: {}", e.what());
return errorToUMessage(v1::UCode::INVALID_ARGUMENT, e.what());
}
v1::UMessage message;
*message.mutable_attributes() =
attachmentToUAttributes(sample.get_attachment());
std::string payload(sample.get_payload().deserialize<std::string>());
message.set_payload(payload);
return message;
}

v1::UMessage ZenohUTransport::queryToUMessage(const zenoh::Query& query) {
try {
v1::UMessage message;
*message.mutable_attributes() =
attachmentToUAttributes(query.get_attachment());
std::string payload(query.get_payload().deserialize<std::string>());
message.set_payload(payload);
return message;
} catch (const std::exception& e) {
spdlog::error("queryToUMessage: {}", e.what());
return errorToUMessage(v1::UCode::INVALID_ARGUMENT, e.what());
}
}

v1::UMessage ZenohUTransport::errorToUMessage(v1::UCode err_code,
const std::string& err_message) {
v1::UMessage message;
message.mutable_attributes()->set_commstatus(v1::UCode::INVALID_ARGUMENT);
message.set_payload(err_message);
*message.mutable_attributes() =
attachmentToUAttributes(query.get_attachment());
std::string payload(query.get_payload().deserialize<std::string>());
message.set_payload(payload);
return message;
}

Expand Down Expand Up @@ -210,7 +192,11 @@ v1::UStatus ZenohUTransport::registerRequestListener_(
zenoh::detail::loan(query));

query_map_.emplace(std::move(id_str), std::move(cloned_query));
listener(queryToUMessage(query));
try {
listener(queryToUMessage(query));
} catch (const std::exception& e) {
spdlog::error("query processing failed: {}", e.what());
}
};

auto on_drop = []() {};
Expand Down Expand Up @@ -239,7 +225,11 @@ v1::UStatus ZenohUTransport::registerPublishNotificationListener_(
// NOTE: listener is captured by copy here so that it does not go out
// of scope when this function returns.
auto on_sample = [this, listener](const zenoh::Sample& sample) mutable {
listener(sampleToUMessage(sample));
try {
listener(sampleToUMessage(sample));
} catch (const std::exception& e) {
spdlog::error("sample processing failed: {}", e.what());
}
};

auto on_drop = []() {};
Expand Down Expand Up @@ -277,13 +267,16 @@ v1::UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key,
const auto& sample = reply.get_ok();
spdlog::debug("resp_callback: {}",
sample.get_payload().deserialize<std::string>());
resp_callback(sampleToUMessage(sample));
try {
resp_callback(sampleToUMessage(sample));
} catch (const std::exception& e) {
spdlog::error("sample processing failed: {}", e.what());
}
spdlog::debug("resp_callback: done");
} else {
auto err_message =
reply.get_err().get_payload().deserialize<std::string>();
spdlog::error("on_reply got en error: {}", err_message);
resp_callback(errorToUMessage(v1::UCode::INTERNAL, err_message));
}
};

Expand Down Expand Up @@ -416,4 +409,3 @@ void ZenohUTransport::cleanupListener(CallableConn listener) {
}

} // namespace uprotocol::transport

0 comments on commit 7c959d6

Please sign in to comment.