Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 93 additions & 21 deletions src/connection_like/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use futures_util::FutureExt;

use crate::{BoxFuture, Pool};

/// Connection.
/// Inner [`Connection`] representation.
#[derive(Debug)]
pub enum Connection<'a, 't: 'a> {
pub(crate) enum ConnectionInner<'a, 't: 'a> {
/// Just a connection.
Conn(crate::Conn),
/// Mutable reference to a connection.
Expand All @@ -21,55 +21,127 @@ pub enum Connection<'a, 't: 'a> {
Tx(&'a mut crate::Transaction<'t>),
}

impl std::ops::Deref for ConnectionInner<'_, '_> {
type Target = crate::Conn;

fn deref(&self) -> &Self::Target {
match self {
ConnectionInner::Conn(ref conn) => conn,
ConnectionInner::ConnMut(conn) => conn,
ConnectionInner::Tx(tx) => tx.0.deref(),
}
}
}

impl std::ops::DerefMut for ConnectionInner<'_, '_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
ConnectionInner::Conn(conn) => conn,
ConnectionInner::ConnMut(conn) => conn,
ConnectionInner::Tx(tx) => tx.0.inner.deref_mut(),
}
}
}

/// Some connection.
///
/// This could at least be queried.
#[derive(Debug)]
pub struct Connection<'a, 't: 'a> {
pub(crate) inner: ConnectionInner<'a, 't>,
}

impl Connection<'_, '_> {
#[inline]
pub(crate) fn as_mut(&mut self) -> &mut crate::Conn {
&mut self.inner
}
}

impl<'a, 't: 'a> Connection<'a, 't> {
/// Borrows a [`Connection`] rather than consuming it.
///
/// This is useful to allow calling [`Query`] methods while still retaining
/// ownership of the original connection.
///
/// # Examples
///
/// ```no_run
/// # use mysql_async::Connection;
/// # use mysql_async::prelude::Query;
/// async fn connection_by_ref(mut connection: Connection<'_, '_>) {
/// // Perform some query
/// "SELECT 1".ignore(connection.by_ref()).await.unwrap();
/// // Perform another query.
/// // We can only do this because we used `by_ref` earlier.
/// "SELECT 2".ignore(connection).await.unwrap();
/// }
/// ```
///
/// [`Query`]: crate::prelude::Query
pub fn by_ref(&mut self) -> Connection<'_, '_> {
Connection {
inner: ConnectionInner::ConnMut(self.as_mut()),
}
}
}

impl From<crate::Conn> for Connection<'static, 'static> {
fn from(conn: crate::Conn) -> Self {
Connection::Conn(conn)
Self {
inner: ConnectionInner::Conn(conn),
}
}
}

impl<'a> From<&'a mut crate::Conn> for Connection<'a, 'static> {
fn from(conn: &'a mut crate::Conn) -> Self {
Connection::ConnMut(conn)
Self {
inner: ConnectionInner::ConnMut(conn),
}
}
}

impl<'a, 't> From<&'a mut crate::Transaction<'t>> for Connection<'a, 't> {
fn from(tx: &'a mut crate::Transaction<'t>) -> Self {
Connection::Tx(tx)
Self {
inner: ConnectionInner::Tx(tx),
}
}
}

impl std::ops::Deref for Connection<'_, '_> {
type Target = crate::Conn;

fn deref(&self) -> &Self::Target {
match self {
Connection::Conn(ref conn) => conn,
Connection::ConnMut(conn) => conn,
Connection::Tx(tx) => tx.0.deref(),
}
}
}

impl std::ops::DerefMut for Connection<'_, '_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
Connection::Conn(conn) => conn,
Connection::ConnMut(conn) => conn,
Connection::Tx(tx) => tx.0.deref_mut(),
}
&self.inner
}
}

/// Result of `ToConnection::to_connection` call.
/// Result of a [`ToConnection::to_connection`] call.
pub enum ToConnectionResult<'a, 't: 'a> {
/// Connection is immediately available.
Immediate(Connection<'a, 't>),
/// We need some time to get a connection and the operation itself may fail.
Mediate(BoxFuture<'a, Connection<'a, 't>>),
}

impl<'a, 't: 'a> ToConnectionResult<'a, 't> {
/// Resolves `self` to a connection.
#[inline]
pub async fn resolve(self) -> crate::Result<Connection<'a, 't>> {
match self {
ToConnectionResult::Immediate(immediate) => Ok(immediate),
ToConnectionResult::Mediate(mediate) => mediate.await,
}
}
}

