Skip to content

Commit 9bd179c

Browse files
committed
Add metrics for pool internals
1 parent 129b8d8 commit 9bd179c

File tree

5 files changed

+296
-9
lines changed

5 files changed

+296
-9
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ tracing = { version = "0.1.37", default-features = false, features = [
3737
], optional = true }
3838
twox-hash = "2"
3939
url = "2.1"
40+
hdrhistogram = { version = "7.5", optional = true }
4041

4142
[dependencies.tokio-rustls]
4243
version = "0.26"

src/conn/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ struct ConnInner {
116116
auth_plugin: AuthPlugin<'static>,
117117
auth_switched: bool,
118118
server_key: Option<Vec<u8>>,
119+
active_since: Instant,
119120
/// Connection is already disconnected.
120121
pub(crate) disconnected: bool,
121122
/// One-time connection-level infile handler.
@@ -169,6 +170,7 @@ impl ConnInner {
169170
server_key: None,
170171
infile_handler: None,
171172
reset_upon_returning_to_a_pool: false,
173+
active_since: Instant::now(),
172174
}
173175
}
174176

src/conn/pool/metrics.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
use std::sync::atomic::AtomicUsize;
2+
3+
use serde::Serialize;
4+
5+
#[derive(Default, Debug, Serialize)]
6+
#[non_exhaustive]
7+
pub struct Metrics {
8+
/// Gauge of active connections to the database server, this includes both connections that belong
9+
/// to the pool, and connections currently owned by the application.
10+
pub connection_count: AtomicUsize,
11+
/// Gauge of active connections that currently belong to the pool.
12+
pub connections_in_pool: AtomicUsize,
13+
/// Gauge of GetConn requests that are currently active.
14+
pub active_wait_requests: AtomicUsize,
15+
/// Counter of connections that failed to be created.
16+
pub create_failed: AtomicUsize,
17+
/// Counter of connections discarded due to pool constraints.
18+
pub discarded_superfluous_connection: AtomicUsize,
19+
/// Counter of connections discarded due to being closed upon return to the pool.
20+
pub discarded_unestablished_connection: AtomicUsize,
21+
/// Counter of connections that have been returned to the pool dirty that needed to be cleaned
22+
/// (ie. open transactions, pending queries, etc).
23+
pub dirty_connection_return: AtomicUsize,
24+
/// Counter of connections that have been discarded as they were expired by the pool constraints.
25+
pub discarded_expired_connection: AtomicUsize,
26+
/// Counter of connections that have been reset.
27+
pub resetting_connection: AtomicUsize,
28+
/// Counter of connections that have been discarded as they returned an error during cleanup.
29+
pub discarded_error_during_cleanup: AtomicUsize,
30+
/// Counter of connections that have been returned to the pool.
31+
pub connection_returned_to_pool: AtomicUsize,
32+
/// Histogram of times connections have spent outside of the pool.
33+
#[cfg(feature = "hdrhistogram")]
34+
pub connection_active_duration: MetricsHistogram,
35+
/// Histogram of times connections have spent inside of the pool.
36+
#[cfg(feature = "hdrhistogram")]
37+
pub connection_idle_duration: MetricsHistogram,
38+
/// Histogram of times connections have spent being checked for health.
39+
#[cfg(feature = "hdrhistogram")]
40+
pub check_duration: MetricsHistogram,
41+
/// Histogram of time spent waiting to connect to the server.
42+
#[cfg(feature = "hdrhistogram")]
43+
pub connect_duration: MetricsHistogram,
44+
}
45+
46+
// The histogram fields of `Metrics` are only declared when the `hdrhistogram`
// feature is enabled, so everything that touches them must carry the same gate;
// otherwise the crate does not compile with the feature turned off.
#[cfg(feature = "hdrhistogram")]
impl Metrics {
    /// Resets all histograms to allow for histograms to be bound to a period of
    /// time (i.e. between metric scrapes).
    ///
    /// Only available with the `hdrhistogram` feature. The plain atomic
    /// counters and gauges are left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding any of the histograms is poisoned.
    pub fn clear_histograms(&self) {
        self.connection_active_duration.reset();
        self.connection_idle_duration.reset();
        self.check_duration.reset();
        self.connect_duration.reset();
    }
}
55+
56+
/// A histogram of `u64` samples guarded by a [`std::sync::Mutex`].
///
/// The mutex lets the histogram be recorded into and reset through a shared
/// reference. Only available with the `hdrhistogram` feature.
#[cfg(feature = "hdrhistogram")]
57+
#[derive(Debug)]
58+
pub struct MetricsHistogram(std::sync::Mutex<hdrhistogram::Histogram<u64>>);
59+
60+
// `MetricsHistogram` only exists with the `hdrhistogram` feature, so its
// inherent impl needs the same gate to keep feature-less builds compiling.
#[cfg(feature = "hdrhistogram")]
impl MetricsHistogram {
    /// Resets the wrapped histogram by calling `hdrhistogram::Histogram::reset`
    /// while holding the lock.
    ///
    /// # Panics
    ///
    /// Panics if the inner mutex is poisoned.
    pub fn reset(&self) {
        // Lock the field directly rather than going through `Deref`.
        self.0.lock().unwrap().reset();
    }
}
65+
66+
#[cfg(feature = "hdrhistogram")]
impl Default for MetricsHistogram {
    /// Creates an empty histogram covering values from `1` to `30_000_000`
    /// with two significant figures of precision.
    fn default() -> Self {
        const LOWEST_VALUE: u64 = 1;
        const HIGHEST_VALUE: u64 = 30 * 1_000_000;
        const SIGNIFICANT_FIGURES: u8 = 2;

        let histogram = hdrhistogram::Histogram::new_with_bounds(
            LOWEST_VALUE,
            HIGHEST_VALUE,
            SIGNIFICANT_FIGURES,
        )
        .unwrap();

        MetricsHistogram(std::sync::Mutex::new(histogram))
    }
}
73+
74+
#[cfg(feature = "hdrhistogram")]
impl Serialize for MetricsHistogram {
    /// Serializes a snapshot of the histogram as a ten-entry map: five summary
    /// statistics followed by five quantiles.
    ///
    /// The histogram stays locked for the duration of the call; a poisoned
    /// mutex causes a panic.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeMap;

