Skip to content

Commit da29aea

Browse files
fix client shutdown; distinguish push and stream ids (#174, #175) (#176)
1 parent 6a2be4a commit da29aea

File tree

12 files changed

+240
-171
lines changed

12 files changed

+240
-171
lines changed

h3/src/client.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ use crate::{
1616
connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
1717
error::{Code, Error, ErrorLevel},
1818
frame::FrameStream,
19-
proto::{frame::Frame, headers::Header, varint::VarInt},
20-
qpack, quic, stream,
19+
proto::{frame::Frame, headers::Header, push::PushId, varint::VarInt},
20+
qpack,
21+
quic::{self, StreamId},
22+
stream,
2123
};
2224

2325
/// Start building a new HTTP/3 client
@@ -147,7 +149,7 @@ where
147149
(state.peer_max_field_section_size, state.closing)
148150
};
149151

150-
if closing.is_some() {
152+
if closing {
151153
return Err(Error::closing());
152154
}
153155

@@ -348,16 +350,21 @@ where
348350
B: Buf,
349351
{
350352
inner: ConnectionInner<C, B>,
353+
// Has a GOAWAY frame been sent? If so, this PushId is the last we are willing to accept.
354+
sent_closing: Option<PushId>,
355+
// Has a GOAWAY frame been received? If so, this is StreamId the last the remote will accept.
356+
recv_closing: Option<StreamId>,
351357
}
352358

353359
impl<C, B> Connection<C, B>
354360
where
355361
C: quic::Connection<B>,
356362
B: Buf,
357363
{
358-
/// Itiniate a graceful shutdown, accepting `max_request` potentially in-flight server push
359-
pub async fn shutdown(&mut self, max_requests: usize) -> Result<(), Error> {
360-
self.inner.shutdown(max_requests).await
364+
/// Initiate a graceful shutdown, accepting `max_push` potentially in-flight server pushes
365+
pub async fn shutdown(&mut self, _max_push: usize) -> Result<(), Error> {
366+
// TODO: Calculate remaining pushes once server push is implemented.
367+
self.inner.shutdown(&mut self.sent_closing, PushId(0)).await
361368
}
362369

363370
/// Wait until the connection is closed
@@ -396,12 +403,14 @@ where
396403
//# initiated bidirectional stream encoded as a variable-length integer.
397404
//# A client MUST treat receipt of a GOAWAY frame containing a stream ID
398405
//# of any other type as a connection error of type H3_ID_ERROR.
399-
if !id.is_request() {
406+
if !StreamId::from(id).is_request() {
400407
return Poll::Ready(Err(Code::H3_ID_ERROR.with_reason(
401408
format!("non-request StreamId in a GoAway frame: {}", id),
402409
ErrorLevel::ConnectionError,
403410
)));
404411
}
412+
self.inner.process_goaway(&mut self.recv_closing, id)?;
413+
405414
info!("Server initiated graceful shutdown, last: StreamId({})", id);
406415
}
407416

@@ -420,22 +429,18 @@ where
420429
)))
421430
}
422431
Err(e) => {
423-
let connection_error = self
424-
.inner
425-
.shared
426-
.read("poll_close error read")
427-
.error
428-
.as_ref()
429-
.cloned();
430-
431-
match connection_error {
432-
Some(e) if e.is_closed() => return Poll::Ready(Ok(())),
433-
Some(e) => return Poll::Ready(Err(e)),
432+
let connection_error = self.inner.shared.read("poll_close").error.clone();
433+
let connection_error = match connection_error {
434+
Some(e) => e,
434435
None => {
435-
self.inner.shared.write("poll_close error").error = e.clone().into();
436-
return Poll::Ready(Err(e));
436+
self.inner.shared.write("poll_close error").error = Some(e.clone());
437+
e
437438
}
439+
};
440+
if connection_error.is_closed() {
441+
return Poll::Ready(Ok(()));
438442
}
443+
return Poll::Ready(Err(connection_error));
439444
}
440445
}
441446
}
@@ -523,6 +528,8 @@ impl Builder {
523528
self.send_grease,
524529
)
525530
.await?,
531+
sent_closing: None,
532+
recv_closing: None,
526533
},
527534
SendRequest {
528535
open,

h3/src/connection.rs

Lines changed: 63 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
proto::{
1616
frame::{Frame, PayloadLen, SettingId, Settings},
1717
headers::Header,
18-
stream::{StreamId, StreamType},
18+
stream::StreamType,
1919
varint::VarInt,
2020
},
2121
qpack,
@@ -29,10 +29,8 @@ pub struct SharedState {
2929
pub peer_max_field_section_size: u64,
3030
// connection-wide error, concerns all RequestStreams and drivers
3131
pub error: Option<Error>,
32-
// Has the connection received a GoAway frame? If so, this StreamId is the last
33-
// we're willing to accept. This lets us finish the requests or pushes that were
34-
// already in flight when the graceful shutdown was initiated.
35-
pub closing: Option<StreamId>,
32+
// Has a GOAWAY frame been sent or received?
33+
pub closing: bool,
3634
}
3735

3836
#[derive(Clone)]
@@ -54,7 +52,7 @@ impl Default for SharedStateRef {
5452
Self(Arc::new(RwLock::new(SharedState {
5553
peer_max_field_section_size: VarInt::MAX.0,
5654
error: None,
57-
closing: None,
55+
closing: false,
5856
})))
5957
}
6058
}
@@ -83,9 +81,6 @@ where
8381
decoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
8482
encoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
8583
pending_recv_streams: Vec<AcceptRecvStream<C::RecvStream>>,
86-
// The id of the last stream received by this connection:
87-
// request and push stream for server and clients respectively.
88-
last_accepted_stream: Option<StreamId>,
8984
got_peer_settings: bool,
9085
pub(super) send_grease_frame: bool,
9186
}
@@ -180,7 +175,6 @@ where
180175
decoder_recv: None,
181176
encoder_recv: None,
182177
pending_recv_streams: Vec::with_capacity(3),
183-
last_accepted_stream: None,
184178
got_peer_settings: false,
185179
send_grease_frame: grease,
186180
};
@@ -198,22 +192,32 @@ where
198192
Ok(conn_inner)
199193
}
200194

