Skip to content

Commit

Permalink
add support for bigquery AppendRows storage write API (#95)
Browse files Browse the repository at this point in the history
* WIP: add storage API

* fix clippy warnings

* add support for AppendRows api

* add append_rows example

---------

Co-authored-by: Laurent Quérel <laurent.querel@gmail.com>
  • Loading branch information
imor and lquerel committed Jul 7, 2024
1 parent 5dd0052 commit d79313d
Show file tree
Hide file tree
Showing 12 changed files with 4,560 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "googleapis"]
path = googleapis
url = https://github.com/googleapis/googleapis.git
27 changes: 23 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,43 @@ bq_load_job = ["cloud-storage"]
[dependencies]
yup-oauth2 = { version = "11", default-features = false, features = ["hyper-rustls", "service-account"] }
hyper = { version = "1.3.1", features = ["http1"] }
hyper-util = { version = "0.1", default-features = false, features = ["client-legacy"] }
hyper-util = { version = "0.1", default-features = false, features = [
"client-legacy",
] }
thiserror = "1.0.59"
tokio = { version = "1.38.0", default-features = false, features = ["rt-multi-thread", "net", "sync", "macros"] }
tokio = { version = "1.38.0", default-features = false, features = [
"rt-multi-thread",
"net",
"sync",
"macros",
] }
tokio-stream = "0.1.15"
async-stream = "0.3.5"
reqwest = { version = "0.12.4", default-features = false, features = ["json"] }
url = "2.5.0"
serde = "1.0.203"
serde_json = "1.0.117"
log = "0.4.21"
time = { version = "0.3.36", features = ["local-offset", "serde", "serde-well-known"] }
cloud-storage = {version="0.11.1", features = ["global-client"], optional = true}
time = { version = "0.3.36", features = [
"local-offset",
"serde",
"serde-well-known",
] }
cloud-storage = { version = "0.11.1", features = [
"global-client",
], optional = true }
async-trait = "0.1.80"
dyn-clone = "1.0.17"
prost = "0.12.6"
prost-types = "0.12.6"
tonic = { version = "0.11.0", features = ["transport", "tls", "tls-roots"] }

[dev-dependencies]
tokio-test = "0.4.4"
rand = "0.8.5"
wiremock = "0.6.0"
tempfile = "3.10.1"
fake = "2.9.2"

[build-dependencies]
tonic-build = "0.11.0"
10 changes: 10 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/// Build script: generate Rust bindings for the BigQuery Storage Write API.
///
/// Runs `tonic-build` over the `storage.proto` definition vendored in the
/// `googleapis` git submodule and writes the generated code to `src/google`.
/// Transport codegen is disabled (`build_transport(false)`) because the
/// crate sets up its own gRPC channel.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let codegen = tonic_build::configure()
        .build_transport(false)
        .out_dir("src/google");
    codegen.compile(
        &["googleapis/google/cloud/bigquery/storage/v1/storage.proto"],
        &["googleapis"],
    )?;
    Ok(())
}
81 changes: 81 additions & 0 deletions examples/append_rows.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use gcp_bigquery_client::{
env_vars,
storage::{ColumnType, FieldDescriptor, StreamName, TableDescriptor},
};
use prost::Message;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Target table coordinates and service-account key come from env vars.
    let (ref project_id, ref dataset_id, ref table_id, ref gcp_sa_key) = env_vars();

    let mut client = gcp_bigquery_client::Client::from_service_account_key_file(gcp_sa_key).await?;

    // Small helper to cut down on FieldDescriptor boilerplate; `number` is
    // the protobuf field tag and must match the #[prost(tag = "...")] below.
    let descriptor = |number, name: &str, typ| FieldDescriptor {
        name: name.to_string(),
        number,
        typ,
    };
    let field_descriptors = vec![
        descriptor(1, "actor_id", ColumnType::Int64),
        descriptor(2, "first_name", ColumnType::String),
        descriptor(3, "last_name", ColumnType::String),
        descriptor(4, "last_update", ColumnType::Timestamp),
    ];
    let table_descriptor = TableDescriptor { field_descriptors };

    // Row type serialized with prost; tags line up with the descriptors above.
    #[derive(Clone, PartialEq, Message)]
    struct Actor {
        #[prost(int32, tag = "1")]
        actor_id: i32,

        #[prost(string, tag = "2")]
        first_name: String,

        #[prost(string, tag = "3")]
        last_name: String,

        #[prost(string, tag = "4")]
        last_update: String,
    }

    let john = Actor {
        actor_id: 1,
        first_name: "John".to_string(),
        last_name: "Doe".to_string(),
        last_update: "2007-02-15 09:34:33 UTC".to_string(),
    };

    let jane = Actor {
        actor_id: 2,
        first_name: "Jane".to_string(),
        last_name: "Doe".to_string(),
        last_update: "2008-02-15 09:34:33 UTC".to_string(),
    };

    // Write to the table's default stream and collect the server responses.
    let stream_name = StreamName::new_default(project_id.clone(), dataset_id.clone(), table_id.clone());
    let trace_id = "test_client".to_string();

    let mut response_stream = client
        .storage_mut()
        .append_rows(&stream_name, &table_descriptor, &[john, jane], trace_id)
        .await?;

    while let Some(resp) = response_stream.next().await {
        let resp = resp?;
        println!("response: {resp:#?}");
    }

    Ok(())
}
1 change: 1 addition & 0 deletions googleapis
Submodule googleapis added at 57c6d7
18 changes: 9 additions & 9 deletions src/client_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ impl ClientBuilder {
self
}

