Add a PeerConnector wrapper around PeerHandshake

This commit is contained in:
Henry de Valence 2019-10-22 12:44:08 -07:00
parent 9e2678d76c
commit ed2ee9d42f
7 changed files with 126 additions and 74 deletions

View File

@ -57,7 +57,7 @@ mod timestamp_collector;
pub use crate::{
address_book::AddressBook,
config::Config,
peer_set::{init, BoxedZebraService},
peer_set::init,
protocol::internal::{Request, Response},
};

View File

@ -2,6 +2,8 @@
/// Handles outbound requests from our node to the network.
mod client;
/// Wrapper around handshake logic that also opens a TCP connection.
mod connector;
/// Peer-related errors.
mod error;
/// Performs peer handshakes.
@ -10,6 +12,7 @@ mod handshake;
mod server;
pub use client::PeerClient;
pub use connector::PeerConnector;
pub use error::{HandshakeError, PeerError, SharedPeerError};
pub use handshake::PeerHandshake;
pub use server::PeerServer;

View File

@ -0,0 +1,59 @@
use std::{
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use tokio::{net::TcpStream, prelude::*};
use tower::{discover::Change, Service, ServiceExt};
use crate::{BoxedStdError, Request, Response};
use super::{HandshakeError, PeerClient, PeerHandshake};
/// A wrapper around [`PeerHandshake`] that opens a TCP connection before
/// forwarding to the inner handshake service. Writing this as its own
/// [`tower::Service`] lets us apply unified timeout policies, etc.
pub struct PeerConnector<S> {
handshaker: PeerHandshake<S>,
}
impl<S: Clone> Clone for PeerConnector<S> {
fn clone(&self) -> Self {
Self {
handshaker: self.handshaker.clone(),
}
}
}
impl<S> PeerConnector<S> {
pub fn new(handshaker: PeerHandshake<S>) -> Self {
Self { handshaker }
}
}
impl<S> Service<SocketAddr> for PeerConnector<S>
where
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
S::Future: Send,
{
type Response = Change<SocketAddr, PeerClient>;
type Error = HandshakeError;
type Future =
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, addr: SocketAddr) -> Self::Future {
let mut hs = self.handshaker.clone();
async move {
let stream = TcpStream::connect(addr).await?;
hs.ready().await?;
let client = hs.call((stream, addr)).await?;
Ok(Change::Insert(addr, client))
}
.boxed()
}
}

View File

