Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate fetch_* methods in a trait #3501

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
437 changes: 184 additions & 253 deletions sqlx-core/src/query.rs

Large diffs are not rendered by default.

112 changes: 20 additions & 92 deletions sqlx-core/src/query_as.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use std::marker::PhantomData;

use either::Either;
use futures_core::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use futures_util::{FutureExt, StreamExt};

use crate::arguments::IntoArguments;
use crate::database::{Database, HasStatementCache};
use crate::encode::Encode;
use crate::error::{BoxDynError, Error};
use crate::executor::{Execute, Executor};
use crate::from_row::FromRow;
use crate::query::{query, query_statement, query_statement_with, query_with_result, Query};
use crate::query::{query, query_statement, query_statement_with, query_with_result, Fetch, Query};
use crate::types::Type;

/// A single SQL query as a prepared statement, mapping results using [`FromRow`].
Expand Down Expand Up @@ -77,44 +77,23 @@ where
}
}

// FIXME: This is very close, nearly 1:1 with `Map`
// noinspection DuplicatedCode
impl<'q, DB, O, A> QueryAs<'q, DB, O, A>
impl<'q, DB, O, A> Fetch<'q, DB> for QueryAs<'q, DB, O, A>
where
DB: Database,
A: 'q + IntoArguments<'q, DB>,
O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
{
/// Execute the query and return the generated results as a stream.
pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
O: 'e,
A: 'e,
{
// FIXME: this should have used `executor.fetch()` but that's a breaking change
// because this technically allows multiple statements in one query string.
#[allow(deprecated)]
self.fetch_many(executor)
.try_filter_map(|step| async move { Ok(step.right()) })
.boxed()
}
type Output = O;

/// Execute multiple queries and return the generated results as a stream
/// from each query, in a stream.
#[deprecated = "Only the SQLite driver supports multiple statements in one prepared statement and that behavior is deprecated. Use `sqlx::raw_sql()` instead. See https://github.com/launchbadge/sqlx/issues/3108 for discussion."]
pub fn fetch_many<'e, 'c: 'e, E>(
fn fetch_many<'e, 'c: 'e, E>(
self,
executor: E,
) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
) -> BoxStream<'e, Result<Either<DB::QueryResult, Self::Output>, Error>>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
O: 'e,
A: 'e,
Self::Output: 'e,
{
executor
.fetch_many(self.inner)
Expand All @@ -126,76 +105,25 @@ where
.boxed()
}

/// Execute the query and return all the resulting rows collected into a [`Vec`].
///
/// ### Note: beware result set size.
/// This will attempt to collect the full result set of the query into memory.
///
/// To avoid exhausting available memory, ensure the result set has a known upper bound,
/// e.g. using `LIMIT`.
#[inline]
pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<O>, Error>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
O: 'e,
A: 'e,
{
self.fetch(executor).try_collect().await
}

/// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
///
/// ### Note: for best performance, ensure the query returns at most one row.
/// Depending on the driver implementation, if your query can return more than one row,
/// it may lead to wasted CPU time and bandwidth on the database server.
///
/// Even when the driver implementation takes this into account, ensuring the query returns at most one row
/// can result in a more optimal query plan.
///
/// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
///
/// Otherwise, you might want to add `LIMIT 1` to your query.
pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<O, Error>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
O: 'e,
A: 'e,
{
self.fetch_optional(executor)
.await
.and_then(|row| row.ok_or(Error::RowNotFound))
}

/// Execute the query, returning the first row or `None` otherwise.
///
/// ### Note: for best performance, ensure the query returns at most one row.
/// Depending on the driver implementation, if your query can return more than one row,
/// it may lead to wasted CPU time and bandwidth on the database server.
///
/// Even when the driver implementation takes this into account, ensuring the query returns at most one row
/// can result in a more optimal query plan.
///
/// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
///
/// Otherwise, you might want to add `LIMIT 1` to your query.
pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<O>, Error>
fn fetch_optional<'e, 'c: 'e, E>(
self,
executor: E,
) -> futures_core::future::BoxFuture<'e, Result<Option<Self::Output>, Error>>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
O: 'e,
A: 'e,
Self::Output: 'e + Send + Unpin,
{
let row = executor.fetch_optional(self.inner).await?;
if let Some(row) = row {
O::from_row(&row).map(Some)
} else {
Ok(None)
async {
let row = executor.fetch_optional(self.inner).await?;
if let Some(row) = row {
O::from_row(&row).map(Some)
} else {
Ok(None)
}
}
.boxed()
}
}

