@@ -55,6 +55,8 @@ pub mod pool;
55
55
pub mod routines;
56
56
pub mod stmt_cache;
57
57
58
+ const DEFAULT_WAIT_TIMEOUT : usize = 28800 ;
59
+
58
60
/// Helper that asynchronously disconnects the givent connection on the default tokio executor.
59
61
fn disconnect ( mut conn : Conn ) {
60
62
let disconnected = conn. inner . disconnected ;
@@ -962,42 +964,123 @@ impl Conn {
962
964
/// * It reads and stores `wait_timeout` in the connection unless it's already in [`Opts`]
963
965
///
964
966
async fn read_settings ( & mut self ) -> Result < ( ) > {
965
- let read_socket = self . inner . opts . prefer_socket ( ) && self . inner . socket . is_none ( ) ;
966
- let read_max_allowed_packet = self . opts ( ) . max_allowed_packet ( ) . is_none ( ) ;
967
- let read_wait_timeout = self . opts ( ) . wait_timeout ( ) . is_none ( ) ;
967
+ enum Action {
968
+ Load ( Cfg ) ,
969
+ Apply ( CfgData ) ,
970
+ }
968
971
969
- let settings: Option < Row > = if read_socket || read_max_allowed_packet || read_wait_timeout {
970
- self . query_internal ( "SELECT @@socket, @@max_allowed_packet, @@wait_timeout" )
971
- . await ?
972
- } else {
973
- None
974
- } ;
972
+ enum CfgData {
973
+ MaxAllowedPacket ( usize ) ,
974
+ WaitTimeout ( usize ) ,
975
+ }
975
976
976
- // set socket inside the connection
977
- if read_socket {
978
- self . inner . socket = settings. as_ref ( ) . map ( |s| s. get ( "@@socket" ) ) . unwrap_or ( None ) ;
977
+ impl CfgData {
978
+ fn apply ( & self , conn : & mut Conn ) {
979
+ match self {
980
+ Self :: MaxAllowedPacket ( value) => {
981
+ if let Some ( stream) = conn. inner . stream . as_mut ( ) {
982
+ stream. set_max_allowed_packet ( * value) ;
983
+ }
984
+ }
985
+ Self :: WaitTimeout ( value) => {
986
+ conn. inner . wait_timeout = Duration :: from_secs ( * value as u64 ) ;
987
+ }
988
+ }
989
+ }
979
990
}
980
991
981
- // set max_allowed_packet
982
- let max_allowed_packet = if read_max_allowed_packet {
983
- settings
984
- . as_ref ( )
985
- . map ( |s| s. get ( "@@max_allowed_packet" ) )
986
- . unwrap ( )
987
- } else {
988
- self . opts ( ) . max_allowed_packet ( )
989
- } ;
990
- if let Some ( stream) = self . inner . stream . as_mut ( ) {
991
- stream. set_max_allowed_packet ( max_allowed_packet. unwrap_or ( DEFAULT_MAX_ALLOWED_PACKET ) ) ;
992
+ enum Cfg {
993
+ Socket ,
994
+ MaxAllowedPacket ,
995
+ WaitTimeout ,
992
996
}
993
997
994
- // set read_wait_timeout
995
- let wait_timeout = if read_wait_timeout {
996
- settings. as_ref ( ) . map ( |s| s. get ( "@@wait_timeout" ) ) . unwrap ( )
998
+ impl Cfg {
999
+ const fn name ( & self ) -> & ' static str {
1000
+ match self {
1001
+ Self :: Socket => "@@socket" ,
1002
+ Self :: MaxAllowedPacket => "@@max_allowed_packet" ,
1003
+ Self :: WaitTimeout => "@@wait_timeout" ,
1004
+ }
1005
+ }
1006
+
1007
+ fn apply ( & self , conn : & mut Conn , value : Option < crate :: Value > ) {
1008
+ match self {
1009
+ Cfg :: Socket => {
1010
+ conn. inner . socket = value. map ( crate :: from_value) . flatten ( ) ;
1011
+ }
1012
+ Cfg :: MaxAllowedPacket => {
1013
+ if let Some ( stream) = conn. inner . stream . as_mut ( ) {
1014
+ stream. set_max_allowed_packet (
1015
+ value
1016
+ . map ( crate :: from_value)
1017
+ . flatten ( )
1018
+ . unwrap_or ( DEFAULT_MAX_ALLOWED_PACKET ) ,
1019
+ ) ;
1020
+ }
1021
+ }
1022
+ Cfg :: WaitTimeout => {
1023
+ conn. inner . wait_timeout = Duration :: from_secs (
1024
+ value
1025
+ . map ( crate :: from_value)
1026
+ . flatten ( )
1027
+ . unwrap_or ( DEFAULT_WAIT_TIMEOUT ) as u64 ,
1028
+ ) ;
1029
+ }
1030
+ }
1031
+ }
1032
+ }
1033
+
1034
+ let mut actions = vec ! [
1035
+ if let Some ( x) = self . opts( ) . max_allowed_packet( ) {
1036
+ Action :: Apply ( CfgData :: MaxAllowedPacket ( x) )
1037
+ } else {
1038
+ Action :: Load ( Cfg :: MaxAllowedPacket )
1039
+ } ,
1040
+ if let Some ( x) = self . opts( ) . wait_timeout( ) {
1041
+ Action :: Apply ( CfgData :: WaitTimeout ( x) )
1042
+ } else {
1043
+ Action :: Load ( Cfg :: WaitTimeout )
1044
+ } ,
1045
+ ] ;
1046
+
1047
+ if self . inner . opts . prefer_socket ( ) && self . inner . socket . is_none ( ) {
1048
+ actions. push ( Action :: Load ( Cfg :: Socket ) )
1049
+ }
1050
+
1051
+ let loads = actions
1052
+ . iter ( )
1053
+ . filter_map ( |x| match x {
1054
+ Action :: Load ( x) => Some ( x) ,
1055
+ Action :: Apply ( _) => None ,
1056
+ } )
1057
+ . collect :: < Vec < _ > > ( ) ;
1058
+
1059
+ let loaded = if !loads. is_empty ( ) {
1060
+ let query = loads
1061
+ . iter ( )
1062
+ . zip ( std:: iter:: once ( ' ' ) . chain ( std:: iter:: repeat ( ',' ) ) )
1063
+ . fold ( "SELECT" . to_owned ( ) , |mut acc, ( cfg, prefix) | {
1064
+ acc. push ( prefix) ;
1065
+ acc. push_str ( cfg. name ( ) ) ;
1066
+ acc
1067
+ } ) ;
1068
+
1069
+ self . query_internal :: < Row , String > ( query)
1070
+ . await ?
1071
+ . map ( |row| row. unwrap ( ) )
1072
+ . unwrap_or_else ( || vec ! [ crate :: Value :: NULL ; loads. len( ) ] )
997
1073
} else {
998
- self . opts ( ) . wait_timeout ( )
1074
+ vec ! [ ]
999
1075
} ;
1000
- self . inner . wait_timeout = Duration :: from_secs ( wait_timeout. unwrap_or ( 28800 ) as u64 ) ;
1076
+ let mut loaded = loaded. into_iter ( ) ;
1077
+
1078
+ for action in actions {
1079
+ match action {
1080
+ Action :: Load ( cfg) => cfg. apply ( self , loaded. next ( ) ) ,
1081
+ Action :: Apply ( cfg) => cfg. apply ( self ) ,
1082
+ }
1083
+ }
1001
1084
1002
1085
Ok ( ( ) )
1003
1086
}
0 commit comments