fix(net): Avoid potential concurrency bugs in outbound handshakes (#6869)
* Stop sending peer errors on the PeerSet channel, to respect send limits * Move locking out of the crawler select!, potential deadlock or hang risk * Move report_failed() out of the CandidateSet, reducing concurrency risks * Make CandidateSet Send * Make all CandidateSet operations concurrent, previous hang/deadlock bug * Reduce the gap between handshakes and peer set updates, and exit the task on shutdown
This commit is contained in:
parent
4fbc89fc93
commit
92077f4db5
|
@ -125,7 +125,11 @@ mod tests;
|
|||
// When we add the Seed state:
|
||||
// * show that seed peers that transition to other never attempted
|
||||
// states are already in the address book
|
||||
pub(crate) struct CandidateSet<S> {
|
||||
pub(crate) struct CandidateSet<S>
|
||||
where
|
||||
S: Service<Request, Response = Response, Error = BoxError> + Send,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
// Correctness: the address book must be private,
|
||||
// so all operations are performed on a blocking thread (see #1976).
|
||||
address_book: Arc<std::sync::Mutex<AddressBook>>,
|
||||
|
@ -136,7 +140,7 @@ pub(crate) struct CandidateSet<S> {
|
|||
|
||||
impl<S> CandidateSet<S>
|
||||
where
|
||||
S: Service<Request, Response = Response, Error = BoxError>,
|
||||
S: Service<Request, Response = Response, Error = BoxError> + Send,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
/// Uses `address_book` and `peer_service` to manage a [`CandidateSet`] of peers.
|
||||
|
@ -180,8 +184,6 @@ where
|
|||
/// The handshaker sets up the peer message receiver so it also sends a
|
||||
/// [`Responded`] peer address update.
|
||||
///
|
||||
/// [`report_failed`][Self::report_failed] puts peers into the [`Failed`] state.
|
||||
///
|
||||
/// [`next`][Self::next] puts peers into the [`AttemptPending`] state.
|
||||
///
|
||||
/// ## Security
|
||||
|
@ -411,21 +413,9 @@ where
|
|||
Some(next_peer)
|
||||
}
|
||||
|
||||
/// Mark `addr` as a failed peer.
|
||||
pub async fn report_failed(&mut self, addr: &MetaAddr) {
|
||||
let addr = MetaAddr::new_errored(addr.addr, addr.services);
|
||||
|
||||
// # Correctness
|
||||
//
|
||||
// Spawn address book accesses on a blocking thread,
|
||||
// to avoid deadlocks (see #1976).
|
||||
let address_book = self.address_book.clone();
|
||||
let span = Span::current();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
span.in_scope(|| address_book.lock().unwrap().update(addr))
|
||||
})
|
||||
.await
|
||||
.expect("panic in peer failure address book update task");
|
||||
/// Returns the address book for this `CandidateSet`.
|
||||
pub async fn address_book(&self) -> Arc<std::sync::Mutex<AddressBook>> {
|
||||
self.address_book.clone()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -139,7 +139,7 @@ proptest! {
|
|||
/// - if no reconnection peer is returned at all.
|
||||
async fn check_candidates_rate_limiting<S>(candidate_set: &mut CandidateSet<S>, candidates: u32)
|
||||
where
|
||||
S: tower::Service<Request, Response = Response, Error = BoxError>,
|
||||
S: tower::Service<Request, Response = Response, Error = BoxError> + Send,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
let mut now = Instant::now();
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
use std::{
|
||||
collections::{BTreeMap, HashSet},
|
||||
convert::Infallible,
|
||||
net::SocketAddr,
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
|
@ -13,7 +14,7 @@ use std::{
|
|||
use futures::{
|
||||
future::{self, FutureExt},
|
||||
sink::SinkExt,
|
||||
stream::{FuturesUnordered, StreamExt, TryStreamExt},
|
||||
stream::{FuturesUnordered, StreamExt},
|
||||
TryFutureExt,
|
||||
};
|
||||
use rand::seq::SliceRandom;
|
||||
|
@ -26,6 +27,7 @@ use tokio_stream::wrappers::IntervalStream;
|
|||
use tower::{
|
||||
buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt,
|
||||
};
|
||||
use tracing::Span;
|
||||
use tracing_futures::Instrument;
|
||||
|
||||
use zebra_chain::chain_tip::ChainTip;
|
||||
|
@ -46,11 +48,15 @@ use crate::{
|
|||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
/// The result of an outbound peer connection attempt or inbound connection
|
||||
/// handshake.
|
||||
/// A successful outbound peer connection attempt or inbound connection handshake.
|
||||
///
|
||||
/// This result comes from the `Handshaker`.
|
||||
type DiscoveredPeer = Result<(PeerSocketAddr, peer::Client), BoxError>;
|
||||
/// The [`Handshake`](peer::Handshake) service returns a [`Result`]. Only successful connections
|
||||
/// should be sent on the channel. Errors should be logged or ignored.
|
||||
///
|
||||
/// We don't allow any errors in this type, because:
|
||||
/// - The connection limits don't include failed connections
|
||||
/// - tower::Discover interprets an error as stream termination
|
||||
type DiscoveredPeer = (PeerSocketAddr, peer::Client);
|
||||
|
||||
/// Initialize a peer set, using a network `config`, `inbound_service`,
|
||||
/// and `latest_chain_tip`.
|
||||
|
@ -146,14 +152,15 @@ where
|
|||
|
||||
// Create an mpsc channel for peer changes,
|
||||
// based on the maximum number of inbound and outbound peers.
|
||||
//
|
||||
// The connection limit does not apply to errors,
|
||||
// so they need to be handled before sending to this channel.
|
||||
let (peerset_tx, peerset_rx) =
|
||||
futures::channel::mpsc::channel::<DiscoveredPeer>(config.peerset_total_connection_limit());
|
||||
|
||||
let discovered_peers = peerset_rx
|
||||
// Discover interprets an error as stream termination,
|
||||
// so discard any errored connections...
|
||||
.filter(|result| future::ready(result.is_ok()))
|
||||
.map_ok(|(address, client)| Change::Insert(address, client.into()));
|
||||
let discovered_peers = peerset_rx.map(|(address, client)| {
|
||||
Result::<_, Infallible>::Ok(Change::Insert(address, client.into()))
|
||||
});
|
||||
|
||||
// Create an mpsc channel for peerset demand signaling,
|
||||
// based on the maximum number of outbound peers.
|
||||
|
@ -210,6 +217,9 @@ where
|
|||
// because zcashd rate-limits `addr`/`addrv2` messages per connection,
|
||||
// and if we only have one initial peer,
|
||||
// we need to ensure that its `Response::Addr` is used by the crawler.
|
||||
//
|
||||
// TODO: this might not be needed after we added the Connection peer address cache,
|
||||
// try removing it in a future release?
|
||||
info!(
|
||||
?active_initial_peer_count,
|
||||
"sending initial request for peers"
|
||||
|
@ -342,7 +352,7 @@ where
|
|||
let handshake_result =
|
||||
handshake_result.expect("unexpected panic in initial peer handshake");
|
||||
match handshake_result {
|
||||
Ok(ref change) => {
|
||||
Ok(change) => {
|
||||
handshake_success_total += 1;
|
||||
debug!(
|
||||
?handshake_success_total,
|
||||
|
@ -350,6 +360,9 @@ where
|
|||
?change,
|
||||
"an initial peer handshake succeeded"
|
||||
);
|
||||
|
||||
// The connection limit makes sure this send doesn't block
|
||||
peerset_tx.send(change).await?;
|
||||
}
|
||||
Err((addr, ref e)) => {
|
||||
handshake_error_total += 1;
|
||||
|
@ -384,10 +397,6 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
peerset_tx
|
||||
.send(handshake_result.map_err(|(_addr, e)| e))
|
||||
.await?;
|
||||
|
||||
// Security: Let other tasks run after each connection is processed.
|
||||
//
|
||||
// Avoids remote peers starving other Zebra tasks using initial connection successes or errors.
|
||||
|
@ -617,7 +626,8 @@ where
|
|||
let handshake_result = handshake.await;
|
||||
|
||||
if let Ok(client) = handshake_result {
|
||||
let _ = peerset_tx.send(Ok((addr, client))).await;
|
||||
// The connection limit makes sure this send doesn't block
|
||||
let _ = peerset_tx.send((addr, client)).await;
|
||||
} else {
|
||||
debug!(?handshake_result, "error handshaking with inbound peer");
|
||||
}
|
||||
|
@ -660,20 +670,18 @@ where
|
|||
enum CrawlerAction {
|
||||
/// Drop the demand signal because there are too many pending handshakes.
|
||||
DemandDrop,
|
||||
/// Initiate a handshake to `candidate` in response to demand.
|
||||
DemandHandshake { candidate: MetaAddr },
|
||||
/// Crawl existing peers for more peers in response to demand, because there
|
||||
/// are no available candidates.
|
||||
DemandCrawl,
|
||||
/// Initiate a handshake to the next candidate peer in response to demand.
|
||||
///
|
||||
/// If there are no available candidates, crawl existing peers.
|
||||
DemandHandshakeOrCrawl,
|
||||
/// Crawl existing peers for more peers in response to a timer `tick`.
|
||||
TimerCrawl { tick: Instant },
|
||||
/// Handle a successfully connected handshake `peer_set_change`.
|
||||
HandshakeConnected {
|
||||
address: PeerSocketAddr,
|
||||
client: peer::Client,
|
||||
},
|
||||
/// Handle a handshake failure to `failed_addr`.
|
||||
HandshakeFailed { failed_addr: MetaAddr },
|
||||
/// Clear a finished handshake.
|
||||
HandshakeFinished,
|
||||
/// Clear a finished demand crawl (DemandHandshakeOrCrawl with no peers).
|
||||
DemandCrawlFinished,
|
||||
/// Clear a finished TimerCrawl.
|
||||
TimerCrawlFinished,
|
||||
}
|
||||
|
||||
/// Given a channel `demand_rx` that signals a need for new peers, try to find
|
||||
|
@ -709,11 +717,11 @@ enum CrawlerAction {
|
|||
)]
|
||||
async fn crawl_and_dial<C, S>(
|
||||
config: Config,
|
||||
mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
|
||||
demand_tx: futures::channel::mpsc::Sender<MorePeers>,
|
||||
mut demand_rx: futures::channel::mpsc::Receiver<MorePeers>,
|
||||
mut candidates: CandidateSet<S>,
|
||||
candidates: CandidateSet<S>,
|
||||
outbound_connector: C,
|
||||
mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
|
||||
peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
|
||||
mut active_outbound_connections: ActiveConnectionCounter,
|
||||
) -> Result<(), BoxError>
|
||||
where
|
||||
|
@ -725,31 +733,30 @@ where
|
|||
+ Send
|
||||
+ 'static,
|
||||
C::Future: Send + 'static,
|
||||
S: Service<Request, Response = Response, Error = BoxError>,
|
||||
S: Service<Request, Response = Response, Error = BoxError> + Send + Sync + 'static,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
use CrawlerAction::*;
|
||||
|
||||
// CORRECTNESS
|
||||
//
|
||||
// To avoid hangs and starvation, the crawler must:
|
||||
// - spawn a separate task for each crawl and handshake, so they can make
|
||||
// progress independently (and avoid deadlocking each other)
|
||||
// - use the `select!` macro for all actions, because the `select` function
|
||||
// is biased towards the first ready future
|
||||
|
||||
info!(
|
||||
crawl_new_peer_interval = ?config.crawl_new_peer_interval,
|
||||
outbound_connections = ?active_outbound_connections.update_count(),
|
||||
"starting the peer address crawler",
|
||||
);
|
||||
|
||||
let address_book = candidates.address_book().await;
|
||||
|
||||
// # Concurrency
|
||||
//
|
||||
// Allow tasks using the candidate set to be spawned, so they can run concurrently.
|
||||
// Previously, Zebra has had deadlocks and long hangs caused by running dependent
|
||||
// candidate set futures in the same async task.
|
||||
let candidates = Arc::new(futures::lock::Mutex::new(candidates));
|
||||
|
||||
// This contains both crawl and handshake tasks.
|
||||
let mut handshakes = FuturesUnordered::new();
|
||||
// <FuturesUnordered as Stream> returns None when empty.
|
||||
// Keeping an unresolved future in the pool means the stream
|
||||
// never terminates.
|
||||
// We could use StreamExt::select_next_some and StreamExt::fuse, but `fuse`
|
||||
// prevents us from adding items to the stream and checking its length.
|
||||
// Keeping an unresolved future in the pool means the stream never terminates.
|
||||
handshakes.push(future::pending().boxed());
|
||||
|
||||
let mut crawl_timer = tokio::time::interval(config.crawl_new_peer_interval);
|
||||
|
@ -759,6 +766,10 @@ where
|
|||
|
||||
let mut crawl_timer = IntervalStream::new(crawl_timer).map(|tick| TimerCrawl { tick });
|
||||
|
||||
// # Concurrency
|
||||
//
|
||||
// To avoid hangs and starvation, the crawler must spawn a separate task for each crawl
|
||||
// and handshake, so they can make progress independently (and avoid deadlocking each other).
|
||||
loop {
|
||||
metrics::gauge!(
|
||||
"crawler.in_flight_handshakes",
|
||||
|
@ -769,33 +780,45 @@ where
|
|||
);
|
||||
|
||||
let crawler_action = tokio::select! {
|
||||
biased;
|
||||
// Check for completed handshakes first, because the rest of the app needs them.
|
||||
// Pending handshakes are limited by the connection limit.
|
||||
next_handshake_res = handshakes.next() => next_handshake_res.expect(
|
||||
"handshakes never terminates, because it contains a future that never resolves"
|
||||
),
|
||||
next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"),
|
||||
// turn the demand into an action, based on the crawler's current state
|
||||
_ = demand_rx.next() => {
|
||||
// The timer is rate-limited
|
||||
next_timer = crawl_timer.next() => Ok(next_timer.expect("timers never terminate")),
|
||||
// Turn any new demand into an action, based on the crawler's current state.
|
||||
//
|
||||
// # Concurrency
|
||||
//
|
||||
// Demand is potentially unlimited, so it must go last in a biased select!.
|
||||
next_demand = demand_rx.next() => next_demand.ok_or("demand stream closed, is Zebra shutting down?".into()).map(|MorePeers|{
|
||||
if active_outbound_connections.update_count() >= config.peerset_outbound_connection_limit() {
|
||||
// Too many open outbound connections or pending handshakes already
|
||||
DemandDrop
|
||||
} else if let Some(candidate) = candidates.next().await {
|
||||
// candidates.next has a short delay, and briefly holds the address
|
||||
// book lock, so it shouldn't hang
|
||||
DemandHandshake { candidate }
|
||||
} else {
|
||||
DemandCrawl
|
||||
DemandHandshakeOrCrawl
|
||||
}
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
match crawler_action {
|
||||
DemandDrop => {
|
||||
// Dummy actions
|
||||
Ok(DemandDrop) => {
|
||||
// This is set to trace level because when the peerset is
|
||||
// congested it can generate a lot of demand signal very
|
||||
// rapidly.
|
||||
// congested it can generate a lot of demand signal very rapidly.
|
||||
trace!("too many open connections or in-flight handshakes, dropping demand signal");
|
||||
}
|
||||
DemandHandshake { candidate } => {
|
||||
|
||||
// Spawned tasks
|
||||
Ok(DemandHandshakeOrCrawl) => {
|
||||
let candidates = candidates.clone();
|
||||
let outbound_connector = outbound_connector.clone();
|
||||
let peerset_tx = peerset_tx.clone();
|
||||
let address_book = address_book.clone();
|
||||
let demand_tx = demand_tx.clone();
|
||||
|
||||
// Increment the connection count before we spawn the connection.
|
||||
let outbound_connection_tracker = active_outbound_connections.track_connection();
|
||||
debug!(
|
||||
|
@ -803,74 +826,91 @@ where
|
|||
"opening an outbound peer connection"
|
||||
);
|
||||
|
||||
// Spawn each handshake into an independent task, so it can make
|
||||
// progress independently of the crawls.
|
||||
let hs_join = tokio::spawn(dial(
|
||||
candidate,
|
||||
outbound_connector.clone(),
|
||||
outbound_connection_tracker,
|
||||
))
|
||||
// Spawn each handshake or crawl into an independent task, so handshakes can make
|
||||
// progress while crawls are running.
|
||||
let handshake_or_crawl_handle = tokio::spawn(async move {
|
||||
// Try to get the next available peer for a handshake.
|
||||
//
|
||||
// candidates.next() has a short timeout, and briefly holds the address
|
||||
// book lock, so it shouldn't hang.
|
||||
//
|
||||
// Hold the lock for as short a time as possible.
|
||||
let candidate = { candidates.lock().await.next().await };
|
||||
|
||||
if let Some(candidate) = candidate {
|
||||
// we don't need to spawn here, because there's nothing running concurrently
|
||||
dial(
|
||||
candidate,
|
||||
outbound_connector,
|
||||
outbound_connection_tracker,
|
||||
peerset_tx,
|
||||
address_book,
|
||||
demand_tx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(HandshakeFinished)
|
||||
} else {
|
||||
// There weren't any peers, so try to get more peers.
|
||||
debug!("demand for peers but no available candidates");
|
||||
|
||||
crawl(candidates, demand_tx).await?;
|
||||
|
||||
Ok(DemandCrawlFinished)
|
||||
}
|
||||
})
|
||||
.map(move |res| match res {
|
||||
Ok(crawler_action) => crawler_action,
|
||||
Err(e) => {
|
||||
panic!("panic during handshaking with {candidate:?}: {e:?} ");
|
||||
panic!("panic during handshaking: {e:?}");
|
||||
}
|
||||
})
|
||||
.in_current_span();
|
||||
|
||||
handshakes.push(Box::pin(hs_join));
|
||||
handshakes.push(Box::pin(handshake_or_crawl_handle));
|
||||
}
|
||||
DemandCrawl => {
|
||||
debug!("demand for peers but no available candidates");
|
||||
// update has timeouts, and briefly holds the address book
|
||||
// lock, so it shouldn't hang
|
||||
//
|
||||
// TODO: refactor candidates into a buffered service, so we can
|
||||
// spawn independent tasks to avoid deadlocks
|
||||
let more_peers = candidates.update().await?;
|
||||
Ok(TimerCrawl { tick }) => {
|
||||
let candidates = candidates.clone();
|
||||
let demand_tx = demand_tx.clone();
|
||||
|
||||
// If we got more peers, try to connect to a new peer.
|
||||
//
|
||||
// # Security
|
||||
//
|
||||
// Update attempts are rate-limited by the candidate set.
|
||||
//
|
||||
// We only try peers if there was actually an update.
|
||||
// So if all peers have had a recent attempt,
|
||||
// and there was a recent update with no peers,
|
||||
// the channel will drain.
|
||||
// This prevents useless update attempt loops.
|
||||
if let Some(more_peers) = more_peers {
|
||||
let _ = demand_tx.try_send(more_peers);
|
||||
}
|
||||
}
|
||||
TimerCrawl { tick } => {
|
||||
debug!(
|
||||
?tick,
|
||||
"crawling for more peers in response to the crawl timer"
|
||||
);
|
||||
// TODO: spawn independent tasks to avoid deadlocks
|
||||
candidates.update().await?;
|
||||
// Try to connect to a new peer.
|
||||
let _ = demand_tx.try_send(MorePeers);
|
||||
}
|
||||
HandshakeConnected { address, client } => {
|
||||
debug!(candidate.addr = ?address, "successfully dialed new peer");
|
||||
// successes are handled by an independent task, except for `candidates.update` in
|
||||
// this task, which has a timeout, so they shouldn't hang
|
||||
peerset_tx.send(Ok((address, client))).await?;
|
||||
}
|
||||
HandshakeFailed { failed_addr } => {
|
||||
// The connection was never opened, or it failed the handshake and was dropped.
|
||||
let crawl_handle = tokio::spawn(async move {
|
||||
debug!(
|
||||
?tick,
|
||||
"crawling for more peers in response to the crawl timer"
|
||||
);
|
||||
|
||||
debug!(?failed_addr.addr, "marking candidate as failed");
|
||||
candidates.report_failed(&failed_addr).await;
|
||||
// The demand signal that was taken out of the queue
|
||||
// to attempt to connect to the failed candidate never
|
||||
// turned into a connection, so add it back:
|
||||
//
|
||||
// Security: handshake failures are rate-limited by peer attempt timeouts.
|
||||
let _ = demand_tx.try_send(MorePeers);
|
||||
crawl(candidates, demand_tx).await?;
|
||||
|
||||
Ok(TimerCrawlFinished)
|
||||
})
|
||||
.map(move |res| match res {
|
||||
Ok(crawler_action) => crawler_action,
|
||||
Err(e) => {
|
||||
panic!("panic during TimerCrawl: {tick:?} {e:?}");
|
||||
}
|
||||
})
|
||||
.in_current_span();
|
||||
|
||||
handshakes.push(Box::pin(crawl_handle));
|
||||
}
|
||||
|
||||
// Completed spawned tasks
|
||||
Ok(HandshakeFinished) => {
|
||||
// Already logged in dial()
|
||||
}
|
||||
Ok(DemandCrawlFinished) => {
|
||||
// This is set to trace level because when the peerset is
|
||||
// congested it can generate a lot of demand signal very rapidly.
|
||||
trace!("demand-based crawl finished");
|
||||
}
|
||||
Ok(TimerCrawlFinished) => {
|
||||
debug!("timer-based crawl finished");
|
||||
}
|
||||
|
||||
// Fatal errors and shutdowns
|
||||
Err(error) => {
|
||||
info!(?error, "crawler task exiting due to an error");
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -881,17 +921,79 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Try to get more peers using `candidates`, then queue a connection attempt using `demand_tx`.
|
||||
/// If there were no new peers, the connection attempt is skipped.
|
||||
#[instrument(skip(candidates, demand_tx))]
|
||||
async fn crawl<S>(
|
||||
candidates: Arc<futures::lock::Mutex<CandidateSet<S>>>,
|
||||
mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
|
||||
) -> Result<(), BoxError>
|
||||
where
|
||||
S: Service<Request, Response = Response, Error = BoxError> + Send + Sync + 'static,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
// update() has timeouts, and briefly holds the address book
|
||||
// lock, so it shouldn't hang.
|
||||
// Try to get new peers, holding the lock for as short a time as possible.
|
||||
let result = {
|
||||
let result = candidates.lock().await.update().await;
|
||||
std::mem::drop(candidates);
|
||||
result
|
||||
};
|
||||
let more_peers = match result {
|
||||
Ok(more_peers) => more_peers,
|
||||
Err(e) => {
|
||||
info!(
|
||||
?e,
|
||||
"candidate set returned an error, is Zebra shutting down?"
|
||||
);
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
// If we got more peers, try to connect to a new peer on our next loop.
|
||||
//
|
||||
// # Security
|
||||
//
|
||||
// Update attempts are rate-limited by the candidate set,
|
||||
// and we only try peers if there was actually an update.
|
||||
//
|
||||
// So if all peers have had a recent attempt, and there was a recent update
|
||||
// with no peers, the channel will drain. This prevents useless update attempt
|
||||
// loops.
|
||||
if let Some(more_peers) = more_peers {
|
||||
if let Err(send_error) = demand_tx.try_send(more_peers) {
|
||||
if send_error.is_disconnected() {
|
||||
// Zebra is shutting down
|
||||
return Err(send_error.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Try to connect to `candidate` using `outbound_connector`.
|
||||
/// Uses `outbound_connection_tracker` to track the active connection count.
|
||||
///
|
||||
/// Returns a `HandshakeConnected` action on success, and a
|
||||
/// `HandshakeFailed` action on error.
|
||||
#[instrument(skip(outbound_connector, outbound_connection_tracker))]
|
||||
/// On success, sends peers to `peerset_tx`.
|
||||
/// On failure, marks the peer as failed in the address book,
|
||||
/// then re-adds demand to `demand_tx`.
|
||||
#[instrument(skip(
|
||||
outbound_connector,
|
||||
outbound_connection_tracker,
|
||||
peerset_tx,
|
||||
address_book,
|
||||
demand_tx
|
||||
))]
|
||||
async fn dial<C>(
|
||||
candidate: MetaAddr,
|
||||
mut outbound_connector: C,
|
||||
outbound_connection_tracker: ConnectionTracker,
|
||||
) -> CrawlerAction
|
||||
mut peerset_tx: futures::channel::mpsc::Sender<DiscoveredPeer>,
|
||||
address_book: Arc<std::sync::Mutex<AddressBook>>,
|
||||
mut demand_tx: futures::channel::mpsc::Sender<MorePeers>,
|
||||
) -> Result<(), BoxError>
|
||||
where
|
||||
C: Service<
|
||||
OutboundConnectorRequest,
|
||||
|
@ -902,7 +1004,7 @@ where
|
|||
+ 'static,
|
||||
C::Future: Send + 'static,
|
||||
{
|
||||
// CORRECTNESS
|
||||
// # Correctness
|
||||
//
|
||||
// To avoid hangs, the dialer must only await:
|
||||
// - functions that return immediately, or
|
||||
|
@ -911,10 +1013,7 @@ where
|
|||
debug!(?candidate.addr, "attempting outbound connection in response to demand");
|
||||
|
||||
// the connector is always ready, so this can't hang
|
||||
let outbound_connector = outbound_connector
|
||||
.ready()
|
||||
.await
|
||||
.expect("outbound connector never errors");
|
||||
let outbound_connector = outbound_connector.ready().await?;
|
||||
|
||||
let req = OutboundConnectorRequest {
|
||||
addr: candidate.addr,
|
||||
|
@ -922,24 +1021,51 @@ where
|
|||
};
|
||||
|
||||
// the handshake has timeouts, so it shouldn't hang
|
||||
outbound_connector
|
||||
.call(req)
|
||||
.map_err(|e| (candidate, e))
|
||||
.map(Into::into)
|
||||
.await
|
||||
}
|
||||
let handshake_result = outbound_connector.call(req).map(Into::into).await;
|
||||
|
||||
impl From<Result<(PeerSocketAddr, peer::Client), (MetaAddr, BoxError)>> for CrawlerAction {
|
||||
fn from(dial_result: Result<(PeerSocketAddr, peer::Client), (MetaAddr, BoxError)>) -> Self {
|
||||
use CrawlerAction::*;
|
||||
match dial_result {
|
||||
Ok((address, client)) => HandshakeConnected { address, client },
|
||||
Err((candidate, e)) => {
|
||||
debug!(?candidate.addr, ?e, "failed to connect to candidate");
|
||||
HandshakeFailed {
|
||||
failed_addr: candidate,
|
||||
match handshake_result {
|
||||
Ok((address, client)) => {
|
||||
debug!(?candidate.addr, "successfully dialed new peer");
|
||||
|
||||
// The connection limit makes sure this send doesn't block.
|
||||
peerset_tx.send((address, client)).await?;
|
||||
}
|
||||
// The connection was never opened, or it failed the handshake and was dropped.
|
||||
Err(error) => {
|
||||
debug!(?error, ?candidate.addr, "failed to make outbound connection to peer");
|
||||
report_failed(address_book.clone(), candidate).await;
|
||||
|
||||
// The demand signal that was taken out of the queue to attempt to connect to the
|
||||
// failed candidate never turned into a connection, so add it back.
|
||||
//
|
||||
// # Security
|
||||
//
|
||||
// Handshake failures are rate-limited by peer attempt timeouts.
|
||||
if let Err(send_error) = demand_tx.try_send(MorePeers) {
|
||||
if send_error.is_disconnected() {
|
||||
// Zebra is shutting down
|
||||
return Err(send_error.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mark `addr` as a failed peer in `address_book`.
|
||||
#[instrument(skip(address_book))]
|
||||
async fn report_failed(address_book: Arc<std::sync::Mutex<AddressBook>>, addr: MetaAddr) {
|
||||
let addr = MetaAddr::new_errored(addr.addr, addr.services);
|
||||
|
||||
// # Correctness
|
||||
//
|
||||
// Spawn address book accesses on a blocking thread,
|
||||
// to avoid deadlocks (see #1976).
|
||||
let span = Span::current();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
span.in_scope(|| address_book.lock().unwrap().update(addr))
|
||||
})
|
||||
.await
|
||||
.expect("panic in peer failure address book update task");
|
||||
}
|
||||
|
|
|
@ -459,15 +459,7 @@ async fn crawler_peer_limit_one_connect_ok_then_drop() {
|
|||
let peer_result = peerset_rx.try_next();
|
||||
match peer_result {
|
||||
// A peer handshake succeeded.
|
||||
Ok(Some(peer_result)) => {
|
||||
assert!(
|
||||
matches!(peer_result, Ok((_, _))),
|
||||
"unexpected connection error: {peer_result:?}\n\
|
||||
{peer_count} previous peers succeeded",
|
||||
);
|
||||
peer_count += 1;
|
||||
}
|
||||
|
||||
Ok(Some(_peer_change)) => peer_count += 1,
|
||||
// The channel is closed and there are no messages left in the channel.
|
||||
Ok(None) => break,
|
||||
// The channel is still open, but there are no messages left in the channel.
|
||||
|
@ -521,15 +513,7 @@ async fn crawler_peer_limit_one_connect_ok_stay_open() {
|
|||
let peer_change_result = peerset_rx.try_next();
|
||||
match peer_change_result {
|
||||
// A peer handshake succeeded.
|
||||
Ok(Some(peer_change_result)) => {
|
||||
assert!(
|
||||
matches!(peer_change_result, Ok((_, _))),
|
||||
"unexpected connection error: {peer_change_result:?}\n\
|
||||
{peer_change_count} previous peers succeeded",
|
||||
);
|
||||
peer_change_count += 1;
|
||||
}
|
||||
|
||||
Ok(Some(_peer_change)) => peer_change_count += 1,
|
||||
// The channel is closed and there are no messages left in the channel.
|
||||
Ok(None) => break,
|
||||
// The channel is still open, but there are no messages left in the channel.
|
||||
|
@ -631,15 +615,7 @@ async fn crawler_peer_limit_default_connect_ok_then_drop() {
|
|||
let peer_result = peerset_rx.try_next();
|
||||
match peer_result {
|
||||
// A peer handshake succeeded.
|
||||
Ok(Some(peer_result)) => {
|
||||
assert!(
|
||||
matches!(peer_result, Ok((_, _))),
|
||||
"unexpected connection error: {peer_result:?}\n\
|
||||
{peer_count} previous peers succeeded",
|
||||
);
|
||||
peer_count += 1;
|
||||
}
|
||||
|
||||
Ok(Some(_peer_change)) => peer_count += 1,
|
||||
// The channel is closed and there are no messages left in the channel.
|
||||
Ok(None) => break,
|
||||
// The channel is still open, but there are no messages left in the channel.
|
||||
|
@ -694,15 +670,7 @@ async fn crawler_peer_limit_default_connect_ok_stay_open() {
|
|||
let peer_change_result = peerset_rx.try_next();
|
||||
match peer_change_result {
|
||||
// A peer handshake succeeded.
|
||||
Ok(Some(peer_change_result)) => {
|
||||
assert!(
|
||||
matches!(peer_change_result, Ok((_, _))),
|
||||
"unexpected connection error: {peer_change_result:?}\n\
|
||||
{peer_change_count} previous peers succeeded",
|
||||
);
|
||||
peer_change_count += 1;
|
||||
}
|
||||
|
||||
Ok(Some(_peer_change)) => peer_change_count += 1,
|
||||
// The channel is closed and there are no messages left in the channel.
|
||||
Ok(None) => break,
|
||||
// The channel is still open, but there are no messages left in the channel.
|
||||
|
@ -834,15 +802,7 @@ async fn listener_peer_limit_one_handshake_ok_then_drop() {
|
|||
let peer_result = peerset_rx.try_next();
|
||||
match peer_result {
|
||||
// A peer handshake succeeded.
|
||||
Ok(Some(peer_result)) => {
|
||||
assert!(
|
||||
matches!(peer_result, Ok((_, _))),
|
||||
"unexpected connection error: {peer_result:?}\n\
|
||||
{peer_count} previous peers succeeded",
|
||||
);
|
||||
peer_count += 1;
|
||||
}
|
||||
|
||||
Ok(Some(_peer_change)) => peer_count += 1,
|
||||
// The channel is closed and there are no messages left in the channel.
|
||||
Ok(None) => break,
|
||||
// The channel is still open, but there are no messages left in the channel.
|
||||
|
@ -900,15 +860,7 @@ async fn listener_peer_limit_one_handshake_ok_stay_open() {
|
|||
let peer_change_result = peerset_rx.try_next();
|
||||
match peer_change_result {
|
||||
// A peer handshake succeeded.
|
||||
Ok(Some(peer_change_result)) => {
|
||||
assert!(
|
||||
matches!(peer_change_result, Ok((_, _))),
|
||||
"unexpected connection error: {peer_change_result:?}\n\
|
||||
{peer_change_count} previous peers succeeded",
|
||||
);
|
||||
peer_change_count += 1;
|
||||
}
|
||||
|
||||
Ok(Some(_peer_change)) => peer_change_count += 1,
|
||||
// The channel is closed and there are no messages left in the channel.
|
||||
Ok(None) => break,
|
||||
// The channel is still open, but there are no messages left in the channel.
|
||||
|
@ -1019,15 +971,7 @@ async fn listener_peer_limit_default_handshake_ok_then_drop() {
|
|||
let peer_result = peerset_rx.try_next();
|
||||
match peer_result {
|
||||
// A peer handshake succeeded.
|
||||
Ok(Some(peer_result)) => {
|
||||
assert!(
|
||||
matches!(peer_result, Ok((_, _))),
|
||||
"unexpected connection error: {peer_result:?}\n\
|
||||
{peer_count} previous peers succeeded",
|
||||
);
|
||||
peer_count += 1;
|
||||
}
|
||||
|
||||
Ok(Some(_peer_change)) => peer_count += 1,
|
||||
// The channel is closed and there are no messages left in the channel.
|
||||
Ok(None) => break,
|
||||
// The channel is still open, but there are no messages left in the channel.
|
||||
|
@ -1085,15 +1029,7 @@ async fn listener_peer_limit_default_handshake_ok_stay_open() {
|
|||
let peer_change_result = peerset_rx.try_next();
|
||||
match peer_change_result {
|
||||
// A peer handshake succeeded.
|
||||
Ok(Some(peer_change_result)) => {
|
||||
assert!(
|
||||
matches!(peer_change_result, Ok((_, _))),
|
||||
"unexpected connection error: {peer_change_result:?}\n\
|
||||
{peer_change_count} previous peers succeeded",
|
||||
);
|
||||
peer_change_count += 1;
|
||||
}
|
||||
|
||||
Ok(Some(_peer_change)) => peer_change_count += 1,
|
||||
// The channel is closed and there are no messages left in the channel.
|
||||
Ok(None) => break,
|
||||
// The channel is still open, but there are no messages left in the channel.
|
||||
|
@ -1158,7 +1094,8 @@ async fn add_initial_peers_is_rate_limited() {
|
|||
|
||||
let elapsed = Instant::now() - before;
|
||||
|
||||
assert_eq!(connections.len(), PEER_COUNT);
|
||||
// Errors are ignored, so we don't expect any peers here
|
||||
assert_eq!(connections.len(), 0);
|
||||
// Make sure the rate limiting worked by checking if it took long enough
|
||||
assert!(
|
||||
elapsed
|
||||
|
|
Loading…
Reference in New Issue