add subcommand to migrate old CBOR archive indexes to SQLite
syphar committed Jun 16, 2023
1 parent 058ea55 commit a00f8cc
Showing 4 changed files with 199 additions and 76 deletions.
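
The new `database` subcommand converts the per-archive CBOR index files written by earlier versions of docs.rs into the SQLite format that `create` now emits. Assuming clap's default kebab-case naming for the `MigrateArchiveIndex` variant (an inference, not shown in this diff), the invocation would look roughly like `cratesfyi database migrate-archive-index`.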
11 changes: 9 additions & 2 deletions src/bin/cratesfyi.rs
@@ -15,8 +15,8 @@ use docs_rs::utils::{
remove_crate_priority, set_crate_priority, ConfigName,
};
use docs_rs::{
start_background_metrics_webserver, start_web_server, BuildQueue, Config, Context, Index,
InstanceMetrics, PackageKind, RustwideBuilder, ServiceMetrics, Storage,
migrate_old_archive_indexes, start_background_metrics_webserver, start_web_server, BuildQueue,
Config, Context, Index, InstanceMetrics, PackageKind, RustwideBuilder, ServiceMetrics, Storage,
};
use humantime::Duration;
use once_cell::sync::OnceCell;
@@ -482,6 +482,9 @@ enum DatabaseSubcommand {
/// Backfill GitHub/Gitlab stats for crates.
BackfillRepositoryStats,

/// Migrate the old CBOR archive index files to SQLite
MigrateArchiveIndex,

/// Updates info for a crate from the registry's API
UpdateCrateRegistryFields {
#[arg(name = "CRATE")]
@@ -533,6 +536,10 @@ impl DatabaseSubcommand {
ctx.repository_stats_updater()?.update_all_crates()?;
}

Self::MigrateArchiveIndex => {
migrate_old_archive_indexes(&*ctx.storage()?, &mut *ctx.conn()?)?;
}

Self::BackfillRepositoryStats => {
ctx.repository_stats_updater()?.backfill_repositories()?;
}
1 change: 1 addition & 0 deletions src/lib.rs
@@ -9,6 +9,7 @@ pub use self::docbuilder::PackageKind;
pub use self::docbuilder::RustwideBuilder;
pub use self::index::Index;
pub use self::metrics::{InstanceMetrics, ServiceMetrics};
pub use self::storage::migrate_old_archive_indexes;
pub use self::storage::Storage;
pub use self::web::{start_background_metrics_webserver, start_web_server};

192 changes: 121 additions & 71 deletions src/storage/archive_index.rs
@@ -1,18 +1,20 @@
use crate::error::Result;
use crate::storage::{compression::CompressionAlgorithm, FileRange};
use anyhow::{bail, Context as _};
use anyhow::Context as _;
use memmap2::MmapOptions;
use rusqlite::{Connection, OptionalExtension};
use serde::de::DeserializeSeed;
use serde::de::{IgnoredAny, MapAccess, Visitor};
use serde::{Deserialize, Deserializer, Serialize};
use std::io::BufReader;
use std::{collections::HashMap, fmt, fs, fs::File, io, io::Read, path::Path};
use tempfile::TempPath;

use super::sqlite_pool::SqliteConnectionPool;

static SQLITE_FILE_HEADER: &[u8] = b"SQLite format 3\0";

#[derive(Deserialize, Serialize)]
#[derive(Deserialize, Serialize, PartialEq, Eq, Debug)]
pub(crate) struct FileInfo {
range: FileRange,
compression: CompressionAlgorithm,
@@ -27,63 +29,87 @@ impl FileInfo {
}
}

#[derive(Serialize)]
#[derive(Deserialize, Serialize)]
struct Index {
files: HashMap<String, FileInfo>,
}

/// create an archive index based on a zipfile.
///
/// Will delete the destination file if it already exists.
pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
zipfile: &mut R,
destination: P,
) -> Result<()> {
if destination.as_ref().exists() {
fs::remove_file(&destination)?;
}
impl Index {
pub(crate) fn write_sqlite<P: AsRef<Path>>(&self, destination: P) -> Result<()> {
let destination = destination.as_ref();
if destination.exists() {
fs::remove_file(destination)?;
}

let mut archive = zip::ZipArchive::new(zipfile)?;
let conn = rusqlite::Connection::open(destination)?;
conn.execute("PRAGMA synchronous = FULL", ())?;
conn.execute("BEGIN", ())?;
conn.execute(
"
CREATE TABLE files (
id INTEGER PRIMARY KEY,
path TEXT UNIQUE,
start INTEGER,
end INTEGER,
compression INTEGER
);
",
(),
)?;

let conn = rusqlite::Connection::open(&destination)?;
conn.execute("PRAGMA synchronous = FULL", ())?;
conn.execute("BEGIN", ())?;
conn.execute(
"
CREATE TABLE files (
id INTEGER PRIMARY KEY,
path TEXT UNIQUE,
start INTEGER,
end INTEGER,
compression INTEGER
);
",
(),
)?;
for (name, info) in self.files.iter() {
conn.execute(
"INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
(
name,
info.range.start(),
info.range.end(),
info.compression as i32,
),
)?;
}

for i in 0..archive.len() {
let zf = archive.by_index(i)?;
conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
conn.execute("END", ())?;
conn.execute("VACUUM", ())?;
Ok(())
}

let compression_bzip = CompressionAlgorithm::Bzip2 as i32;
pub(crate) fn from_zip<R: io::Read + io::Seek>(zipfile: &mut R) -> Result<Self> {
let mut archive = zip::ZipArchive::new(zipfile)?;

conn.execute(
"INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
(
zf.name(),
zf.data_start(),
zf.data_start() + zf.compressed_size() - 1,
match zf.compression() {
zip::CompressionMethod::Bzip2 => compression_bzip,
c => bail!("unsupported compression algorithm {} in zip-file", c),
let mut index = Index {
files: HashMap::with_capacity(archive.len()),
};

for i in 0..archive.len() {
let zf = archive.by_index(i)?;

index.files.insert(
zf.name().to_owned(),
FileInfo {
range: FileRange::new(
zf.data_start(),
zf.data_start() + zf.compressed_size() - 1,
),
compression: CompressionAlgorithm::Bzip2,
},
),
)?;
);
}
Ok(index)
}
}

conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
conn.execute("END", ())?;
conn.execute("VACUUM", ())?;

/// create an archive index based on a zipfile.
///
/// Will delete the destination file if it already exists.
pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
zipfile: &mut R,
destination: P,
) -> Result<()> {
Index::from_zip(zipfile)?
.write_sqlite(&destination)
.context("error writing SQLite index")?;
Ok(())
}

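For context, lookups against the index produced by `write_sqlite` go through the `files` table (see `find_in_sqlite_index` in the next hunk, whose body is mostly elided here). A minimal sketch of such a query, assuming an illustrative index filename; the schema matches the CREATE TABLE above:

use anyhow::Result;
use rusqlite::{Connection, OptionalExtension};

// Look up the byte range and compression for one path in the archive.
// Returns None when the path is not in the index.
fn lookup(path_in_archive: &str) -> Result<Option<(u64, u64, i32)>> {
    let conn = Connection::open("archive.zip.index")?; // illustrative name
    let row = conn
        .query_row(
            "SELECT start, end, compression FROM files WHERE path = ?",
            [path_in_archive],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
        )
        .optional()?; // path is UNIQUE, so at most one row matches
    Ok(row)
}
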
@@ -227,7 +253,7 @@ fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<Fi
/// > OFFSET SIZE DESCRIPTION
/// > 0 16 Header string: "SQLite format 3\000"
/// > [...]
fn is_sqlite_file<P: AsRef<Path>>(archive_index_path: P) -> Result<bool> {
pub(crate) fn is_sqlite_file<P: AsRef<Path>>(archive_index_path: P) -> Result<bool> {
let mut f = File::open(archive_index_path)?;

let mut buffer = [0; 16];
@@ -259,6 +285,20 @@ pub(crate) fn find_in_file<P: AsRef<Path>>(
}
}

pub(crate) fn convert_to_sqlite_index<P: AsRef<Path>>(path: P) -> Result<TempPath> {
let path = path.as_ref();
let index: Index = { serde_cbor::from_reader(BufReader::new(File::open(path)?))? };

// write the new index into a temporary file so reads from ongoing requests
// can continue on the old index until the new one is fully written.
let tmp_path = tempfile::NamedTempFile::new()?.into_temp_path();
index
.write_sqlite(&tmp_path)
.context("error writing SQLite index")?;

Ok(tmp_path)
}
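
`convert_to_sqlite_index` deliberately leaves the original CBOR file in place and hands back the temporary path; swapping the new index in is the caller's job (presumably `migrate_old_archive_indexes`, defined in the fourth changed file, which is not rendered above). With the `tempfile` crate that handoff could look roughly like this sketch, where `old_index_path` is a hypothetical local path:

let tmp_path = convert_to_sqlite_index(&old_index_path)?;
// TempPath::persist renames the temp file over the old index in one step;
// note that the underlying rename fails if the two paths are on
// different filesystems.
tmp_path.persist(&old_index_path)?;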

#[cfg(test)]
mod tests {
use super::*;
@@ -270,29 +310,7 @@ mod tests {
zipfile: &mut R,
writer: &mut W,
) -> Result<()> {
let mut archive = zip::ZipArchive::new(zipfile)?;

// get file locations
let mut files: HashMap<String, FileInfo> = HashMap::with_capacity(archive.len());
for i in 0..archive.len() {
let zf = archive.by_index(i)?;

files.insert(
zf.name().to_string(),
FileInfo {
range: FileRange::new(
zf.data_start(),
zf.data_start() + zf.compressed_size() - 1,
),
compression: match zf.compression() {
zip::CompressionMethod::Bzip2 => CompressionAlgorithm::Bzip2,
c => bail!("unsupported compression algorithm {} in zip-file", c),
},
},
);
}

serde_cbor::to_writer(writer, &Index { files }).context("serialization error")
serde_cbor::to_writer(writer, &Index::from_zip(zipfile)?).context("serialization error")
}

fn create_test_archive() -> fs::File {
@@ -312,6 +330,38 @@
tf
}

#[test]
fn convert_to_sqlite() {
let mut tf = create_test_archive();
let mut cbor_buf = Vec::new();
create_cbor_index(&mut tf, &mut cbor_buf).unwrap();
let mut cbor_index_file = tempfile::NamedTempFile::new().unwrap();
io::copy(&mut &cbor_buf[..], &mut cbor_index_file).unwrap();

assert!(!is_sqlite_file(&cbor_index_file).unwrap());

let original_fi = find_in_file(
cbor_index_file.path(),
"testfile1",
&SqliteConnectionPool::default(),
)
.unwrap()
.unwrap();

let sqlite_index_file = convert_to_sqlite_index(cbor_index_file).unwrap();
assert!(is_sqlite_file(&sqlite_index_file).unwrap());

let migrated_fi = find_in_file(
sqlite_index_file,
"testfile1",
&SqliteConnectionPool::default(),
)
.unwrap()
.unwrap();

assert_eq!(migrated_fi, original_fi);
}

#[test]
fn index_create_save_load_cbor_direct() {
let mut tf = create_test_archive();
