[pubsub] Fix ordering key error (#227)
yoshidan authored Feb 5, 2024
1 parent 0be1686 · commit 4484492
Showing 6 changed files with 230 additions and 15 deletions.
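The bug: the Pub/Sub service requires every message in a single publish request to share the same ordering key, but the publisher flushed its whole buffered bundle as one request, so buffering messages with different ordering keys failed with an ordering-key error. The fix groups the bundle by ordering_key (MessageBundle::key_by in pubsub/src/publisher.rs) and flushes one request per group. A minimal sketch of the pattern this enables, assuming application-default credentials and a pre-created topic named "my-topic" (both placeholders, not part of this commit; module paths assumed from this repo's layout):

use std::time::Duration;

use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::publisher::PublisherConfig;

#[tokio::main]
async fn main() {
    let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
        .await
        .unwrap();
    let topic = client.topic("my-topic");
    let publisher = topic.new_publisher(Some(PublisherConfig {
        flush_interval: Duration::from_secs(3),
        workers: 3,
        ..Default::default()
    }));

    // Mixing unkeyed ("") and keyed messages in one buffer is now safe:
    // the flush is split into one publish request per ordering key.
    let mut awaiters = vec![];
    for key in ["", "key1", "key1", "key2"] {
        let msg = PubsubMessage {
            data: "payload".into(),
            ordering_key: key.into(),
            ..Default::default()
        };
        awaiters.push(publisher.publish(msg).await);
    }
    for awaiter in awaiters {
        println!("msg id {}", awaiter.get().await.unwrap());
    }
}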
9 changes: 9 additions & 0 deletions .github/workflows/ci.yaml
@@ -74,6 +74,15 @@ jobs:
PUBSUB_EMULATOR_HOST: localhost:8681
RUSTFLAGS: "-A dead_code -A unused"
run: cargo test --release --all-features --manifest-path pubsub/Cargo.toml
- name: Setup gcloud
uses: google-github-actions/setup-gcloud@v0.6.0
with:
service_account_key: ${{ secrets.STORAGE_CREDENTIALS }}
export_default_credentials: true
- name: test_in_gcp
env:
RUSTFLAGS: "-A dead_code -A unused"
run: cargo test --release --all-features --manifest-path pubsub/Cargo.toml -- --ignored
spanner:
name: spanner
runs-on: ubuntu-latest
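The two added steps give the job real GCP credentials (setup-gcloud exports the STORAGE_CREDENTIALS service account as default credentials) and then run cargo test with "-- --ignored", which executes only the tests marked #[ignore], i.e. the new tests_in_gcp module in pubsub/src/client.rs that talks to the real service instead of the emulator.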
2 changes: 1 addition & 1 deletion pubsub/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "google-cloud-pubsub"
version = "0.22.0"
version = "0.22.1"
authors = ["yoshidan <naohiro.y@gmail.com>"]
edition = "2021"
repository = "https://github.com/yoshidan/google-cloud-rust/tree/main/pubsub"
2 changes: 1 addition & 1 deletion pubsub/src/apiv1/subscriber_client.rs
@@ -31,7 +31,7 @@ pub(crate) fn create_empty_streaming_pull_request() -> StreamingPullRequest {
}

#[derive(Clone, Debug)]
-pub(crate) struct SubscriberClient {
+pub struct SubscriberClient {
cm: Arc<ConnectionManager>,
}

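Widening SubscriberClient from pub(crate) to pub matches the public Subscription::get_client accessor, which already returned this type; see subscription.rs below, where the now-unneeded #[allow(private_interfaces)] escape hatch is removed.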
124 changes: 121 additions & 3 deletions pubsub/src/client.rs
@@ -280,14 +280,13 @@ mod tests {
use std::thread;
use std::time::Duration;

-use google_cloud_gax::conn::Environment;
use serial_test::serial;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use google_cloud_googleapis::pubsub::v1::PubsubMessage;

-use crate::client::{Client, ClientConfig};
+use crate::client::Client;
use crate::subscriber::SubscriberConfig;
use crate::subscription::{ReceiveConfig, SubscriptionConfig};

@@ -455,12 +454,131 @@
assert_eq!(1, subs_after.len() - subs.len());
assert_eq!(1, snapshots_after.len() - snapshots.len());
}
}

#[cfg(test)]
mod tests_in_gcp {
use crate::client::{Client, ClientConfig};
use crate::publisher::PublisherConfig;
use google_cloud_gax::conn::Environment;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use serial_test::serial;
use std::time::Duration;

fn make_msg(key: &str) -> PubsubMessage {
PubsubMessage {
data: if key.is_empty() {
"empty".into()
} else {
key.to_string().into()
},
ordering_key: key.into(),
..Default::default()
}
}

#[tokio::test]
#[ignore]
async fn test_with_auth() {
let config = ClientConfig::default().with_auth().await.unwrap();
// with_auth() must resolve to a real GCP environment, never the emulator.
assert!(matches!(config.environment, Environment::GoogleCloud(_)));
}

#[tokio::test]
#[serial]
#[ignore]
async fn test_publish_ordering_in_gcp_flush_buffer() {
let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
.await
.unwrap();
let topic = client.topic("test-topic2");
let publisher = topic.new_publisher(Some(PublisherConfig {
flush_interval: Duration::from_secs(3),
workers: 3,
..Default::default()
}));

let mut awaiters = vec![];
for key in ["", "key1", "key2", "key3", "key3"] {
awaiters.push(publisher.publish(make_msg(key)).await);
}
for awaiter in awaiters.into_iter() {
tracing::info!("msg id {}", awaiter.get().await.unwrap());
}

// check same key
let mut awaiters = vec![];
for key in ["", "key1", "key2", "key3", "key3"] {
awaiters.push(publisher.publish(make_msg(key)).await);
}
for awaiter in awaiters.into_iter() {
tracing::info!("msg id {}", awaiter.get().await.unwrap());
}
}

#[tokio::test]
#[serial]
#[ignore]
async fn test_publish_ordering_in_gcp_limit_exceed() {
let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
.await
.unwrap();
let topic = client.topic("test-topic2");
let publisher = topic.new_publisher(Some(PublisherConfig {
flush_interval: Duration::from_secs(30),
workers: 1,
bundle_size: 8,
..Default::default()
}));

let mut awaiters = vec![];
for key in ["", "key1", "key2", "key3", "key1", "key2", "key3", ""] {
awaiters.push(publisher.publish(make_msg(key)).await);
}
for awaiter in awaiters.into_iter() {
tracing::info!("msg id {}", awaiter.get().await.unwrap());
}

// check same key twice
let mut awaiters = vec![];
for key in ["", "key1", "key2", "key3", "key1", "key2", "key3", ""] {
awaiters.push(publisher.publish(make_msg(key)).await);
}
for awaiter in awaiters.into_iter() {
tracing::info!("msg id {}", awaiter.get().await.unwrap());
}
}

#[tokio::test]
#[serial]
#[ignore]
async fn test_publish_ordering_in_gcp_bulk() {
let client = Client::new(ClientConfig::default().with_auth().await.unwrap())
.await
.unwrap();
let topic = client.topic("test-topic2");
let publisher = topic.new_publisher(Some(PublisherConfig {
flush_interval: Duration::from_secs(30),
workers: 2,
bundle_size: 8,
..Default::default()
}));

let msgs = ["", "", "key1", "key1", "key2", "key2", "key3", "key3"]
.map(make_msg)
.to_vec();
for awaiter in publisher.publish_bulk(msgs).await.into_iter() {
tracing::info!("msg id {}", awaiter.get().await.unwrap());
}

// check same key twice
let msgs = ["", "", "key1", "key1", "key2", "key2", "key3", "key3"]
.map(make_msg)
.to_vec();
for awaiter in publisher.publish_bulk(msgs).await.into_iter() {
tracing::info!("msg id {}", awaiter.get().await.unwrap());
}
}
}
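Note that these tests carry #[ignore], so they are skipped by the emulator-based CI step and run only by the new test_in_gcp step; they assume real credentials and an existing topic named test-topic2 in the authenticated project.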
103 changes: 96 additions & 7 deletions pubsub/src/publisher.rs
@@ -1,4 +1,6 @@
+use std::collections::HashMap;
+use std::ops::{Deref, DerefMut};

use std::sync::Arc;
use std::time::Duration;

@@ -222,16 +224,19 @@ impl Tasks {
bundle_size: usize,
) -> JoinHandle<()> {
tokio::spawn(async move {
-let mut bundle = Vec::<ReservedMessage>::with_capacity(bundle_size);
+// TODO: manage a separate task per ordering_key
+let mut bundle = MessageBundle::new();
while !receiver.is_closed() {
let result = match timeout(flush_interval, &mut receiver.recv()).await {
Ok(result) => result,
//timed out
Err(_e) => {
if !bundle.is_empty() {
tracing::trace!("elapsed: flush buffer : {}", topic);
-Self::flush(&mut client, topic.as_str(), bundle, retry.clone()).await;
-bundle = Vec::new();
+for value in bundle.key_by() {
+Self::flush(&mut client, topic.as_str(), value, retry.clone()).await;
+}
+bundle = MessageBundle::new();
}
continue;
}
@@ -243,9 +248,11 @@ impl Tasks {
Reserved::Multi(messages) => bundle.extend(messages),
}
if bundle.len() >= bundle_size {
tracing::trace!("maximum buffer {} : {}", bundle.len(), topic);
Self::flush(&mut client, topic.as_str(), bundle, retry.clone()).await;
bundle = Vec::new();
tracing::trace!("bundle size max: {}", topic);
for value in bundle.key_by() {
Self::flush(&mut client, topic.as_str(), value, retry.clone()).await;
}
bundle = MessageBundle::new();
}
}
//closed
@@ -256,7 +263,9 @@ impl Tasks {
tracing::trace!("stop publisher : {}", topic);
if !bundle.is_empty() {
tracing::trace!("flush rest buffer : {}", topic);
-Self::flush(&mut client, topic.as_str(), bundle, retry.clone()).await;
+for value in bundle.key_by() {
+Self::flush(&mut client, topic.as_str(), value, retry.clone()).await;
+}
}
})
}
@@ -314,3 +323,83 @@ impl Tasks {
}
}
}

struct MessageBundle {
inner: Vec<ReservedMessage>,
}

impl MessageBundle {
fn new() -> Self {
Self { inner: vec![] }
}

fn key_by(self) -> Vec<Vec<ReservedMessage>> {
// Group the buffered messages by ordering_key so that each flush
// publishes a batch containing exactly one key.
let mut values = HashMap::<String, Vec<ReservedMessage>>::new();
for v in self.inner {
values.entry(v.message.ordering_key.clone()).or_default().push(v);
}
values.into_values().collect()
}
}

impl Deref for MessageBundle {
type Target = Vec<ReservedMessage>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl DerefMut for MessageBundle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

#[cfg(test)]
mod tests {
use crate::publisher::{MessageBundle, ReservedMessage};
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use tokio::sync::oneshot;

fn msg(key: &str) -> ReservedMessage {
let (sender, _) = oneshot::channel();
ReservedMessage {
producer: sender,
message: PubsubMessage {
ordering_key: key.to_string(),
..Default::default()
},
}
}

#[test]
fn test_message_bundle_key_by() {
let mut bundle = MessageBundle::new();
for key in ["", "a", "b", "c", "A", "", "D", "a"] {
bundle.push(msg(key));
}
let msgs = bundle.key_by();
assert_eq!(6, msgs.len());
for msg in msgs {
let key = msg.first().unwrap().message.ordering_key.clone();
if key == "a" || key.is_empty() {
assert_eq!(2, msg.len());
} else {
assert_eq!(1, msg.len());
}
}
}
}
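Design note: MessageBundle derefs to its inner Vec<ReservedMessage>, so the dispatch loop above keeps using push, extend, len, and is_empty unchanged; only at flush time does key_by consume the bundle and split it into per-ordering-key groups, each sent as its own publish request (messages with the empty key "" form a group of their own).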
5 changes: 2 additions & 3 deletions pubsub/src/subscription.rs
@@ -201,7 +201,6 @@ impl Subscription {
parts.join("/")
}

-#[allow(private_interfaces)]
pub fn get_client(&self) -> SubscriberClient {
self.subc.clone()
}
@@ -1020,7 +1019,7 @@ mod tests {
ack_all(&messages).await;
assert_eq!(messages.len(), 1);

-let message_publish_time = messages.get(0).unwrap().message.publish_time.to_owned().unwrap();
+let message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();

// rewind to a timestamp where message was just published
subscription
@@ -1032,7 +1031,7 @@ mod tests {
let messages = subscription.pull(100, None).await.unwrap();
ack_all(&messages).await;
assert_eq!(messages.len(), 1);
-let seek_message_publish_time = messages.get(0).unwrap().message.publish_time.to_owned().unwrap();
+let seek_message_publish_time = messages.first().unwrap().message.publish_time.to_owned().unwrap();
assert_eq!(seek_message_publish_time, message_publish_time);

// cleanup