Expand Down
101 changes: 13 additions & 88 deletions sqlx-core/src/query_scalar.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use either::Either;
use futures_core::stream::BoxStream;
use futures_util::{StreamExt, TryFutureExt, TryStreamExt};
use futures_util::{FutureExt, StreamExt, TryStreamExt};

use crate::arguments::IntoArguments;
use crate::database::{Database, HasStatementCache};
use crate::encode::Encode;
use crate::error::{BoxDynError, Error};
use crate::executor::{Execute, Executor};
use crate::from_row::FromRow;
use crate::query::Fetch;
use crate::query_as::{
query_as, query_as_with_result, query_statement_as, query_statement_as_with, QueryAs,
};
Expand Down Expand Up @@ -74,42 +75,24 @@ where
}
}

// FIXME: This is very close, nearly 1:1 with `Map`
// noinspection DuplicatedCode
impl<'q, DB, O, A> QueryScalar<'q, DB, O, A>
impl<'q, DB, O, A> Fetch<'q, DB> for QueryScalar<'q, DB, O, A>
where
DB: Database,
O: Send + Unpin,
A: 'q + IntoArguments<'q, DB>,
(O,): Send + Unpin + for<'r> FromRow<'r, DB::Row>,
{
/// Execute the query and return the generated results as a stream.
#[inline]
pub fn fetch<'e, 'c: 'e, E>(self, executor: E) -> BoxStream<'e, Result<O, Error>>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
A: 'e,
O: 'e,
{
self.inner.fetch(executor).map_ok(|it| it.0).boxed()
}
type Output = O;

/// Execute multiple queries and return the generated results as a stream
/// from each query, in a stream.
#[inline]
#[deprecated = "Only the SQLite driver supports multiple statements in one prepared statement and that behavior is deprecated. Use `sqlx::raw_sql()` instead. See https://github.com/launchbadge/sqlx/issues/3108 for discussion."]
pub fn fetch_many<'e, 'c: 'e, E>(
fn fetch_many<'e, 'c: 'e, E>(
self,
executor: E,
) -> BoxStream<'e, Result<Either<DB::QueryResult, O>, Error>>
) -> BoxStream<'e, Result<Either<DB::QueryResult, Self::Output>, Error>>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
A: 'e,
O: 'e,
Self::Output: 'e,
{
#[allow(deprecated)]
self.inner
Expand All @@ -118,75 +101,17 @@ where
.boxed()
}

/// Execute the query and return all the resulting rows collected into a [`Vec`].
///
/// ### Note: beware result set size.
/// This will attempt to collect the full result set of the query into memory.
///
/// To avoid exhausting available memory, ensure the result set has a known upper bound,
/// e.g. using `LIMIT`.
#[inline]
pub async fn fetch_all<'e, 'c: 'e, E>(self, executor: E) -> Result<Vec<O>, Error>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
(O,): 'e,
A: 'e,
{
self.inner
.fetch(executor)
.map_ok(|it| it.0)
.try_collect()
.await
}

/// Execute the query, returning the first row or [`Error::RowNotFound`] otherwise.
///
/// ### Note: for best performance, ensure the query returns at most one row.
/// Depending on the driver implementation, if your query can return more than one row,
/// it may lead to wasted CPU time and bandwidth on the database server.
///
/// Even when the driver implementation takes this into account, ensuring the query returns at most one row
/// can result in a more optimal query plan.
///
/// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
///
/// Otherwise, you might want to add `LIMIT 1` to your query.
#[inline]
pub async fn fetch_one<'e, 'c: 'e, E>(self, executor: E) -> Result<O, Error>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
O: 'e,
A: 'e,
{
self.inner.fetch_one(executor).map_ok(|it| it.0).await
}

/// Execute the query, returning the first row or `None` otherwise.
///
/// ### Note: for best performance, ensure the query returns at most one row.
/// Depending on the driver implementation, if your query can return more than one row,
/// it may lead to wasted CPU time and bandwidth on the database server.
///
/// Even when the driver implementation takes this into account, ensuring the query returns at most one row
/// can result in a more optimal query plan.
///
/// If your query has a `WHERE` clause filtering a unique column by a single value, you're good.
///
/// Otherwise, you might want to add `LIMIT 1` to your query.
#[inline]
pub async fn fetch_optional<'e, 'c: 'e, E>(self, executor: E) -> Result<Option<O>, Error>
fn fetch_optional<'e, 'c: 'e, E>(
self,
executor: E,
) -> futures_core::future::BoxFuture<'e, Result<Option<Self::Output>, Error>>
where
'q: 'e,
E: 'e + Executor<'c, Database = DB>,
DB: 'e,
O: 'e,
A: 'e,
Self::Output: 'e + Send + Unpin,
{
Ok(self.inner.fetch_optional(executor).await?.map(|it| it.0))
async { Ok(self.inner.fetch_optional(executor).await?.map(|it| it.0)) }.boxed()
}
}

