Refactor PeerServer event loop to avoid select!

This commit is contained in:
Henry de Valence 2019-10-11 09:46:48 -07:00 committed by Deirdre Connolly
parent 16f51e4d48
commit 373a8fbcfd
1 changed files with 40 additions and 70 deletions

View File

@ -91,89 +91,59 @@ where
// //
// If there is a pending request, we wait only on an incoming peer message, and // If there is a pending request, we wait only on an incoming peer message, and
// check whether it can be interpreted as a response to the pending request. // check whether it can be interpreted as a response to the pending request.
use futures::future::FutureExt;
use futures::select;
// This future represents the next message received from the peer.
// It needs to be stored outside of the event loop, so that we can overwrite
// it with the new "next message future" every time we get a new message.
let mut peer_rx_fut = peer_rx.next().fuse();
loop { loop {
match self.state { match self.state {
// We're awaiting a client request, so listen for both ServerState::AwaitingRequest => {
// client requests and peer messages simultaneously. trace!("awaiting client request or peer message");
ServerState::AwaitingRequest => select! { match future::select(peer_rx.next(), self.client_rx.next()).await {
req = self.client_rx.next() => { Either::Left((None, _)) => {
match req { info!("peer stream closed, shutting down");
Some(req) => self.handle_client_request(req).await,
None => {
trace!("client_rx closed, shutting down");
return; return;
} }
// XXX switch back to hard failure when we parse all message types
//Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()),
Either::Left((Some(Err(e)), _)) => error!(%e),
Either::Left((Some(Ok(msg)), _)) => {
self.handle_message_as_request(msg).await
} }
} Either::Right((None, _)) => {
msg = peer_rx_fut => { info!("client stream closed, shutting down");
peer_rx_fut = peer_rx.next().fuse();
match msg {
None => {
trace!("peer stream closed, shutting down");
return; return;
} }
// We got a peer message but it was malformed. Either::Right((Some(req), _)) => self.handle_client_request(req).await,
//Some(Err(e)) => self.fail_with(e.into()),
// XXX remove this when we parse all message types
Some(Err(e)) => {
error!(%e);
}
// We got a peer message and it was well-formed.
Some(Ok(msg)) => self.handle_message_as_request(msg).await,
} }
} }
},
// We're awaiting a response to a client request, // We're awaiting a response to a client request,
// so wait on either a peer message, or on a request timeout. // so wait on either a peer message, or on a request timeout.
ServerState::AwaitingResponse { .. } => { ServerState::AwaitingResponse { .. } => {
let timer: Delay = self trace!("awaiting response to client request");
let timer_ref = self
.request_timer .request_timer
.take() .as_mut()
.expect("timeout must be set while awaiting response"); .expect("timeout must be set while awaiting response");
match future::select(peer_rx_fut, timer).await { match future::select(peer_rx.next(), timer_ref).await {
Either::Left((msg, timer)) => { Either::Left((None, _)) => {
// The timer didn't resolve, put it back.
self.request_timer = Some(timer);
// XXX this can maybe be cleaned up.
peer_rx_fut = peer_rx.next().fuse();
match msg {
// The peer channel has closed -- no more messages.
// However, we still need to flush pending client requests.
None => {
self.fail_with(format_err!("peer closed connection").into()) self.fail_with(format_err!("peer closed connection").into())
} }
// We got a peer message but it was malformed. // XXX switch back to hard failure when we parse all message types
//Some(Err(e)) => self.fail_with(e.into()), //Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()),
// XXX remove this when we parse all message types Either::Left((Some(Err(e)), _)) => error!(%e),
Some(Err(e)) => { Either::Left((Some(Ok(msg)), _)) => {
error!(%e); match self.handle_message_as_response(msg) {
}
// We got a peer message and it was well-formed.
Some(Ok(msg)) => match self.handle_message_as_response(msg) {
None => continue, None => continue,
Some(msg) => self.handle_message_as_request(msg).await, Some(msg) => self.handle_message_as_request(msg).await,
},
} }
} }
Either::Right(((), prev_peer_rx_fut)) => { Either::Right(((), _)) => {
// XXX cleanup
peer_rx_fut = prev_peer_rx_fut;
trace!("client request timed out"); trace!("client request timed out");
let old_state = // Re-matching lets us take ownership of tx
std::mem::replace(&mut self.state, ServerState::AwaitingRequest); self.state = match self.state {
if let ServerState::AwaitingResponse(_, tx) = old_state { ServerState::AwaitingResponse(_, tx) => {
let _ = tx.send(Err(format_err!("request timed out").into())); let _ = tx.send(Err(format_err!("request timed out").into()));
} else { ServerState::AwaitingRequest
panic!("unreachable");
} }
_ => panic!("unreachable"),
};
} }
} }
} }