From 1c03bf6e25a258813ce210109c849ea0af723142 Mon Sep 17 00:00:00 2001
From: Jordan Doyle
Date: Mon, 11 Nov 2024 14:27:09 +0000
Subject: [PATCH] Add metrics for pool internals

---
 Cargo.toml                |   1 +
 src/conn/mod.rs           |   2 +
 src/conn/pool/metrics.rs  | 145 ++++++++++++++++++++++++++++++++++++++
 src/conn/pool/mod.rs      |  96 ++++++++++++++++++++++---
 src/conn/pool/recycler.rs |  60 ++++++++++++++++
 5 files changed, 295 insertions(+), 9 deletions(-)
 create mode 100644 src/conn/pool/metrics.rs

diff --git a/Cargo.toml b/Cargo.toml
index b612531d..6ada4949 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,6 +37,7 @@ tracing = { version = "0.1.37", default-features = false, features = [
 ], optional = true }
 twox-hash = "2"
 url = "2.1"
+hdrhistogram = { version = "7.5", optional = true }

 [dependencies.tokio-rustls]
 version = "0.26"
diff --git a/src/conn/mod.rs b/src/conn/mod.rs
index dbf7e5df..be778752 100644
--- a/src/conn/mod.rs
+++ b/src/conn/mod.rs
@@ -116,6 +116,7 @@ struct ConnInner {
     auth_plugin: AuthPlugin<'static>,
     auth_switched: bool,
     server_key: Option<Vec<u8>>,
+    active_since: Instant,
     /// Connection is already disconnected.
     pub(crate) disconnected: bool,
     /// One-time connection-level infile handler.
@@ -169,6 +170,7 @@ impl ConnInner {
             server_key: None,
             infile_handler: None,
             reset_upon_returning_to_a_pool: false,
+            active_since: Instant::now(),
         }
     }
diff --git a/src/conn/pool/metrics.rs b/src/conn/pool/metrics.rs
new file mode 100644
index 00000000..c5c6a08a
--- /dev/null
+++ b/src/conn/pool/metrics.rs
@@ -0,0 +1,145 @@
+use std::sync::atomic::AtomicUsize;
+
+use serde::Serialize;
+
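+/// Counters, gauges, and (optionally) histograms describing the pool's
+/// internal activity.
+///
+/// A minimal sketch of reading the gauges, assuming a `Pool` that has already
+/// handed out and reclaimed some connections:
+///
+/// ```no_run
+/// # use std::sync::atomic::Ordering;
+/// # fn sketch(pool: &mysql_async::Pool) {
+/// let metrics = pool.metrics();
+/// let idle = metrics.connections_in_pool.load(Ordering::Relaxed);
+/// let total = metrics.connection_count.load(Ordering::Relaxed);
+/// println!("{idle} of {total} connections are idle in the pool");
+/// # }
+/// ```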
+#[derive(Default, Debug, Serialize)]
+#[non_exhaustive]
+pub struct Metrics {
+    /// Gauge of active connections to the database server; this includes both
+    /// connections that belong to the pool and connections currently owned by
+    /// the application.
+    pub connection_count: AtomicUsize,
+    /// Gauge of active connections that currently belong to the pool.
+    pub connections_in_pool: AtomicUsize,
+    /// Gauge of GetConn requests that are currently active.
+    pub active_wait_requests: AtomicUsize,
+    /// Counter of connections that failed to be created.
+    pub create_failed: AtomicUsize,
+    /// Counter of connections discarded due to pool constraints.
+    pub discarded_superfluous_connection: AtomicUsize,
+    /// Counter of connections discarded due to being closed upon return to the pool.
+    pub discarded_unestablished_connection: AtomicUsize,
+    /// Counter of connections that were returned to the pool dirty and needed to be
+    /// cleaned (i.e. open transactions, pending queries, etc.).
+    pub dirty_connection_return: AtomicUsize,
+    /// Counter of connections that have been discarded because they were expired by
+    /// the pool constraints.
+    pub discarded_expired_connection: AtomicUsize,
+    /// Counter of connections that have been reset.
+    pub resetting_connection: AtomicUsize,
+    /// Counter of connections that have been discarded because they returned an
+    /// error during cleanup.
+    pub discarded_error_during_cleanup: AtomicUsize,
+    /// Counter of connections that have been returned to the pool.
+    pub connection_returned_to_pool: AtomicUsize,
+    /// Histogram of times connections have spent outside of the pool.
+    #[cfg(feature = "hdrhistogram")]
+    pub connection_active_duration: MetricsHistogram,
+    /// Histogram of times connections have spent inside of the pool.
+    #[cfg(feature = "hdrhistogram")]
+    pub connection_idle_duration: MetricsHistogram,
+    /// Histogram of times connections have spent being checked for health.
+    #[cfg(feature = "hdrhistogram")]
+    pub check_duration: MetricsHistogram,
+    /// Histogram of time spent waiting to connect to the server.
+    #[cfg(feature = "hdrhistogram")]
+    pub connect_duration: MetricsHistogram,
+}
+
+impl Metrics {
+    /// Resets all histograms, allowing each histogram to be bound to a period
+    /// of time (i.e. the window between metric scrapes).
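+    ///
+    /// A sketch of scrape-time usage; it assumes the scraper takes its
+    /// readings (here a p99, in microseconds) before resetting the window:
+    ///
+    /// ```no_run
+    /// # #[cfg(feature = "hdrhistogram")]
+    /// # fn scrape(pool: &mysql_async::Pool) {
+    /// let metrics = pool.metrics();
+    /// let p99_us = metrics.connect_duration.lock().unwrap().value_at_quantile(0.99);
+    /// println!("p99 connect time: {p99_us}us");
+    /// metrics.clear_histograms(); // histograms now cover only the next window
+    /// # }
+    /// ```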
+    #[cfg(feature = "hdrhistogram")]
+    pub fn clear_histograms(&self) {
+        self.connection_active_duration.reset();
+        self.connection_idle_duration.reset();
+        self.check_duration.reset();
+        self.connect_duration.reset();
+    }
+}
+
+#[cfg(feature = "hdrhistogram")]
+#[derive(Debug)]
+pub struct MetricsHistogram(std::sync::Mutex<hdrhistogram::Histogram<u64>>);
+
+#[cfg(feature = "hdrhistogram")]
+impl MetricsHistogram {
+    pub fn reset(&self) {
+        self.lock().unwrap().reset();
+    }
+}
+
+#[cfg(feature = "hdrhistogram")]
+impl Default for MetricsHistogram {
+    fn default() -> Self {
+        // Track values from 1us up to 30s (recorded in microseconds), with
+        // two significant figures of precision.
+        let hdr = hdrhistogram::Histogram::new_with_bounds(1, 30 * 1_000_000, 2).unwrap();
+        Self(std::sync::Mutex::new(hdr))
+    }
+}
+
+#[cfg(feature = "hdrhistogram")]
+impl Serialize for MetricsHistogram {
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let hdr = self.0.lock().unwrap();
+
+        /// A percentile of this histogram - for supporting serializers this
+        /// will ignore the key (such as `90%ile`) and instead add a
+        /// dimension to the metrics (such as `quantile=0.9`).
+        macro_rules! ile {
+            ($e:expr) => {
+                &MetricAlias(concat!("!|quantile=", $e), hdr.value_at_quantile($e))
+            };
+        }
+
+        /// A 'qualified' metric name - for supporting serializers such as
+        /// serde_prometheus, this will prepend the metric name to this key,
+        /// outputting `response_time_count`, for example, rather than just
+        /// `count`.
+        macro_rules! qual {
+            ($e:expr) => {
+                &MetricAlias("<|", $e)
+            };
+        }
+
+        use serde::ser::SerializeMap;
+
+        let mut tup = serializer.serialize_map(Some(10))?;
+        tup.serialize_entry("samples", qual!(hdr.len()))?;
+        tup.serialize_entry("min", qual!(hdr.min()))?;
+        tup.serialize_entry("max", qual!(hdr.max()))?;
+        tup.serialize_entry("mean", qual!(hdr.mean()))?;
+        tup.serialize_entry("stdev", qual!(hdr.stdev()))?;
+        tup.serialize_entry("90%ile", ile!(0.9))?;
+        tup.serialize_entry("95%ile", ile!(0.95))?;
+        tup.serialize_entry("99%ile", ile!(0.99))?;
+        tup.serialize_entry("99.9%ile", ile!(0.999))?;
+        tup.serialize_entry("99.99%ile", ile!(0.9999))?;
+        tup.end()
+    }
+}
+
+/// This is a mocked 'newtype' (e.g. `A(u64)`) that allows us to define our own
+/// type name that doesn't have to abide by Rust's constraints on type names.
+/// This lets us do some manipulation of our metrics, adding dimensionality to
+/// them via key=value pairs, or performing key manipulation on serializers
+/// that support it.
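+///
+/// For illustration only - the exact rendering is up to the serializer. A
+/// metrics-aware serializer such as `serde_prometheus` could turn the
+/// `90%ile` entry of a histogram named `connect_duration` into something
+/// like:
+///
+/// ```text
+/// connect_duration{quantile="0.9"} 1234
+/// ```
+///
+/// whereas a generic serializer such as `serde_json` simply sees a newtype
+/// struct and writes the wrapped value unchanged.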
+#[cfg(feature = "hdrhistogram")]
+struct MetricAlias<T: Serialize>(&'static str, T);
+
+#[cfg(feature = "hdrhistogram")]
+impl<T: Serialize> Serialize for MetricAlias<T> {
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.serialize_newtype_struct(self.0, &self.1)
+    }
+}
+
+#[cfg(feature = "hdrhistogram")]
+impl std::ops::Deref for MetricsHistogram {
+    type Target = std::sync::Mutex<hdrhistogram::Histogram<u64>>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs
index 433395eb..b1f96d03 100644
--- a/src/conn/pool/mod.rs
+++ b/src/conn/pool/mod.rs
@@ -28,9 +28,12 @@ use crate::{
     queryable::transaction::{Transaction, TxOpts},
 };

+pub use metrics::Metrics;
+
 mod recycler;
 // this is a really unfortunate name for a module
 pub mod futures;
+mod metrics;
 mod ttl_check_inerval;

 /// Connection that is idling in the pool.
@@ -107,7 +110,7 @@ struct Waitlist {
 }

 impl Waitlist {
-    fn push(&mut self, waker: Waker, queue_id: QueueId) {
+    fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
         // The documentation of Future::poll says:
         //     Note that on multiple calls to poll, only the Waker from
         //     the Context passed to the most recent call should be
         //     scheduled to receive a wakeup.
         //
         // This means we have to remove first to have the most recent
         // waker in the queue.
         self.remove(queue_id);
-        self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
+        self.queue
+            .push(QueuedWaker { queue_id, waker }, queue_id)
+            .is_none()
     }

     fn pop(&mut self) -> Option<Waker> {
@@ -130,8 +135,8 @@
         }
     }

-    fn remove(&mut self, id: QueueId) {
-        self.queue.remove(&id);
+    fn remove(&mut self, id: QueueId) -> bool {
+        self.queue.remove(&id).is_some()
     }

     fn peek_id(&mut self) -> Option<QueueId> {
@@ -181,6 +186,7 @@ impl Hash for QueuedWaker {
 /// Connection pool data.
 #[derive(Debug)]
 pub struct Inner {
+    metrics: Arc<Metrics>,
     close: atomic::AtomicBool,
     closed: atomic::AtomicBool,
     exchange: Mutex<Exchange>,
@@ -220,6 +226,7 @@ impl Pool {
             inner: Arc::new(Inner {
                 close: false.into(),
                 closed: false.into(),
+                metrics: Arc::new(Metrics::default()),
                 exchange: Mutex::new(Exchange {
                     available: VecDeque::with_capacity(pool_opts.constraints().max()),
                     waiting: Waitlist::default(),
@@ -231,6 +238,11 @@ impl Pool {
         }
     }

+    /// Returns metrics for the connection pool.
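+    ///
+    /// A minimal sketch of exposing the metrics, assuming the `hdrhistogram`
+    /// feature is enabled and that `serde_json` is available in the consuming
+    /// crate (it is not a dependency of this one); the URL is illustrative:
+    ///
+    /// ```no_run
+    /// # async fn sketch() -> mysql_async::Result<()> {
+    /// let pool = mysql_async::Pool::new("mysql://root:password@127.0.0.1:3307/mysql");
+    /// let conn = pool.get_conn().await?;
+    /// drop(conn); // handed back to the pool through the recycler
+    /// println!("{}", serde_json::to_string(&*pool.metrics()).unwrap());
+    /// # Ok(())
+    /// # }
+    /// ```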
+    pub fn metrics(&self) -> Arc<Metrics> {
+        self.inner.metrics.clone()
+    }
+
     /// Creates a new pool of connections.
     pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
         let opts = Opts::from_str(url.as_ref())?;
@@ -288,6 +300,10 @@ impl Pool {
     pub(super) fn cancel_connection(&self) {
         let mut exchange = self.inner.exchange.lock().unwrap();
         exchange.exist -= 1;
+        self.inner
+            .metrics
+            .create_failed
+            .fetch_add(1, atomic::Ordering::Relaxed);
         // we just enabled the creation of a new connection!
         if let Some(w) = exchange.waiting.pop() {
             w.wake();
@@ -320,15 +336,44 @@ impl Pool {

         // If we are not, just queue
         if !highest {
-            exchange.waiting.push(cx.waker().clone(), queue_id);
+            if exchange.waiting.push(cx.waker().clone(), queue_id) {
+                self.inner
+                    .metrics
+                    .active_wait_requests
+                    .fetch_add(1, atomic::Ordering::Relaxed);
+            }
             return Poll::Pending;
         }

-        while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() {
+        #[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled
+        while let Some(IdlingConn { mut conn, since }) = exchange.available.pop_back() {
+            self.inner
+                .metrics
+                .connections_in_pool
+                .fetch_sub(1, atomic::Ordering::Relaxed);
+
             if !conn.expired() {
+                #[cfg(feature = "hdrhistogram")]
+                self.inner
+                    .metrics
+                    .connection_idle_duration
+                    .lock()
+                    .unwrap()
+                    .saturating_record(since.elapsed().as_micros() as u64);
+
+                #[cfg(feature = "hdrhistogram")]
+                let metrics = self.metrics();
+                conn.inner.active_since = Instant::now();
                 return Poll::Ready(Ok(GetConnInner::Checking(
                     async move {
                         conn.stream_mut()?.check().await?;
+                        #[cfg(feature = "hdrhistogram")]
+                        metrics
+                            .check_duration
+                            .lock()
+                            .unwrap()
+                            .saturating_record(
+                                conn.inner.active_since.elapsed().as_micros() as u64
+                            );
                         Ok(conn)
                     }
                     .boxed(),
@@ -344,19 +389,52 @@ impl Pool {

         // we are allowed to make a new connection, so we will!
         exchange.exist += 1;
+        self.inner
+            .metrics
+            .connection_count
+            .fetch_add(1, atomic::Ordering::Relaxed);
+
+        let opts = self.opts.clone();
+        #[cfg(feature = "hdrhistogram")]
+        let metrics = self.metrics();
+
         return Poll::Ready(Ok(GetConnInner::Connecting(
-            Conn::new(self.opts.clone()).boxed(),
+            async move {
+                let conn = Conn::new(opts).await;
+                #[cfg(feature = "hdrhistogram")]
+                if let Ok(conn) = &conn {
+                    metrics
+                        .connect_duration
+                        .lock()
+                        .unwrap()
+                        .saturating_record(
+                            conn.inner.active_since.elapsed().as_micros() as u64
+                        );
+                }
+                conn
+            }
+            .boxed(),
         )));
     }

         // Polled, but no conn available? Back into the queue.
-        exchange.waiting.push(cx.waker().clone(), queue_id);
+        if exchange.waiting.push(cx.waker().clone(), queue_id) {
+            self.inner
+                .metrics
+                .active_wait_requests
+                .fetch_add(1, atomic::Ordering::Relaxed);
+        }

         Poll::Pending
     }

     fn unqueue(&self, queue_id: QueueId) {
         let mut exchange = self.inner.exchange.lock().unwrap();
-        exchange.waiting.remove(queue_id);
+        if exchange.waiting.remove(queue_id) {
+            self.inner
+                .metrics
+                .active_wait_requests
+                .fetch_sub(1, atomic::Ordering::Relaxed);
+        }
     }
 }
diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs
index 1ea855c0..7a257443 100644
--- a/src/conn/pool/recycler.rs
+++ b/src/conn/pool/recycler.rs
@@ -67,8 +67,31 @@ impl Future for Recycler {
                 let mut exchange = $self.inner.exchange.lock().unwrap();
                 if $pool_is_closed || exchange.available.len() >= $self.pool_opts.active_bound() {
                     drop(exchange);
+                    $self
+                        .inner
+                        .metrics
+                        .discarded_superfluous_connection
+                        .fetch_add(1, Ordering::Relaxed);
                     $self.discard.push($conn.close_conn().boxed());
                 } else {
+                    $self
+                        .inner
+                        .metrics
+                        .connection_returned_to_pool
+                        .fetch_add(1, Ordering::Relaxed);
+                    $self
+                        .inner
+                        .metrics
+                        .connections_in_pool
+                        .fetch_add(1, Ordering::Relaxed);
+                    #[cfg(feature = "hdrhistogram")]
+                    $self
+                        .inner
+                        .metrics
+                        .connection_active_duration
+                        .lock()
+                        .unwrap()
+                        .saturating_record($conn.inner.active_since.elapsed().as_micros() as u64);
                     exchange.available.push_back($conn.into());
                     if let Some(w) = exchange.waiting.pop() {
                         w.wake();
@@ -81,12 +104,32 @@ impl Future for Recycler {
         ($self:ident, $conn:ident) => {
             if $conn.inner.stream.is_none() || $conn.inner.disconnected {
                 // drop unestablished connection
+                $self
+                    .inner
+                    .metrics
+                    .discarded_unestablished_connection
+                    .fetch_add(1, Ordering::Relaxed);
                 $self.discard.push(futures_util::future::ok(()).boxed());
             } else if $conn.inner.tx_status != TxStatus::None || $conn.has_pending_result() {
+                $self
+                    .inner
+                    .metrics
+                    .dirty_connection_return
+                    .fetch_add(1, Ordering::Relaxed);
                 $self.cleaning.push($conn.cleanup_for_pool().boxed());
             } else if $conn.expired() || close {
+                $self
+                    .inner
+                    .metrics
+                    .discarded_expired_connection
+                    .fetch_add(1, Ordering::Relaxed);
                 $self.discard.push($conn.close_conn().boxed());
             } else if $conn.inner.reset_upon_returning_to_a_pool {
+                $self
+                    .inner
+                    .metrics
+                    .resetting_connection
+                    .fetch_add(1, Ordering::Relaxed);
                 $self.reset.push($conn.reset_for_pool().boxed());
             } else {
                 conn_return!($self, $conn, false);
@@ -142,6 +185,10 @@ impl Future for Recycler {
                     // anything that comes through .dropped we know has .pool.is_none().
                     // therefore, dropping the conn won't decrement .exist, so we need to do that.
                     self.discarded += 1;
+                    self.inner
+                        .metrics
+                        .discarded_error_during_cleanup
+                        .fetch_add(1, Ordering::Relaxed);
                     // NOTE: we're discarding the error here
                     let _ = e;
                 }
@@ -157,6 +204,10 @@ impl Future for Recycler {
                     // an error during reset.
                     // replace with a new connection
                     self.discarded += 1;
+                    self.inner
+                        .metrics
+                        .discarded_error_during_cleanup
+                        .fetch_add(1, Ordering::Relaxed);
                     // NOTE: we're discarding the error here
                     let _ = e;
                 }
@@ -177,6 +228,10 @@ impl Future for Recycler {
                     // an error occurred while closing a connection.
                     // what do we do? we still replace it with a new connection..
                     self.discarded += 1;
+                    self.inner
+                        .metrics
+                        .discarded_error_during_cleanup
+                        .fetch_add(1, Ordering::Relaxed);
                     // NOTE: we're discarding the error here
                     let _ = e;
                 }
             }

         if self.discarded != 0 {
+            self.inner
+                .metrics
+                .connection_count
+                .fetch_sub(self.discarded, Ordering::Relaxed);
+
             // we need to open up slots for new connections to be established!
             let mut exchange = self.inner.exchange.lock().unwrap();
             exchange.exist -= self.discarded;