Fix: nextest cleanup race condition #3334

Open
wants to merge 18 commits into
base: main
from
1 change: 1 addition & 0 deletions Cargo.toml
@@ -132,6 +132,7 @@ sqlx = { version = "=0.8.1", path = ".", default-features = false }

# Common type integrations shared by multiple driver crates.
# These are optional unless enabled in a workspace crate.
base64 = { version = "0.22.0", default-features = false, features = ["std"] }
bonega marked this conversation as resolved.
bigdecimal = "0.4.0"
bit-vec = "0.6.3"
chrono = { version = "0.4.34", default-features = false, features = ["std", "clock"] }
1 change: 1 addition & 0 deletions sqlx-core/Cargo.toml
@@ -55,6 +55,7 @@ async-io = { version = "1.9.0", optional = true }
paste = "1.0.6"
atoi = "2.0"

base64 = { workspace = true }
bytes = "1.1.0"
byteorder = { version = "1.4.3", default-features = false, features = ["std"] }
chrono = { version = "0.4.34", default-features = false, features = ["clock"], optional = true }
23 changes: 14 additions & 9 deletions sqlx-core/src/testing/mod.rs
@@ -3,7 +3,9 @@ use std::time::Duration;

use futures_core::future::BoxFuture;

use base64::{engine::general_purpose::STANDARD, Engine as _};
pub use fixtures::FixtureSnapshot;
use sha2::{Digest, Sha512};

use crate::connection::{ConnectOptions, Connection};
use crate::database::Database;
@@ -28,19 +30,22 @@ pub trait TestSupport: Database {

fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>>;

/// Cleanup any test databases that are no longer in-use.
///
/// Returns a count of the databases deleted, if possible.
///
/// The implementation may require `DATABASE_URL` to be set in order to manage databases.
/// The user credentials it contains must have the privilege to create and drop databases.
fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>>;
abonander marked this conversation as resolved.

/// Take a snapshot of the current state of the database (data only).
///
/// This snapshot can then be used to generate test fixtures.
fn snapshot(conn: &mut Self::Connection)
-> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>>;

/// Generate a unique database name for the given test path.
fn db_name(args: &TestArgs) -> String {
let mut hasher = Sha512::new();
hasher.update(args.test_path.as_bytes());
let hash = hasher.finalize();
let hash = STANDARD.encode(&hash[..39]);
abonander marked this conversation as resolved.
let db_name = format!("_sqlx_test_{}", hash);
debug_assert!(db_name.len() == 63);
db_name
}
}

pub struct TestFixture {
@@ -217,7 +222,7 @@ where
let res = test_fn(test_context.pool_opts, test_context.connect_opts).await;

if res.is_success() {
if let Err(e) = DB::cleanup_test(&test_context.db_name).await {
if let Err(e) = DB::cleanup_test(&DB::db_name(&args)).await {
eprintln!(
"failed to delete database {:?}: {}",
test_context.db_name, e
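The `debug_assert!(db_name.len() == 63)` in `db_name` follows from simple arithmetic, sketched below with the standard library only. The helper names `base64_len` and `db_name_len` are hypothetical and not part of sqlx; the sketch assumes padded base64 output, as produced by the `STANDARD` engine, and that the 63-character target was chosen to match PostgreSQL's default 63-byte identifier limit.

```rust
/// Length of padded base64 output for `n` input bytes:
/// every group of up to 3 bytes becomes 4 characters.
/// (Hypothetical helper, not part of sqlx.)
fn base64_len(n: usize) -> usize {
    (n + 2) / 3 * 4
}

/// Length of a database name built as `prefix` + base64(`hash_bytes` bytes).
/// (Hypothetical helper, not part of sqlx.)
fn db_name_len(prefix: &str, hash_bytes: usize) -> usize {
    prefix.len() + base64_len(hash_bytes)
}
```

With the prefix `_sqlx_test_` (11 characters) and the first 39 bytes of the SHA-512 digest, `db_name_len("_sqlx_test_", 39)` gives 11 + 52 = 63. Because 39 is a multiple of 3, the encoded hash carries no `=` padding.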
16 changes: 0 additions & 16 deletions sqlx-mysql/src/testing/mod.rs
@@ -52,22 +52,6 @@ impl TestSupport for MySql {
})
}

fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
Box::pin(async move {
let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");

let mut conn = MySqlConnection::connect(&url).await?;

let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();

let num_deleted = do_cleanup(&mut conn, now).await?;
let _ = conn.close().await;
Ok(Some(num_deleted))
})
}

Collaborator

Reminder to check this trait implementation for proper quoting of the database name. It doesn't look like it quotes it.

Collaborator

@bonega don't forget this. Even with URL_SAFE the database name still needs to be quoted because it may contain dashes.

Author

I checked and it seems to work.

A test in examples/postgres/axum-social-with-tests has the test_path comment::test_create_comment,
which becomes the database name _sqlx_test_hsAQWz-87IRR7sjnVDeMcHtoDLU3QyT6yWizSbWKFvNwoBt6Q60I

It is deleted correctly even though it contains dashes.

I see now that you're referring to the MySQL implementation, which I haven't started yet.
I will try to do it and the SQLite one today.

Author

Just an update: I am struggling to get the tests running (on both my branch and main).
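The quoting concern raised in this thread can be illustrated with a small sketch. The helpers `is_plain_identifier`, `quote_pg`, and `quote_mysql` are hypothetical names introduced here, not sqlx APIs. The sketch assumes PostgreSQL-style double-quoted identifiers and MySQL-style backtick-quoted identifiers in the default SQL mode. The "plain identifier" rule is deliberately conservative, and real servers accept more.

```rust
/// True if `name` uses only ASCII letters, digits and underscores and does
/// not start with a digit. A conservative illustration, not a server's
/// actual grammar. (Hypothetical helper.)
fn is_plain_identifier(name: &str) -> bool {
    let mut chars = name.chars();
    match chars.next() {
        Some(c) if c.is_ascii_alphabetic() || c == '_' => {}
        _ => return false,
    }
    chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

/// PostgreSQL-style quoting: wrap in double quotes and double any
/// embedded double quote. (Hypothetical helper.)
fn quote_pg(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// MySQL-style quoting: wrap in backticks and double any embedded
/// backtick. (Hypothetical helper.)
fn quote_mysql(name: &str) -> String {
    format!("`{}`", name.replace('`', "``"))
}
```

For a name made only of URL-safe base64 characters and underscores, `quote_pg(name)` produces the same text as the `{db_name:?}` formatting used in the Postgres code, which is likely why the dashed name above is dropped correctly there. A MySQL implementation would need the backtick form instead.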

fn snapshot(
_conn: &mut Self::Connection,
) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
98 changes: 16 additions & 82 deletions sqlx-postgres/src/testing/mod.rs
@@ -1,28 +1,22 @@
use std::fmt::Write;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime};
use std::time::Duration;

use futures_core::future::BoxFuture;

use once_cell::sync::OnceCell;

use crate::connection::Connection;

use crate::error::Error;
use crate::executor::Executor;
use crate::pool::{Pool, PoolOptions};
use crate::query::query;
use crate::query_scalar::query_scalar;
use crate::{PgConnectOptions, PgConnection, Postgres};

pub(crate) use sqlx_core::testing::*;

// Using a blocking `OnceCell` here because the critical sections are short.
static MASTER_POOL: OnceCell<Pool<Postgres>> = OnceCell::new();
// Automatically delete any databases created before the start of the test binary.
static DO_CLEANUP: AtomicBool = AtomicBool::new(true);

impl TestSupport for Postgres {
fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
@@ -49,22 +43,6 @@ impl TestSupport for Postgres {
})
}

fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
Box::pin(async move {
let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");

let mut conn = PgConnection::connect(&url).await?;

let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();

let num_deleted = do_cleanup(&mut conn, now).await?;
let _ = conn.close().await;
Ok(Some(num_deleted))
})
}

fn snapshot(
_conn: &mut Self::Connection,
) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
@@ -135,30 +113,20 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
)
.await?;

// Record the current time _before_ we acquire the `DO_CLEANUP` permit. This
// prevents the first test thread from accidentally deleting new test dbs
// created by other test threads if we're a bit slow.
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();

// Only run cleanup if the test binary just started.
if DO_CLEANUP.swap(false, Ordering::SeqCst) {
do_cleanup(&mut conn, now).await?;
}
let db_name = <Postgres as TestSupport>::db_name(args);
do_cleanup(&mut conn, &db_name).await?;

let new_db_name: String = query_scalar(
query(
r#"
insert into _sqlx_test.databases(db_name, test_path)
select '_sqlx_test_' || nextval('_sqlx_test.database_ids'), $1
returning db_name
insert into _sqlx_test.databases(db_name, test_path) values ($1, $2)
"#,
)
.bind(&db_name)
.bind(args.test_path)
.fetch_one(&mut *conn)
.execute(&mut *conn)
.await?;

conn.execute(&format!("create database {new_db_name:?}")[..])
conn.execute(&format!("create database {db_name:?}")[..])
.await?;

Ok(TestContext {
@@ -174,52 +142,18 @@ async fn test_context(args: &TestArgs) -> Result<TestContext<Postgres>, Error> {
.connect_options()
.deref()
.clone()
.database(&new_db_name),
db_name: new_db_name,
.database(&db_name),
db_name,
})
}

async fn do_cleanup(conn: &mut PgConnection, created_before: Duration) -> Result<usize, Error> {
// since SystemTime is not monotonic we added a little margin here to avoid race conditions with other threads
let created_before = i64::try_from(created_before.as_secs()).unwrap() - 2;

let delete_db_names: Vec<String> = query_scalar(
"select db_name from _sqlx_test.databases \
where created_at < (to_timestamp($1) at time zone 'UTC')",
)
.bind(created_before)
.fetch_all(&mut *conn)
.await?;

if delete_db_names.is_empty() {
return Ok(0);
}

let mut deleted_db_names = Vec::with_capacity(delete_db_names.len());
let delete_db_names = delete_db_names.into_iter();

let mut command = String::new();

for db_name in delete_db_names {
command.clear();
writeln!(command, "drop database if exists {db_name:?};").ok();
match conn.execute(&*command).await {
Ok(_deleted) => {
deleted_db_names.push(db_name);
}
// Assume a database error just means the DB is still in use.
Err(Error::Database(dbe)) => {
eprintln!("could not clean test database {db_name:?}: {dbe}")
}
// Bubble up other errors
Err(e) => return Err(e),
}
}

query("delete from _sqlx_test.databases where db_name = any($1::text[])")
.bind(&deleted_db_names)
async fn do_cleanup(conn: &mut PgConnection, db_name: &str) -> Result<(), Error> {
let delete_db_command = format!("drop database if exists {db_name:?};");
conn.execute(&*delete_db_command).await?;
query("delete from _sqlx_test.databases where db_name = $1::text")
.bind(&db_name)
.execute(&mut *conn)
.await?;

Ok(deleted_db_names.len())
Ok(())
}
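The reworked `do_cleanup` relies on each test owning one deterministic database name, so a test only ever drops its own leftover database instead of sweeping by creation time. The sketch below models that idea in memory. `FakeServer` and `prepare` are hypothetical stand-ins introduced for illustration; they are not how sqlx talks to a real server.

```rust
use std::collections::HashSet;

/// Hypothetical in-memory stand-in for a database server.
#[derive(Default)]
struct FakeServer {
    databases: HashSet<String>,
}

impl FakeServer {
    /// Models `drop database if exists`: succeeds whether or not it exists.
    fn drop_if_exists(&mut self, name: &str) {
        self.databases.remove(name);
    }

    /// Models `create database`: fails if the name is already taken.
    fn create(&mut self, name: &str) -> Result<(), String> {
        if self.databases.insert(name.to_owned()) {
            Ok(())
        } else {
            Err(format!("database {name:?} already exists"))
        }
    }
}

/// Set up the database for one test: drop any leftover with the same
/// name, then create it fresh. Only this test's own name is touched.
fn prepare(server: &mut FakeServer, name: &str) -> Result<(), String> {
    server.drop_if_exists(name);
    server.create(name)
}
```

Calling `prepare` twice with the same name succeeds both times, which models rerunning a test whose previous run failed and left its database behind. Databases belonging to other tests are never dropped, so there is no time window in which one test can delete another's database.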
7 changes: 0 additions & 7 deletions sqlx-sqlite/src/testing/mod.rs
@@ -18,13 +18,6 @@ impl TestSupport for Sqlite {
Box::pin(async move { Ok(crate::fs::remove_file(db_name).await?) })
}

fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
Box::pin(async move {
crate::fs::remove_dir_all(BASE_PATH).await?;
Ok(None)
})
}

fn snapshot(
_conn: &mut Self::Connection,
) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {