diff --git a/Cargo.lock b/Cargo.lock index e880947fcd4a..98e4a27390e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1852,14 +1852,38 @@ dependencies = [ "syn", ] +[[package]] +name = "darling" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" +dependencies = [ + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + [[package]] name = "darling" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0808e1bd8671fb44a113a14e13497557533369847788fa2ae912b6ebfce9fa8" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.14.3", + "darling_macro 0.14.3", +] + +[[package]] +name = "darling_core" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", ] [[package]] @@ -1876,13 +1900,24 @@ dependencies = [ "syn", ] +[[package]] +name = "darling_macro" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" +dependencies = [ + "darling_core 0.13.4", + "quote", + "syn", +] + [[package]] name = "darling_macro" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b36230598a2d5de7ec1c6f51f72d8a99a9208daff41de2084d06e3fd3ea56685" dependencies = [ - "darling_core", + "darling_core 0.14.3", "quote", "syn", ] @@ -1915,6 +1950,12 @@ dependencies = [ "parking_lot_core 0.9.7", ] +[[package]] +name = "data-url" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d7439c3735f405729d52c3fbbe4de140eaf938a1fe47d227c27f8254d4302a5" + [[package]] name = "deadpool" version = "0.9.5" @@ -1980,7 +2021,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f" dependencies = [ - "darling", + "darling 0.14.3", "proc-macro2", "quote", "syn", @@ -2586,8 +2627,10 @@ version = "0.2.7" source = "git+https://github.com/madsim-rs/getrandom.git?rev=cc95ee3#cc95ee36a2ae473edb01fcdcf34da3f2dcfc4b2f" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -3565,7 +3608,7 @@ dependencies = [ "http", "madsim", "serde", - "serde_with", + "serde_with 2.2.0", "spin 0.9.5", "thiserror", "tokio", @@ -3580,7 +3623,7 @@ version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3d248e97b1a48826a12c3828d921e8548e714394bf17274dd0a93910dc946e1" dependencies = [ - "darling", + "darling 0.14.3", "proc-macro2", "quote", "syn", @@ -4129,6 +4172,26 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" +[[package]] +name = "oauth2" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeaf26a72311c087f8c5ba617c96fac67a5c04f430e716ac8d8ab2de62e23368" +dependencies = [ + "base64 0.13.1", + "chrono", + "getrandom 0.2.7", + "http", + "rand 0.8.5", + "reqwest", + "serde", + "serde_json", + "serde_path_to_error", + "sha2", + "thiserror", + "url", +] + [[package]] name = "object" version = "0.30.3" @@ -4183,6 +4246,33 @@ dependencies = [ "uuid", ] +[[package]] +name = "openidconnect" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a0f47b0f1499d08c4a8480c963d49c5ec77f4249c2b6869780979415f45809" +dependencies = [ + "base64 0.13.1", + "chrono", + "http", + "itertools", + "log", + "num-bigint", + "oauth2", + "rand 0.8.5", + "ring", + "serde", + "serde-value", + "serde_derive", + "serde_json", + "serde_path_to_error", + "serde_plain", + "serde_with 1.14.0", + "subtle", + "thiserror", + "url", +] + [[package]] name = "openssl" version = "0.10.45" @@ -4308,6 +4398,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.4.3" @@ -5142,21 +5241,27 @@ dependencies = [ "bytes", "chrono", "crc 3.0.1", + "data-url", "futures", "futures-io", "futures-timer", "log", "native-tls", "nom", + "oauth2", + "openidconnect", "pem", "prost 0.11.8", "prost-build", "prost-derive 0.11.8", "rand 0.8.5", "regex", + "serde", + "serde_json", "tokio", "tokio-native-tls", "tokio-util", + "tracing", "url", "uuid", ] @@ -5478,6 +5583,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", + "webpki-roots", "winreg", ] @@ -5532,7 +5638,7 @@ dependencies = [ "regex", "serde", "serde_json", - "serde_with", + "serde_with 2.2.0", "serde_yaml", "tempfile", "workspace-hack", @@ -5923,7 +6029,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "serde_with", + "serde_with 2.2.0", "simd-json", "tempfile", "thiserror", @@ -6238,7 +6344,7 @@ dependencies = [ "risingwave_frontend", "risingwave_sqlparser", "serde", - "serde_with", + "serde_with 2.2.0", "serde_yaml", "tempfile", "walkdir", @@ -6819,6 +6925,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float 2.10.0", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.152" @@ -6850,6 +6966,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_plain" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6018081315db179d0ce57b1fe4b62a12a0028c9cf9bbef868c9cf477b3c34ae" +dependencies = [ + "serde", +] + [[package]] name = "serde_qs" version = "0.8.5" @@ -6882,6 +7007,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" +dependencies = [ + "serde", + "serde_with_macros 1.5.2", +] + [[package]] name = "serde_with" version = "2.2.0" @@ -6894,17 +7029,29 @@ dependencies = [ "indexmap", "serde", "serde_json", - "serde_with_macros", + "serde_with_macros 2.2.0", "time 0.3.17", ] +[[package]] +name = "serde_with_macros" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" +dependencies = [ + "darling 0.13.4", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_with_macros" version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1966009f3c05f095697c537312f5415d1e3ed31ce0a56942bac4c771c5c335e" dependencies = [ - "darling", + "darling 0.14.3", "proc-macro2", "quote", "syn", @@ -7476,7 +7623,7 @@ dependencies = [ "byteorder", "integer-encoding", "log", - "ordered-float", + "ordered-float 1.1.1", "threadpool", ] @@ -8645,6 +8792,7 @@ dependencies = [ "smallvec", "socket2", "strum", + "subtle", "syn", "time 0.3.17", "tokio", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 2ac67f4d68c1..3643f351d555 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -47,7 +47,7 @@ prometheus = { version = "0.13", features = ["process"] } prost = { version = "0.11.0", features = ["no-recursion-limit"] } prost-reflect = "0.9.2" protobuf-native = "0.2.1" -pulsar = { version = "4.2", default-features = false, features = ["tokio-runtime"] } +pulsar = { version = "4.2", default-features = false, features = ["tokio-runtime", "telemetry", "auth-oauth2"] } rdkafka = { package = "madsim-rdkafka", version = "=0.2.14-alpha", features = ["cmake-build", "ssl-vendored", "gssapi"] } reqwest = { version = "0.11", features = ["json"] } risingwave_common = { path = "../common" } diff --git a/src/connector/src/source/pulsar/admin/client.rs b/src/connector/src/source/pulsar/admin/client.rs deleted file mode 100644 index ba024efdb962..000000000000 --- a/src/connector/src/source/pulsar/admin/client.rs +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use anyhow::{anyhow, bail, Result}; -use http::{Response, StatusCode}; -use hyper::body::Buf; -use hyper::{Body, Client, Uri}; -use hyper_tls::HttpsConnector; -use serde_derive::{Deserialize, Serialize}; - -use crate::source::pulsar::topic::Topic; - -#[derive(Debug, Default)] -pub struct PulsarAdminClient { - pub(crate) base_path: String, - pub(crate) auth_token: Option, -} - -impl PulsarAdminClient { - pub fn new(base_path: String, auth_token: Option) -> Self { - Self { - base_path: base_path.trim_end_matches('/').to_string(), - auth_token, - } - } -} - -impl PulsarAdminClient { - pub async fn get_last_message_id(&self, topic: &Topic) -> Result { - self.get(topic, "lastMessageId").await - } - - pub async fn get_topic_metadata(&self, topic: &Topic) -> Result { - let res = self.http_get(topic, "partitions").await?; - - if res.status() == StatusCode::NOT_FOUND { - bail!( - "could not find metadata for pulsar topic {}", - topic.to_string() - ); - } - - let body = hyper::body::aggregate(res).await?; - serde_json::from_reader(body.reader()).map_err(|e| anyhow!(e)) - } - - pub async fn http_get(&self, topic: &Topic, api: &str) -> Result> { - let client = Client::builder().build::<_, hyper::Body>(HttpsConnector::new()); - - let url = format!( - "{}/{}/{}/{}", - self.base_path, - "admin/v2", - topic.rest_path(), - api - ); - let mut req = hyper::Request::builder() - .method("GET") - .uri(url.parse::()?) - .body(Body::empty()) - .unwrap(); - - if let Some(auth_token) = &self.auth_token { - req.headers_mut() - .insert("Authorization", auth_token.to_string().parse().unwrap()); - } - - client.request(req).await.map_err(|e| anyhow!(e)) - } - - pub async fn get(&self, topic: &Topic, api: &str) -> Result - where - T: for<'a> serde::Deserialize<'a>, - { - let res = self.http_get(topic, api).await?; - let body = hyper::body::aggregate(res).await?; - let result: T = serde_json::from_reader(body.reader())?; - Ok(result) - } -} - -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct LastMessageId { - pub ledger_id: i64, - pub entry_id: i64, - pub partition_index: i64, - pub batch_index: Option, - pub batch_size: Option, - pub acker: Option, - pub outstanding_acks_in_same_batch: Option, -} - -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct LastMessageIdAcker { - pub batch_size: Option, - pub prev_batch_cumulatively_acked: Option, - pub outstanding_acks: Option, - pub bit_set_size: Option, -} - -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct PartitionedTopicMetadata { - pub partitions: i64, -} - -#[cfg(test)] -mod test { - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use crate::source::pulsar::admin::client::PulsarAdminClient; - use crate::source::pulsar::topic::parse_topic; - - async fn mock_server(web_path: &str, body: &str) -> MockServer { - let mock_server = MockServer::start().await; - use wiremock::matchers::{method, path}; - - let response = ResponseTemplate::new(200) - .set_body_string(body) - .append_header("content-type", "application/json"); - - Mock::given(method("GET")) - .and(path(web_path)) - .respond_with(response) - .mount(&mock_server) - .await; - - mock_server - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // MockServer is not supported in simulation. - async fn test_get_topic_metadata() { - let server = mock_server( - "/admin/v2/persistent/public/default/t2/partitions", - "{\"partitions\":3}", - ) - .await; - - let client = PulsarAdminClient::new(server.uri(), None); - - let topic = parse_topic("public/default/t2").unwrap(); - - let meta = client.get_topic_metadata(&topic).await.unwrap(); - - assert_eq!(meta.partitions, 3); - } -} diff --git a/src/connector/src/source/pulsar/admin/mod.rs b/src/connector/src/source/pulsar/admin/mod.rs deleted file mode 100644 index ea9e0a396a09..000000000000 --- a/src/connector/src/source/pulsar/admin/mod.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub use client::*; - -mod client; diff --git a/src/connector/src/source/pulsar/enumerator/client.rs b/src/connector/src/source/pulsar/enumerator/client.rs index 49f2dad2a3a6..cdafd49b9016 100644 --- a/src/connector/src/source/pulsar/enumerator/client.rs +++ b/src/connector/src/source/pulsar/enumerator/client.rs @@ -15,16 +15,16 @@ use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use itertools::Itertools; +use pulsar::{Pulsar, TokioExecutor}; use serde::{Deserialize, Serialize}; -use crate::source::pulsar::admin::PulsarAdminClient; use crate::source::pulsar::split::PulsarSplit; use crate::source::pulsar::topic::{parse_topic, Topic}; use crate::source::pulsar::PulsarProperties; use crate::source::SplitEnumerator; pub struct PulsarSplitEnumerator { - admin_client: PulsarAdminClient, + client: Pulsar, topic: Topic, start_offset: PulsarEnumeratorOffset, } @@ -43,8 +43,8 @@ impl SplitEnumerator for PulsarSplitEnumerator { type Split = PulsarSplit; async fn new(properties: PulsarProperties) -> Result { + let pulsar = properties.build_pulsar_client().await?; let topic = properties.topic; - let admin_url = properties.admin_url; let parsed_topic = parse_topic(&topic)?; let mut scan_start_offset = match properties @@ -68,7 +68,7 @@ impl SplitEnumerator for PulsarSplitEnumerator { } Ok(PulsarSplitEnumerator { - admin_client: PulsarAdminClient::new(admin_url, properties.auth_token), + client: pulsar, topic: parsed_topic, start_offset: scan_start_offset, }) @@ -79,19 +79,15 @@ impl SplitEnumerator for PulsarSplitEnumerator { // MessageId is only used when recovering from a State assert!(!matches!(offset, PulsarEnumeratorOffset::MessageId(_))); - let topic_metadata = self.admin_client.get_topic_metadata(&self.topic).await?; - // note: may check topic exists by get stats - if topic_metadata.partitions < 0 { - bail!( - "illegal metadata {:?} for pulsar topic {}", - topic_metadata.partitions, - self.topic.to_string() - ); - } + let topic_partitions = self + .client + .lookup_partitioned_topic_number(&self.topic.to_string()) + .await + .map_err(|e| anyhow!(e))?; - let splits = if topic_metadata.partitions > 0 { + let splits = if topic_partitions > 0 { // partitioned topic - (0..topic_metadata.partitions as i32) + (0..topic_partitions as i32) .map(|p| PulsarSplit { topic: self.topic.sub_topic(p).unwrap(), start_offset: offset.clone(), @@ -108,117 +104,3 @@ impl SplitEnumerator for PulsarSplitEnumerator { Ok(splits) } } - -#[cfg(test)] -mod test { - use wiremock::{Mock, MockServer, ResponseTemplate}; - - use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties, PulsarSplitEnumerator}; - use crate::source::SplitEnumerator; - - async fn empty_mock_server() -> MockServer { - MockServer::start().await - } - - pub async fn mock_server(web_path: &str, body: &str) -> MockServer { - let mock_server = MockServer::start().await; - use wiremock::matchers::{method, path}; - - let response = ResponseTemplate::new(200) - .set_body_string(body) - .append_header("content-type", "application/json"); - - Mock::given(method("GET")) - .and(path(web_path)) - .respond_with(response) - .mount(&mock_server) - .await; - - mock_server - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // MockServer is not supported in simulation. - async fn test_list_splits_on_no_existing_pulsar() { - let prop = PulsarProperties { - topic: "t".to_string(), - admin_url: "http://test_illegal_url:8000".to_string(), - service_url: "pulsar://localhost:6650".to_string(), - scan_startup_mode: Some("earliest".to_string()), - time_offset: None, - auth_token: None, - }; - let mut enumerator = PulsarSplitEnumerator::new(prop).await.unwrap(); - assert!(enumerator.list_splits().await.is_err()); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // MockServer is not supported in simulation. - async fn test_list_on_no_existing_topic() { - let server = empty_mock_server().await; - - let prop = PulsarProperties { - topic: "t".to_string(), - admin_url: server.uri(), - service_url: "pulsar://localhost:6650".to_string(), - scan_startup_mode: Some("earliest".to_string()), - time_offset: None, - auth_token: None, - }; - let mut enumerator = PulsarSplitEnumerator::new(prop).await.unwrap(); - assert!(enumerator.list_splits().await.is_err()); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // MockServer is not supported in simulation. - async fn test_list_splits_with_partitioned_topic() { - let server = mock_server( - "/admin/v2/persistent/public/default/t/partitions", - "{\"partitions\":3}", - ) - .await; - - let prop = PulsarProperties { - topic: "t".to_string(), - admin_url: server.uri(), - service_url: "pulsar://localhost:6650".to_string(), - scan_startup_mode: Some("earliest".to_string()), - time_offset: None, - auth_token: None, - }; - let mut enumerator = PulsarSplitEnumerator::new(prop).await.unwrap(); - - let splits = enumerator.list_splits().await.unwrap(); - assert_eq!(splits.len(), 3); - - (0..3).for_each(|i| { - assert_eq!(splits[i].start_offset, PulsarEnumeratorOffset::Earliest); - assert_eq!(splits[i].topic.partition_index, Some(i as i32)); - }); - } - - #[tokio::test] - #[cfg_attr(madsim, ignore)] // MockServer is not supported in simulation. - async fn test_list_splits_with_non_partitioned_topic() { - let server = mock_server( - "/admin/v2/persistent/public/default/t/partitions", - "{\"partitions\":0}", - ) - .await; - - let prop = PulsarProperties { - topic: "t".to_string(), - admin_url: server.uri(), - service_url: "pulsar://localhost:6650".to_string(), - scan_startup_mode: Some("earliest".to_string()), - time_offset: None, - auth_token: None, - }; - let mut enumerator = PulsarSplitEnumerator::new(prop).await.unwrap(); - - let splits = enumerator.list_splits().await.unwrap(); - assert_eq!(splits.len(), 1); - assert_eq!(splits[0].start_offset, PulsarEnumeratorOffset::Earliest); - assert_eq!(splits[0].topic.partition_index, None); - } -} diff --git a/src/connector/src/source/pulsar/mod.rs b/src/connector/src/source/pulsar/mod.rs index 92233686901f..0c055442a978 100644 --- a/src/connector/src/source/pulsar/mod.rs +++ b/src/connector/src/source/pulsar/mod.rs @@ -12,26 +12,43 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod admin; pub mod enumerator; pub mod source; pub mod split; pub mod topic; +use anyhow::{anyhow, Result}; pub use enumerator::*; +use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params}; +use pulsar::{Authentication, Pulsar, TokioExecutor}; use serde::Deserialize; pub use split::*; +use url::Url; pub const PULSAR_CONNECTOR: &str = "pulsar"; +#[derive(Clone, Debug, Deserialize)] +pub struct PulsarOauth { + #[serde(rename = "oauth.issuer.url")] + pub issuer_url: String, + + #[serde(rename = "oauth.credentials.url")] + pub credentials_url: String, + + #[serde(rename = "oauth.audience")] + pub audience: String, + + #[serde(rename = "oauth.scope")] + pub scope: Option, + // #[serde(flatten)] + // pub s3_cridentials: Option<>, +} + #[derive(Clone, Debug, Deserialize)] pub struct PulsarProperties { #[serde(rename = "topic", alias = "pulsar.topic")] pub topic: String, - #[serde(rename = "admin.url", alias = "pulsar.admin.url")] - pub admin_url: String, - #[serde(rename = "service.url", alias = "pulsar.service.url")] pub service_url: String, @@ -43,4 +60,36 @@ pub struct PulsarProperties { #[serde(rename = "auth.token")] pub auth_token: Option, + + #[serde(flatten)] + pub oauth: Option, +} + +impl PulsarProperties { + pub async fn build_pulsar_client(&self) -> Result> { + let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor); + if let Some(oauth) = &self.oauth { + let url = Url::parse(&oauth.credentials_url)?; + if url.scheme() == "s3" { + todo!("s3 oauth credentials not supported yet"); + } + + let auth_params = OAuth2Params { + issuer_url: oauth.issuer_url.clone(), + credentials_url: oauth.credentials_url.clone(), + audience: Some(oauth.audience.clone()), + scope: oauth.scope.clone(), + }; + + pulsar_builder = pulsar_builder + .with_auth_provider(OAuth2Authentication::client_credentials(auth_params)); + } else if let Some(auth_token) = &self.auth_token { + pulsar_builder = pulsar_builder.with_auth(Authentication { + name: "token".to_string(), + data: Vec::from(auth_token.as_str()), + }); + } + + pulsar_builder.build().await.map_err(|e| anyhow!(e)) + } } diff --git a/src/connector/src/source/pulsar/source/reader.rs b/src/connector/src/source/pulsar/source/reader.rs index 25eab2daaadc..22d89516f7d3 100644 --- a/src/connector/src/source/pulsar/source/reader.rs +++ b/src/connector/src/source/pulsar/source/reader.rs @@ -21,9 +21,7 @@ use futures_async_stream::try_stream; use itertools::Itertools; use pulsar::consumer::InitialPosition; use pulsar::message::proto::MessageIdData; -use pulsar::{ - Authentication, Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor, -}; +use pulsar::{Consumer, ConsumerBuilder, ConsumerOptions, Pulsar, SubType, TokioExecutor}; use risingwave_common::try_match_expand; use crate::impl_common_split_reader_logic; @@ -102,25 +100,14 @@ impl SplitReader for PulsarSplitReader { ) -> Result { ensure!(splits.len() == 1, "only support single split"); let split = try_match_expand!(splits.into_iter().next().unwrap(), SplitImpl::Pulsar)?; - - let service_url = &props.service_url; + let pulsar = props.build_pulsar_client().await?; let topic = split.topic.to_string(); tracing::debug!("creating consumer for pulsar split topic {}", topic,); - let mut pulsar_builder = Pulsar::builder(service_url, TokioExecutor); - if let Some(auth_token) = props.auth_token { - pulsar_builder = pulsar_builder.with_auth(Authentication { - name: "token".to_string(), - data: Vec::from(auth_token), - }); - } - - let pulsar = pulsar_builder.build().await.map_err(|e| anyhow!(e))?; - let builder: ConsumerBuilder = pulsar .consumer() - .with_topic(topic) + .with_topic(&topic) .with_subscription_type(SubType::Exclusive) .with_subscription(format!( "consumer-{}", @@ -131,17 +118,35 @@ impl SplitReader for PulsarSplitReader { )); let builder = match split.start_offset.clone() { - PulsarEnumeratorOffset::Earliest => builder.with_options( - ConsumerOptions::default().with_initial_position(InitialPosition::Earliest), - ), + PulsarEnumeratorOffset::Earliest => { + if topic.starts_with("non-persistent://") { + tracing::warn!("Earliest offset is not supported for non-persistent topic, use Latest instead"); + builder.with_options( + ConsumerOptions::default().with_initial_position(InitialPosition::Latest), + ) + } else { + builder.with_options( + ConsumerOptions::default().with_initial_position(InitialPosition::Earliest), + ) + } + } PulsarEnumeratorOffset::Latest => builder.with_options( ConsumerOptions::default().with_initial_position(InitialPosition::Latest), ), - PulsarEnumeratorOffset::MessageId(m) => builder.with_options(pulsar::ConsumerOptions { - durable: Some(false), - start_message_id: parse_message_id(m.as_str()).ok(), - ..Default::default() - }), + PulsarEnumeratorOffset::MessageId(m) => { + if topic.starts_with("non-persistent://") { + tracing::warn!("MessageId offset is not supported for non-persistent topic, use Latest instead"); + builder.with_options( + ConsumerOptions::default().with_initial_position(InitialPosition::Latest), + ) + } else { + builder.with_options(pulsar::ConsumerOptions { + durable: Some(false), + start_message_id: parse_message_id(m.as_str()).ok(), + ..Default::default() + }) + } + } PulsarEnumeratorOffset::Timestamp(_) => builder, }; diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index e9eebe194449..ecd4ff11bc05 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -27,7 +27,7 @@ aws-smithy-client = { version = "0.51", default-features = false, features = ["n aws-types = { version = "0.51", default-features = false, features = ["hardcoded-credentials"] } base64 = { version = "0.21" } bytes = { version = "1", features = ["serde"] } -chrono = { version = "0.4" } +chrono = { version = "0.4", features = ["serde"] } clap = { version = "4", features = ["derive", "env"] } combine = { version = "4" } criterion = { version = "0.4", features = ["async_futures", "async_tokio"] } @@ -82,13 +82,14 @@ rand_chacha = { version = "0.3" } rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-syntax = { version = "0.6" } -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } smallvec = { version = "1", default-features = false, features = ["serde"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } strum = { version = "0.24", features = ["derive"] } +subtle = { version = "2" } time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "signal", "stats", "sync", "time", "tracing"] } tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710", features = ["net"] } @@ -118,7 +119,7 @@ aws-types = { version = "0.51", default-features = false, features = ["hardcoded base64 = { version = "0.21" } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } -chrono = { version = "0.4" } +chrono = { version = "0.4", features = ["serde"] } clap = { version = "4", features = ["derive", "env"] } combine = { version = "4" } criterion = { version = "0.4", features = ["async_futures", "async_tokio"] } @@ -174,13 +175,14 @@ rand_chacha = { version = "0.3" } rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-syntax = { version = "0.6" } -reqwest = { version = "0.11", features = ["json"] } +reqwest = { version = "0.11", features = ["blocking", "json", "rustls-tls"] } ring = { version = "0.16", features = ["std"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } smallvec = { version = "1", default-features = false, features = ["serde"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } strum = { version = "0.24", features = ["derive"] } +subtle = { version = "2" } syn = { version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] } time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "parking_lot", "process", "rt-multi-thread", "signal", "stats", "sync", "time", "tracing"] }