Commit c760a45

backend update draft, calling db backend sync in async ()
syphar committed Sep 8, 2023
1 parent f570518 commit c760a45
Showing 3 changed files with 41 additions and 61 deletions.
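The shape of the change, in brief: the synchronous `Storage` facade stops hopping through `spawn_blocking` for database calls and instead calls the sync `DatabaseBackend` directly, while calls into the async `S3Backend` are driven to completion with `Runtime::block_on`. A minimal sketch of that dispatch pattern, using stand-in backend types and an assumed `anyhow::Result` error type rather than the crate's real definitions:

use anyhow::Result;
use std::sync::Arc;
use tokio::runtime::Runtime;

struct DatabaseBackend; // stand-in: the real one wraps a postgres pool
struct S3Backend; // stand-in: the real one wraps an async S3 client

impl DatabaseBackend {
    // Synchronous: queries the database on the current thread.
    fn exists(&self, _path: &str) -> Result<bool> {
        Ok(false)
    }
}

impl S3Backend {
    // Asynchronous: would issue a HEAD request against S3.
    async fn exists(&self, _path: &str) -> Result<bool> {
        Ok(false)
    }
}

enum StorageBackend {
    Database(DatabaseBackend),
    S3(S3Backend),
}

struct Storage {
    backend: StorageBackend,
    runtime: Arc<Runtime>,
}

impl Storage {
    // Sync entry point: DB calls run inline, S3 calls are blocked on.
    fn exists(&self, path: &str) -> Result<bool> {
        match &self.backend {
            StorageBackend::Database(db) => db.exists(path),
            StorageBackend::S3(s3) => self.runtime.block_on(s3.exists(path)),
        }
    }
}

fn main() -> Result<()> {
    let runtime = Arc::new(Runtime::new()?);
    for backend in [
        StorageBackend::Database(DatabaseBackend),
        StorageBackend::S3(S3Backend),
    ] {
        let storage = Storage {
            backend,
            runtime: runtime.clone(),
        };
        assert!(!storage.exists("rustdoc/foo/index.html")?);
    }
    Ok(())
}

One caveat baked into this design: `Runtime::block_on` panics if called from within the runtime's own async context, which is presumably why the async-facing `exists_async` below keeps awaiting `s3.exists(path)` directly rather than routing through the sync path.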
4 changes: 2 additions & 2 deletions src/storage/database.rs
@@ -121,7 +121,7 @@ impl DatabaseBackend {
         }
     }
 
-    pub(super) fn store_batch(&mut self, batch: Vec<Blob>) -> Result<()> {
+    pub(super) fn store_batch(&self, batch: Vec<Blob>) -> Result<()> {
         let mut conn = self.pool.get()?;
         for blob in batch {
             let compression = blob.compression.map(|alg| alg as i32);
@@ -137,7 +137,7 @@ impl DatabaseBackend {
         Ok(())
     }
 
-    pub(crate) fn delete_prefix(&mut self, prefix: &str) -> Result<()> {
+    pub(crate) fn delete_prefix(&self, prefix: &str) -> Result<()> {
         self.pool.get()?.execute(
             "DELETE FROM files WHERE path LIKE $1;",
             &[&format!("{}%", prefix.replace('%', "\\%"))],
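Both signatures can drop `&mut self` because each call checks a fresh connection out of `self.pool`; the interior mutability lives in the pool, not in the backend struct. A rough sketch of why a shared reference suffices — this `Pool` is a hypothetical stand-in built on a `Mutex`, not the crate's actual `db::Pool`:

use anyhow::{anyhow, Result};
use std::sync::Mutex;

struct Connection;

// Hypothetical pool: the Mutex supplies interior mutability, so
// handing out a connection needs only `&self`.
struct Pool {
    idle: Mutex<Vec<Connection>>,
}

impl Pool {
    fn get(&self) -> Result<Connection> {
        self.idle
            .lock()
            .unwrap()
            .pop()
            .ok_or_else(|| anyhow!("no idle connection"))
    }
}

struct DatabaseBackend {
    pool: Pool,
}

impl DatabaseBackend {
    // `&self` is enough: the pool, not the backend, guards the connection.
    fn delete_prefix(&self, _prefix: &str) -> Result<()> {
        let _conn = self.pool.get()?;
        // ... the real method runs the DELETE statement on the connection ...
        Ok(())
    }
}

fn main() -> Result<()> {
    let backend = DatabaseBackend {
        pool: Pool {
            idle: Mutex::new(vec![Connection]),
        },
    };
    backend.delete_prefix("rustdoc/old-crate/")
}

With exclusive access no longer required, the `Storage` wrapper in `mod.rs` below can dispatch to the backend through the shared reference it already holds.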
93 changes: 37 additions & 56 deletions src/storage/mod.rs
@@ -9,7 +9,6 @@ use self::database::DatabaseBackend;
 use self::s3::S3Backend;
 use self::sqlite_pool::SqliteConnectionPool;
 use crate::error::Result;
-use crate::utils::spawn_blocking;
 use crate::web::metrics::RenderingTimesRecorder;
 use crate::{db::Pool, Config, InstanceMetrics};
 use anyhow::{anyhow, ensure};
@@ -30,8 +29,6 @@ use std::{
 use tokio::runtime::Runtime;
 use tracing::{error, instrument, trace};
 
-const MAX_CONCURRENT_UPLOADS: usize = 1000;
-
 type FileRange = RangeInclusive<u64>;
 
 #[derive(Debug, thiserror::Error)]
@@ -155,28 +152,22 @@ impl Storage {
     }
     pub(crate) async fn exists_async(&self, path: &str) -> Result<bool> {
         match &self.backend {
-            StorageBackend::Database(ref db) => {
-                spawn_blocking({
-                    let path = path.to_owned();
-                    move || db.exists(&path)
-                })
-                .await
-            }
+            StorageBackend::Database(db) => db.exists(path),
             StorageBackend::S3(s3) => s3.exists(path).await,
         }
     }
 
     pub(crate) fn get_public_access(&self, path: &str) -> Result<bool> {
         match &self.backend {
             StorageBackend::Database(db) => db.get_public_access(path),
-            StorageBackend::S3(s3) => s3.get_public_access(path),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.get_public_access(path)),
         }
     }
 
     pub(crate) fn set_public_access(&self, path: &str, public: bool) -> Result<()> {
         match &self.backend {
             StorageBackend::Database(db) => db.set_public_access(path, public),
-            StorageBackend::S3(s3) => s3.set_public_access(path, public),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.set_public_access(path, public)),
         }
     }
 
@@ -270,7 +261,7 @@ impl Storage {
     pub(crate) fn get(&self, path: &str, max_size: usize) -> Result<Blob> {
         let mut blob = match &self.backend {
             StorageBackend::Database(db) => db.get(path, max_size, None),
-            StorageBackend::S3(s3) => s3.get(path, max_size, None),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.get(path, max_size, None)),
         }?;
         if let Some(alg) = blob.compression {
             blob.content = decompress(blob.content.as_slice(), alg, max_size)?;
@@ -288,7 +279,7 @@ impl Storage {
     ) -> Result<Blob> {
         let mut blob = match &self.backend {
             StorageBackend::Database(db) => db.get(path, max_size, Some(range)),
-            StorageBackend::S3(s3) => s3.get(path, max_size, Some(range)),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.get(path, max_size, Some(range))),
         }?;
         // `compression` represents the compression of the file-stream inside the archive.
         // We don't compress the whole archive, so the encoding of the archive's blob is irrelevant
@@ -407,26 +398,22 @@ impl Storage {
         let compressed_index_content =
             compress(BufReader::new(fs::File::open(&local_index_path)?), alg)?;
 
-        self.store_inner(
-            vec![
-                Blob {
-                    path: archive_path.to_string(),
-                    mime: "application/zip".to_owned(),
-                    content: zip_content,
-                    compression: None,
-                    date_updated: Utc::now(),
-                },
-                Blob {
-                    path: remote_index_path,
-                    mime: "application/octet-stream".to_owned(),
-                    content: compressed_index_content,
-                    compression: Some(alg),
-                    date_updated: Utc::now(),
-                },
-            ]
-            .into_iter()
-            .map(Ok),
-        )?;
+        self.store_inner(vec![
+            Blob {
+                path: archive_path.to_string(),
+                mime: "application/zip".to_owned(),
+                content: zip_content,
+                compression: None,
+                date_updated: Utc::now(),
+            },
+            Blob {
+                path: remote_index_path,
+                mime: "application/octet-stream".to_owned(),
+                content: compressed_index_content,
+                compression: Some(alg),
+                date_updated: Utc::now(),
+            },
+        ])?;
 
         let file_alg = CompressionAlgorithm::Bzip2;
         Ok((file_paths, file_alg))
@@ -443,7 +430,7 @@ impl Storage {
         let mut file_paths_and_mimes = HashMap::new();
         let mut algs = HashSet::with_capacity(1);
 
-        let blobs = get_file_list(root_dir)?
+        let blobs: Vec<_> = get_file_list(root_dir)?
             .into_iter()
             .filter_map(|file_path| {
                 // Some files have insufficient permissions
@@ -470,15 +457,16 @@ impl Storage {
                     // this field is ignored by the backend
                     date_updated: Utc::now(),
                 })
-            });
+            })
+            .collect::<Result<Vec<_>>>()?;
 
         self.store_inner(blobs)?;
         Ok((file_paths_and_mimes, algs))
     }
 
     #[cfg(test)]
     pub(crate) fn store_blobs(&self, blobs: Vec<Blob>) -> Result<()> {
-        self.store_inner(blobs.into_iter().map(Ok))
+        self.store_inner(blobs)
     }
 
     // Store file into the backend at the given path (also used to detect mime type), returns the
@@ -494,37 +482,30 @@ impl Storage {
         let content = compress(&*content, alg)?;
         let mime = detect_mime(&path).to_owned();
 
-        self.store_inner(std::iter::once(Ok(Blob {
+        self.store_inner(vec![Blob {
             path,
             mime,
             content,
             compression: Some(alg),
             // this field is ignored by the backend
             date_updated: Utc::now(),
-        })))?;
+        }])?;
 
         Ok(alg)
     }
 
-    fn store_inner(&self, blobs: impl IntoIterator<Item = Result<Blob>>) -> Result<()> {
-        let mut blobs = blobs.into_iter();
-        self.transaction(|trans| {
-            loop {
-                let batch: Vec<_> = blobs
-                    .by_ref()
-                    .take(MAX_CONCURRENT_UPLOADS)
-                    .collect::<Result<_>>()?;
-                if batch.is_empty() {
-                    break;
-                }
-                trans.store_batch(batch)?;
-            }
-            Ok(())
-        })
+    fn store_inner(&self, batch: Vec<Blob>) -> Result<()> {
+        match &self.backend {
+            StorageBackend::Database(db) => db.store_batch(batch),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.store_batch(batch)),
+        }
     }
 
     pub(crate) fn delete_prefix(&self, prefix: &str) -> Result<()> {
-        self.transaction(|trans| trans.delete_prefix(prefix))
+        match &self.backend {
+            StorageBackend::Database(db) => db.delete_prefix(prefix),
+            StorageBackend::S3(s3) => self.runtime.block_on(s3.delete_prefix(prefix)),
+        }
     }
 
     // We're using `&self` instead of consuming `self` or creating a Drop impl because during tests
@@ -533,7 +514,7 @@ impl Storage {
     #[cfg(test)]
     pub(crate) fn cleanup_after_test(&self) -> Result<()> {
         if let StorageBackend::S3(s3) = &self.backend {
-            s3.cleanup_after_test()?;
+            self.runtime.block_on(s3.cleanup_after_test())?;
         }
         Ok(())
     }
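With the chunking gone (`MAX_CONCURRENT_UPLOADS` was deleted above), `store_inner` takes one `Vec<Blob>` and hands it to a single backend call; the S3 backend already fans uploads out internally, as the `FuturesUnordered` loop in `s3.rs` below shows. The visible shift is at the call sites: blobs are now collected eagerly with `collect::<Result<Vec<_>>>()?`, so a single failed read aborts before `store_inner` is ever invoked. A toy illustration of that collect semantics — `Blob` and `load` here are invented stand-ins, not the crate's types:

use anyhow::{anyhow, Result};

struct Blob {
    path: String,
}

// Invented loader: fails for one path to show the abort behavior.
fn load(path: &str) -> Result<Blob> {
    if path.ends_with(".lock") {
        Err(anyhow!("insufficient permissions: {path}"))
    } else {
        Ok(Blob {
            path: path.to_owned(),
        })
    }
}

fn main() {
    // Mirrors the new call sites: collecting into Result<Vec<_>> stops
    // at the first Err, so nothing would ever reach store_inner.
    let blobs: Result<Vec<Blob>> = ["index.html", "Cargo.lock", "main.css"]
        .into_iter()
        .map(load)
        .collect();
    match blobs {
        Ok(blobs) => println!("would store {} blobs", blobs.len()),
        Err(err) => println!("aborted before storing anything: {err}"),
    }
}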
5 changes: 2 additions & 3 deletions src/storage/s3.rs
@@ -15,7 +15,6 @@ use futures_util::{
     stream::{FuturesUnordered, StreamExt},
 };
 use std::{io::Write, sync::Arc};
-use tokio::runtime::Runtime;
 use tracing::{error, warn};
 
 const PUBLIC_ACCESS_TAG: &str = "static-cloudfront-access";
@@ -199,7 +198,7 @@ impl S3Backend {
         })
     }
 
-    pub(super) async fn store_batch(&mut self, mut batch: Vec<Blob>) -> Result<(), Error> {
+    pub(super) async fn store_batch(&self, mut batch: Vec<Blob>) -> Result<(), Error> {
         // Attempt to upload the batch 3 times
         for _ in 0..3 {
             let mut futures = FuturesUnordered::new();
@@ -240,7 +239,7 @@ impl S3Backend {
         panic!("failed to upload 3 times, exiting");
     }
 
-    pub(super) async fn delete_prefix(&mut self, prefix: &str) -> Result<(), Error> {
+    pub(super) async fn delete_prefix(&self, prefix: &str) -> Result<(), Error> {
         let mut continuation_token = None;
         loop {
             let list = self
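The `&mut self` → `&self` loosening here mirrors the database backend: `store_batch` needs only shared access to the client while `FuturesUnordered` drives the per-blob uploads concurrently, retrying up to three times before giving up (the `panic!` above). A stripped-down sketch of that concurrent-retry loop — `upload` is a fake stand-in for the real `put_object` request, and the error handling is simplified:

use anyhow::Result;
use futures_util::stream::{FuturesUnordered, StreamExt};

struct Blob {
    path: String,
}

// Fake upload standing in for the real S3 put_object call.
async fn upload(blob: &Blob) -> Result<()> {
    println!("uploading {}", blob.path);
    Ok(())
}

async fn store_batch(mut batch: Vec<Blob>) -> Result<()> {
    // Attempt the batch up to 3 times; only failed blobs are requeued.
    for _ in 0..3 {
        let mut futures = FuturesUnordered::new();
        for blob in batch.drain(..) {
            futures.push(async move {
                match upload(&blob).await {
                    Ok(()) => None,       // this blob is done
                    Err(_) => Some(blob), // keep it for the next attempt
                }
            });
        }
        while let Some(outcome) = futures.next().await {
            if let Some(failed_blob) = outcome {
                batch.push(failed_blob);
            }
        }
        if batch.is_empty() {
            return Ok(());
        }
    }
    panic!("failed to upload 3 times, exiting");
}

#[tokio::main]
async fn main() -> Result<()> {
    store_batch(vec![Blob {
        path: "rustdoc/foo/index.html".to_owned(),
    }])
    .await
}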
