Revert "introduce Transition enum"

This reverts commit 6906f87ead.
This commit is contained in:
teor 2021-02-23 10:54:48 +10:00 committed by Jane Lusby
parent a5e89f4f2b
commit 72e2e83828
5 changed files with 379 additions and 248 deletions

View File

@ -15,6 +15,7 @@ use client::ClientRequest;
use client::ClientRequestReceiver;
use client::InProgressClientRequest;
use client::MustUseOneshotSender;
use error::ErrorSlot;
pub use client::Client;
pub use connection::Connection;

View File

@ -6,14 +6,14 @@ use std::{
use futures::{
channel::{mpsc, oneshot},
future, ready,
stream::{Stream, StreamExt},
use tower::Service;
use crate::protocol::internal::{Request, Response};
use super::{PeerError, SharedPeerError};
use super::{ErrorSlot, PeerError, SharedPeerError};
/// The "client" duplex half of a peer connection.
pub struct Client {
@ -21,6 +21,7 @@ pub struct Client {
// This is always Some except when we take it on drop.
pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
pub(super) server_tx: mpsc::Sender<ClientRequest>,
pub(super) error_slot: ErrorSlot,
/// A message from the `peer::Client` to the `peer::Server`.
@ -97,6 +98,13 @@ impl From<ClientRequest> for InProgressClientRequest {
impl ClientRequestReceiver {
/// Forwards to `inner.close()`
pub fn close(&mut self) {
impl Stream for ClientRequestReceiver {
type Item = InProgressClientRequest;
@ -191,7 +199,10 @@ impl Service<Request> for Client {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if ready!(self.server_tx.poll_ready(cx)).is_err() {
.expect("failed servers must set their error slot")))
} else {
@ -210,7 +221,13 @@ impl Service<Request> for Client {
match self.server_tx.try_send(ClientRequest { request, span, tx }) {
Err(e) => {
if e.is_disconnected() {
async { Err(PeerError::ConnectionClosed.into()) }.boxed()
let ClientRequest { tx, .. } = e.into_inner();
let _ = tx.send(Err(PeerError::ConnectionClosed.into()));
.expect("failed servers must set their error slot")))
} else {
// sending fails when there's not enough
// channel space, but we called poll_ready

View File

@ -7,18 +7,14 @@
//! And it's unclear if these assumptions match the `zcashd` implementation.
//! It should be refactored into a cleaner set of request/response pairs (#1515).
use std::{
convert::{TryFrom, TryInto},
use std::{collections::HashSet, sync::Arc};
use futures::{
future::{self, Either},
use tokio::time::Sleep;
use tokio::time::{sleep, Sleep};
use tower::Service;
use tracing_futures::Instrument;
@ -29,6 +25,7 @@ use zebra_chain::{
use crate::{
external::{types::Nonce, InventoryHash, Message},
internal::{Request, Response},
@ -37,7 +34,7 @@ use crate::{
use super::{
ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender, PeerError,
ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
@ -320,151 +317,13 @@ pub(super) enum State {
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
span: tracing::Span,
impl State {
async fn step<Rx, S, Tx>(self, conn: &mut Connection<S, Tx>, peer_rx: &mut Rx) -> Transition
Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
S: Service<Request, Response = Response, Error = BoxError>,
S::Error: Into<BoxError>,
Tx: Sink<Message, Error = SerializationError> + Unpin,
match self {
State::AwaitingRequest => {
trace!("awaiting client request or peer message");
match future::select(, {
Either::Left((None, _)) => Transition::Exit(PeerError::ConnectionClosed.into()),
Either::Left((Some(Err(e)), _)) => Transition::Exit(e.into()),
Either::Left((Some(Ok(msg)), _)) => {
match conn.handle_message_as_request(msg).await {
Ok(()) => Transition::AwaitRequest,
Err(e) => Transition::Exit(e.into()),
Either::Right((None, _)) => {
trace!("client_rx closed, ending connection");
Either::Right((Some(req), _)) => {
if req.tx.is_canceled() {
metrics::counter!("peer.canceled", 1);
tracing::debug!("ignoring canceled request");
return Transition::AwaitRequest;
let span = req.span.clone();
// We're awaiting a response to a client request,
// so wait on either a peer message, or on a request cancellation.
State::AwaitingResponse {
mut tx,
mut handler,
} => {
// we have to get rid of the span reference so we can tamper with the state
let span = span.clone();
trace!(parent: &span, "awaiting response to client request");
let timer_ref = conn
.expect("timeout must be set while awaiting response");
let cancel = future::select(timer_ref, tx.cancellation());
match future::select(, cancel)
Either::Left((None, _)) => Transition::ExitResponse {
e: PeerError::ConnectionClosed.into(),
Either::Left((Some(Err(e)), _)) => Transition::ExitResponse { e: e.into(), tx },
Either::Left((Some(Ok(peer_msg)), _cancel)) => {
let request_msg = span.in_scope(|| handler.process_message(peer_msg));
// If the message was not consumed, check whether it
// should be handled as a request.
if let Some(msg) = request_msg {
// do NOT instrument with the request span, this is
// independent work
match conn.handle_message_as_request(msg).await {
Ok(()) => Transition::AwaitRequest,
Err(e) => Transition::Exit(e.into()),
} else {
// Otherwise, check whether the handler is finished
// processing messages and update the state.
match handler {
Handler::Finished(response) => {
let _ = tx.send(response.map_err(Into::into));
_ => Transition::AwaitResponse { tx, handler, span },
Either::Right((Either::Left(_), _peer_fut)) => {
trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout;
match handler {
Handler::Ping(_) => Transition::ExitResponse { e: e.into(), tx },
_ => {
let _ = tx.send(Err(e.into()));
Either::Right((Either::Right(_), _peer_fut)) => {
trace!(parent: &span, "client request was cancelled");
/// Enum describing the next state transition that should be taken after any
/// given `step`.
enum Transition {
AwaitResponse {
handler: Handler,
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
span: tracing::Span,
// Exiting while no client response is expected
// Exiting while processing a client response
ExitResponse {
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
e: SharedPeerError,
impl TryFrom<Transition> for State {
type Error = SharedPeerError;
fn try_from(trans: Transition) -> Result<Self, Self::Error> {
match trans {
Transition::AwaitRequest => Ok(State::AwaitingRequest),
Transition::AwaitResponse { handler, tx, span } => {
Ok(State::AwaitingResponse { handler, tx, span })
Transition::Exit(e) => Err(e),
Transition::ExitResponse { tx, e } => {
let _ = tx.send(Err(e.clone()));
/// A failure has occurred and we are shutting down the connection.
/// The state associated with a peer connection.
pub struct Connection<S, Tx> {
pub(super) state: Option<State>,
pub(super) state: State,
/// A timeout for a client request. This is stored separately from
/// State so that we can move the future out of it independently of
/// other state handling.
@ -473,6 +332,8 @@ pub struct Connection<S, Tx> {
/// A `mpsc::Receiver<ClientRequest>` that converts its results to
/// `InProgressClientRequest`
pub(super) client_rx: ClientRequestReceiver,
/// A slot for an error shared between the Connection and the Client that uses it.
pub(super) error_slot: ErrorSlot,
//pub(super) peer_rx: Rx,
pub(super) peer_tx: Tx,
@ -508,60 +369,287 @@ where
// If there is a pending request, we wait only on an incoming peer message, and
// check whether it can be interpreted as a response to the pending request.
loop {
let transition = self
.expect("state only None during steps")
.step(&mut self, &mut peer_rx)
self.state = match transition.try_into() {
Ok(state) => Some(state),
Err(e) => {
while let Some(InProgressClientRequest { tx, span, .. }) =
match self.state {
State::AwaitingRequest => {
trace!("awaiting client request or peer message");
match future::select(, {
Either::Left((None, _)) => {
Either::Left((Some(Err(e)), _)) => self.fail_with(e),
Either::Left((Some(Ok(msg)), _)) => {
match self.handle_message_as_request(msg).await {
Ok(()) => {}
Err(e) => self.fail_with(e),
Either::Right((None, _)) => {
trace!("client_rx closed, ending connection");
Either::Right((Some(req), _)) => {
let span = req.span.clone();
// We're awaiting a response to a client request,
// so wait on either a peer message, or on a request cancellation.
State::AwaitingResponse {
ref span,
ref mut tx,
} => {
// we have to get rid of the span reference so we can tamper with the state
let span = span.clone();
trace!(parent: &span, "awaiting response to client request");
let timer_ref = self
.expect("timeout must be set while awaiting response");
let cancel = future::select(timer_ref, tx.cancellation());
match future::select(, cancel)
Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Left((Some(Err(e)), _)) => self.fail_with(e),
Either::Left((Some(Ok(peer_msg)), _cancel)) => {
// Try to process the message using the handler.
// This extremely awkward construction avoids
// keeping a live reference to handler across the
// call to handle_message_as_request, which takes
// &mut self. This is a sign that we don't properly
// factor the state required for inbound and
// outbound requests.
let request_msg = match self.state {
State::AwaitingResponse {
ref mut handler, ..
} => span.in_scope(|| handler.process_message(peer_msg)),
_ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
// If the message was not consumed, check whether it
// should be handled as a request.
if let Some(msg) = request_msg {
// do NOT instrument with the request span, this is
// independent work
match self.handle_message_as_request(msg).await {
Ok(()) => {}
Err(e) => self.fail_with(e),
} else {
// Otherwise, check whether the handler is finished
// processing messages and update the state.
self.state = match self.state {
State::AwaitingResponse {
handler: Handler::Finished(response),
} => {
let _ = tx.send(response.map_err(Into::into));
pending @ State::AwaitingResponse { .. } => pending,
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
Either::Right((Either::Left(_), _peer_fut)) => {
trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout;
self.state = match self.state {
// Special case: ping timeouts fail the connection.
State::AwaitingResponse {
handler: Handler::Ping(_),
} => {
// Other request timeouts fail the request.
State::AwaitingResponse { tx, .. } => {
let _ = tx.send(Err(e.into()));
_ => unreachable!(
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
Either::Right((Either::Right(_), _peer_fut)) => {
trace!(parent: &span, "client request was cancelled");
self.state = State::AwaitingRequest;
// We've failed, but we need to flush all pending client
// requests before we can return and complete the future.
State::Failed => {
match {
Some(InProgressClientRequest { tx, span, .. }) => {
parent: &span,
"sending an error response to a pending request on a failed connection"
let _ = tx.send(Err(e.clone()));
let e = self
.expect("cannot enter failed state without setting error slot");
let _ = tx.send(Err(e));
// Continue until we've errored all queued reqs
None => return,
/// Marks the peer as having failed with error `e`.
fn fail_with<E>(&mut self, e: E)
E: Into<SharedPeerError>,
let e = e.into();
connection_state = ?self.state,
client_receiver = ?self.client_rx,
"failing peer service with error");
// Update the shared error slot
let mut guard = self
.expect("mutex should be unpoisoned");
if let Some(original_error) = guard.clone() {
// This panic typically happens due to these bugs:
// * we mark a connection as failed without using fail_with
// * we call fail_with without checking for a failed connection
// state
// See the original bug #1510 and PR #1531, and the later bug #1599
// and PR #1600.
"calling fail_with on already-failed connection state: failed connections must stop processing pending requests and responses, then close the connection. state: {:?} original error: {:?} new error: {:?} client receiver: {:?}",
} else {
*guard = Some(e);
// Drop the guard immediately to release the mutex.
// We want to close the client channel and set State::Failed so
// that we can flush any pending client requests. However, we may have
// an outstanding client request in State::AwaitingResponse, so
// we need to deal with it first if it exists.
let old_state = std::mem::replace(&mut self.state, State::Failed);
if let State::AwaitingResponse { tx, .. } = old_state {
// We know the slot has Some(e) because we just set it above,
// and the error slot is never unset.
let e = self.error_slot.try_get_error().unwrap();
let _ = tx.send(Err(e));
/// Handle an incoming client request, possibly generating outgoing messages to the
/// remote peer.
/// Correctness: This function MUST only be called while in the AwaitingRequest state
/// NOTE: the caller should use .instrument(msg.span) to instrument the function.
async fn handle_client_request(&mut self, req: InProgressClientRequest) -> Transition {
async fn handle_client_request(&mut self, req: InProgressClientRequest) {
use State::*;
if req.tx.is_canceled() {
metrics::counter!("peer.canceled", 1);
tracing::debug!("ignoring canceled request");
let new_state_result = self._handle_client_request(req).await;
// Updates state or fails.
match new_state_result {
Ok(AwaitingRequest) => {
self.state = AwaitingRequest;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
Ok(new_state @ AwaitingResponse { .. }) => {
self.state = new_state;
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
Err((e, tx)) => {
let e = SharedPeerError::from(e);
let _ = tx.send(Err(e.clone()));
// unreachable states
Ok(Failed) => unreachable!(
"failed client requests must use fail_with(error) to reach a Failed state."
async fn _handle_client_request(
&mut self,
req: InProgressClientRequest,
) -> Result<
MustUseOneshotSender<Result<Response, SharedPeerError>>,
> {
use Request::*;
use State::*;
let InProgressClientRequest { request, tx, span } = req;
match request {
Peers => match self.peer_tx.send(Message::GetAddr).await {
Ok(()) => Transition::AwaitResponse {
match (&self.state, request) {
(Failed, request) => panic!(
"failed connection cannot handle new request: {:?}, client_receiver: {:?}",
(pending @ AwaitingResponse { .. }, request) => panic!(
"tried to process new request: {:?} while awaiting a response: {:?}, client_receiver: {:?}",
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
Ok(()) => Ok(
AwaitingResponse {
handler: Handler::Peers,
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Err((e, tx)),
Ping(nonce) => match self.peer_tx.send(Message::Ping(nonce)).await {
Ok(()) => Transition::AwaitResponse {
(AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
Ok(()) => Ok(
AwaitingResponse {
handler: Handler::Ping(nonce),
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Err((e, tx)),
BlocksByHash(hashes) => {
(AwaitingRequest, BlocksByHash(hashes)) => {
match self
@ -569,7 +657,8 @@ where
Ok(()) => Transition::AwaitResponse {
Ok(()) => Ok(
AwaitingResponse {
handler: Handler::BlocksByHash {
blocks: Vec::with_capacity(hashes.len()),
@ -577,10 +666,11 @@ where
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Err((e, tx)),
TransactionsByHash(hashes) => {
(AwaitingRequest, TransactionsByHash(hashes)) => {
match self
@ -588,7 +678,8 @@ where
Ok(()) => Transition::AwaitResponse {
Ok(()) => Ok(
AwaitingResponse {
handler: Handler::TransactionsByHash {
transactions: Vec::with_capacity(hashes.len()),
@ -596,57 +687,66 @@ where
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Err((e, tx)),
FindBlocks { known_blocks, stop } => {
(AwaitingRequest, FindBlocks { known_blocks, stop }) => {
match self
.send(Message::GetBlocks { known_blocks, stop })
Ok(()) => Transition::AwaitResponse {
Ok(()) => Ok(
AwaitingResponse {
handler: Handler::FindBlocks,
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Err((e, tx)),
FindHeaders { known_blocks, stop } => {
(AwaitingRequest, FindHeaders { known_blocks, stop }) => {
match self
.send(Message::GetHeaders { known_blocks, stop })
Ok(()) => Transition::AwaitResponse {
Ok(()) => Ok(
AwaitingResponse {
handler: Handler::FindHeaders,
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Err((e, tx)),
MempoolTransactions => match self.peer_tx.send(Message::Mempool).await {
Ok(()) => Transition::AwaitResponse {
(AwaitingRequest, MempoolTransactions) => {
match self.peer_tx.send(Message::Mempool).await {
Ok(()) => Ok(
AwaitingResponse {
handler: Handler::MempoolTransactions,
Err(e) => Transition::ExitResponse { e: e.into(), tx },
PushTransaction(transaction) => {
Err(e) => Err((e, tx)),
(AwaitingRequest, PushTransaction(transaction)) => {
match self.peer_tx.send(Message::Tx(transaction)).await {
Ok(()) => {
// Since we're not waiting for further messages, we need to
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Err((e, tx)),
AdvertiseTransactions(hashes) => {
(AwaitingRequest, AdvertiseTransactions(hashes)) => {
match self
.send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
@ -656,20 +756,20 @@ where
// Since we're not waiting for further messages, we need to
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Err((e, tx)),
AdvertiseBlock(hash) => {
(AwaitingRequest, AdvertiseBlock(hash)) => {
match self.peer_tx.send(Message::Inv(vec![hash.into()])).await {
Ok(()) => {
// Since we're not waiting for further messages, we need to
// send a response before dropping tx.
let _ = tx.send(Ok(Response::Nil));
Err(e) => Transition::ExitResponse { e: e.into(), tx },
Err(e) => Err((e, tx)),
@ -802,7 +902,7 @@ where
match rsp {
Response::Nil => { /* generic success, do nothing */ }
Response::Nil => { /* generic success, do nothing */ },
Response::Peers(addrs) => self.peer_tx.send(Message::Addr(addrs)).await?,
Response::Transactions(transactions) => {
// Generate one tx message per transaction.

View File

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use thiserror::Error;
@ -28,9 +28,6 @@ pub enum PeerError {
/// The remote peer closed the connection.
#[error("Peer closed connection")]
/// The local client closed the connection.
#[error("Internal client dropped connection")]
/// The remote peer did not respond to a [`peer::Client`] request in time.
#[error("Client request timed out")]
@ -67,6 +64,19 @@ pub enum PeerError {
#[derive(Default, Clone)]
pub(super) struct ErrorSlot(pub(super) Arc<Mutex<Option<SharedPeerError>>>);
impl ErrorSlot {
pub fn try_get_error(&self) -> Option<SharedPeerError> {
.expect("error mutex should be unpoisoned")
/// An error during a handshake with a remote peer.
#[derive(Error, Debug)]
pub enum HandshakeError {

View File

@ -30,7 +30,7 @@ use crate::{
BoxError, Config, PeerAddrState,
use super::{Client, Connection, HandshakeError, PeerError};
use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError};
/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
@ -349,10 +349,12 @@ where
// in this block, see for more.
let (server_tx, server_rx) = mpsc::channel(0);
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let slot = ErrorSlot::default();
let client = Client {
shutdown_tx: Some(shutdown_tx),
server_tx: server_tx.clone(),
error_slot: slot.clone(),
let (peer_tx, peer_rx) = stream.split();
@ -432,9 +434,10 @@ where
use super::connection;
let server = Connection {
state: Some(connection::State::AwaitingRequest),
state: connection::State::AwaitingRequest,
svc: inbound_service,
client_rx: server_rx.into(),
error_slot: slot,
request_timer: None,