From f570518aba3cca66b1298c148e13819b525dd0e6 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Wed, 6 Sep 2023 16:30:25 +0200 Subject: [PATCH] wip --- src/storage/database.rs | 45 ++++---------------------- src/storage/mod.rs | 47 +++++++++++---------------- src/storage/s3.rs | 71 ++++++++++++++++------------------------- 3 files changed, 52 insertions(+), 111 deletions(-) diff --git a/src/storage/database.rs b/src/storage/database.rs index bfdc0eec4..b398ab974 100644 --- a/src/storage/database.rs +++ b/src/storage/database.rs @@ -1,8 +1,7 @@ -use super::{Blob, FileRange, StorageTransaction}; +use super::{Blob, FileRange}; use crate::db::Pool; use crate::error::Result; use crate::InstanceMetrics; -use postgres::Transaction; use std::{convert::TryFrom, sync::Arc}; pub(crate) struct DatabaseBackend { @@ -122,38 +121,11 @@ impl DatabaseBackend { } } - pub(super) fn start_connection(&self) -> Result { - Ok(DatabaseClient { - conn: self.pool.get()?, - metrics: self.metrics.clone(), - }) - } -} - -pub(super) struct DatabaseClient { - conn: crate::db::PoolClient, - metrics: Arc, -} - -impl DatabaseClient { - pub(super) fn start_storage_transaction(&mut self) -> Result> { - Ok(DatabaseStorageTransaction { - transaction: self.conn.transaction()?, - metrics: &self.metrics, - }) - } -} - -pub(super) struct DatabaseStorageTransaction<'a> { - transaction: Transaction<'a>, - metrics: &'a InstanceMetrics, -} - -impl<'a> StorageTransaction for DatabaseStorageTransaction<'a> { - fn store_batch(&mut self, batch: Vec) -> Result<()> { + pub(super) fn store_batch(&mut self, batch: Vec) -> Result<()> { + let mut conn = self.pool.get()?; for blob in batch { let compression = blob.compression.map(|alg| alg as i32); - self.transaction.query( + conn.query( "INSERT INTO files (path, mime, content, compression) VALUES ($1, $2, $3, $4) ON CONFLICT (path) DO UPDATE @@ -165,18 +137,13 @@ impl<'a> StorageTransaction for DatabaseStorageTransaction<'a> { Ok(()) } - fn delete_prefix(&mut self, 
prefix: &str) -> Result<()> { - self.transaction.execute( + pub(crate) fn delete_prefix(&mut self, prefix: &str) -> Result<()> { + self.pool.get()?.execute( "DELETE FROM files WHERE path LIKE $1;", &[&format!("{}%", prefix.replace('%', "\\%"))], )?; Ok(()) } - - fn complete(self: Box) -> Result<()> { - self.transaction.commit()?; - Ok(()) - } } // The tests for this module are in src/storage/mod.rs, as part of the backend tests. Please add diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 980e4562a..e308faf6c 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -9,6 +9,7 @@ use self::database::DatabaseBackend; use self::s3::S3Backend; use self::sqlite_pool::SqliteConnectionPool; use crate::error::Result; +use crate::utils::spawn_blocking; use crate::web::metrics::RenderingTimesRecorder; use crate::{db::Pool, Config, InstanceMetrics}; use anyhow::{anyhow, ensure}; @@ -117,6 +118,7 @@ enum StorageBackend { pub struct Storage { backend: StorageBackend, config: Arc, + runtime: Arc, sqlite_pool: SqliteConnectionPool, } @@ -133,13 +135,14 @@ impl Storage { .ok_or_else(|| anyhow!("invalid sqlite pool size"))?, ), config: config.clone(), + runtime: runtime.clone(), backend: match config.storage_backend { StorageKind::Database => { StorageBackend::Database(DatabaseBackend::new(pool, metrics)) } - StorageKind::S3 => { - StorageBackend::S3(Box::new(S3Backend::new(metrics, &config, runtime)?)) - } + StorageKind::S3 => StorageBackend::S3(Box::new( + runtime.block_on(S3Backend::new(metrics, &config))?, + )), }, }) } @@ -147,7 +150,19 @@ impl Storage { pub(crate) fn exists(&self, path: &str) -> Result { match &self.backend { StorageBackend::Database(db) => db.exists(path), - StorageBackend::S3(s3) => s3.exists(path), + StorageBackend::S3(s3) => self.runtime.block_on(s3.exists(path)), + } + } + pub(crate) async fn exists_async(&self, path: &str) -> Result { + match &self.backend { + StorageBackend::Database(ref db) => { + spawn_blocking({ + let path = 
path.to_owned(); + move || db.exists(&path) + }) + .await + } + StorageBackend::S3(s3) => s3.exists(path).await, } } @@ -417,24 +432,6 @@ impl Storage { Ok((file_paths, file_alg)) } - fn transaction(&self, f: F) -> Result - where - F: FnOnce(&mut dyn StorageTransaction) -> Result, - { - let mut conn; - let mut trans: Box = match &self.backend { - StorageBackend::Database(db) => { - conn = db.start_connection()?; - Box::new(conn.start_storage_transaction()?) - } - StorageBackend::S3(s3) => Box::new(s3.start_storage_transaction()), - }; - - let res = f(trans.as_mut())?; - trans.complete()?; - Ok(res) - } - // Store all files in `root_dir` into the backend under `prefix`. // // This returns (map, set). @@ -551,12 +548,6 @@ impl std::fmt::Debug for Storage { } } -trait StorageTransaction { - fn store_batch(&mut self, batch: Vec) -> Result<()>; - fn delete_prefix(&mut self, prefix: &str) -> Result<()>; - fn complete(self: Box) -> Result<()>; -} - fn detect_mime(file_path: impl AsRef) -> &'static str { let mime = mime_guess::from_path(file_path.as_ref()) .first_raw() diff --git a/src/storage/s3.rs b/src/storage/s3.rs index ac772a18a..1e59fbfc3 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -1,4 +1,4 @@ -use super::{Blob, FileRange, StorageTransaction}; +use super::{Blob, FileRange}; use crate::{Config, InstanceMetrics}; use anyhow::Error; use aws_sdk_s3::{ @@ -199,55 +199,22 @@ impl S3Backend { }) } - pub(super) fn start_storage_transaction(&self) -> S3StorageTransaction { - S3StorageTransaction { s3: self } - } - - #[cfg(test)] - pub(super) async fn cleanup_after_test(&self) -> Result<(), Error> { - if !self.temporary { - return Ok(()); - } - - if cfg!(not(test)) { - panic!("safeguard to prevent deleting the production bucket"); - } - - let mut transaction = Box::new(self.start_storage_transaction()); - transaction.delete_prefix("")?; - transaction.complete()?; - self.client - .delete_bucket() - .bucket(&self.bucket) - .send() - .await?; - - Ok(()) - } -} - 
-pub(super) struct S3StorageTransaction<'a> {
-    s3: &'a S3Backend,
-}
-
-impl<'a> StorageTransaction for S3StorageTransaction<'a> {
-    async fn store_batch(&mut self, mut batch: Vec<Blob>) -> Result<(), Error> {
+    pub(super) async fn store_batch(&mut self, mut batch: Vec<Blob>) -> Result<(), Error> {
         // Attempt to upload the batch 3 times
         for _ in 0..3 {
             let mut futures = FuturesUnordered::new();
             for blob in batch.drain(..) {
                 futures.push(
-                    self.s3
-                        .client
+                    self.client
                         .put_object()
-                        .bucket(&self.s3.bucket)
+                        .bucket(&self.bucket)
                         .key(&blob.path)
                         .body(blob.content.clone().into())
                         .content_type(&blob.mime)
                         .set_content_encoding(blob.compression.map(|alg| alg.to_string()))
                         .send()
                         .map_ok(|_| {
-                            self.s3.metrics.uploaded_files_total.inc();
+                            self.metrics.uploaded_files_total.inc();
                         })
                         .map_err(|err| {
                             warn!("Failed to upload blob to S3: {:?}", err);
@@ -273,14 +240,13 @@ impl<'a> StorageTransaction for S3StorageTransaction<'a> {
         panic!("failed to upload 3 times, exiting");
     }
 
-    async fn delete_prefix(&mut self, prefix: &str) -> Result<(), Error> {
+    pub(super) async fn delete_prefix(&mut self, prefix: &str) -> Result<(), Error> {
         let mut continuation_token = None;
         loop {
             let list = self
-                .s3
                 .client
                 .list_objects_v2()
-                .bucket(&self.s3.bucket)
+                .bucket(&self.bucket)
                 .prefix(prefix)
                 .set_continuation_token(continuation_token)
                 .send()
@@ -301,10 +267,9 @@ impl<'a> StorageTransaction for S3StorageTransaction<'a> {
                 .build();
 
             let resp = self
-                .s3
                 .client
                 .delete_objects()
-                .bucket(&self.s3.bucket)
+                .bucket(&self.bucket)
                 .delete(to_delete)
                 .send()
                 .await?;
@@ -325,7 +290,25 @@ impl<'a> StorageTransaction for S3StorageTransaction<'a> {
         }
     }
 
-    fn complete(self: Box<Self>) -> Result<(), Error> {
+    #[cfg(test)]
+    pub(super) async fn cleanup_after_test(&mut self) -> Result<(), Error> {
+        if !self.temporary {
+            return Ok(());
+        }
+
+        if cfg!(not(test)) {
+            panic!("safeguard to prevent deleting the production bucket");
+        }
+
+        // NOTE(review): the old StorageTransaction API is removed by this patch.
+        self.delete_prefix("").await?;
+
+        self.client
+            .delete_bucket()
+            .bucket(&self.bucket)
+            .send()
+            .await?;
+
         Ok(())
     }
 }