Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tracing = { version = "0.1.37", default-features = false, features = [
], optional = true }
twox-hash = "2"
url = "2.1"
hdrhistogram = { version = "7.5", optional = true }

[dependencies.tokio-rustls]
version = "0.26"
Expand Down
2 changes: 2 additions & 0 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ struct ConnInner {
auth_plugin: AuthPlugin<'static>,
auth_switched: bool,
server_key: Option<Vec<u8>>,
active_since: Instant,
/// Connection is already disconnected.
pub(crate) disconnected: bool,
/// One-time connection-level infile handler.
Expand Down Expand Up @@ -169,6 +170,7 @@ impl ConnInner {
server_key: None,
infile_handler: None,
reset_upon_returning_to_a_pool: false,
active_since: Instant::now(),
}
}

Expand Down
145 changes: 145 additions & 0 deletions src/conn/pool/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use std::sync::atomic::AtomicUsize;

use serde::Serialize;

#[derive(Default, Debug, Serialize)]
#[non_exhaustive]
pub struct Metrics {
    /// Gauge of active connections to the database server; this includes both connections that
    /// belong to the pool and connections currently owned by the application.
    pub connection_count: AtomicUsize,
    /// Gauge of active connections that currently belong to the pool.
    pub connections_in_pool: AtomicUsize,
    /// Gauge of GetConn requests that are currently active.
    pub active_wait_requests: AtomicUsize,
    /// Counter of connections that failed to be created.
    pub create_failed: AtomicUsize,
    /// Counter of connections discarded due to pool constraints.
    pub discarded_superfluous_connection: AtomicUsize,
    /// Counter of connections discarded due to being closed upon return to the pool.
    pub discarded_unestablished_connection: AtomicUsize,
    /// Counter of connections that have been returned to the pool dirty and needed to be cleaned
    /// (i.e. open transactions, pending queries, etc.).
    pub dirty_connection_return: AtomicUsize,
    /// Counter of connections that have been discarded because they were expired by the pool
    /// constraints.
    pub discarded_expired_connection: AtomicUsize,
    /// Counter of connections that have been reset.
    pub resetting_connection: AtomicUsize,
    /// Counter of connections that have been discarded because they returned an error during
    /// cleanup.
    pub discarded_error_during_cleanup: AtomicUsize,
    /// Counter of connections that have been returned to the pool.
    pub connection_returned_to_pool: AtomicUsize,
    /// Histogram of times (in microseconds) connections have spent outside of the pool.
    #[cfg(feature = "hdrhistogram")]
    pub connection_active_duration: MetricsHistogram,
    /// Histogram of times (in microseconds) connections have spent inside of the pool.
    #[cfg(feature = "hdrhistogram")]
    pub connection_idle_duration: MetricsHistogram,
    /// Histogram of times (in microseconds) connections have spent being checked for health.
    #[cfg(feature = "hdrhistogram")]
    pub check_duration: MetricsHistogram,
    /// Histogram of time (in microseconds) spent waiting to connect to the server.
    #[cfg(feature = "hdrhistogram")]
    pub connect_duration: MetricsHistogram,
}

impl Metrics {
    /// Resets every histogram, so that each histogram covers a bounded window of
    /// time (e.g. the interval between two metric scrapes).
    ///
    /// The atomic counters and gauges are left untouched.
    #[cfg(feature = "hdrhistogram")]
    pub fn clear_histograms(&self) {
        let histograms = [
            &self.connection_active_duration,
            &self.connection_idle_duration,
            &self.check_duration,
            &self.connect_duration,
        ];
        for histogram in histograms.iter() {
            histogram.reset();
        }
    }
}

/// An [`hdrhistogram::Histogram`] of `u64` samples guarded by a `Mutex`, so it
/// can be updated through a shared reference.
///
/// `Deref`s to the inner mutex; callers lock it to record values or to read a
/// snapshot.
#[cfg(feature = "hdrhistogram")]
#[derive(Debug)]
pub struct MetricsHistogram(std::sync::Mutex<hdrhistogram::Histogram<u64>>);

#[cfg(feature = "hdrhistogram")]
impl MetricsHistogram {
    /// Clears all recorded samples from this histogram.
    pub fn reset(&self) {
        // Lock the inner histogram directly; panics if the mutex is poisoned.
        let mut histogram = self.0.lock().unwrap();
        histogram.reset();
    }
}

#[cfg(feature = "hdrhistogram")]
impl Default for MetricsHistogram {
    /// Creates a histogram able to track values from 1 up to 30,000,000
    /// (i.e. 1µs to 30s when recording microseconds) with two significant
    /// figures of precision.
    fn default() -> Self {
        // 30 seconds, expressed in microseconds.
        const MAX_VALUE: u64 = 30 * 1_000_000;
        let histogram = hdrhistogram::Histogram::new_with_bounds(1, MAX_VALUE, 2).unwrap();
        MetricsHistogram(std::sync::Mutex::new(histogram))
    }
}

