Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
syphar committed Sep 8, 2023
1 parent 3dccb8a commit f570518
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 111 deletions.
45 changes: 6 additions & 39 deletions src/storage/database.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::{Blob, FileRange, StorageTransaction};
use super::{Blob, FileRange};
use crate::db::Pool;
use crate::error::Result;
use crate::InstanceMetrics;
use postgres::Transaction;
use std::{convert::TryFrom, sync::Arc};

pub(crate) struct DatabaseBackend {
Expand Down Expand Up @@ -122,38 +121,11 @@ impl DatabaseBackend {
}
}

/// Check a Postgres connection out of the pool and pair it with a clone
/// of the instance-metrics handle, yielding a `DatabaseClient`.
///
/// Returns an error if the pool cannot provide a connection.
pub(super) fn start_connection(&self) -> Result<DatabaseClient> {
    let conn = self.pool.get()?;
    let metrics = self.metrics.clone();
    Ok(DatabaseClient { conn, metrics })
}
}

/// A checked-out pooled Postgres connection together with the metrics
/// handle needed by storage transactions opened on it.
pub(super) struct DatabaseClient {
    // Owned pooled connection; transactions borrow from it.
    conn: crate::db::PoolClient,
    // Shared instance metrics handle, lent to each transaction.
    metrics: Arc<InstanceMetrics>,
}

impl DatabaseClient {
    /// Open a Postgres transaction on this client's connection and wrap it,
    /// with a borrow of the metrics handle, into a storage transaction
    /// whose lifetime is tied to the borrow of `self`.
    pub(super) fn start_storage_transaction(&mut self) -> Result<DatabaseStorageTransaction<'_>> {
        let transaction = self.conn.transaction()?;
        Ok(DatabaseStorageTransaction {
            transaction,
            metrics: &self.metrics,
        })
    }
}

/// A live Postgres transaction plus a borrowed metrics handle; the
/// database backend's implementor of `StorageTransaction`.
pub(super) struct DatabaseStorageTransaction<'a> {
    transaction: Transaction<'a>,
    // Borrowed from the owning `DatabaseClient` for the transaction's lifetime.
    metrics: &'a InstanceMetrics,
}

