Skip to content

Commit

Permalink
PR feedback fixes:
Browse files Browse the repository at this point in the history
* Modify examples to use uP-L2 APIs
* Fix RPC behavior of transport to correctly recognize empty commstatus
as an OK status
  • Loading branch information
PLeVasseur committed Aug 2, 2024
1 parent bcdaf9b commit 7f245c3
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 87 deletions.
91 changes: 49 additions & 42 deletions up-transport-vsomeip/examples/hello_client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
use async_trait::async_trait;
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse};
use log::trace;
use std::fs::canonicalize;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use up_rust::communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload};
use up_rust::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY;
use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri};
use up_rust::{UStatus, UUri};
use up_transport_vsomeip::UPTransportVsomeip;

const HELLO_SERVICE_ID: u16 = 0x6000;
Expand All @@ -30,28 +43,6 @@ const CLIENT_RESOURCE_ID: u16 = 0;

const REQUEST_TTL: u32 = 1000;

struct ServiceResponseListener;

#[async_trait]
impl UListener for ServiceResponseListener {
async fn on_receive(&self, msg: UMessage) {
println!("ServiceResponseListener: Received a message: {msg:?}");

let mut msg = msg.clone();

if let Some(ref mut attributes) = msg.attributes.as_mut() {
attributes.payload_format =
::protobuf::EnumOrUnknown::new(UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
}

let Ok(hello_response) = msg.extract_protobuf::<HelloResponse>() else {
panic!("Unable to parse into HelloResponse");
};

println!("Here we received response: {hello_response:?}");
}
}

#[tokio::main]
async fn main() -> Result<(), UStatus> {
env_logger::init();
Expand All @@ -73,7 +64,7 @@ async fn main() -> Result<(), UStatus> {
CLIENT_RESOURCE_ID,
)
.unwrap();
let client: Arc<dyn UTransport> = Arc::new(
let client = Arc::new(
UPTransportVsomeip::new_with_config(
client_uuri,
&HELLO_SERVICE_AUTHORITY.to_string(),
Expand All @@ -83,13 +74,10 @@ async fn main() -> Result<(), UStatus> {
.unwrap(),
);

let source = UUri::try_from_parts(
CLIENT_AUTHORITY,
CLIENT_UE_ID,
CLIENT_UE_VERSION_MAJOR,
CLIENT_RESOURCE_ID,
)
.unwrap();
let l2_client = InMemoryRpcClient::new(client.clone(), client.clone())
.await
.unwrap();

let sink = UUri::try_from_parts(
HELLO_SERVICE_AUTHORITY,
HELLO_SERVICE_UE_ID,
Expand All @@ -98,11 +86,6 @@ async fn main() -> Result<(), UStatus> {
)
.unwrap();

let service_response_listener: Arc<dyn UListener> = Arc::new(ServiceResponseListener);
client
.register_listener(&sink, Some(&source), service_response_listener)
.await?;

let mut i = 0;
loop {
tokio::time::sleep(Duration::from_millis(1000)).await;
Expand All @@ -112,12 +95,36 @@ async fn main() -> Result<(), UStatus> {
..Default::default()
};
i += 1;
println!("Sending Request message with payload:\n{hello_request:?}");

let call_options = CallOptions::for_rpc_request(REQUEST_TTL, None, None, None);
let invoke_res = l2_client
.invoke_method(
sink.clone(),
call_options,
Some(UPayload::try_from_protobuf(hello_request).unwrap()),
)
.await;

let Ok(response) = invoke_res else {
panic!(
"Hit an error attempting to invoke method: {:?}",
invoke_res.err().unwrap()
);
};

let hello_response_vsomeip_unspecified_payload_format = response.unwrap();
let hello_response_protobuf_payload_format = UPayload::new(
hello_response_vsomeip_unspecified_payload_format.payload(),
UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
);

let request_msg = UMessageBuilder::request(sink.clone(), source.clone(), REQUEST_TTL)
.build_with_protobuf_payload(&hello_request)
.unwrap();
println!("Sending Request message:\n{request_msg:?}");
let Ok(hello_response) =
hello_response_protobuf_payload_format.extract_protobuf::<HelloResponse>()
else {
panic!("Unable to parse into HelloResponse");
};

client.send(request_msg).await?;
println!("Here we received response: {hello_response:?}");
}
}
96 changes: 52 additions & 44 deletions up-transport-vsomeip/examples/hello_service.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use async_trait::async_trait;
use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse};
use log::{error, trace};
use std::fs::canonicalize;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
use up_rust::communication::{
InMemoryRpcServer, RequestHandler, RpcServer, ServiceInvocationError, UPayload,
};
use up_rust::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY;
use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri};
use up_rust::{UCode, UStatus, UUri};
use up_transport_vsomeip::UPTransportVsomeip;

const HELLO_SERVICE_ID: u16 = 0x6000;
Expand All @@ -30,27 +46,28 @@ const _CLIENT_RESOURCE_ID: u16 = 0;

const _REQUEST_TTL: u32 = 1000;

struct ServiceRequestResponder {
client: Arc<dyn UTransport>,
}
impl ServiceRequestResponder {
pub fn new(client: Arc<dyn UTransport>) -> Self {
Self { client }
struct ServiceRequestHandler;
impl ServiceRequestHandler {
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl UListener for ServiceRequestResponder {
async fn on_receive(&self, msg: UMessage) {
println!("ServiceRequestResponder: Received a message: {msg:?}");

let mut msg = msg.clone();

if let Some(ref mut attributes) = msg.attributes.as_mut() {
attributes.payload_format =
::protobuf::EnumOrUnknown::new(UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY);
}

let hello_request = msg.extract_protobuf::<HelloRequest>();
impl RequestHandler for ServiceRequestHandler {
async fn handle_request(
&self,
resource_id: u16,
request_payload: Option<UPayload>,
) -> Result<Option<UPayload>, ServiceInvocationError> {
println!("ServiceRequestHandler: Received a resource_id: {resource_id} request_payload: {request_payload:?}");

let hello_request_vsomeip_unspecified_payload_format = request_payload.unwrap();
let hello_request_protobuf_payload_format = UPayload::new(
hello_request_vsomeip_unspecified_payload_format.payload(),
UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY,
);
let hello_request =
hello_request_protobuf_payload_format.extract_protobuf::<HelloRequest>();

let hello_request = match hello_request {
Ok(hello_request) => {
Expand All @@ -59,7 +76,10 @@ impl UListener for ServiceRequestResponder {
}
Err(err) => {
error!("Unable to parse HelloRequest: {err:?}");
return;
return Err(ServiceInvocationError::RpcError(UStatus::fail_with_code(
UCode::INTERNAL,
"Unable to parse hello_request",
)));
}
};

Expand All @@ -68,10 +88,9 @@ impl UListener for ServiceRequestResponder {
..Default::default()
};

let response_msg = UMessageBuilder::response_for_request(msg.attributes.as_ref().unwrap())
.build_with_wrapped_protobuf_payload(&hello_response)
.unwrap();
self.client.send(response_msg).await.unwrap();
println!("Making response to send back: {hello_response:?}");

Ok(Some(UPayload::try_from_protobuf(hello_response).unwrap()))
}
}

Expand All @@ -93,10 +112,11 @@ async fn main() -> Result<(), UStatus> {
HELLO_SERVICE_AUTHORITY,
HELLO_SERVICE_UE_ID,
HELLO_SERVICE_MAJOR,
HELLO_SERVICE_RESOURCE_ID,
// HELLO_SERVICE_RESOURCE_ID,
0,
)
.unwrap();
let service: Arc<dyn UTransport> = Arc::new(
let service = Arc::new(
UPTransportVsomeip::new_with_config(
service_uuri,
&CLIENT_AUTHORITY.to_string(),
Expand All @@ -105,25 +125,13 @@ async fn main() -> Result<(), UStatus> {
)
.unwrap(),
);
let l2_service = InMemoryRpcServer::new(service.clone(), service.clone());

let source_filter = UUri::any();
let sink_filter = UUri::try_from_parts(
HELLO_SERVICE_AUTHORITY,
HELLO_SERVICE_UE_ID,
HELLO_SERVICE_MAJOR,
HELLO_SERVICE_RESOURCE_ID,
)
.unwrap();
let service_request_responder: Arc<dyn UListener> =
Arc::new(ServiceRequestResponder::new(service.clone()));
// TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases
service
.register_listener(
&source_filter,
Some(&sink_filter),
service_request_responder.clone(),
)
.await?;
let service_request_handler = Arc::new(ServiceRequestHandler::new());
l2_service
.register_endpoint(None, HELLO_SERVICE_RESOURCE_ID, service_request_handler)
.await
.expect("Unable to register endpoint");

thread::park();
Ok(())
Expand Down
4 changes: 3 additions & 1 deletion up-transport-vsomeip/src/message_conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,11 @@ where {
let ok = {
if let Some(commstatus) = umsg.attributes.commstatus {
let commstatus = commstatus.enum_value_or(UCode::UNIMPLEMENTED);
trace!("UMessage Response commstatus was set to: {commstatus:?}");
commstatus == UCode::OK
} else {
false
trace!("UMessage Response had no commstatus set");
true
}
};
if ok {
Expand Down

0 comments on commit 7f245c3

Please sign in to comment.