        let histogram = self.0.lock().unwrap();

        /// A quantile of this histogram - for supporting serializers the map
        /// key (such as `90%ile`) is ignored and a dimension (such as
        /// `quantile=0.9`) is added to the metric instead.
        macro_rules! quantile {
            ($q:expr) => {
                &MetricAlias(
                    concat!("!|quantile=", $q),
                    histogram.value_at_quantile($q),
                )
            };
        }

        /// A 'qualified' metric name - for supporting serializers such as
        /// serde_prometheus, the metric name is prepended to the key, giving
        /// `response_time_count`, for example, rather than just `count`.
        macro_rules! qualified {
            ($value:expr) => {
                &MetricAlias("<|", $value)
            };
        }

        let mut map = serializer.serialize_map(Some(10))?;

        // Summary statistics.
        map.serialize_entry("samples", qualified!(histogram.len()))?;
        map.serialize_entry("min", qualified!(histogram.min()))?;
        map.serialize_entry("max", qualified!(histogram.max()))?;
        map.serialize_entry("mean", qualified!(histogram.mean()))?;
        map.serialize_entry("stdev", qualified!(histogram.stdev()))?;

        // Quantiles.
        map.serialize_entry("90%ile", quantile!(0.9))?;
        map.serialize_entry("95%ile", quantile!(0.95))?;
        map.serialize_entry("99%ile", quantile!(0.99))?;
        map.serialize_entry("99.9%ile", quantile!(0.999))?;
        map.serialize_entry("99.99%ile", quantile!(0.9999))?;