Expand Down
2 changes: 1 addition & 1 deletion sqlx-mysql/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub(crate) use sqlx_core::migrate::*;
use crate::connection::{ConnectOptions, Connection};
use crate::error::Error;
use crate::executor::Executor;
use crate::query::query;
use crate::query::{query, Fetch};
use crate::query_as::query_as;
use crate::query_scalar::query_scalar;
use crate::{MySql, MySqlConnectOptions, MySqlConnection};
Expand Down
2 changes: 1 addition & 1 deletion sqlx-mysql/src/testing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::connection::Connection;
use crate::error::Error;
use crate::executor::Executor;
use crate::pool::{Pool, PoolOptions};
use crate::query::query;
use crate::query::{query, Fetch};
use crate::query_builder::QueryBuilder;
use crate::query_scalar::query_scalar;
use crate::{MySql, MySqlConnectOptions, MySqlConnection};
Expand Down
1 change: 1 addition & 0 deletions sqlx-postgres/src/advisory_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::PgConnection;
use hkdf::Hkdf;
use once_cell::sync::OnceCell;
use sha2::Sha256;
use sqlx_core::query::Fetch;
use std::ops::{Deref, DerefMut};

/// A mutex-like type utilizing [Postgres advisory locks].
Expand Down
1 change: 1 addition & 0 deletions sqlx-postgres/src/connection/describe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::error::Error;
use crate::ext::ustr::UStr;
use crate::io::StatementId;
use crate::message::{ParameterDescription, RowDescription};
use crate::query::Fetch;
use crate::query_as::query_as;
use crate::query_scalar::query_scalar;
use crate::statement::PgStatementMetadata;
Expand Down
2 changes: 1 addition & 1 deletion sqlx-postgres/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub(crate) use sqlx_core::migrate::{Migrate, MigrateDatabase};
use crate::connection::{ConnectOptions, Connection};
use crate::error::Error;
use crate::executor::Executor;
use crate::query::query;
use crate::query::{query, Fetch};
use crate::query_as::query_as;
use crate::query_scalar::query_scalar;
use crate::{PgConnectOptions, PgConnection, Postgres};
Expand Down
2 changes: 1 addition & 1 deletion sqlx-postgres/src/testing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::connection::Connection;
use crate::error::Error;
use crate::executor::Executor;
use crate::pool::{Pool, PoolOptions};
use crate::query::query;
use crate::query::{query, Fetch};
use crate::query_scalar::query_scalar;
use crate::{PgConnectOptions, PgConnection, Postgres};

Expand Down
2 changes: 1 addition & 1 deletion sqlx-sqlite/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::fs;
use crate::migrate::MigrateError;
use crate::migrate::{AppliedMigration, Migration};
use crate::migrate::{Migrate, MigrateDatabase};
use crate::query::query;
use crate::query::{query, Fetch};
use crate::query_as::query_as;
use crate::{Sqlite, SqliteConnectOptions, SqliteConnection, SqliteJournalMode};
use futures_core::future::BoxFuture;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub use sqlx_core::from_row::FromRow;
pub use sqlx_core::pool::{self, Pool};
#[doc(hidden)]
pub use sqlx_core::query::query_with_result as __query_with_result;
pub use sqlx_core::query::{query, query_with};
pub use sqlx_core::query::{query, query_with, Fetch};
pub use sqlx_core::query_as::{query_as, query_as_with};
pub use sqlx_core::query_builder::{self, QueryBuilder};
#[doc(hidden)]
Expand Down
2 changes: 1 addition & 1 deletion tests/any/any.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use sqlx::any::AnyRow;
use sqlx::{Any, Connection, Executor, Row};
use sqlx::{Any, Connection, Executor, Fetch, Row};
use sqlx_test::new;

#[sqlx_macros::test]
Expand Down
2 changes: 1 addition & 1 deletion tests/any/pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use sqlx::any::{AnyConnectOptions, AnyPoolOptions};
use sqlx::Executor;
use sqlx::{Executor, Fetch};
use std::sync::{
atomic::{AtomicI32, AtomicUsize, Ordering},
Arc, Mutex,
Expand Down
2 changes: 1 addition & 1 deletion tests/postgres/derives.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::TryStreamExt;
use sqlx::postgres::types::PgRange;
use sqlx::{Connection, Executor, FromRow, Postgres};
use sqlx::{Connection, Executor, Fetch, FromRow, Postgres};
use sqlx_postgres::PgHasArrayType;
use sqlx_test::{new, test_type};
use std::fmt::Debug;
Expand Down
Loading