@@ -184,15 +184,22 @@ func (h Handler) isBidirectionalStream(req *http.Request, res *http.Response) bo
184
184
(ae == "identity" || ae == "" )
185
185
}
186
186
187
- func (h Handler ) copyResponse (dst http.ResponseWriter , src io.Reader , flushInterval time.Duration ) error {
187
+ func (h Handler ) copyResponse (dst http.ResponseWriter , src io.Reader , flushInterval time.Duration , logger * zap. Logger ) error {
188
188
var w io.Writer = dst
189
189
190
190
if flushInterval != 0 {
191
+ var mlwLogger * zap.Logger
192
+ if h .VerboseLogs {
193
+ mlwLogger = logger .Named ("max_latency_writer" )
194
+ } else {
195
+ mlwLogger = zap .NewNop ()
196
+ }
191
197
mlw := & maxLatencyWriter {
192
198
dst : dst ,
193
199
//nolint:bodyclose
194
200
flush : http .NewResponseController (dst ).Flush ,
195
201
latency : flushInterval ,
202
+ logger : mlwLogger ,
196
203
}
197
204
defer mlw .stop ()
198
205
@@ -205,19 +212,30 @@ func (h Handler) copyResponse(dst http.ResponseWriter, src io.Reader, flushInter
205
212
206
213
buf := streamingBufPool .Get ().(* []byte )
207
214
defer streamingBufPool .Put (buf )
208
- _ , err := h .copyBuffer (w , src , * buf )
215
+
216
+ var copyLogger * zap.Logger
217
+ if h .VerboseLogs {
218
+ copyLogger = logger
219
+ } else {
220
+ copyLogger = zap .NewNop ()
221
+ }
222
+
223
+ _ , err := h .copyBuffer (w , src , * buf , copyLogger )
209
224
return err
210
225
}
211
226
212
227
// copyBuffer returns any write errors or non-EOF read errors, and the amount
213
228
// of bytes written.
214
- func (h Handler ) copyBuffer (dst io.Writer , src io.Reader , buf []byte ) (int64 , error ) {
229
+ func (h Handler ) copyBuffer (dst io.Writer , src io.Reader , buf []byte , logger * zap. Logger ) (int64 , error ) {
215
230
if len (buf ) == 0 {
216
231
buf = make ([]byte , defaultBufferSize )
217
232
}
218
233
var written int64
219
234
for {
235
+ logger .Debug ("waiting to read from upstream" )
220
236
nr , rerr := src .Read (buf )
237
+ logger := logger .With (zap .Int ("read" , nr ))
238
+ logger .Debug ("read from upstream" , zap .Error (rerr ))
221
239
if rerr != nil && rerr != io .EOF && rerr != context .Canceled {
222
240
// TODO: this could be useful to know (indeed, it revealed an error in our
223
241
// fastcgi PoC earlier; but it's this single error report here that necessitates
@@ -229,10 +247,15 @@ func (h Handler) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, er
229
247
h .logger .Error ("reading from backend" , zap .Error (rerr ))
230
248
}
231
249
if nr > 0 {
250
+ logger .Debug ("writing to downstream" )
232
251
nw , werr := dst .Write (buf [:nr ])
233
252
if nw > 0 {
234
253
written += int64 (nw )
235
254
}
255
+ logger .Debug ("wrote to downstream" ,
256
+ zap .Int ("written" , nw ),
257
+ zap .Int64 ("written_total" , written ),
258
+ zap .Error (werr ))
236
259
if werr != nil {
237
260
return written , fmt .Errorf ("writing: %w" , werr )
238
261
}
@@ -452,25 +475,30 @@ type maxLatencyWriter struct {
452
475
mu sync.Mutex // protects t, flushPending, and dst.Flush
453
476
t * time.Timer
454
477
flushPending bool
478
+ logger * zap.Logger
455
479
}
456
480
457
481
func (m * maxLatencyWriter ) Write (p []byte ) (n int , err error ) {
458
482
m .mu .Lock ()
459
483
defer m .mu .Unlock ()
460
484
n , err = m .dst .Write (p )
485
+ m .logger .Debug ("wrote bytes" , zap .Int ("n" , n ), zap .Error (err ))
461
486
if m .latency < 0 {
487
+ m .logger .Debug ("flushing immediately" )
462
488
//nolint:errcheck
463
489
m .flush ()
464
490
return
465
491
}
466
492
if m .flushPending {
493
+ m .logger .Debug ("delayed flush already pending" )
467
494
return
468
495
}
469
496
if m .t == nil {
470
497
m .t = time .AfterFunc (m .latency , m .delayedFlush )
471
498
} else {
472
499
m .t .Reset (m .latency )
473
500
}
501
+ m .logger .Debug ("timer set for delayed flush" , zap .Duration ("duration" , m .latency ))
474
502
m .flushPending = true
475
503
return
476
504
}
@@ -479,8 +507,10 @@ func (m *maxLatencyWriter) delayedFlush() {
479
507
m .mu .Lock ()
480
508
defer m .mu .Unlock ()
481
509
if ! m .flushPending { // if stop was called but AfterFunc already started this goroutine
510
+ m .logger .Debug ("delayed flush is not pending" )
482
511
return
483
512
}
513
+ m .logger .Debug ("delayed flush" )
484
514
//nolint:errcheck
485
515
m .flush ()
486
516
m .flushPending = false
0 commit comments