diff --git a/src/storage/database.rs b/src/storage/database.rs
index b398ab974..3e8c7bb83 100644
--- a/src/storage/database.rs
+++ b/src/storage/database.rs
@@ -121,7 +121,7 @@ impl DatabaseBackend {
         }
     }
 
-    pub(super) fn store_batch(&mut self, batch: Vec<Blob>) -> Result<()> {
+    pub(super) fn store_batch(&self, batch: Vec<Blob>) -> Result<()> {
         let mut conn = self.pool.get()?;
         for blob in batch {
             let compression = blob.compression.map(|alg| alg as i32);
@@ -137,7 +137,7 @@ impl DatabaseBackend {
         Ok(())
     }
 
-    pub(crate) fn delete_prefix(&mut self, prefix: &str) -> Result<()> {
+    pub(crate) fn delete_prefix(&self, prefix: &str) -> Result<()> {
         self.pool.get()?.execute(
             "DELETE FROM files WHERE path LIKE $1;",
             &[&format!("{}%", prefix.replace('%', "\\%"))],
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index e308faf6c..ed8f96f34 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -9,7 +9,6 @@ use self::database::DatabaseBackend;
 use self::s3::S3Backend;
 use self::sqlite_pool::SqliteConnectionPool;
 use crate::error::Result;
-use crate::utils::spawn_blocking;
 use crate::web::metrics::RenderingTimesRecorder;
 use crate::{db::Pool, Config, InstanceMetrics};
 use anyhow::{anyhow, ensure};
@@ -30,8 +29,6 @@ use std::{
 use tokio::runtime::Runtime;
 use tracing::{error, instrument, trace};
 
-const MAX_CONCURRENT_UPLOADS: usize = 1000;
-
 type FileRange = RangeInclusive<u64>;
 
 #[derive(Debug, thiserror::Error)]
@@ -155,13 +152,7 @@ impl Storage {
     }
     pub(crate) async fn exists_async(&self, path: &str) -> Result<bool> {
         match &self.backend {
-            StorageBackend::Database(ref db) => {
-                spawn_blocking({
-                    let path = path.to_owned();
-                    move || db.exists(&path)
-                })
-                .await
-            }
+            StorageBackend::Database(db) => db.exists(path),
             StorageBackend::S3(s3) => s3.exists(path).await,
         }
     }
@@ -169,14 +160,14 @@
     pub(crate) fn get_public_access(&self, path: &str) -> Result<bool> {
         match &self.backend {
             StorageBackend::Database(db) => db.get_public_access(path),
-            StorageBackend::S3(s3) => s3.get_public_access(path),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.get_public_access(path)),
         }
     }
 
     pub(crate) fn set_public_access(&self, path: &str, public: bool) -> Result<()> {
         match &self.backend {
             StorageBackend::Database(db) => db.set_public_access(path, public),
-            StorageBackend::S3(s3) => s3.set_public_access(path, public),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.set_public_access(path, public)),
         }
     }
 
@@ -270,7 +261,7 @@ impl Storage {
     pub(crate) fn get(&self, path: &str, max_size: usize) -> Result<Blob> {
         let mut blob = match &self.backend {
             StorageBackend::Database(db) => db.get(path, max_size, None),
-            StorageBackend::S3(s3) => s3.get(path, max_size, None),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.get(path, max_size, None)),
         }?;
         if let Some(alg) = blob.compression {
             blob.content = decompress(blob.content.as_slice(), alg, max_size)?;
@@ -288,7 +279,7 @@ impl Storage {
     ) -> Result<Blob> {
         let mut blob = match &self.backend {
             StorageBackend::Database(db) => db.get(path, max_size, Some(range)),
-            StorageBackend::S3(s3) => s3.get(path, max_size, Some(range)),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.get(path, max_size, Some(range))),
         }?;
         // `compression` represents the compression of the file-stream inside the archive.
         // We don't compress the whole archive, so the encoding of the archive's blob is irrelevant
@@ -407,26 +398,22 @@ impl Storage {
         let compressed_index_content =
             compress(BufReader::new(fs::File::open(&local_index_path)?), alg)?;
 
-        self.store_inner(
-            vec![
-                Blob {
-                    path: archive_path.to_string(),
-                    mime: "application/zip".to_owned(),
-                    content: zip_content,
-                    compression: None,
-                    date_updated: Utc::now(),
-                },
-                Blob {
-                    path: remote_index_path,
-                    mime: "application/octet-stream".to_owned(),
-                    content: compressed_index_content,
-                    compression: Some(alg),
-                    date_updated: Utc::now(),
-                },
-            ]
-            .into_iter()
-            .map(Ok),
-        )?;
+        self.store_inner(vec![
+            Blob {
+                path: archive_path.to_string(),
+                mime: "application/zip".to_owned(),
+                content: zip_content,
+                compression: None,
+                date_updated: Utc::now(),
+            },
+            Blob {
+                path: remote_index_path,
+                mime: "application/octet-stream".to_owned(),
+                content: compressed_index_content,
+                compression: Some(alg),
+                date_updated: Utc::now(),
+            },
+        ])?;
 
         let file_alg = CompressionAlgorithm::Bzip2;
         Ok((file_paths, file_alg))
@@ -443,7 +430,7 @@
         let mut file_paths_and_mimes = HashMap::new();
         let mut algs = HashSet::with_capacity(1);
 
-        let blobs = get_file_list(root_dir)?
+        let blobs: Vec<_> = get_file_list(root_dir)?
             .into_iter()
             .filter_map(|file_path| {
                 // Some files have insufficient permissions
@@ -470,7 +457,8 @@
                     // this field is ignored by the backend
                    date_updated: Utc::now(),
                 })
-            });
+            })
+            .collect::<Result<Vec<_>>>()?;
 
         self.store_inner(blobs)?;
         Ok((file_paths_and_mimes, algs))
@@ -478,7 +466,7 @@
 
     #[cfg(test)]
     pub(crate) fn store_blobs(&self, blobs: Vec<Blob>) -> Result<()> {
-        self.store_inner(blobs.into_iter().map(Ok))
+        self.store_inner(blobs)
     }
 
     // Store file into the backend at the given path (also used to detect mime type), returns the
@@ -494,37 +482,30 @@
         let content = compress(&*content, alg)?;
         let mime = detect_mime(&path).to_owned();
 
-        self.store_inner(std::iter::once(Ok(Blob {
+        self.store_inner(vec![Blob {
             path,
             mime,
             content,
             compression: Some(alg),
             // this field is ignored by the backend
             date_updated: Utc::now(),
-        })))?;
+        }])?;
 
         Ok(alg)
     }
 
-    fn store_inner(&self, blobs: impl IntoIterator<Item = Result<Blob>>) -> Result<()> {
-        let mut blobs = blobs.into_iter();
-        self.transaction(|trans| {
-            loop {
-                let batch: Vec<_> = blobs
-                    .by_ref()
-                    .take(MAX_CONCURRENT_UPLOADS)
-                    .collect::<Result<_>>()?;
-                if batch.is_empty() {
-                    break;
-                }
-                trans.store_batch(batch)?;
-            }
-            Ok(())
-        })
+    fn store_inner(&self, batch: Vec<Blob>) -> Result<()> {
+        match &self.backend {
+            StorageBackend::Database(db) => db.store_batch(batch),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.store_batch(batch)),
+        }
     }
 
     pub(crate) fn delete_prefix(&self, prefix: &str) -> Result<()> {
-        self.transaction(|trans| trans.delete_prefix(prefix))
+        match &self.backend {
+            StorageBackend::Database(db) => db.delete_prefix(prefix),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.delete_prefix(prefix)),
+        }
     }
 
     // We're using `&self` instead of consuming `self` or creating a Drop impl because during tests
@@ -533,7 +514,7 @@ impl Storage {
     #[cfg(test)]
     pub(crate) fn cleanup_after_test(&self) -> Result<()> {
         if let StorageBackend::S3(s3) = &self.backend {
-            s3.cleanup_after_test()?;
+            self.runtime.block_on(s3.cleanup_after_test())?;
         }
         Ok(())
     }
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 1e59fbfc3..aa86d2fa9 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -15,7 +15,6 @@ use futures_util::{
     stream::{FuturesUnordered, StreamExt},
 };
 use std::{io::Write, sync::Arc};
-use tokio::runtime::Runtime;
 use tracing::{error, warn};
 
 const PUBLIC_ACCESS_TAG: &str = "static-cloudfront-access";
@@ -199,7 +198,7 @@ impl S3Backend {
         })
     }
 
-    pub(super) async fn store_batch(&mut self, mut batch: Vec<Blob>) -> Result<(), Error> {
+    pub(super) async fn store_batch(&self, mut batch: Vec<Blob>) -> Result<(), Error> {
         // Attempt to upload the batch 3 times
         for _ in 0..3 {
             let mut futures = FuturesUnordered::new();
@@ -240,7 +239,7 @@ impl S3Backend {
         panic!("failed to upload 3 times, exiting");
     }
 
-    pub(super) async fn delete_prefix(&mut self, prefix: &str) -> Result<(), Error> {
+    pub(super) async fn delete_prefix(&self, prefix: &str) -> Result<(), Error> {
         let mut continuation_token = None;
         loop {
             let list = self