Add a timeout to the `PeerServer` event loop.

I think this code could be cleaned up significantly (e.g., removing the
other use of select!) but that's potentially a larger change than this
PR.
This commit is contained in:
Henry de Valence 2019-10-10 17:54:15 -07:00 committed by Deirdre Connolly
parent b45efbdaf2
commit 16f51e4d48
3 changed files with 59 additions and 19 deletions

View File

@ -5,6 +5,9 @@ use std::time::Duration;
// XXX should these constants be split into protocol also?
use crate::protocol::types::*;
/// The timeout for requests made to a remote peer.
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// We expect to receive a message from a live peer at least once in this time duration.
/// XXX this needs to be synchronized with the ping transmission times.
pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(12);

View File

@ -164,6 +164,7 @@ where
client_rx: rx,
error_slot: slot,
peer_tx,
request_timer: None,
};
let hooked_peer_rx = peer_rx

View File

@ -3,12 +3,17 @@ use std::sync::{Arc, Mutex};
use failure::Error;
use futures::{
channel::{mpsc, oneshot},
future::{self, Either},
stream::Stream,
};
use tokio::prelude::*;
use tokio::{
prelude::*,
timer::{delay_for, Delay},
};
use tower::Service;
use crate::{
constants,
protocol::{
internal::{Request, Response},
message::Message,
@ -43,6 +48,10 @@ pub(super) enum ServerState {
/// The "server" duplex half of a peer connection.
pub struct PeerServer<S, Tx> {
pub(super) state: ServerState,
/// A timeout for a client request. This is stored separately from
/// ServerState so that we can move the future out of it independently of
/// other state handling.
pub(super) request_timer: Option<Delay>,
pub(super) svc: S,
pub(super) client_rx: mpsc::Receiver<ClientRequest>,
/// A slot shared between the PeerServer and PeerClient for storing an error.
@ -123,25 +132,49 @@ where
}
},
// We're awaiting a response to a client request,
// so only listen to peer messages, not further requests.
// so wait on either a peer message, or on a request timeout.
ServerState::AwaitingResponse { .. } => {
let msg = peer_rx_fut.await;
peer_rx_fut = peer_rx.next().fuse();
match msg {
// The peer channel has closed -- no more messages.
// However, we still need to flush pending client requests.
None => self.fail_with(format_err!("peer closed connection").into()),
// We got a peer message but it was malformed.
//Some(Err(e)) => self.fail_with(e.into()),
// XXX remove this when we parse all message types
Some(Err(e)) => {
error!(%e);
let timer: Delay = self
.request_timer
.take()
.expect("timeout must be set while awaiting response");
match future::select(peer_rx_fut, timer).await {
Either::Left((msg, timer)) => {
// The timer didn't resolve, put it back.
self.request_timer = Some(timer);
// XXX this can maybe be cleaned up.
peer_rx_fut = peer_rx.next().fuse();
match msg {
// The peer channel has closed -- no more messages.
// However, we still need to flush pending client requests.
None => {
self.fail_with(format_err!("peer closed connection").into())
}
// We got a peer message but it was malformed.
//Some(Err(e)) => self.fail_with(e.into()),
// XXX remove this when we parse all message types
Some(Err(e)) => {
error!(%e);
}
// We got a peer message and it was well-formed.
Some(Ok(msg)) => match self.handle_message_as_response(msg) {
None => continue,
Some(msg) => self.handle_message_as_request(msg).await,
},
}
}
Either::Right(((), prev_peer_rx_fut)) => {
// XXX cleanup
peer_rx_fut = prev_peer_rx_fut;
trace!("client request timed out");
let old_state =
std::mem::replace(&mut self.state, ServerState::AwaitingRequest);
if let ServerState::AwaitingResponse(_, tx) = old_state {
let _ = tx.send(Err(format_err!("request timed out").into()));
} else {
panic!("unreachable");
}
}
// We got a peer message and it was well-formed.
Some(Ok(msg)) => match self.handle_message_as_response(msg) {
None => continue,
Some(msg) => self.handle_message_as_request(msg).await,
},
}
}
// We've failed, but we need to flush all pending client
@ -229,7 +262,10 @@ where
AwaitingRequest
}),
} {
Ok(new_state) => self.state = new_state,
Ok(new_state) => {
self.state = new_state;
self.request_timer = Some(delay_for(constants::REQUEST_TIMEOUT));
}
Err(e) => self.fail_with(e),
}
}