Revert "update comments throughout connection.rs"
This reverts commit 651d352ce1
.
This commit is contained in:
parent
fc44a97925
commit
292a4391e2
|
@ -2,6 +2,10 @@
|
||||||
//!
|
//!
|
||||||
//! Maps the external Zcash/Bitcoin protocol to Zebra's internal request/response
|
//! Maps the external Zcash/Bitcoin protocol to Zebra's internal request/response
|
||||||
//! protocol.
|
//! protocol.
|
||||||
|
//!
|
||||||
|
//! This module contains a lot of undocumented state, assumptions and invariants.
|
||||||
|
//! And it's unclear if these assumptions match the `zcashd` implementation.
|
||||||
|
//! It should be refactored into a cleaner set of request/response pairs (#1515).
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashSet,
|
collections::HashSet,
|
||||||
|
@ -39,13 +43,10 @@ use super::{
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// Internal state machine for [`State::AwaitingResponse`] used to coordinate
|
|
||||||
/// receiving expected responses.
|
|
||||||
pub(super) enum Handler {
|
pub(super) enum Handler {
|
||||||
/// Indicates that the handler has finished processing the request.
|
/// Indicates that the handler has finished processing the request.
|
||||||
/// An error here is scoped to the request.
|
/// An error here is scoped to the request.
|
||||||
Finished(Result<Response, PeerError>),
|
Finished(Result<Response, PeerError>),
|
||||||
// Expected response states
|
|
||||||
Ping(Nonce),
|
Ping(Nonce),
|
||||||
Peers,
|
Peers,
|
||||||
FindBlocks,
|
FindBlocks,
|
||||||
|
@ -311,32 +312,18 @@ impl Handler {
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
#[must_use = "AwaitingResponse.tx.send() must be called before drop"]
|
#[must_use = "AwaitingResponse.tx.send() must be called before drop"]
|
||||||
/// The current state of the [`Connection`], consumed to execute the next step of
|
|
||||||
/// the state machine.
|
|
||||||
pub(super) enum State {
|
pub(super) enum State {
|
||||||
/// Awaiting a client request or a peer message.
|
/// Awaiting a client request or a peer message.
|
||||||
AwaitingRequest,
|
AwaitingRequest,
|
||||||
/// Awaiting a peer message we can interpret as a client request.
|
/// Awaiting a peer message we can interpret as a client request.
|
||||||
AwaitingResponse {
|
AwaitingResponse {
|
||||||
/// Inner state machine for handling external responses.
|
|
||||||
handler: Handler,
|
handler: Handler,
|
||||||
/// Channel used to propagate responses back to the [`Client`] in our
|
|
||||||
/// internal Response format.
|
|
||||||
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
||||||
span: tracing::Span,
|
span: tracing::Span,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
/// Execute one step of the [`Connection`] state machine event loop. This
|
|
||||||
/// function represents the core logic of [`Connection::run`] method and
|
|
||||||
/// isolates consuming the previous state and producing the next state into a
|
|
||||||
/// single function.
|
|
||||||
///
|
|
||||||
/// This function's primary purpose is to provide compile time guarantees
|
|
||||||
/// that iterations of the run loop never leave the Connection with an
|
|
||||||
/// invalid `state`, by forcing all code paths to produce state transition in
|
|
||||||
/// order to exit the function.
|
|
||||||
async fn step<Rx, S, Tx>(self, conn: &mut Connection<S, Tx>, peer_rx: &mut Rx) -> Transition
|
async fn step<Rx, S, Tx>(self, conn: &mut Connection<S, Tx>, peer_rx: &mut Rx) -> Transition
|
||||||
where
|
where
|
||||||
Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
|
Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
|
||||||
|
@ -344,29 +331,6 @@ impl State {
|
||||||
S::Error: Into<BoxError>,
|
S::Error: Into<BoxError>,
|
||||||
Tx: Sink<Message, Error = SerializationError> + Unpin,
|
Tx: Sink<Message, Error = SerializationError> + Unpin,
|
||||||
{
|
{
|
||||||
// At a high level, the event loop we want is as follows: we check for
|
|
||||||
// any incoming messages from the remote peer, check if they should be
|
|
||||||
// interpreted as a response to a pending client request
|
|
||||||
// (Handler::process_request), and if not, interpret them as a request
|
|
||||||
// from the remote peer to our node
|
|
||||||
// (Connection::handle_message_as_request/drive_peer_request).
|
|
||||||
//
|
|
||||||
// We also need to handle those client requests in the first place
|
|
||||||
// (Connection::handle_client_request). The client requests are received
|
|
||||||
// from the corresponding `peer::Client` over a bounded channel (with
|
|
||||||
// bound 1, to minimize buffering), but there is no relationship between
|
|
||||||
// the stream of client requests and the stream of peer messages, so we
|
|
||||||
// cannot ignore one kind while waiting on the other. Moreover, we
|
|
||||||
// cannot accept a second client request while the first one is still
|
|
||||||
// pending.
|
|
||||||
//
|
|
||||||
// To do this, we inspect the current request state.
|
|
||||||
//
|
|
||||||
// If there is no pending request, we wait on either an incoming peer message or
|
|
||||||
// an incoming request, whichever comes first.
|
|
||||||
//
|
|
||||||
// If there is a pending request, we wait only on an incoming peer message, and
|
|
||||||
// check whether it can be interpreted as a response to the pending request.
|
|
||||||
match self {
|
match self {
|
||||||
State::AwaitingRequest => {
|
State::AwaitingRequest => {
|
||||||
trace!("awaiting client request or peer message");
|
trace!("awaiting client request or peer message");
|
||||||
|
@ -473,9 +437,7 @@ impl State {
|
||||||
/// Enum describing the next state transition that should be taken after any
|
/// Enum describing the next state transition that should be taken after any
|
||||||
/// given `step`.
|
/// given `step`.
|
||||||
enum Transition {
|
enum Transition {
|
||||||
/// Connection should start waiting for new requests.
|
|
||||||
AwaitRequest,
|
AwaitRequest,
|
||||||
/// Connection should wait for a response to a previous request.
|
|
||||||
AwaitResponse {
|
AwaitResponse {
|
||||||
handler: Handler,
|
handler: Handler,
|
||||||
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
||||||
|
@ -484,16 +446,15 @@ enum Transition {
|
||||||
/// Closing because the client was closed or dropped, and there are
|
/// Closing because the client was closed or dropped, and there are
|
||||||
/// no more client requests.
|
/// no more client requests.
|
||||||
ClientClose,
|
ClientClose,
|
||||||
/// Closing while awaiting further client requests.
|
/// Closing while awaiting further client requests
|
||||||
Close(SharedPeerError),
|
Close(SharedPeerError),
|
||||||
/// Closing while processing a peer response to a client request.
|
/// Closing while processing a peer response to a client request
|
||||||
CloseResponse {
|
CloseResponse {
|
||||||
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
tx: MustUseOneshotSender<Result<Response, SharedPeerError>>,
|
||||||
e: SharedPeerError,
|
e: SharedPeerError,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Construct the appropriate `State` from a given `Transition` if possible.
|
|
||||||
impl TryFrom<Transition> for State {
|
impl TryFrom<Transition> for State {
|
||||||
type Error = Option<SharedPeerError>;
|
type Error = Option<SharedPeerError>;
|
||||||
|
|
||||||
|
@ -519,8 +480,6 @@ pub struct Connection<S, Tx> {
|
||||||
/// A timeout for a client request. This is stored separately from
|
/// A timeout for a client request. This is stored separately from
|
||||||
/// State so that we can move the future out of it independently of
|
/// State so that we can move the future out of it independently of
|
||||||
/// other state handling.
|
/// other state handling.
|
||||||
// I don't think this is necessary, and will try moving it into `State` in
|
|
||||||
// the next commit TODO(jane)
|
|
||||||
pub(super) request_timer: Option<Sleep>,
|
pub(super) request_timer: Option<Sleep>,
|
||||||
pub(super) svc: S,
|
pub(super) svc: S,
|
||||||
/// A `mpsc::Receiver<ClientRequest>` that converts its results to
|
/// A `mpsc::Receiver<ClientRequest>` that converts its results to
|
||||||
|
@ -541,6 +500,25 @@ where
|
||||||
where
|
where
|
||||||
Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
|
Rx: Stream<Item = Result<Message, SerializationError>> + Unpin,
|
||||||
{
|
{
|
||||||
|
// At a high level, the event loop we want is as follows: we check for any
|
||||||
|
// incoming messages from the remote peer, check if they should be interpreted
|
||||||
|
// as a response to a pending client request, and if not, interpret them as a
|
||||||
|
// request from the remote peer to our node.
|
||||||
|
//
|
||||||
|
// We also need to handle those client requests in the first place. The client
|
||||||
|
// requests are received from the corresponding `peer::Client` over a bounded
|
||||||
|
// channel (with bound 1, to minimize buffering), but there is no relationship
|
||||||
|
// between the stream of client requests and the stream of peer messages, so we
|
||||||
|
// cannot ignore one kind while waiting on the other. Moreover, we cannot accept
|
||||||
|
// a second client request while the first one is still pending.
|
||||||
|
//
|
||||||
|
// To do this, we inspect the current request state.
|
||||||
|
//
|
||||||
|
// If there is no pending request, we wait on either an incoming peer message or
|
||||||
|
// an incoming request, whichever comes first.
|
||||||
|
//
|
||||||
|
// If there is a pending request, we wait only on an incoming peer message, and
|
||||||
|
// check whether it can be interpreted as a response to the pending request.
|
||||||
loop {
|
loop {
|
||||||
let transition = self
|
let transition = self
|
||||||
.state
|
.state
|
||||||
|
|
Loading…
Reference in New Issue