Skip to content

Commit

Permalink
send service call failure operation (#298)
Browse files Browse the repository at this point in the history
### Changelog
None

### Docs

None

### Description
Similar to foxglove/ws-protocol#743, this PR
implements the recently added `serviceCallFailure` operation that was
added inhttps://github.com/foxglove/ws-protocol/pull/733. This operation
is sent to a client when the service call failed or if the service could
not be found to avoid that the client potentially waits forever for a
service response message.

---------

Co-authored-by: Jacob Bandes-Storch <jacob@foxglove.dev>
  • Loading branch information
achim-k and jtbandes authored May 14, 2024
1 parent e773b92 commit 0b6c588
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class ServerInterface {
virtual void broadcastTime(uint64_t timestamp) = 0;
virtual void sendServiceResponse(ConnectionHandle clientHandle,
const ServiceResponse& response) = 0;
virtual void sendServiceFailure(ConnectionHandle clientHandle, ServiceId serviceId,
uint32_t callId, const std::string& message) = 0;
virtual void updateConnectionGraph(const MapOfSets& publishedTopics,
const MapOfSets& subscribedTopics,
const MapOfSets& advertisedServices) = 0;
Expand Down
34 changes: 28 additions & 6 deletions foxglove_bridge_base/include/foxglove_bridge/websocket_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ class Server final : public ServerInterface<ConnHandle> {
const uint8_t* payload, size_t payloadSize) override;
void broadcastTime(uint64_t timestamp) override;
void sendServiceResponse(ConnHandle clientHandle, const ServiceResponse& response) override;
void sendServiceFailure(ConnHandle clientHandle, ServiceId serviceId, uint32_t callId,
const std::string& message) override;
void updateConnectionGraph(const MapOfSets& publishedTopics, const MapOfSets& subscribedTopics,
const MapOfSets& advertisedServices) override;
void sendFetchAssetResponse(ConnHandle clientHandle, const FetchAssetResponse& response) override;
Expand Down Expand Up @@ -768,8 +770,10 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, Mes
case ClientBinaryOpcode::SERVICE_CALL_REQUEST: {
ServiceRequest request;
if (length < request.size()) {
sendStatusAndLogMsg(hdl, StatusLevel::Error,
"Invalid service call request length " + std::to_string(length));
const std::string errMessage =
"Invalid service call request length " + std::to_string(length);
sendServiceFailure(hdl, request.serviceId, request.callId, errMessage);
_server.get_elog().write(RECOVERABLE, errMessage);
return;
}

Expand All @@ -778,15 +782,23 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, Mes
{
std::shared_lock<std::shared_mutex> lock(_servicesMutex);
if (_services.find(request.serviceId) == _services.end()) {
sendStatusAndLogMsg(
hdl, StatusLevel::Error,
"Service " + std::to_string(request.serviceId) + " is not advertised");
const std::string errMessage =
"Service " + std::to_string(request.serviceId) + " is not advertised";
sendServiceFailure(hdl, request.serviceId, request.callId, errMessage);
_server.get_elog().write(RECOVERABLE, errMessage);
return;
}
}

if (_handlers.serviceRequestHandler) {
try {
if (!_handlers.serviceRequestHandler) {
throw foxglove::ServiceError(request.serviceId, "No service handler");
}

_handlers.serviceRequestHandler(request, hdl);
} catch (const std::exception& e) {
sendServiceFailure(hdl, request.serviceId, request.callId, e.what());
_server.get_elog().write(RECOVERABLE, e.what());
}
} break;
default: {
Expand Down Expand Up @@ -1024,6 +1036,16 @@ inline uint16_t Server<ServerConfiguration>::getPort() {
return endpoint.port();
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::sendServiceFailure(ConnHandle clientHandle,
ServiceId serviceId, uint32_t callId,
const std::string& message) {
sendJson(clientHandle, json{{"op", "serviceCallFailure"},
{"serviceId", serviceId},
{"callId", callId},
{"message", message}});
};

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::updateConnectionGraph(
const MapOfSets& publishedTopics, const MapOfSets& subscribedTopics,
Expand Down
26 changes: 26 additions & 0 deletions ros2_foxglove_bridge/tests/smoke_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,32 @@ TEST_F(ServiceTest, testCallServiceParallel) {
}
}

TEST_F(ServiceTest, testCallNonexistentService) {
auto client = std::make_shared<foxglove::Client<websocketpp::config::asio_client>>();
ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(std::chrono::seconds(5)));

std::promise<nlohmann::json> promise;
auto serviceFailureFuture = promise.get_future();
client->setTextMessageHandler([&promise](const std::string& payload) mutable {
const auto msg = nlohmann::json::parse(payload);
if (msg["op"].get<std::string>() == "serviceCallFailure") {
promise.set_value(msg);
}
});

foxglove::ServiceRequest request;
request.serviceId = 99u;
request.callId = 123u;
request.encoding = "";
request.data = {};
client->sendServiceRequest(request);

ASSERT_EQ(std::future_status::ready, serviceFailureFuture.wait_for(std::chrono::seconds(5)));
const auto failureMsg = serviceFailureFuture.get();
EXPECT_EQ(failureMsg["serviceId"].get<foxglove::ServiceId>(), request.serviceId);
EXPECT_EQ(failureMsg["callId"].get<uint32_t>(), request.callId);
}

TEST(SmokeTest, receiveMessagesOfMultipleTransientLocalPublishers) {
const std::string topicName = "/latched";
auto node = rclcpp::Node::make_shared("node");
Expand Down

0 comments on commit 0b6c588

Please sign in to comment.