diff --git a/Cargo.lock b/Cargo.lock index 74be98825a..5538f3c940 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1419,6 +1419,7 @@ dependencies = [ "aws-config", "aws-sdk-dynamodb", "aws-sdk-s3", + "base64 0.21.7", "chrono", "clap", "comm-lib", diff --git a/services/blob/Cargo.toml b/services/blob/Cargo.toml index c29f7b392a..b9892087df 100644 --- a/services/blob/Cargo.toml +++ b/services/blob/Cargo.toml @@ -14,6 +14,7 @@ async-stream = { workspace = true } aws-config = { workspace = true } aws-sdk-dynamodb = { workspace = true } aws-sdk-s3 = { workspace = true } +base64 = { workspace = true } chrono = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } comm-lib = { path = "../../shared/comm-lib", features = [ diff --git a/services/blob/src/http/handlers/blob.rs b/services/blob/src/http/handlers/blob.rs index 873560a35d..d0bedeabba 100644 --- a/services/blob/src/http/handlers/blob.rs +++ b/services/blob/src/http/handlers/blob.rs @@ -5,15 +5,17 @@ use crate::service::BlobService; use crate::validate_identifier; use actix_web::error::{ErrorBadRequest, ErrorRangeNotSatisfiable}; +use actix_web::web::Bytes; use actix_web::{ http::header::{ByteRangeSpec, Range}, web, HttpResponse, }; use async_stream::try_stream; +use base64::Engine; use comm_lib::http::multipart; use serde::{Deserialize, Serialize}; use tokio_stream::StreamExt; -use tracing::{info, instrument, trace, warn}; +use tracing::{debug, info, instrument, trace, warn}; use tracing_futures::Instrument; /// Returns a tuple of first and last byte number (inclusive) represented by given range header. @@ -162,21 +164,43 @@ pub async fn upload_blob_handler( return Err(ErrorBadRequest("Bad request")); } validate_identifier!(blob_hash); - tracing::Span::current().record("blob_hash", &blob_hash); trace!("Receiving blob data"); let stream = try_stream! { while let Some(mut field) = payload.try_next().await? { let field_name = field.name(); + + if field_name == "base64_data" { + trace!("Got base64_data"); + + let mut buf = Vec::new(); + while let Some(chunk) = field.try_next().await? { + buf.extend_from_slice(&chunk); + } + + let base64_string = String::from_utf8(buf) + .map_err(|err| actix_web::error::ParseError::Utf8(err.utf8_error()))?; + + let data = base64::engine::general_purpose::STANDARD + .decode(&base64_string) + .map_err(|err| { + debug!("Invalid base64 payload: {err:?}"); + ErrorBadRequest("Invalid base64") + })?; + yield Bytes::from(data); + return; + } + if field_name != "blob_data" { warn!( field_name, - "Malfolmed request: 'blob_data' multipart field expected." + "Malformed request: 'blob_data' or 'base64_data' multipart field expected." ); Err(ErrorBadRequest("Bad request"))?; } + trace!("Got blob_data. Streaming..."); while let Some(chunk) = field.try_next().await? { yield chunk; }