9
9
use futures_core:: ready;
10
10
use mysql_common:: {
11
11
binlog:: {
12
- consts:: BinlogVersion :: Version4 ,
13
- events:: { Event , TableMapEvent } ,
12
+ consts:: { BinlogVersion :: Version4 , EventType } ,
13
+ events:: { Event , TableMapEvent , TransactionPayloadEvent } ,
14
14
EventStreamReader ,
15
15
} ,
16
16
io:: ParseBuf ,
@@ -19,7 +19,7 @@ use mysql_common::{
19
19
20
20
use std:: {
21
21
future:: Future ,
22
- io:: ErrorKind ,
22
+ io:: { Cursor , ErrorKind } ,
23
23
pin:: Pin ,
24
24
task:: { Context , Poll } ,
25
25
} ;
@@ -71,6 +71,9 @@ impl super::Conn {
71
71
pub struct BinlogStream {
72
72
read_packet : ReadPacket < ' static , ' static > ,
73
73
esr : EventStreamReader ,
74
+ // TODO: Use 'static reader here (requires impl on the mysql_common side).
75
+ /// Uncompressed Transaction_payload_event we are iterating over (if any).
76
+ tpe : Option < Cursor < Vec < u8 > > > ,
74
77
}
75
78
76
79
impl BinlogStream {
@@ -79,6 +82,7 @@ impl BinlogStream {
79
82
BinlogStream {
80
83
read_packet : ReadPacket :: new ( conn) ,
81
84
esr : EventStreamReader :: new ( Version4 ) ,
85
+ tpe : None ,
82
86
}
83
87
}
84
88
@@ -114,6 +118,22 @@ impl futures_core::stream::Stream for BinlogStream {
114
118
type Item = Result < Event > ;
115
119
116
120
fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
121
+ {
122
+ let Self {
123
+ ref mut tpe,
124
+ ref mut esr,
125
+ ..
126
+ } = * self ;
127
+
128
+ if let Some ( tpe) = tpe. as_mut ( ) {
129
+ match esr. read_decompressed ( tpe) {
130
+ Ok ( Some ( event) ) => return Poll :: Ready ( Some ( Ok ( event) ) ) ,
131
+ Ok ( None ) => self . tpe = None ,
132
+ Err ( err) => return Poll :: Ready ( Some ( Err ( err. into ( ) ) ) ) ,
133
+ }
134
+ }
135
+ }
136
+
117
137
let packet = match ready ! ( Pin :: new( & mut self . read_packet) . poll( cx) ) {
118
138
Ok ( packet) => packet,
119
139
Err ( err) => return Poll :: Ready ( Some ( Err ( err. into ( ) ) ) ) ,
@@ -143,9 +163,17 @@ impl futures_core::stream::Stream for BinlogStream {
143
163
if first_byte == Some ( 0 ) {
144
164
let event_data = & packet[ 1 ..] ;
145
165
match self . esr . read ( event_data) {
146
- Ok ( event) => {
166
+ Ok ( Some ( event) ) => {
167
+ if event. header ( ) . event_type_raw ( ) == EventType :: TRANSACTION_PAYLOAD_EVENT as u8
168
+ {
169
+ match event. read_event :: < TransactionPayloadEvent < ' _ > > ( ) {
170
+ Ok ( e) => self . tpe = Some ( Cursor :: new ( e. danger_decompress ( ) ) ) ,
171
+ Err ( _) => ( /* TODO: Log the error */ ) ,
172
+ }
173
+ }
147
174
return Poll :: Ready ( Some ( Ok ( event) ) ) ;
148
175
}
176
+ Ok ( None ) => return Poll :: Ready ( None ) ,
149
177
Err ( err) => return Poll :: Ready ( Some ( Err ( err. into ( ) ) ) ) ,
150
178
}
151
179
} else {
@@ -168,21 +196,21 @@ mod tests {
168
196
use crate :: prelude:: * ;
169
197
use crate :: { test_misc:: get_opts, * } ;
170
198
171
- async fn gen_dummy_data ( ) -> super :: Result < ( ) > {
172
- let mut conn = Conn :: new ( get_opts ( ) ) . await ?;
173
-
199
+ async fn gen_dummy_data ( conn : & mut Conn ) -> super :: Result < ( ) > {
174
200
"CREATE TABLE IF NOT EXISTS customers (customer_id int not null)"
175
- . ignore ( & mut conn)
201
+ . ignore ( & mut * conn)
176
202
. await ?;
177
203
204
+ let mut tx = conn. start_transaction ( Default :: default ( ) ) . await ?;
178
205
for i in 0_u8 ..100 {
179
206
"INSERT INTO customers(customer_id) VALUES (?)"
180
207
. with ( ( i, ) )
181
- . ignore ( & mut conn )
208
+ . ignore ( & mut tx )
182
209
. await ?;
183
210
}
211
+ tx. commit ( ) . await ?;
184
212
185
- "DROP TABLE customers" . ignore ( & mut conn) . await ?;
213
+ "DROP TABLE customers" . ignore ( conn) . await ?;
186
214
187
215
Ok ( ( ) )
188
216
}
@@ -193,6 +221,12 @@ mod tests {
193
221
Some ( pool) => pool. get_conn ( ) . await . unwrap ( ) ,
194
222
} ;
195
223
224
+ if conn. server_version ( ) >= ( 8 , 0 , 31 ) && conn. server_version ( ) < ( 9 , 0 , 0 ) {
225
+ let _ = "SET binlog_transaction_compression=ON"
226
+ . ignore ( & mut conn)
227
+ . await ;
228
+ }
229
+
196
230
if let Ok ( Some ( gtid_mode) ) = "SELECT @@GLOBAL.GTID_MODE"
197
231
. first :: < String , _ > ( & mut conn)
198
232
. await
@@ -209,7 +243,7 @@ mod tests {
209
243
let filename = row. get ( 0 ) . unwrap ( ) ;
210
244
let position = row. get ( 1 ) . unwrap ( ) ;
211
245
212
- gen_dummy_data ( ) . await . unwrap ( ) ;
246
+ gen_dummy_data ( & mut conn ) . await . unwrap ( ) ;
213
247
Ok ( ( conn, filename, position) )
214
248
}
215
249
0 commit comments