Skip to content

Commit 0706221

Browse files
fix(log): adding span metadata to tokio spawned futures (#4118)
1 parent 7f5ad62 commit 0706221

File tree

13 files changed

+117
-86
lines changed

13 files changed

+117
-86
lines changed

crates/common_utils/src/macros.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,6 @@ macro_rules! newtype {
4747
};
4848
}
4949

50-
#[macro_export]
51-
macro_rules! async_spawn {
52-
($t:block) => {
53-
tokio::spawn(async move { $t });
54-
};
55-
}
56-
5750
/// Use this to ensure that the corresponding
5851
/// openapi route has been implemented in the openapi crate
5952
#[macro_export]

crates/diesel_models/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ pub mod gsm;
2424
#[cfg(feature = "kv_store")]
2525
pub mod kv;
2626
pub mod locker_mock_up;
27-
pub mod macros;
2827
pub mod mandate;
2928
pub mod merchant_account;
3029
pub mod merchant_connector_account;

crates/diesel_models/src/macros.rs

Lines changed: 0 additions & 6 deletions
This file was deleted.

crates/drainer/src/handler.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::{atomic, Arc};
22

3+
use router_env::tracing::Instrument;
34
use tokio::{
45
sync::{mpsc, oneshot},
56
time::{self, Duration},
@@ -68,13 +69,16 @@ impl Handler {
6869
while self.running.load(atomic::Ordering::SeqCst) {
6970
metrics::DRAINER_HEALTH.add(&metrics::CONTEXT, 1, &[]);
7071
if self.store.is_stream_available(stream_index).await {
71-
tokio::spawn(drainer_handler(
72-
self.store.clone(),
73-
stream_index,
74-
self.conf.max_read_count,
75-
self.active_tasks.clone(),
76-
jobs_picked.clone(),
77-
));
72+
let _task_handle = tokio::spawn(
73+
drainer_handler(
74+
self.store.clone(),
75+
stream_index,
76+
self.conf.max_read_count,
77+
self.active_tasks.clone(),
78+
jobs_picked.clone(),
79+
)
80+
.in_current_span(),
81+
);
7882
}
7983
stream_index = utils::increment_stream_index(
8084
(stream_index, jobs_picked.clone()),
@@ -116,10 +120,12 @@ impl Handler {
116120
let redis_conn_clone = self.store.redis_conn.clone();
117121

118122
// Spawn a task to monitor if redis is down or not
119-
tokio::spawn(async move { redis_conn_clone.on_error(redis_error_tx).await });
123+
let _task_handle = tokio::spawn(
124+
async move { redis_conn_clone.on_error(redis_error_tx).await }.in_current_span(),
125+
);
120126

121127
//Spawns a task to send shutdown signal if redis goes down
122-
tokio::spawn(redis_error_receiver(redis_error_rx, tx));
128+
let _task_handle = tokio::spawn(redis_error_receiver(redis_error_rx, tx).in_current_span());
123129

124130
Ok(())
125131
}

crates/drainer/src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ use common_utils::signals::get_allowed_signals;
1818
use diesel_models::kv;
1919
use error_stack::{IntoReport, ResultExt};
2020
use hyperswitch_interfaces::secrets_interface::secret_state::RawSecret;
21-
use router_env::{instrument, tracing};
21+
use router_env::{
22+
instrument,
23+
tracing::{self, Instrument},
24+
};
2225
use tokio::sync::mpsc;
2326

2427
pub(crate) type Settings = crate::settings::Settings<RawSecret>;
@@ -39,7 +42,8 @@ pub async fn start_drainer(store: Arc<Store>, conf: DrainerSettings) -> errors::
3942
"Failed while getting allowed signals".to_string(),
4043
))?;
4144
let handle = signal.handle();
42-
let task_handle = tokio::spawn(common_utils::signals::signal_handler(signal, tx.clone()));
45+
let task_handle =
46+
tokio::spawn(common_utils::signals::signal_handler(signal, tx.clone()).in_current_span());
4347

4448
let handler_clone = drainer_handler.clone();
4549

crates/router/src/bin/scheduler.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ use router::{
1616
services::{self, api},
1717
workflows,
1818
};
19-
use router_env::{instrument, tracing};
19+
use router_env::{
20+
instrument,
21+
tracing::{self, Instrument},
22+
};
2023
use scheduler::{
2124
consumer::workflows::ProcessTrackerWorkflow, errors::ProcessTrackerError,
2225
workflows::ProcessTrackerWorkflows, SchedulerAppState,
@@ -49,10 +52,9 @@ async fn main() -> CustomResult<(), ProcessTrackerError> {
4952
.await;
5053
// channel to shutdown scheduler gracefully
5154
let (tx, rx) = mpsc::channel(1);
52-
tokio::spawn(router::receiver_for_error(
53-
redis_shutdown_signal_rx,
54-
tx.clone(),
55-
));
55+
let _task_handle = tokio::spawn(
56+
router::receiver_for_error(redis_shutdown_signal_rx, tx.clone()).in_current_span(),
57+
);
5658

5759
#[allow(clippy::expect_used)]
5860
let scheduler_flow_str =
@@ -81,10 +83,13 @@ async fn main() -> CustomResult<(), ProcessTrackerError> {
8183
.await
8284
.expect("Failed to create the server");
8385

84-
tokio::spawn(async move {
85-
let _ = web_server.await;
86-
logger::error!("The health check probe stopped working!");
87-
});
86+
let _task_handle = tokio::spawn(
87+
async move {
88+
let _ = web_server.await;
89+
logger::error!("The health check probe stopped working!");
90+
}
91+
.in_current_span(),
92+
);
8893

8994
logger::debug!(startup_config=?state.conf);
9095

crates/router/src/core/payments/flows/authorize_flow.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use async_trait::async_trait;
22
use error_stack;
3+
use router_env::tracing::Instrument;
34

45
use super::{ConstructFlowSpecificData, Feature};
56
use crate::{
@@ -125,26 +126,32 @@ impl Feature<api::Authorize, types::PaymentsAuthorizeData> for types::PaymentsAu
125126
let state = state.clone();
126127

127128
logger::info!("Call to save_payment_method in locker");
128-
tokio::spawn(async move {
129-
logger::info!("Starting async call to save_payment_method in locker");
130-
131-
let result = Box::pin(tokenization::save_payment_method(
132-
&state,
133-
&connector,
134-
response,
135-
&maybe_customer,
136-
&merchant_account,
137-
self.request.payment_method_type,
138-
&key_store,
139-
Some(resp.request.amount),
140-
Some(resp.request.currency),
141-
))
142-
.await;
143-
144-
if let Err(err) = result {
145-
logger::error!("Asynchronously saving card in locker failed : {:?}", err);
129+
let _task_handle = tokio::spawn(
130+
async move {
131+
logger::info!("Starting async call to save_payment_method in locker");
132+
133+
let result = Box::pin(tokenization::save_payment_method(
134+
&state,
135+
&connector,
136+
response,
137+
&maybe_customer,
138+
&merchant_account,
139+
self.request.payment_method_type,
140+
&key_store,
141+
Some(resp.request.amount),
142+
Some(resp.request.currency),
143+
))
144+
.await;
145+
146+
if let Err(err) = result {
147+
logger::error!(
148+
"Asynchronously saving card in locker failed : {:?}",
149+
err
150+
);
151+
}
146152
}
147-
});
153+
.in_current_span(),
154+
);
148155

149156
Ok(resp)
150157
}

crates/router/src/core/payments/operations/payment_confirm.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -160,18 +160,21 @@ impl<F: Send + Clone, Ctx: PaymentMethodRetrieve>
160160

161161
let store = state.store.clone();
162162

163-
let business_profile_fut = tokio::spawn(async move {
164-
store
165-
.find_business_profile_by_profile_id(&profile_id)
166-
.map(|business_profile_result| {
167-
business_profile_result.to_not_found_response(
168-
errors::ApiErrorResponse::BusinessProfileNotFound {
169-
id: profile_id.to_string(),
170-
},
171-
)
172-
})
173-
.await
174-
});
163+
let business_profile_fut = tokio::spawn(
164+
async move {
165+
store
166+
.find_business_profile_by_profile_id(&profile_id)
167+
.map(|business_profile_result| {
168+
business_profile_result.to_not_found_response(
169+
errors::ApiErrorResponse::BusinessProfileNotFound {
170+
id: profile_id.to_string(),
171+
},
172+
)
173+
})
174+
.await
175+
}
176+
.in_current_span(),
177+
);
175178

176179
let store = state.store.clone();
177180

@@ -498,13 +501,17 @@ impl<F: Send + Clone, Ctx: PaymentMethodRetrieve>
498501

499502
let store = state.clone().store;
500503

501-
let additional_pm_data_fut = tokio::spawn(async move {
502-
Ok(n_request_payment_method_data
503-
.async_map(|payment_method_data| async move {
504-
helpers::get_additional_payment_data(&payment_method_data, store.as_ref()).await
505-
})
506-
.await)
507-
});
504+
let additional_pm_data_fut = tokio::spawn(
505+
async move {
506+
Ok(n_request_payment_method_data
507+
.async_map(|payment_method_data| async move {
508+
helpers::get_additional_payment_data(&payment_method_data, store.as_ref())
509+
.await
510+
})
511+
.await)
512+
}
513+
.in_current_span(),
514+
);
508515

509516
let store = state.clone().store;
510517

crates/router/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use actix_web::{
3131
};
3232
use http::StatusCode;
3333
use hyperswitch_interfaces::secrets_interface::secret_state::SecuredSecret;
34+
use router_env::tracing::Instrument;
3435
use routes::AppState;
3536
use storage_impl::errors::ApplicationResult;
3637
use tokio::sync::{mpsc, oneshot};
@@ -192,7 +193,7 @@ pub async fn start_server(conf: settings::Settings<SecuredSecret>) -> Applicatio
192193
.workers(server.workers)
193194
.shutdown_timeout(server.shutdown_timeout)
194195
.run();
195-
tokio::spawn(receiver_for_error(rx, server.handle()));
196+
let _task_handle = tokio::spawn(receiver_for_error(rx, server.handle()).in_current_span());
196197
Ok(server)
197198
}
198199

crates/router/tests/utils.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use actix_web::{
1212
};
1313
use derive_deref::Deref;
1414
use router::{configs::settings::Settings, routes::AppState, services};
15+
use router_env::tracing::Instrument;
1516
use serde::{de::DeserializeOwned, Deserialize};
1617
use serde_json::{json, Value};
1718
use tokio::sync::{oneshot, OnceCell};
@@ -24,7 +25,7 @@ async fn spawn_server() -> bool {
2425
.await
2526
.expect("failed to create server");
2627

27-
let _server = tokio::spawn(server);
28+
let _server = tokio::spawn(server.in_current_span());
2829
true
2930
}
3031

0 commit comments

Comments
 (0)