Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ CREATE TABLE payment_attempt_queue (
`profile_id` String,
`card_network` Nullable(String),
`routing_approach` LowCardinality(Nullable(String)),
`debit_routing_savings` Nullable(UInt32),
`sign_flag` Int8
) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka0:29092',
kafka_topic_list = 'hyperswitch-payment-attempt-events',
Expand Down Expand Up @@ -98,6 +99,7 @@ CREATE TABLE payment_attempts (
`profile_id` String,
`card_network` Nullable(String),
`routing_approach` LowCardinality(Nullable(String)),
`debit_routing_savings` Nullable(UInt32),
`sign_flag` Int8,
INDEX connectorIndex connector TYPE bloom_filter GRANULARITY 1,
INDEX paymentMethodIndex payment_method TYPE bloom_filter GRANULARITY 1,
Expand Down Expand Up @@ -155,6 +157,7 @@ CREATE MATERIALIZED VIEW payment_attempt_mv TO payment_attempts (
`profile_id` String,
`card_network` Nullable(String),
`routing_approach` LowCardinality(Nullable(String)),
`debit_routing_savings` Nullable(UInt32),
`sign_flag` Int8
) AS
SELECT
Expand Down Expand Up @@ -204,6 +207,7 @@ SELECT
profile_id,
card_network,
routing_approach,
debit_routing_savings,
sign_flag
FROM
payment_attempt_queue
Expand Down
34 changes: 34 additions & 0 deletions crates/analytics/src/payments/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct PaymentMetricsAccumulator {
pub connector_success_rate: SuccessRateAccumulator,
pub payments_distribution: PaymentsDistributionAccumulator,
pub failure_reasons_distribution: FailureReasonsDistributionAccumulator,
pub debit_routing: DebitRoutingAccumulator,
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -58,6 +59,12 @@ pub struct ProcessedAmountAccumulator {
pub total_without_retries: Option<i64>,
}

#[derive(Debug, Default)]
pub struct DebitRoutingAccumulator {
pub transaction_count: u64,
pub savings_amount: u64,
}

#[derive(Debug, Default)]
pub struct AverageAccumulator {
pub total: u32,
Expand Down Expand Up @@ -183,6 +190,27 @@ impl PaymentMetricAccumulator for SuccessRateAccumulator {
}
}

impl PaymentMetricAccumulator for DebitRoutingAccumulator {
type MetricOutput = (Option<u64>, Option<u64>, Option<u64>);

fn add_metrics_bucket(&mut self, metrics: &PaymentMetricRow) {
if let Some(count) = metrics.count {
self.transaction_count += u64::try_from(count).unwrap_or(0);
}
if let Some(total) = metrics.total.as_ref().and_then(ToPrimitive::to_u64) {
self.savings_amount += total;
}
}

fn collect(self) -> Self::MetricOutput {
(
Some(self.transaction_count),
Some(self.savings_amount),
Some(0),
)
}
}

impl PaymentMetricAccumulator for PaymentsDistributionAccumulator {
type MetricOutput = (
Option<f64>,
Expand Down Expand Up @@ -440,6 +468,9 @@ impl PaymentMetricsAccumulator {
) = self.payments_distribution.collect();
let (failure_reason_count, failure_reason_count_without_smart_retries) =
self.failure_reasons_distribution.collect();
let (debit_routed_transaction_count, debit_routing_savings, debit_routing_savings_in_usd) =
self.debit_routing.collect();

PaymentMetricsBucketValue {
payment_success_rate: self.payment_success_rate.collect(),
payment_count: self.payment_count.collect(),
Expand All @@ -463,6 +494,9 @@ impl PaymentMetricsAccumulator {
failure_reason_count_without_smart_retries,
payment_processed_amount_in_usd,
payment_processed_amount_without_smart_retries_usd,
debit_routed_transaction_count,
debit_routing_savings,
debit_routing_savings_in_usd,
}
}
}
30 changes: 30 additions & 0 deletions crates/analytics/src/payments/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ pub async fn get_metrics(
.connector_success_rate
.add_metrics_bucket(&value);
}
PaymentMetrics::DebitRouting => {
metrics_builder.debit_routing.add_metrics_bucket(&value);
}
PaymentMetrics::PaymentsDistribution => {
metrics_builder
.payments_distribution
Expand Down Expand Up @@ -294,6 +297,33 @@ pub async fn get_metrics(
if let Some(count) = collected_values.failure_reason_count_without_smart_retries {
total_failure_reasons_count_without_smart_retries += count;
}
if let Some(savings) = collected_values.debit_routing_savings {
let savings_in_usd = if let Some(ex_rates) = ex_rates {
id.currency
.and_then(|currency| {
i64::try_from(savings)
.inspect_err(|e| {
logger::error!(
"Debit Routing savings conversion error: {:?}",
e
)
})
.ok()
.and_then(|savings_i64| {
convert(ex_rates, currency, Currency::USD, savings_i64)
.inspect_err(|e| {
logger::error!("Currency conversion error: {:?}", e)
})
.ok()
})
})
.map(|savings| (savings * rust_decimal::Decimal::new(100, 0)).to_u64())
.unwrap_or_default()
} else {
None
};
collected_values.debit_routing_savings_in_usd = savings_in_usd;
}
MetricsBucketResponse {
values: collected_values,
dimensions: id,
Expand Down
7 changes: 7 additions & 0 deletions crates/analytics/src/payments/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{

mod avg_ticket_size;
mod connector_success_rate;
mod debit_routing;
mod payment_count;
mod payment_processed_amount;
mod payment_success_count;
Expand All @@ -24,6 +25,7 @@ mod success_rate;

use avg_ticket_size::AvgTicketSize;
use connector_success_rate::ConnectorSuccessRate;
use debit_routing::DebitRouting;
use payment_count::PaymentCount;
use payment_processed_amount::PaymentProcessedAmount;
use payment_success_count::PaymentSuccessCount;
Expand Down Expand Up @@ -130,6 +132,11 @@ where
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
Self::DebitRouting => {
DebitRouting
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
.await
}
Self::SessionizedPaymentSuccessRate => {
sessionized_metrics::PaymentSuccessRate
.load_metrics(dimensions, auth, filters, granularity, time_range, pool)
Expand Down
151 changes: 151 additions & 0 deletions crates/analytics/src/payments/metrics/debit_routing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use std::collections::HashSet;

use api_models::analytics::{
payments::{PaymentDimensions, PaymentFilters, PaymentMetricsBucketIdentifier},
Granularity, TimeRange,
};
use common_utils::errors::ReportSwitchExt;
use diesel_models::enums as storage_enums;
use error_stack::ResultExt;
use time::PrimitiveDateTime;

use super::PaymentMetricRow;
use crate::{
enums::AuthInfo,
query::{Aggregate, GroupByClause, QueryBuilder, QueryFilter, SeriesBucket, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, MetricsError, MetricsResult},
};

#[derive(Default)]
pub(super) struct DebitRouting;

#[async_trait::async_trait]
impl<T> super::PaymentMetric<T> for DebitRouting
where
T: AnalyticsDataSource + super::PaymentMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
dimensions: &[PaymentDimensions],
auth: &AuthInfo,
filters: &PaymentFilters,
granularity: Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<HashSet<(PaymentMetricsBucketIdentifier, PaymentMetricRow)>> {
let mut query_builder: QueryBuilder<T> = QueryBuilder::new(AnalyticsCollection::Payment);

for dim in dimensions.iter() {
query_builder.add_select_column(dim).switch()?;
}
query_builder
.add_select_column(Aggregate::Count {
field: None,
alias: Some("count"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Sum {
field: "debit_routing_savings",
alias: Some("total"),
})
.switch()?;
query_builder.add_select_column("currency").switch()?;
query_builder
.add_select_column(Aggregate::Min {
field: "created_at",
alias: Some("start_bucket"),
})
.switch()?;
query_builder
.add_select_column(Aggregate::Max {
field: "created_at",
alias: Some("end_bucket"),
})
.switch()?;

filters.set_filter_clause(&mut query_builder).switch()?;

auth.set_filter_clause(&mut query_builder).switch()?;

time_range
.set_filter_clause(&mut query_builder)
.attach_printable("Error filtering time range")
.switch()?;

for dim in dimensions.iter() {
query_builder
.add_group_by_clause(dim)
.attach_printable("Error grouping by dimensions")
.switch()?;
}

query_builder
.add_group_by_clause("currency")
.attach_printable("Error grouping by currency")
.switch()?;

if let Some(granularity) = granularity {
granularity
.set_group_by_clause(&mut query_builder)
.attach_printable("Error adding granularity")
.switch()?;
}

query_builder
.add_filter_clause(
PaymentDimensions::PaymentStatus,
storage_enums::AttemptStatus::Charged,
)
.switch()?;

query_builder
.execute_query::<PaymentMetricRow, _>(pool)
.await
.change_context(MetricsError::QueryBuildingError)?
.change_context(MetricsError::QueryExecutionFailure)?
.into_iter()
.map(|i| {
Ok((
PaymentMetricsBucketIdentifier::new(
i.currency.as_ref().map(|i| i.0),
None,
i.connector.clone(),
i.authentication_type.as_ref().map(|i| i.0),
i.payment_method.clone(),
i.payment_method_type.clone(),
i.client_source.clone(),
i.client_version.clone(),
i.profile_id.clone(),
i.card_network.clone(),
i.merchant_id.clone(),
i.card_last_4.clone(),
i.card_issuer.clone(),
i.error_reason.clone(),
i.routing_approach.as_ref().map(|i| i.0),
TimeRange {
start_time: match (granularity, i.start_bucket) {
(Some(g), Some(st)) => g.clip_to_start(st)?,
_ => time_range.start_time,
},
end_time: granularity.as_ref().map_or_else(
|| Ok(time_range.end_time),
|g| i.end_bucket.map(|et| g.clip_to_end(et)).transpose(),
)?,
},
),
i,
))
})
.collect::<error_stack::Result<
HashSet<(PaymentMetricsBucketIdentifier, PaymentMetricRow)>,
crate::query::PostProcessingError,
>>()
.change_context(MetricsError::PostProcessingFailure)
}
}
5 changes: 5 additions & 0 deletions crates/api_models/src/analytics/payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub enum PaymentMetrics {
AvgTicketSize,
RetriesCount,
ConnectorSuccessRate,
DebitRouting,
SessionizedPaymentSuccessRate,
SessionizedPaymentCount,
SessionizedPaymentSuccessCount,
Expand All @@ -128,6 +129,7 @@ impl ForexMetric for PaymentMetrics {
self,
Self::PaymentProcessedAmount
| Self::AvgTicketSize
// add here New Metric
Copy link
Contributor

@tsdk02 tsdk02 Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can add Self::DebitRouting here and can remove this comment
This is for the forex conversion of other currencies to USD for debit_routing_savings

| Self::SessionizedPaymentProcessedAmount
| Self::SessionizedAvgTicketSize
)
Expand Down Expand Up @@ -309,6 +311,9 @@ pub struct PaymentMetricsBucketValue {
pub payments_failure_rate_distribution_with_only_retries: Option<f64>,
pub failure_reason_count: Option<u64>,
pub failure_reason_count_without_smart_retries: Option<u64>,
pub debit_routed_transaction_count: Option<u64>,
pub debit_routing_savings: Option<u64>,
pub debit_routing_savings_in_usd: Option<u64>,
}

#[derive(Debug, serde::Serialize)]
Expand Down
Loading
Loading