Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions crates/analytics/src/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
filters::ApiEventFilter,
metrics::{latency::LatencyAvg, ApiEventMetricRow},
},
outgoing_webhook_event::events::OutgoingWebhookLogsResult,
sdk_events::events::SdkEventsResult,
types::TableEngine,
};
Expand Down Expand Up @@ -120,6 +121,7 @@ impl AnalyticsDataSource for ClickhouseClient {
}
AnalyticsCollection::SdkEvents => TableEngine::BasicTree,
AnalyticsCollection::ApiEvents => TableEngine::BasicTree,
AnalyticsCollection::OutgoingWebhookEvent => TableEngine::BasicTree,
}
}
}
Expand All @@ -145,6 +147,10 @@ impl super::sdk_events::events::SdkEventsFilterAnalytics for ClickhouseClient {}
impl super::api_event::events::ApiLogsFilterAnalytics for ClickhouseClient {}
impl super::api_event::filters::ApiEventFilterAnalytics for ClickhouseClient {}
impl super::api_event::metrics::ApiEventMetricAnalytics for ClickhouseClient {}
impl super::outgoing_webhook_event::events::OutgoingWebhookLogsFilterAnalytics
for ClickhouseClient
{
}

#[derive(Debug, serde::Serialize)]
struct CkhQuery {
Expand Down Expand Up @@ -302,6 +308,18 @@ impl TryInto<ApiEventFilter> for serde_json::Value {
}
}

impl TryInto<OutgoingWebhookLogsResult> for serde_json::Value {
type Error = Report<ParsingError>;

fn try_into(self) -> Result<OutgoingWebhookLogsResult, Self::Error> {
serde_json::from_value(self)
.into_report()
.change_context(ParsingError::StructParseFailure(
"Failed to parse OutgoingWebhookLogsResult in clickhouse results",
))
}
}

