diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 26305ceff..48fb47de2 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -580,7 +580,7 @@ where // &mut self. This is a sign that we don't properly // factor the state required for inbound and // outbound requests. - let mut request_msg = match self.state { + let request_msg = match self.state { State::AwaitingResponse { ref mut handler, .. } => span.in_scope(|| handler.process_message(peer_msg)), @@ -593,17 +593,32 @@ where self.update_state_metrics(None); - // Check whether the handler is finished - // processing messages and update the state. + // # Correctness // - // Replace the state with a temporary value, - // so we can take ownership of the response sender. - self.state = match std::mem::replace(&mut self.state, State::Failed) { - State::AwaitingResponse { - handler: Handler::Finished(response), - tx, - .. - } => { + // Handle any unsolicited messages first, to clear the queue. + // Then check for responses to our request messages. + // + // This significantly reduces our message failure rate. + // (Otherwise, every unsolicited message can disrupt our pending request.) + + // If the message was not consumed, check whether it + // should be handled as a request. + if let Some(msg) = request_msg { + // do NOT instrument with the request span, this is + // independent work + self.handle_message_as_request(msg).await; + } else { + // Otherwise, check whether the handler is finished + // processing messages and update the state. + // + // Replace the state with a temporary value, + // so we can take ownership of the response sender. + self.state = match std::mem::replace(&mut self.state, State::Failed) { + State::AwaitingResponse { + handler: Handler::Finished(response), + tx, + .. + } => { if let Ok(response) = response.as_ref() { debug!(%response, "finished receiving peer response to Zebra request"); // Add a metric for inbound responses to outbound requests. @@ -616,38 +631,27 @@ where } else { debug!(error = ?response, "error in peer response to Zebra request"); } - - let _ = tx.send(response.map_err(Into::into)); - State::AwaitingRequest - } - pending @ State::AwaitingResponse { .. } => { - // Drop the un-consumed request message, - // because we can't process multiple messages at the same time. - debug!( - new_request = %request_msg - .as_ref() - .map(|m| m.to_string()) - .unwrap_or_else(|| "None".into()), - awaiting_response = %pending, + let _ = tx.send(response.map_err(Into::into)); + State::AwaitingRequest + } + pending @ State::AwaitingResponse { .. } => { + // Drop the new request message from the remote peer, + // because we can't process multiple requests at the same time. + debug!( + new_request = %request_msg + .as_ref() + .map(|m| m.to_string()) + .unwrap_or_else(|| "None".into()), + awaiting_response = %pending, "ignoring new request while awaiting a response" - ); - request_msg = None; - pending - }, - _ => unreachable!( - "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}", - self.client_rx - ), - }; - - self.update_state_metrics(None); - - // If the message was not consumed, check whether it - // should be handled as a request. - if let Some(msg) = request_msg { - // do NOT instrument with the request span, this is - // independent work - self.handle_message_as_request(msg).await; + ); + pending + }, + _ => unreachable!( + "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}", + self.client_rx + ), + }; } } Either::Left((Either::Right(_), _peer_fut)) => {