201-
/// Initiate graceful shutdown, accepting `max_streams` potentially in-flight streams
202-
pub async fn shutdown(&mut self, max_streams: usize) -> Result<(), Error> {
203-
let max_id = self
204-
.last_accepted_stream
205-
.map(|id| id + max_streams)
206-
.unwrap_or_else(StreamId::first_request);
195+
/// Send GOAWAY with specified max_id, iff max_id is smaller than the previous one.
196+
pub async fn shutdown<T>(
197+
&mut self,
198+
sent_closing: &mut Option<T>,
199+
max_id: T,
200+
) -> Result<(), Error>
201+
where
202+
T: From<VarInt> + PartialOrd<T> + Copy,
203+
VarInt: From<T>,
204+
{
205+
if let Some(sent_id) = sent_closing {
206+
if *sent_id <= max_id {
207+
return Ok(());
208+
}
209+
}
207210

208-
self.shared.write("graceful shutdown").closing = Some(max_id);
211+
*sent_closing = Some(max_id);
212+
self.shared.write("shutdown").closing = true;
209213

210214
//= https://www.rfc-editor.org/rfc/rfc9114#section-3.3
211215
//# When either endpoint chooses to close the HTTP/3
212216
//# connection, the terminating endpoint SHOULD first send a GOAWAY frame
213217
//# (Section 5.2) so that both endpoints can reliably determine whether
214218
//# previously sent frames have been processed and gracefully complete or
215219
//# terminate any necessary remaining tasks.
216-
stream::write(&mut self.control_send, Frame::Goaway(max_id)).await
220+
stream::write(&mut self.control_send, Frame::Goaway(max_id.into())).await
217221
}
218222

219223
pub fn poll_accept_request(
@@ -370,43 +374,7 @@ where
370374
.unwrap_or(VarInt::MAX.0);
371375
Ok(Frame::Settings(settings))
372376
}
373-
Frame::Goaway(id) => {
374-
let closing = self.shared.read("connection goaway read").closing;
375-
match closing {
376-
Some(closing_id) if closing_id.initiator() == id.initiator() => {
377-
//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
378-
//# An endpoint MAY send multiple GOAWAY frames indicating different
379-
//# identifiers, but the identifier in each frame MUST NOT be greater
380-
//# than the identifier in any previous frame, since clients might
381-
//# already have retried unprocessed requests on another HTTP connection.
382-
383-
//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
384-
//# Like the server,
385-
//# the client MAY send subsequent GOAWAY frames so long as the specified
386-
//# push ID is no greater than any previously sent value.
387-
if id <= closing_id {
388-
self.shared.write("connection goaway overwrite").closing =
389-
Some(id);
390-
Ok(Frame::Goaway(id))
391-
} else {
392-
//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
393-
//# Receiving a GOAWAY containing a larger identifier than previously
394-
//# received MUST be treated as a connection error of type H3_ID_ERROR.
395-
Err(self.close(
396-
Code::H3_ID_ERROR,
397-
format!("received a GoAway({}) greater than the former one ({})", id, closing_id)
398-
))
399-
}
400-
}
401-
// When closing initiator is different, the current side has already started to close
402-
// and should not be initiating any new requests / pushes anyway. So we can ignore it.
403-
Some(_) => Ok(Frame::Goaway(id)),
404-
None => {
405-
self.shared.write("connection goaway write").closing = Some(id);
406-
Ok(Frame::Goaway(id))
407-
}
408-
}
409-
}
377+
f @ Frame::Goaway(_) => Ok(f),
410378
f @ Frame::CancelPush(_) | f @ Frame::MaxPushId(_) => {
411379
if self.got_peer_settings {
412380
//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.3
@@ -460,8 +428,46 @@ where
460428
Poll::Ready(res)
461429
}
462430

463-
pub fn start_stream(&mut self, id: StreamId) {
464-
self.last_accepted_stream = Some(id);
431+
pub(crate) fn process_goaway<T>(
432+
&mut self,
433+
recv_closing: &mut Option<T>,
434+
id: VarInt,
435+
) -> Result<(), Error>
436+
where
437+
T: From<VarInt> + Copy,
438+
VarInt: From<T>,
439+
{
440+
{
441+
//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
442+
//# An endpoint MAY send multiple GOAWAY frames indicating different
443+
//# identifiers, but the identifier in each frame MUST NOT be greater
444+
//# than the identifier in any previous frame, since clients might
445+
//# already have retried unprocessed requests on another HTTP connection.
446+
447+
//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
448+
//# Like the server,
449+
//# the client MAY send subsequent GOAWAY frames so long as the specified
450+
//# push ID is no greater than any previously sent value.
451+
if let Some(prev_id) = recv_closing.map(VarInt::from) {
452+
if prev_id < id {
453+
//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
454+
//# Receiving a GOAWAY containing a larger identifier than previously
455+
//# received MUST be treated as a connection error of type H3_ID_ERROR.
456+
return Err(self.close(
457+
Code::H3_ID_ERROR,
458+
format!(
459+
"received a GoAway({}) greater than the former one ({})",
460+
id, prev_id
461+
),
462+
));
463+
}
464+
}
465+
*recv_closing = Some(id.into());
466+
if !self.shared.read("connection goaway read").closing {
467+
self.shared.write("connection goaway overwrite").closing = true;
468+
}
469+
Ok(())
470+
}
465471
}
466472

467473
/// Closes a Connection with code and reason.

h3/src/error.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,8 @@ impl From<frame::FrameStreamError> for Error {
411411
.with_reason("received incomplete frame", ErrorLevel::ConnectionError),
412412

413413
frame::FrameStreamError::Proto(e) => match e {
414-
proto::frame::FrameError::InvalidStreamId(_) => Code::H3_ID_ERROR,
414+
proto::frame::FrameError::InvalidStreamId(_)
415+
| proto::frame::FrameError::InvalidPushId(_) => Code::H3_ID_ERROR,
415416
proto::frame::FrameError::Settings(_) => Code::H3_SETTINGS_ERROR,
416417
proto::frame::FrameError::UnsupportedFrame(_)
417418
| proto::frame::FrameError::UnknownFrame(_) => Code::H3_FRAME_UNEXPECTED,

0 commit comments

Comments
 (0)