Skip to content

Commit

Permalink
Refactor: TransactionManager delegation without BC
Browse files Browse the repository at this point in the history
SQLite implementation is currently WIP
  • Loading branch information
mpyw committed Aug 12, 2024
1 parent 4f9d3f9 commit 48dbba7
Show file tree
Hide file tree
Showing 16 changed files with 80 additions and 61 deletions.
5 changes: 2 additions & 3 deletions sqlx-core/src/any/connection/backend.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::any::{Any, AnyArguments, AnyQueryResult, AnyRow, AnyStatement, AnyTypeInfo};
use crate::describe::Describe;
use crate::Error;
use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
Expand Down Expand Up @@ -35,13 +34,13 @@ pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static {

fn start_rollback(&mut self);

/// Returns the current transaction depth asynchronously.
/// Returns the current transaction depth.
///
/// Transaction depth indicates the level of nested transactions:
/// - Level 0: No active transaction.
/// - Level 1: A transaction is active.
/// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
fn get_transaction_depth(&mut self) -> BoxFuture<'_, Result<usize, Error>>;
fn get_transaction_depth(&self) -> usize;

/// The number of statements currently cached in the connection.
fn cached_statements_size(&self) -> usize {
Expand Down
12 changes: 5 additions & 7 deletions sqlx-core/src/any/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures_core::future::BoxFuture;

use crate::any::{Any, AnyConnectOptions};
use crate::connection::{AsyncTransactionDepth, ConnectOptions, Connection};
use crate::connection::{ConnectOptions, Connection};
use crate::error::Error;

use crate::database::Database;
Expand Down Expand Up @@ -90,6 +90,10 @@ impl Connection for AnyConnection {
Transaction::begin(self)
}

fn get_transaction_depth(&self) -> usize {
self.backend.get_transaction_depth()
}

fn cached_statements_size(&self) -> usize {
self.backend.cached_statements_size()
}
Expand All @@ -112,9 +116,3 @@ impl Connection for AnyConnection {
self.backend.should_flush()
}
}

impl AsyncTransactionDepth for AnyConnection {
fn get_transaction_depth(&mut self) -> BoxFuture<'_, Result<usize, Error>> {
self.backend.get_transaction_depth()
}
}
5 changes: 5 additions & 0 deletions sqlx-core/src/any/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use futures_util::future::BoxFuture;

use crate::any::{Any, AnyConnection};
use crate::database::Database;
use crate::error::Error;
use crate::transaction::TransactionManager;

Expand All @@ -24,4 +25,8 @@ impl TransactionManager for AnyTransactionManager {
fn start_rollback(conn: &mut AnyConnection) {
conn.backend.start_rollback()
}

fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize {
conn.backend.get_transaction_depth()
}
}
43 changes: 22 additions & 21 deletions sqlx-core/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::database::{Database, HasStatementCache};
use crate::error::Error;

use crate::transaction::Transaction;
use crate::transaction::{Transaction, TransactionManager};
use futures_core::future::BoxFuture;
use log::LevelFilter;
use std::fmt::Debug;
Expand Down Expand Up @@ -49,6 +49,27 @@ pub trait Connection: Send {
where
Self: Sized;

/// Returns the current transaction depth synchronously.
///
/// Transaction depth indicates the level of nested transactions:
/// - Level 0: No active transaction.
/// - Level 1: A transaction is active.
/// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
fn get_transaction_depth(&self) -> usize {
// Fallback implementation to avoid breaking changes
<Self::Database as Database>::TransactionManager::get_transaction_depth(self)
}

/// Checks if the connection is currently in a transaction.
///
/// This method returns `true` if the current transaction depth is greater than 0,
/// indicating that a transaction is active. It returns `false` if the transaction depth is 0,
/// meaning no transaction is active.
#[inline]
fn is_in_transaction(&self) -> bool {
self.get_transaction_depth() != 0
}

/// Execute the function inside a transaction.
///
/// If the function returns an error, the transaction will be rolled back. If it does not
Expand Down Expand Up @@ -236,23 +257,3 @@ pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug +
.log_slow_statements(LevelFilter::Off, Duration::default())
}
}