@ -67,6 +67,9 @@ pub enum HandshakeError {
/// The remote peer closed the connection.
#[error("Peer closed connection")]
ConnectionClosed,
/// An error occurred while performing an IO operation.
#[error("Underlying IO error")]
Io(#[from] std::io::Error),
/// A serialization error occurred while reading or writing a message.
#[error("Serialization error")]
Serialization(#[from] SerializationError),

View File

@ -4,13 +4,12 @@ use std::{
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
time::Duration,
};
use chrono::Utc;
use futures::channel::mpsc;
use tokio::{codec::Framed, net::TcpStream, prelude::*, timer::Interval};
use tower::{Service, ServiceExt};
use tower::Service;
use tracing::{span, Level};
use tracing_futures::Instrument;
@ -34,6 +33,17 @@ pub struct PeerHandshake<S> {
nonces: Arc<Mutex<HashSet<Nonce>>>,
}
impl<S: Clone> Clone for PeerHandshake<S> {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
internal_service: self.internal_service.clone(),
timestamp_collector: self.timestamp_collector.clone(),
nonces: self.nonces.clone(),
}
}
}
impl<S> PeerHandshake<S>
where
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,

View File

@ -3,6 +3,8 @@
// Portions of this submodule were adapted from tower-balance,
// which is (c) 2019 Tower Contributors (MIT licensed).
// XXX these imports should go in a peer_set::initialize submodule
use std::{
net::SocketAddr,
pin::Pin,
@ -20,7 +22,7 @@ use tokio::net::{TcpListener, TcpStream};
use tower::{
buffer::Buffer,
discover::{Change, ServiceStream},
timeout::Timeout,
layer::Layer,
Service, ServiceExt,
};
use tower_load::{peak_ewma::PeakEwmaDiscover, NoInstrument};
@ -28,7 +30,7 @@ use tracing::Level;
use tracing_futures::Instrument;
use crate::{
peer::{HandshakeError, PeerClient, PeerHandshake},
peer::{HandshakeError, PeerClient, PeerConnector, PeerHandshake},
timestamp_collector::TimestampCollector,
AddressBook, BoxedStdError, Config, Request, Response,
};
@ -39,19 +41,7 @@ mod set;
mod unready_service;
use candidate_set::CandidateSet;
pub use discover::PeerDiscover;
pub use set::PeerSet;
/// A type alias for a boxed [`tower::Service`] used to process [`Request`]s into [`Response`]s.
pub type BoxedZebraService = Box<
dyn Service<
Request,
Response = Response,
Error = BoxedStdError,
Future = Pin<Box<dyn Future<Output = Result<Response, BoxedStdError>> + Send>>,
> + Send
+ 'static,
>;
use set::PeerSet;
type PeerChange = Result<Change<SocketAddr, PeerClient>, BoxedStdError>;
@ -75,13 +65,20 @@ where
S::Future: Send + 'static,
{
let (address_book, timestamp_collector) = TimestampCollector::spawn();
let handshaker = Buffer::new(
Timeout::new(
PeerHandshake::new(config.clone(), inbound_service, timestamp_collector),
config.handshake_timeout,
),
1,
);
// Construct services that handle inbound handshakes and perform outbound
// handshakes. These use the same handshake service internally to detect
// self-connection attempts. Both are decorated with a tower TimeoutLayer to
// enforce timeouts as specified in the Config.
let (listener, connector) = {
use tower::timeout::TimeoutLayer;
let hs_timeout = TimeoutLayer::new(config.handshake_timeout);
let hs = PeerHandshake::new(config.clone(), inbound_service, timestamp_collector);
(
hs_timeout.layer(hs.clone()),
hs_timeout.layer(PeerConnector::new(hs)),
)
};
// Create an mpsc channel for peer changes, with a generous buffer.
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
@ -111,13 +108,13 @@ where
// 1. Initial peers, specified in the config.
tokio::spawn(add_initial_peers(
config.initial_peers.clone(),
handshaker.clone(),
connector.clone(),
peerset_tx.clone(),
));
// 2. Incoming peer connections, via a listener.
tokio::spawn(
listen(config.listen_addr, handshaker.clone(), peerset_tx.clone()).map(|result| {
listen(config.listen_addr, listener, peerset_tx.clone()).map(|result| {
if let Err(e) = result {
error!(%e);
}
@ -140,7 +137,7 @@ where
config.new_peer_interval,
demand_rx,
candidates,
handshaker,
connector,
peerset_tx,
)
.map(|result| {
@ -155,28 +152,20 @@ where
/// Use the provided `handshaker` to connect to `initial_peers`, then send
/// the results over `tx`.
#[instrument(skip(initial_peers, tx, handshaker))]
#[instrument(skip(initial_peers, connector, tx))]
async fn add_initial_peers<S>(
initial_peers: Vec<SocketAddr>,
handshaker: S,
connector: S,
mut tx: mpsc::Sender<PeerChange>,
) where
S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone,
S: Service<SocketAddr, Response = Change<SocketAddr, PeerClient>, Error = BoxedStdError>
+ Clone,
S::Future: Send + 'static,
{
info!(?initial_peers, "Connecting to initial peer set");
let mut handshakes = initial_peers
.into_iter()
.map(|addr| {
let mut hs = handshaker.clone();
async move {
let stream = TcpStream::connect(addr).await?;
hs.ready().await?;
let client = hs.call((stream, addr)).await?;
Ok::<_, BoxedStdError>(Change::Insert(addr, client))
}
})
.collect::<FuturesUnordered<_>>();
use tower::util::CallAllUnordered;
let addr_stream = futures::stream::iter(initial_peers.into_iter());
let mut handshakes = CallAllUnordered::new(connector, addr_stream);
while let Some(handshake_result) = handshakes.next().await {
let _ = tx.send(handshake_result).await;
}
@ -215,43 +204,22 @@ where
/// Given a channel that signals a need for new peers, try to connect to a peer
/// and send the resulting `PeerClient` through a channel.
///
#[instrument(skip(new_peer_interval, demand_signal, candidates, handshaker, success_tx))]
#[instrument(skip(new_peer_interval, demand_signal, candidates, connector, success_tx))]
async fn crawl_and_dial<C, S>(
new_peer_interval: Duration,
demand_signal: mpsc::Receiver<()>,
mut candidates: CandidateSet<S>,
handshaker: C,
mut connector: C,
mut success_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxedStdError>
where
C: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone,
C: Service<SocketAddr, Response = Change<SocketAddr, PeerClient>, Error = BoxedStdError>
+ Clone,
C::Future: Send + 'static,
S: Service<Request, Response = Response, Error = BoxedStdError>,
S::Future: Send + 'static,
{
// XXX this kind of boilerplate didn't exist before we made PeerConnector
// take (TcpStream, SocketAddr), which made it so that we could share code
// between inbound and outbound handshakes. Probably the cleanest way to
// make it go away again is to rename "Connector" to "Handshake" (since it
// is really responsible just for the handshake) and to have a "Connector"
// Service wrapper around "Handshake" that opens a TCP stream.
// We could also probably make the Handshake service `Clone` directly,
// which might be more efficient than using a Buffer wrapper.
use crate::types::MetaAddr;
use futures::TryFutureExt;
let try_connect = |candidate: MetaAddr| {
let mut hs = handshaker.clone();
async move {
let stream = TcpStream::connect(candidate.addr).await?;
hs.ready().await?;
hs.call((stream, candidate.addr))
.await
.map(|client| Change::Insert(candidate.addr, client))
}
// Use map_err to tag failed connections with the MetaAddr,
// so they can be reported to the CandidateSet.
.map_err(move |_| candidate)
};
// On creation, we are likely to have very few peers, so try to get more
// connections quickly by concurrently connecting to a large number of
@ -259,7 +227,14 @@ where
let mut handshakes = FuturesUnordered::new();
for _ in 0..50usize {
if let Some(candidate) = candidates.next() {
handshakes.push(try_connect(candidate))
connector.ready().await?;
handshakes.push(
connector
.call(candidate.addr)
// Use map_err to tag failed connections with the MetaAddr,
// so they can be reported to the CandidateSet.
.map_err(move |_| candidate),
)
}
}
while let Some(handshake) = handshakes.next().await {
@ -292,7 +267,12 @@ where
}
};
match try_connect(candidate).await {
connector.ready().await?;
match connector
.call(candidate.addr)
.map_err(move |_| candidate)
.await
{
Ok(change) => {
debug!("Successfully dialed new peer, sending to peerset");
success_tx.send(Ok(change)).await?;

View File

@ -25,10 +25,7 @@ use crate::{
BoxedStdError,
};
use super::{
unready_service::{Error as UnreadyError, UnreadyService},
PeerDiscover,
};
use super::unready_service::{Error as UnreadyError, UnreadyService};
/// A [`tower::Service`] that abstractly represents "the rest of the network".
///