        map.end()
    }
}
117+
118+
/// This is a mocked 'newtype' (eg. `A(u64)`) that instead allows us to
/// define our own type name that doesn't have to abide by Rust's constraints
/// on type names. This allows us to do some manipulation of our metrics,
/// allowing us to add dimensionality to our metrics via key=value pairs, or
/// key manipulation on serializers that support it.
///
/// Field `0` is the alias handed to the serializer as the newtype name and
/// field `1` is the wrapped value. The struct itself carries no trait bound;
/// the `Serialize` requirement belongs on the impl that needs it.
#[cfg(feature = "hdrhistogram")]
struct MetricAlias<T>(&'static str, T);
125+
126+
#[cfg(feature = "hdrhistogram")]
impl<T: Serialize> Serialize for MetricAlias<T> {
    /// Serializes the wrapped value as a newtype struct whose type name is the
    /// alias stored in the first field.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let MetricAlias(alias, value) = self;
        serializer.serialize_newtype_struct(*alias, value)
    }
}
135+
136+
#[cfg(feature = "hdrhistogram")]
impl std::ops::Deref for MetricsHistogram {
    type Target = std::sync::Mutex<hdrhistogram::Histogram<u64>>;

    /// Gives access to the wrapped mutex so the histogram can be locked
    /// directly on a `MetricsHistogram` value.
    fn deref(&self) -> &Self::Target {
        let MetricsHistogram(inner) = self;
        inner
    }
}

src/conn/pool/mod.rs

Lines changed: 90 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@ use crate::{
2828
queryable::transaction::{Transaction, TxOpts},
2929
};
3030

31+
pub use metrics::Metrics;
32+
3133
mod recycler;
3234
// this is a really unfortunate name for a module
3335
pub mod futures;
36+
mod metrics;
3437
mod ttl_check_inerval;
3538

3639
/// Connection that is idling in the pool.
@@ -107,7 +110,7 @@ struct Waitlist {
107110
}
108111

