From 72e2e838289ae9703b15d73b0108d0c43093aaf1 Mon Sep 17 00:00:00 2001
From: teor <teor@riseup.net>
Date: Tue, 23 Feb 2021 10:54:48 +1000
Subject: [PATCH] Revert "introduce Transition enum"

This reverts commit 6906f87eadd9df2cba990e1b3e721b6ee25e057e.
---
 zebra-network/src/peer.rs            |   1 +
 zebra-network/src/peer/client.rs     |  25 +-
 zebra-network/src/peer/connection.rs | 576 ++++++++++++++++-----------
 zebra-network/src/peer/error.rs      |  18 +-
 zebra-network/src/peer/handshake.rs  |   7 +-
 5 files changed, 379 insertions(+), 248 deletions(-)

diff --git a/zebra-network/src/peer.rs b/zebra-network/src/peer.rs
index 0f9c6be06..1711ee9f7 100644
--- a/zebra-network/src/peer.rs
+++ b/zebra-network/src/peer.rs
@@ -15,6 +15,7 @@ use client::ClientRequest;
 use client::ClientRequestReceiver;
 use client::InProgressClientRequest;
 use client::MustUseOneshotSender;
+use error::ErrorSlot;
 
 pub use client::Client;
 pub use connection::Connection;
diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs
index edc363825..d3a5640e7 100644
--- a/zebra-network/src/peer/client.rs
+++ b/zebra-network/src/peer/client.rs
@@ -6,14 +6,14 @@ use std::{
 
 use futures::{
     channel::{mpsc, oneshot},
-    ready,
+    future, ready,
     stream::{Stream, StreamExt},
 };
 use tower::Service;
 
 use crate::protocol::internal::{Request, Response};
 
-use super::{PeerError, SharedPeerError};
+use super::{ErrorSlot, PeerError, SharedPeerError};
 
 /// The "client" duplex half of a peer connection.
 pub struct Client {
@@ -21,6 +21,7 @@ pub struct Client {
     // This is always Some except when we take it on drop.
     pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
     pub(super) server_tx: mpsc::Sender<ClientRequest>,
+    pub(super) error_slot: ErrorSlot,
 }
 
 /// A message from the `peer::Client` to the `peer::Server`.
@@ -97,6 +98,13 @@ impl From<ClientRequest> for InProgressClientRequest {
     }
 }
 
+impl ClientRequestReceiver {
+    /// Forwards to `inner.close()`
+    pub fn close(&mut self) {
+        self.inner.close()
+    }
+}
+
 impl Stream for ClientRequestReceiver {
     type Item = InProgressClientRequest;
 
@@ -191,7 +199,10 @@ impl Service<Request> for Client {
 
     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         if ready!(self.server_tx.poll_ready(cx)).is_err() {
-            Poll::Ready(Err(PeerError::ConnectionClosed.into()))
+            Poll::Ready(Err(self
+                .error_slot
+                .try_get_error()
+                .expect("failed servers must set their error slot")))
         } else {
             Poll::Ready(Ok(()))
         }
@@ -210,7 +221,13 @@ impl Service<Request> for Client {
         match self.server_tx.try_send(ClientRequest { request, span, tx }) {
             Err(e) => {
                 if e.is_disconnected() {
-                    async { Err(PeerError::ConnectionClosed.into()) }.boxed()
+                    let ClientRequest { tx, .. } = e.into_inner();
+                    let _ = tx.send(Err(PeerError::ConnectionClosed.into()));
+                    future::ready(Err(self
+                        .error_slot
+                        .try_get_error()
+                        .expect("failed servers must set their error slot")))
+                    .boxed()
                 } else {
                     // sending fails when there's not enough
                     // channel space, but we called poll_ready
diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs
index cc09e3416..893b63881 100644
--- a/zebra-network/src/peer/connection.rs
+++ b/zebra-network/src/peer/connection.rs
@@ -7,18 +7,14 @@
 //! And it's unclear if these assumptions match the `zcashd` implementation.
 //! It should be refactored into a cleaner set of request/response pairs (#1515).
 
-use std::{
-    collections::HashSet,
-    convert::{TryFrom, TryInto},
-    sync::Arc,
-};
+use std::{collections::HashSet, sync::Arc};
 
 use futures::{
     future::{self, Either},
     prelude::*,
     stream::Stream,
 };
-use tokio::time::Sleep;
+use tokio::time::{sleep, Sleep};
 use tower::Service;
 use tracing_futures::Instrument;
 
@@ -29,6 +25,7 @@ use zebra_chain::{
 };
 
 use crate::{
+    constants,
     protocol::{
         external::{types::Nonce, InventoryHash, Message},
         internal::{Request, Response},
@@ -37,7 +34,7 @@ use crate::{
 };
 
 use super::{
-    ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender, PeerError,
+    ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
     SharedPeerError,
 };
 
@@ -320,151 +317,13 @@ pub(super) enum State {
         tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
         span: tracing::Span,
     },
-}
-
-impl State {
-    async fn step<Rx, S, Tx>(self, conn: &mut Connection<S, Tx>, peer_rx: &mut Rx) -> Transition
-    where
-        Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
-        S: Service<Request, Response = Response>,
-        S::Error: Into<BoxError>,
-        Tx: Sink<Message, Error = SerializationError> + Unpin,
-    {
-        match self {
-            State::AwaitingRequest => {
-                trace!("awaiting client request or peer message");
-                match future::select(peer_rx.next(), conn.client_rx.next()).await {
-                    Either::Left((None, _)) => Transition::Exit(PeerError::ConnectionClosed.into()),
-                    Either::Left((Some(Err(e)), _)) => Transition::Exit(e.into()),
-                    Either::Left((Some(Ok(msg)), _)) => {
-                        match conn.handle_message_as_request(msg).await {
-                            Ok(()) => Transition::AwaitRequest,
-                            Err(e) => Transition::Exit(e.into()),
-                        }
-                    }
-                    Either::Right((None, _)) => {
-                        trace!("client_rx closed, ending connection");
-                        Transition::Exit(PeerError::ConnectionDropped.into())
-                    }
-                    Either::Right((Some(req), _)) => {
-                        if req.tx.is_canceled() {
-                            metrics::counter!("peer.canceled", 1);
-                            tracing::debug!("ignoring canceled request");
-                            return Transition::AwaitRequest;
-                        }
-
-                        let span = req.span.clone();
-                        conn.handle_client_request(req).instrument(span).await
-                    }
-                }
-            }
-            // We're awaiting a response to a client request,
-            // so wait on either a peer message, or on a request cancellation.
-            State::AwaitingResponse {
-                span,
-                mut tx,
-                mut handler,
-            } => {
-                // we have to get rid of the span reference so we can tamper with the state
-                let span = span.clone();
-                trace!(parent: &span, "awaiting response to client request");
-                let timer_ref = conn
-                    .request_timer
-                    .as_mut()
-                    .expect("timeout must be set while awaiting response");
-                let cancel = future::select(timer_ref, tx.cancellation());
-                match future::select(peer_rx.next(), cancel)
-                    .instrument(span.clone())
-                    .await
-                {
-                    Either::Left((None, _)) => Transition::ExitResponse {
-                        e: PeerError::ConnectionClosed.into(),
-                        tx,
-                    },
-                    Either::Left((Some(Err(e)), _)) => Transition::ExitResponse { e: e.into(), tx },
-                    Either::Left((Some(Ok(peer_msg)), _cancel)) => {
-                        let request_msg = span.in_scope(|| handler.process_message(peer_msg));
-                        // If the message was not consumed, check whether it
-                        // should be handled as a request.
-                        if let Some(msg) = request_msg {
-                            // do NOT instrument with the request span, this is
-                            // independent work
-                            match conn.handle_message_as_request(msg).await {
-                                Ok(()) => Transition::AwaitRequest,
-                                Err(e) => Transition::Exit(e.into()),
-                            }
-                        } else {
-                            // Otherwise, check whether the handler is finished
-                            // processing messages and update the state.
-                            match handler {
-                                Handler::Finished(response) => {
-                                    let _ = tx.send(response.map_err(Into::into));
-                                    Transition::AwaitRequest
-                                }
-                                _ => Transition::AwaitResponse { tx, handler, span },
-                            }
-                        }
-                    }
-                    Either::Right((Either::Left(_), _peer_fut)) => {
-                        trace!(parent: &span, "client request timed out");
-                        let e = PeerError::ClientRequestTimeout;
-                        match handler {
-                            Handler::Ping(_) => Transition::ExitResponse { e: e.into(), tx },
-                            _ => {
-                                let _ = tx.send(Err(e.into()));
-                                Transition::AwaitRequest
-                            }
-                        }
-                    }
-                    Either::Right((Either::Right(_), _peer_fut)) => {
-                        trace!(parent: &span, "client request was cancelled");
-                        Transition::AwaitRequest
-                    }
-                }
-            }
-        }
-    }
-}
-
-/// Enum describing the next state transition that should be taken after any
-/// given `step`.
-enum Transition {
-    AwaitRequest,
-    AwaitResponse {
-        handler: Handler,
-        tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
-        span: tracing::Span,
-    },
-    // Exiting while no client response is expected
-    Exit(SharedPeerError),
-    // Exiting while processing a client response
-    ExitResponse {
-        tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
-        e: SharedPeerError,
-    },
-}
-
-impl TryFrom<Transition> for State {
-    type Error = SharedPeerError;
-
-    fn try_from(trans: Transition) -> Result<Self, Self::Error> {
-        match trans {
-            Transition::AwaitRequest => Ok(State::AwaitingRequest),
-            Transition::AwaitResponse { handler, tx, span } => {
-                Ok(State::AwaitingResponse { handler, tx, span })
-            }
-            Transition::Exit(e) => Err(e),
-            Transition::ExitResponse { tx, e } => {
-                let _ = tx.send(Err(e.clone()));
-                Err(e)
-            }
-        }
-    }
+    /// A failure has occurred and we are shutting down the connection.
+    Failed,
 }
 
 /// The state associated with a peer connection.
 pub struct Connection<S, Tx> {
-    pub(super) state: Option<State>,
+    pub(super) state: State,
     /// A timeout for a client request. This is stored separately from
     /// State so that we can move the future out of it independently of
     /// other state handling.
@@ -473,6 +332,8 @@ pub struct Connection<S, Tx> {
     /// A `mpsc::Receiver` that converts its results to
     /// `InProgressClientRequest`
     pub(super) client_rx: ClientRequestReceiver,
+    /// A slot for an error shared between the Connection and the Client that uses it.
+    pub(super) error_slot: ErrorSlot,
     //pub(super) peer_rx: Rx,
     pub(super) peer_tx: Tx,
 }
@@ -508,60 +369,287 @@ where
         // If there is a pending request, we wait only on an incoming peer message, and
        // check whether it can be interpreted as a response to the pending request.
         loop {
-            let transition = self
-                .state
-                .take()
-                .expect("state only None during steps")
-                .step(&mut self, &mut peer_rx)
-                .await;
-
-            self.state = match transition.try_into() {
-                Ok(state) => Some(state),
-                Err(e) => {
-                    while let Some(InProgressClientRequest { tx, span, .. }) =
-                        self.client_rx.next().await
-                    {
-                        trace!(
-                            parent: &span,
-                            "sending an error response to a pending request on a failed connection"
-                        );
-                        let _ = tx.send(Err(e.clone()));
-                    }
-                    return;
-                }
-            }
+            match self.state {
+                State::AwaitingRequest => {
+                    trace!("awaiting client request or peer message");
+                    match future::select(peer_rx.next(), self.client_rx.next()).await {
+                        Either::Left((None, _)) => {
+                            self.fail_with(PeerError::ConnectionClosed);
+                        }
+                        Either::Left((Some(Err(e)), _)) => self.fail_with(e),
+                        Either::Left((Some(Ok(msg)), _)) => {
+                            match self.handle_message_as_request(msg).await {
+                                Ok(()) => {}
+                                Err(e) => self.fail_with(e),
+                            }
+                        }
+                        Either::Right((None, _)) => {
+                            trace!("client_rx closed, ending connection");
+                            return;
+                        }
+                        Either::Right((Some(req), _)) => {
+                            let span = req.span.clone();
+                            self.handle_client_request(req).instrument(span).await
+                        }
+                    }
+                }
+                // We're awaiting a response to a client request,
+                // so wait on either a peer message, or on a request cancellation.
+                State::AwaitingResponse {
+                    ref span,
+                    ref mut tx,
+                    ..
+                } => {
+                    // we have to get rid of the span reference so we can tamper with the state
+                    let span = span.clone();
+                    trace!(parent: &span, "awaiting response to client request");
+                    let timer_ref = self
+                        .request_timer
+                        .as_mut()
+                        .expect("timeout must be set while awaiting response");
+                    let cancel = future::select(timer_ref, tx.cancellation());
+                    match future::select(peer_rx.next(), cancel)
+                        .instrument(span.clone())
+                        .await
+                    {
+                        Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
+                        Either::Left((Some(Err(e)), _)) => self.fail_with(e),
+                        Either::Left((Some(Ok(peer_msg)), _cancel)) => {
+                            // Try to process the message using the handler.
+                            // This extremely awkward construction avoids
+                            // keeping a live reference to handler across the
+                            // call to handle_message_as_request, which takes
+                            // &mut self. This is a sign that we don't properly
+                            // factor the state required for inbound and
+                            // outbound requests.
+                            let request_msg = match self.state {
+                                State::AwaitingResponse {
+                                    ref mut handler, ..
+                                } => span.in_scope(|| handler.process_message(peer_msg)),
+                                _ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
+                                    self.state,
+                                    peer_msg,
+                                    self.client_rx,
+                                ),
+                            };
+                            // If the message was not consumed, check whether it
+                            // should be handled as a request.
+                            if let Some(msg) = request_msg {
+                                // do NOT instrument with the request span, this is
+                                // independent work
+                                match self.handle_message_as_request(msg).await {
+                                    Ok(()) => {}
+                                    Err(e) => self.fail_with(e),
+                                }
+                            } else {
+                                // Otherwise, check whether the handler is finished
+                                // processing messages and update the state.
+                                self.state = match self.state {
+                                    State::AwaitingResponse {
+                                        handler: Handler::Finished(response),
+                                        tx,
+                                        ..
+                                    } => {
+                                        let _ = tx.send(response.map_err(Into::into));
+                                        State::AwaitingRequest
+                                    }
+                                    pending @ State::AwaitingResponse { .. } => pending,
+                                    _ => unreachable!(
+                                        "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
+                                        self.client_rx
+                                    ),
+                                };
+                            }
+                        }
+                        Either::Right((Either::Left(_), _peer_fut)) => {
+                            trace!(parent: &span, "client request timed out");
+                            let e = PeerError::ClientRequestTimeout;
+                            self.state = match self.state {
+                                // Special case: ping timeouts fail the connection.
+                                State::AwaitingResponse {
+                                    handler: Handler::Ping(_),
+                                    ..
+                                } => {
+                                    self.fail_with(e);
+                                    State::Failed
+                                }
+                                // Other request timeouts fail the request.
+                                State::AwaitingResponse { tx, .. } => {
+                                    let _ = tx.send(Err(e.into()));
+                                    State::AwaitingRequest
+                                }
+                                _ => unreachable!(
+                                    "unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
+                                    self.client_rx
+                                ),
+                            };
+                        }
+                        Either::Right((Either::Right(_), _peer_fut)) => {
+                            trace!(parent: &span, "client request was cancelled");
+                            self.state = State::AwaitingRequest;
+                        }
+                    }
+                }
+                // We've failed, but we need to flush all pending client
+                // requests before we can return and complete the future.
+                State::Failed => {
+                    match self.client_rx.next().await {
+                        Some(InProgressClientRequest { tx, span, .. }) => {
+                            trace!(
+                                parent: &span,
+                                "sending an error response to a pending request on a failed connection"
+                            );
+                            let e = self
+                                .error_slot
+                                .try_get_error()
+                                .expect("cannot enter failed state without setting error slot");
+                            let _ = tx.send(Err(e));
+                            // Continue until we've errored all queued reqs
+                            continue;
+                        }
+                        None => return,
                    }
                }
            }
        }
    }
 
+    /// Marks the peer as having failed with error `e`.
+    fn fail_with<E>(&mut self, e: E)
+    where
+        E: Into<SharedPeerError>,
+    {
+        let e = e.into();
+        debug!(%e,
+               connection_state = ?self.state,
+               client_receiver = ?self.client_rx,
+               "failing peer service with error");
+        // Update the shared error slot
+        let mut guard = self
+            .error_slot
+            .0
+            .lock()
+            .expect("mutex should be unpoisoned");
+        if let Some(original_error) = guard.clone() {
+            // This panic typically happens due to these bugs:
+            // * we mark a connection as failed without using fail_with
+            // * we call fail_with without checking for a failed connection
+            //   state
+            //
+            // See the original bug #1510 and PR #1531, and the later bug #1599
+            // and PR #1600.
+            panic!(
+                "calling fail_with on already-failed connection state: failed connections must stop processing pending requests and responses, then close the connection. state: {:?} original error: {:?} new error: {:?} client receiver: {:?}",
+                self.state,
+                original_error,
+                e,
+                self.client_rx
+            );
+        } else {
+            *guard = Some(e);
+        }
+        // Drop the guard immediately to release the mutex.
+        std::mem::drop(guard);
+
+        // We want to close the client channel and set State::Failed so
+        // that we can flush any pending client requests. However, we may have
+        // an outstanding client request in State::AwaitingResponse, so
+        // we need to deal with it first if it exists.
+        self.client_rx.close();
+        let old_state = std::mem::replace(&mut self.state, State::Failed);
+        if let State::AwaitingResponse { tx, .. } = old_state {
+            // We know the slot has Some(e) because we just set it above,
+            // and the error slot is never unset.
+            let e = self.error_slot.try_get_error().unwrap();
+            let _ = tx.send(Err(e));
+        }
+    }
+
     /// Handle an incoming client request, possibly generating outgoing messages to the
     /// remote peer.
     ///
-    /// Correctness: This function MUST only be called while in the AwaitingRequest state
-    ///
     /// NOTE: the caller should use .instrument(msg.span) to instrument the function.
-    async fn handle_client_request(&mut self, req: InProgressClientRequest) -> Transition {
+    async fn handle_client_request(&mut self, req: InProgressClientRequest) {
         trace!(?req.request);
+        use State::*;
+
+        if req.tx.is_canceled() {
+            metrics::counter!("peer.canceled", 1);
+            tracing::debug!("ignoring canceled request");
+            return;
+        }
+
+        let new_state_result = self._handle_client_request(req).await;
+
+        // Updates state or fails.
+        match new_state_result {
+            Ok(AwaitingRequest) => {
+                self.state = AwaitingRequest;
+                self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
+            }
+            Ok(new_state @ AwaitingResponse { .. }) => {
+                self.state = new_state;
+                self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
+            }
+            Err((e, tx)) => {
+                let e = SharedPeerError::from(e);
+                let _ = tx.send(Err(e.clone()));
+                self.fail_with(e);
+            }
+            // unreachable states
+            Ok(Failed) => unreachable!(
+                "failed client requests must use fail_with(error) to reach a Failed state."
+            ),
+        };
+    }
+
+    async fn _handle_client_request(
+        &mut self,
+        req: InProgressClientRequest,
+    ) -> Result<
+        State,
+        (
+            SerializationError,
+            MustUseOneshotSender<Result<Response, SharedPeerError>>,
+        ),
+    > {
         use Request::*;
+        use State::*;
         let InProgressClientRequest { request, tx, span } = req;
 
-        match request {
-            Peers => match self.peer_tx.send(Message::GetAddr).await {
-                Ok(()) => Transition::AwaitResponse {
-                    handler: Handler::Peers,
-                    tx,
-                    span,
-                },
-                Err(e) => Transition::ExitResponse { e: e.into(), tx },
+        match (&self.state, request) {
+            (Failed, request) => panic!(
+                "failed connection cannot handle new request: {:?}, client_receiver: {:?}",
+                request,
+                self.client_rx
+            ),
+            (pending @ AwaitingResponse { .. }, request) => panic!(
+                "tried to process new request: {:?} while awaiting a response: {:?}, client_receiver: {:?}",
+                request,
+                pending,
+                self.client_rx
+            ),
+            (AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
+                Ok(()) => Ok(
+                    AwaitingResponse {
+                        handler: Handler::Peers,
+                        tx,
+                        span,
+                    },
+                ),
+                Err(e) => Err((e, tx)),
             },
-            Ping(nonce) => match self.peer_tx.send(Message::Ping(nonce)).await {
-                Ok(()) => Transition::AwaitResponse {
-                    handler: Handler::Ping(nonce),
-                    tx,
-                    span,
-                },
-                Err(e) => Transition::ExitResponse { e: e.into(), tx },
+            (AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
+                Ok(()) => Ok(
+                    AwaitingResponse {
+                        handler: Handler::Ping(nonce),
+                        tx,
+                        span,
+                    },
+                ),
+                Err(e) => Err((e, tx)),
             },
-            BlocksByHash(hashes) => {
+            (AwaitingRequest, BlocksByHash(hashes)) => {
                 match self
                     .peer_tx
                     .send(Message::GetData(
                         hashes.iter().map(|h| (*h).into()).collect(),
                     ))
                     .await
                 {
-                    Ok(()) => Transition::AwaitResponse {
-                        handler: Handler::BlocksByHash {
-                            blocks: Vec::with_capacity(hashes.len()),
-                            hashes,
+                    Ok(()) => Ok(
+                        AwaitingResponse {
+                            handler: Handler::BlocksByHash {
+                                blocks: Vec::with_capacity(hashes.len()),
+                                hashes,
+                            },
+                            tx,
+                            span,
                         },
-                        tx,
-                        span,
-                    },
-                    Err(e) => Transition::ExitResponse { e: e.into(), tx },
+                    ),
+                    Err(e) => Err((e, tx)),
                 }
             }
-            TransactionsByHash(hashes) => {
+            (AwaitingRequest, TransactionsByHash(hashes)) => {
                 match self
                     .peer_tx
                     .send(Message::GetData(
                         hashes.iter().map(|h| (*h).into()).collect(),
                     ))
                     .await
                 {
-                    Ok(()) => Transition::AwaitResponse {
-                        handler: Handler::TransactionsByHash {
-                            transactions: Vec::with_capacity(hashes.len()),
-                            hashes,
+                    Ok(()) => Ok(
+                        AwaitingResponse {
+                            handler: Handler::TransactionsByHash {
+                                transactions: Vec::with_capacity(hashes.len()),
+                                hashes,
+                            },
+                            tx,
+                            span,
                         },
-                        tx,
-                        span,
-                    },
-                    Err(e) => Transition::ExitResponse { e: e.into(), tx },
+                    ),
+                    Err(e) => Err((e, tx)),
                 }
             }
-            FindBlocks { known_blocks, stop } => {
+            (AwaitingRequest, FindBlocks { known_blocks, stop }) => {
                 match self
                     .peer_tx
                     .send(Message::GetBlocks { known_blocks, stop })
                     .await
                 {
-                    Ok(()) => Transition::AwaitResponse {
-                        handler: Handler::FindBlocks,
-                        tx,
-                        span,
-                    },
-                    Err(e) => Transition::ExitResponse { e: e.into(), tx },
+                    Ok(()) => Ok(
+                        AwaitingResponse {
+                            handler: Handler::FindBlocks,
+                            tx,
+                            span,
+                        },
+                    ),
+                    Err(e) => Err((e, tx)),
                 }
             }
-            FindHeaders { known_blocks, stop } => {
+            (AwaitingRequest, FindHeaders { known_blocks, stop }) => {
                 match self
                     .peer_tx
                     .send(Message::GetHeaders { known_blocks, stop })
                     .await
                 {
-                    Ok(()) => Transition::AwaitResponse {
-                        handler: Handler::FindHeaders,
-                        tx,
-                        span,
-                    },
-                    Err(e) => Transition::ExitResponse { e: e.into(), tx },
+                    Ok(()) => Ok(
+                        AwaitingResponse {
+                            handler: Handler::FindHeaders,
+                            tx,
+                            span,
+                        },
+                    ),
+                    Err(e) => Err((e, tx)),
                 }
             }
-            MempoolTransactions => match self.peer_tx.send(Message::Mempool).await {
-                Ok(()) => Transition::AwaitResponse {
-                    handler: Handler::MempoolTransactions,
-                    tx,
-                    span,
-                },
-                Err(e) => Transition::ExitResponse { e: e.into(), tx },
-            },
-            PushTransaction(transaction) => {
+            (AwaitingRequest, MempoolTransactions) => {
+                match self.peer_tx.send(Message::Mempool).await {
+                    Ok(()) => Ok(
+                        AwaitingResponse {
+                            handler: Handler::MempoolTransactions,
+                            tx,
+                            span,
+                        },
+                    ),
+                    Err(e) => Err((e, tx)),
+                }
+            }
+            (AwaitingRequest, PushTransaction(transaction)) => {
                 match self.peer_tx.send(Message::Tx(transaction)).await {
                     Ok(()) => {
                         // Since we're not waiting for further messages, we need to
                         // send a response before dropping tx.
                         let _ = tx.send(Ok(Response::Nil));
-                        Transition::AwaitRequest
-                    }
-                    Err(e) => Transition::ExitResponse { e: e.into(), tx },
+                        Ok(AwaitingRequest)
+                    },
+                    Err(e) => Err((e, tx)),
                 }
             }
-            AdvertiseTransactions(hashes) => {
+            (AwaitingRequest, AdvertiseTransactions(hashes)) => {
                 match self
                     .peer_tx
                     .send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
                     .await
                 {
                     Ok(()) => {
                         // Since we're not waiting for further messages, we need to
                         // send a response before dropping tx.
                         let _ = tx.send(Ok(Response::Nil));
-                        Transition::AwaitRequest
-                    }
-                    Err(e) => Transition::ExitResponse { e: e.into(), tx },
+                        Ok(AwaitingRequest)
+                    },
+                    Err(e) => Err((e, tx)),
                 }
             }
-            AdvertiseBlock(hash) => {
+            (AwaitingRequest, AdvertiseBlock(hash)) => {
                 match self.peer_tx.send(Message::Inv(vec![hash.into()])).await {
                     Ok(()) => {
                         // Since we're not waiting for further messages, we need to
                         // send a response before dropping tx.
                         let _ = tx.send(Ok(Response::Nil));
-                        Transition::AwaitRequest
-                    }
-                    Err(e) => Transition::ExitResponse { e: e.into(), tx },
+                        Ok(AwaitingRequest)
+                    },
+                    Err(e) => Err((e, tx)),
                 }
             }
         }
     }
@@ -802,7 +902,7 @@ where
         };
 
         match rsp {
-            Response::Nil => { /* generic success, do nothing */ }
+            Response::Nil => { /* generic success, do nothing */ },
             Response::Peers(addrs) => self.peer_tx.send(Message::Addr(addrs)).await?,
             Response::Transactions(transactions) => {
                 // Generate one tx message per transaction.
diff --git a/zebra-network/src/peer/error.rs b/zebra-network/src/peer/error.rs
index 8436fb54f..d18f6b932 100644
--- a/zebra-network/src/peer/error.rs
+++ b/zebra-network/src/peer/error.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use thiserror::Error;
 
@@ -28,9 +28,6 @@ pub enum PeerError {
     /// The remote peer closed the connection.
     #[error("Peer closed connection")]
     ConnectionClosed,
-    /// The local client closed the connection.
-    #[error("Internal client dropped connection")]
-    ConnectionDropped,
     /// The remote peer did not respond to a [`peer::Client`] request in time.
     #[error("Client request timed out")]
     ClientRequestTimeout,
@@ -67,6 +64,19 @@ pub enum PeerError {
     NotFound(Vec<InventoryHash>),
 }
 
+#[derive(Default, Clone)]
+pub(super) struct ErrorSlot(pub(super) Arc<Mutex<Option<SharedPeerError>>>);
+
+impl ErrorSlot {
+    pub fn try_get_error(&self) -> Option<SharedPeerError> {
+        self.0
+            .lock()
+            .expect("error mutex should be unpoisoned")
+            .as_ref()
+            .cloned()
+    }
+}
+
 /// An error during a handshake with a remote peer.
 #[derive(Error, Debug)]
 pub enum HandshakeError {
diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs
index 92fc366dd..2d8597240 100644
--- a/zebra-network/src/peer/handshake.rs
+++ b/zebra-network/src/peer/handshake.rs
@@ -30,7 +30,7 @@ use crate::{
     BoxError, Config, PeerAddrState,
 };
 
-use super::{Client, Connection, HandshakeError, PeerError};
+use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError};
 
 /// A [`Service`] that handshakes with a remote peer and constructs a
 /// client/server pair.
@@ -349,10 +349,12 @@ where
         // in this block, see constants.rs for more.
         let (server_tx, server_rx) = mpsc::channel(0);
         let (shutdown_tx, shutdown_rx) = oneshot::channel();
+        let slot = ErrorSlot::default();
 
         let client = Client {
             shutdown_tx: Some(shutdown_tx),
             server_tx: server_tx.clone(),
+            error_slot: slot.clone(),
         };
 
         let (peer_tx, peer_rx) = stream.split();
@@ -432,9 +434,10 @@ where
         use super::connection;
 
         let server = Connection {
-            state: Some(connection::State::AwaitingRequest),
+            state: connection::State::AwaitingRequest,
             svc: inbound_service,
             client_rx: server_rx.into(),
+            error_slot: slot,
             peer_tx,
             request_timer: None,
         };
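-- 

Editor's notes (appended for review; not part of the applied diff):

The core of this revert is the error slot shared between the two halves of a
peer: Connection records the first fatal error via fail_with, and Client reads
it back from poll_ready() and call(). The sketch below is a minimal,
self-contained model of that set-once pattern, not zebra code: SharedError
stands in for zebra-network's SharedPeerError, and set_once mirrors the
"already-failed" panic in Connection::fail_with rather than any real zebra API.

    use std::sync::{Arc, Mutex};

    #[derive(Clone, Debug)]
    struct SharedError(Arc<String>);

    #[derive(Default, Clone)]
    struct ErrorSlot(Arc<Mutex<Option<SharedError>>>);

    impl ErrorSlot {
        // Read the current error, if any, without consuming it.
        fn try_get_error(&self) -> Option<SharedError> {
            self.0.lock().expect("unpoisoned").as_ref().cloned()
        }

        // Record the first error; setting it twice is a logic bug, just as
        // calling fail_with on an already-failed connection panics.
        fn set_once(&self, e: SharedError) {
            let mut guard = self.0.lock().expect("unpoisoned");
            assert!(guard.is_none(), "error slot must only be set once");
            *guard = Some(e);
        }
    }

    fn main() {
        let slot = ErrorSlot::default();
        let client_view = slot.clone(); // the Client clones the same slot
        slot.set_once(SharedError(Arc::new("connection closed".into())));
        assert!(client_view.try_get_error().is_some());
    }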
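The State::Failed arm exists so that callers blocked on their oneshot
receivers observe the real connection error rather than a bare cancellation.
A scaled-down sketch of that drain, assuming plain futures channel types
(drain_failed and Reply are hypothetical names, not zebra APIs):

    use futures::{
        channel::{mpsc, oneshot},
        executor::block_on,
        SinkExt, StreamExt,
    };

    type Reply = Result<(), String>;

    // Close the request channel, then answer every already-queued request
    // with the shared error, as the State::Failed arm does.
    async fn drain_failed(mut client_rx: mpsc::Receiver<oneshot::Sender<Reply>>, error: String) {
        client_rx.close();
        while let Some(tx) = client_rx.next().await {
            let _ = tx.send(Err(error.clone()));
        }
    }

    fn main() {
        block_on(async {
            let (mut req_tx, req_rx) = mpsc::channel(1);
            let (tx, rx) = oneshot::channel();
            req_tx.send(tx).await.unwrap();
            drain_failed(req_rx, "peer connection failed".into()).await;
            // The queued request saw the connection error, not a cancellation.
            assert!(rx.await.unwrap().is_err());
        });
    }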
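handle_client_request re-arms the request timer with
sleep(constants::REQUEST_TIMEOUT), and the AwaitingResponse arm races that
timer against the next peer message. A sketch of that race is below; the
timeout value is illustrative only, not zebra's real constant, and the
"response" future stands in for the peer message stream:

    use std::time::Duration;

    use futures::future::{self, Either};
    use tokio::time::sleep;

    // Illustrative stand-in for constants::REQUEST_TIMEOUT (scaled down).
    const REQUEST_TIMEOUT: Duration = Duration::from_millis(200);

    #[tokio::main]
    async fn main() {
        // A peer "response" that arrives too late.
        let response = Box::pin(async {
            sleep(Duration::from_millis(500)).await;
            "pong"
        });
        let timer = Box::pin(sleep(REQUEST_TIMEOUT));

        // future::select resolves with whichever side finishes first,
        // mirroring the timeout race in the AwaitingResponse arm.
        match future::select(response, timer).await {
            Either::Left((msg, _timer)) => println!("got response: {}", msg),
            Either::Right(((), _response)) => println!("client request timed out"),
        }
    }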