pub trait TransactionDepth {
/// Returns the current transaction depth synchronously.
///
/// Transaction depth indicates the level of nested transactions:
/// - Level 0: No active transaction.
/// - Level 1: A transaction is active.
/// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
fn get_transaction_depth(&self) -> usize;
}

pub trait AsyncTransactionDepth {
/// Returns the current transaction depth asynchronously.
///
/// Transaction depth indicates the level of nested transactions:
/// - Level 0: No active transaction.
/// - Level 1: A transaction is active.
/// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
fn get_transaction_depth(&mut self) -> BoxFuture<'_, Result<usize, Error>>;
}
8 changes: 8 additions & 0 deletions sqlx-core/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ pub trait TransactionManager {

/// Starts to abort the active transaction or restore from the most recent snapshot.
fn start_rollback(conn: &mut <Self::Database as Database>::Connection);

/// Returns the current transaction depth.
///
/// Transaction depth indicates the level of nested transactions:
/// - Level 0: No active transaction.
/// - Level 1: A transaction is active.
/// - Level 2 or higher: A transaction is active and one or more SAVEPOINTs have been created within it.
fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize;
}

/// An in-progress database transaction or savepoint.
Expand Down
7 changes: 3 additions & 4 deletions sqlx-mysql/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ use sqlx_core::any::{
Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
AnyStatement, AnyTypeInfo, AnyTypeInfoKind,
};
use sqlx_core::connection::{Connection, TransactionDepth};
use sqlx_core::connection::Connection;
use sqlx_core::database::Database;
use sqlx_core::describe::Describe;
use sqlx_core::executor::Executor;
use sqlx_core::transaction::TransactionManager;
use sqlx_core::Error;
use std::future;

sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql);
Expand Down Expand Up @@ -54,8 +53,8 @@ impl AnyConnectionBackend for MySqlConnection {
MySqlTransactionManager::start_rollback(self)
}

fn get_transaction_depth(&mut self) -> BoxFuture<'_, Result<usize, Error>> {
Box::pin(async { Ok(TransactionDepth::get_transaction_depth(self)) })
fn get_transaction_depth(&self) -> usize {
MySqlTransactionManager::get_transaction_depth(self)
}

fn shrink_buffers(&mut self) {
Expand Down
10 changes: 4 additions & 6 deletions sqlx-mysql/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,11 @@ impl Connection for MySqlConnection {
Transaction::begin(self)
}

fn shrink_buffers(&mut self) {
self.inner.stream.shrink_buffers();
}
}

impl TransactionDepth for MySqlConnection {
fn get_transaction_depth(&self) -> usize {
self.inner.transaction_depth
}

fn shrink_buffers(&mut self) {
self.inner.stream.shrink_buffers();
}
}
4 changes: 4 additions & 0 deletions sqlx-mysql/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ impl TransactionManager for MySqlTransactionManager {
conn.inner.transaction_depth = depth - 1;
}
}

fn get_transaction_depth(conn: &MySqlConnection) -> usize {
conn.inner.transaction_depth
}
}
7 changes: 3 additions & 4 deletions sqlx-postgres/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ use std::future;
pub use sqlx_core::any::*;

use crate::type_info::PgType;
use sqlx_core::connection::{Connection, TransactionDepth};
use sqlx_core::connection::Connection;
use sqlx_core::database::Database;
use sqlx_core::describe::Describe;
use sqlx_core::executor::Executor;
use sqlx_core::ext::ustr::UStr;
use sqlx_core::transaction::TransactionManager;
use sqlx_core::Error;

sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Postgres);

Expand Down Expand Up @@ -53,8 +52,8 @@ impl AnyConnectionBackend for PgConnection {
PgTransactionManager::start_rollback(self)
}

fn get_transaction_depth(&mut self) -> BoxFuture<'_, Result<usize, Error>> {
Box::pin(async { Ok(TransactionDepth::get_transaction_depth(self)) })
fn get_transaction_depth(&mut self) -> usize {
PgTransactionManager::get_transaction_depth(self)
}