impl<'a> StorageTransaction for DatabaseStorageTransaction<'a> {
fn store_batch(&mut self, batch: Vec<Blob>) -> Result<()> {
pub(super) fn store_batch(&mut self, batch: Vec<Blob>) -> Result<()> {
let mut conn = self.pool.get()?;
for blob in batch {
let compression = blob.compression.map(|alg| alg as i32);
self.transaction.query(
conn.query(
"INSERT INTO files (path, mime, content, compression)
VALUES ($1, $2, $3, $4)
ON CONFLICT (path) DO UPDATE
Expand All @@ -165,18 +137,13 @@ impl<'a> StorageTransaction for DatabaseStorageTransaction<'a> {
Ok(())
}

fn delete_prefix(&mut self, prefix: &str) -> Result<()> {
self.transaction.execute(
pub(crate) fn delete_prefix(&mut self, prefix: &str) -> Result<()> {
self.pool.get()?.execute(
"DELETE FROM files WHERE path LIKE $1;",
&[&format!("{}%", prefix.replace('%', "\\%"))],
)?;
Ok(())
}

/// Commit the underlying Postgres transaction, consuming the boxed
/// transaction object so it cannot be used afterwards.
fn complete(self: Box<Self>) -> Result<()> {
    self.transaction.commit()?;
    Ok(())
}
}

// The tests for this module are in src/storage/mod.rs, as part of the backend tests. Please add
Expand Down
47 changes: 19 additions & 28 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use self::database::DatabaseBackend;
use self::s3::S3Backend;
use self::sqlite_pool::SqliteConnectionPool;
use crate::error::Result;
use crate::utils::spawn_blocking;
use crate::web::metrics::RenderingTimesRecorder;
use crate::{db::Pool, Config, InstanceMetrics};
use anyhow::{anyhow, ensure};
Expand Down Expand Up @@ -117,6 +118,7 @@ enum StorageBackend {
/// Facade over the configured storage backend (Postgres database or S3).
pub struct Storage {
    backend: StorageBackend,
    config: Arc<Config>,
    // Runtime used to drive async S3 calls from synchronous callers
    // (see `exists`, which uses `runtime.block_on`).
    runtime: Arc<Runtime>,
    sqlite_pool: SqliteConnectionPool,
}

Expand All @@ -133,21 +135,34 @@ impl Storage {
.ok_or_else(|| anyhow!("invalid sqlite pool size"))?,
),
config: config.clone(),
runtime: runtime.clone(),
backend: match config.storage_backend {
StorageKind::Database => {
StorageBackend::Database(DatabaseBackend::new(pool, metrics))
}
StorageKind::S3 => {
StorageBackend::S3(Box::new(S3Backend::new(metrics, &config, runtime)?))
}
StorageKind::S3 => StorageBackend::S3(Box::new(
runtime.block_on(S3Backend::new(metrics, &config))?,
)),
},
})
}

/// Report whether `path` exists in the active storage backend.
///
/// The S3 lookup is async; it is driven to completion here on the
/// storage's owned runtime so this method stays synchronous.
pub(crate) fn exists(&self, path: &str) -> Result<bool> {
    match &self.backend {
        StorageBackend::S3(s3) => self.runtime.block_on(s3.exists(path)),
        StorageBackend::Database(db) => db.exists(path),
    }
}
/// Async variant of `exists`: the S3 lookup is awaited natively, while
/// the blocking database lookup is moved onto a blocking task via
/// `spawn_blocking` so it does not stall the async executor.
pub(crate) async fn exists_async(&self, path: &str) -> Result<bool> {
    match &self.backend {
        StorageBackend::S3(s3) => s3.exists(path).await,
        StorageBackend::Database(db) => {
            // The closure needs an owned path to move across threads.
            let owned_path = path.to_owned();
            spawn_blocking(move || db.exists(&owned_path)).await
        }
    }
}

Expand Down Expand Up @@ -417,24 +432,6 @@ impl Storage {
Ok((file_paths, file_alg))
}

/// Run `f` inside a storage transaction for the active backend and call
/// `complete` (commit) only if `f` succeeded.
///
/// `conn` is declared before `trans` deliberately: locals drop in reverse
/// declaration order, so the boxed transaction (which borrows `conn` in
/// the database case) is dropped before the connection it borrows.
fn transaction<T, F>(&self, f: F) -> Result<T>
where
    F: FnOnce(&mut dyn StorageTransaction) -> Result<T>,
{
    let mut conn;
    let mut trans: Box<dyn StorageTransaction> = match &self.backend {
        StorageBackend::Database(db) => {
            conn = db.start_connection()?;
            Box::new(conn.start_storage_transaction()?)
        }
        StorageBackend::S3(s3) => Box::new(s3.start_storage_transaction()),
    };

    // An error from `f` propagates before `complete` runs, so a failed
    // closure never commits its partial work.
    let res = f(trans.as_mut())?;
    trans.complete()?;
    Ok(res)
}

// Store all files in `root_dir` into the backend under `prefix`.
//
// This returns (map<filename, mime type>, set<compression algorithms>).
Expand Down Expand Up @@ -551,12 +548,6 @@ impl std::fmt::Debug for Storage {
}
}

/// Backend-agnostic write interface implemented by each storage backend's
/// transaction type; finalized by calling `complete`.
trait StorageTransaction {
    // Store (insert or overwrite) the given blobs in the backend.
    fn store_batch(&mut self, batch: Vec<Blob>) -> Result<()>;
    // Delete every stored file whose path starts with `prefix`.
    fn delete_prefix(&mut self, prefix: &str) -> Result<()>;
    // Consume the transaction, finalizing all queued changes.
    fn complete(self: Box<Self>) -> Result<()>;
}

fn detect_mime(file_path: impl AsRef<Path>) -> &'static str {
let mime = mime_guess::from_path(file_path.as_ref())
.first_raw()
Expand Down
71 changes: 27 additions & 44 deletions src/storage/s3.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Blob, FileRange, StorageTransaction};
use super::{Blob, FileRange};
use crate::{Config, InstanceMetrics};
use anyhow::Error;
use aws_sdk_s3::{
Expand Down Expand Up @@ -199,55 +199,22 @@ impl S3Backend {
})
}

/// Create an S3 storage transaction that borrows this backend.
pub(super) fn start_storage_transaction(&self) -> S3StorageTransaction {
    S3StorageTransaction { s3: self }
}

#[cfg(test)]
/// Test-only teardown: delete every object under the bucket, then delete
/// the bucket itself. No-op unless the bucket was created as temporary.
pub(super) async fn cleanup_after_test(&self) -> Result<(), Error> {
    if !self.temporary {
        return Ok(());
    }

    // Belt-and-braces runtime guard on top of `#[cfg(test)]`, so a
    // mis-built binary can never delete the production bucket.
    if cfg!(not(test)) {
        panic!("safeguard to prevent deleting the production bucket");
    }

    // Empty prefix matches every object in the bucket.
    let mut transaction = Box::new(self.start_storage_transaction());
    transaction.delete_prefix("")?;
    transaction.complete()?;
    self.client
        .delete_bucket()
        .bucket(&self.bucket)
        .send()
        .await?;

    Ok(())
}
}

/// S3 implementor of `StorageTransaction`; borrows the backend whose
/// client and bucket it writes through.
pub(super) struct S3StorageTransaction<'a> {
    s3: &'a S3Backend,
}

