From 79852269aeee694b6fc8794282be25ef753cfd41 Mon Sep 17 00:00:00 2001 From: Geoffry Song Date: Wed, 9 Apr 2025 19:18:00 -0700 Subject: [PATCH] Fix connection count metrics --- src/conn/pool/mod.rs | 16 ++++++++++------ src/conn/pool/recycler.rs | 19 +++++++++---------- src/conn/pool/ttl_check_inerval.rs | 8 ++++++++ 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index b1f96d03..9106764a 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -304,6 +304,10 @@ impl Pool { .metrics .create_failed .fetch_add(1, atomic::Ordering::Relaxed); + self.inner + .metrics + .connection_count + .store(exchange.exist, atomic::Ordering::Relaxed); // we just enabled the creation of a new connection! if let Some(w) = exchange.waiting.pop() { w.wake(); @@ -347,11 +351,6 @@ impl Pool { #[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled while let Some(IdlingConn { mut conn, since }) = exchange.available.pop_back() { - self.inner - .metrics - .connections_in_pool - .fetch_sub(1, atomic::Ordering::Relaxed); - if !conn.expired() { #[cfg(feature = "hdrhistogram")] self.inner @@ -383,6 +382,11 @@ impl Pool { } } + self.inner + .metrics + .connections_in_pool + .store(exchange.available.len(), atomic::Ordering::Relaxed); + // we didn't _immediately_ get one -- try to make one // we first try to just do a load so we don't do an unnecessary add then sub if exchange.exist < self.opts.pool_opts().constraints().max() { @@ -392,7 +396,7 @@ impl Pool { self.inner .metrics .connection_count - .fetch_add(1, atomic::Ordering::Relaxed); + .store(exchange.exist, atomic::Ordering::Relaxed); let opts = self.opts.clone(); #[cfg(feature = "hdrhistogram")] diff --git a/src/conn/pool/recycler.rs b/src/conn/pool/recycler.rs index 7a257443..2809dc0b 100644 --- a/src/conn/pool/recycler.rs +++ b/src/conn/pool/recycler.rs @@ -79,11 +79,6 @@ impl Future for Recycler { .metrics .connection_returned_to_pool .fetch_add(1, Ordering::Relaxed); - $self - .inner - .metrics - .connections_in_pool - .fetch_add(1, Ordering::Relaxed); #[cfg(feature = "hdrhistogram")] $self .inner @@ -93,6 +88,11 @@ impl Future for Recycler { .unwrap() .saturating_record($conn.inner.active_since.elapsed().as_micros() as u64); exchange.available.push_back($conn.into()); + $self + .inner + .metrics + .connections_in_pool + .store(exchange.available.len(), Ordering::Relaxed); if let Some(w) = exchange.waiting.pop() { w.wake(); } @@ -239,14 +239,13 @@ impl Future for Recycler { } if self.discarded != 0 { - self.inner - .metrics - .connection_count - .fetch_sub(self.discarded, Ordering::Relaxed); - // we need to open up slots for new connctions to be established! let mut exchange = self.inner.exchange.lock().unwrap(); exchange.exist -= self.discarded; + self.inner + .metrics + .connection_count + .store(exchange.exist, Ordering::Relaxed); for _ in 0..self.discarded { if let Some(w) = exchange.waiting.pop() { w.wake(); diff --git a/src/conn/pool/ttl_check_inerval.rs b/src/conn/pool/ttl_check_inerval.rs index a0caa1a9..95b624aa 100644 --- a/src/conn/pool/ttl_check_inerval.rs +++ b/src/conn/pool/ttl_check_inerval.rs @@ -70,6 +70,10 @@ impl TtlCheckInterval { } } exchange.available = kept_available; + self.inner + .metrics + .connections_in_pool + .store(exchange.available.len(), Ordering::Relaxed); to_be_dropped }; @@ -79,6 +83,10 @@ impl TtlCheckInterval { tokio::spawn(idling_conn.conn.disconnect().then(move |_| { let mut exchange = inner.exchange.lock().unwrap(); exchange.exist -= 1; + inner + .metrics + .connection_count + .store(exchange.exist, Ordering::Relaxed); ok::<_, ()>(()) })); }