Skip to content

Commit 1c03bf6

Browse files
committed
Add metrics for pool internals
1 parent 129b8d8 commit 1c03bf6

File tree

5 files changed

+295
-9
lines changed

5 files changed

+295
-9
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ tracing = { version = "0.1.37", default-features = false, features = [
3737
], optional = true }
3838
twox-hash = "2"
3939
url = "2.1"
40+
hdrhistogram = { version = "7.5", optional = true }
4041

4142
[dependencies.tokio-rustls]
4243
version = "0.26"

src/conn/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ struct ConnInner {
116116
auth_plugin: AuthPlugin<'static>,
117117
auth_switched: bool,
118118
server_key: Option<Vec<u8>>,
119+
active_since: Instant,
119120
/// Connection is already disconnected.
120121
pub(crate) disconnected: bool,
121122
/// One-time connection-level infile handler.
@@ -169,6 +170,7 @@ impl ConnInner {
169170
server_key: None,
170171
infile_handler: None,
171172
reset_upon_returning_to_a_pool: false,
173+
active_since: Instant::now(),
172174
}
173175
}
174176

src/conn/pool/metrics.rs

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
use std::sync::atomic::AtomicUsize;
2+
3+
use serde::Serialize;
4+
5+
#[derive(Default, Debug, Serialize)]
6+
#[non_exhaustive]
7+
pub struct Metrics {
8+
/// Guage of active connections to the database server, this includes both connections that have belong
9+
/// to the pool, and connections currently owned by the application.
10+
pub connection_count: AtomicUsize,
11+
/// Guage of active connections that currently belong to the pool.
12+
pub connections_in_pool: AtomicUsize,
13+
/// Guage of GetConn requests that are currently active.
14+
pub active_wait_requests: AtomicUsize,
15+
/// Counter of connections that failed to be created.
16+
pub create_failed: AtomicUsize,
17+
/// Counter of connections discarded due to pool constraints.
18+
pub discarded_superfluous_connection: AtomicUsize,
19+
/// Counter of connections discarded due to being closed upon return to the pool.
20+
pub discarded_unestablished_connection: AtomicUsize,
21+
/// Counter of connections that have been returned to the pool dirty that needed to be cleaned
22+
/// (ie. open transactions, pending queries, etc).
23+
pub dirty_connection_return: AtomicUsize,
24+
/// Counter of connections that have been discarded as they were expired by the pool constraints.
25+
pub discarded_expired_connection: AtomicUsize,
26+
/// Counter of connections that have been reset.
27+
pub resetting_connection: AtomicUsize,
28+
/// Counter of connections that have been discarded as they returned an error during cleanup.
29+
pub discarded_error_during_cleanup: AtomicUsize,
30+
/// Counter of connections that have been returned to the pool.
31+
pub connection_returned_to_pool: AtomicUsize,
32+
/// Histogram of times connections have spent outside of the pool.
33+
#[cfg(feature = "hdrhistogram")]
34+
pub connection_active_duration: MetricsHistogram,
35+
/// Histogram of times connections have spent inside of the pool.
36+
#[cfg(feature = "hdrhistogram")]
37+
pub connection_idle_duration: MetricsHistogram,
38+
/// Histogram of times connections have spent being checked for health.
39+
#[cfg(feature = "hdrhistogram")]
40+
pub check_duration: MetricsHistogram,
41+
/// Histogram of time spent waiting to connect to the server.
42+
#[cfg(feature = "hdrhistogram")]
43+
pub connect_duration: MetricsHistogram,
44+
}
45+
46+
impl Metrics {
47+
/// Resets all histograms to allow for histograms to be bound to a period of time (ie. between metric scrapes)
48+
#[cfg(feature = "hdrhistogram")]
49+
pub fn clear_histograms(&self) {
50+
self.connection_active_duration.reset();
51+
self.connection_idle_duration.reset();
52+
self.check_duration.reset();
53+
self.connect_duration.reset();
54+
}
55+
}
56+
57+
#[cfg(feature = "hdrhistogram")]
58+
#[derive(Debug)]
59+
pub struct MetricsHistogram(std::sync::Mutex<hdrhistogram::Histogram<u64>>);
60+
61+
#[cfg(feature = "hdrhistogram")]
62+
impl MetricsHistogram {
63+
pub fn reset(&self) {
64+
self.lock().unwrap().reset();
65+
}
66+
}
67+
68+
#[cfg(feature = "hdrhistogram")]
69+
impl Default for MetricsHistogram {
70+
fn default() -> Self {
71+
let hdr = hdrhistogram::Histogram::new_with_bounds(1, 30 * 1_000_000, 2).unwrap();
72+
Self(std::sync::Mutex::new(hdr))
73+
}
74+
}
75+
76+
#[cfg(feature = "hdrhistogram")]
77+
impl Serialize for MetricsHistogram {
78+
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
79+
where
80+
S: serde::Serializer,
81+
{
82+
let hdr = self.0.lock().unwrap();
83+
84+
/// A percentile of this histogram - for supporting serializers this
85+
/// will ignore the key (such as `90%ile`) and instead add a
86+
/// dimension to the metrics (such as `quantile=0.9`).
87+
macro_rules! ile {
88+
($e:expr) => {
89+
&MetricAlias(concat!("!|quantile=", $e), hdr.value_at_quantile($e))
90+
};
91+
}
92+
93+
/// A 'qualified' metric name - for supporting serializers such as
94+
/// serde_prometheus, this will prepend the metric name to this key,
95+
/// outputting `response_time_count`, for example rather than just
96+
/// `count`.
97+
macro_rules! qual {
98+
($e:expr) => {
99+
&MetricAlias("<|", $e)
100+
};
101+
}
102+
103+
use serde::ser::SerializeMap;
104+
105+
let mut tup = serializer.serialize_map(Some(10))?;
106+
tup.serialize_entry("samples", qual!(hdr.len()))?;
107+
tup.serialize_entry("min", qual!(hdr.min()))?;
108+
tup.serialize_entry("max", qual!(hdr.max()))?;
109+
tup.serialize_entry("mean", qual!(hdr.mean()))?;
110+
tup.serialize_entry("stdev", qual!(hdr.stdev()))?;
111+
tup.serialize_entry("90%ile", ile!(0.9))?;
112+
tup.serialize_entry("95%ile", ile!(0.95))?;
113+
tup.serialize_entry("99%ile", ile!(0.99))?;
114+
tup.serialize_entry("99.9%ile", ile!(0.999))?;
115+
tup.serialize_entry("99.99%ile", ile!(0.9999))?;
116+
tup.end()
117+
}
118+
}
119+
120+
/// This is a mocked 'newtype' (eg. `A(u64)`) that instead allows us to
121+
/// define our own type name that doesn't have to abide by Rust's constraints
122+
/// on type names. This allows us to do some manipulation of our metrics,
123+
/// allowing us to add dimensionality to our metrics via key=value pairs, or
124+
/// key manipulation on serializers that support it.
125+
#[cfg(feature = "hdrhistogram")]
126+
struct MetricAlias<T: Serialize>(&'static str, T);
127+
128+
#[cfg(feature = "hdrhistogram")]
129+
impl<T: Serialize> Serialize for MetricAlias<T> {
130+
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
131+
where
132+
S: serde::Serializer,
133+
{
134+
serializer.serialize_newtype_struct(self.0, &self.1)
135+
}
136+
}
137+
138+
#[cfg(feature = "hdrhistogram")]
139+
impl std::ops::Deref for MetricsHistogram {
140+
type Target = std::sync::Mutex<hdrhistogram::Histogram<u64>>;
141+
142+
fn deref(&self) -> &Self::Target {
143+
&self.0
144+
}
145+
}

src/conn/pool/mod.rs

Lines changed: 87 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@ use crate::{
2828
queryable::transaction::{Transaction, TxOpts},
2929
};
3030

31+
pub use metrics::Metrics;
32+
3133
mod recycler;
3234
// this is a really unfortunate name for a module
3335
pub mod futures;
36+
mod metrics;
3437
mod ttl_check_inerval;
3538

3639
/// Connection that is idling in the pool.
@@ -107,7 +110,7 @@ struct Waitlist {
107110
}
108111

109112
impl Waitlist {
110-
fn push(&mut self, waker: Waker, queue_id: QueueId) {
113+
fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
111114
// The documentation of Future::poll says:
112115
// Note that on multiple calls to poll, only the Waker from
113116
// the Context passed to the most recent call should be
@@ -120,7 +123,9 @@ impl Waitlist {
120123
// This means we have to remove first to have the most recent
121124
// waker in the queue.
122125
self.remove(queue_id);
123-
self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
126+
self.queue
127+
.push(QueuedWaker { queue_id, waker }, queue_id)
128+
.is_none()
124129
}
125130

126131
fn pop(&mut self) -> Option<Waker> {
@@ -130,8 +135,8 @@ impl Waitlist {
130135
}
131136
}
132137

133-
fn remove(&mut self, id: QueueId) {
134-
self.queue.remove(&id);
138+
fn remove(&mut self, id: QueueId) -> bool {
139+
self.queue.remove(&id).is_some()
135140
}
136141

137142
fn peek_id(&mut self) -> Option<QueueId> {
@@ -181,6 +186,7 @@ impl Hash for QueuedWaker {
181186
/// Connection pool data.
182187
#[derive(Debug)]
183188
pub struct Inner {
189+
metrics: Arc<Metrics>,
184190
close: atomic::AtomicBool,
185191
closed: atomic::AtomicBool,
186192
exchange: Mutex<Exchange>,
@@ -220,6 +226,7 @@ impl Pool {
220226
inner: Arc::new(Inner {
221227
close: false.into(),
222228
closed: false.into(),
229+
metrics: Arc::new(Metrics::default()),
223230
exchange: Mutex::new(Exchange {
224231
available: VecDeque::with_capacity(pool_opts.constraints().max()),
225232
waiting: Waitlist::default(),
@@ -231,6 +238,11 @@ impl Pool {
231238
}
232239
}
233240

241+
/// Returns metrics for the connection pool.
242+
pub fn metrics(&self) -> Arc<Metrics> {
243+
self.inner.metrics.clone()
244+
}
245+
234246
/// Creates a new pool of connections.
235247
pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
236248
let opts = Opts::from_str(url.as_ref())?;
@@ -288,6 +300,10 @@ impl Pool {
288300
pub(super) fn cancel_connection(&self) {
289301
let mut exchange = self.inner.exchange.lock().unwrap();
290302
exchange.exist -= 1;
303+
self.inner
304+
.metrics
305+
.create_failed
306+
.fetch_add(1, atomic::Ordering::Relaxed);
291307
// we just enabled the creation of a new connection!
292308
if let Some(w) = exchange.waiting.pop() {
293309
w.wake();
@@ -320,15 +336,44 @@ impl Pool {
320336

321337
// If we are not, just queue
322338
if !highest {
323-
exchange.waiting.push(cx.waker().clone(), queue_id);
339+
if exchange.waiting.push(cx.waker().clone(), queue_id) {
340+
self.inner
341+
.metrics
342+
.active_wait_requests
343+
.fetch_add(1, atomic::Ordering::Relaxed);
344+
}
324345
return Poll::Pending;
325346
}
326347

327-
while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() {
348+
#[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled
349+
while let Some(IdlingConn { mut conn, since }) = exchange.available.pop_back() {
350+
self.inner
351+
.metrics
352+
.connections_in_pool
353+
.fetch_sub(1, atomic::Ordering::Relaxed);
354+
328355
if !conn.expired() {
356+
#[cfg(feature = "hdrhistogram")]
357+
self.inner
358+
.metrics
359+
.connection_idle_duration
360+
.lock()
361+
.unwrap()
362+
.saturating_record(since.elapsed().as_micros() as u64);
363+
#[cfg(feature = "hdrhistogram")]
364+
let metrics = self.metrics();
365+
conn.inner.active_since = Instant::now();
329366
return Poll::Ready(Ok(GetConnInner::Checking(
330367
async move {
331368
conn.stream_mut()?.check().await?;
369+
#[cfg(feature = "hdrhistogram")]
370+
metrics
371+
.check_duration
372+
.lock()
373+
.unwrap()
374+
.saturating_record(
375+
conn.inner.active_since.elapsed().as_micros() as u64
376+
);
332377
Ok(conn)
333378
}
334379
.boxed(),
@@ -344,19 +389,52 @@ impl Pool {
344389
// we are allowed to make a new connection, so we will!
345390
exchange.exist += 1;
346391

392+
self.inner
393+
.metrics
394+
.connection_count
395+
.fetch_add(1, atomic::Ordering::Relaxed);
396+
397+
let opts = self.opts.clone();
398+
#[cfg(feature = "hdrhistogram")]
399+
let metrics = self.metrics();
400+
347401
return Poll::Ready(Ok(GetConnInner::Connecting(
348-
Conn::new(self.opts.clone()).boxed(),
402+
async move {
403+
let conn = Conn::new(opts).await;
404+
#[cfg(feature = "hdrhistogram")]
405+
if let Ok(conn) = &conn {
406+
metrics
407+
.connect_duration
408+
.lock()
409+
.unwrap()
410+
.saturating_record(
411+
conn.inner.active_since.elapsed().as_micros() as u64
412+
);
413+
}
414+
conn
415+
}
416+
.boxed(),
349417
)));
350418
}
351419

352420
// Polled, but no conn available? Back into the queue.
353-
exchange.waiting.push(cx.waker().clone(), queue_id);
421+
if exchange.waiting.push(cx.waker().clone(), queue_id) {
422+
self.inner
423+
.metrics
424+
.active_wait_requests
425+
.fetch_add(1, atomic::Ordering::Relaxed);
426+
}
354427
Poll::Pending
355428
}
356429

357430
fn unqueue(&self, queue_id: QueueId) {
358431
let mut exchange = self.inner.exchange.lock().unwrap();
359-
exchange.waiting.remove(queue_id);
432+
if exchange.waiting.remove(queue_id) {
433+
self.inner
434+
.metrics
435+
.active_wait_requests
436+
.fetch_sub(1, atomic::Ordering::Relaxed);
437+
}
360438
}
361439
}
362440

0 commit comments

Comments
 (0)