impl<'a> StorageTransaction for S3StorageTransaction<'a> {
async fn store_batch(&mut self, mut batch: Vec<Blob>) -> Result<(), Error> {
pub(super) async fn store_batch(&mut self, mut batch: Vec<Blob>) -> Result<(), Error> {
// Attempt to upload the batch 3 times
for _ in 0..3 {
let mut futures = FuturesUnordered::new();
for blob in batch.drain(..) {
futures.push(
self.s3
.client
self.client
.put_object()
.bucket(&self.s3.bucket)
.bucket(&self.bucket)
.key(&blob.path)
.body(blob.content.clone().into())
.content_type(&blob.mime)
.set_content_encoding(blob.compression.map(|alg| alg.to_string()))
.send()
.map_ok(|_| {
self.s3.metrics.uploaded_files_total.inc();
self.metrics.uploaded_files_total.inc();
})
.map_err(|err| {
warn!("Failed to upload blob to S3: {:?}", err);
Expand All @@ -273,14 +240,13 @@ impl<'a> StorageTransaction for S3StorageTransaction<'a> {
panic!("failed to upload 3 times, exiting");
}

async fn delete_prefix(&mut self, prefix: &str) -> Result<(), Error> {
pub(super) async fn delete_prefix(&mut self, prefix: &str) -> Result<(), Error> {
let mut continuation_token = None;
loop {
let list = self
.s3
.client
.list_objects_v2()
.bucket(&self.s3.bucket)
.bucket(&self.bucket)
.prefix(prefix)
.set_continuation_token(continuation_token)
.send()
Expand All @@ -301,10 +267,9 @@ impl<'a> StorageTransaction for S3StorageTransaction<'a> {
.build();

let resp = self
.s3
.client
.delete_objects()
.bucket(&self.s3.bucket)
.bucket(&self.bucket)
.delete(to_delete)
.send()
.await?;
Expand All @@ -325,7 +290,25 @@ impl<'a> StorageTransaction for S3StorageTransaction<'a> {
}
}

fn complete(self: Box<Self>) -> Result<(), Error> {
#[cfg(test)]
/// Test-only teardown: delete every object under the bucket, then delete
/// the bucket itself. No-op unless the bucket was created as temporary.
pub(super) async fn cleanup_after_test(&self) -> Result<(), Error> {
    if !self.temporary {
        return Ok(());
    }

    // Belt-and-braces runtime guard on top of `#[cfg(test)]`, so a
    // mis-built binary can never delete the production bucket.
    if cfg!(not(test)) {
        panic!("safeguard to prevent deleting the production bucket");
    }

    // NOTE(review): these three lines still use the transaction API
    // (`start_storage_transaction` / sync `delete_prefix` / `complete`)
    // that this change removes elsewhere in this file, and the new
    // `delete_prefix` is `async` and takes `&mut self` — this block as
    // written will not compile. Replace with an await of the new
    // `delete_prefix("")` before merging.
    let mut transaction = Box::new(self.start_storage_transaction());
    transaction.delete_prefix("")?;
    transaction.complete()?;
    self.client
        .delete_bucket()
        .bucket(&self.bucket)
        .send()
        .await?;

    Ok(())
}
}
Expand Down

0 comments on commit f570518

Please sign in to comment.