diff --git a/up-transport-vsomeip/examples/hello_client.rs b/up-transport-vsomeip/examples/hello_client.rs index e35c14b..a9db676 100644 --- a/up-transport-vsomeip/examples/hello_client.rs +++ b/up-transport-vsomeip/examples/hello_client.rs @@ -1,12 +1,25 @@ -use async_trait::async_trait; +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse}; use log::trace; use std::fs::canonicalize; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use up_rust::communication::{CallOptions, InMemoryRpcClient, RpcClient, UPayload}; use up_rust::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY; -use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri}; +use up_rust::{UStatus, UUri}; use up_transport_vsomeip::UPTransportVsomeip; const HELLO_SERVICE_ID: u16 = 0x6000; @@ -30,28 +43,6 @@ const CLIENT_RESOURCE_ID: u16 = 0; const REQUEST_TTL: u32 = 1000; -struct ServiceResponseListener; - -#[async_trait] -impl UListener for ServiceResponseListener { - async fn on_receive(&self, msg: UMessage) { - println!("ServiceResponseListener: Received a message: {msg:?}"); - - let mut msg = msg.clone(); - - if let Some(ref mut attributes) = msg.attributes.as_mut() { - attributes.payload_format = - ::protobuf::EnumOrUnknown::new(UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY); - } - - let Ok(hello_response) = msg.extract_protobuf::() else { - panic!("Unable to parse into HelloResponse"); - }; - - println!("Here we received response: {hello_response:?}"); - } -} - #[tokio::main] async fn main() -> Result<(), UStatus> { env_logger::init(); @@ -73,7 +64,7 @@ async fn main() -> Result<(), UStatus> { CLIENT_RESOURCE_ID, ) .unwrap(); - let client: Arc = Arc::new( + let client = Arc::new( UPTransportVsomeip::new_with_config( client_uuri, &HELLO_SERVICE_AUTHORITY.to_string(), @@ -83,13 +74,10 @@ async fn main() -> Result<(), UStatus> { .unwrap(), ); - let source = UUri::try_from_parts( - CLIENT_AUTHORITY, - CLIENT_UE_ID, - CLIENT_UE_VERSION_MAJOR, - CLIENT_RESOURCE_ID, - ) - .unwrap(); + let l2_client = InMemoryRpcClient::new(client.clone(), client.clone()) + .await + .unwrap(); + let sink = UUri::try_from_parts( HELLO_SERVICE_AUTHORITY, HELLO_SERVICE_UE_ID, @@ -98,11 +86,6 @@ async fn main() -> Result<(), UStatus> { ) .unwrap(); - let service_response_listener: Arc = Arc::new(ServiceResponseListener); - client - .register_listener(&sink, Some(&source), service_response_listener) - .await?; - let mut i = 0; loop { tokio::time::sleep(Duration::from_millis(1000)).await; @@ -112,12 +95,36 @@ async fn main() -> Result<(), UStatus> { ..Default::default() }; i += 1; + println!("Sending Request message with payload:\n{hello_request:?}"); + + let call_options = CallOptions::for_rpc_request(REQUEST_TTL, None, None, None); + let invoke_res = l2_client + .invoke_method( + sink.clone(), + call_options, + Some(UPayload::try_from_protobuf(hello_request).unwrap()), + ) + .await; + + let Ok(response) = invoke_res else { + panic!( + "Hit an error attempting to invoke method: {:?}", + invoke_res.err().unwrap() + ); + }; + + let hello_response_vsomeip_unspecified_payload_format = response.unwrap(); + let hello_response_protobuf_payload_format = UPayload::new( + hello_response_vsomeip_unspecified_payload_format.payload(), + UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY, + ); - let request_msg = UMessageBuilder::request(sink.clone(), source.clone(), REQUEST_TTL) - .build_with_protobuf_payload(&hello_request) - .unwrap(); - println!("Sending Request message:\n{request_msg:?}"); + let Ok(hello_response) = + hello_response_protobuf_payload_format.extract_protobuf::() + else { + panic!("Unable to parse into HelloResponse"); + }; - client.send(request_msg).await?; + println!("Here we received response: {hello_response:?}"); } } diff --git a/up-transport-vsomeip/examples/hello_service.rs b/up-transport-vsomeip/examples/hello_service.rs index 1dad0a4..2de442e 100644 --- a/up-transport-vsomeip/examples/hello_service.rs +++ b/up-transport-vsomeip/examples/hello_service.rs @@ -1,3 +1,16 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + use async_trait::async_trait; use hello_world_protos::hello_world_service::{HelloRequest, HelloResponse}; use log::{error, trace}; @@ -5,8 +18,11 @@ use std::fs::canonicalize; use std::path::PathBuf; use std::sync::Arc; use std::thread; +use up_rust::communication::{ + InMemoryRpcServer, RequestHandler, RpcServer, ServiceInvocationError, UPayload, +}; use up_rust::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY; -use up_rust::{UListener, UMessage, UMessageBuilder, UStatus, UTransport, UUri}; +use up_rust::{UCode, UStatus, UUri}; use up_transport_vsomeip::UPTransportVsomeip; const HELLO_SERVICE_ID: u16 = 0x6000; @@ -30,27 +46,28 @@ const _CLIENT_RESOURCE_ID: u16 = 0; const _REQUEST_TTL: u32 = 1000; -struct ServiceRequestResponder { - client: Arc, -} -impl ServiceRequestResponder { - pub fn new(client: Arc) -> Self { - Self { client } +struct ServiceRequestHandler; +impl ServiceRequestHandler { + pub fn new() -> Self { + Self } } #[async_trait] -impl UListener for ServiceRequestResponder { - async fn on_receive(&self, msg: UMessage) { - println!("ServiceRequestResponder: Received a message: {msg:?}"); - - let mut msg = msg.clone(); - - if let Some(ref mut attributes) = msg.attributes.as_mut() { - attributes.payload_format = - ::protobuf::EnumOrUnknown::new(UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY); - } - - let hello_request = msg.extract_protobuf::(); +impl RequestHandler for ServiceRequestHandler { + async fn handle_request( + &self, + resource_id: u16, + request_payload: Option, + ) -> Result, ServiceInvocationError> { + println!("ServiceRequestHandler: Received a resource_id: {resource_id} request_payload: {request_payload:?}"); + + let hello_request_vsomeip_unspecified_payload_format = request_payload.unwrap(); + let hello_request_protobuf_payload_format = UPayload::new( + hello_request_vsomeip_unspecified_payload_format.payload(), + UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY, + ); + let hello_request = + hello_request_protobuf_payload_format.extract_protobuf::(); let hello_request = match hello_request { Ok(hello_request) => { @@ -59,7 +76,10 @@ impl UListener for ServiceRequestResponder { } Err(err) => { error!("Unable to parse HelloRequest: {err:?}"); - return; + return Err(ServiceInvocationError::RpcError(UStatus::fail_with_code( + UCode::INTERNAL, + "Unable to parse hello_request", + ))); } }; @@ -68,10 +88,9 @@ impl UListener for ServiceRequestResponder { ..Default::default() }; - let response_msg = UMessageBuilder::response_for_request(msg.attributes.as_ref().unwrap()) - .build_with_wrapped_protobuf_payload(&hello_response) - .unwrap(); - self.client.send(response_msg).await.unwrap(); + println!("Making response to send back: {hello_response:?}"); + + Ok(Some(UPayload::try_from_protobuf(hello_response).unwrap())) } } @@ -93,10 +112,11 @@ async fn main() -> Result<(), UStatus> { HELLO_SERVICE_AUTHORITY, HELLO_SERVICE_UE_ID, HELLO_SERVICE_MAJOR, - HELLO_SERVICE_RESOURCE_ID, + // HELLO_SERVICE_RESOURCE_ID, + 0, ) .unwrap(); - let service: Arc = Arc::new( + let service = Arc::new( UPTransportVsomeip::new_with_config( service_uuri, &CLIENT_AUTHORITY.to_string(), @@ -105,25 +125,13 @@ async fn main() -> Result<(), UStatus> { ) .unwrap(), ); + let l2_service = InMemoryRpcServer::new(service.clone(), service.clone()); - let source_filter = UUri::any(); - let sink_filter = UUri::try_from_parts( - HELLO_SERVICE_AUTHORITY, - HELLO_SERVICE_UE_ID, - HELLO_SERVICE_MAJOR, - HELLO_SERVICE_RESOURCE_ID, - ) - .unwrap(); - let service_request_responder: Arc = - Arc::new(ServiceRequestResponder::new(service.clone())); - // TODO: Need to revisit how the vsomeip config file is used in non point-to-point cases - service - .register_listener( - &source_filter, - Some(&sink_filter), - service_request_responder.clone(), - ) - .await?; + let service_request_handler = Arc::new(ServiceRequestHandler::new()); + l2_service + .register_endpoint(None, HELLO_SERVICE_RESOURCE_ID, service_request_handler) + .await + .expect("Unable to register endpoint"); thread::park(); Ok(()) diff --git a/up-transport-vsomeip/src/message_conversions.rs b/up-transport-vsomeip/src/message_conversions.rs index a3c61cd..de66442 100644 --- a/up-transport-vsomeip/src/message_conversions.rs +++ b/up-transport-vsomeip/src/message_conversions.rs @@ -236,9 +236,11 @@ where { let ok = { if let Some(commstatus) = umsg.attributes.commstatus { let commstatus = commstatus.enum_value_or(UCode::UNIMPLEMENTED); + trace!("UMessage Response commstatus was set to: {commstatus:?}"); commstatus == UCode::OK } else { - false + trace!("UMessage Response had no commstatus set"); + true } }; if ok {