Skip to content

Commit

Permalink
Change default subscriber config (#242)
Browse files Browse the repository at this point in the history
* set same ack_deadline_seconds as subscription config

* set same ack_deadline_seconds as subscription config

* make set_call_option to pub
  • Loading branch information
yoshidan authored Mar 19, 2024
1 parent 7a46db4 commit 96a872d
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 21 deletions.
8 changes: 5 additions & 3 deletions pubsub/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ mod tests {
let config = ReceiveConfig {
worker_count: 2,
channel_capacity: None,
subscriber_config: SubscriberConfig {
subscriber_config: Some(SubscriberConfig {
ping_interval: Duration::from_secs(1),
..Default::default()
},
}),
};
let cancel_receiver = cancellation_token.clone();
let (s, mut r) = tokio::sync::mpsc::channel(100);
Expand Down Expand Up @@ -631,14 +631,16 @@ mod tests_in_gcp {
break;
}
let msg_id = &message.message.message_id;
// heavy task
tokio::time::sleep(Duration::from_secs(1)).await;
*msgs.entry(msg_id.clone()).or_insert(0) += 1;
message.ack().await.unwrap();
}
tracing::info!("finish subscriber");
msgs
});

tokio::time::sleep(Duration::from_secs(30)).await;
tokio::time::sleep(Duration::from_secs(60)).await;

// check redelivered messages
ctx.cancel();
Expand Down
20 changes: 16 additions & 4 deletions pubsub/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,21 @@ pub struct SubscriberConfig {
/// ping interval for Bi Directional Streaming
pub ping_interval: Duration,
pub retry_setting: Option<RetrySetting>,
/// It is important for exactly_once_delivery
/// The ack deadline to use for the stream. This must be provided in
/// the first request on the stream, but it can also be updated on subsequent
/// requests from client to server. The minimum deadline you can specify is 10
/// seconds. The maximum deadline you can specify is 600 seconds (10 minutes).
pub stream_ack_deadline_seconds: i32,
/// Flow control settings for the maximum number of outstanding messages. When
/// there are `max_outstanding_messages` or more currently sent to the
/// streaming pull client that have not yet been acked or nacked, the server
/// stops sending more messages. The sending of messages resumes once the
/// number of outstanding messages is less than this value. If the value is
/// <= 0, there is no limit to the number of outstanding messages. This
/// property can only be set on the initial StreamingPullRequest. If it is set
/// on a subsequent request, the stream will be aborted with status
/// `INVALID_ARGUMENT`.
pub max_outstanding_messages: i64,
pub max_outstanding_bytes: i64,
}
Expand All @@ -101,7 +115,7 @@ impl Default for SubscriberConfig {
ping_interval: std::time::Duration::from_secs(10),
retry_setting: Some(default_retry_setting()),
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_messages: 50,
max_outstanding_bytes: 1000 * 1000 * 1000,
}
}
Expand All @@ -119,10 +133,8 @@ impl Subscriber {
subscription: String,
client: SubscriberClient,
queue: async_channel::Sender<ReceivedMessage>,
opt: Option<SubscriberConfig>,
config: SubscriberConfig,
) -> Self {
let config = opt.unwrap_or_default();

let (ping_sender, ping_receiver) = async_channel::unbounded();

// ping request
Expand Down
38 changes: 25 additions & 13 deletions pubsub/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::{max, min};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
Expand Down Expand Up @@ -83,7 +84,7 @@ pub struct SubscriptionConfigToUpdate {
pub struct SubscribeConfig {
enable_multiple_subscriber: bool,
channel_capacity: Option<usize>,
subscriber_config: SubscriberConfig,
subscriber_config: Option<SubscriberConfig>,
}

impl SubscribeConfig {
Expand All @@ -92,7 +93,7 @@ impl SubscribeConfig {
self
}
pub fn with_subscriber_config(mut self, v: SubscriberConfig) -> Self {
self.subscriber_config = v;
self.subscriber_config = Some(v);
self
}
pub fn with_channel_capacity(mut self, v: usize) -> Self {
Expand All @@ -105,14 +106,14 @@ impl SubscribeConfig {
pub struct ReceiveConfig {
pub worker_count: usize,
pub channel_capacity: Option<usize>,
pub subscriber_config: SubscriberConfig,
pub subscriber_config: Option<SubscriberConfig>,
}

impl Default for ReceiveConfig {
fn default() -> Self {
Self {
worker_count: 10,
subscriber_config: SubscriberConfig::default(),
subscriber_config: None,
channel_capacity: None,
}
}
Expand Down Expand Up @@ -385,6 +386,7 @@ impl Subscription {
let opt = opt.unwrap_or_default();
let (tx, rx) = create_channel(opt.channel_capacity);
let cancel = CancellationToken::new();
let sub_opt = self.unwrap_subscribe_config(opt.subscriber_config).await?;

// spawn a separate subscriber task for each connection in the pool
let subscribers = if opt.enable_multiple_subscriber {
Expand All @@ -398,7 +400,7 @@ impl Subscription {
self.fqsn.clone(),
self.subc.clone(),
tx.clone(),
Some(opt.subscriber_config.clone()),
sub_opt.clone(),
);
}

Expand All @@ -420,9 +422,10 @@ impl Subscription {
let op = config.unwrap_or_default();
let mut receivers = Vec::with_capacity(op.worker_count);
let mut senders = Vec::with_capacity(receivers.len());
let sub_opt = self.unwrap_subscribe_config(op.subscriber_config).await?;

if self
.config(op.subscriber_config.retry_setting.clone())
.config(sub_opt.retry_setting.clone())
.await?
.1
.enable_message_ordering
Expand All @@ -444,13 +447,7 @@ impl Subscription {
let subscribers: Vec<Subscriber> = senders
.into_iter()
.map(|queue| {
Subscriber::start(
cancel.clone(),
self.fqsn.clone(),
self.subc.clone(),
queue,
Some(op.subscriber_config.clone()),
)
Subscriber::start(cancel.clone(), self.fqsn.clone(), self.subc.clone(), queue, sub_opt.clone())
})
.collect();

Expand Down Expand Up @@ -602,6 +599,21 @@ impl Subscription {
let _ = self.subc.delete_snapshot(req, retry).await?;
Ok(())
}

async fn unwrap_subscribe_config(&self, cfg: Option<SubscriberConfig>) -> Result<SubscriberConfig, Status> {
if let Some(cfg) = cfg {
return Ok(cfg);
}
let cfg = self.config(None).await?;
let mut default_cfg = SubscriberConfig {
stream_ack_deadline_seconds: max(min(cfg.1.ack_deadline_seconds, 600), 10),
..Default::default()
};
if cfg.1.enable_exactly_once_delivery {
default_cfg.max_outstanding_messages = 5;
}
Ok(default_cfg)
}
}

fn create_channel(
Expand Down
2 changes: 1 addition & 1 deletion spanner/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ where
})
}

pub(crate) fn set_call_options(&mut self, option: CallOptions) {
pub fn set_call_options(&mut self, option: CallOptions) {
self.reader_option = Some(option);
}

Expand Down

0 comments on commit 96a872d

Please sign in to comment.