Connect to multiple peers concurrently.

The previous outbound peer connection logic got requests to connect to new
peers and processed them one at a time, making single connection attempts
and retrying if the connection attempt failed.  This was quite slow, because
many connections fail, and we have to wait for timeouts.  Instead, this logic
connects to new peers concurrently (up to 50 at a time).
This commit is contained in:
Henry de Valence 2020-02-09 20:34:53 -08:00 committed by Deirdre Connolly
parent 7ba007f23d
commit 8000f888fd
3 changed files with 73 additions and 65 deletions

View File

@ -43,6 +43,9 @@ pub struct Config {
/// How frequently we attempt to connect to a new peer. /// How frequently we attempt to connect to a new peer.
pub new_peer_interval: Duration, pub new_peer_interval: Duration,
/// The initial target size for the peer set.
pub peerset_initial_target_size: usize,
} }
impl Config { impl Config {
@ -92,6 +95,7 @@ impl Default for Config {
peerset_request_buffer_size: 1, peerset_request_buffer_size: 1,
handshake_timeout: Duration::from_secs(4), handshake_timeout: Duration::from_secs(4),
new_peer_interval: Duration::from_secs(120), new_peer_interval: Duration::from_secs(120),
peerset_initial_target_size: 50,
} }
} }
} }

View File