109112
impl Waitlist {
110-
fn push(&mut self, waker: Waker, queue_id: QueueId) {
113+
fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
111114
// The documentation of Future::poll says:
112115
// Note that on multiple calls to poll, only the Waker from
113116
// the Context passed to the most recent call should be
@@ -120,7 +123,9 @@ impl Waitlist {
120123
// This means we have to remove first to have the most recent
121124
// waker in the queue.
122125
self.remove(queue_id);
123-
self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
126+
self.queue
127+
.push(QueuedWaker { queue_id, waker }, queue_id)
128+
.is_none()
124129
}
125130

126131
fn pop(&mut self) -> Option<Waker> {
@@ -130,8 +135,8 @@ impl Waitlist {
130135
}
131136
}
132137

133-
fn remove(&mut self, id: QueueId) {
134-
self.queue.remove(&id);
138+
fn remove(&mut self, id: QueueId) -> bool {
139+
self.queue.remove(&id).is_some()
135140
}
136141

137142
fn peek_id(&mut self) -> Option<QueueId> {
@@ -181,6 +186,7 @@ impl Hash for QueuedWaker {
181186
/// Connection pool data.
182187
#[derive(Debug)]
183188
pub struct Inner {
189+
metrics: Arc<Metrics>,
184190
close: atomic::AtomicBool,
185191
closed: atomic::AtomicBool,
186192
exchange: Mutex<Exchange>,
@@ -220,6 +226,10 @@ impl Pool {
220226
inner: Arc::new(Inner {
221227
close: false.into(),
222228
closed: false.into(),
229+
metrics: Arc::new(Metrics {
230+
connection_count: atomic::AtomicUsize::new(pool_opts.constraints().max()),
231+
..Metrics::default()
232+
}),
223233
exchange: Mutex::new(Exchange {
224234
available: VecDeque::with_capacity(pool_opts.constraints().max()),
225235
waiting: Waitlist::default(),
@@ -231,6 +241,11 @@ impl Pool {
231241
}
232242
}
233243

244+
/// Returns metrics for the connection pool.
245+
pub fn metrics(&self) -> Arc<Metrics> {
246+
self.inner.metrics.clone()
247+
}
248+
234249
/// Creates a new pool of connections.
235250
pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
236251
let opts = Opts::from_str(url.as_ref())?;
@@ -288,6 +303,10 @@ impl Pool {
288303
pub(super) fn cancel_connection(&self) {
289304
let mut exchange = self.inner.exchange.lock().unwrap();
290305
exchange.exist -= 1;
306+
self.inner
307+
.metrics
308+
.create_failed
309+
.fetch_add(1, atomic::Ordering::Relaxed);
291310
// we just enabled the creation of a new connection!
292311
if let Some(w) = exchange.waiting.pop() {
293312
w.wake();
@@ -320,15 +339,44 @@ impl Pool {
320339

321340
// If we are not, just queue
322341
if !highest {
323-
exchange.waiting.push(cx.waker().clone(), queue_id);
342+
if exchange.waiting.push(cx.waker().clone(), queue_id) {
343+
self.inner
344+
.metrics
345+
.active_wait_requests
346+
.fetch_add(1, atomic::Ordering::Relaxed);
347+
}
324348
return Poll::Pending;
325349
}
326350

327-
while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() {
351+
#[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled
352+
while let Some(IdlingConn { mut conn, since }) = exchange.available.pop_back() {
353+
self.inner
354+
.metrics
355+
.connections_in_pool
356+
.fetch_sub(1, atomic::Ordering::Relaxed);
357+
328358
if !conn.expired() {
359+
#[cfg(feature = "hdrhistogram")]
360+
self.inner
361+
.metrics
362+
.connection_idle_duration
363+
.lock()
364+
.unwrap()
365+
.saturating_record(since.elapsed().as_micros() as u64);
366+
#[cfg(feature = "hdrhistogram")]
367+
let metrics = self.metrics();
368+
conn.inner.active_since = Instant::now();
329369
return Poll::Ready(Ok(GetConnInner::Checking(
330370
async move {
331371
conn.stream_mut()?.check().await?;
372+
#[cfg(feature = "hdrhistogram")]
373+
metrics
374+
.check_duration
375+
.lock()
376+
.unwrap()
377+
.saturating_record(
378+
conn.inner.active_since.elapsed().as_micros() as u64
379+
);
332380
Ok(conn)
333381
}
334382
.boxed(),
@@ -344,19 +392,52 @@ impl Pool {
344392
// we are allowed to make a new connection, so we will!
345393
exchange.exist += 1;
346394

395+
self.inner
396+
.metrics
397+
.connection_count
398+
.fetch_add(1, atomic::Ordering::Relaxed);
399+
400+
let opts = self.opts.clone();
401+
#[cfg(feature = "hdrhistogram")]
402+
let metrics = self.metrics();
403+
347404
return Poll::Ready(Ok(GetConnInner::Connecting(
348-
Conn::new(self.opts.clone()).boxed(),
405+
async move {
406+
let conn = Conn::new(opts).await;
407+
#[cfg(feature = "hdrhistogram")]
408+
if let Ok(conn) = &conn {
409+
metrics
410+
.connect_duration
411+
.lock()
412+
.unwrap()
413+
.saturating_record(
414+
conn.inner.active_since.elapsed().as_micros() as u64
415+
);
416+
}
417+
conn
418+
}
419+
.boxed(),
349420
)));
350421
}
351422

352423
// Polled, but no conn available? Back into the queue.
353-
exchange.waiting.push(cx.waker().clone(), queue_id);
424+
if exchange.waiting.push(cx.waker().clone(), queue_id) {
425+
self.inner
426+
.metrics
427+
.active_wait_requests
428+
.fetch_add(1, atomic::Ordering::Relaxed);
429+
}
354430
Poll::Pending
355431
}
356432

357433
fn unqueue(&self, queue_id: QueueId) {
358434
let mut exchange = self.inner.exchange.lock().unwrap();
359-
exchange.waiting.remove(queue_id);
435+
if exchange.waiting.remove(queue_id) {
436+
self.inner
437+
.metrics
438+
.active_wait_requests
439+
.fetch_sub(1, atomic::Ordering::Relaxed);
440+
}
360441
}
361442
}
362443

0 commit comments

Comments
 (0)