2019-11-26 23:04:05 -08:00
|
|
|
//! A peer set whose size is dynamically determined by resource constraints.
|
|
|
|
|
|
|
|
// Portions of this submodule were adapted from tower-balance,
|
|
|
|
// which is (c) 2019 Tower Contributors (MIT licensed).
|
|
|
|
|
2021-04-18 23:04:24 -07:00
|
|
|
use std::{net::SocketAddr, sync::Arc};
|
2019-11-26 23:04:05 -08:00
|
|
|
|
|
|
|
use futures::{
|
|
|
|
channel::mpsc,
|
2020-08-11 13:07:44 -07:00
|
|
|
future::{self, FutureExt},
|
2019-11-26 23:04:05 -08:00
|
|
|
sink::SinkExt,
|
|
|
|
stream::{FuturesUnordered, StreamExt},
|
2021-04-13 00:46:17 -07:00
|
|
|
TryFutureExt,
|
2019-11-26 23:04:05 -08:00
|
|
|
};
|
2021-08-26 18:34:33 -07:00
|
|
|
use tokio::{net::TcpListener, sync::broadcast, time::Instant};
|
2019-11-26 23:04:05 -08:00
|
|
|
use tower::{
|
2020-09-21 14:00:20 -07:00
|
|
|
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
|
|
|
|
util::BoxService, Service, ServiceExt,
|
2019-11-26 23:04:05 -08:00
|
|
|
};
|
2021-02-19 14:36:50 -08:00
|
|
|
use tracing::Span;
|
|
|
|
use tracing_futures::Instrument;
|
2019-11-26 23:04:05 -08:00
|
|
|
|
|
|
|
use crate::{
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
constants, meta_addr::MetaAddr, peer, timestamp_collector::TimestampCollector, AddressBook,
|
|
|
|
BoxError, Config, Request, Response,
|
2019-11-26 23:04:05 -08:00
|
|
|
};
|
|
|
|
|
2021-08-26 18:34:33 -07:00
|
|
|
use zebra_chain::{chain_tip::ChainTip, parameters::Network};
|
|
|
|
|
|
|
|
use super::{CandidateSet, PeerSet};
|
2020-07-27 21:59:32 -07:00
|
|
|
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
use peer::Client;
|
2019-11-26 23:04:05 -08:00
|
|
|
|
2021-06-22 14:59:06 -07:00
|
|
|
#[cfg(test)]
|
|
|
|
mod tests;
|
|
|
|
|
2020-09-18 11:20:55 -07:00
|
|
|
type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;
|
2019-11-26 23:04:05 -08:00
|
|
|
|
2021-08-26 18:34:33 -07:00
|
|
|
/// Initialize a peer set, using a network `config`, `inbound_service`,
|
2021-09-01 15:31:16 -07:00
|
|
|
/// and `latest_chain_tip`.
|
2020-09-18 12:37:01 -07:00
|
|
|
///
|
|
|
|
/// The peer set abstracts away peer management to provide a
|
|
|
|
/// [`tower::Service`] representing "the network" that load-balances requests
|
|
|
|
/// over available peers. The peer set automatically crawls the network to
|
|
|
|
/// find more peer addresses and opportunistically connects to new peers.
|
|
|
|
///
|
|
|
|
/// Each peer connection's message handling is isolated from other
|
|
|
|
/// connections, unlike in `zcashd`. The peer connection first attempts to
|
|
|
|
/// interpret inbound messages as part of a response to a previously-issued
|
|
|
|
/// request. Otherwise, inbound messages are interpreted as requests and sent
|
|
|
|
/// to the supplied `inbound_service`.
|
|
|
|
///
|
|
|
|
/// Wrapping the `inbound_service` in [`tower::load_shed`] middleware will
|
|
|
|
/// cause the peer set to shrink when the inbound service is unable to keep up
|
|
|
|
/// with the volume of inbound requests.
|
|
|
|
///
|
2021-08-26 18:34:33 -07:00
|
|
|
/// Use [`NoChainTip`] to explicitly provide no chain tip receiver.
|
|
|
|
///
|
2020-09-18 12:37:01 -07:00
|
|
|
/// In addition to returning a service for outbound requests, this method
|
|
|
|
/// returns a shared [`AddressBook`] updated with last-seen timestamps for
|
|
|
|
/// connected peers.
|
2021-08-26 18:34:33 -07:00
|
|
|
pub async fn init<S, C>(
|
2019-11-26 23:04:05 -08:00
|
|
|
config: Config,
|
|
|
|
inbound_service: S,
|
2021-09-01 15:31:16 -07:00
|
|
|
latest_chain_tip: C,
|
2019-11-26 23:04:05 -08:00
|
|
|
) -> (
|
2020-09-18 11:20:55 -07:00
|
|
|
Buffer<BoxService<Request, Response, BoxError>, Request>,
|
2021-04-18 23:04:24 -07:00
|
|
|
Arc<std::sync::Mutex<AddressBook>>,
|
2019-11-26 23:04:05 -08:00
|
|
|
)
|
|
|
|
where
|
2020-09-18 11:20:55 -07:00
|
|
|
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
|
2019-11-26 23:04:05 -08:00
|
|
|
S::Future: Send + 'static,
|
2021-08-26 18:34:33 -07:00
|
|
|
C: ChainTip + Clone + Send + 'static,
|
2019-11-26 23:04:05 -08:00
|
|
|
{
|
2021-06-22 14:59:06 -07:00
|
|
|
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;
|
|
|
|
|
|
|
|
let (address_book, timestamp_collector) = TimestampCollector::spawn(listen_addr);
|
2020-09-01 14:28:54 -07:00
|
|
|
let (inv_sender, inv_receiver) = broadcast::channel(100);
|
2019-11-26 23:04:05 -08:00
|
|
|
|
|
|
|
// Construct services that handle inbound handshakes and perform outbound
|
|
|
|
// handshakes. These use the same handshake service internally to detect
|
|
|
|
// self-connection attempts. Both are decorated with a tower TimeoutLayer to
|
|
|
|
// enforce timeouts as specified in the Config.
|
2021-05-06 17:50:04 -07:00
|
|
|
let (listen_handshaker, outbound_connector) = {
|
2019-11-26 23:04:05 -08:00
|
|
|
use tower::timeout::TimeoutLayer;
|
2020-08-06 11:29:00 -07:00
|
|
|
let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
|
2020-08-31 21:32:35 -07:00
|
|
|
use crate::protocol::external::types::PeerServices;
|
|
|
|
let hs = peer::Handshake::builder()
|
|
|
|
.with_config(config.clone())
|
|
|
|
.with_inbound_service(inbound_service)
|
2020-09-01 14:28:54 -07:00
|
|
|
.with_inventory_collector(inv_sender)
|
2020-08-31 21:32:35 -07:00
|
|
|
.with_timestamp_collector(timestamp_collector)
|
|
|
|
.with_advertised_services(PeerServices::NODE_NETWORK)
|
|
|
|
.with_user_agent(crate::constants::USER_AGENT.to_string())
|
2021-09-01 15:31:16 -07:00
|
|
|
.with_latest_chain_tip(latest_chain_tip)
|
2020-09-01 10:55:55 -07:00
|
|
|
.want_transactions(true)
|
2020-08-31 21:32:35 -07:00
|
|
|
.finish()
|
|
|
|
.expect("configured all required parameters");
|
2019-11-26 23:04:05 -08:00
|
|
|
(
|
|
|
|
hs_timeout.layer(hs.clone()),
|
2019-11-27 11:43:59 -08:00
|
|
|
hs_timeout.layer(peer::Connector::new(hs)),
|
2019-11-26 23:04:05 -08:00
|
|
|
)
|
|
|
|
};
|
|
|
|
|
|
|
|
// Create an mpsc channel for peer changes, with a generous buffer.
|
|
|
|
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
|
|
|
|
// Create an mpsc channel for peerset demand signaling.
|
2020-02-09 20:34:53 -08:00
|
|
|
let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100);
|
2020-06-09 12:24:28 -07:00
|
|
|
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
|
2019-11-26 23:04:05 -08:00
|
|
|
|
|
|
|
// Connect the rx end to a PeerSet, wrapping new peers in load instruments.
|
2020-06-09 12:24:28 -07:00
|
|
|
let peer_set = PeerSet::new(
|
|
|
|
PeakEwmaDiscover::new(
|
2020-09-21 14:00:20 -07:00
|
|
|
// Discover interprets an error as stream termination,
|
|
|
|
// so discard any errored connections...
|
|
|
|
peerset_rx.filter(|result| future::ready(result.is_ok())),
|
2020-08-06 11:29:00 -07:00
|
|
|
constants::EWMA_DEFAULT_RTT,
|
|
|
|
constants::EWMA_DECAY_TIME,
|
2020-09-21 14:00:20 -07:00
|
|
|
tower::load::CompleteOnResponse::default(),
|
2019-11-26 23:04:05 -08:00
|
|
|
),
|
2020-06-09 12:24:28 -07:00
|
|
|
demand_tx.clone(),
|
|
|
|
handle_rx,
|
2020-09-01 14:28:54 -07:00
|
|
|
inv_receiver,
|
2021-03-15 05:02:12 -07:00
|
|
|
address_book.clone(),
|
2019-11-26 23:04:05 -08:00
|
|
|
);
|
2020-08-11 13:07:44 -07:00
|
|
|
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);
|
2019-11-26 23:04:05 -08:00
|
|
|
|
2021-01-29 04:36:33 -08:00
|
|
|
// 1. Incoming peer connections, via a listener.
|
2021-02-19 14:36:50 -08:00
|
|
|
let listen_guard = tokio::spawn(
|
2021-06-22 14:59:06 -07:00
|
|
|
accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone())
|
2021-05-06 17:50:04 -07:00
|
|
|
.instrument(Span::current()),
|
2021-02-19 14:36:50 -08:00
|
|
|
);
|
2019-11-26 23:04:05 -08:00
|
|
|
|
2021-02-02 18:20:26 -08:00
|
|
|
// 2. Initial peers, specified in the config.
|
2021-05-13 19:15:39 -07:00
|
|
|
let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel();
|
2021-01-29 04:36:33 -08:00
|
|
|
let initial_peers_fut = {
|
2021-02-02 18:20:26 -08:00
|
|
|
let config = config.clone();
|
2021-05-06 17:50:04 -07:00
|
|
|
let outbound_connector = outbound_connector.clone();
|
2021-02-02 18:20:26 -08:00
|
|
|
let peerset_tx = peerset_tx.clone();
|
|
|
|
async move {
|
|
|
|
let initial_peers = config.initial_peers().await;
|
2021-05-13 19:15:39 -07:00
|
|
|
let _ = initial_peer_count_tx.send(initial_peers.len());
|
2021-02-02 18:20:26 -08:00
|
|
|
// Connect the tx end to the 3 peer sources:
|
2021-05-06 17:50:04 -07:00
|
|
|
add_initial_peers(initial_peers, outbound_connector, peerset_tx).await
|
2021-02-02 18:20:26 -08:00
|
|
|
}
|
|
|
|
.boxed()
|
2021-01-29 04:36:33 -08:00
|
|
|
};
|
|
|
|
|
2021-02-19 14:36:50 -08:00
|
|
|
let add_guard = tokio::spawn(initial_peers_fut.instrument(Span::current()));
|
2021-01-29 04:36:33 -08:00
|
|
|
|
2019-11-26 23:04:05 -08:00
|
|
|
// 3. Outgoing peers we connect to in response to load.
|
|
|
|
let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
|
|
|
|
|
2020-07-05 03:58:50 -07:00
|
|
|
// We need to await candidates.update() here, because zcashd only sends one
|
2019-11-26 23:04:05 -08:00
|
|
|
// `addr` message per connection, and if we only have one initial peer we
|
|
|
|
// need to ensure that its `addr` message is used by the crawler.
|
|
|
|
|
|
|
|
info!("Sending initial request for peers");
|
2021-05-13 19:15:39 -07:00
|
|
|
let _ = candidates
|
|
|
|
.update_initial(initial_peer_count_rx.await.expect("value sent before drop"))
|
|
|
|
.await;
|
2020-02-09 20:34:53 -08:00
|
|
|
|
|
|
|
for _ in 0..config.peerset_initial_target_size {
|
|
|
|
let _ = demand_tx.try_send(());
|
|
|
|
}
|
|
|
|
|
2021-02-19 14:36:50 -08:00
|
|
|
let crawl_guard = tokio::spawn(
|
|
|
|
crawl_and_dial(
|
2021-03-09 17:36:05 -08:00
|
|
|
config.crawl_new_peer_interval,
|
2021-02-19 14:36:50 -08:00
|
|
|
demand_tx,
|
|
|
|
demand_rx,
|
|
|
|
candidates,
|
2021-05-06 17:50:04 -07:00
|
|
|
outbound_connector,
|
2021-02-19 14:36:50 -08:00
|
|
|
peerset_tx,
|
|
|
|
)
|
|
|
|
.instrument(Span::current()),
|
|
|
|
);
|
2020-06-09 12:24:28 -07:00
|
|
|
|
|
|
|
handle_tx
|
|
|
|
.send(vec![add_guard, listen_guard, crawl_guard])
|
|
|
|
.unwrap();
|
2019-11-26 23:04:05 -08:00
|
|
|
|
|
|
|
(peer_set, address_book)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Use the provided `handshaker` to connect to `initial_peers`, then send
|
|
|
|
/// the results over `tx`.
|
2021-05-06 17:50:04 -07:00
|
|
|
#[instrument(skip(initial_peers, outbound_connector, tx))]
|
2019-11-26 23:04:05 -08:00
|
|
|
async fn add_initial_peers<S>(
|
fix panic in seed subcommand (#401)
Co-authored-by: Jane Lusby <jane@zfnd.org>
Prior to this change, the seed subcommand would consistently encounter a panic in one of the background tasks, but would continue running after the panic. This is indicative of two bugs.
First, zebrad was not configured to treat panics as non recoverable and instead defaulted to the tokio defaults, which are to catch panics in tasks and return them via the join handle if available, or to print them if the join handle has been discarded. This is likely a poor fit for zebrad as an application, we do not need to maximize uptime or minimize the extent of an outage should one of our tasks / services start encountering panics. Ignoring a panic increases our risk of observing invalid state, causing all sorts of wild and bad bugs. To deal with this we've switched the default panic behavior from `unwind` to `abort`. This makes panics fail immediately and take down the entire application, regardless of where they occur, which is consistent with our treatment of misbehaving connections.
The second bug is the panic itself. This was triggered by a duplicate entry in the initial_peers set. To fix this we've switched the storage for the peers from a `Vec` to a `HashSet`, which has similar properties but guarantees uniqueness of its keys.
2020-05-27 17:40:12 -07:00
|
|
|
initial_peers: std::collections::HashSet<SocketAddr>,
|
2021-05-06 17:50:04 -07:00
|
|
|
outbound_connector: S,
|
2019-11-26 23:04:05 -08:00
|
|
|
mut tx: mpsc::Sender<PeerChange>,
|
2020-09-18 11:20:55 -07:00
|
|
|
) -> Result<(), BoxError>
|
2020-06-09 12:24:28 -07:00
|
|
|
where
|
2020-09-18 11:20:55 -07:00
|
|
|
S: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError> + Clone,
|
2019-11-26 23:04:05 -08:00
|
|
|
S::Future: Send + 'static,
|
|
|
|
{
|
2021-02-16 17:01:36 -08:00
|
|
|
info!(?initial_peers, "connecting to initial peer set");
|
2021-06-21 19:16:59 -07:00
|
|
|
// # Security
|
2021-02-14 17:43:49 -08:00
|
|
|
//
|
2021-06-21 19:16:59 -07:00
|
|
|
// TODO: rate-limit initial seed peer connections (#2326)
|
|
|
|
//
|
|
|
|
// # Correctness
|
|
|
|
//
|
|
|
|
// Each `FuturesUnordered` can hold one `Buffer` or `Batch` reservation for
|
|
|
|
// an indefinite period. We can use `FuturesUnordered` without filling
|
|
|
|
// the underlying network buffers, because we immediately drive this
|
|
|
|
// single `FuturesUnordered` to completion, and handshakes have a short timeout.
|
2021-05-17 13:49:16 -07:00
|
|
|
let mut handshakes: FuturesUnordered<_> = initial_peers
|
|
|
|
.into_iter()
|
|
|
|
.map(|addr| {
|
|
|
|
outbound_connector
|
|
|
|
.clone()
|
|
|
|
.oneshot(addr)
|
|
|
|
.map_err(move |e| (addr, e))
|
|
|
|
})
|
|
|
|
.collect();
|
2020-06-09 12:24:28 -07:00
|
|
|
|
2019-11-26 23:04:05 -08:00
|
|
|
while let Some(handshake_result) = handshakes.next().await {
|
2021-02-16 17:01:36 -08:00
|
|
|
// this is verbose, but it's better than just hanging with no output
|
2021-05-17 13:49:16 -07:00
|
|
|
if let Err((addr, ref e)) = handshake_result {
|
|
|
|
info!(?addr, ?e, "an initial peer connection failed");
|
2021-02-16 17:01:36 -08:00
|
|
|
}
|
2021-05-17 13:49:16 -07:00
|
|
|
tx.send(handshake_result.map_err(|(_addr, e)| e)).await?;
|
2019-11-26 23:04:05 -08:00
|
|
|
}
|
2020-06-09 12:24:28 -07:00
|
|
|
|
|
|
|
Ok(())
|
2019-11-26 23:04:05 -08:00
|
|
|
}
|
|
|
|
|
2021-06-22 14:59:06 -07:00
|
|
|
/// Open a peer connection listener on `config.listen_addr`,
|
|
|
|
/// returning the opened [`TcpListener`], and the address it is bound to.
|
2021-05-06 17:50:04 -07:00
|
|
|
///
|
2021-06-22 14:59:06 -07:00
|
|
|
/// If the listener is configured to use an automatically chosen port (port `0`),
|
|
|
|
/// then the returned address will contain the actual port.
|
|
|
|
///
|
|
|
|
/// # Panics
|
|
|
|
///
|
|
|
|
/// If opening the listener fails.
|
|
|
|
#[instrument(skip(config), fields(addr = ?config.listen_addr))]
|
|
|
|
async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
|
|
|
|
// Warn if we're configured using the wrong network port.
|
|
|
|
use Network::*;
|
|
|
|
let wrong_net = match config.network {
|
|
|
|
Mainnet => Testnet,
|
|
|
|
Testnet => Mainnet,
|
|
|
|
};
|
|
|
|
if config.listen_addr.port() == wrong_net.default_port() {
|
|
|
|
warn!(
|
|
|
|
"We are configured with port {} for {:?}, but that port is the default port for {:?}",
|
|
|
|
config.listen_addr.port(),
|
|
|
|
config.network,
|
|
|
|
wrong_net
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
info!(
|
|
|
|
"Trying to open Zcash protocol endpoint at {}...",
|
|
|
|
config.listen_addr
|
|
|
|
);
|
|
|
|
let listener_result = TcpListener::bind(config.listen_addr).await;
|
2021-01-29 04:36:33 -08:00
|
|
|
|
|
|
|
let listener = match listener_result {
|
|
|
|
Ok(l) => l,
|
|
|
|
Err(e) => panic!(
|
|
|
|
"Opening Zcash network protocol listener {:?} failed: {:?}. \
|
|
|
|
Hint: Check if another zebrad or zcashd process is running. \
|
|
|
|
Try changing the network listen_addr in the Zebra config.",
|
2021-06-22 14:59:06 -07:00
|
|
|
config.listen_addr, e,
|
2021-01-29 04:36:33 -08:00
|
|
|
),
|
|
|
|
};
|
|
|
|
|
2021-06-22 14:59:06 -07:00
|
|
|
let local_addr = listener
|
|
|
|
.local_addr()
|
|
|
|
.expect("unexpected missing local addr for open listener");
|
2020-08-12 14:22:54 -07:00
|
|
|
info!("Opened Zcash protocol endpoint at {}", local_addr);
|
2021-06-22 14:59:06 -07:00
|
|
|
|
|
|
|
(listener, local_addr)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Listens for peer connections on `addr`, then sets up each connection as a
|
|
|
|
/// Zcash peer.
|
|
|
|
///
|
|
|
|
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
|
|
|
|
/// the [`Client`][peer::Client] result over `tx`.
|
|
|
|
#[instrument(skip(listener, handshaker, tx), fields(listener_addr = ?listener.local_addr()))]
|
|
|
|
async fn accept_inbound_connections<S>(
|
|
|
|
listener: TcpListener,
|
|
|
|
mut handshaker: S,
|
|
|
|
tx: mpsc::Sender<PeerChange>,
|
|
|
|
) -> Result<(), BoxError>
|
|
|
|
where
|
|
|
|
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
|
|
|
|
S::Future: Send + 'static,
|
|
|
|
{
|
2019-11-26 23:04:05 -08:00
|
|
|
loop {
|
|
|
|
if let Ok((tcp_stream, addr)) = listener.accept().await {
|
2021-05-17 13:49:16 -07:00
|
|
|
let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
|
|
|
|
let accept_span = info_span!("listen_accept", peer = ?connected_addr);
|
|
|
|
let _guard = accept_span.enter();
|
|
|
|
|
|
|
|
debug!("got incoming connection");
|
2020-05-26 18:00:58 -07:00
|
|
|
handshaker.ready_and().await?;
|
2021-05-06 17:50:04 -07:00
|
|
|
// TODO: distinguish between proxied listeners and direct listeners
|
2021-05-17 13:49:16 -07:00
|
|
|
let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);
|
2019-11-26 23:04:05 -08:00
|
|
|
// Construct a handshake future but do not drive it yet....
|
2021-05-06 17:50:04 -07:00
|
|
|
let handshake = handshaker.call((tcp_stream, connected_addr));
|
2019-11-26 23:04:05 -08:00
|
|
|
// ... instead, spawn a new task to handle this connection
|
|
|
|
let mut tx2 = tx.clone();
|
2021-05-17 13:49:16 -07:00
|
|
|
tokio::spawn(
|
|
|
|
async move {
|
|
|
|
if let Ok(client) = handshake.await {
|
|
|
|
let _ = tx2.send(Ok(Change::Insert(addr, client))).await;
|
|
|
|
}
|
2019-11-26 23:04:05 -08:00
|
|
|
}
|
2021-05-17 13:49:16 -07:00
|
|
|
.instrument(handshaker_span),
|
|
|
|
);
|
2019-11-26 23:04:05 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
/// An action that the peer crawler can take.
|
|
|
|
#[allow(dead_code)]
|
|
|
|
enum CrawlerAction {
|
|
|
|
/// Drop the demand signal because there are too many pending handshakes.
|
|
|
|
DemandDrop,
|
|
|
|
/// Initiate a handshake to `candidate` in response to demand.
|
|
|
|
DemandHandshake { candidate: MetaAddr },
|
|
|
|
/// Crawl existing peers for more peers in response to demand, because there
|
|
|
|
/// are no available candidates.
|
|
|
|
DemandCrawl,
|
|
|
|
/// Crawl existing peers for more peers in response to a timer `tick`.
|
|
|
|
TimerCrawl { tick: Instant },
|
|
|
|
/// Handle a successfully connected handshake `peer_set_change`.
|
|
|
|
HandshakeConnected {
|
|
|
|
peer_set_change: Change<SocketAddr, Client>,
|
|
|
|
},
|
|
|
|
/// Handle a handshake failure to `failed_addr`.
|
|
|
|
HandshakeFailed { failed_addr: MetaAddr },
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Given a channel `demand_rx` that signals a need for new peers, try to find
|
|
|
|
/// and connect to new peers, and send the resulting `peer::Client`s through the
|
|
|
|
/// `success_tx` channel.
|
|
|
|
///
|
|
|
|
/// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is
|
|
|
|
/// demand, but no new peers in `candidates`. After crawling, try to connect to
|
2021-05-06 17:50:04 -07:00
|
|
|
/// one new peer using `outbound_connector`.
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
///
|
|
|
|
/// If a handshake fails, restore the unused demand signal by sending it to
|
|
|
|
/// `demand_tx`.
|
|
|
|
///
|
|
|
|
/// The crawler terminates when `candidates.update()` or `success_tx` returns a
|
|
|
|
/// permanent internal error. Transient errors and individual peer errors should
|
|
|
|
/// be handled within the crawler.
|
2021-05-06 17:50:04 -07:00
|
|
|
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, success_tx))]
|
2019-11-26 23:04:05 -08:00
|
|
|
async fn crawl_and_dial<C, S>(
|
2021-03-09 17:36:05 -08:00
|
|
|
crawl_new_peer_interval: std::time::Duration,
|
2020-02-09 20:34:53 -08:00
|
|
|
mut demand_tx: mpsc::Sender<()>,
|
|
|
|
mut demand_rx: mpsc::Receiver<()>,
|
2019-11-26 23:04:05 -08:00
|
|
|
mut candidates: CandidateSet<S>,
|
2021-05-06 17:50:04 -07:00
|
|
|
outbound_connector: C,
|
2019-11-26 23:04:05 -08:00
|
|
|
mut success_tx: mpsc::Sender<PeerChange>,
|
2020-09-18 11:20:55 -07:00
|
|
|
) -> Result<(), BoxError>
|
2019-11-26 23:04:05 -08:00
|
|
|
where
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
|
|
|
|
+ Clone
|
|
|
|
+ Send
|
|
|
|
+ 'static,
|
2019-11-26 23:04:05 -08:00
|
|
|
C::Future: Send + 'static,
|
2020-09-18 11:20:55 -07:00
|
|
|
S: Service<Request, Response = Response, Error = BoxError>,
|
2019-11-26 23:04:05 -08:00
|
|
|
S::Future: Send + 'static,
|
|
|
|
{
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
use CrawlerAction::*;
|
|
|
|
|
|
|
|
// CORRECTNESS
|
|
|
|
//
|
|
|
|
// To avoid hangs and starvation, the crawler must:
|
|
|
|
// - spawn a separate task for each crawl and handshake, so they can make
|
|
|
|
// progress independently (and avoid deadlocking each other)
|
|
|
|
// - use the `select!` macro for all actions, because the `select` function
|
|
|
|
// is biased towards the first ready future
|
2019-11-26 23:04:05 -08:00
|
|
|
|
|
|
|
let mut handshakes = FuturesUnordered::new();
|
2020-02-09 20:34:53 -08:00
|
|
|
// <FuturesUnordered as Stream> returns None when empty.
|
|
|
|
// Keeping an unresolved future in the pool means the stream
|
|
|
|
// never terminates.
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
// We could use StreamExt::select_next_some and StreamExt::fuse, but `fuse`
|
|
|
|
// prevents us from adding items to the stream and checking its length.
|
2020-02-09 20:34:53 -08:00
|
|
|
handshakes.push(future::pending().boxed());
|
2019-11-26 23:04:05 -08:00
|
|
|
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
let mut crawl_timer =
|
|
|
|
tokio::time::interval(crawl_new_peer_interval).map(|tick| TimerCrawl { tick });
|
2019-11-26 23:04:05 -08:00
|
|
|
|
2020-02-09 20:34:53 -08:00
|
|
|
loop {
|
2021-01-11 18:28:56 -08:00
|
|
|
metrics::gauge!(
|
|
|
|
"crawler.in_flight_handshakes",
|
|
|
|
handshakes
|
|
|
|
.len()
|
|
|
|
.checked_sub(1)
|
|
|
|
.expect("the pool always contains an unresolved future") as f64
|
|
|
|
);
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
|
|
|
|
let crawler_action = tokio::select! {
|
2021-04-13 17:16:47 -07:00
|
|
|
next_handshake_res = handshakes.next() => next_handshake_res.expect(
|
|
|
|
"handshakes never terminates, because it contains a future that never resolves"
|
|
|
|
),
|
|
|
|
next_timer = crawl_timer.next() => next_timer.expect("timers never terminate"),
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
// turn the demand into an action, based on the crawler's current state
|
|
|
|
_ = demand_rx.next() => {
|
2020-02-09 20:34:53 -08:00
|
|
|
if handshakes.len() > 50 {
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
// Too many pending handshakes already
|
|
|
|
DemandDrop
|
|
|
|
} else if let Some(candidate) = candidates.next().await {
|
|
|
|
// candidates.next has a short delay, and briefly holds the address
|
|
|
|
// book lock, so it shouldn't hang
|
|
|
|
DemandHandshake { candidate }
|
2020-02-09 20:34:53 -08:00
|
|
|
} else {
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
DemandCrawl
|
2019-11-26 23:04:05 -08:00
|
|
|
}
|
|
|
|
}
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
};
|
|
|
|
|
|
|
|
match crawler_action {
|
|
|
|
DemandDrop => {
|
|
|
|
// This is set to trace level because when the peerset is
|
|
|
|
// congested it can generate a lot of demand signal very
|
|
|
|
// rapidly.
|
|
|
|
trace!("too many in-flight handshakes, dropping demand signal");
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
DemandHandshake { candidate } => {
|
|
|
|
// spawn each handshake into an independent task, so it can make
|
|
|
|
// progress independently of the crawls
|
2021-05-17 13:49:16 -07:00
|
|
|
let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone()))
|
|
|
|
.map(move |res| match res {
|
|
|
|
Ok(crawler_action) => crawler_action,
|
|
|
|
Err(e) => {
|
|
|
|
panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
}
|
2021-05-17 13:49:16 -07:00
|
|
|
})
|
|
|
|
.instrument(Span::current());
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
handshakes.push(Box::pin(hs_join));
|
|
|
|
}
|
|
|
|
DemandCrawl => {
|
|
|
|
debug!("demand for peers but no available candidates");
|
|
|
|
// update has timeouts, and briefly holds the address book
|
|
|
|
// lock, so it shouldn't hang
|
|
|
|
//
|
|
|
|
// TODO: refactor candidates into a buffered service, so we can
|
|
|
|
// spawn independent tasks to avoid deadlocks
|
|
|
|
candidates.update().await?;
|
|
|
|
// Try to connect to a new peer.
|
|
|
|
let _ = demand_tx.try_send(());
|
|
|
|
}
|
|
|
|
TimerCrawl { tick } => {
|
|
|
|
debug!(
|
|
|
|
?tick,
|
|
|
|
"crawling for more peers in response to the crawl timer"
|
|
|
|
);
|
|
|
|
// TODO: spawn independent tasks to avoid deadlocks
|
2020-02-09 20:34:53 -08:00
|
|
|
candidates.update().await?;
|
|
|
|
// Try to connect to a new peer.
|
|
|
|
let _ = demand_tx.try_send(());
|
|
|
|
}
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
HandshakeConnected { peer_set_change } => {
|
|
|
|
if let Change::Insert(ref addr, _) = peer_set_change {
|
2020-02-09 20:34:53 -08:00
|
|
|
debug!(candidate.addr = ?addr, "successfully dialed new peer");
|
2020-12-13 17:00:39 -08:00
|
|
|
} else {
|
|
|
|
unreachable!("unexpected handshake result: all changes should be Insert");
|
2020-02-09 20:34:53 -08:00
|
|
|
}
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
// successes are handled by an independent task, so they
|
|
|
|
// shouldn't hang
|
|
|
|
success_tx.send(Ok(peer_set_change)).await?;
|
2020-02-09 20:34:53 -08:00
|
|
|
}
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
HandshakeFailed { failed_addr } => {
|
|
|
|
debug!(?failed_addr.addr, "marking candidate as failed");
|
|
|
|
candidates.report_failed(&failed_addr);
|
2020-02-09 20:34:53 -08:00
|
|
|
// The demand signal that was taken out of the queue
|
|
|
|
// to attempt to connect to the failed candidate never
|
|
|
|
// turned into a connection, so add it back:
|
|
|
|
let _ = demand_tx.try_send(());
|
|
|
|
}
|
2019-11-26 23:04:05 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
|
2021-05-06 17:50:04 -07:00
|
|
|
/// Try to connect to `candidate` using `outbound_connector`.
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
///
|
|
|
|
/// Returns a `HandshakeConnected` action on success, and a
|
|
|
|
/// `HandshakeFailed` action on error.
|
2021-05-06 17:50:04 -07:00
|
|
|
#[instrument(skip(outbound_connector,))]
|
|
|
|
async fn dial<C>(candidate: MetaAddr, mut outbound_connector: C) -> CrawlerAction
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
where
|
|
|
|
C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
|
|
|
|
+ Clone
|
|
|
|
+ Send
|
|
|
|
+ 'static,
|
|
|
|
C::Future: Send + 'static,
|
|
|
|
{
|
|
|
|
// CORRECTNESS
|
|
|
|
//
|
|
|
|
// To avoid hangs, the dialer must only await:
|
|
|
|
// - functions that return immediately, or
|
|
|
|
// - functions that have a reasonable timeout
|
|
|
|
|
|
|
|
debug!(?candidate.addr, "attempting outbound connection in response to demand");
|
|
|
|
|
|
|
|
// the connector is always ready, so this can't hang
|
2021-05-06 17:50:04 -07:00
|
|
|
let outbound_connector = outbound_connector
|
|
|
|
.ready_and()
|
|
|
|
.await
|
|
|
|
.expect("outbound connector never errors");
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
|
|
|
|
// the handshake has timeouts, so it shouldn't hang
|
2021-05-06 17:50:04 -07:00
|
|
|
outbound_connector
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
.call(candidate.addr)
|
2021-04-13 00:46:17 -07:00
|
|
|
.map_err(|e| (candidate, e))
|
|
|
|
.map(Into::into)
|
|
|
|
.await
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<Result<Change<SocketAddr, Client>, (MetaAddr, BoxError)>> for CrawlerAction {
|
|
|
|
fn from(dial_result: Result<Change<SocketAddr, Client>, (MetaAddr, BoxError)>) -> Self {
|
|
|
|
use CrawlerAction::*;
|
|
|
|
match dial_result {
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
Ok(peer_set_change) => HandshakeConnected { peer_set_change },
|
2021-04-13 00:46:17 -07:00
|
|
|
Err((candidate, e)) => {
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
debug!(?candidate.addr, ?e, "failed to connect to candidate");
|
|
|
|
HandshakeFailed {
|
|
|
|
failed_addr: candidate,
|
|
|
|
}
|
|
|
|
}
|
2021-04-13 00:46:17 -07:00
|
|
|
}
|
|
|
|
}
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
}
|