fn shrink_buffers(&mut self) {
Expand Down
4 changes: 4 additions & 0 deletions sqlx-postgres/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ impl Connection for PgConnection {
Transaction::begin(self)
}

fn get_transaction_depth(&self) -> usize {
self.transaction_depth
}

fn cached_statements_size(&self) -> usize {
self.cache_statement.len()
}
Expand Down
5 changes: 5 additions & 0 deletions sqlx-postgres/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures_core::future::BoxFuture;
use sqlx_core::database::Database;

use crate::error::Error;
use crate::executor::Executor;
Expand Down Expand Up @@ -59,6 +60,10 @@ impl TransactionManager for PgTransactionManager {
conn.transaction_depth -= 1;
}
}

fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize {
conn.transaction_depth
}
}

struct Rollback<'c> {
Expand Down
7 changes: 3 additions & 4 deletions sqlx-sqlite/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ use sqlx_core::any::{
};

use crate::type_info::DataType;
use sqlx_core::connection::{AsyncTransactionDepth, ConnectOptions, Connection};
use sqlx_core::connection::{ConnectOptions, Connection};
use sqlx_core::database::Database;
use sqlx_core::describe::Describe;
use sqlx_core::executor::Executor;
use sqlx_core::transaction::TransactionManager;
use sqlx_core::Error;

sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Sqlite);

Expand Down Expand Up @@ -54,8 +53,8 @@ impl AnyConnectionBackend for SqliteConnection {
SqliteTransactionManager::start_rollback(self)
}

fn get_transaction_depth(&mut self) -> BoxFuture<'_, Result<usize, Error>> {
AsyncTransactionDepth::get_transaction_depth(self)
fn get_transaction_depth(&self) -> usize {
SqliteTransactionManager::get_transaction_depth(self)
}

fn shrink_buffers(&mut self) {
Expand Down
10 changes: 4 additions & 6 deletions sqlx-sqlite/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ impl Connection for SqliteConnection {
Transaction::begin(self)
}

fn get_transaction_depth(&self) -> usize {
todo!()
}

fn cached_statements_size(&self) -> usize {
self.worker
.shared
Expand Down Expand Up @@ -426,9 +430,3 @@ impl Statements {
self.temp = None;
}
}

impl AsyncTransactionDepth for SqliteConnection {
fn get_transaction_depth(&mut self) -> BoxFuture<'_, Result<usize, Error>> {
Box::pin(async { Ok(self.lock_handle().await?.guard.transaction_depth) })
}
}
9 changes: 7 additions & 2 deletions sqlx-sqlite/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use futures_core::future::BoxFuture;

use crate::{Sqlite, SqliteConnection};
use sqlx_core::database::Database;
use sqlx_core::error::Error;
use sqlx_core::transaction::TransactionManager;

use crate::{Sqlite, SqliteConnection};

/// Implementation of [`TransactionManager`] for SQLite.
pub struct SqliteTransactionManager;

Expand All @@ -25,4 +26,8 @@ impl TransactionManager for SqliteTransactionManager {
fn start_rollback(conn: &mut SqliteConnection) {
conn.worker.start_rollback().ok();
}

fn get_transaction_depth(_conn: &<Self::Database as Database>::Connection) -> usize {
todo!()
}
}
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ pub use sqlx_core::acquire::Acquire;
pub use sqlx_core::arguments::{Arguments, IntoArguments};
pub use sqlx_core::column::Column;
pub use sqlx_core::column::ColumnIndex;
pub use sqlx_core::connection::{
AsyncTransactionDepth, ConnectOptions, Connection, TransactionDepth,
};
pub use sqlx_core::connection::{ConnectOptions, Connection};
pub use sqlx_core::database::{self, Database};
pub use sqlx_core::describe::Describe;
pub use sqlx_core::executor::{Execute, Executor};
Expand Down
1 change: 0 additions & 1 deletion tests/postgres/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use sqlx::postgres::{
PgPoolOptions, PgRow, PgSeverity, Postgres,
};
use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
use sqlx_core::connection::TransactionDepth;
use sqlx_core::{bytes::Bytes, error::BoxDynError};
use sqlx_test::{new, pool, setup_if_needed};
use std::env;
Expand Down

0 comments on commit 48dbba7

Please sign in to comment.