zebra/zebra-network/src/peer_set/initialize.rs

//! A peer set whose size is dynamically determined by resource constraints.
// Portions of this submodule were adapted from tower-balance,
// which is (c) 2019 Tower Contributors (MIT licensed).

use std::{net::SocketAddr, sync::Arc};

use futures::{
    channel::mpsc,
    future::{self, FutureExt},
    sink::SinkExt,
    stream::{FuturesUnordered, StreamExt},
    TryFutureExt,
};
use tokio::{
    net::TcpListener,
    sync::{broadcast, watch},
    time::Instant,
};
use tower::{
    buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
    util::BoxService, Service, ServiceExt,
};
use tracing::Span;
use tracing_futures::Instrument;

use crate::{
    constants, meta_addr::MetaAddr, peer, timestamp_collector::TimestampCollector, AddressBook,
    BoxError, Config, Request, Response,
};
use zebra_chain::{block, parameters::Network};

use super::CandidateSet;
use super::PeerSet;
use peer::Client;

#[cfg(test)]
mod tests;

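/// A handshake result: a new or removed peer [`Client`][peer::Client] for the
/// peer set, keyed by its address, or a [`BoxError`] from a failed connection.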
type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;

/// Initialize a peer set.
///
/// The peer set abstracts away peer management to provide a
/// [`tower::Service`] representing "the network" that load-balances requests
/// over available peers. The peer set automatically crawls the network to
/// find more peer addresses and opportunistically connects to new peers.
///
/// Each peer connection's message handling is isolated from other
/// connections, unlike in `zcashd`. The peer connection first attempts to
/// interpret inbound messages as part of a response to a previously-issued
/// request. Otherwise, inbound messages are interpreted as requests and sent
/// to the supplied `inbound_service`.
///
/// Wrapping the `inbound_service` in [`tower::load_shed`] middleware will
/// cause the peer set to shrink when the inbound service is unable to keep up
/// with the volume of inbound requests.
///
/// In addition to returning a service for outbound requests, this function
/// returns a shared [`AddressBook`] updated with last-seen timestamps for
/// connected peers.
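///
/// # Example
///
/// A minimal sketch of wiring up the peer set. The no-op inbound service and
/// default config here are illustrative assumptions, not requirements of this
/// function:
///
/// ```ignore
/// use tower::service_fn;
///
/// // An inbound service that answers every peer request with an empty response.
/// let inbound_service =
///     service_fn(|_req: Request| async { Ok::<_, BoxError>(Response::Nil) });
///
/// // `None` means no best tip height is available yet, so protocol version
/// // checks fall back to the genesis height.
/// let (peer_set, address_book) =
///     init(Config::default(), inbound_service, None).await;
/// ```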
pub async fn init<S>(
    config: Config,
    inbound_service: S,
    best_tip_height: Option<watch::Receiver<Option<block::Height>>>,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    Arc<std::sync::Mutex<AddressBook>>,
)
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send + 'static,
{
    let (tcp_listener, listen_addr) = open_listener(&config).await;

    let (address_book, timestamp_collector) = TimestampCollector::spawn(listen_addr);
    let (inv_sender, inv_receiver) = broadcast::channel(100);

    // Construct services that handle inbound handshakes and perform outbound
    // handshakes. These use the same handshake service internally to detect
    // self-connection attempts. Both are decorated with a tower TimeoutLayer to
    // enforce timeouts as specified in the Config.
    let (listen_handshaker, outbound_connector) = {
        use tower::timeout::TimeoutLayer;
        let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
        use crate::protocol::external::types::PeerServices;
        let hs = peer::Handshake::builder()
            .with_config(config.clone())
            .with_inbound_service(inbound_service)
            .with_inventory_collector(inv_sender)
            .with_timestamp_collector(timestamp_collector)
            .with_advertised_services(PeerServices::NODE_NETWORK)
            .with_user_agent(crate::constants::USER_AGENT.to_string())
            .with_best_tip_height(best_tip_height)
            .want_transactions(true)
            .finish()
            .expect("configured all required parameters");
        (
            hs_timeout.layer(hs.clone()),
            hs_timeout.layer(peer::Connector::new(hs)),
        )
    };

    // Create an mpsc channel for peer changes, with a generous buffer.
    let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
    // Create an mpsc channel for peerset demand signaling.
    let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100);
    let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();

    // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
    let peer_set = PeerSet::new(
        PeakEwmaDiscover::new(
            // Discover interprets an error as stream termination,
            // so discard any errored connections...
            peerset_rx.filter(|result| future::ready(result.is_ok())),
            constants::EWMA_DEFAULT_RTT,
            constants::EWMA_DECAY_TIME,
            tower::load::CompleteOnResponse::default(),
        ),
        demand_tx.clone(),
        handle_rx,
        inv_receiver,
        address_book.clone(),
    );
    let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);

    // 1. Incoming peer connections, via a listener.
    let listen_guard = tokio::spawn(
        accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone())
            .instrument(Span::current()),
    );

    // 2. Initial peers, specified in the config.
    let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel();
    let initial_peers_fut = {
        let config = config.clone();
        let outbound_connector = outbound_connector.clone();
        let peerset_tx = peerset_tx.clone();
        async move {
            let initial_peers = config.initial_peers().await;
            let _ = initial_peer_count_tx.send(initial_peers.len());
            // Connect the tx end to the 3 peer sources:
            add_initial_peers(initial_peers, outbound_connector, peerset_tx).await
        }
        .boxed()
    };
    let add_guard = tokio::spawn(initial_peers_fut.instrument(Span::current()));

    // 3. Outgoing peers we connect to in response to load.
    let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());

    // We need to await candidates.update() here, because zcashd only sends one
    // `addr` message per connection, and if we only have one initial peer we
    // need to ensure that its `addr` message is used by the crawler.
    info!("Sending initial request for peers");
    let _ = candidates
        .update_initial(initial_peer_count_rx.await.expect("value sent before drop"))
        .await;

    for _ in 0..config.peerset_initial_target_size {
        let _ = demand_tx.try_send(());
    }

    let crawl_guard = tokio::spawn(
        crawl_and_dial(
            config.crawl_new_peer_interval,
            demand_tx,
            demand_rx,
            candidates,
            outbound_connector,
            peerset_tx,
        )
        .instrument(Span::current()),
    );

    handle_tx
        .send(vec![add_guard, listen_guard, crawl_guard])
        .unwrap();

    (peer_set, address_book)
}

/// Use the provided `handshaker` to connect to `initial_peers`, then send
/// the results over `tx`.
#[instrument(skip(initial_peers, outbound_connector, tx))]
async fn add_initial_peers<S>(
    initial_peers: std::collections::HashSet<SocketAddr>,
    outbound_connector: S,
    mut tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
    S: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError> + Clone,
    S::Future: Send + 'static,
{
    info!(?initial_peers, "connecting to initial peer set");
    // # Security
    //
    // TODO: rate-limit initial seed peer connections (#2326)
    //
    // # Correctness
    //
    // Each `FuturesUnordered` can hold one `Buffer` or `Batch` reservation for
    // an indefinite period. We can use `FuturesUnordered` without filling
    // the underlying network buffers, because we immediately drive this
    // single `FuturesUnordered` to completion, and handshakes have a short timeout.
    let mut handshakes: FuturesUnordered<_> = initial_peers
        .into_iter()
        .map(|addr| {
            outbound_connector
                .clone()
                .oneshot(addr)
                .map_err(move |e| (addr, e))
        })
        .collect();

    while let Some(handshake_result) = handshakes.next().await {
        // this is verbose, but it's better than just hanging with no output
        if let Err((addr, ref e)) = handshake_result {
            info!(?addr, ?e, "an initial peer connection failed");
        }
        tx.send(handshake_result.map_err(|(_addr, e)| e)).await?;
    }

    Ok(())
}

/// Open a peer connection listener on `config.listen_addr`,
/// returning the opened [`TcpListener`], and the address it is bound to.
///
/// If the listener is configured to use an automatically chosen port (port `0`),
/// then the returned address will contain the actual port.
///
/// # Panics
///
/// If opening the listener fails.
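///
/// # Example
///
/// A sketch of the automatic port behaviour; the config values here are
/// illustrative assumptions:
///
/// ```ignore
/// let mut config = Config::default();
/// config.listen_addr = "127.0.0.1:0".parse().unwrap();
/// let (_listener, local_addr) = open_listener(&config).await;
/// // The OS chose an ephemeral port, so the returned address has a real port.
/// assert_ne!(local_addr.port(), 0);
/// ```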
#[instrument(skip(config), fields(addr = ?config.listen_addr))]
async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
    // Warn if we're configured using the wrong network port.
    use Network::*;
    let wrong_net = match config.network {
        Mainnet => Testnet,
        Testnet => Mainnet,
    };
    if config.listen_addr.port() == wrong_net.default_port() {
        warn!(
            "We are configured with port {} for {:?}, but that port is the default port for {:?}",
            config.listen_addr.port(),
            config.network,
            wrong_net
        );
    }

    info!(
        "Trying to open Zcash protocol endpoint at {}...",
        config.listen_addr
    );
    let listener_result = TcpListener::bind(config.listen_addr).await;

    let listener = match listener_result {
        Ok(l) => l,
        Err(e) => panic!(
            "Opening Zcash network protocol listener {:?} failed: {:?}. \
             Hint: Check if another zebrad or zcashd process is running. \
             Try changing the network listen_addr in the Zebra config.",
            config.listen_addr, e,
        ),
    };

    let local_addr = listener
        .local_addr()
        .expect("unexpected missing local addr for open listener");
    info!("Opened Zcash protocol endpoint at {}", local_addr);

    (listener, local_addr)
}

/// Listens for peer connections on `addr`, then sets up each connection as a
/// Zcash peer.
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`Client`][peer::Client] result over `tx`.
#[instrument(skip(listener, handshaker, tx), fields(listener_addr = ?listener.local_addr()))]
async fn accept_inbound_connections<S>(
    listener: TcpListener,
    mut handshaker: S,
    tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
    S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
    S::Future: Send + 'static,
{
    loop {
        if let Ok((tcp_stream, addr)) = listener.accept().await {
            let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
            let accept_span = info_span!("listen_accept", peer = ?connected_addr);
            let _guard = accept_span.enter();

            debug!("got incoming connection");
            handshaker.ready_and().await?;
            // TODO: distinguish between proxied listeners and direct listeners
            let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);

            // Construct a handshake future but do not drive it yet....
            let handshake = handshaker.call((tcp_stream, connected_addr));
            // ... instead, spawn a new task to handle this connection
            let mut tx2 = tx.clone();
            tokio::spawn(
                async move {
                    if let Ok(client) = handshake.await {
                        let _ = tx2.send(Ok(Change::Insert(addr, client))).await;
                    }
                }
                .instrument(handshaker_span),
            );
        }
    }
}

/// An action that the peer crawler can take.
#[allow(dead_code)]
enum CrawlerAction {
    /// Drop the demand signal because there are too many pending handshakes.
    DemandDrop,
    /// Initiate a handshake to `candidate` in response to demand.
    DemandHandshake { candidate: MetaAddr },
    /// Crawl existing peers for more peers in response to demand, because there
    /// are no available candidates.
    DemandCrawl,
    /// Crawl existing peers for more peers in response to a timer `tick`.
    TimerCrawl { tick: Instant },
    /// Handle a successfully connected handshake `peer_set_change`.
    HandshakeConnected {
        peer_set_change: Change<SocketAddr, Client>,
    },
    /// Handle a handshake failure to `failed_addr`.
    HandshakeFailed { failed_addr: MetaAddr },
}

/// Given a channel `demand_rx` that signals a need for new peers, try to find
/// and connect to new peers, and send the resulting `peer::Client`s through the
/// `success_tx` channel.
///
/// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is
/// demand, but no new peers in `candidates`. After crawling, try to connect to
/// one new peer using `outbound_connector`.
///
/// If a handshake fails, restore the unused demand signal by sending it to
/// `demand_tx`.
///
/// The crawler terminates when `candidates.update()` or `success_tx` returns a
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
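///
/// # Demand signals
///
/// A sketch of how callers drive the crawler: each unit sent on the demand
/// channel asks for one more outbound connection attempt, as `init` does when
/// it seeds the initial target size:
///
/// ```ignore
/// for _ in 0..config.peerset_initial_target_size {
///     let _ = demand_tx.try_send(());
/// }
/// ```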
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, success_tx))]
async fn crawl_and_dial<C, S>(
    crawl_new_peer_interval: std::time::Duration,
    mut demand_tx: mpsc::Sender<()>,
    mut demand_rx: mpsc::Receiver<()>,
    mut candidates: CandidateSet<S>,
    outbound_connector: C,
    mut success_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
    C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
        + Clone
        + Send
        + 'static,
    C::Future: Send + 'static,
    S: Service<Request, Response = Response, Error = BoxError>,
    S::Future: Send + 'static,
{
    use CrawlerAction::*;

    // CORRECTNESS
    //
    // To avoid hangs and starvation, the crawler must:
    // - spawn a separate task for each crawl and handshake, so they can make
    //   progress independently (and avoid deadlocking each other)
    // - use the `select!` macro for all actions, because the `select` function
    //   is biased towards the first ready future
    let mut handshakes = FuturesUnordered::new();
    // <FuturesUnordered as Stream> returns None when empty.
    // Keeping an unresolved future in the pool means the stream
    // never terminates.
    //
    // We could use StreamExt::select_next_some and StreamExt::fuse, but `fuse`
    // prevents us from adding items to the stream and checking its length.
    handshakes.push(future::pending().boxed());

    let mut crawl_timer =
        tokio::time::interval(crawl_new_peer_interval).map(|tick| TimerCrawl { tick });

    loop {
        metrics::gauge!(
            "crawler.in_flight_handshakes",
            handshakes
                .len()
                .checked_sub(1)
                .expect("the pool always contains an unresolved future") as f64
        );

        let crawler_action = tokio::select! {
            next_handshake_res = handshakes.next() => next_handshake_res.expect(
                "handshakes never terminates, because it contains a future that never resolves"
            ),
            next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"),
            // turn the demand into an action, based on the crawler's current state
            _ = demand_rx.next() => {
                if handshakes.len() > 50 {
                    // Too many pending handshakes already
                    DemandDrop
                } else if let Some(candidate) = candidates.next().await {
                    // candidates.next has a short delay, and briefly holds the address
                    // book lock, so it shouldn't hang
                    DemandHandshake { candidate }
                } else {
                    DemandCrawl
                }
            }
        };

        match crawler_action {
            DemandDrop => {
                // This is set to trace level because when the peerset is
                // congested it can generate a lot of demand signal very
                // rapidly.
                trace!("too many in-flight handshakes, dropping demand signal");
                continue;
            }
            DemandHandshake { candidate } => {
                // spawn each handshake into an independent task, so it can make
                // progress independently of the crawls
                let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone()))
                    .map(move |res| match res {
                        Ok(crawler_action) => crawler_action,
                        Err(e) => {
                            panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
                        }
                    })
                    .instrument(Span::current());
                handshakes.push(Box::pin(hs_join));
            }
            DemandCrawl => {
                debug!("demand for peers but no available candidates");
                // update has timeouts, and briefly holds the address book
                // lock, so it shouldn't hang
                //
                // TODO: refactor candidates into a buffered service, so we can
                // spawn independent tasks to avoid deadlocks
                candidates.update().await?;
                // Try to connect to a new peer.
                let _ = demand_tx.try_send(());
            }
            TimerCrawl { tick } => {
                debug!(
                    ?tick,
                    "crawling for more peers in response to the crawl timer"
                );
                // TODO: spawn independent tasks to avoid deadlocks
                candidates.update().await?;
                // Try to connect to a new peer.
                let _ = demand_tx.try_send(());
            }
HandshakeConnected { peer_set_change } => {
if let Change::Insert(ref addr, _) = peer_set_change {
debug!(candidate.addr = ?addr, "successfully dialed new peer");
} else {
unreachable!("unexpected handshake result: all changes should be Insert");
}
// successes are handled by an independent task, so they
// shouldn't hang
success_tx.send(Ok(peer_set_change)).await?;
}
HandshakeFailed { failed_addr } => {
debug!(?failed_addr.addr, "marking candidate as failed");
candidates.report_failed(&failed_addr);
// The demand signal that was taken out of the queue
// to attempt to connect to the failed candidate never
// turned into a connection, so add it back:
let _ = demand_tx.try_send(());
}
}
}
}
/// Try to connect to `candidate` using `outbound_connector`.
///
/// Returns a `HandshakeConnected` action on success, or a
/// `HandshakeFailed` action on error.
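///
/// # Example (sketch)
///
/// The crawl loop above runs each dial as an independent task, along these
/// lines (illustrative, simplified from the spawn in the crawl loop):
///
/// ```ignore
/// let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone()));
/// handshakes.push(Box::pin(hs_join));
/// ```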
#[instrument(skip(outbound_connector))]
async fn dial<C>(candidate: MetaAddr, mut outbound_connector: C) -> CrawlerAction
where
C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
+ Clone
+ Send
+ 'static,
C::Future: Send + 'static,
{
// CORRECTNESS
//
// To avoid hangs, the dialer must only await:
// - functions that return immediately, or
// - functions that have a reasonable timeout
debug!(?candidate.addr, "attempting outbound connection in response to demand");
// the connector is always ready, so this can't hang
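    // (the `expect` below relies on the matching guarantee that the
    // connector's readiness check never returns an error)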
let outbound_connector = outbound_connector
.ready_and()
.await
.expect("outbound connector never errors");
// the handshake has timeouts, so it shouldn't hang
outbound_connector
.call(candidate.addr)
.map_err(|e| (candidate, e))
.map(Into::into)
.await
}
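
/// Converts a dial result into the `CrawlerAction` handled by the crawl
/// loop: successful handshakes become `HandshakeConnected`, and failures
/// become `HandshakeFailed` for the failed candidate.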
impl From<Result<Change<SocketAddr, Client>, (MetaAddr, BoxError)>> for CrawlerAction {
fn from(dial_result: Result<Change<SocketAddr, Client>, (MetaAddr, BoxError)>) -> Self {
use CrawlerAction::*;
match dial_result {
Ok(peer_set_change) => HandshakeConnected { peer_set_change },
Err((candidate, e)) => {
debug!(?candidate.addr, ?e, "failed to connect to candidate");
HandshakeFailed {
failed_addr: candidate,
}
}
}
}
}