@ -105,7 +105,7 @@ where
let network = self.config.network; let network = self.config.network;
let fut = async move { let fut = async move {
info!("connecting to remote peer"); debug!("connecting to remote peer");
let mut stream = let mut stream =
Framed::new(tcp_stream, Codec::builder().for_network(network).finish()); Framed::new(tcp_stream, Codec::builder().for_network(network).finish());

View File

@ -6,7 +6,6 @@
use std::{ use std::{
net::SocketAddr, net::SocketAddr,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
time::Duration,
}; };
use futures::{ use futures::{
@ -72,7 +71,7 @@ where
// Create an mpsc channel for peer changes, with a generous buffer. // Create an mpsc channel for peer changes, with a generous buffer.
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100); let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
// Create an mpsc channel for peerset demand signaling. // Create an mpsc channel for peerset demand signaling.
let (demand_tx, demand_rx) = mpsc::channel::<()>(100); let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100);
// Connect the rx end to a PeerSet, wrapping new peers in load instruments. // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
let peer_set = Buffer::new( let peer_set = Buffer::new(
@ -87,7 +86,7 @@ where
config.ewma_decay_time, config.ewma_decay_time,
NoInstrument, NoInstrument,
), ),
demand_tx, demand_tx.clone(),
), ),
config.peerset_request_buffer_size, config.peerset_request_buffer_size,
); );
@ -121,9 +120,15 @@ where
let _ = candidates.update().await; let _ = candidates.update().await;
info!("Sending initial request for peers"); info!("Sending initial request for peers");
for _ in 0..config.peerset_initial_target_size {
let _ = demand_tx.try_send(());
}
tokio::spawn( tokio::spawn(
crawl_and_dial( crawl_and_dial(
config.new_peer_interval, config.new_peer_interval,
demand_tx,
demand_rx, demand_rx,
candidates, candidates,
connector, connector,
@ -192,11 +197,11 @@ where
/// Given a channel that signals a need for new peers, try to connect to a peer /// Given a channel that signals a need for new peers, try to connect to a peer
/// and send the resulting `peer::Client` through a channel. /// and send the resulting `peer::Client` through a channel.
/// #[instrument(skip(new_peer_interval, demand_tx, demand_rx, candidates, connector, success_tx))]
#[instrument(skip(new_peer_interval, demand_signal, candidates, connector, success_tx))]
async fn crawl_and_dial<C, S>( async fn crawl_and_dial<C, S>(
new_peer_interval: Duration, new_peer_interval: std::time::Duration,
demand_signal: mpsc::Receiver<()>, mut demand_tx: mpsc::Sender<()>,
mut demand_rx: mpsc::Receiver<()>,
mut candidates: CandidateSet<S>, mut candidates: CandidateSet<S>,
mut connector: C, mut connector: C,
mut success_tx: mpsc::Sender<PeerChange>, mut success_tx: mpsc::Sender<PeerChange>,
@ -208,69 +213,68 @@ where
S: Service<Request, Response = Response, Error = BoxedStdError>, S: Service<Request, Response = Response, Error = BoxedStdError>,
S::Future: Send + 'static, S::Future: Send + 'static,
{ {
use futures::TryFutureExt; use futures::{future::{select, Either::{Left, Right}}, TryFutureExt};
// On creation, we are likely to have very few peers, so try to get more
// connections quickly by concurrently connecting to a large number of
// candidates.
let mut handshakes = FuturesUnordered::new(); let mut handshakes = FuturesUnordered::new();
for _ in 0..50usize { // <FuturesUnordered as Stream> returns None when empty.
// Keeping an unresolved future in the pool means the stream
// never terminates.
handshakes.push(future::pending().boxed());
let mut crawl_timer = tokio::time::interval(new_peer_interval);
loop {
// This is a little awkward because there's no select3.
match select(
select(demand_rx.next(), crawl_timer.next()),
handshakes.next(),
)
.await
{
Left((Left((Some(_demand), _)), _)) => {
if handshakes.len() > 50 {
// This is set to trace level because when the peerset is
// congested it can generate a lot of demand signal very rapidly.
trace!("too many in-flight handshakes, dropping demand signal");
continue;
}
if let Some(candidate) = candidates.next() { if let Some(candidate) = candidates.next() {
debug!(?candidate.addr, "attempting outbound connection in response to demand");
connector.ready().await?; connector.ready().await?;
handshakes.push( handshakes.push(
connector connector
.call(candidate.addr)
// Use map_err to tag failed connections with the MetaAddr,
// so they can be reported to the CandidateSet.
.map_err(move |_| candidate),
)
}
}
while let Some(handshake) = handshakes.next().await {
match handshake {
Ok(change) => {
debug!("Successfully dialed new peer, sending to peerset");
success_tx.send(Ok(change)).await?;
}
Err(candidate) => {
debug!(?candidate.addr, "marking address as failed");
candidates.report_failed(candidate);
}
}
}
let mut connect_signal = futures::stream::select(
tokio::time::interval(new_peer_interval).map(|_| ()),
demand_signal,
);
while let Some(()) = connect_signal.next().await {
debug!("got demand signal from peer set, updating candidates");
candidates.update().await?;
loop {
let candidate = match candidates.next() {
Some(candidate) => candidate,
None => {
warn!("got demand for more peers but no available candidates");
break;
}
};
connector.ready().await?;
match connector
.call(candidate.addr) .call(candidate.addr)
.map_err(move |_| candidate) .map_err(move |_| candidate)
.await .boxed(),
{ );
Ok(change) => { } else {
debug!("Successfully dialed new peer, sending to peerset"); warn!("demand for peers but no available candidates");
}
}
// did a drill sergeant write this? no there's just no Either3
Left((Right((Some(_timer), _)), _)) => {
debug!("crawling for more peers");
candidates.update().await?;
// Try to connect to a new peer.
let _ = demand_tx.try_send(());
}
Right((Some(Ok(change)), _)) => {
// in fact all changes are Insert so this branch is always taken
if let Change::Insert(ref addr, _) = change {
debug!(candidate.addr = ?addr, "successfully dialed new peer");
}
success_tx.send(Ok(change)).await?; success_tx.send(Ok(change)).await?;
break;
} }
Err(candidate) => { Right((Some(Err(candidate)), _)) => {
debug!(?candidate.addr, "marking address as failed"); debug!(?candidate.addr, "failed to connect to peer");
candidates.report_failed(candidate); candidates.report_failed(candidate);
// The demand signal that was taken out of the queue
// to attempt to connect to the failed candidate never
// turned into a connection, so add it back:
let _ = demand_tx.try_send(());
} }
} // If we don't match one of these patterns, shutdown.
_ => break,
} }
} }
Ok(()) Ok(())