#[cfg(feature = "hdrhistogram")]
impl Serialize for MetricsHistogram {
    // Serializes a snapshot of the histogram as a map of summary statistics
    // and percentiles. The `!|` / `<|` prefixes are consumed by key-rewriting
    // serializers (see `MetricAlias`); other serializers emit them verbatim.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // The lock is held for the whole serialization so all emitted values
        // come from one consistent snapshot.
        let hdr = self.0.lock().unwrap();

        /// A percentile of this histogram - for supporting serializers this
        /// will ignore the key (such as `90%ile`) and instead add a
        /// dimension to the metrics (such as `quantile=0.9`).
        macro_rules! ile {
            ($e:expr) => {
                &MetricAlias(concat!("!|quantile=", $e), hdr.value_at_quantile($e))
            };
        }

        /// A 'qualified' metric name - for supporting serializers such as
        /// serde_prometheus, this will prepend the metric name to this key,
        /// outputting `response_time_count`, for example rather than just
        /// `count`.
        macro_rules! qual {
            ($e:expr) => {
                &MetricAlias("<|", $e)
            };
        }

        use serde::ser::SerializeMap;

        // Exactly 10 entries follow: 5 summary statistics + 5 percentiles.
        let mut tup = serializer.serialize_map(Some(10))?;
        tup.serialize_entry("samples", qual!(hdr.len()))?;
        tup.serialize_entry("min", qual!(hdr.min()))?;
        tup.serialize_entry("max", qual!(hdr.max()))?;
        tup.serialize_entry("mean", qual!(hdr.mean()))?;
        tup.serialize_entry("stdev", qual!(hdr.stdev()))?;
        tup.serialize_entry("90%ile", ile!(0.9))?;
        tup.serialize_entry("95%ile", ile!(0.95))?;
        tup.serialize_entry("99%ile", ile!(0.99))?;
        tup.serialize_entry("99.9%ile", ile!(0.999))?;
        tup.serialize_entry("99.99%ile", ile!(0.9999))?;
        tup.end()
    }
}

/// This is a mocked 'newtype' (eg. `A(u64)`) that instead allows us to
/// define our own type name that doesn't have to abide by Rust's constraints
/// on type names. This allows us to do some manipulation of our metrics,
/// allowing us to add dimensionality to our metrics via key=value pairs, or
/// key manipulation on serializers that support it.
///
/// Field 0 is the alias emitted as the newtype-struct "name"; field 1 is the
/// wrapped value to serialize.
#[cfg(feature = "hdrhistogram")]
struct MetricAlias<T: Serialize>(&'static str, T);

#[cfg(feature = "hdrhistogram")]
impl<T: Serialize> Serialize for MetricAlias<T> {
    // Emits the wrapped value as a newtype struct whose "name" is the alias
    // string, letting name-aware serializers rewrite or augment the key.
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let MetricAlias(alias, value) = self;
        serializer.serialize_newtype_struct(*alias, value)
    }
}

#[cfg(feature = "hdrhistogram")]
impl std::ops::Deref for MetricsHistogram {
    type Target = std::sync::Mutex<hdrhistogram::Histogram<u64>>;

