introduce Transition enum
This commit is contained in:
parent
e6cb20e13f
commit
6906f87ead
|
@ -15,7 +15,6 @@ use client::ClientRequest;
|
|||
use client::ClientRequestReceiver;
|
||||
use client::InProgressClientRequest;
|
||||
use client::MustUseOneshotSender;
|
||||
use error::ErrorSlot;
|
||||
|
||||
pub use client::Client;
|
||||
pub use connection::Connection;
|
||||
|
|
|
@ -6,14 +6,14 @@ use std::{
|
|||
|
||||
use futures::{
|
||||
channel::{mpsc, oneshot},
|
||||
future, ready,
|
||||
ready,
|
||||
stream::{Stream, StreamExt},
|
||||
};
|
||||
use tower::Service;
|
||||
|
||||
use crate::protocol::internal::{Request, Response};
|
||||
|
||||
use super::{ErrorSlot, PeerError, SharedPeerError};
|
||||
use super::{PeerError, SharedPeerError};
|
||||
|
||||
/// The "client" duplex half of a peer connection.
|
||||
pub struct Client {
|
||||
|
@ -21,7 +21,6 @@ pub struct Client {
|
|||
// This is always Some except when we take it on drop.
|
||||
pub(super) shutdown_tx: Option<oneshot::Sender<()>>,
|
||||
pub(super) server_tx: mpsc::Sender<ClientRequest>,
|
||||
pub(super) error_slot: ErrorSlot,
|
||||
}
|
||||
|
||||
/// A message from the `peer::Client` to the `peer::Server`.
|
||||
|
@ -98,13 +97,6 @@ impl From<ClientRequest> for InProgressClientRequest {
|
|||
}
|
||||
}
|
||||
|
||||
impl ClientRequestReceiver {
|
||||
/// Forwards to `inner.close()`
|
||||
pub fn close(&mut self) {
|
||||
self.inner.close()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for ClientRequestReceiver {
|
||||
type Item = InProgressClientRequest;
|
||||
|
||||
|
@ -199,10 +191,7 @@ impl Service<Request> for Client {
|
|||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
if ready!(self.server_tx.poll_ready(cx)).is_err() {
|
||||
Poll::Ready(Err(self
|
||||
.error_slot
|
||||
.try_get_error()
|
||||
.expect("failed servers must set their error slot")))
|
||||
Poll::Ready(Err(PeerError::ConnectionClosed.into()))
|
||||
} else {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
@ -221,13 +210,7 @@ impl Service<Request> for Client {
|
|||
match self.server_tx.try_send(ClientRequest { request, span, tx }) {
|
||||
Err(e) => {
|
||||
if e.is_disconnected() {
|
||||
let ClientRequest { tx, .. } = e.into_inner();
|
||||
let _ = tx.send(Err(PeerError::ConnectionClosed.into()));
|
||||
future::ready(Err(self
|
||||
.error_slot
|
||||
.try_get_error()
|
||||
.expect("failed servers must set their error slot")))
|
||||
.boxed()
|
||||
async { Err(PeerError::ConnectionClosed.into()) }.boxed()
|
||||
} else {
|
||||
// sending fails when there's not enough
|
||||
// channel space, but we called poll_ready
|
||||
|
|
|
@ -7,14 +7,18 @@
|
|||
//! And it's unclear if these assumptions match the `zcashd` implementation.
|
||||
//! It should be refactored into a cleaner set of request/response pairs (#1515).
|
||||
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
convert::{TryFrom, TryInto},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use futures::{
|
||||
future::{self, Either},
|
||||
prelude::*,
|
||||
stream::Stream,
|
||||
};
|
||||
use tokio::time::{sleep, Sleep};
|
||||
use tokio::time::Sleep;
|
||||
use tower::Service;
|
||||
use tracing_futures::Instrument;
|
||||
|
||||
|
@ -25,7 +29,6 @@ use zebra_chain::{
|
|||
};
|
||||
|
||||
use crate::{
|
||||
constants,
|
||||
protocol::{
|
||||
external::{types::Nonce, InventoryHash, Message},
|
||||
internal::{Request, Response},
|
||||
|
@ -34,7 +37,7 @@ use crate::{
|
|||
};
|
||||
|
||||
use super::{
|
||||
ClientRequestReceiver, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
|
||||
ClientRequestReceiver, InProgressClientRequest, MustUseOneshotSender, PeerError,
|
||||
SharedPeerError,
|
||||
};
|
||||
|
||||
|
@ -317,13 +320,151 @@ pub(super) enum State {
|
|||
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
||||
span: tracing::Span,
|
||||
},
|
||||
/// A failure has occurred and we are shutting down the connection.
|
||||
Failed,
|
||||
}
|
||||
|
||||
impl State {
|
||||
async fn step<Rx, S, Tx>(self, conn: &mut Connection<S, Tx>, peer_rx: &mut Rx) -> Transition
|
||||
where
|
||||
Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
|
||||
S: Service<Request, Response = Response, Error = BoxError>,
|
||||
S::Error: Into<BoxError>,
|
||||
Tx: Sink<Message, Error = SerializationError> + Unpin,
|
||||
{
|
||||
match self {
|
||||
State::AwaitingRequest => {
|
||||
trace!("awaiting client request or peer message");
|
||||
match future::select(peer_rx.next(), conn.client_rx.next()).await {
|
||||
Either::Left((None, _)) => Transition::Exit(PeerError::ConnectionClosed.into()),
|
||||
Either::Left((Some(Err(e)), _)) => Transition::Exit(e.into()),
|
||||
Either::Left((Some(Ok(msg)), _)) => {
|
||||
match conn.handle_message_as_request(msg).await {
|
||||
Ok(()) => Transition::AwaitRequest,
|
||||
Err(e) => Transition::Exit(e.into()),
|
||||
}
|
||||
}
|
||||
Either::Right((None, _)) => {
|
||||
trace!("client_rx closed, ending connection");
|
||||
Transition::Exit(PeerError::ConnectionDropped.into())
|
||||
}
|
||||
Either::Right((Some(req), _)) => {
|
||||
if req.tx.is_canceled() {
|
||||
metrics::counter!("peer.canceled", 1);
|
||||
tracing::debug!("ignoring canceled request");
|
||||
return Transition::AwaitRequest;
|
||||
}
|
||||
|
||||
let span = req.span.clone();
|
||||
conn.handle_client_request(req).instrument(span).await
|
||||
}
|
||||
}
|
||||
}
|
||||
// We're awaiting a response to a client request,
|
||||
// so wait on either a peer message, or on a request cancellation.
|
||||
State::AwaitingResponse {
|
||||
span,
|
||||
mut tx,
|
||||
mut handler,
|
||||
} => {
|
||||
// we have to get rid of the span reference so we can tamper with the state
|
||||
let span = span.clone();
|
||||
trace!(parent: &span, "awaiting response to client request");
|
||||
let timer_ref = conn
|
||||
.request_timer
|
||||
.as_mut()
|
||||
.expect("timeout must be set while awaiting response");
|
||||
let cancel = future::select(timer_ref, tx.cancellation());
|
||||
match future::select(peer_rx.next(), cancel)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
{
|
||||
Either::Left((None, _)) => Transition::ExitResponse {
|
||||
e: PeerError::ConnectionClosed.into(),
|
||||
tx,
|
||||
},
|
||||
Either::Left((Some(Err(e)), _)) => Transition::ExitResponse { e: e.into(), tx },
|
||||
Either::Left((Some(Ok(peer_msg)), _cancel)) => {
|
||||
let request_msg = span.in_scope(|| handler.process_message(peer_msg));
|
||||
// If the message was not consumed, check whether it
|
||||
// should be handled as a request.
|
||||
if let Some(msg) = request_msg {
|
||||
// do NOT instrument with the request span, this is
|
||||
// independent work
|
||||
match conn.handle_message_as_request(msg).await {
|
||||
Ok(()) => Transition::AwaitRequest,
|
||||
Err(e) => Transition::Exit(e.into()),
|
||||
}
|
||||
} else {
|
||||
// Otherwise, check whether the handler is finished
|
||||
// processing messages and update the state.
|
||||
match handler {
|
||||
Handler::Finished(response) => {
|
||||
let _ = tx.send(response.map_err(Into::into));
|
||||
Transition::AwaitRequest
|
||||
}
|
||||
_ => Transition::AwaitResponse { tx, handler, span },
|
||||
}
|
||||
}
|
||||
}
|
||||
Either::Right((Either::Left(_), _peer_fut)) => {
|
||||
trace!(parent: &span, "client request timed out");
|
||||
let e = PeerError::ClientRequestTimeout;
|
||||
match handler {
|
||||
Handler::Ping(_) => Transition::ExitResponse { e: e.into(), tx },
|
||||
_ => {
|
||||
let _ = tx.send(Err(e.into()));
|
||||
Transition::AwaitRequest
|
||||
}
|
||||
}
|
||||
}
|
||||
Either::Right((Either::Right(_), _peer_fut)) => {
|
||||
trace!(parent: &span, "client request was cancelled");
|
||||
Transition::AwaitRequest
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Enum describing the next state transition that should be taken after any
|
||||
/// given `step`.
|
||||
enum Transition {
|
||||
AwaitRequest,
|
||||
AwaitResponse {
|
||||
handler: Handler,
|
||||
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
||||
span: tracing::Span,
|
||||
},
|
||||
// Exiting while no client response is expected
|
||||
Exit(SharedPeerError),
|
||||
// Exiting while processing a client response
|
||||
ExitResponse {
|
||||
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
||||
e: SharedPeerError,
|
||||
},
|
||||
}
|
||||
|
||||
impl TryFrom<Transition> for State {
|
||||
type Error = SharedPeerError;
|
||||
|
||||
fn try_from(trans: Transition) -> Result<Self, Self::Error> {
|
||||
match trans {
|
||||
Transition::AwaitRequest => Ok(State::AwaitingRequest),
|
||||
Transition::AwaitResponse { handler, tx, span } => {
|
||||
Ok(State::AwaitingResponse { handler, tx, span })
|
||||
}
|
||||
Transition::Exit(e) => Err(e),
|
||||
Transition::ExitResponse { tx, e } => {
|
||||
let _ = tx.send(Err(e.clone()));
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The state associated with a peer connection.
|
||||
pub struct Connection<S, Tx> {
|
||||
pub(super) state: State,
|
||||
pub(super) state: Option<State>,
|
||||
/// A timeout for a client request. This is stored separately from
|
||||
/// State so that we can move the future out of it independently of
|
||||
/// other state handling.
|
||||
|
@ -332,8 +473,6 @@ pub struct Connection<S, Tx> {
|
|||
/// A `mpsc::Receiver<ClientRequest>` that converts its results to
|
||||
/// `InProgressClientRequest`
|
||||
pub(super) client_rx: ClientRequestReceiver,
|
||||
/// A slot for an error shared between the Connection and the Client that uses it.
|
||||
pub(super) error_slot: ErrorSlot,
|
||||
//pub(super) peer_rx: Rx,
|
||||
pub(super) peer_tx: Tx,
|
||||
}
|
||||
|
@ -369,287 +508,60 @@ where
|
|||
// If there is a pending request, we wait only on an incoming peer message, and
|
||||
// check whether it can be interpreted as a response to the pending request.
|
||||
loop {
|
||||
match self.state {
|
||||
State::AwaitingRequest => {
|
||||
trace!("awaiting client request or peer message");
|
||||
match future::select(peer_rx.next(), self.client_rx.next()).await {
|
||||
Either::Left((None, _)) => {
|
||||
self.fail_with(PeerError::ConnectionClosed);
|
||||
}
|
||||
Either::Left((Some(Err(e)), _)) => self.fail_with(e),
|
||||
Either::Left((Some(Ok(msg)), _)) => {
|
||||
match self.handle_message_as_request(msg).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => self.fail_with(e),
|
||||
}
|
||||
}
|
||||
Either::Right((None, _)) => {
|
||||
trace!("client_rx closed, ending connection");
|
||||
return;
|
||||
}
|
||||
Either::Right((Some(req), _)) => {
|
||||
let span = req.span.clone();
|
||||
self.handle_client_request(req).instrument(span).await
|
||||
}
|
||||
}
|
||||
}
|
||||
// We're awaiting a response to a client request,
|
||||
// so wait on either a peer message, or on a request cancellation.
|
||||
State::AwaitingResponse {
|
||||
ref span,
|
||||
ref mut tx,
|
||||
..
|
||||
} => {
|
||||
// we have to get rid of the span reference so we can tamper with the state
|
||||
let span = span.clone();
|
||||
trace!(parent: &span, "awaiting response to client request");
|
||||
let timer_ref = self
|
||||
.request_timer
|
||||
.as_mut()
|
||||
.expect("timeout must be set while awaiting response");
|
||||
let cancel = future::select(timer_ref, tx.cancellation());
|
||||
match future::select(peer_rx.next(), cancel)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
let transition = self
|
||||
.state
|
||||
.take()
|
||||
.expect("state only None during steps")
|
||||
.step(&mut self, &mut peer_rx)
|
||||
.await;
|
||||
|
||||
self.state = match transition.try_into() {
|
||||
Ok(state) => Some(state),
|
||||
Err(e) => {
|
||||
while let Some(InProgressClientRequest { tx, span, .. }) =
|
||||
self.client_rx.next().await
|
||||
{
|
||||
Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
|
||||
Either::Left((Some(Err(e)), _)) => self.fail_with(e),
|
||||
Either::Left((Some(Ok(peer_msg)), _cancel)) => {
|
||||
// Try to process the message using the handler.
|
||||
// This extremely awkward construction avoids
|
||||
// keeping a live reference to handler across the
|
||||
// call to handle_message_as_request, which takes
|
||||
// &mut self. This is a sign that we don't properly
|
||||
// factor the state required for inbound and
|
||||
// outbound requests.
|
||||
let request_msg = match self.state {
|
||||
State::AwaitingResponse {
|
||||
ref mut handler, ..
|
||||
} => span.in_scope(|| handler.process_message(peer_msg)),
|
||||
_ => unreachable!("unexpected state after AwaitingResponse: {:?}, peer_msg: {:?}, client_receiver: {:?}",
|
||||
self.state,
|
||||
peer_msg,
|
||||
self.client_rx,
|
||||
),
|
||||
};
|
||||
// If the message was not consumed, check whether it
|
||||
// should be handled as a request.
|
||||
if let Some(msg) = request_msg {
|
||||
// do NOT instrument with the request span, this is
|
||||
// independent work
|
||||
match self.handle_message_as_request(msg).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => self.fail_with(e),
|
||||
}
|
||||
} else {
|
||||
// Otherwise, check whether the handler is finished
|
||||
// processing messages and update the state.
|
||||
self.state = match self.state {
|
||||
State::AwaitingResponse {
|
||||
handler: Handler::Finished(response),
|
||||
tx,
|
||||
..
|
||||
} => {
|
||||
let _ = tx.send(response.map_err(Into::into));
|
||||
State::AwaitingRequest
|
||||
}
|
||||
pending @ State::AwaitingResponse { .. } => pending,
|
||||
_ => unreachable!(
|
||||
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
|
||||
self.client_rx
|
||||
),
|
||||
};
|
||||
}
|
||||
}
|
||||
Either::Right((Either::Left(_), _peer_fut)) => {
|
||||
trace!(parent: &span, "client request timed out");
|
||||
let e = PeerError::ClientRequestTimeout;
|
||||
self.state = match self.state {
|
||||
// Special case: ping timeouts fail the connection.
|
||||
State::AwaitingResponse {
|
||||
handler: Handler::Ping(_),
|
||||
..
|
||||
} => {
|
||||
self.fail_with(e);
|
||||
State::Failed
|
||||
}
|
||||
// Other request timeouts fail the request.
|
||||
State::AwaitingResponse { tx, .. } => {
|
||||
let _ = tx.send(Err(e.into()));
|
||||
State::AwaitingRequest
|
||||
}
|
||||
_ => unreachable!(
|
||||
"unexpected failed connection state while AwaitingResponse: client_receiver: {:?}",
|
||||
self.client_rx
|
||||
),
|
||||
};
|
||||
}
|
||||
Either::Right((Either::Right(_), _peer_fut)) => {
|
||||
trace!(parent: &span, "client request was cancelled");
|
||||
self.state = State::AwaitingRequest;
|
||||
}
|
||||
}
|
||||
}
|
||||
// We've failed, but we need to flush all pending client
|
||||
// requests before we can return and complete the future.
|
||||
State::Failed => {
|
||||
match self.client_rx.next().await {
|
||||
Some(InProgressClientRequest { tx, span, .. }) => {
|
||||
trace!(
|
||||
parent: &span,
|
||||
"sending an error response to a pending request on a failed connection"
|
||||
);
|
||||
let e = self
|
||||
.error_slot
|
||||
.try_get_error()
|
||||
.expect("cannot enter failed state without setting error slot");
|
||||
let _ = tx.send(Err(e));
|
||||
// Continue until we've errored all queued reqs
|
||||
continue;
|
||||
}
|
||||
None => return,
|
||||
trace!(
|
||||
parent: &span,
|
||||
"sending an error response to a pending request on a failed connection"
|
||||
);
|
||||
let _ = tx.send(Err(e.clone()));
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Marks the peer as having failed with error `e`.
|
||||
fn fail_with<E>(&mut self, e: E)
|
||||
where
|
||||
E: Into<SharedPeerError>,
|
||||
{
|
||||
let e = e.into();
|
||||
debug!(%e,
|
||||
connection_state = ?self.state,
|
||||
client_receiver = ?self.client_rx,
|
||||
"failing peer service with error");
|
||||
// Update the shared error slot
|
||||
let mut guard = self
|
||||
.error_slot
|
||||
.0
|
||||
.lock()
|
||||
.expect("mutex should be unpoisoned");
|
||||
if let Some(original_error) = guard.clone() {
|
||||
// This panic typically happens due to these bugs:
|
||||
// * we mark a connection as failed without using fail_with
|
||||
// * we call fail_with without checking for a failed connection
|
||||
// state
|
||||
//
|
||||
// See the original bug #1510 and PR #1531, and the later bug #1599
|
||||
// and PR #1600.
|
||||
panic!(
|
||||
"calling fail_with on already-failed connection state: failed connections must stop processing pending requests and responses, then close the connection. state: {:?} original error: {:?} new error: {:?} client receiver: {:?}",
|
||||
self.state,
|
||||
original_error,
|
||||
e,
|
||||
self.client_rx
|
||||
);
|
||||
} else {
|
||||
*guard = Some(e);
|
||||
}
|
||||
// Drop the guard immediately to release the mutex.
|
||||
std::mem::drop(guard);
|
||||
|
||||
// We want to close the client channel and set State::Failed so
|
||||
// that we can flush any pending client requests. However, we may have
|
||||
// an outstanding client request in State::AwaitingResponse, so
|
||||
// we need to deal with it first if it exists.
|
||||
self.client_rx.close();
|
||||
let old_state = std::mem::replace(&mut self.state, State::Failed);
|
||||
if let State::AwaitingResponse { tx, .. } = old_state {
|
||||
// We know the slot has Some(e) because we just set it above,
|
||||
// and the error slot is never unset.
|
||||
let e = self.error_slot.try_get_error().unwrap();
|
||||
let _ = tx.send(Err(e));
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle an incoming client request, possibly generating outgoing messages to the
|
||||
/// remote peer.
|
||||
///
|
||||
/// Correctness: This function MUST only be called while in the AwaitingRequest state
|
||||
///
|
||||
/// NOTE: the caller should use .instrument(msg.span) to instrument the function.
|
||||
async fn handle_client_request(&mut self, req: InProgressClientRequest) {
|
||||
async fn handle_client_request(&mut self, req: InProgressClientRequest) -> Transition {
|
||||
trace!(?req.request);
|
||||
use State::*;
|
||||
|
||||
if req.tx.is_canceled() {
|
||||
metrics::counter!("peer.canceled", 1);
|
||||
tracing::debug!("ignoring canceled request");
|
||||
return;
|
||||
}
|
||||
|
||||
let new_state_result = self._handle_client_request(req).await;
|
||||
|
||||
// Updates state or fails.
|
||||
match new_state_result {
|
||||
Ok(AwaitingRequest) => {
|
||||
self.state = AwaitingRequest;
|
||||
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
|
||||
}
|
||||
Ok(new_state @ AwaitingResponse { .. }) => {
|
||||
self.state = new_state;
|
||||
self.request_timer = Some(sleep(constants::REQUEST_TIMEOUT));
|
||||
}
|
||||
Err((e, tx)) => {
|
||||
let e = SharedPeerError::from(e);
|
||||
let _ = tx.send(Err(e.clone()));
|
||||
self.fail_with(e);
|
||||
}
|
||||
// unreachable states
|
||||
Ok(Failed) => unreachable!(
|
||||
"failed client requests must use fail_with(error) to reach a Failed state."
|
||||
),
|
||||
};
|
||||
}
|
||||
|
||||
async fn _handle_client_request(
|
||||
&mut self,
|
||||
req: InProgressClientRequest,
|
||||
) -> Result<
|
||||
State,
|
||||
(
|
||||
SerializationError,
|
||||
MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
||||
),
|
||||
> {
|
||||
use Request::*;
|
||||
use State::*;
|
||||
let InProgressClientRequest { request, tx, span } = req;
|
||||
|
||||
match (&self.state, request) {
|
||||
(Failed, request) => panic!(
|
||||
"failed connection cannot handle new request: {:?}, client_receiver: {:?}",
|
||||
request,
|
||||
self.client_rx
|
||||
),
|
||||
(pending @ AwaitingResponse { .. }, request) => panic!(
|
||||
"tried to process new request: {:?} while awaiting a response: {:?}, client_receiver: {:?}",
|
||||
request,
|
||||
pending,
|
||||
self.client_rx
|
||||
),
|
||||
(AwaitingRequest, Peers) => match self.peer_tx.send(Message::GetAddr).await {
|
||||
Ok(()) => Ok(
|
||||
AwaitingResponse {
|
||||
handler: Handler::Peers,
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
),
|
||||
Err(e) => Err((e, tx)),
|
||||
match request {
|
||||
Peers => match self.peer_tx.send(Message::GetAddr).await {
|
||||
Ok(()) => Transition::AwaitResponse {
|
||||
handler: Handler::Peers,
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
Err(e) => Transition::ExitResponse { e: e.into(), tx },
|
||||
},
|
||||
(AwaitingRequest, Ping(nonce)) => match self.peer_tx.send(Message::Ping(nonce)).await {
|
||||
Ok(()) => Ok(
|
||||
AwaitingResponse {
|
||||
handler: Handler::Ping(nonce),
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
),
|
||||
Err(e) => Err((e, tx)),
|
||||
Ping(nonce) => match self.peer_tx.send(Message::Ping(nonce)).await {
|
||||
Ok(()) => Transition::AwaitResponse {
|
||||
handler: Handler::Ping(nonce),
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
Err(e) => Transition::ExitResponse { e: e.into(), tx },
|
||||
},
|
||||
(AwaitingRequest, BlocksByHash(hashes)) => {
|
||||
BlocksByHash(hashes) => {
|
||||
match self
|
||||
.peer_tx
|
||||
.send(Message::GetData(
|
||||
|
@ -657,20 +569,18 @@ where
|
|||
))
|
||||
.await
|
||||
{
|
||||
Ok(()) => Ok(
|
||||
AwaitingResponse {
|
||||
handler: Handler::BlocksByHash {
|
||||
blocks: Vec::with_capacity(hashes.len()),
|
||||
hashes,
|
||||
},
|
||||
tx,
|
||||
span,
|
||||
Ok(()) => Transition::AwaitResponse {
|
||||
handler: Handler::BlocksByHash {
|
||||
blocks: Vec::with_capacity(hashes.len()),
|
||||
hashes,
|
||||
},
|
||||
),
|
||||
Err(e) => Err((e, tx)),
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
Err(e) => Transition::ExitResponse { e: e.into(), tx },
|
||||
}
|
||||
}
|
||||
(AwaitingRequest, TransactionsByHash(hashes)) => {
|
||||
TransactionsByHash(hashes) => {
|
||||
match self
|
||||
.peer_tx
|
||||
.send(Message::GetData(
|
||||
|
@ -678,75 +588,65 @@ where
|
|||
))
|
||||
.await
|
||||
{
|
||||
Ok(()) => Ok(
|
||||
AwaitingResponse {
|
||||
handler: Handler::TransactionsByHash {
|
||||
transactions: Vec::with_capacity(hashes.len()),
|
||||
hashes,
|
||||
},
|
||||
tx,
|
||||
span,
|
||||
Ok(()) => Transition::AwaitResponse {
|
||||
handler: Handler::TransactionsByHash {
|
||||
transactions: Vec::with_capacity(hashes.len()),
|
||||
hashes,
|
||||
},
|
||||
),
|
||||
Err(e) => Err((e, tx)),
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
Err(e) => Transition::ExitResponse { e: e.into(), tx },
|
||||
}
|
||||
}
|
||||
(AwaitingRequest, FindBlocks { known_blocks, stop }) => {
|
||||
FindBlocks { known_blocks, stop } => {
|
||||
match self
|
||||
.peer_tx
|
||||
.send(Message::GetBlocks { known_blocks, stop })
|
||||
.await
|
||||
{
|
||||
Ok(()) => Ok(
|
||||
AwaitingResponse {
|
||||
handler: Handler::FindBlocks,
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
),
|
||||
Err(e) => Err((e, tx)),
|
||||
Ok(()) => Transition::AwaitResponse {
|
||||
handler: Handler::FindBlocks,
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
Err(e) => Transition::ExitResponse { e: e.into(), tx },
|
||||
}
|
||||
}
|
||||
(AwaitingRequest, FindHeaders { known_blocks, stop }) => {
|
||||
FindHeaders { known_blocks, stop } => {
|
||||
match self
|
||||
.peer_tx
|
||||
.send(Message::GetHeaders { known_blocks, stop })
|
||||
.await
|
||||
{
|
||||
Ok(()) => Ok(
|
||||
AwaitingResponse {
|
||||
handler: Handler::FindHeaders,
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
),
|
||||
Err(e) => Err((e, tx)),
|
||||
Ok(()) => Transition::AwaitResponse {
|
||||
handler: Handler::FindHeaders,
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
Err(e) => Transition::ExitResponse { e: e.into(), tx },
|
||||
}
|
||||
}
|
||||
(AwaitingRequest, MempoolTransactions) => {
|
||||
match self.peer_tx.send(Message::Mempool).await {
|
||||
Ok(()) => Ok(
|
||||
AwaitingResponse {
|
||||
handler: Handler::MempoolTransactions,
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
),
|
||||
Err(e) => Err((e, tx)),
|
||||
}
|
||||
}
|
||||
(AwaitingRequest, PushTransaction(transaction)) => {
|
||||
MempoolTransactions => match self.peer_tx.send(Message::Mempool).await {
|
||||
Ok(()) => Transition::AwaitResponse {
|
||||
handler: Handler::MempoolTransactions,
|
||||
tx,
|
||||
span,
|
||||
},
|
||||
Err(e) => Transition::ExitResponse { e: e.into(), tx },
|
||||
},
|
||||
PushTransaction(transaction) => {
|
||||
match self.peer_tx.send(Message::Tx(transaction)).await {
|
||||
Ok(()) => {
|
||||
// Since we're not waiting for further messages, we need to
|
||||
// send a response before dropping tx.
|
||||
let _ = tx.send(Ok(Response::Nil));
|
||||
Ok(AwaitingRequest)
|
||||
},
|
||||
Err(e) => Err((e, tx)),
|
||||
Transition::AwaitRequest
|
||||
}
|
||||
Err(e) => Transition::ExitResponse { e: e.into(), tx },
|
||||
}
|
||||
}
|
||||
(AwaitingRequest, AdvertiseTransactions(hashes)) => {
|
||||
AdvertiseTransactions(hashes) => {
|
||||
match self
|
||||
.peer_tx
|
||||
.send(Message::Inv(hashes.iter().map(|h| (*h).into()).collect()))
|
||||
|
@ -756,20 +656,20 @@ where
|
|||
// Since we're not waiting for further messages, we need to
|
||||
// send a response before dropping tx.
|
||||
let _ = tx.send(Ok(Response::Nil));
|
||||
Ok(AwaitingRequest)
|
||||
},
|
||||
Err(e) => Err((e, tx)),
|
||||
Transition::AwaitRequest
|
||||
}
|
||||
Err(e) => Transition::ExitResponse { e: e.into(), tx },
|
||||
}
|
||||
}
|
||||
(AwaitingRequest, AdvertiseBlock(hash)) => {
|
||||
AdvertiseBlock(hash) => {
|
||||
match self.peer_tx.send(Message::Inv(vec![hash.into()])).await {
|
||||
Ok(()) => {
|
||||
// Since we're not waiting for further messages, we need to
|
||||
// send a response before dropping tx.
|
||||
let _ = tx.send(Ok(Response::Nil));
|
||||
Ok(AwaitingRequest)
|
||||
},
|
||||
Err(e) => Err((e, tx)),
|
||||
Transition::AwaitRequest
|
||||
}
|
||||
Err(e) => Transition::ExitResponse { e: e.into(), tx },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -902,7 +802,7 @@ where
|
|||
};
|
||||
|
||||
match rsp {
|
||||
Response::Nil => { /* generic success, do nothing */ },
|
||||
Response::Nil => { /* generic success, do nothing */ }
|
||||
Response::Peers(addrs) => self.peer_tx.send(Message::Addr(addrs)).await?,
|
||||
Response::Transactions(transactions) => {
|
||||
// Generate one tx message per transaction.
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::Arc;
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
|
@ -28,6 +28,9 @@ pub enum PeerError {
|
|||
/// The remote peer closed the connection.
|
||||
#[error("Peer closed connection")]
|
||||
ConnectionClosed,
|
||||
/// The local client closed the connection.
|
||||
#[error("Internal client dropped connection")]
|
||||
ConnectionDropped,
|
||||
/// The remote peer did not respond to a [`peer::Client`] request in time.
|
||||
#[error("Client request timed out")]
|
||||
ClientRequestTimeout,
|
||||
|
@ -64,19 +67,6 @@ pub enum PeerError {
|
|||
NotFound(Vec<InventoryHash>),
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub(super) struct ErrorSlot(pub(super) Arc<Mutex<Option<SharedPeerError>>>);
|
||||
|
||||
impl ErrorSlot {
|
||||
pub fn try_get_error(&self) -> Option<SharedPeerError> {
|
||||
self.0
|
||||
.lock()
|
||||
.expect("error mutex should be unpoisoned")
|
||||
.as_ref()
|
||||
.cloned()
|
||||
}
|
||||
}
|
||||
|
||||
/// An error during a handshake with a remote peer.
|
||||
#[derive(Error, Debug)]
|
||||
pub enum HandshakeError {
|
||||
|
|
|
@ -30,7 +30,7 @@ use crate::{
|
|||
BoxError, Config, PeerAddrState,
|
||||
};
|
||||
|
||||
use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError};
|
||||
use super::{Client, Connection, HandshakeError, PeerError};
|
||||
|
||||
/// A [`Service`] that handshakes with a remote peer and constructs a
|
||||
/// client/server pair.
|
||||
|
@ -349,12 +349,10 @@ where
|
|||
// in this block, see constants.rs for more.
|
||||
let (server_tx, server_rx) = mpsc::channel(0);
|
||||
let (shutdown_tx, shutdown_rx) = oneshot::channel();
|
||||
let slot = ErrorSlot::default();
|
||||
|
||||
let client = Client {
|
||||
shutdown_tx: Some(shutdown_tx),
|
||||
server_tx: server_tx.clone(),
|
||||
error_slot: slot.clone(),
|
||||
};
|
||||
|
||||
let (peer_tx, peer_rx) = stream.split();
|
||||
|
@ -434,10 +432,9 @@ where
|
|||
|
||||
use super::connection;
|
||||
let server = Connection {
|
||||
state: connection::State::AwaitingRequest,
|
||||
state: Some(connection::State::AwaitingRequest),
|
||||
svc: inbound_service,
|
||||
client_rx: server_rx.into(),
|
||||
error_slot: slot,
|
||||
peer_tx,
|
||||
request_timer: None,
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue