From 16f51e4d480766de788c8d80c5322a874e52a4d5 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Thu, 10 Oct 2019 17:54:15 -0700 Subject: [PATCH] Add a timeout to the `PeerServer` event loop. I think this code could be cleaned up significantly (e.g., removing the other use of select!) but that's potentially a larger change than this PR. --- zebra-network/src/constants.rs | 3 ++ zebra-network/src/peer/connector.rs | 1 + zebra-network/src/peer/server.rs | 74 +++++++++++++++++++++-------- 3 files changed, 59 insertions(+), 19 deletions(-) diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index 11aa40ae6..2a93f010a 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -5,6 +5,9 @@ use std::time::Duration; // XXX should these constants be split into protocol also? use crate::protocol::types::*; +/// The timeout for requests made to a remote peer. +pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + /// We expect to receive a message from a live peer at least once in this time duration. /// XXX this needs to be synchronized with the ping transmission times. 
pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(12); diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index 497afef45..08ba7b1b4 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -164,6 +164,7 @@ where client_rx: rx, error_slot: slot, peer_tx, + request_timer: None, }; let hooked_peer_rx = peer_rx diff --git a/zebra-network/src/peer/server.rs b/zebra-network/src/peer/server.rs index 201c7dcce..94a2ce28a 100644 --- a/zebra-network/src/peer/server.rs +++ b/zebra-network/src/peer/server.rs @@ -3,12 +3,17 @@ use std::sync::{Arc, Mutex}; use failure::Error; use futures::{ channel::{mpsc, oneshot}, + future::{self, Either}, stream::Stream, }; -use tokio::prelude::*; +use tokio::{ + prelude::*, + timer::{delay_for, Delay}, +}; use tower::Service; use crate::{ + constants, protocol::{ internal::{Request, Response}, message::Message, @@ -43,6 +48,10 @@ pub(super) enum ServerState { /// The "server" duplex half of a peer connection. pub struct PeerServer { pub(super) state: ServerState, + /// A timeout for a client request. This is stored separately from + /// ServerState so that we can move the future out of it independently of + /// other state handling. + pub(super) request_timer: Option<Delay>, pub(super) svc: S, pub(super) client_rx: mpsc::Receiver, /// A slot shared between the PeerServer and PeerClient for storing an error. @@ -123,25 +132,49 @@ where } }, // We're awaiting a response to a client request, - // so only listen to peer messages, not further requests. + // so wait on either a peer message, or on a request timeout. ServerState::AwaitingResponse { .. } => { - let msg = peer_rx_fut.await; - peer_rx_fut = peer_rx.next().fuse(); - match msg { - // The peer channel has closed -- no more messages. - // However, we still need to flush pending client requests. 
- None => self.fail_with(format_err!("peer closed connection").into()), - // We got a peer message but it was malformed. - //Some(Err(e)) => self.fail_with(e.into()), - // XXX remove this when we parse all message types - Some(Err(e)) => { - error!(%e); + let timer: Delay = self + .request_timer + .take() + .expect("timeout must be set while awaiting response"); + match future::select(peer_rx_fut, timer).await { + Either::Left((msg, timer)) => { + // The timer didn't resolve, put it back. + self.request_timer = Some(timer); + // XXX this can maybe be cleaned up. + peer_rx_fut = peer_rx.next().fuse(); + match msg { + // The peer channel has closed -- no more messages. + // However, we still need to flush pending client requests. + None => { + self.fail_with(format_err!("peer closed connection").into()) + } + // We got a peer message but it was malformed. + //Some(Err(e)) => self.fail_with(e.into()), + // XXX remove this when we parse all message types + Some(Err(e)) => { + error!(%e); + } + // We got a peer message and it was well-formed. + Some(Ok(msg)) => match self.handle_message_as_response(msg) { + None => continue, + Some(msg) => self.handle_message_as_request(msg).await, + }, + } + } + Either::Right(((), prev_peer_rx_fut)) => { + // XXX cleanup + peer_rx_fut = prev_peer_rx_fut; + trace!("client request timed out"); + let old_state = + std::mem::replace(&mut self.state, ServerState::AwaitingRequest); + if let ServerState::AwaitingResponse(_, tx) = old_state { + let _ = tx.send(Err(format_err!("request timed out").into())); + } else { + panic!("unreachable"); + } } - // We got a peer message and it was well-formed. 
- Some(Ok(msg)) => match self.handle_message_as_response(msg) { - None => continue, - Some(msg) => self.handle_message_as_request(msg).await, - }, } } // We've failed, but we need to flush all pending client @@ -229,7 +262,10 @@ where AwaitingRequest }), } { - Ok(new_state) => self.state = new_state, + Ok(new_state) => { + self.state = new_state; + self.request_timer = Some(delay_for(constants::REQUEST_TIMEOUT)); + } Err(e) => self.fail_with(e), } }