Rename PeerConnector to PeerHandshake.

It's only responsible for doing the handshakes, so it should be named that way,
and then we can have a Connector responsible for actually opening the TCP
connection.
This commit is contained in:
Henry de Valence 2019-10-22 11:30:24 -07:00
parent 121cea610b
commit 9e2678d76c
4 changed files with 32 additions and 42 deletions

View File

@ -68,5 +68,5 @@ pub mod types {
/// This will be removed when we finish encapsulation /// This will be removed when we finish encapsulation
pub mod should_be_private { pub mod should_be_private {
pub use crate::{peer::PeerConnector, timestamp_collector::TimestampCollector}; pub use crate::{peer::PeerHandshake, timestamp_collector::TimestampCollector};
} }

View File

@ -2,14 +2,14 @@
/// Handles outbound requests from our node to the network. /// Handles outbound requests from our node to the network.
mod client; mod client;
/// Asynchronously connects to peers.
mod connector;
/// Peer-related errors. /// Peer-related errors.
mod error; mod error;
/// Performs peer handshakes.
mod handshake;
/// Handles inbound requests from the network to our node. /// Handles inbound requests from the network to our node.
mod server; mod server;
pub use client::PeerClient; pub use client::PeerClient;
pub use connector::PeerConnector;
pub use error::{HandshakeError, PeerError, SharedPeerError}; pub use error::{HandshakeError, PeerError, SharedPeerError};
pub use handshake::PeerHandshake;
pub use server::PeerServer; pub use server::PeerServer;

View File

@ -25,15 +25,16 @@ use crate::{
use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, PeerServer}; use super::{error::ErrorSlot, server::ServerState, HandshakeError, PeerClient, PeerServer};
/// A [`Service`] that connects to a remote peer and constructs a client/server pair. /// A [`Service`] that handshakes with a remote peer and constructs a
pub struct PeerConnector<S> { /// client/server pair.
pub struct PeerHandshake<S> {
config: Config, config: Config,
internal_service: S, internal_service: S,
timestamp_collector: mpsc::Sender<MetaAddr>, timestamp_collector: mpsc::Sender<MetaAddr>,
nonces: Arc<Mutex<HashSet<Nonce>>>, nonces: Arc<Mutex<HashSet<Nonce>>>,
} }
impl<S> PeerConnector<S> impl<S> PeerHandshake<S>
where where
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static, S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
S::Future: Send, S::Future: Send,
@ -49,7 +50,7 @@ where
// Builder2, ..., with Builder1::with_config() -> Builder2; // Builder2, ..., with Builder1::with_config() -> Builder2;
// Builder2::with_internal_service() -> ... or use Options in a single // Builder2::with_internal_service() -> ... or use Options in a single
// Builder type or use the derive_builder crate. // Builder type or use the derive_builder crate.
PeerConnector { PeerHandshake {
config, config,
internal_service, internal_service,
timestamp_collector, timestamp_collector,
@ -58,7 +59,7 @@ where
} }
} }
impl<S> Service<(TcpStream, SocketAddr)> for PeerConnector<S> impl<S> Service<(TcpStream, SocketAddr)> for PeerHandshake<S>
where where
S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static, S: Service<Request, Response = Response, Error = BoxedStdError> + Clone + Send + 'static,
S::Future: Send, S::Future: Send,

View File

