Change DB migration logic to have one transaction per migrations script
Executing all migrations in one big transaction is problematic due to
a few Postgres limitations. For example, `alter type ... add value`
does add a value to an enum, but that new value cannot be used until
the transaction has been committed. Using the new value right away,
however, is something migration scripts regularly need to do.
Previously we worked around this by copying the type, adjusting the
copy, and replacing the old type with the new one. That workaround no
longer works easily, since functions now depend on the type.
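
The limitation is easy to reproduce outside of Tobira. The following is a
minimal, hypothetical sketch (not part of this commit): the connection
string, enum and table names are made up, and it assumes Postgres 12 or
newer, where `alter type ... add value` is allowed inside a transaction
block but the new value stays unusable until that transaction commits.

```rust
use tokio_postgres::NoTls;

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Made-up connection parameters; adjust for a local test database.
    let (mut client, connection) =
        tokio_postgres::connect("host=localhost user=postgres dbname=test", NoTls).await?;
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {e}");
        }
    });

    client.batch_execute("create type job_status as enum ('pending', 'done')").await?;
    client.batch_execute("create table jobs (status job_status)").await?;

    // Both statements in ONE transaction: the insert fails, because the new
    // enum value may not be used before the `alter type` has been committed.
    let tx = client.transaction().await?;
    tx.batch_execute("alter type job_status add value 'failed'").await?;
    let res = tx.batch_execute("insert into jobs (status) values ('failed')").await;
    assert!(res.is_err()); // e.g. `unsafe use of new value "failed" of enum type`
    tx.rollback().await?;

    // One transaction per statement (the situation this commit creates for
    // migration scripts): the second statement sees the committed enum value.
    client.batch_execute("alter type job_status add value 'failed'").await?;
    client.batch_execute("insert into jobs (status) values ('failed')").await?;

    Ok(())
}
```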

Having one transaction per migration should be totally fine. We will
build a migration plan more often, but the wasted work is negligible.
By building a plan each time, we still make sure that we apply the
next required migration, even when multiple Tobira nodes are doing
all of this at the same time. As far as I can tell, there is still
zero chance of DB corruption: every migration will be executed
exactly once (by whichever process gets to it first), and processes
should still not deadlock.
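
Condensed into one place, the loop that the diff below arrives at looks
roughly like this. This is a simplified sketch, not the literal code of this
commit: logging and some error handling are omitted, `MigrationPlan` is the
type changed in backend/src/db/migrations.rs, and `anyhow::Result` stands in
for the crate's own `Result` alias.

```rust
use std::time::Duration;
use tokio_postgres::{error::SqlState, Client, IsolationLevel};

// `MigrationPlan` is the type defined in backend/src/db/migrations.rs (see
// the diff below); this sketch is assumed to live in the same module.
async fn migrate_sketch(db: &mut Client) -> anyhow::Result<()> {
    loop {
        // One serializable transaction per loop iteration, i.e. per migration.
        let tx = db.build_transaction()
            .isolation_level(IsolationLevel::Serializable)
            .start()
            .await?;

        // Re-plan every time so that concurrent Tobira nodes always pick up
        // whichever migration is actually still missing.
        let plan = MigrationPlan::build(&tx).await?;
        let done = plan.execute_next(&tx).await?;

        match tx.commit().await {
            Ok(_) if done => return Ok(()),
            Ok(_) => {} // one migration applied and committed; plan the next
            // Another node changed the migration state concurrently: back off
            // briefly and retry with a fresh transaction.
            Err(e) if e.code() == Some(&SqlState::T_R_SERIALIZATION_FAILURE) => {
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
            Err(e) => return Err(e.into()),
        }
    }
}
```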
LukasKalbertodt committed Dec 6, 2023
1 parent 3d1d057 commit 5fc735f
Showing 2 changed files with 57 additions and 38 deletions.
6 changes: 4 additions & 2 deletions backend/src/cmd/check.rs
@@ -49,8 +49,10 @@ pub(crate) async fn run(shared: &args::Shared, args: &Args) -> Result<()> {
=> println!(" ▸ DB up to date"),
Ok(MigrationPlan::EmptyDb)
=> println!(" ▸ DB is empty, all migrations will be applied"),
Ok(MigrationPlan::Migrate { new_migrations })
=> println!(" ▸ DB is compatible, {new_migrations} new migrations will be applied"),
Ok(m @ MigrationPlan::Migrate { .. }) => println!(
" ▸ DB is compatible, {} new migrations will be applied",
m.missing_migrations(),
),
}
}
print_outcome(&mut any_errors, "MeiliSearch", &meili);
89 changes: 53 additions & 36 deletions backend/src/db/migrations.rs
@@ -1,6 +1,6 @@
use chrono::{DateTime, Utc, offset::TimeZone};
use once_cell::sync::Lazy;
use std::{collections::BTreeMap, time::Duration, num::NonZeroU64};
use std::{collections::BTreeMap, time::Duration};
use tokio_postgres::{IsolationLevel, Transaction, error::SqlState, Client};

use crate::{prelude::*, db::util::select};
@@ -17,10 +17,10 @@ pub(crate) enum MigrationPlan {
/// The database is completely up to date and all migrations match.
UpToDate,

/// The DB can be migrated to the state we expect by applying that many new
/// migrations.
/// The DB can be migrated to the state we expect, with `next_migration`
/// being the migration index that needs to be executed next.
Migrate {
new_migrations: NonZeroU64,
next_migration: u64,
},
}

@@ -52,8 +52,6 @@ impl MigrationPlan {
script: String,
}

debug!("Checking DB migrations");

// Retrieve all active migrations from the DB.
let (selection, mapping) = select!(id, name, applied_on, script);
let query = format!("select {selection} from __db_migrations");
@@ -114,51 +112,58 @@

// We already know that `MIGRATIONS` contains at least as many elements
// as `active_migrations`, therefore we can subtract here.
match NonZeroU64::new(MIGRATIONS.len() as u64 - active_migrations.len() as u64) {
None => Ok(Self::UpToDate),
Some(new_migrations) => Ok(Self::Migrate { new_migrations }),
if MIGRATIONS.len() == active_migrations.len() {
Ok(Self::UpToDate)
} else {
Ok(Self::Migrate { next_migration: active_migrations.len() as u64 + 1 })
}
}

pub(crate) fn missing_migrations(&self) -> u64 {
match self {
MigrationPlan::EmptyDb => MIGRATIONS.len() as u64,
MigrationPlan::UpToDate => 0,
MigrationPlan::Migrate { next_migration }
=> MIGRATIONS.len() as u64 - next_migration + 1,
}
}

/// Executes this plan on the database, bringing it into the state we expect.
pub(crate) async fn execute(&self, tx: &Transaction<'_>) -> Result<()> {
let new_migrations = match self {
/// Executes the next migration in this plan on the database. Returns
/// `done`, i.e. `true` if the DB is up to date after this method call.
pub(crate) async fn execute_next(&self, tx: &Transaction<'_>) -> Result<bool> {
let id = match self {
Self::UpToDate => {
info!("All migrations are already applied: database schema is up to date.");
return Ok(());
return Ok(true);
}
Self::EmptyDb => {
create_meta_table_if_missing(tx).await?;
MIGRATIONS.len() as u64
1
}
Self::Migrate { new_migrations } => new_migrations.get(),
Self::Migrate { next_migration } => *next_migration,
};

// Apply missing migrations in order.
info!("The database is missing {new_migrations} migrations. Applying them now.");
for (id, migration) in MIGRATIONS.range(MIGRATIONS.len() as u64 - new_migrations + 1..) {
debug!("Applying migration '{}-{}' ...", id, migration.name);
trace!("Executing:\n{}", migration.script);

tx.batch_execute(migration.script)
.await
.context(format!("failed to run script for '{}-{}'", id, migration.name))?;

let query = "insert into __db_migrations (id, name, applied_on, script) \
values ($1, $2, now() at time zone 'utc', $3)";
tx.execute(query, &[&(*id as i64), &migration.name, &migration.script])
.await
.context("failed to update __db_migrations")?;
}
let migration = &MIGRATIONS[&id];
debug!("Applying migration '{}-{}' ...", id, migration.name);
trace!("Executing:\n{}", migration.script);

info!("Applied {new_migrations} migrations. DB is up to date now.");
tx.batch_execute(migration.script)
.await
.context(format!("failed to run script for '{}-{}'", id, migration.name))?;

let query = "insert into __db_migrations (id, name, applied_on, script) \
values ($1, $2, now() at time zone 'utc', $3)";
tx.execute(query, &[&(id as i64), &migration.name, &migration.script])
.await
.context("failed to update __db_migrations")?;

Ok(())
Ok(id == MIGRATIONS.len() as u64)
}
}

async fn create_meta_table_if_missing(tx: &Transaction<'_>) -> Result<()> {
debug!("Creating table '__db_migrations' if it does not exist yet...");
trace!("Creating table '__db_migrations' if it does not exist yet...");
tx.batch_execute(include_str!("db-migrations.sql"))
.await
.context("could not create migrations meta table")?;
@@ -182,6 +187,8 @@ pub async fn migrate(db: &mut Client) -> Result<()> {
// second loop iteration the node will observe that the `__db_migrations`
// table already exists as the transaction of another node is expected to
// have correctly committed by that point.
debug!("Checking DB migrations");
let mut migrations_executed = 0;
loop {
let tx = db.build_transaction()
.isolation_level(IsolationLevel::Serializable)
@@ -219,12 +226,22 @@


// We are now the only process allowed to tinker with migrations. First
// build a plan of what needs to be done and then execute it.
// build a plan of what needs to be done and then execute the next
// migration. We do one migration at a time so that each migration
// script runs in its own transaction. Otherwise certain things
// (like adding a value to an enum) don't work.
let plan = MigrationPlan::build(&tx).await?;
plan.execute(&tx).await?;
let is_done = plan.execute_next(&tx).await?;


match tx.commit().await {
Ok(_) => return Ok(()),
Ok(_) => {
migrations_executed += 1;
if is_done {
info!("Applied {migrations_executed} migrations. DB is up to date now.");
return Ok(());
}
},

Err(e) if e.code() == Some(&SqlState::T_R_SERIALIZATION_FAILURE) => {
let backoff_duration = Duration::from_millis(500);
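
To make the new bookkeeping concrete (numbers made up for illustration): with
10 migrations defined (ids 1 through 10) and 7 of them already recorded in
`__db_migrations`, `MigrationPlan::build` returns `Migrate { next_migration: 8 }`,
so `missing_migrations()` reports 10 - 8 + 1 = 3. The loop then runs three
times, applying and committing migrations 8, 9, and 10 in separate
transactions, and `execute_next` returns `true` once the applied id equals
`MIGRATIONS.len()`.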
