Skip to content

Commit a51930f

Browse files
authored
Merge pull request #213 from cloneable/feature/priority-waiting-queue
Turn get_conn waiting queue into FIFO queue.
2 parents e227f0a + b1798b7 commit a51930f

File tree

4 files changed

+125
-53
lines changed

4 files changed

+125
-53
lines changed

src/conn/pool/futures/disconnect_pool.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use futures_core::ready;
1616
use tokio::sync::mpsc::UnboundedSender;
1717

1818
use crate::{
19-
conn::pool::{Inner, Pool},
19+
conn::pool::{Inner, Pool, QueuedWaker, QUEUE_END_ID},
2020
error::Error,
2121
Conn,
2222
};
@@ -50,7 +50,9 @@ impl Future for DisconnectPool {
5050
self.pool_inner.close.store(true, atomic::Ordering::Release);
5151
let mut exchange = self.pool_inner.exchange.lock().unwrap();
5252
exchange.spawn_futures_if_needed(&self.pool_inner);
53-
exchange.waiting.push_back(cx.waker().clone());
53+
exchange
54+
.waiting
55+
.push(QueuedWaker::new(QUEUE_END_ID, cx.waker().clone()));
5456
drop(exchange);
5557

5658
if self.pool_inner.closed.load(atomic::Ordering::Acquire) {

src/conn/pool/futures/get_conn.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ use std::{
1616
use futures_core::ready;
1717

1818
use crate::{
19-
conn::{pool::Pool, Conn},
19+
conn::{
20+
pool::{Pool, QueueId},
21+
Conn,
22+
},
2023
error::*,
2124
};
2225

@@ -58,13 +61,15 @@ impl GetConnInner {
5861
#[derive(Debug)]
5962
#[must_use = "futures do nothing unless you `.await` or poll them"]
6063
pub struct GetConn {
64+
pub(crate) queue_id: Option<QueueId>,
6165
pub(crate) pool: Option<Pool>,
6266
pub(crate) inner: GetConnInner,
6367
}
6468

6569
impl GetConn {
6670
pub(crate) fn new(pool: &Pool) -> GetConn {
6771
GetConn {
72+
queue_id: None,
6873
pool: Some(pool.clone()),
6974
inner: GetConnInner::New,
7075
}
@@ -91,23 +96,26 @@ impl Future for GetConn {
9196
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
9297
loop {
9398
match self.inner {
94-
GetConnInner::New => match ready!(Pin::new(self.pool_mut()).poll_new_conn(cx))?
95-
.inner
96-
.take()
97-
{
98-
GetConnInner::Connecting(conn_fut) => {
99-
self.inner = GetConnInner::Connecting(conn_fut);
100-
}
101-
GetConnInner::Checking(conn_fut) => {
102-
self.inner = GetConnInner::Checking(conn_fut);
103-
}
104-
GetConnInner::Done => unreachable!(
105-
"Pool::poll_new_conn never gives out already-consumed GetConns"
106-
),
107-
GetConnInner::New => {
108-
unreachable!("Pool::poll_new_conn never gives out GetConnInner::New")
99+
GetConnInner::New => {
100+
let queued = self.queue_id.is_some();
101+
let queue_id = *self.queue_id.get_or_insert_with(QueueId::next);
102+
let next =
103+
ready!(Pin::new(self.pool_mut()).poll_new_conn(cx, queued, queue_id))?;
104+
match next {
105+
GetConnInner::Connecting(conn_fut) => {
106+
self.inner = GetConnInner::Connecting(conn_fut);
107+
}
108+
GetConnInner::Checking(conn_fut) => {
109+
self.inner = GetConnInner::Checking(conn_fut);
110+
}
111+
GetConnInner::Done => unreachable!(
112+
"Pool::poll_new_conn never gives out already-consumed GetConns"
113+
),
114+
GetConnInner::New => {
115+
unreachable!("Pool::poll_new_conn never gives out GetConnInner::New")
116+
}
109117
}
110-
},
118+
}
111119
GetConnInner::Done => {
112120
unreachable!("GetConn::poll polled after returning Async::Ready");
113121
}

src/conn/pool/mod.rs

Lines changed: 90 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use futures_util::FutureExt;
1010
use tokio::sync::mpsc;
1111

1212
use std::{
13-
collections::VecDeque,
13+
cmp::{Ordering, Reverse},
14+
collections::{BinaryHeap, VecDeque},
1415
convert::TryFrom,
1516
pin::Pin,
1617
str::FromStr,
@@ -62,7 +63,7 @@ impl From<Conn> for IdlingConn {
6263
/// This is fine as long as we never do expensive work while holding the lock!
6364
#[derive(Debug)]
6465
struct Exchange {
65-
waiting: VecDeque<Waker>,
66+
waiting: BinaryHeap<QueuedWaker>,
6667
available: VecDeque<IdlingConn>,
6768
exist: usize,
6869
// only used to spawn the recycler the first time we're in async context
@@ -87,6 +88,51 @@ impl Exchange {
8788
}
8889
}
8990

91+
const QUEUE_END_ID: QueueId = QueueId(Reverse(u64::MAX));
92+
93+
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
94+
pub(crate) struct QueueId(Reverse<u64>);
95+
96+
impl QueueId {
97+
fn next() -> Self {
98+
static NEXT_QUEUE_ID: atomic::AtomicU64 = atomic::AtomicU64::new(0);
99+
let id = NEXT_QUEUE_ID.fetch_add(1, atomic::Ordering::SeqCst);
100+
QueueId(Reverse(id))
101+
}
102+
}
103+
104+
#[derive(Debug)]
105+
struct QueuedWaker {
106+
queue_id: QueueId,
107+
waker: Waker,
108+
}
109+
110+
impl QueuedWaker {
111+
fn new(queue_id: QueueId, waker: Waker) -> Self {
112+
QueuedWaker { queue_id, waker }
113+
}
114+
}
115+
116+
impl Eq for QueuedWaker {}
117+
118+
impl PartialEq for QueuedWaker {
119+
fn eq(&self, other: &Self) -> bool {
120+
self.queue_id == other.queue_id
121+
}
122+
}
123+
124+
impl Ord for QueuedWaker {
125+
fn cmp(&self, other: &Self) -> Ordering {
126+
self.queue_id.cmp(&other.queue_id)
127+
}
128+
}
129+
130+
impl PartialOrd for QueuedWaker {
131+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
132+
Some(self.cmp(other))
133+
}
134+
}
135+
90136
/// Connection pool data.
91137
#[derive(Debug)]
92138
pub struct Inner {
@@ -131,7 +177,7 @@ impl Pool {
131177
closed: false.into(),
132178
exchange: Mutex::new(Exchange {
133179
available: VecDeque::with_capacity(pool_opts.constraints().max()),
134-
waiting: VecDeque::new(),
180+
waiting: BinaryHeap::new(),
135181
exist: 0,
136182
recycler: Some((rx, pool_opts)),
137183
}),
@@ -181,8 +227,8 @@ impl Pool {
181227
let mut exchange = self.inner.exchange.lock().unwrap();
182228
if exchange.available.len() < self.opts.pool_opts().active_bound() {
183229
exchange.available.push_back(conn.into());
184-
if let Some(w) = exchange.waiting.pop_front() {
185-
w.wake();
230+
if let Some(qw) = exchange.waiting.pop() {
231+
qw.waker.wake();
186232
}
187233
return;
188234
}
@@ -216,17 +262,27 @@ impl Pool {
216262
let mut exchange = self.inner.exchange.lock().unwrap();
217263
exchange.exist -= 1;
218264
// we just enabled the creation of a new connection!
219-
if let Some(w) = exchange.waiting.pop_front() {
220-
w.wake();
265+
if let Some(qw) = exchange.waiting.pop() {
266+
qw.waker.wake();
221267
}
222268
}
223269

224270
/// Poll the pool for an available connection.
225-
fn poll_new_conn(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<GetConn>> {
226-
self.poll_new_conn_inner(cx)
227-
}
228-
229-
fn poll_new_conn_inner(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<GetConn>> {
271+
fn poll_new_conn(
272+
self: Pin<&mut Self>,
273+
cx: &mut Context<'_>,
274+
queued: bool,
275+
queue_id: QueueId,
276+
) -> Poll<Result<GetConnInner>> {
277+
self.poll_new_conn_inner(cx, queued, queue_id)
278+
}
279+
280+
fn poll_new_conn_inner(
281+
self: Pin<&mut Self>,
282+
cx: &mut Context<'_>,
283+
queued: bool,
284+
queue_id: QueueId,
285+
) -> Poll<Result<GetConnInner>> {
230286
let mut exchange = self.inner.exchange.lock().unwrap();
231287

232288
// NOTE: this load must happen while we hold the lock,
@@ -238,18 +294,23 @@ impl Pool {
238294

239295
exchange.spawn_futures_if_needed(&self.inner);
240296

297+
// Check if others are waiting and we're not queued.
298+
if !exchange.waiting.is_empty() && !queued {
299+
exchange
300+
.waiting
301+
.push(QueuedWaker::new(queue_id, cx.waker().clone()));
302+
return Poll::Pending;
303+
}
304+
241305
while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() {
242306
if !conn.expired() {
243-
return Poll::Ready(Ok(GetConn {
244-
pool: Some(self.clone()),
245-
inner: GetConnInner::Checking(
246-
async move {
247-
conn.stream_mut()?.check().await?;
248-
Ok(conn)
249-
}
250-
.boxed(),
251-
),
252-
}));
307+
return Poll::Ready(Ok(GetConnInner::Checking(
308+
async move {
309+
conn.stream_mut()?.check().await?;
310+
Ok(conn)
311+
}
312+
.boxed(),
313+
)));
253314
} else {
254315
self.send_to_recycler(conn);
255316
}
@@ -261,14 +322,15 @@ impl Pool {
261322
// we are allowed to make a new connection, so we will!
262323
exchange.exist += 1;
263324

264-
return Poll::Ready(Ok(GetConn {
265-
pool: Some(self.clone()),
266-
inner: GetConnInner::Connecting(Conn::new(self.opts.clone()).boxed()),
267-
}));
325+
return Poll::Ready(Ok(GetConnInner::Connecting(
326+
Conn::new(self.opts.clone()).boxed(),
327+
)));
268328
}
269329

270-
// no go -- we have to wait
271-
exchange.waiting.push_back(cx.waker().clone());
330+
// Polled, but no conn available? Back into the queue.
331+
exchange
332+
.waiting
333+
.push(QueuedWaker::new(queue_id, cx.waker().clone()));
272334
Poll::Pending
273335
}
274336
}

src/conn/pool/recycler.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ impl Future for Recycler {
7676
$self.discard.push($conn.close_conn().boxed());
7777
} else {
7878
exchange.available.push_back($conn.into());
79-
if let Some(w) = exchange.waiting.pop_front() {
80-
w.wake();
79+
if let Some(qw) = exchange.waiting.pop() {
80+
qw.waker.wake();
8181
}
8282
}
8383
}
@@ -163,8 +163,8 @@ impl Future for Recycler {
163163
let mut exchange = self.inner.exchange.lock().unwrap();
164164
exchange.exist -= self.discarded;
165165
for _ in 0..self.discarded {
166-
if let Some(w) = exchange.waiting.pop_front() {
167-
w.wake();
166+
if let Some(qw) = exchange.waiting.pop() {
167+
qw.waker.wake();
168168
}
169169
}
170170
drop(exchange);
@@ -197,8 +197,8 @@ impl Future for Recycler {
197197
if self.inner.closed.load(Ordering::Acquire) {
198198
// `DisconnectPool` might still wait to be woken up.
199199
let mut exchange = self.inner.exchange.lock().unwrap();
200-
while let Some(w) = exchange.waiting.pop_front() {
201-
w.wake();
200+
while let Some(qw) = exchange.waiting.pop() {
201+
qw.waker.wake();
202202
}
203203
// we're about to exit, so there better be no outstanding connections
204204
assert_eq!(exchange.exist, 0);

0 commit comments

Comments
 (0)