@ -28,7 +28,7 @@ use tracing::Level;
use tracing_futures::Instrument; use tracing_futures::Instrument;
use crate::{ use crate::{
peer::{HandshakeError, PeerClient, PeerConnector}, peer::{HandshakeError, PeerClient, PeerHandshake},
timestamp_collector::TimestampCollector, timestamp_collector::TimestampCollector,
AddressBook, BoxedStdError, Config, Request, Response, AddressBook, BoxedStdError, Config, Request, Response,
}; };
@ -75,9 +75,9 @@ where
S::Future: Send + 'static, S::Future: Send + 'static,
{ {
let (address_book, timestamp_collector) = TimestampCollector::spawn(); let (address_book, timestamp_collector) = TimestampCollector::spawn();
let peer_connector = Buffer::new( let handshaker = Buffer::new(
Timeout::new( Timeout::new(
PeerConnector::new(config.clone(), inbound_service, timestamp_collector), PeerHandshake::new(config.clone(), inbound_service, timestamp_collector),
config.handshake_timeout, config.handshake_timeout,
), ),
1, 1,
@ -111,18 +111,13 @@ where
// 1. Initial peers, specified in the config. // 1. Initial peers, specified in the config.
tokio::spawn(add_initial_peers( tokio::spawn(add_initial_peers(
config.initial_peers.clone(), config.initial_peers.clone(),
peer_connector.clone(), handshaker.clone(),
peerset_tx.clone(), peerset_tx.clone(),
)); ));
// 2. Incoming peer connections, via a listener. // 2. Incoming peer connections, via a listener.
tokio::spawn( tokio::spawn(
listen( listen(config.listen_addr, handshaker.clone(), peerset_tx.clone()).map(|result| {
config.listen_addr,
peer_connector.clone(),
peerset_tx.clone(),
)
.map(|result| {
if let Err(e) = result { if let Err(e) = result {
error!(%e); error!(%e);
} }
@ -145,7 +140,7 @@ where
config.new_peer_interval, config.new_peer_interval,
demand_rx, demand_rx,
candidates, candidates,
peer_connector, handshaker,
peerset_tx, peerset_tx,
) )
.map(|result| { .map(|result| {
@ -158,12 +153,12 @@ where
(peer_set, address_book) (peer_set, address_book)
} }
/// Use the provided `peer_connector` to connect to `initial_peers`, then send /// Use the provided `handshaker` to connect to `initial_peers`, then send
/// the results over `tx`. /// the results over `tx`.
#[instrument(skip(initial_peers, tx, peer_connector))] #[instrument(skip(initial_peers, tx, handshaker))]
async fn add_initial_peers<S>( async fn add_initial_peers<S>(
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
peer_connector: S, handshaker: S,
mut tx: mpsc::Sender<PeerChange>, mut tx: mpsc::Sender<PeerChange>,
) where ) where
S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone, S: Service<(TcpStream, SocketAddr), Response = PeerClient, Error = BoxedStdError> + Clone,
@ -173,11 +168,11 @@ async fn add_initial_peers<S>(
let mut handshakes = initial_peers let mut handshakes = initial_peers
.into_iter() .into_iter()
.map(|addr| { .map(|addr| {
let mut pc = peer_connector.clone(); let mut hs = handshaker.clone();
async move { async move {
let stream = TcpStream::connect(addr).await?; let stream = TcpStream::connect(addr).await?;
pc.ready().await?; hs.ready().await?;
let client = pc.call((stream, addr)).await?; let client = hs.call((stream, addr)).await?;
Ok::<_, BoxedStdError>(Change::Insert(addr, client)) Ok::<_, BoxedStdError>(Change::Insert(addr, client))
} }
}) })
@ -187,12 +182,12 @@ async fn add_initial_peers<S>(
} }
} }
/// Bind to `addr`, listen for peers using `peer_connector`, then send the /// Bind to `addr`, listen for peers using `handshaker`, then send the
/// results over `tx`. /// results over `tx`.
#[instrument(skip(tx, peer_connector))] #[instrument(skip(tx, handshaker))]
async fn listen<S>( async fn listen<S>(
addr: SocketAddr, addr: SocketAddr,
mut peer_connector: S, mut handshaker: S,
tx: mpsc::Sender<PeerChange>, tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxedStdError> ) -> Result<(), BoxedStdError>
where where
@ -203,9 +198,9 @@ where
loop { loop {
if let Ok((tcp_stream, addr)) = listener.accept().await { if let Ok((tcp_stream, addr)) = listener.accept().await {
debug!(?addr, "got incoming connection"); debug!(?addr, "got incoming connection");
peer_connector.ready().await?; handshaker.ready().await?;
// Construct a handshake future but do not drive it yet.... // Construct a handshake future but do not drive it yet....
let handshake = peer_connector.call((tcp_stream, addr)); let handshake = handshaker.call((tcp_stream, addr));
// ... instead, spawn a new task to handle this connection // ... instead, spawn a new task to handle this connection
let mut tx2 = tx.clone(); let mut tx2 = tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -220,18 +215,12 @@ where
/// Given a channel that signals a need for new peers, try to connect to a peer /// Given a channel that signals a need for new peers, try to connect to a peer
/// and send the resulting `PeerClient` through a channel. /// and send the resulting `PeerClient` through a channel.
/// ///
#[instrument(skip( #[instrument(skip(new_peer_interval, demand_signal, candidates, handshaker, success_tx))]
new_peer_interval,
demand_signal,
candidates,
peer_connector,
success_tx
))]
async fn crawl_and_dial<C, S>( async fn crawl_and_dial<C, S>(
new_peer_interval: Duration, new_peer_interval: Duration,
demand_signal: mpsc::Receiver<()>, demand_signal: mpsc::Receiver<()>,
mut candidates: CandidateSet<S>, mut candidates: CandidateSet<S>,
peer_connector: C, handshaker: C,
mut success_tx: mpsc::Sender<PeerChange>, mut success_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxedStdError> ) -> Result<(), BoxedStdError>
where where
@ -251,11 +240,11 @@ where
use crate::types::MetaAddr; use crate::types::MetaAddr;
use futures::TryFutureExt; use futures::TryFutureExt;
let try_connect = |candidate: MetaAddr| { let try_connect = |candidate: MetaAddr| {
let mut pc = peer_connector.clone(); let mut hs = handshaker.clone();
async move { async move {
let stream = TcpStream::connect(candidate.addr).await?; let stream = TcpStream::connect(candidate.addr).await?;
pc.ready().await?; hs.ready().await?;
pc.call((stream, candidate.addr)) hs.call((stream, candidate.addr))
.await .await
.map(|client| Change::Insert(candidate.addr, client)) .map(|client| Change::Insert(candidate.addr, client))
} }