Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change default subscriber config #242

Merged
merged 3 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pubsub/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ mod tests {
let config = ReceiveConfig {
worker_count: 2,
channel_capacity: None,
subscriber_config: SubscriberConfig {
subscriber_config: Some(SubscriberConfig {
ping_interval: Duration::from_secs(1),
..Default::default()
},
}),
};
let cancel_receiver = cancellation_token.clone();
let (s, mut r) = tokio::sync::mpsc::channel(100);
Expand Down Expand Up @@ -631,14 +631,16 @@ mod tests_in_gcp {
break;
}
let msg_id = &message.message.message_id;
// heavy task
tokio::time::sleep(Duration::from_secs(1)).await;
*msgs.entry(msg_id.clone()).or_insert(0) += 1;
message.ack().await.unwrap();
}
tracing::info!("finish subscriber");
msgs
});

tokio::time::sleep(Duration::from_secs(30)).await;
tokio::time::sleep(Duration::from_secs(60)).await;

// check redelivered messages
ctx.cancel();
Expand Down
20 changes: 16 additions & 4 deletions pubsub/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,21 @@ pub struct SubscriberConfig {
/// ping interval for Bi Directional Streaming
pub ping_interval: Duration,
pub retry_setting: Option<RetrySetting>,
/// It is important for exactly_once_delivery
/// The ack deadline to use for the stream. This must be provided in
/// the first request on the stream, but it can also be updated on subsequent
/// requests from client to server. The minimum deadline you can specify is 10
/// seconds. The maximum deadline you can specify is 600 seconds (10 minutes).
pub stream_ack_deadline_seconds: i32,
/// Flow control settings for the maximum number of outstanding messages. When
/// there are `max_outstanding_messages` or more currently sent to the
/// streaming pull client that have not yet been acked or nacked, the server
/// stops sending more messages. The sending of messages resumes once the
/// number of outstanding messages is less than this value. If the value is
/// <= 0, there is no limit to the number of outstanding messages. This
/// property can only be set on the initial StreamingPullRequest. If it is set
/// on a subsequent request, the stream will be aborted with status
/// `INVALID_ARGUMENT`.
pub max_outstanding_messages: i64,
pub max_outstanding_bytes: i64,
}
Expand All @@ -101,7 +115,7 @@ impl Default for SubscriberConfig {
ping_interval: std::time::Duration::from_secs(10),
retry_setting: Some(default_retry_setting()),
stream_ack_deadline_seconds: 60,
max_outstanding_messages: 1000,
max_outstanding_messages: 50,
max_outstanding_bytes: 1000 * 1000 * 1000,
}
}
Expand All @@ -119,10 +133,8 @@ impl Subscriber {
subscription: String,
client: SubscriberClient,
queue: async_channel::Sender<ReceivedMessage>,
opt: Option<SubscriberConfig>,
config: SubscriberConfig,
) -> Self {
let config = opt.unwrap_or_default();

let (ping_sender, ping_receiver) = async_channel::unbounded();

// ping request
Expand Down
38 changes: 25 additions & 13 deletions pubsub/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::{max, min};
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
Expand Down Expand Up @@ -83,7 +84,7 @@ pub struct SubscriptionConfigToUpdate {
pub struct SubscribeConfig {
enable_multiple_subscriber: bool,
channel_capacity: Option<usize>,
subscriber_config: SubscriberConfig,
subscriber_config: Option<SubscriberConfig>,
}

impl SubscribeConfig {
Expand All @@ -92,7 +93,7 @@ impl SubscribeConfig {
self
}
pub fn with_subscriber_config(mut self, v: SubscriberConfig) -> Self {
self.subscriber_config = v;
self.subscriber_config = Some(v);
self
}
pub fn with_channel_capacity(mut self, v: usize) -> Self {
Expand All @@ -105,14 +106,14 @@ impl SubscribeConfig {
pub struct ReceiveConfig {
pub worker_count: usize,
pub channel_capacity: Option<usize>,
pub subscriber_config: SubscriberConfig,
pub subscriber_config: Option<SubscriberConfig>,
}

impl Default for ReceiveConfig {
fn default() -> Self {
Self {
worker_count: 10,
subscriber_config: SubscriberConfig::default(),
subscriber_config: None,
channel_capacity: None,
}
}
Expand Down Expand Up @@ -385,6 +386,7 @@ impl Subscription {
let opt = opt.unwrap_or_default();
let (tx, rx) = create_channel(opt.channel_capacity);
let cancel = CancellationToken::new();
let sub_opt = self.unwrap_subscribe_config(opt.subscriber_config).await?;

// spawn a separate subscriber task for each connection in the pool
let subscribers = if opt.enable_multiple_subscriber {
Expand All @@ -398,7 +400,7 @@ impl Subscription {
self.fqsn.clone(),
self.subc.clone(),
tx.clone(),
Some(opt.subscriber_config.clone()),
sub_opt.clone(),
);
}

Expand All @@ -420,9 +422,10 @@ impl Subscription {
let op = config.unwrap_or_default();
let mut receivers = Vec::with_capacity(op.worker_count);
let mut senders = Vec::with_capacity(receivers.len());
let sub_opt = self.unwrap_subscribe_config(op.subscriber_config).await?;

if self
.config(op.subscriber_config.retry_setting.clone())
.config(sub_opt.retry_setting.clone())
.await?
.1
.enable_message_ordering
Expand All @@ -444,13 +447,7 @@ impl Subscription {
let subscribers: Vec<Subscriber> = senders
.into_iter()
.map(|queue| {
Subscriber::start(
cancel.clone(),
self.fqsn.clone(),
self.subc.clone(),
queue,
Some(op.subscriber_config.clone()),
)
Subscriber::start(cancel.clone(), self.fqsn.clone(), self.subc.clone(), queue, sub_opt.clone())
})
.collect();

Expand Down Expand Up @@ -602,6 +599,21 @@ impl Subscription {
let _ = self.subc.delete_snapshot(req, retry).await?;
Ok(())
}

async fn unwrap_subscribe_config(&self, cfg: Option<SubscriberConfig>) -> Result<SubscriberConfig, Status> {
if let Some(cfg) = cfg {
return Ok(cfg);
}
let cfg = self.config(None).await?;
let mut default_cfg = SubscriberConfig {
stream_ack_deadline_seconds: max(min(cfg.1.ack_deadline_seconds, 600), 10),
..Default::default()
};
if cfg.1.enable_exactly_once_delivery {
default_cfg.max_outstanding_messages = 5;
}
Ok(default_cfg)
}
}

fn create_channel(
Expand Down
2 changes: 1 addition & 1 deletion spanner/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ where
})
}

pub(crate) fn set_call_options(&mut self, option: CallOptions) {
pub fn set_call_options(&mut self, option: CallOptions) {
self.reader_option = Some(option);
}

Expand Down
Loading