@@ -561,8 +561,7 @@ pub async fn send_request(
561
561
key : consts:: METRICS_HOST_TAG_NAME . into ( ) ,
562
562
value : url. host_str ( ) . unwrap_or_default ( ) . to_string ( ) . into ( ) ,
563
563
} ;
564
-
565
- let send_request = async {
564
+ let request = {
566
565
match request. method {
567
566
Method :: Get => client. get ( url) ,
568
567
Method :: Post => {
@@ -616,32 +615,92 @@ pub async fn send_request(
616
615
. timeout ( Duration :: from_secs (
617
616
option_timeout_secs. unwrap_or ( crate :: consts:: REQUEST_TIME_OUT ) ,
618
617
) )
619
- . send ( )
620
- . await
621
- . map_err ( |error| match error {
622
- error if error. is_timeout ( ) => {
623
- metrics:: REQUEST_BUILD_FAILURE . add ( & metrics:: CONTEXT , 1 , & [ ] ) ;
624
- errors:: ApiClientError :: RequestTimeoutReceived
625
- }
626
- error if is_connection_closed ( & error) => {
627
- metrics:: REQUEST_BUILD_FAILURE . add ( & metrics:: CONTEXT , 1 , & [ ] ) ;
628
- errors:: ApiClientError :: ConnectionClosed
629
- }
630
- _ => errors:: ApiClientError :: RequestNotSent ( error. to_string ( ) ) ,
631
- } )
632
- . into_report ( )
633
- . attach_printable ( "Unable to send request to connector" )
634
618
} ;
635
619
636
- metrics_request:: record_operation_time (
620
+ // We cannot clone the request type, because it has Form trait which is not clonable. So we are cloning the request builder here.
621
+ let cloned_send_request = request. try_clone ( ) . map ( |cloned_request| async {
622
+ cloned_request
623
+ . send ( )
624
+ . await
625
+ . map_err ( |error| match error {
626
+ error if error. is_timeout ( ) => {
627
+ metrics:: REQUEST_BUILD_FAILURE . add ( & metrics:: CONTEXT , 1 , & [ ] ) ;
628
+ errors:: ApiClientError :: RequestTimeoutReceived
629
+ }
630
+ error if is_connection_closed_before_message_could_complete ( & error) => {
631
+ metrics:: REQUEST_BUILD_FAILURE . add ( & metrics:: CONTEXT , 1 , & [ ] ) ;
632
+ errors:: ApiClientError :: ConnectionClosedIncompleteMessage
633
+ }
634
+ _ => errors:: ApiClientError :: RequestNotSent ( error. to_string ( ) ) ,
635
+ } )
636
+ . into_report ( )
637
+ . attach_printable ( "Unable to send request to connector" )
638
+ } ) ;
639
+
640
+ let send_request = async {
641
+ request
642
+ . send ( )
643
+ . await
644
+ . map_err ( |error| match error {
645
+ error if error. is_timeout ( ) => {
646
+ metrics:: REQUEST_BUILD_FAILURE . add ( & metrics:: CONTEXT , 1 , & [ ] ) ;
647
+ errors:: ApiClientError :: RequestTimeoutReceived
648
+ }
649
+ error if is_connection_closed_before_message_could_complete ( & error) => {
650
+ metrics:: REQUEST_BUILD_FAILURE . add ( & metrics:: CONTEXT , 1 , & [ ] ) ;
651
+ errors:: ApiClientError :: ConnectionClosedIncompleteMessage
652
+ }
653
+ _ => errors:: ApiClientError :: RequestNotSent ( error. to_string ( ) ) ,
654
+ } )
655
+ . into_report ( )
656
+ . attach_printable ( "Unable to send request to connector" )
657
+ } ;
658
+
659
+ let response = metrics_request:: record_operation_time (
637
660
send_request,
638
661
& metrics:: EXTERNAL_REQUEST_TIME ,
639
- & [ metrics_tag] ,
662
+ & [ metrics_tag. clone ( ) ] ,
640
663
)
641
- . await
664
+ . await ;
665
+ // Retry once if the response is connection closed.
666
+ //
667
+ // This is just due to the racy nature of networking.
668
+ // hyper has a connection pool of idle connections, and it selected one to send your request.
669
+ // Most of the time, hyper will receive the server’s FIN and drop the dead connection from its pool.
670
+ // But occasionally, a connection will be selected from the pool
671
+ // and written to at the same time the server is deciding to close the connection.
672
+ // Since hyper already wrote some of the request,
673
+ // it can’t really retry it automatically on a new connection, since the server may have acted already
674
+ match response {
675
+ Ok ( response) => Ok ( response) ,
676
+ Err ( error)
677
+ if error. current_context ( )
678
+ == & errors:: ApiClientError :: ConnectionClosedIncompleteMessage =>
679
+ {
680
+ metrics:: AUTO_RETRY_CONNECTION_CLOSED . add ( & metrics:: CONTEXT , 1 , & [ ] ) ;
681
+ match cloned_send_request {
682
+ Some ( cloned_request) => {
683
+ logger:: info!(
684
+ "Retrying request due to connection closed before message could complete"
685
+ ) ;
686
+ metrics_request:: record_operation_time (
687
+ cloned_request,
688
+ & metrics:: EXTERNAL_REQUEST_TIME ,
689
+ & [ metrics_tag] ,
690
+ )
691
+ . await
692
+ }
693
+ None => {
694
+ logger:: info!( "Retrying request due to connection closed before message could complete failed as request is not clonable" ) ;
695
+ Err ( error)
696
+ }
697
+ }
698
+ }
699
+ err @ Err ( _) => err,
700
+ }
642
701
}
643
702
644
- fn is_connection_closed ( error : & reqwest:: Error ) -> bool {
703
+ fn is_connection_closed_before_message_could_complete ( error : & reqwest:: Error ) -> bool {
645
704
let mut source = error. source ( ) ;
646
705
while let Some ( err) = source {
647
706
if let Some ( hyper_err) = err. downcast_ref :: < hyper:: Error > ( ) {
0 commit comments