Skip to content

Commit

Permalink
split connection for streaming_pull
Browse files Browse the repository at this point in the history
  • Loading branch information
yoshidan committed Jul 31, 2024
1 parent 1f45886 commit cac61fd
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pubsub/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "google-cloud-pubsub"
version = "0.28.0"
version = "0.28.1"
authors = ["yoshidan <naohiro.y@gmail.com>"]
edition = "2021"
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/pubsub"
Expand Down
21 changes: 16 additions & 5 deletions pubsub/src/apiv1/subscriber_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@ pub(crate) fn create_empty_streaming_pull_request() -> StreamingPullRequest {
#[derive(Clone, Debug)]
pub struct SubscriberClient {
cm: Arc<ConnectionManager>,
streaming_pull_cm: Arc<ConnectionManager>,
}

#[allow(dead_code)]
impl SubscriberClient {
/// create new Subscriber client
pub fn new(cm: ConnectionManager) -> SubscriberClient {
SubscriberClient { cm: Arc::new(cm) }
pub fn new(cm: ConnectionManager, streaming_pull_cm: ConnectionManager) -> SubscriberClient {
SubscriberClient {
cm: Arc::new(cm),
streaming_pull_cm: Arc::new(streaming_pull_cm),
}
}

#[inline]
Expand All @@ -49,8 +53,15 @@ impl SubscriberClient {
.max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
}

pub(crate) fn pool_size(&self) -> usize {
self.cm.num()
#[inline]
fn client_for_streaming_pull(&self) -> InternalSubscriberClient<Channel> {
InternalSubscriberClient::new(self.streaming_pull_cm.conn())
.max_decoding_message_size(PUBSUB_MESSAGE_LIMIT)
.max_encoding_message_size(PUBSUB_MESSAGE_LIMIT)
}

pub(crate) fn streaming_pool_size(&self) -> usize {
self.streaming_pull_cm.num()
}

/// create_subscription creates a subscription to a given topic. See the [resource name rules]
Expand Down Expand Up @@ -231,7 +242,7 @@ impl SubscriberClient {
retry: Option<RetrySetting>,
) -> Result<Response<Streaming<StreamingPullResponse>>, Status> {
let action = || async {
let mut client = self.client();
let mut client = self.client_for_streaming_pull();
let base_req = req.clone();
let rx = ping_receiver.clone();
let request = Box::pin(async_stream::stream! {
Expand Down
8 changes: 7 additions & 1 deletion pubsub/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,14 @@ impl Client {
&config.connection_option,
)
.await?,
ConnectionManager::new(
pool_size,
config.endpoint.as_str(),
&config.environment,
&config.connection_option,
)
.await?,
);

Ok(Self {
project_id: config.project_id.ok_or(Error::ProjectIdNotFound)?,
pubc,
Expand Down
2 changes: 1 addition & 1 deletion pubsub/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ mod tests {
.await
.unwrap()
};
let subc = SubscriberClient::new(cm().await);
let subc = SubscriberClient::new(cm().await, cm().await);
let pubc = PublisherClient::new(cm().await);

pubc.publish(
Expand Down
20 changes: 14 additions & 6 deletions pubsub/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ impl Subscription {
Self { fqsn, subc }
}

pub(crate) fn pool_size(&self) -> usize {
self.subc.pool_size()
pub(crate) fn streaming_pool_size(&self) -> usize {
self.subc.streaming_pool_size()
}

/// id returns the unique identifier of the subscription within its project.
Expand Down Expand Up @@ -459,7 +459,7 @@ impl Subscription {

// spawn a separate subscriber task for each connection in the pool
let subscribers = if opt.enable_multiple_subscriber {
self.pool_size()
self.streaming_pool_size()
} else {
1
};
Expand Down Expand Up @@ -649,8 +649,8 @@ impl Subscription {
/// - The message backlog on the subscription -- or to be specific, messages that are unacknowledged
/// at the time of the subscription's creation.
/// - All messages published to the subscription's topic after the snapshot's creation.
/// Snapshots have a finite lifetime -- a maximum of 7 days from the time of creation, beyond which
/// they are discarded and any messages being retained solely due to the snapshot dropped.
/// Snapshots have a finite lifetime -- a maximum of 7 days from the time of creation, beyond which
/// they are discarded and any messages being retained solely due to the snapshot dropped.
pub async fn create_snapshot(
&self,
name: &str,
Expand Down Expand Up @@ -740,7 +740,15 @@ mod tests {
)
.await
.unwrap();
let client = SubscriberClient::new(cm);
let cm2 = ConnectionManager::new(
4,
"",
&Environment::Emulator(EMULATOR.to_string()),
&ConnectionOptions::default(),
)
.await
.unwrap();
let client = SubscriberClient::new(cm, cm2);

let uuid = Uuid::new_v4().hyphenated().to_string();
let subscription_name = format!("projects/{}/subscriptions/s{}", PROJECT_NAME, &uuid);
Expand Down
5 changes: 4 additions & 1 deletion pubsub/src/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,10 @@ mod tests {
let cm2 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
.await
.unwrap();
let subc = SubscriberClient::new(cm2);
let cm3 = ConnectionManager::new(4, "", &environment, &ConnectionOptions::default())
.await
.unwrap();
let subc = SubscriberClient::new(cm2, cm3);

let uuid = Uuid::new_v4().hyphenated().to_string();
let topic_name = format!("projects/local-project/topics/t{uuid}");
Expand Down

0 comments on commit cac61fd

Please sign in to comment.