    // Exposes the inner mutex so call sites can lock and record directly,
    // e.g. `metrics.connect_duration.lock().unwrap().saturating_record(...)`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
96 changes: 87 additions & 9 deletions src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ use crate::{
queryable::transaction::{Transaction, TxOpts},
};

pub use metrics::Metrics;

mod recycler;
// this is a really unfortunate name for a module
pub mod futures;
mod metrics;
mod ttl_check_inerval;

/// Connection that is idling in the pool.
Expand Down Expand Up @@ -107,7 +110,7 @@ struct Waitlist {
}

impl Waitlist {
fn push(&mut self, waker: Waker, queue_id: QueueId) {
fn push(&mut self, waker: Waker, queue_id: QueueId) -> bool {
// The documentation of Future::poll says:
// Note that on multiple calls to poll, only the Waker from
// the Context passed to the most recent call should be
Expand All @@ -120,7 +123,9 @@ impl Waitlist {
// This means we have to remove first to have the most recent
// waker in the queue.
self.remove(queue_id);
self.queue.push(QueuedWaker { queue_id, waker }, queue_id);
self.queue
.push(QueuedWaker { queue_id, waker }, queue_id)
.is_none()
}

fn pop(&mut self) -> Option<Waker> {
Expand All @@ -130,8 +135,8 @@ impl Waitlist {
}
}

fn remove(&mut self, id: QueueId) {
self.queue.remove(&id);
fn remove(&mut self, id: QueueId) -> bool {
self.queue.remove(&id).is_some()
}

fn peek_id(&mut self) -> Option<QueueId> {
Expand Down Expand Up @@ -181,6 +186,7 @@ impl Hash for QueuedWaker {
/// Connection pool data.
#[derive(Debug)]
pub struct Inner {
metrics: Arc<Metrics>,
close: atomic::AtomicBool,
closed: atomic::AtomicBool,
exchange: Mutex<Exchange>,
Expand Down Expand Up @@ -220,6 +226,7 @@ impl Pool {
inner: Arc::new(Inner {
close: false.into(),
closed: false.into(),
metrics: Arc::new(Metrics::default()),
exchange: Mutex::new(Exchange {
available: VecDeque::with_capacity(pool_opts.constraints().max()),
waiting: Waitlist::default(),
Expand All @@ -231,6 +238,11 @@ impl Pool {
}
}

/// Returns metrics for the connection pool.
pub fn metrics(&self) -> Arc<Metrics> {
self.inner.metrics.clone()
}

/// Creates a new pool of connections.
pub fn from_url<T: AsRef<str>>(url: T) -> Result<Pool> {
let opts = Opts::from_str(url.as_ref())?;
Expand Down Expand Up @@ -288,6 +300,10 @@ impl Pool {
pub(super) fn cancel_connection(&self) {
let mut exchange = self.inner.exchange.lock().unwrap();
exchange.exist -= 1;
self.inner
.metrics
.create_failed
.fetch_add(1, atomic::Ordering::Relaxed);
// we just enabled the creation of a new connection!
if let Some(w) = exchange.waiting.pop() {
w.wake();
Expand Down Expand Up @@ -320,15 +336,44 @@ impl Pool {

// If we are not, just queue
if !highest {
exchange.waiting.push(cx.waker().clone(), queue_id);
if exchange.waiting.push(cx.waker().clone(), queue_id) {
self.inner
.metrics
.active_wait_requests
.fetch_add(1, atomic::Ordering::Relaxed);
}
return Poll::Pending;
}

while let Some(IdlingConn { mut conn, .. }) = exchange.available.pop_back() {
#[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled
while let Some(IdlingConn { mut conn, since }) = exchange.available.pop_back() {
self.inner
.metrics
.connections_in_pool
.fetch_sub(1, atomic::Ordering::Relaxed);

if !conn.expired() {
#[cfg(feature = "hdrhistogram")]
self.inner
.metrics
.connection_idle_duration
.lock()
.unwrap()
.saturating_record(since.elapsed().as_micros() as u64);
#[cfg(feature = "hdrhistogram")]
let metrics = self.metrics();
conn.inner.active_since = Instant::now();
return Poll::Ready(Ok(GetConnInner::Checking(
async move {
conn.stream_mut()?.check().await?;
#[cfg(feature = "hdrhistogram")]
metrics
.check_duration
.lock()
.unwrap()
.saturating_record(
conn.inner.active_since.elapsed().as_micros() as u64
);
Ok(conn)
}
.boxed(),
Expand All @@ -344,19 +389,52 @@ impl Pool {
// we are allowed to make a new connection, so we will!
exchange.exist += 1;

self.inner
.metrics
.connection_count
.fetch_add(1, atomic::Ordering::Relaxed);

let opts = self.opts.clone();
#[cfg(feature = "hdrhistogram")]
let metrics = self.metrics();

return Poll::Ready(Ok(GetConnInner::Connecting(
Conn::new(self.opts.clone()).boxed(),
async move {
let conn = Conn::new(opts).await;
#[cfg(feature = "hdrhistogram")]
if let Ok(conn) = &conn {
metrics
.connect_duration
.lock()
.unwrap()
.saturating_record(
conn.inner.active_since.elapsed().as_micros() as u64
);
}
conn
}
.boxed(),
)));
}

// Polled, but no conn available? Back into the queue.
exchange.waiting.push(cx.waker().clone(), queue_id);
if exchange.waiting.push(cx.waker().clone(), queue_id) {
self.inner
.metrics
.active_wait_requests
.fetch_add(1, atomic::Ordering::Relaxed);
}
Poll::Pending
}

fn unqueue(&self, queue_id: QueueId) {
let mut exchange = self.inner.exchange.lock().unwrap();
exchange.waiting.remove(queue_id);
if exchange.waiting.remove(queue_id) {
self.inner
.metrics
.active_wait_requests
.fetch_sub(1, atomic::Ordering::Relaxed);
}
}
}

Expand Down
Loading
Loading