diff --git a/crates/diesel_models/src/events.rs b/crates/diesel_models/src/events.rs index 82b2b58f80bf..527913108632 100644 --- a/crates/diesel_models/src/events.rs +++ b/crates/diesel_models/src/events.rs @@ -62,21 +62,39 @@ pub struct Event { #[derive(Clone, Debug, Deserialize, Serialize, AsExpression, diesel::FromSqlRow)] #[diesel(sql_type = diesel::sql_types::Jsonb)] pub enum EventMetadata { + #[cfg(feature = "v1")] Payment { payment_id: common_utils::id_type::PaymentId, }, + #[cfg(feature = "v2")] + Payment { + payment_id: common_utils::id_type::GlobalPaymentId, + }, Payout { payout_id: String, }, + #[cfg(feature = "v1")] Refund { payment_id: common_utils::id_type::PaymentId, refund_id: String, }, + #[cfg(feature = "v2")] + Refund { + payment_id: common_utils::id_type::GlobalPaymentId, + refund_id: common_utils::id_type::GlobalRefundId, + }, + #[cfg(feature = "v1")] Dispute { payment_id: common_utils::id_type::PaymentId, attempt_id: String, dispute_id: String, }, + #[cfg(feature = "v2")] + Dispute { + payment_id: common_utils::id_type::GlobalPaymentId, + attempt_id: String, + dispute_id: String, + }, Mandate { payment_method_id: String, mandate_id: String, diff --git a/crates/router/src/core/webhooks.rs b/crates/router/src/core/webhooks.rs index 64c5617398b4..ffa8ff9281a4 100644 --- a/crates/router/src/core/webhooks.rs +++ b/crates/router/src/core/webhooks.rs @@ -4,13 +4,13 @@ mod incoming; mod incoming_v2; #[cfg(feature = "v1")] mod outgoing; +#[cfg(feature = "v2")] +mod outgoing_v2; pub mod types; pub mod utils; #[cfg(feature = "olap")] pub mod webhook_events; -#[cfg(feature = "v2")] -pub(crate) use self::incoming_v2::incoming_webhooks_wrapper; #[cfg(feature = "v1")] pub(crate) use self::{ incoming::incoming_webhooks_wrapper, @@ -19,5 +19,9 @@ pub(crate) use self::{ trigger_webhook_and_raise_event, }, }; +#[cfg(feature = "v2")] +pub(crate) use self::{ + incoming_v2::incoming_webhooks_wrapper, outgoing_v2::create_event_and_trigger_outgoing_webhook, +}; const MERCHANT_ID: &str = "merchant_id"; diff --git a/crates/router/src/core/webhooks/incoming_v2.rs b/crates/router/src/core/webhooks/incoming_v2.rs index 0deb91efaa8d..4092ca5904bf 100644 --- a/crates/router/src/core/webhooks/incoming_v2.rs +++ b/crates/router/src/core/webhooks/incoming_v2.rs @@ -433,7 +433,7 @@ async fn payments_incoming_webhook_flow( req_state, merchant_account.clone(), key_store.clone(), - profile, + profile.clone(), payments::operations::PaymentGet, api::PaymentsRetrieveRequest { force_sync: true, @@ -499,22 +499,22 @@ async fn payments_incoming_webhook_flow( let event_type: Option = payments_response.status.foreign_into(); // If event is NOT an UnsupportedEvent, trigger Outgoing Webhook - if let Some(_outgoing_event_type) = event_type { - let _primary_object_created_at = payments_response.created; + if let Some(outgoing_event_type) = event_type { + let primary_object_created_at = payments_response.created; // TODO: trigger an outgoing webhook to merchant - // Box::pin(super::create_event_and_trigger_outgoing_webhook( - // state, - // merchant_account, - // profile, - // &key_store, - // outgoing_event_type, - // enums::EventClass::Payments, - // payment_id.get_string_repr().to_owned(), - // enums::EventObjectType::PaymentDetails, - // api::OutgoingWebhookContent::PaymentDetails(Box::new(payments_response)), - // Some(primary_object_created_at), - // )) - // .await?; + Box::pin(super::create_event_and_trigger_outgoing_webhook( + state, + merchant_account, + profile, + &key_store, + outgoing_event_type, + enums::EventClass::Payments, + payment_id.get_string_repr().to_owned(), + enums::EventObjectType::PaymentDetails, + api::OutgoingWebhookContent::PaymentDetails(Box::new(payments_response)), + Some(primary_object_created_at), + )) + .await?; }; let response = WebhookResponseTracker::Payment { payment_id, status }; diff --git a/crates/router/src/core/webhooks/outgoing_v2.rs b/crates/router/src/core/webhooks/outgoing_v2.rs new file mode 100644 index 000000000000..7e753eb0f8cb --- /dev/null +++ b/crates/router/src/core/webhooks/outgoing_v2.rs @@ -0,0 +1,817 @@ +use std::collections::HashMap; + +use api_models::{ + webhook_events::{OutgoingWebhookRequestContent, OutgoingWebhookResponseContent}, + webhooks, +}; +use common_utils::{ + ext_traits::{Encode, StringExt}, + request::RequestContent, + type_name, + types::keymanager::{Identifier, KeyManagerState}, +}; +use diesel_models::process_tracker::business_status; +use error_stack::{report, ResultExt}; +use hyperswitch_domain_models::type_encryption::{crypto_operation, CryptoOperation}; +use hyperswitch_interfaces::consts; +use masking::{ExposeInterface, Mask, PeekInterface, Secret}; +use router_env::{ + instrument, + tracing::{self, Instrument}, +}; + +use super::{types, utils, MERCHANT_ID}; +use crate::{ + core::{ + errors::{self, CustomResult}, + metrics, + }, + events::outgoing_webhook_logs::{ + OutgoingWebhookEvent, OutgoingWebhookEventContent, OutgoingWebhookEventMetric, + }, + logger, + routes::{app::SessionStateInfo, SessionState}, + services, + types::{ + api, + domain::{self}, + storage::{self, enums}, + transformers::ForeignFrom, + }, + utils::{OptionExt, ValueExt}, +}; + +const OUTGOING_WEBHOOK_TIMEOUT_SECS: u64 = 5; + +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +pub(crate) async fn create_event_and_trigger_outgoing_webhook( + state: SessionState, + merchant_account: domain::MerchantAccount, + business_profile: domain::Profile, + merchant_key_store: &domain::MerchantKeyStore, + event_type: enums::EventType, + event_class: enums::EventClass, + primary_object_id: String, + primary_object_type: enums::EventObjectType, + content: api::OutgoingWebhookContent, + primary_object_created_at: Option, +) -> CustomResult<(), errors::ApiErrorResponse> { + let delivery_attempt = enums::WebhookDeliveryAttempt::InitialAttempt; + let idempotent_event_id = + utils::get_idempotent_event_id(&primary_object_id, event_type, delivery_attempt); + let webhook_url_result = get_webhook_url_from_business_profile(&business_profile); + + if !state.conf.webhooks.outgoing_enabled + || webhook_url_result.is_err() + || webhook_url_result.as_ref().is_ok_and(String::is_empty) + { + logger::debug!( + business_profile_id=?business_profile.get_id(), + %idempotent_event_id, + "Outgoing webhooks are disabled in application configuration, or merchant webhook URL \ + could not be obtained; skipping outgoing webhooks for event" + ); + return Ok(()); + } + + let event_id = utils::generate_event_id(); + let merchant_id = business_profile.merchant_id.clone(); + let now = common_utils::date_time::now(); + + let outgoing_webhook = api::OutgoingWebhook { + merchant_id: merchant_id.clone(), + event_id: event_id.clone(), + event_type, + content: content.clone(), + timestamp: now, + }; + + let request_content = get_outgoing_webhook_request(outgoing_webhook, &business_profile) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("Failed to construct outgoing webhook request content")?; + + let event_metadata = storage::EventMetadata::foreign_from(&content); + let key_manager_state = &(&state).into(); + let new_event = domain::Event { + event_id: event_id.clone(), + event_type, + event_class, + is_webhook_notified: false, + primary_object_id, + primary_object_type, + created_at: now, + merchant_id: Some(business_profile.merchant_id.clone()), + business_profile_id: Some(business_profile.get_id().to_owned()), + primary_object_created_at, + idempotent_event_id: Some(idempotent_event_id.clone()), + initial_attempt_id: Some(event_id.clone()), + request: Some( + crypto_operation( + key_manager_state, + type_name!(domain::Event), + CryptoOperation::Encrypt( + request_content + .encode_to_string_of_json() + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("Failed to encode outgoing webhook request content") + .map(Secret::new)?, + ), + Identifier::Merchant(merchant_key_store.merchant_id.clone()), + merchant_key_store.key.get_inner().peek(), + ) + .await + .and_then(|val| val.try_into_operation()) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("Failed to encrypt outgoing webhook request content")?, + ), + response: None, + delivery_attempt: Some(delivery_attempt), + metadata: Some(event_metadata), + }; + + let event_insert_result = state + .store + .insert_event(key_manager_state, new_event, merchant_key_store) + .await; + + let event = match event_insert_result { + Ok(event) => Ok(event), + Err(error) => { + if error.current_context().is_db_unique_violation() { + logger::debug!("Event with idempotent ID `{idempotent_event_id}` already exists in the database"); + return Ok(()); + } else { + logger::error!(event_insertion_failure=?error); + Err(error + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .attach_printable("Failed to insert event in events table")) + } + } + }?; + + let cloned_key_store = merchant_key_store.clone(); + // Using a tokio spawn here and not arbiter because not all caller of this function + // may have an actix arbiter + tokio::spawn( + async move { + Box::pin(trigger_webhook_and_raise_event( + state, + business_profile, + &cloned_key_store, + event, + request_content, + delivery_attempt, + Some(content), + )) + .await; + } + .in_current_span(), + ); + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +#[instrument(skip_all)] +pub(crate) async fn trigger_webhook_and_raise_event( + state: SessionState, + business_profile: domain::Profile, + merchant_key_store: &domain::MerchantKeyStore, + event: domain::Event, + request_content: OutgoingWebhookRequestContent, + delivery_attempt: enums::WebhookDeliveryAttempt, + content: Option, +) { + logger::debug!( + event_id=%event.event_id, + idempotent_event_id=?event.idempotent_event_id, + initial_attempt_id=?event.initial_attempt_id, + "Attempting to send webhook" + ); + + let merchant_id = business_profile.merchant_id.clone(); + let trigger_webhook_result = trigger_webhook_to_merchant( + state.clone(), + business_profile, + merchant_key_store, + event.clone(), + request_content, + delivery_attempt, + ) + .await; + + let _ = raise_webhooks_analytics_event( + state, + trigger_webhook_result, + content, + merchant_id, + event, + merchant_key_store, + ) + .await; +} + +async fn trigger_webhook_to_merchant( + state: SessionState, + business_profile: domain::Profile, + merchant_key_store: &domain::MerchantKeyStore, + event: domain::Event, + request_content: OutgoingWebhookRequestContent, + delivery_attempt: enums::WebhookDeliveryAttempt, +) -> CustomResult<(), errors::WebhooksFlowError> { + let webhook_url = get_webhook_url_from_business_profile(&business_profile)?; + + let event_id = event.event_id; + + let headers = request_content + .headers + .into_iter() + .map(|(name, value)| (name, value.into_masked())) + .collect(); + let request = services::RequestBuilder::new() + .method(services::Method::Post) + .url(&webhook_url) + .attach_default_headers() + .headers(headers) + .set_body(RequestContent::RawBytes( + request_content.body.expose().into_bytes(), + )) + .build(); + + let response = state + .api_client + .send_request(&state, request, Some(OUTGOING_WEBHOOK_TIMEOUT_SECS), false) + .await; + + metrics::WEBHOOK_OUTGOING_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new( + MERCHANT_ID, + business_profile.merchant_id.get_string_repr().to_owned(), + )], + ); + logger::debug!(outgoing_webhook_response=?response); + + match delivery_attempt { + enums::WebhookDeliveryAttempt::InitialAttempt => match response { + Err(client_error) => { + api_client_error_handler( + state.clone(), + merchant_key_store.clone(), + &business_profile.merchant_id, + &event_id, + client_error, + delivery_attempt, + ScheduleWebhookRetry::NoSchedule, + ) + .await? + } + Ok(response) => { + let status_code = response.status(); + let _updated_event = update_event_in_storage( + state.clone(), + merchant_key_store.clone(), + &business_profile.merchant_id, + &event_id, + response, + ) + .await?; + + if status_code.is_success() { + success_response_handler( + state.clone(), + &business_profile.merchant_id, + //TODO: add outgoing webhook retries support + None, + business_status::INITIAL_DELIVERY_ATTEMPT_SUCCESSFUL, + ) + .await?; + } else { + error_response_handler( + state.clone(), + &business_profile.merchant_id, + delivery_attempt, + status_code.as_u16(), + "Ignoring error when sending webhook to merchant", + ScheduleWebhookRetry::NoSchedule, + ) + .await?; + } + } + }, + // TODO: Add support for automatic retries + enums::WebhookDeliveryAttempt::AutomaticRetry => todo!(), + enums::WebhookDeliveryAttempt::ManualRetry => match response { + Err(client_error) => { + api_client_error_handler( + state.clone(), + merchant_key_store.clone(), + &business_profile.merchant_id, + &event_id, + client_error, + delivery_attempt, + ScheduleWebhookRetry::NoSchedule, + ) + .await? + } + Ok(response) => { + let status_code = response.status(); + let _updated_event = update_event_in_storage( + state.clone(), + merchant_key_store.clone(), + &business_profile.merchant_id, + &event_id, + response, + ) + .await?; + + if status_code.is_success() { + increment_webhook_outgoing_received_count(&business_profile.merchant_id); + } else { + error_response_handler( + state, + &business_profile.merchant_id, + delivery_attempt, + status_code.as_u16(), + "Ignoring error when sending webhook to merchant", + ScheduleWebhookRetry::NoSchedule, + ) + .await?; + } + } + }, + } + + Ok(()) +} + +async fn raise_webhooks_analytics_event( + state: SessionState, + trigger_webhook_result: CustomResult<(), errors::WebhooksFlowError>, + content: Option, + merchant_id: common_utils::id_type::MerchantId, + event: domain::Event, + merchant_key_store: &domain::MerchantKeyStore, +) { + let key_manager_state: &KeyManagerState = &(&state).into(); + let event_id = event.event_id; + + let error = if let Err(error) = trigger_webhook_result { + logger::error!(?error, "Failed to send webhook to merchant"); + + serde_json::to_value(error.current_context()) + .change_context(errors::ApiErrorResponse::WebhookProcessingFailure) + .inspect_err(|error| { + logger::error!(?error, "Failed to serialize outgoing webhook error as JSON"); + }) + .ok() + } else { + None + }; + + let outgoing_webhook_event_content = content + .as_ref() + .and_then(api::OutgoingWebhookContent::get_outgoing_webhook_event_content) + .or_else(|| get_outgoing_webhook_event_content_from_event_metadata(event.metadata)); + + // Fetch updated_event from db + let updated_event = state + .store + .find_event_by_merchant_id_event_id( + key_manager_state, + &merchant_id, + &event_id, + merchant_key_store, + ) + .await + .attach_printable_lazy(|| format!("event not found for id: {}", &event_id)) + .map_err(|error| { + logger::error!(?error); + error + }) + .ok(); + + // Get status_code from webhook response + let status_code = updated_event.and_then(|updated_event| { + let webhook_response: Option = + updated_event.response.and_then(|res| { + res.peek() + .parse_struct("OutgoingWebhookResponseContent") + .map_err(|error| { + logger::error!(?error, "Error deserializing webhook response"); + error + }) + .ok() + }); + webhook_response.and_then(|res| res.status_code) + }); + + let webhook_event = OutgoingWebhookEvent::new( + merchant_id, + event_id, + event.event_type, + outgoing_webhook_event_content, + error, + event.initial_attempt_id, + status_code, + event.delivery_attempt, + ); + state.event_handler().log_event(&webhook_event); +} + +fn get_webhook_url_from_business_profile( + business_profile: &domain::Profile, +) -> CustomResult { + let webhook_details = business_profile + .webhook_details + .clone() + .get_required_value("webhook_details") + .change_context(errors::WebhooksFlowError::MerchantWebhookDetailsNotFound)?; + + webhook_details + .webhook_url + .get_required_value("webhook_url") + .change_context(errors::WebhooksFlowError::MerchantWebhookUrlNotConfigured) + .map(ExposeInterface::expose) +} + +pub(crate) fn get_outgoing_webhook_request( + outgoing_webhook: api::OutgoingWebhook, + business_profile: &domain::Profile, +) -> CustomResult { + #[inline] + fn get_outgoing_webhook_request_inner( + outgoing_webhook: api::OutgoingWebhook, + business_profile: &domain::Profile, + ) -> CustomResult { + let mut headers = vec![ + ( + reqwest::header::CONTENT_TYPE.to_string(), + mime::APPLICATION_JSON.essence_str().into(), + ), + ( + reqwest::header::USER_AGENT.to_string(), + consts::USER_AGENT.to_string().into(), + ), + ]; + + let transformed_outgoing_webhook = WebhookType::from(outgoing_webhook); + let payment_response_hash_key = business_profile.payment_response_hash_key.clone(); + let custom_headers = business_profile + .outgoing_webhook_custom_http_headers + .clone() + .map(|headers| { + headers + .into_inner() + .expose() + .parse_value::>("HashMap") + .change_context(errors::WebhooksFlowError::OutgoingWebhookEncodingFailed) + .attach_printable("Failed to deserialize outgoing webhook custom HTTP headers") + }) + .transpose()?; + if let Some(ref map) = custom_headers { + headers.extend( + map.iter() + .map(|(key, value)| (key.clone(), value.clone().into_masked())), + ); + }; + let outgoing_webhooks_signature = transformed_outgoing_webhook + .get_outgoing_webhooks_signature(payment_response_hash_key)?; + + if let Some(signature) = outgoing_webhooks_signature.signature { + WebhookType::add_webhook_header(&mut headers, signature) + } + + Ok(OutgoingWebhookRequestContent { + body: outgoing_webhooks_signature.payload, + headers: headers + .into_iter() + .map(|(name, value)| (name, Secret::new(value.into_inner()))) + .collect(), + }) + } + + get_outgoing_webhook_request_inner::( + outgoing_webhook, + business_profile, + ) +} + +#[derive(Debug)] +enum ScheduleWebhookRetry { + WithProcessTracker(Box), + NoSchedule, +} + +async fn update_event_if_client_error( + state: SessionState, + merchant_key_store: domain::MerchantKeyStore, + merchant_id: &common_utils::id_type::MerchantId, + event_id: &str, + error_message: String, +) -> CustomResult { + let is_webhook_notified = false; + let key_manager_state = &(&state).into(); + let response_to_store = OutgoingWebhookResponseContent { + body: None, + headers: None, + status_code: None, + error_message: Some(error_message), + }; + + let event_update = domain::EventUpdate::UpdateResponse { + is_webhook_notified, + response: Some( + crypto_operation( + key_manager_state, + type_name!(domain::Event), + CryptoOperation::Encrypt( + response_to_store + .encode_to_string_of_json() + .change_context( + errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, + ) + .map(Secret::new)?, + ), + Identifier::Merchant(merchant_key_store.merchant_id.clone()), + merchant_key_store.key.get_inner().peek(), + ) + .await + .and_then(|val| val.try_into_operation()) + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) + .attach_printable("Failed to encrypt outgoing webhook response content")?, + ), + }; + + state + .store + .update_event_by_merchant_id_event_id( + key_manager_state, + merchant_id, + event_id, + event_update, + &merchant_key_store, + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) +} + +async fn api_client_error_handler( + state: SessionState, + merchant_key_store: domain::MerchantKeyStore, + merchant_id: &common_utils::id_type::MerchantId, + event_id: &str, + client_error: error_stack::Report, + delivery_attempt: enums::WebhookDeliveryAttempt, + _schedule_webhook_retry: ScheduleWebhookRetry, +) -> CustomResult<(), errors::WebhooksFlowError> { + // Not including detailed error message in response information since it contains too + // much of diagnostic information to be exposed to the merchant. + update_event_if_client_error( + state.clone(), + merchant_key_store, + merchant_id, + event_id, + "Unable to send request to merchant server".to_string(), + ) + .await?; + + let error = client_error.change_context(errors::WebhooksFlowError::CallToMerchantFailed); + logger::error!( + ?error, + ?delivery_attempt, + "An error occurred when sending webhook to merchant" + ); + + //TODO: add outgoing webhook retries support + // if let ScheduleWebhookRetry::WithProcessTracker(process_tracker) = schedule_webhook_retry { + // // Schedule a retry attempt for webhook delivery + // outgoing_webhook_retry::retry_webhook_delivery_task( + // &*state.store, + // merchant_id, + // *process_tracker, + // ) + // .await + // .change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed)?; + // } + + Err(error) +} + +async fn update_event_in_storage( + state: SessionState, + merchant_key_store: domain::MerchantKeyStore, + merchant_id: &common_utils::id_type::MerchantId, + event_id: &str, + response: reqwest::Response, +) -> CustomResult { + let status_code = response.status(); + let is_webhook_notified = status_code.is_success(); + let key_manager_state = &(&state).into(); + let response_headers = response + .headers() + .iter() + .map(|(name, value)| { + ( + name.as_str().to_owned(), + value + .to_str() + .map(|s| Secret::from(String::from(s))) + .unwrap_or_else(|error| { + logger::warn!( + "Response header {} contains non-UTF-8 characters: {error:?}", + name.as_str() + ); + Secret::from(String::from("Non-UTF-8 header value")) + }), + ) + }) + .collect::>(); + let response_body = response + .text() + .await + .map(Secret::from) + .unwrap_or_else(|error| { + logger::warn!("Response contains non-UTF-8 characters: {error:?}"); + Secret::from(String::from("Non-UTF-8 response body")) + }); + let response_to_store = OutgoingWebhookResponseContent { + body: Some(response_body), + headers: Some(response_headers), + status_code: Some(status_code.as_u16()), + error_message: None, + }; + + let event_update = domain::EventUpdate::UpdateResponse { + is_webhook_notified, + response: Some( + crypto_operation( + key_manager_state, + type_name!(domain::Event), + CryptoOperation::Encrypt( + response_to_store + .encode_to_string_of_json() + .change_context( + errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed, + ) + .map(Secret::new)?, + ), + Identifier::Merchant(merchant_key_store.merchant_id.clone()), + merchant_key_store.key.get_inner().peek(), + ) + .await + .and_then(|val| val.try_into_operation()) + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) + .attach_printable("Failed to encrypt outgoing webhook response content")?, + ), + }; + state + .store + .update_event_by_merchant_id_event_id( + key_manager_state, + merchant_id, + event_id, + event_update, + &merchant_key_store, + ) + .await + .change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed) +} + +fn increment_webhook_outgoing_received_count(merchant_id: &common_utils::id_type::MerchantId) { + metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new( + MERCHANT_ID, + merchant_id.get_string_repr().to_owned(), + )], + ) +} + +async fn success_response_handler( + state: SessionState, + merchant_id: &common_utils::id_type::MerchantId, + process_tracker: Option, + business_status: &'static str, +) -> CustomResult<(), errors::WebhooksFlowError> { + increment_webhook_outgoing_received_count(merchant_id); + + match process_tracker { + Some(process_tracker) => state + .store + .as_scheduler() + .finish_process_with_business_status(process_tracker, business_status) + .await + .change_context( + errors::WebhooksFlowError::OutgoingWebhookProcessTrackerTaskUpdateFailed, + ), + None => Ok(()), + } +} + +async fn error_response_handler( + _state: SessionState, + merchant_id: &common_utils::id_type::MerchantId, + delivery_attempt: enums::WebhookDeliveryAttempt, + status_code: u16, + log_message: &'static str, + _schedule_webhook_retry: ScheduleWebhookRetry, +) -> CustomResult<(), errors::WebhooksFlowError> { + metrics::WEBHOOK_OUTGOING_NOT_RECEIVED_COUNT.add( + &metrics::CONTEXT, + 1, + &[metrics::KeyValue::new( + MERCHANT_ID, + merchant_id.get_string_repr().to_owned(), + )], + ); + + let error = report!(errors::WebhooksFlowError::NotReceivedByMerchant); + logger::warn!(?error, ?delivery_attempt, status_code, %log_message); + + //TODO: add outgoing webhook retries support + // if let ScheduleWebhookRetry::WithProcessTracker(process_tracker) = schedule_webhook_retry { + // // Schedule a retry attempt for webhook delivery + // outgoing_webhook_retry::retry_webhook_delivery_task( + // &*state.store, + // merchant_id, + // *process_tracker, + // ) + // .await + // .change_context(errors::WebhooksFlowError::OutgoingWebhookRetrySchedulingFailed)?; + // } + + Err(error) +} + +impl ForeignFrom<&api::OutgoingWebhookContent> for storage::EventMetadata { + fn foreign_from(content: &api::OutgoingWebhookContent) -> Self { + match content { + webhooks::OutgoingWebhookContent::PaymentDetails(payments_response) => Self::Payment { + payment_id: payments_response.id.clone(), + }, + webhooks::OutgoingWebhookContent::RefundDetails(refund_response) => Self::Refund { + payment_id: refund_response.payment_id.clone(), + refund_id: refund_response.id.clone(), + }, + webhooks::OutgoingWebhookContent::DisputeDetails(dispute_response) => { + //TODO: add support for dispute outgoing webhook + todo!() + } + webhooks::OutgoingWebhookContent::MandateDetails(mandate_response) => Self::Mandate { + payment_method_id: mandate_response.payment_method_id.clone(), + mandate_id: mandate_response.mandate_id.clone(), + }, + #[cfg(feature = "payouts")] + webhooks::OutgoingWebhookContent::PayoutDetails(payout_response) => Self::Payout { + payout_id: payout_response.payout_id.clone(), + }, + } + } +} + +fn get_outgoing_webhook_event_content_from_event_metadata( + event_metadata: Option, +) -> Option { + event_metadata.map(|metadata| match metadata { + diesel_models::EventMetadata::Payment { payment_id } => { + OutgoingWebhookEventContent::Payment { + payment_id, + content: serde_json::Value::Null, + } + } + diesel_models::EventMetadata::Payout { payout_id } => OutgoingWebhookEventContent::Payout { + payout_id, + content: serde_json::Value::Null, + }, + diesel_models::EventMetadata::Refund { + payment_id, + refund_id, + } => OutgoingWebhookEventContent::Refund { + payment_id, + refund_id, + content: serde_json::Value::Null, + }, + diesel_models::EventMetadata::Dispute { + payment_id, + attempt_id, + dispute_id, + } => OutgoingWebhookEventContent::Dispute { + payment_id, + attempt_id, + dispute_id, + content: serde_json::Value::Null, + }, + diesel_models::EventMetadata::Mandate { + payment_method_id, + mandate_id, + } => OutgoingWebhookEventContent::Mandate { + payment_method_id, + mandate_id, + content: serde_json::Value::Null, + }, + }) +} diff --git a/crates/router/src/db/events.rs b/crates/router/src/db/events.rs index 651c2ece610b..ccbbc0087c21 100644 --- a/crates/router/src/db/events.rs +++ b/crates/router/src/db/events.rs @@ -717,6 +717,7 @@ mod tests { #[allow(clippy::unwrap_used)] #[tokio::test] + #[cfg(feature = "v1")] async fn test_mockdb_event_interface() { #[allow(clippy::expect_used)] let mockdb = MockDb::new(&redis_interface::RedisSettings::default()) @@ -825,4 +826,116 @@ mod tests { assert_eq!(updated_event.primary_object_id, payment_id); assert_eq!(updated_event.event_id, event_id); } + + #[allow(clippy::unwrap_used)] + #[tokio::test] + #[cfg(feature = "v2")] + async fn test_mockdb_event_interface() { + #[allow(clippy::expect_used)] + let mockdb = MockDb::new(&redis_interface::RedisSettings::default()) + .await + .expect("Failed to create Mock store"); + let event_id = "test_event_id"; + let (tx, _) = tokio::sync::oneshot::channel(); + let app_state = Box::pin(routes::AppState::with_storage( + Settings::default(), + StorageImpl::PostgresqlTest, + tx, + Box::new(services::MockApiClient), + )) + .await; + let state = &Arc::new(app_state) + .get_session_state("public", || {}) + .unwrap(); + let merchant_id = + common_utils::id_type::MerchantId::try_from(std::borrow::Cow::from("merchant_1")) + .unwrap(); + let business_profile_id = + common_utils::id_type::ProfileId::try_from(std::borrow::Cow::from("profile1")).unwrap(); + let payment_id = "test_payment_id"; + let key_manager_state = &state.into(); + let master_key = mockdb.get_master_key(); + mockdb + .insert_merchant_key_store( + key_manager_state, + domain::MerchantKeyStore { + merchant_id: merchant_id.clone(), + key: domain::types::crypto_operation( + key_manager_state, + type_name!(domain::MerchantKeyStore), + domain::types::CryptoOperation::Encrypt( + services::generate_aes256_key().unwrap().to_vec().into(), + ), + Identifier::Merchant(merchant_id.to_owned()), + master_key, + ) + .await + .and_then(|val| val.try_into_operation()) + .unwrap(), + created_at: datetime!(2023-02-01 0:00), + }, + &master_key.to_vec().into(), + ) + .await + .unwrap(); + let merchant_key_store = mockdb + .get_merchant_key_store_by_merchant_id( + key_manager_state, + &merchant_id, + &master_key.to_vec().into(), + ) + .await + .unwrap(); + + let event1 = mockdb + .insert_event( + key_manager_state, + domain::Event { + event_id: event_id.into(), + event_type: enums::EventType::PaymentSucceeded, + event_class: enums::EventClass::Payments, + is_webhook_notified: false, + primary_object_id: payment_id.into(), + primary_object_type: enums::EventObjectType::PaymentDetails, + created_at: common_utils::date_time::now(), + merchant_id: Some(merchant_id.to_owned()), + business_profile_id: Some(business_profile_id.to_owned()), + primary_object_created_at: Some(common_utils::date_time::now()), + idempotent_event_id: Some(event_id.into()), + initial_attempt_id: Some(event_id.into()), + request: None, + response: None, + delivery_attempt: Some(enums::WebhookDeliveryAttempt::InitialAttempt), + metadata: Some(EventMetadata::Payment { + payment_id: common_utils::id_type::GlobalPaymentId::try_from( + std::borrow::Cow::Borrowed(payment_id), + ) + .unwrap(), + }), + }, + &merchant_key_store, + ) + .await + .unwrap(); + + assert_eq!(event1.event_id, event_id); + + let updated_event = mockdb + .update_event_by_merchant_id_event_id( + key_manager_state, + &merchant_id, + event_id, + domain::EventUpdate::UpdateResponse { + is_webhook_notified: true, + response: None, + }, + &merchant_key_store, + ) + .await + .unwrap(); + + assert!(updated_event.is_webhook_notified); + assert_eq!(updated_event.primary_object_id, payment_id); + assert_eq!(updated_event.event_id, event_id); + } } diff --git a/crates/router/src/events/outgoing_webhook_logs.rs b/crates/router/src/events/outgoing_webhook_logs.rs index 64126ec76d98..90ed50217be5 100644 --- a/crates/router/src/events/outgoing_webhook_logs.rs +++ b/crates/router/src/events/outgoing_webhook_logs.rs @@ -49,15 +49,23 @@ pub enum OutgoingWebhookEventContent { #[cfg(feature = "v2")] Refund { payment_id: common_utils::id_type::GlobalPaymentId, - refund_id: String, + refund_id: common_utils::id_type::GlobalRefundId, content: Value, }, + #[cfg(feature = "v1")] Dispute { payment_id: common_utils::id_type::PaymentId, attempt_id: String, dispute_id: String, content: Value, }, + #[cfg(feature = "v2")] + Dispute { + payment_id: common_utils::id_type::GlobalPaymentId, + attempt_id: String, + dispute_id: String, + content: Value, + }, Mandate { payment_method_id: String, mandate_id: String, @@ -117,17 +125,14 @@ impl OutgoingWebhookEventMetric for OutgoingWebhookContent { }), Self::RefundDetails(refund_payload) => Some(OutgoingWebhookEventContent::Refund { payment_id: refund_payload.payment_id.clone(), - refund_id: refund_payload.get_refund_id_as_string(), + refund_id: refund_payload.id.clone(), content: masking::masked_serialize(&refund_payload) .unwrap_or(serde_json::json!({"error":"failed to serialize"})), }), - Self::DisputeDetails(dispute_payload) => Some(OutgoingWebhookEventContent::Dispute { - payment_id: dispute_payload.payment_id.clone(), - attempt_id: dispute_payload.attempt_id.clone(), - dispute_id: dispute_payload.dispute_id.clone(), - content: masking::masked_serialize(&dispute_payload) - .unwrap_or(serde_json::json!({"error":"failed to serialize"})), - }), + Self::DisputeDetails(dispute_payload) => { + //TODO: add support for dispute outgoing webhook + todo!() + } Self::MandateDetails(mandate_payload) => Some(OutgoingWebhookEventContent::Mandate { payment_method_id: mandate_payload.payment_method_id.clone(), mandate_id: mandate_payload.mandate_id.clone(),