network: handle request cancellation in Connection

We handle request cancellation in two places: before we transition into
the AwaitingResponse state, and while we are in AwaitingResponse.  We
need both places, or else if we started processing a request, we
wouldn't process the cancellation until the timeout elapsed.

The first is a check that the oneshot is not already canceled.

For the second, we wait on a cancellation, either from a timeout or from
the tx channel closing.
This commit is contained in:
Henry de Valence 2020-09-09 14:47:34 -07:00
parent b636660d6a
commit 13daefa729
1 changed files with 20 additions and 5 deletions

View File

@ -283,8 +283,12 @@ where
} }
} }
// We're awaiting a response to a client request, // We're awaiting a response to a client request,
// so wait on either a peer message, or on a request timeout. // so wait on either a peer message, or on a request cancellation.
State::AwaitingResponse { ref span, .. } => { State::AwaitingResponse {
ref span,
ref mut tx,
..
} => {
// we have to get rid of the span reference so we can tamper with the state // we have to get rid of the span reference so we can tamper with the state
let span = span.clone(); let span = span.clone();
trace!(parent: &span, "awaiting response to client request"); trace!(parent: &span, "awaiting response to client request");
@ -292,13 +296,14 @@ where
.request_timer .request_timer
.as_mut() .as_mut()
.expect("timeout must be set while awaiting response"); .expect("timeout must be set while awaiting response");
match future::select(peer_rx.next(), timer_ref) let cancel = future::select(timer_ref, tx.cancellation());
match future::select(peer_rx.next(), cancel)
.instrument(span.clone()) .instrument(span.clone())
.await .await
{ {
Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed), Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()),
Either::Left((Some(Ok(peer_msg)), _timer)) => { Either::Left((Some(Ok(peer_msg)), _cancel)) => {
// Try to process the message using the handler. // Try to process the message using the handler.
// This extremely awkward construction avoids // This extremely awkward construction avoids
// keeping a live reference to handler across the // keeping a live reference to handler across the
@ -335,7 +340,7 @@ where
}; };
} }
} }
Either::Right(((), _peer_fut)) => { Either::Right((Either::Left(_), _peer_fut)) => {
trace!(parent: &span, "client request timed out"); trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout; let e = PeerError::ClientRequestTimeout;
self.state = match self.state { self.state = match self.state {
@ -355,6 +360,10 @@ where
_ => unreachable!(), _ => unreachable!(),
}; };
} }
Either::Right((Either::Right(_), _peer_fut)) => {
trace!(parent: &span, "client request was cancelled");
self.state = State::AwaitingRequest;
}
} }
} }
// We've failed, but we need to flush all pending client // We've failed, but we need to flush all pending client
@ -422,6 +431,12 @@ where
use State::*; use State::*;
let ClientRequest { request, tx, span } = req; let ClientRequest { request, tx, span } = req;
if tx.is_canceled() {
metrics::counter!("peer.canceled", 1);
tracing::debug!("ignoring canceled request");
return;
}
// XXX(hdevalence) this is truly horrible, but let's fix it later // XXX(hdevalence) this is truly horrible, but let's fix it later
// Inner match returns Result with the new state or an error. // Inner match returns Result with the new state or an error.