pub fn build_from_authenticator(&self, auth: Arc<dyn Authenticator>) -> Client {
let mut client = Client::from_authenticator(auth);
pub async fn build_from_authenticator(&self, auth: Arc<dyn Authenticator>) -> Result<Client, BQError> {
let mut client = Client::from_authenticator(auth).await?;
client.v2_base_url(self.v2_base_url.clone());
client
Ok(client)
}

pub async fn build_from_service_account_key(
Expand All @@ -51,14 +51,14 @@ impl ClientBuilder {
};
let sa_auth = ServiceAccountAuthenticator::from_service_account_key(sa_key, &[&scope]).await?;

Ok(self.build_from_authenticator(sa_auth))
self.build_from_authenticator(sa_auth).await
}

pub async fn build_from_service_account_key_file(&self, sa_key_file: &str) -> Result<Client, BQError> {
let scopes = vec![self.auth_base_url.as_str()];
let sa_auth = service_account_authenticator(scopes, sa_key_file).await?;

Ok(self.build_from_authenticator(sa_auth))
self.build_from_authenticator(sa_auth).await
}

pub async fn build_with_workload_identity(&self, readonly: bool) -> Result<Client, BQError> {
Expand All @@ -70,7 +70,7 @@ impl ClientBuilder {

let sa_auth = ServiceAccountAuthenticator::with_workload_identity(&[&scope]).await?;

Ok(self.build_from_authenticator(sa_auth))
self.build_from_authenticator(sa_auth).await
}

pub async fn build_from_installed_flow_authenticator<S: AsRef<[u8]>, P: Into<PathBuf>>(
Expand All @@ -81,7 +81,7 @@ impl ClientBuilder {
let scopes = vec![self.auth_base_url.as_str()];
let auth = installed_flow_authenticator(secret, &scopes, persistant_file_path).await?;

let mut client = Client::from_authenticator(auth);
let mut client = Client::from_authenticator(auth).await?;
client.v2_base_url(self.v2_base_url.clone());
Ok(client)
}
Expand All @@ -104,7 +104,7 @@ impl ClientBuilder {
let scopes = vec![self.auth_base_url.as_str()];
let auth = application_default_credentials_authenticator(&scopes).await?;

let mut client = Client::from_authenticator(auth);
let mut client = Client::from_authenticator(auth).await?;
client.v2_base_url(self.v2_base_url.clone());
Ok(client)
}
Expand All @@ -116,7 +116,7 @@ impl ClientBuilder {
let scopes = vec![self.auth_base_url.as_str()];
let auth = authorized_user_authenticator(authorized_user_secret_path, &scopes).await?;

let mut client = Client::from_authenticator(auth);
let mut client = Client::from_authenticator(auth).await?;
client.v2_base_url(self.v2_base_url.clone());
Ok(client)
}
Expand Down
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use std::collections::HashMap;

use tonic::{metadata::errors::InvalidMetadataValue, Status};

#[allow(clippy::upper_case_acronyms)]
#[derive(thiserror::Error, Debug)]
pub enum BQError {
Expand Down Expand Up @@ -53,6 +55,15 @@ pub enum BQError {

#[error("Json serialization error (error: {0})")]
SerializationError(#[from] serde_json::Error),

#[error("Tonic transport error (error: {0}")]
TonicTransportError(#[from] tonic::transport::Error),

#[error("Tonic invalid metadata value error (error: {0}")]
TonicInvalidMetadataValueError(#[from] InvalidMetadataValue),

#[error("Tonic status error (error: {0}")]
TonicStatusError(#[from] Status),
}

#[derive(Debug, Deserialize)]
Expand Down
Loading

0 comments on commit d79313d

Please sign in to comment.