@@ -4,13 +4,13 @@ use jsonrpc_core::{Error, ErrorCode, Result as RpcResult};
4
4
use std:: time:: Duration ;
5
5
use tracing:: debug;
6
6
7
- use mycelium:: metrics:: Metrics ;
8
7
use mycelium:: message:: { MessageId , MessageInfo } ;
8
+ use mycelium:: metrics:: Metrics ;
9
9
10
- use crate :: HttpServerState ;
11
10
use crate :: message:: { MessageReceiveInfo , MessageSendInfo , PushMessageResponse } ;
12
11
use crate :: rpc:: models:: error_codes;
13
12
use crate :: rpc:: traits:: MessageApi ;
13
+ use crate :: HttpServerState ;
14
14
15
15
/// Implementation of Message-related JSON-RPC methods
16
16
pub struct MessageRpc < M >
@@ -28,10 +28,11 @@ where
28
28
pub fn new ( state : HttpServerState < M > ) -> Self {
29
29
Self { state }
30
30
}
31
-
31
+
32
32
/// Convert a base64 string to bytes
33
33
fn decode_base64 ( & self , s : & str ) -> Result < Vec < u8 > , Error > {
34
- base64:: engine:: general_purpose:: STANDARD . decode ( s. as_bytes ( ) )
34
+ base64:: engine:: general_purpose:: STANDARD
35
+ . decode ( s. as_bytes ( ) )
35
36
. map_err ( |e| Error {
36
37
code : ErrorCode :: InvalidParams ,
37
38
message : format ! ( "Invalid base64 encoding: {}" , e) ,
@@ -44,19 +45,24 @@ impl<M> MessageApi for MessageRpc<M>
44
45
where
45
46
M : Metrics + Clone + Send + Sync + ' static ,
46
47
{
47
- fn pop_message ( & self , peek : Option < bool > , timeout : Option < u64 > , topic : Option < String > ) -> RpcResult < MessageReceiveInfo > {
48
+ fn pop_message (
49
+ & self ,
50
+ peek : Option < bool > ,
51
+ timeout : Option < u64 > ,
52
+ topic : Option < String > ,
53
+ ) -> RpcResult < MessageReceiveInfo > {
48
54
debug ! (
49
55
"Attempt to get message via RPC, peek {}, timeout {} seconds" ,
50
56
peek. unwrap_or( false ) ,
51
57
timeout. unwrap_or( 0 )
52
58
) ;
53
-
59
+
54
60
let topic_bytes = if let Some ( topic_str) = topic {
55
61
Some ( self . decode_base64 ( & topic_str) ?)
56
62
} else {
57
63
None
58
64
} ;
59
-
65
+
60
66
// A timeout of 0 seconds essentially means get a message if there is one, and return
61
67
// immediately if there isn't.
62
68
let result = tokio:: task:: block_in_place ( || {
72
78
. await
73
79
} )
74
80
} ) ;
75
-
81
+
76
82
match result {
77
83
Ok ( Ok ( m) ) => Ok ( MessageReceiveInfo {
78
84
id : m. id ,
@@ -94,22 +100,26 @@ where
94
100
} ) ,
95
101
}
96
102
}
97
-
98
- fn push_message ( & self , message : MessageSendInfo , reply_timeout : Option < u64 > ) -> RpcResult < PushMessageResponse > {
103
+
104
+ fn push_message (
105
+ & self ,
106
+ message : MessageSendInfo ,
107
+ reply_timeout : Option < u64 > ,
108
+ ) -> RpcResult < PushMessageResponse > {
99
109
let dst = match message. dst {
100
110
crate :: message:: MessageDestination :: Ip ( ip) => ip,
101
111
crate :: message:: MessageDestination :: Pk ( pk) => pk. address ( ) . into ( ) ,
102
112
} ;
103
-
113
+
104
114
debug ! (
105
115
message. dst=%dst,
106
116
message. len=message. payload. len( ) ,
107
117
"Pushing new message via RPC" ,
108
118
) ;
109
-
119
+
110
120
// Default message try duration
111
121
const DEFAULT_MESSAGE_TRY_DURATION : Duration = Duration :: from_secs ( 60 * 5 ) ;
112
-
122
+
113
123
let result = tokio:: task:: block_in_place ( || {
114
124
tokio:: runtime:: Handle :: current ( ) . block_on ( async {
115
125
self . state . node . lock ( ) . await . push_message (
@@ -121,7 +131,7 @@ where
121
131
)
122
132
} )
123
133
} ) ;
124
-
134
+
125
135
let ( id, sub) = match result {
126
136
Ok ( ( id, sub) ) => ( id, sub) ,
127
137
Err ( _) => {
@@ -132,14 +142,16 @@ where
132
142
} ) ;
133
143
}
134
144
} ;
135
-
145
+
136
146
if reply_timeout. is_none ( ) {
137
147
// If we don't wait for the reply just return here.
138
- return Ok ( PushMessageResponse :: Id ( crate :: message:: MessageIdReply { id } ) ) ;
148
+ return Ok ( PushMessageResponse :: Id ( crate :: message:: MessageIdReply {
149
+ id,
150
+ } ) ) ;
139
151
}
140
-
152
+
141
153
let mut sub = sub. unwrap ( ) ;
142
-
154
+
143
155
// Wait for reply with timeout
144
156
let reply_result = tokio:: task:: block_in_place ( || {
145
157
tokio:: runtime:: Handle :: current ( ) . block_on ( async {
@@ -183,13 +195,13 @@ where
183
195
}
184
196
} )
185
197
} ) ;
186
-
198
+
187
199
match reply_result {
188
200
Ok ( response) => Ok ( response) ,
189
201
Err ( e) => Err ( e) ,
190
202
}
191
203
}
192
-
204
+
193
205
fn push_message_reply ( & self , id : String , message : MessageSendInfo ) -> RpcResult < bool > {
194
206
let message_id = match MessageId :: from_hex ( & id) {
195
207
Ok ( id) => id,
@@ -201,22 +213,22 @@ where
201
213
} ) ;
202
214
}
203
215
} ;
204
-
216
+
205
217
let dst = match message. dst {
206
218
crate :: message:: MessageDestination :: Ip ( ip) => ip,
207
219
crate :: message:: MessageDestination :: Pk ( pk) => pk. address ( ) . into ( ) ,
208
220
} ;
209
-
221
+
210
222
debug ! (
211
223
message. id=id,
212
224
message. dst=%dst,
213
225
message. len=message. payload. len( ) ,
214
226
"Pushing new reply to message via RPC" ,
215
227
) ;
216
-
228
+
217
229
// Default message try duration
218
230
const DEFAULT_MESSAGE_TRY_DURATION : Duration = Duration :: from_secs ( 60 * 5 ) ;
219
-
231
+
220
232
tokio:: task:: block_in_place ( || {
221
233
tokio:: runtime:: Handle :: current ( ) . block_on ( async {
222
234
self . state . node . lock ( ) . await . reply_message (
@@ -227,10 +239,10 @@ where
227
239
) ;
228
240
} )
229
241
} ) ;
230
-
242
+
231
243
Ok ( true )
232
244
}
233
-
245
+
234
246
fn get_message_info ( & self , id : String ) -> RpcResult < MessageInfo > {
235
247
let message_id = match MessageId :: from_hex ( & id) {
236
248
Ok ( id) => id,
@@ -242,15 +254,14 @@ where
242
254
} ) ;
243
255
}
244
256
} ;
245
-
257
+
246
258
debug ! ( message. id=%id, "Fetching message status via RPC" ) ;
247
-
259
+
248
260
let result = tokio:: task:: block_in_place ( || {
249
- tokio:: runtime:: Handle :: current ( ) . block_on ( async {
250
- self . state . node . lock ( ) . await . message_status ( message_id)
251
- } )
261
+ tokio:: runtime:: Handle :: current ( )
262
+ . block_on ( async { self . state . node . lock ( ) . await . message_status ( message_id) } )
252
263
} ) ;
253
-
264
+
254
265
match result {
255
266
Some ( info) => Ok ( info) ,
256
267
None => Err ( Error {
@@ -260,4 +271,5 @@ where
260
271
} ) ,
261
272
}
262
273
}
263
- }
274
+ }
275
+
0 commit comments