/// Everything that can be given in exchange to a connection.
///
/// Note that you could obtain a `'static` connection by giving away `Conn` or `Pool`.
pub trait ToConnection<'a, 't: 'a>: Send {
/// Converts self to a connection.
fn to_connection(self) -> ToConnectionResult<'a, 't>;
}

Expand Down
4 changes: 2 additions & 2 deletions src/io/read_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ impl Future for ReadPacket<'_, '_> {
type Output = std::result::Result<PooledBuf, IoError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let packet_opt = match self.0.stream_mut() {
let packet_opt = match self.0.as_mut().stream_mut() {
Ok(stream) => ready!(Pin::new(stream).poll_next(cx)).transpose()?,
// `ConnectionClosed` error.
Err(_) => None,
};

match packet_opt {
Some(packet) => {
self.0.touch();
self.0.as_mut().touch();
Poll::Ready(Ok(packet))
}
None => Poll::Ready(Err(Error::new(
Expand Down
2 changes: 1 addition & 1 deletion src/io/write_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Future for WritePacket<'_, '_> {
ref mut data,
} = *self;

match conn.stream_mut() {
match conn.as_mut().stream_mut() {
Ok(stream) => {
if data.is_some() {
let codec = Pin::new(stream.codec.as_mut().expect("must be here"));
Expand Down
16 changes: 5 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -553,13 +553,18 @@ pub use self::queryable::stmt::Statement;
#[doc(inline)]
pub use self::conn::pool::Metrics;

#[doc(inline)]
pub use crate::connection_like::{Connection, ToConnectionResult};

/// Futures used in this crate
pub mod futures {
pub use crate::conn::pool::futures::{DisconnectPool, GetConn};
}

/// Traits used in this crate
pub mod prelude {
#[doc(inline)]
pub use crate::connection_like::ToConnection;
#[doc(inline)]
pub use crate::local_infile_handler::GlobalHandler;
#[doc(inline)]
Expand Down Expand Up @@ -596,17 +601,6 @@ pub mod prelude {
pub trait StatementLike: crate::queryable::stmt::StatementLike {}
impl<T: crate::queryable::stmt::StatementLike> StatementLike for T {}

/// Everything that is a connection.
///
/// Note that you could obtain a `'static` connection by giving away `Conn` or `Pool`.
pub trait ToConnection<'a, 't: 'a>: crate::connection_like::ToConnection<'a, 't> {}
// explicitly implemented because of rusdoc
impl<'a> ToConnection<'a, 'static> for &'a crate::Pool {}
impl ToConnection<'static, 'static> for crate::Pool {}
impl ToConnection<'static, 'static> for crate::Conn {}
impl<'a> ToConnection<'a, 'static> for &'a mut crate::Conn {}
impl<'a, 't> ToConnection<'a, 't> for &'a mut crate::Transaction<'t> {}

/// Trait for protocol markers [`crate::TextProtocol`] and [`crate::BinaryProtocol`].
pub trait Protocol: crate::queryable::Protocol {}
impl Protocol for crate::BinaryProtocol {}
Expand Down
27 changes: 9 additions & 18 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::borrow::Cow;
use futures_util::FutureExt;

use crate::{
connection_like::ToConnectionResult,
from_row,
prelude::{FromRow, StatementLike, ToConnection},
tracing_utils::LevelInfo,
Expand Down Expand Up @@ -217,11 +216,8 @@ impl<Q: AsQuery> Query for Q {
C: ToConnection<'a, 't> + 'a,
{
async move {
let mut conn = match conn.to_connection() {
ToConnectionResult::Immediate(conn) => conn,
ToConnectionResult::Mediate(fut) => fut.await?,
};
conn.raw_query::<'_, _, LevelInfo>(self).await?;
let mut conn = conn.to_connection().resolve().await?;
conn.as_mut().raw_query::<'_, _, LevelInfo>(self).await?;
Ok(QueryResult::new(conn))
}
.boxed()
Expand Down Expand Up @@ -264,14 +260,12 @@ where
C: ToConnection<'a, 't> + 'a,
{
async move {
let mut conn = match conn.to_connection() {
ToConnectionResult::Immediate(conn) => conn,
ToConnectionResult::Mediate(fut) => fut.await?,
};
let mut conn = conn.to_connection().resolve().await?;

let statement = conn.get_statement(self.query).await?;
let statement = conn.as_mut().get_statement(self.query).await?;

conn.execute_statement(&statement, self.params.into())
conn.as_mut()
.execute_statement(&statement, self.params.into())
.await?;

Ok(QueryResult::new(conn))
Expand Down Expand Up @@ -324,15 +318,12 @@ where
C: ToConnection<'a, 't> + 'a,
{
async move {
let mut conn = match conn.to_connection() {
ToConnectionResult::Immediate(conn) => conn,
ToConnectionResult::Mediate(fut) => fut.await?,
};
let mut conn = conn.to_connection().resolve().await?;

let statement = conn.get_statement(self.query).await?;
let statement = conn.as_mut().get_statement(self.query).await?;

for params in self.params {
conn.execute_statement(&statement, params).await?;
conn.as_mut().execute_statement(&statement, params).await?;
}

Ok(())
Expand Down
65 changes: 58 additions & 7 deletions src/queryable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
query::AsQuery,
queryable::query_result::ResultSetMeta,
tracing_utils::{LevelInfo, LevelTrace, TracingLevel},
BoxFuture, Column, Conn, Params, ResultSetStream, Row,
BoxFuture, Column, Conn, Connection, Params, ResultSetStream, Row,
};

pub mod query_result;
Expand Down Expand Up @@ -537,7 +537,7 @@ impl Queryable for Conn {

impl Queryable for Transaction<'_> {
fn ping(&mut self) -> BoxFuture<'_, ()> {
self.0.ping()
self.0.as_mut().ping()
}

fn query_iter<'a, Q>(
Expand All @@ -547,18 +547,18 @@ impl Queryable for Transaction<'_> {
where
Q: AsQuery + 'a,
{
self.0.query_iter(query)
self.0.as_mut().query_iter(query)
}

fn prep<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Statement>
where
Q: AsQuery + 'a,
{
self.0.prep(query)
self.0.as_mut().prep(query)
}

fn close(&mut self, stmt: Statement) -> BoxFuture<'_, ()> {
self.0.close(stmt)
self.0.as_mut().close(stmt)
}

fn exec_iter<'a: 's, 's, Q, P>(
Expand All @@ -570,7 +570,7 @@ impl Queryable for Transaction<'_> {
Q: StatementLike + 'a,
P: Into<Params>,
{
self.0.exec_iter(stmt, params)
self.0.as_mut().exec_iter(stmt, params)
}

fn exec_batch<'a: 'b, 'b, S, P, I>(&'a mut self, stmt: S, params_iter: I) -> BoxFuture<'b, ()>
Expand All @@ -580,7 +580,58 @@ impl Queryable for Transaction<'_> {
I::IntoIter: Send,
P: Into<Params> + Send,
{
self.0.exec_batch(stmt, params_iter)
self.0.as_mut().exec_batch(stmt, params_iter)
}
}

impl<'c, 't: 'c> Queryable for Connection<'c, 't> {
#[inline]
fn ping(&mut self) -> BoxFuture<'_, ()> {
self.as_mut().ping()
}

#[inline]
fn query_iter<'a, Q>(
&'a mut self,
query: Q,
) -> BoxFuture<'a, QueryResult<'a, 'static, TextProtocol>>
where
Q: AsQuery + 'a,
{
self.as_mut().query_iter(query)
}

fn prep<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Statement>
where
Q: AsQuery + 'a,
{
self.as_mut().prep(query)
}

fn close(&mut self, stmt: Statement) -> BoxFuture<'_, ()> {
self.as_mut().close(stmt)
}

fn exec_iter<'a: 's, 's, Q, P>(
&'a mut self,
stmt: Q,
params: P,
) -> BoxFuture<'s, QueryResult<'a, 'static, BinaryProtocol>>
where
Q: StatementLike + 'a,
P: Into<Params>,
{
self.as_mut().exec_iter(stmt, params)
}

fn exec_batch<'a: 'b, 'b, S, P, I>(&'a mut self, stmt: S, params_iter: I) -> BoxFuture<'b, ()>
where
S: StatementLike + 'b,
I: IntoIterator<Item = P> + Send + 'b,
I::IntoIter: Send,
P: Into<Params> + Send,
{
self.as_mut().exec_batch(stmt, params_iter)
}
}

Expand Down
Loading
Loading