From 373a8fbcfd515f5869ffec20f5be50e59d1906fd Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Fri, 11 Oct 2019 09:46:48 -0700 Subject: [PATCH] Refactor PeerServer event loop to avoid select! --- zebra-network/src/peer/server.rs | 110 +++++++++++-------------------- 1 file changed, 40 insertions(+), 70 deletions(-) diff --git a/zebra-network/src/peer/server.rs b/zebra-network/src/peer/server.rs index 94a2ce28a..8fd157820 100644 --- a/zebra-network/src/peer/server.rs +++ b/zebra-network/src/peer/server.rs @@ -91,89 +91,59 @@ where // // If there is a pending request, we wait only on an incoming peer message, and // check whether it can be interpreted as a response to the pending request. - - use futures::future::FutureExt; - use futures::select; - - // This future represents the next message received from the peer. - // It needs to be stored outside of the event loop, so that we can overwrite - // it with the new "next message future" every time we get a new message. - let mut peer_rx_fut = peer_rx.next().fuse(); loop { match self.state { - // We're awaiting a client request, so listen for both - // client requests and peer messages simultaneously. - ServerState::AwaitingRequest => select! { - req = self.client_rx.next() => { - match req { - Some(req) => self.handle_client_request(req).await, - None => { - trace!("client_rx closed, shutting down"); - return; - } + ServerState::AwaitingRequest => { + trace!("awaiting client request or peer message"); + match future::select(peer_rx.next(), self.client_rx.next()).await { + Either::Left((None, _)) => { + info!("peer stream closed, shutting down"); + return; } - } - msg = peer_rx_fut => { - peer_rx_fut = peer_rx.next().fuse(); - match msg { - None => { - trace!("peer stream closed, shutting down"); - return; - } - // We got a peer message but it was malformed. - //Some(Err(e)) => self.fail_with(e.into()), - // XXX remove this when we parse all message types - Some(Err(e)) => { - error!(%e); - } - // We got a peer message and it was well-formed. - Some(Ok(msg)) => self.handle_message_as_request(msg).await, + // XXX switch back to hard failure when we parse all message types + //Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), + Either::Left((Some(Err(e)), _)) => error!(%e), + Either::Left((Some(Ok(msg)), _)) => { + self.handle_message_as_request(msg).await } + Either::Right((None, _)) => { + info!("client stream closed, shutting down"); + return; + } + Either::Right((Some(req), _)) => self.handle_client_request(req).await, } - }, + } // We're awaiting a response to a client request, // so wait on either a peer message, or on a request timeout. ServerState::AwaitingResponse { .. } => { - let timer: Delay = self + trace!("awaiting response to client request"); + let timer_ref = self .request_timer - .take() + .as_mut() .expect("timeout must be set while awaiting response"); - match future::select(peer_rx_fut, timer).await { - Either::Left((msg, timer)) => { - // The timer didn't resolve, put it back. - self.request_timer = Some(timer); - // XXX this can maybe be cleaned up. - peer_rx_fut = peer_rx.next().fuse(); - match msg { - // The peer channel has closed -- no more messages. - // However, we still need to flush pending client requests. - None => { - self.fail_with(format_err!("peer closed connection").into()) - } - // We got a peer message but it was malformed. - //Some(Err(e)) => self.fail_with(e.into()), - // XXX remove this when we parse all message types - Some(Err(e)) => { - error!(%e); - } - // We got a peer message and it was well-formed. - Some(Ok(msg)) => match self.handle_message_as_response(msg) { - None => continue, - Some(msg) => self.handle_message_as_request(msg).await, - }, + match future::select(peer_rx.next(), timer_ref).await { + Either::Left((None, _)) => { + self.fail_with(format_err!("peer closed connection").into()) + } + // XXX switch back to hard failure when we parse all message types + //Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()), + Either::Left((Some(Err(e)), _)) => error!(%e), + Either::Left((Some(Ok(msg)), _)) => { + match self.handle_message_as_response(msg) { + None => continue, + Some(msg) => self.handle_message_as_request(msg).await, } } - Either::Right(((), prev_peer_rx_fut)) => { - // XXX cleanup - peer_rx_fut = prev_peer_rx_fut; + Either::Right(((), _)) => { trace!("client request timed out"); - let old_state = - std::mem::replace(&mut self.state, ServerState::AwaitingRequest); - if let ServerState::AwaitingResponse(_, tx) = old_state { - let _ = tx.send(Err(format_err!("request timed out").into())); - } else { - panic!("unreachable"); - } + // Re-matching lets us take ownership of tx + self.state = match self.state { + ServerState::AwaitingResponse(_, tx) => { + let _ = tx.send(Err(format_err!("request timed out").into())); + ServerState::AwaitingRequest + } + _ => panic!("unreachable"), + }; } } }