Update unique constraint on packages table to allow multiple versions of the same package #97

Merged · 4 commits · Nov 28, 2021
daemon/migrations/2021-11-25-002754_pkgs-table-uniq/down.sql (+33 −0)
@@ -0,0 +1,33 @@
DROP INDEX packages_pkgbase_idx;
DROP INDEX queue_pkgbase_idx;

PRAGMA foreign_keys=off;

CREATE TABLE _packages_new (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
pkgbase_id INTEGER NOT NULL,
name VARCHAR NOT NULL,
version VARCHAR NOT NULL,
status VARCHAR NOT NULL,
distro VARCHAR NOT NULL,
suite VARCHAR NOT NULL,
architecture VARCHAR NOT NULL,
artifact_url VARCHAR NOT NULL,
build_id INTEGER,
built_at DATETIME,
has_diffoscope BOOLEAN NOT NULL,
has_attestation BOOLEAN NOT NULL,
checksum VARCHAR,
CONSTRAINT packages_unique UNIQUE (name, distro, suite, architecture),
FOREIGN KEY(pkgbase_id) REFERENCES pkgbases(id) ON DELETE CASCADE,
FOREIGN KEY(build_id) REFERENCES builds(id) ON DELETE SET NULL
);

INSERT INTO _packages_new (id, pkgbase_id, name, version, status, distro, suite, architecture, artifact_url, build_id, built_at, has_diffoscope, has_attestation, checksum)
SELECT id, pkgbase_id, name, version, status, distro, suite, architecture, artifact_url, build_id, built_at, has_diffoscope, has_attestation, checksum
FROM packages;

DROP TABLE packages;
ALTER TABLE _packages_new RENAME TO packages;

PRAGMA foreign_keys=on;
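
Worth noting: because this down migration reinstates the stricter UNIQUE (name, distro, suite, architecture) key, the INSERT INTO _packages_new ... SELECT step will fail if the table already holds several versions of the same package. A hypothetical pre-flight query (not part of the migration) to find rows that would collide:

SELECT name, distro, suite, architecture, COUNT(*) AS versions
FROM packages
GROUP BY name, distro, suite, architecture
HAVING COUNT(*) > 1;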
daemon/migrations/2021-11-25-002754_pkgs-table-uniq/up.sql (+33 −0)
@@ -0,0 +1,33 @@
PRAGMA foreign_keys=off;

CREATE TABLE _packages_new (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
pkgbase_id INTEGER NOT NULL,
name VARCHAR NOT NULL,
version VARCHAR NOT NULL,
status VARCHAR NOT NULL,
distro VARCHAR NOT NULL,
suite VARCHAR NOT NULL,
architecture VARCHAR NOT NULL,
artifact_url VARCHAR NOT NULL,
build_id INTEGER,
built_at DATETIME,
has_diffoscope BOOLEAN NOT NULL,
has_attestation BOOLEAN NOT NULL,
checksum VARCHAR,
CONSTRAINT packages_unique UNIQUE (name, version, distro, suite, architecture),
FOREIGN KEY(pkgbase_id) REFERENCES pkgbases(id) ON DELETE CASCADE,
FOREIGN KEY(build_id) REFERENCES builds(id) ON DELETE SET NULL
);

INSERT INTO _packages_new (id, pkgbase_id, name, version, status, distro, suite, architecture, artifact_url, build_id, built_at, has_diffoscope, has_attestation, checksum)
SELECT id, pkgbase_id, name, version, status, distro, suite, architecture, artifact_url, build_id, built_at, has_diffoscope, has_attestation, checksum
FROM packages;

DROP TABLE packages;
ALTER TABLE _packages_new RENAME TO packages;

PRAGMA foreign_keys=on;

CREATE INDEX packages_pkgbase_idx ON packages(pkgbase_id);
CREATE INDEX queue_pkgbase_idx ON queue(pkgbase_id);
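
To illustrate the relaxed key: under the old UNIQUE (name, distro, suite, architecture) constraint the second INSERT below would have been rejected with a UNIQUE constraint violation, while the new UNIQUE (name, version, distro, suite, architecture) accepts both rows. A minimal sketch with made-up values (the package name, URLs, status, distro, and suite are placeholders, and a pkgbases row with id 1 is assumed to exist):

INSERT INTO packages (pkgbase_id, name, version, status, distro, suite, architecture, artifact_url, has_diffoscope, has_attestation)
VALUES (1, 'example-pkg', '1.0.0-1', 'unknown', 'archlinux', 'main', 'x86_64', 'https://example.org/example-pkg-1.0.0-1.pkg.tar.zst', 0, 0);

-- same package and architecture, different version: now allowed
INSERT INTO packages (pkgbase_id, name, version, status, distro, suite, architecture, artifact_url, has_diffoscope, has_attestation)
VALUES (1, 'example-pkg', '1.0.1-1', 'unknown', 'archlinux', 'main', 'x86_64', 'https://example.org/example-pkg-1.0.1-1.pkg.tar.zst', 0, 0);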
daemon/src/db.rs (+1 −0)
@@ -68,6 +68,7 @@ impl Connection for SqliteConnectionWrap {
PRAGMA synchronous = NORMAL; -- fsync only in critical moments
PRAGMA wal_autocheckpoint = 1000; -- write WAL changes back every 1000 pages, for a WAL file of roughly 1MB on average. May affect readers if the number is increased
PRAGMA wal_checkpoint(TRUNCATE); -- free some space by truncating possibly massive WAL files from the last run.
PRAGMA cache_size = -131072; -- set disk cache size to 128MB (negative values are KiB in sqlite)
").map_err(|err| {
warn!("executing pragmas for wall mode failed: {:?}", err);
ConnectionError::CouldntSetupConfiguration(err)
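One detail on the new pragma: per the SQLite documentation, a positive cache_size argument is a number of pages while a negative argument is a size in KiB, which is why a 128MB cache is spelled with a negative value. A short sketch of the two forms:

PRAGMA cache_size = -131072; -- 131072 KiB = 128 MiB of page cache
PRAGMA cache_size = 2000; -- 2000 pages (page_size bytes each)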
daemon/src/models/pkgbase.rs (+7 −0)
@@ -121,6 +121,13 @@ impl PkgBase {
.execute(connection)?;
Ok(())
}

pub fn delete_batch(batch: &[i32], connection: &SqliteConnection) -> Result<()> {
use crate::schema::pkgbases::dsl::*;
diesel::delete(pkgbases.filter(id.eq_any(batch)))
.execute(connection)?;
Ok(())
}
}

#[derive(Insertable, PartialEq, Debug, Clone)]
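For reference, diesel's eq_any over the id batch compiles to a SQL IN clause, so each delete_batch call issues a single statement roughly like the sketch below (ids made up); packages rows pointing at a deleted pkgbase are removed along with it by the ON DELETE CASCADE foreign key from the migration above.

DELETE FROM pkgbases WHERE id IN (17, 42, 1337);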
daemon/src/sync.rs (+130 −109)
@@ -1,10 +1,9 @@
use crate::models;
// use crate::versions::PkgVerCmp;
use diesel::Connection;
use diesel::SqliteConnection;
use rebuilderd_common::{PkgGroup, Status};
use rebuilderd_common::api::*;
use rebuilderd_common::errors::*;
// use std::cmp::Ordering;
use std::collections::HashMap;

const DEFAULT_QUEUE_PRIORITY: i32 = 1;
@@ -30,12 +29,18 @@ impl CurrentArtifactNamespace {
})
}

fn gen_key(name: &str, version: &str, architecture: &str) -> String {
format!("{:?}-{:?}-{:?}", name, version, architecture)
}

#[inline]
fn gen_key_for_pkgbase(pkgbase: &models::PkgBase) -> String {
format!("{:?}-{:?}", pkgbase.name, pkgbase.version)
Self::gen_key(&pkgbase.name, &pkgbase.version, &pkgbase.architecture)
}

#[inline]
fn gen_key_for_pkggroup(pkggroup: &PkgGroup) -> String {
format!("{:?}-{:?}", pkggroup.name, pkggroup.version)
Self::gen_key(&pkggroup.name, &pkggroup.version, &pkggroup.architecture)
}

// returns Some(()) if the group was already known and removed
@@ -87,124 +92,140 @@ fn pkggroup_to_newpkgbase(group: &PkgGroup) -> Result<models::NewPkgBase> {
}

fn sync(import: &SuiteImport, connection: &SqliteConnection) -> Result<()> {
let distro = &import.distro;
let suite = &import.suite;

info!("received submitted artifact groups {:?}", import.groups.len());
let mut new_namespace = NewArtifactNamespace::new();

info!("loading existing artifact groups from database...");
let mut current_namespace = CurrentArtifactNamespace::load_current_namespace_from_database(distro, suite, connection)?;
info!("found existing artifact groups: len={}", current_namespace.pkgbases.len());

info!("checking groups already in the database...");
let mut num_already_in_database = 0;
for group in &import.groups {
trace!("received group during import: {:?}", group);
if current_namespace.mark_pkggroup_still_present(group).is_some() {
num_already_in_database += 1;
} else {
new_namespace.add(group.clone());
connection.transaction::<(), _, _>(|| {
let distro = &import.distro;
let suite = &import.suite;

info!("received submitted artifact groups {:?}", import.groups.len());
let mut new_namespace = NewArtifactNamespace::new();

info!("loading existing artifact groups from database...");
let mut current_namespace = CurrentArtifactNamespace::load_current_namespace_from_database(distro, suite, connection)?;
info!("found existing artifact groups: len={}", current_namespace.pkgbases.len());

info!("checking groups already in the database...");
let mut num_already_in_database = 0;
for group in &import.groups {
trace!("received group during import: {:?}", group);
if current_namespace.mark_pkggroup_still_present(group).is_some() {
num_already_in_database += 1;
} else {
new_namespace.add(group.clone());
}
}
}
info!("found groups already in database: len={}", num_already_in_database);
info!("found groups that need to be added to database: len={}", new_namespace.groups.len());
info!("found groups no longer present: len={}", current_namespace.pkgbases.len());

for (key, pkgbase_to_remove) in current_namespace.pkgbases {
debug!("deleting old group with key={:?}", key);
models::PkgBase::delete(pkgbase_to_remove.id, connection)
.with_context(|| anyhow!("Failed to delete pkgbase with key={:?}", key))?;
}

// inserting new groups
let mut progress_group_insert = 0;
for group_batch in new_namespace.groups.chunks(1_000) {
progress_group_insert += group_batch.len();
info!("inserting new groups in batch: {}/{}", progress_group_insert, new_namespace.groups.len());
let group_batch = group_batch.iter()
.map(pkggroup_to_newpkgbase)
.collect::<Result<Vec<_>>>()?;
if log::log_enabled!(log::Level::Trace) {
for group in &group_batch {
trace!("group in this batch: {:?}", group);
info!("found groups already in database: len={}", num_already_in_database);
info!("found groups that need to be added to database: len={}", new_namespace.groups.len());
info!("found groups no longer present: len={}", current_namespace.pkgbases.len());

let pkgbase_delete_queue = current_namespace.pkgbases.values().map(|x| x.id).collect::<Vec<_>>();

// deleting groups no longer present
let mut progress_group_delete = 0;
for delete_batch in pkgbase_delete_queue.chunks(1_000) {
progress_group_delete += delete_batch.len();
info!("deleting groups in batch: {}/{}", progress_group_delete, pkgbase_delete_queue.len());

if log::log_enabled!(log::Level::Trace) {
for pkgbase in delete_batch {
trace!("pkgbase in this batch: {:?}", pkgbase);
}
}

models::PkgBase::delete_batch(delete_batch, connection)
.with_context(|| anyhow!("Failed to delete pkgbases"))?;
}
models::NewPkgBase::insert_batch(&group_batch, connection)?;
}

// detecting pkgbase ids for new artifacts
let mut progress_pkgbase_detect = 0;
let mut backlog_insert_pkgs = Vec::new();
let mut backlog_insert_queue = Vec::new();
for group_batch in new_namespace.groups.chunks(1_000) {
progress_pkgbase_detect += group_batch.len();
info!("detecting pkgbase ids for new artifacts: {}/{}", progress_pkgbase_detect, new_namespace.groups.len());
for group in group_batch {
debug!("searching for pkgbases {:?} {:?} {:?} {:?} {:?}", group.name, group.version, distro, suite, group.architecture);
let pkgbases = models::PkgBase::get_by(&group.name,
distro,
suite,
Some(&group.version),
Some(&group.architecture),
connection)?;

if pkgbases.len() != 1 {
bail!("Failed to determine pkgbase in database for grouop (expected=1, found={}): {:?}", pkgbases.len(), group);
}
let pkgbase_id = pkgbases[0].id;

for artifact in &group.artifacts {
backlog_insert_pkgs.push(models::NewPackage {
pkgbase_id,
name: artifact.name.clone(),
version: artifact.version.clone(),
status: Status::Unknown.to_string(),
distro: distro.clone(),
suite: suite.clone(),
architecture: group.architecture.clone(),
artifact_url: artifact.url.clone(),
build_id: None,
built_at: None,
has_diffoscope: false,
has_attestation: false,
checksum: None,
});
// inserting new groups
let mut progress_group_insert = 0;
for group_batch in new_namespace.groups.chunks(1_000) {
progress_group_insert += group_batch.len();
info!("inserting new groups in batch: {}/{}", progress_group_insert, new_namespace.groups.len());
let group_batch = group_batch.iter()
.map(pkggroup_to_newpkgbase)
.collect::<Result<Vec<_>>>()?;
if log::log_enabled!(log::Level::Trace) {
for group in &group_batch {
trace!("group in this batch: {:?}", group);
}
}
models::NewPkgBase::insert_batch(&group_batch, connection)?;
}

backlog_insert_queue.push(models::NewQueued::new(pkgbase_id,
group.version.clone(),
distro.to_string(),
DEFAULT_QUEUE_PRIORITY));
// detecting pkgbase ids for new artifacts
let mut progress_pkgbase_detect = 0;
let mut backlog_insert_pkgs = Vec::new();
let mut backlog_insert_queue = Vec::new();
for group_batch in new_namespace.groups.chunks(1_000) {
progress_pkgbase_detect += group_batch.len();
info!("detecting pkgbase ids for new artifacts: {}/{}", progress_pkgbase_detect, new_namespace.groups.len());
for group in group_batch {
debug!("searching for pkgbases {:?} {:?} {:?} {:?} {:?}", group.name, group.version, distro, suite, group.architecture);
let pkgbases = models::PkgBase::get_by(&group.name,
distro,
suite,
Some(&group.version),
Some(&group.architecture),
connection)?;

if pkgbases.len() != 1 {
bail!("Failed to determine pkgbase in database for grouop (expected=1, found={}): {:?}", pkgbases.len(), group);
}
let pkgbase_id = pkgbases[0].id;

for artifact in &group.artifacts {
backlog_insert_pkgs.push(models::NewPackage {
pkgbase_id,
name: artifact.name.clone(),
version: artifact.version.clone(),
status: Status::Unknown.to_string(),
distro: distro.clone(),
suite: suite.clone(),
architecture: group.architecture.clone(),
artifact_url: artifact.url.clone(),
build_id: None,
built_at: None,
has_diffoscope: false,
has_attestation: false,
checksum: None,
});
}

backlog_insert_queue.push(models::NewQueued::new(pkgbase_id,
group.version.clone(),
distro.to_string(),
DEFAULT_QUEUE_PRIORITY));
}
}
}

// inserting new packages
let mut progress_pkg_inserts = 0;
for pkg_batch in backlog_insert_pkgs.chunks(1_000) {
progress_pkg_inserts += pkg_batch.len();
info!("inserting new packages in batch: {}/{}", progress_pkg_inserts, backlog_insert_pkgs.len());
if log::log_enabled!(log::Level::Trace) {
for pkg in pkg_batch {
trace!("pkg in this batch: {:?}", pkg);
// inserting new packages
let mut progress_pkg_inserts = 0;
for pkg_batch in backlog_insert_pkgs.chunks(1_000) {
progress_pkg_inserts += pkg_batch.len();
info!("inserting new packages in batch: {}/{}", progress_pkg_inserts, backlog_insert_pkgs.len());
if log::log_enabled!(log::Level::Trace) {
for pkg in pkg_batch {
trace!("pkg in this batch: {:?}", pkg);
}
}
models::NewPackage::insert_batch(pkg_batch, connection)?;
}
models::NewPackage::insert_batch(pkg_batch, connection)?;
}

// inserting to queue
// TODO: check if queueing has been disabled in the request, eg. to initially fill the database
let mut progress_queue_inserts = 0;
for queue_batch in backlog_insert_queue.chunks(1_000) {
progress_queue_inserts += queue_batch.len();
info!("inserting to queue in batch: {}/{}", progress_queue_inserts, backlog_insert_queue.len());
if log::log_enabled!(log::Level::Trace) {
for queued in queue_batch {
trace!("queued in this batch: {:?}", queued);
// inserting to queue
// TODO: check if queueing has been disabled in the request, eg. to initially fill the database
let mut progress_queue_inserts = 0;
for queue_batch in backlog_insert_queue.chunks(1_000) {
progress_queue_inserts += queue_batch.len();
info!("inserting to queue in batch: {}/{}", progress_queue_inserts, backlog_insert_queue.len());
if log::log_enabled!(log::Level::Trace) {
for queued in queue_batch {
trace!("queued in this batch: {:?}", queued);
}
}
models::Queued::insert_batch(queue_batch, connection)?;
}
models::Queued::insert_batch(queue_batch, connection)?;
}

Ok(())
})?;

info!("successfully synced import to database");

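Since sync() now runs inside connection.transaction(), diesel brackets all of the batched deletes and inserts in one SQL transaction, issuing a ROLLBACK instead of the COMMIT whenever the closure returns an Err. Roughly, the emitted session looks like this sketch, so a failed import can no longer leave the database half-synced:

BEGIN;
-- batched DELETE FROM pkgbases WHERE id IN (...)
-- batched INSERT INTO pkgbases / packages / queue
COMMIT; -- or ROLLBACK on error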