Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor how migrations are ran #8326

Merged
merged 2 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,11 @@ impl ComputeNode {
// In this case we need to connect with old `zenith_admin` name
// and create new user. We cannot simply rename connected user,
// but we can create a new one and grant it all privileges.
let connstr = self.connstr.clone();
let mut connstr = self.connstr.clone();
connstr
.query_pairs_mut()
.append_pair("application_name", "apply_config");

let mut client = match Client::connect(connstr.as_str(), NoTls) {
Err(e) => match e.code() {
Some(&SqlState::INVALID_PASSWORD)
Expand Down Expand Up @@ -867,6 +871,11 @@ impl ComputeNode {

// Run migrations separately to not hold up cold starts
thread::spawn(move || {
let mut connstr = connstr.clone();
connstr
.query_pairs_mut()
.append_pair("application_name", "migrations");

let mut client = Client::connect(connstr.as_str(), NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
});
Expand Down
1 change: 1 addition & 0 deletions compute_tools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod extension_server;
mod migration;
pub mod monitor;
pub mod params;
pub mod pg_helpers;
Expand Down
100 changes: 100 additions & 0 deletions compute_tools/src/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use anyhow::{Context, Result};
use postgres::Client;
use tracing::info;

pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
}

impl<'m> MigrationRunner<'m> {
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
Self { client, migrations }
}

fn get_migration_id(&mut self) -> Result<i64> {
let query = "SELECT id FROM neon_migration.migration_id";
let row = self
.client
.query_one(query, &[])
.context("run_migrations get migration_id")?;

Ok(row.get::<&str, i64>("id"))
}

fn update_migration_id(&mut self) -> Result<()> {
let setval = format!(
"UPDATE neon_migration.migration_id SET id={}",
self.migrations.len()
);

self.client
.simple_query(&setval)
.context("run_migrations update id")?;

Ok(())
}

fn prepare_migrations(&mut self) -> Result<()> {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
self.client.simple_query(query)?;

let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
self.client.simple_query(query)?;

let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
self.client.simple_query(query)?;

let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
self.client.simple_query(query)?;

let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
self.client.simple_query(query)?;

Ok(())
}

pub fn run_migrations(mut self) -> Result<()> {
self.prepare_migrations()?;

let mut current_migration: usize = self.get_migration_id()? as usize;
let starting_migration_id = current_migration;

let query = "BEGIN";
self.client
.simple_query(query)
.context("run_migrations begin")?;

while current_migration < self.migrations.len() {
let migration = self.migrations[current_migration];

if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", current_migration);
} else {
info!(
"Running migration id={}:\n{}\n",
current_migration, migration
);
self.client.simple_query(migration).with_context(|| {
format!("run_migration current_migration={}", current_migration)
})?;
}

current_migration += 1;
}

self.update_migration_id()?;

let query = "COMMIT";
self.client
.simple_query(query)
.context("run_migrations commit")?;

info!(
"Ran {} migrations",
(self.migrations.len() - starting_migration_id)
);

Ok(())
}
}
65 changes: 2 additions & 63 deletions compute_tools/src/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};

use crate::config;
use crate::logger::inlinify;
use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;

Expand Down Expand Up @@ -791,69 +792,7 @@ pub fn handle_migrations(client: &mut Client) -> Result<()> {
include_str!("./migrations/0008-revoke_replication_for_previously_allowed_roles.sql"),
];

let mut func = || {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
client.simple_query(query)?;

let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
client.simple_query(query)?;

let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
client.simple_query(query)?;

let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
client.simple_query(query)?;

let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
client.simple_query(query)?;
Ok::<_, anyhow::Error>(())
};
func().context("handle_migrations prepare")?;

let query = "SELECT id FROM neon_migration.migration_id";
let row = client
.query_one(query, &[])
.context("handle_migrations get migration_id")?;
let mut current_migration: usize = row.get::<&str, i64>("id") as usize;
let starting_migration_id = current_migration;

let query = "BEGIN";
client
.simple_query(query)
.context("handle_migrations begin")?;

while current_migration < migrations.len() {
let migration = &migrations[current_migration];
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", current_migration);
} else {
info!(
"Running migration id={}:\n{}\n",
current_migration, migration
);
client.simple_query(migration).with_context(|| {
format!("handle_migrations current_migration={}", current_migration)
})?;
}
current_migration += 1;
}
let setval = format!(
"UPDATE neon_migration.migration_id SET id={}",
migrations.len()
);
client
.simple_query(&setval)
.context("handle_migrations update id")?;

let query = "COMMIT";
client
.simple_query(query)
.context("handle_migrations commit")?;

info!(
"Ran {} migrations",
(migrations.len() - starting_migration_id)
);
MigrationRunner::new(client, &migrations).run_migrations()?;

Ok(())
}
Expand Down
Loading