Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/router/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,8 +1020,8 @@ pub async fn update_payment_connector(
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed while encrypting data")?,
test_mode: mca.test_mode,
disabled: mca.disabled,
test_mode: req.test_mode,
disabled: req.disabled,
payment_methods_enabled,
metadata: req.metadata,
frm_configs,
Expand Down
2 changes: 1 addition & 1 deletion crates/router/src/core/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub async fn invalidate(
key: &str,
) -> CustomResult<services::api::ApplicationResponse<serde_json::Value>, errors::ApiErrorResponse> {
let store = state.store.as_ref();
let result = publish_into_redact_channel(store, CacheKind::All(key.into()))
let result = publish_into_redact_channel(store, [CacheKind::All(key.into())])
.await
.change_context(errors::ApiErrorResponse::InternalServerError)?;

Expand Down
5 changes: 5 additions & 0 deletions crates/router/src/core/payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,11 @@ where
)
.await?;

if payment_data.payment_attempt.merchant_connector_id.is_none() {
payment_data.payment_attempt.merchant_connector_id =
merchant_connector_account.get_mca_id();
}

let (pd, tokenization_action) = get_connector_tokenization_action_when_confirm_true(
state,
operation,
Expand Down
37 changes: 30 additions & 7 deletions crates/router/src/db/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ where
Ok(data)
}

pub async fn publish_into_redact_channel<'a>(
pub async fn publish_into_redact_channel<'a, K: IntoIterator<Item = CacheKind<'a>> + Send>(
store: &dyn StorageInterface,
key: CacheKind<'a>,
keys: K,
) -> CustomResult<usize, errors::StorageError> {
let redis_conn = store
.get_redis_conn()
Expand All @@ -111,10 +111,18 @@ pub async fn publish_into_redact_channel<'a>(
))
.attach_printable("Failed to get redis connection")?;

redis_conn
.publish(consts::PUB_SUB_CHANNEL, key)
.await
.change_context(errors::StorageError::KVError)
let futures = keys.into_iter().map(|key| async {
redis_conn
.clone()
.publish(consts::PUB_SUB_CHANNEL, key)
.await
.change_context(errors::StorageError::KVError)
});

Ok(futures::future::try_join_all(futures)
.await?
.iter()
.sum::<usize>())
}

pub async fn publish_and_redact<'a, T, F, Fut>(
Expand All @@ -127,6 +135,21 @@ where
Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send,
{
let data = fun().await?;
publish_into_redact_channel(store, key).await?;
publish_into_redact_channel(store, [key]).await?;
Ok(data)
}

pub async fn publish_and_redact_multiple<'a, T, F, Fut, K>(
store: &dyn StorageInterface,
keys: K,
fun: F,
) -> CustomResult<T, errors::StorageError>
where
F: FnOnce() -> Fut + Send,
Fut: futures::Future<Output = CustomResult<T, errors::StorageError>> + Send,
K: IntoIterator<Item = CacheKind<'a>> + Send,
{
let data = fun().await?;
publish_into_redact_channel(store, keys).await?;
Ok(data)
}
22 changes: 10 additions & 12 deletions crates/router/src/db/merchant_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,19 +399,17 @@ async fn publish_and_redact_merchant_account_cache(
store: &dyn super::StorageInterface,
merchant_account: &storage::MerchantAccount,
) -> CustomResult<(), errors::StorageError> {
super::cache::publish_into_redact_channel(
store,
CacheKind::Accounts(merchant_account.merchant_id.as_str().into()),
)
.await?;
merchant_account
let publishable_key = merchant_account
.publishable_key
.as_ref()
.async_map(|pub_key| async {
super::cache::publish_into_redact_channel(store, CacheKind::Accounts(pub_key.into()))
.await
})
.await
.transpose()?;
.map(|publishable_key| CacheKind::Accounts(publishable_key.into()));

let mut cache_keys = vec![CacheKind::Accounts(
merchant_account.merchant_id.as_str().into(),
)];

cache_keys.extend(publishable_key.into_iter());

super::cache::publish_into_redact_channel(store, cache_keys).await?;
Ok(())
}
60 changes: 45 additions & 15 deletions crates/router/src/db/merchant_connector_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,21 +290,40 @@ impl MerchantConnectorAccountInterface for Store {
merchant_connector_id: &str,
key_store: &domain::MerchantKeyStore,
) -> CustomResult<domain::MerchantConnectorAccount, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,
merchant_connector_id,
)
.await
.map_err(Into::into)
.into_report()
.async_and_then(|item| async {
item.convert(key_store.key.get_inner())
let find_call = || async {
let conn = connection::pg_connection_read(self).await?;
storage::MerchantConnectorAccount::find_by_merchant_id_merchant_connector_id(
&conn,
merchant_id,
merchant_connector_id,
)
.await
.map_err(Into::into)
.into_report()
};

#[cfg(not(feature = "accounts_cache"))]
{
find_call()
.await?
.convert(key_store.key.get_inner())
.await
.change_context(errors::StorageError::DecryptionError)
})
.await
}

#[cfg(feature = "accounts_cache")]
{
super::cache::get_or_populate_in_memory(
self,
&format!("{}_{}", merchant_id, merchant_connector_id),
find_call,
&cache::ACCOUNTS_CACHE,
)
.await?
.convert(key_store.key.get_inner())
.await
.change_context(errors::StorageError::DecryptionError)
}
}

async fn insert_merchant_connector_account(
Expand Down Expand Up @@ -367,6 +386,9 @@ impl MerchantConnectorAccountInterface for Store {
"profile_id".to_string(),
))?;

let _merchant_id = this.merchant_id.clone();
let _merchant_connector_id = this.merchant_connector_id.clone();

let update_call = || async {
let conn = connection::pg_connection_write(self).await?;
Conversion::convert(this)
Expand All @@ -386,9 +408,17 @@ impl MerchantConnectorAccountInterface for Store {

#[cfg(feature = "accounts_cache")]
{
super::cache::publish_and_redact(
// Redact both the caches as any one or both might be used because of backwards compatibility
super::cache::publish_and_redact_multiple(
self,
cache::CacheKind::Accounts(format!("{}_{}", _profile_id, _connector_name).into()),
[
cache::CacheKind::Accounts(
format!("{}_{}", _profile_id, _connector_name).into(),
),
cache::CacheKind::Accounts(
format!("{}_{}", _merchant_id, _merchant_connector_id).into(),
),
],
update_call,
)
.await
Expand Down