Only reject pending client requests when the peer has errored
- Add an `ExitClient` transition, used when the internal client channel is closed or dropped, and there are no more pending requests - Ignore pending requests after an `ExitClient` transition - Reject pending requests when the peer has caused an error (the `Exit` and `ExitRequest` transitions) - Remove `PeerError::ConnectionDropped`, because it is now handled by `ExitClient`. (Which is an internal error, not a peer error.)
This commit is contained in:
parent
9d9734ea81
commit
e06705ed81
|
@ -345,7 +345,7 @@ impl State {
|
|||
}
|
||||
Either::Right((None, _)) => {
|
||||
trace!("client_rx closed, ending connection");
|
||||
Transition::Exit(PeerError::ConnectionDropped.into())
|
||||
Transition::ExitClient
|
||||
}
|
||||
Either::Right((Some(req), _)) => {
|
||||
if req.tx.is_canceled() {
|
||||
|
@ -439,9 +439,12 @@ enum Transition {
|
|||
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
||||
span: tracing::Span,
|
||||
},
|
||||
// Exiting while no client response is expected
|
||||
/// Exiting because the client was closed or dropped, and there are
|
||||
/// no more client requests.
|
||||
ExitClient,
|
||||
/// Exiting while awaiting further client requests
|
||||
Exit(SharedPeerError),
|
||||
// Exiting while processing a client response
|
||||
/// Exiting while processing a peer response to a client request
|
||||
ExitResponse {
|
||||
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
||||
e: SharedPeerError,
|
||||
|
@ -449,7 +452,7 @@ enum Transition {
|
|||
}
|
||||
|
||||
impl TryFrom<Transition> for State {
|
||||
type Error = SharedPeerError;
|
||||
type Error = Option<SharedPeerError>;
|
||||
|
||||
fn try_from(trans: Transition) -> Result<Self, Self::Error> {
|
||||
match trans {
|
||||
|
@ -457,10 +460,11 @@ impl TryFrom<Transition> for State {
|
|||
Transition::AwaitResponse { handler, tx, span } => {
|
||||
Ok(State::AwaitingResponse { handler, tx, span })
|
||||
}
|
||||
Transition::Exit(e) => Err(e),
|
||||
Transition::ExitClient => Err(None),
|
||||
Transition::Exit(e) => Err(Some(e)),
|
||||
Transition::ExitResponse { tx, e } => {
|
||||
let _ = tx.send(Err(e.clone()));
|
||||
Err(e)
|
||||
Err(Some(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -525,16 +529,20 @@ where
|
|||
|
||||
self.state = match transition.try_into() {
|
||||
Ok(state) => Some(state),
|
||||
Err(e) => {
|
||||
// while let Some(InProgressClientRequest { tx, span, .. }) =
|
||||
// self.client_rx.next().await
|
||||
// {
|
||||
// trace!(
|
||||
// parent: &span,
|
||||
// "sending an error response to a pending request on a failed connection"
|
||||
// );
|
||||
// let _ = tx.send(Err(e.clone()));
|
||||
// }
|
||||
Err(None) => {
|
||||
trace!("client_rx dropped: no pending client requests");
|
||||
return;
|
||||
}
|
||||
Err(Some(e)) => {
|
||||
while let Some(InProgressClientRequest { tx, span, .. }) =
|
||||
self.client_rx.next().await
|
||||
{
|
||||
trace!(
|
||||
parent: &span,
|
||||
"sending an error response to a pending client request on a failed connection"
|
||||
);
|
||||
let _ = tx.send(Err(e.clone()));
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,9 +28,6 @@ pub enum PeerError {
|
|||
/// The remote peer closed the connection.
|
||||
#[error("Peer closed connection")]
|
||||
ConnectionClosed,
|
||||
/// The local client closed the connection.
|
||||
#[error("Internal client dropped connection")]
|
||||
ConnectionDropped,
|
||||
/// The remote peer did not respond to a [`peer::Client`] request in time.
|
||||
#[error("Client request timed out")]
|
||||
ClientRequestTimeout,
|
||||
|
|
Loading…
Reference in New Issue