Skip to content

Commit 2870af1

Browse files
authored
feat: gracefully shutdown drainer if redis goes down (#2391)
1 parent 14cf0e7 commit 2870af1

File tree

1 file changed

+25
-2
lines changed

1 file changed

+25
-2
lines changed

crates/drainer/src/lib.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::sync::{atomic, Arc};
1010
use common_utils::signals::get_allowed_signals;
1111
use diesel_models::kv;
1212
use error_stack::{IntoReport, ResultExt};
13-
use tokio::sync::mpsc;
13+
use tokio::sync::{mpsc, oneshot};
1414

1515
use crate::{connection::pg_connection, services::Store};
1616

@@ -36,9 +36,18 @@ pub async fn start_drainer(
3636
"Failed while getting allowed signals".to_string(),
3737
))?;
3838

39+
let (redis_error_tx, redis_error_rx) = oneshot::channel();
3940
let (tx, mut rx) = mpsc::channel(1);
4041
let handle = signal.handle();
41-
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx));
42+
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx.clone()));
43+
44+
let redis_conn_clone = store.redis_conn.clone();
45+
46+
// Spawn a task to monitor if redis is down or not
47+
tokio::spawn(async move { redis_conn_clone.on_error(redis_error_tx).await });
48+
49+
//Spawns a task to send shutdown signal if redis goes down
50+
tokio::spawn(redis_error_receiver(redis_error_rx, tx));
4251

4352
let active_tasks = Arc::new(atomic::AtomicU64::new(0));
4453
'event: loop {
@@ -90,6 +99,20 @@ pub async fn start_drainer(
9099
Ok(())
91100
}
92101

102+
pub async fn redis_error_receiver(rx: oneshot::Receiver<()>, shutdown_channel: mpsc::Sender<()>) {
103+
match rx.await {
104+
Ok(_) => {
105+
logger::error!("The redis server failed ");
106+
let _ = shutdown_channel.send(()).await.map_err(|err| {
107+
logger::error!("Failed to send signal to the shutdown channel {err}")
108+
});
109+
}
110+
Err(err) => {
111+
logger::error!("Channel receiver error{err}");
112+
}
113+
}
114+
}
115+
93116
async fn drainer_handler(
94117
store: Arc<Store>,
95118
stream_index: u8,

0 commit comments

Comments
 (0)