From 0ddba9383dc95befaed478366353e73adb48f0c5 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 18 Sep 2024 18:48:56 +0800 Subject: [PATCH 1/2] change udf message limit to 20MB Signed-off-by: Richard Chien --- src/expr/impl/src/udf/external.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/expr/impl/src/udf/external.rs b/src/expr/impl/src/udf/external.rs index f3055954c3a7..b2d7c75fd439 100644 --- a/src/expr/impl/src/udf/external.rs +++ b/src/expr/impl/src/udf/external.rs @@ -164,6 +164,9 @@ impl UdfImpl for ExternalFunction { } } +// TODO(rc): allow changing this in configuration +const MAX_DECODING_MESSAGE_SIZE: usize = 20 << 20; // 20MB + /// Get or create a client for the given UDF service. /// /// There is a global cache for clients, so that we can reuse the same client for the same service. @@ -186,7 +189,10 @@ fn get_or_create_flight_client(link: &str) -> Result> { let client = Arc::new(tokio::task::block_in_place(|| { RUNTIME.block_on(async { let channel = connect_tonic(link).await?; - Ok(Client::new(channel).await?) as Result<_> + Ok(Client::new(channel) + .await? + .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)) + as Result<_> }) })?); clients.insert(link.to_owned(), Arc::downgrade(&client)); From 36037c5c769b62e8bcc35a52ca608c3a6893120c Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Fri, 20 Sep 2024 15:12:01 +0800 Subject: [PATCH 2/2] use new constructor Signed-off-by: Richard Chien --- src/expr/impl/src/udf/external.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/expr/impl/src/udf/external.rs b/src/expr/impl/src/udf/external.rs index b2d7c75fd439..e2e20a5b5934 100644 --- a/src/expr/impl/src/udf/external.rs +++ b/src/expr/impl/src/udf/external.rs @@ -18,6 +18,7 @@ use std::sync::{Arc, LazyLock, Weak}; use std::time::Duration; use anyhow::bail; +use arrow_flight::flight_service_client::FlightServiceClient; use arrow_udf_flight::Client; use futures_util::{StreamExt, TryStreamExt}; use ginepro::{LoadBalancedChannel, ResolutionStrategy}; @@ -189,10 +190,9 @@ fn get_or_create_flight_client(link: &str) -> Result> { let client = Arc::new(tokio::task::block_in_place(|| { RUNTIME.block_on(async { let channel = connect_tonic(link).await?; - Ok(Client::new(channel) - .await? - .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE)) - as Result<_> + let client = FlightServiceClient::new(channel) + .max_decoding_message_size(MAX_DECODING_MESSAGE_SIZE); + Ok(Client::with_flight_client(client).await?) as Result<_> }) })?); clients.insert(link.to_owned(), Arc::downgrade(&client));