impl ToSql<ClickhouseClient> for PrimitiveDateTime {
fn to_sql(&self, _table_engine: &TableEngine) -> error_stack::Result<String, ParsingError> {
let format =
Expand All @@ -326,6 +344,7 @@ impl ToSql<ClickhouseClient> for AnalyticsCollection {
Self::SdkEvents => Ok("sdk_events_dist".to_string()),
Self::ApiEvents => Ok("api_audit_log".to_string()),
Self::PaymentIntent => Ok("payment_intents_dist".to_string()),
Self::OutgoingWebhookEvent => Ok("outgoing_webhook_events_audit".to_string()),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/analytics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod query;
pub mod refunds;

pub mod api_event;
pub mod outgoing_webhook_event;
pub mod sdk_events;
mod sqlx;
mod types;
Expand Down
6 changes: 6 additions & 0 deletions crates/analytics/src/outgoing_webhook_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod core;
pub mod events;

pub trait OutgoingWebhookEventAnalytics: events::OutgoingWebhookLogsFilterAnalytics {}

pub use self::core::outgoing_webhook_events_core;
27 changes: 27 additions & 0 deletions crates/analytics/src/outgoing_webhook_event/core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use api_models::analytics::outgoing_webhook_event::OutgoingWebhookLogsRequest;
use common_utils::errors::ReportSwitchExt;
use error_stack::{IntoReport, ResultExt};

use super::events::{get_outgoing_webhook_event, OutgoingWebhookLogsResult};
use crate::{errors::AnalyticsResult, types::FiltersError, AnalyticsProvider};

pub async fn outgoing_webhook_events_core(
pool: &AnalyticsProvider,
req: OutgoingWebhookLogsRequest,
merchant_id: String,
) -> AnalyticsResult<Vec<OutgoingWebhookLogsResult>> {
let data = match pool {
AnalyticsProvider::Sqlx(_) => Err(FiltersError::NotImplemented(
"Outgoing Webhook Events Logs not implemented for SQLX",
))
.into_report()
.attach_printable("SQL Analytics is not implemented for Outgoing Webhook Events"),
AnalyticsProvider::Clickhouse(ckh_pool)
| AnalyticsProvider::CombinedSqlx(_, ckh_pool)
| AnalyticsProvider::CombinedCkh(_, ckh_pool) => {
get_outgoing_webhook_event(&merchant_id, req, ckh_pool).await
}
}
.switch()?;
Ok(data)
}
90 changes: 90 additions & 0 deletions crates/analytics/src/outgoing_webhook_event/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use api_models::analytics::{outgoing_webhook_event::OutgoingWebhookLogsRequest, Granularity};
use common_utils::errors::ReportSwitchExt;
use error_stack::ResultExt;
use time::PrimitiveDateTime;

use crate::{
query::{Aggregate, GroupByClause, QueryBuilder, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, FiltersError, FiltersResult, LoadRow},
};
pub trait OutgoingWebhookLogsFilterAnalytics: LoadRow<OutgoingWebhookLogsResult> {}

pub async fn get_outgoing_webhook_event<T>(
merchant_id: &String,
query_param: OutgoingWebhookLogsRequest,
pool: &T,
) -> FiltersResult<Vec<OutgoingWebhookLogsResult>>
where
T: AnalyticsDataSource + OutgoingWebhookLogsFilterAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
let mut query_builder: QueryBuilder<T> =
QueryBuilder::new(AnalyticsCollection::OutgoingWebhookEvent);
query_builder.add_select_column("*").switch()?;

query_builder
.add_filter_clause("merchant_id", merchant_id)
.switch()?;
query_builder
.add_filter_clause("payment_id", query_param.payment_id)
.switch()?;

if let Some(event_id) = query_param.event_id {
query_builder
.add_filter_clause("event_id", &event_id)
.switch()?;
}
if let Some(refund_id) = query_param.refund_id {
query_builder
.add_filter_clause("refund_id", &refund_id)
.switch()?;
}
if let Some(dispute_id) = query_param.dispute_id {
query_builder
.add_filter_clause("dispute_id", &dispute_id)
.switch()?;
}
if let Some(mandate_id) = query_param.mandate_id {
query_builder
.add_filter_clause("mandate_id", &mandate_id)
.switch()?;
}
if let Some(payment_method_id) = query_param.payment_method_id {
query_builder
.add_filter_clause("payment_method_id", &payment_method_id)
.switch()?;
}
if let Some(attempt_id) = query_param.attempt_id {
query_builder
.add_filter_clause("attempt_id", &attempt_id)
.switch()?;
}
//TODO!: update the execute_query function to return reports instead of plain errors...
query_builder
.execute_query::<OutgoingWebhookLogsResult, _>(pool)
.await
.change_context(FiltersError::QueryBuildingError)?
.change_context(FiltersError::QueryExecutionFailure)
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct OutgoingWebhookLogsResult {
pub merchant_id: String,
pub event_id: String,
pub event_type: String,
pub outgoing_webhook_event_type: String,
pub payment_id: String,
pub refund_id: Option<String>,
pub attempt_id: Option<String>,
pub dispute_id: Option<String>,
pub payment_method_id: Option<String>,
pub mandate_id: Option<String>,
pub content: Option<String>,
pub is_error: bool,
pub error: Option<String>,
#[serde(with = "common_utils::custom_serde::iso8601")]
pub created_at: PrimitiveDateTime,
}
2 changes: 2 additions & 0 deletions crates/analytics/src/sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ impl ToSql<SqlxClient> for AnalyticsCollection {
Self::ApiEvents => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("ApiEvents table is not implemented for Sqlx"))?,
Self::PaymentIntent => Ok("payment_intent".to_string()),
Self::OutgoingWebhookEvent => Err(error_stack::report!(ParsingError::UnknownError)
.attach_printable("OutgoingWebhookEvents table is not implemented for Sqlx"))?,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/analytics/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub enum AnalyticsCollection {
SdkEvents,
ApiEvents,
PaymentIntent,
OutgoingWebhookEvent,
}

#[allow(dead_code)]
Expand Down
1 change: 1 addition & 0 deletions crates/api_models/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use self::{
pub use crate::payments::TimeRange;

pub mod api_event;
pub mod outgoing_webhook_event;
pub mod payments;
pub mod refunds;
pub mod sdk_events;
Expand Down
10 changes: 10 additions & 0 deletions crates/api_models/src/analytics/outgoing_webhook_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct OutgoingWebhookLogsRequest {
pub payment_id: String,
pub event_id: Option<String>,
pub refund_id: Option<String>,
pub dispute_id: Option<String>,
pub mandate_id: Option<String>,
pub payment_method_id: Option<String>,
pub attempt_id: Option<String>,
}
7 changes: 5 additions & 2 deletions crates/api_models/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use common_utils::{

use crate::{
admin::*,
analytics::{api_event::*, sdk_events::*, *},
analytics::{
api_event::*, outgoing_webhook_event::OutgoingWebhookLogsRequest, sdk_events::*, *,
},
api_keys::*,
cards_info::*,
disputes::*,
Expand Down Expand Up @@ -89,7 +91,8 @@ impl_misc_api_event_type!(
ApiLogsRequest,
GetApiEventMetricRequest,
SdkEventsRequest,
ReportRequest
ReportRequest,
OutgoingWebhookLogsRequest
);

#[cfg(feature = "stripe")]
Expand Down
30 changes: 29 additions & 1 deletion crates/router/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod routes {
use actix_web::{web, Responder, Scope};
use analytics::{
api_event::api_events_core, errors::AnalyticsError, lambda_utils::invoke_lambda,
sdk_events::sdk_events_core,
outgoing_webhook_event::outgoing_webhook_events_core, sdk_events::sdk_events_core,
};
use api_models::analytics::{
GenerateReportRequest, GetApiEventFiltersRequest, GetApiEventMetricRequest,
Expand Down Expand Up @@ -71,6 +71,10 @@ pub mod routes {
)
.service(web::resource("api_event_logs").route(web::get().to(get_api_events)))
.service(web::resource("sdk_event_logs").route(web::post().to(get_sdk_events)))
.service(
web::resource("outgoing_webhook_event_logs")
.route(web::get().to(get_outgoing_webhook_events)),
)
.service(
web::resource("filters/api_events")
.route(web::post().to(get_api_event_filters)),
Expand Down Expand Up @@ -314,6 +318,30 @@ pub mod routes {
.await
}

pub async fn get_outgoing_webhook_events(
state: web::Data<AppState>,
req: actix_web::HttpRequest,
json_payload: web::Query<
api_models::analytics::outgoing_webhook_event::OutgoingWebhookLogsRequest,
>,
) -> impl Responder {
let flow = AnalyticsFlow::GetOutgoingWebhookEvents;
Box::pin(api::server_wrap(
flow,
state,
&req,
json_payload.into_inner(),
|state, auth: AuthenticationData, req| async move {
outgoing_webhook_events_core(&state.pool, req, auth.merchant_account.merchant_id)
.await
.map(ApplicationResponse::Json)
},
&auth::JWTAuth(Permission::Analytics),
api_locking::LockAction::NotApplicable,
))
.await
}

pub async fn get_sdk_events(
state: web::Data<AppState>,
req: actix_web::HttpRequest,
Expand Down
1 change: 1 addition & 0 deletions crates/router_env/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub enum AnalyticsFlow {
GenerateRefundReport,
GetApiEventMetrics,
GetApiEventFilters,
GetOutgoingWebhookEvents,
}

impl FlowMetric for AnalyticsFlow {}