//! A peer set whose size is dynamically determined by resource constraints.

// Portions of this submodule were adapted from tower-balance,
// which is (c) 2019 Tower Contributors (MIT licensed).

use std::{
    net::SocketAddr,
    sync::{Arc, Mutex},
};

use futures::{
    channel::mpsc,
    future::{self, FutureExt},
    sink::SinkExt,
    stream::{FuturesUnordered, StreamExt},
    TryFutureExt,
};

use tokio::{
    net::{TcpListener, TcpStream},
    sync::broadcast,
    time::Instant,
};

use tower::{
    buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
    util::BoxService, Service, ServiceExt,
};

use tracing::Span;
use tracing_futures::Instrument;

use crate::{
    constants, meta_addr::MetaAddr, peer, timestamp_collector::TimestampCollector, AddressBook,
    BoxError, Config, Request, Response,
};

use zebra_chain::parameters::Network;

use super::CandidateSet;
use super::PeerSet;

use peer::Client;

type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;

/// Initialize a peer set.
///
/// The peer set abstracts away peer management to provide a
/// [`tower::Service`] representing "the network" that load-balances requests
/// over available peers. The peer set automatically crawls the network to
/// find more peer addresses and opportunistically connects to new peers.
///
/// Each peer connection's message handling is isolated from other
/// connections, unlike in `zcashd`. The peer connection first attempts to
/// interpret inbound messages as part of a response to a previously-issued
/// request. Otherwise, inbound messages are interpreted as requests and sent
/// to the supplied `inbound_service`.
///
/// Wrapping the `inbound_service` in [`tower::load_shed`] middleware will
/// cause the peer set to shrink when the inbound service is unable to keep up
/// with the volume of inbound requests.
///
/// In addition to returning a service for outbound requests, this function
/// returns a shared [`AddressBook`] updated with last-seen timestamps for
/// connected peers.
pub async fn init<S>(
    config: Config,
    inbound_service: S,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    Arc<Mutex<AddressBook>>,
)
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send + 'static,
{
    let (address_book, timestamp_collector) = TimestampCollector::spawn();
    let (inv_sender, inv_receiver) = broadcast::channel(100);

    // Construct services that handle inbound handshakes and perform outbound
    // handshakes. These use the same handshake service internally to detect
    // self-connection attempts. Both are decorated with a tower TimeoutLayer to
    // enforce timeouts as specified in the Config.
    let (listener, connector) = {
        use tower::timeout::TimeoutLayer;
        let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
        use crate::protocol::external::types::PeerServices;
        let hs = peer::Handshake::builder()
            .with_config(config.clone())
            .with_inbound_service(inbound_service)
            .with_inventory_collector(inv_sender)
            .with_timestamp_collector(timestamp_collector)
            .with_advertised_services(PeerServices::NODE_NETWORK)
            .with_user_agent(crate::constants::USER_AGENT.to_string())
            .want_transactions(true)
            .finish()
            .expect("configured all required parameters");
        (
            hs_timeout.layer(hs.clone()),
            hs_timeout.layer(peer::Connector::new(hs)),
        )
    };

    // Create an mpsc channel for peer changes, with a generous buffer.
    let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
    // Create an mpsc channel for peerset demand signaling.
    let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100);
    let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();

    // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
    let peer_set = PeerSet::new(
        PeakEwmaDiscover::new(
            // Discover interprets an error as stream termination,
            // so discard any errored connections...
            peerset_rx.filter(|result| future::ready(result.is_ok())),
            constants::EWMA_DEFAULT_RTT,
            constants::EWMA_DECAY_TIME,
            tower::load::CompleteOnResponse::default(),
        ),
        demand_tx.clone(),
        handle_rx,
        inv_receiver,
        address_book.clone(),
    );
    let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);

    // 1. Incoming peer connections, via a listener.

    // Warn if we're configured using the wrong network port.
    // TODO: use the right port if the port is unspecified
    //       split the address and port configs?
    let (wrong_net, wrong_net_port) = match config.network {
        Network::Mainnet => (Network::Testnet, 18233),
        Network::Testnet => (Network::Mainnet, 8233),
    };
    if config.listen_addr.port() == wrong_net_port {
        warn!(
            "We are configured with port {} for {:?}, but that port is the default port for {:?}",
            config.listen_addr.port(),
            config.network,
            wrong_net
        );
    }

    let listen_guard = tokio::spawn(
        listen(config.listen_addr, listener, peerset_tx.clone()).instrument(Span::current()),
    );
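The wrong-port warning above reduces to a pure check on the configured network and listen port. A minimal standalone sketch of that check (with a hypothetical local `Network` enum and helper names, not the zebra-chain types):

```rust
/// Hypothetical stand-in for zebra_chain's Network enum.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Network {
    Mainnet,
    Testnet,
}

/// Return the *other* network and its default port, mirroring the match
/// above: Testnet's default port is 18233, Mainnet's is 8233.
fn wrong_net_for(network: Network) -> (Network, u16) {
    match network {
        Network::Mainnet => (Network::Testnet, 18233),
        Network::Testnet => (Network::Mainnet, 8233),
    }
}

/// True when the configured listen port is the default port of the
/// opposite network, which deserves a warning.
fn is_wrong_port(network: Network, listen_port: u16) -> bool {
    listen_port == wrong_net_for(network).1
}

fn main() {
    // A Mainnet node listening on Testnet's default port gets warned.
    assert!(is_wrong_port(Network::Mainnet, 18233));
    // Listening on Mainnet's own default port is fine.
    assert!(!is_wrong_port(Network::Mainnet, 8233));
    println!("port checks passed");
}
```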

    // 2. Initial peers, specified in the config.
    let initial_peers_fut = {
        let config = config.clone();
        let connector = connector.clone();
        let peerset_tx = peerset_tx.clone();
        async move {
            let initial_peers = config.initial_peers().await;
            // Connect the tx end to the 3 peer sources:
            add_initial_peers(initial_peers, connector, peerset_tx).await
        }
        .boxed()
    };

    let add_guard = tokio::spawn(initial_peers_fut.instrument(Span::current()));

    // 3. Outgoing peers we connect to in response to load.
    let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());

    // We need to await candidates.update() here, because zcashd only sends one
    // `addr` message per connection, and if we only have one initial peer we
    // need to ensure that its `addr` message is used by the crawler.
    // XXX this should go in CandidateSet::new, but we need init() -> Result<_, _>

    info!("Sending initial request for peers");
    let _ = candidates.update().await;

    for _ in 0..config.peerset_initial_target_size {
        let _ = demand_tx.try_send(());
    }
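The demand loop above deliberately ignores `try_send` errors: once the channel's buffer is full, extra demand signals carry no information and can be dropped. A std-only sketch of that pattern, using `std::sync::mpsc::sync_channel` in place of the async `futures` channel (helper names are hypothetical):

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Send `n` demand signals without blocking, dropping any that do not fit
/// in the channel's buffer; returns how many were dropped.
fn seed_demand(tx: &SyncSender<()>, n: usize) -> usize {
    let mut dropped = 0;
    for _ in 0..n {
        if let Err(TrySendError::Full(())) = tx.try_send(()) {
            dropped += 1;
        }
    }
    dropped
}

/// Count the demand signals currently buffered in the channel.
fn pending_demand(rx: &Receiver<()>) -> usize {
    rx.try_iter().count()
}

fn main() {
    // A buffer of 2 stands in for the peer set's demand channel.
    let (tx, rx) = sync_channel::<()>(2);
    // Seeding 5 signals buffers 2 and harmlessly drops the other 3:
    // each signal only means "try to dial one more peer".
    let dropped = seed_demand(&tx, 5);
    assert_eq!(dropped, 3);
    assert_eq!(pending_demand(&rx), 2);
    println!("buffered 2 demand signals, dropped {}", dropped);
}
```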

    let crawl_guard = tokio::spawn(
        crawl_and_dial(
            config.crawl_new_peer_interval,
            demand_tx,
            demand_rx,
            candidates,
            connector,
            peerset_tx,
        )
        .instrument(Span::current()),
    );

    handle_tx
        .send(vec![add_guard, listen_guard, crawl_guard])
        .unwrap();

    (peer_set, address_book)
}

/// Use the provided `connector` to connect to `initial_peers`, then send
/// the results over `tx`.
#[instrument(skip(initial_peers, connector, tx))]
async fn add_initial_peers<S>(
    initial_peers: std::collections::HashSet<SocketAddr>,
    connector: S,
    mut tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
    S: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError> + Clone,
    S::Future: Send + 'static,
{
    info!(?initial_peers, "connecting to initial peer set");
    // ## Correctness:
    //
    // Each `CallAll` can hold one `Buffer` or `Batch` reservation for
    // an indefinite period. We can use `CallAllUnordered` without filling
    // the underlying `Inbound` buffer, because we immediately drive this
    // single `CallAll` to completion, and handshakes have a short timeout.
    use tower::util::CallAllUnordered;
    let addr_stream = futures::stream::iter(initial_peers.into_iter());
    let mut handshakes = CallAllUnordered::new(connector, addr_stream);

    while let Some(handshake_result) = handshakes.next().await {
        // This is verbose, but it's better than just hanging with no output.
        if let Err(ref e) = handshake_result {
            info!(?e, "an initial peer connection failed");
        }
        tx.send(handshake_result).await?;
    }

    Ok(())
}

/// Bind to `addr`, listen for peers using `handshaker`, then send the
/// results over `tx`.
#[instrument(skip(tx, handshaker))]
async fn listen<S>(
    addr: SocketAddr,
    mut handshaker: S,
    tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
    S: Service<(TcpStream, SocketAddr), Response = peer::Client, Error = BoxError> + Clone,
    S::Future: Send + 'static,
{
    info!("Trying to open Zcash protocol endpoint at {}...", addr);
    let listener_result = TcpListener::bind(addr).await;

    let listener = match listener_result {
        Ok(l) => l,
        Err(e) => panic!(
            "Opening Zcash network protocol listener {:?} failed: {:?}. \
             Hint: Check if another zebrad or zcashd process is running. \
             Try changing the network listen_addr in the Zebra config.",
            addr, e,
        ),
    };

    let local_addr = listener.local_addr()?;
    info!("Opened Zcash protocol endpoint at {}", local_addr);
    loop {
        if let Ok((tcp_stream, addr)) = listener.accept().await {
            debug!(?addr, "got incoming connection");
            handshaker.ready_and().await?;
            // Construct a handshake future but do not drive it yet...
            let handshake = handshaker.call((tcp_stream, addr));
            // ...instead, spawn a new task to handle this connection.
            let mut tx2 = tx.clone();
            tokio::spawn(async move {
                if let Ok(client) = handshake.await {
                    let _ = tx2.send(Ok(Change::Insert(addr, client))).await;
                }
            });
        }
    }
}
/// An action that the peer crawler can take.
#[allow(dead_code)]
enum CrawlerAction {
    /// Drop the demand signal because there are too many pending handshakes.
    DemandDrop,
    /// Initiate a handshake to `candidate` in response to demand.
    DemandHandshake { candidate: MetaAddr },
    /// Crawl existing peers for more peers in response to demand, because there
    /// are no available candidates.
    DemandCrawl,
    /// Crawl existing peers for more peers in response to a timer `tick`.
    TimerCrawl { tick: Instant },
    /// Handle a successfully connected handshake `peer_set_change`.
    HandshakeConnected {
        peer_set_change: Change<SocketAddr, Client>,
    },
    /// Handle a handshake failure to `failed_addr`.
    HandshakeFailed { failed_addr: MetaAddr },
}

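Each variant above maps to one arm of the crawler's event loop. A std-only sketch of that dispatch, with hypothetical string payloads standing in for the real `MetaAddr` and `Change` types:

```rust
/// Simplified crawler actions; string payloads stand in for the real
/// MetaAddr and Change payloads.
enum CrawlerAction {
    DemandDrop,
    DemandHandshake { candidate: String },
    DemandCrawl,
    TimerCrawl,
    HandshakeConnected { addr: String },
    HandshakeFailed { failed_addr: String },
}

/// Map each action to the crawler's response, as documented on the enum above.
fn describe(action: &CrawlerAction) -> String {
    use CrawlerAction::*;
    match action {
        DemandDrop => "too many pending handshakes; drop the demand signal".into(),
        DemandHandshake { candidate } => format!("dial candidate {}", candidate),
        DemandCrawl => "no candidates available; crawl for more peers".into(),
        TimerCrawl => "periodic crawl for more peers".into(),
        HandshakeConnected { addr } => format!("insert connected peer {}", addr),
        HandshakeFailed { failed_addr } => {
            format!("restore demand after failed dial to {}", failed_addr)
        }
    }
}

fn main() {
    let action = CrawlerAction::DemandHandshake {
        candidate: "203.0.113.1:8233".into(),
    };
    // Demand with an available candidate turns into a dial attempt.
    println!("{}", describe(&action));
    // Demand with no candidates turns into a crawl.
    println!("{}", describe(&CrawlerAction::DemandCrawl));
}
```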
/// Given a channel `demand_rx` that signals a need for new peers, try to find
/// and connect to new peers, and send the resulting `peer::Client`s through the
/// `success_tx` channel.
///
/// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is
/// demand, but no new peers in `candidates`. After crawling, try to connect to
/// one new peer using `connector`.
///
/// If a handshake fails, restore the unused demand signal by sending it to
/// `demand_tx`.
///
/// The crawler terminates when `candidates.update()` or `success_tx` returns a
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
#[instrument(skip(demand_tx, demand_rx, candidates, connector, success_tx))]
async fn crawl_and_dial<C, S>(
    crawl_new_peer_interval: std::time::Duration,
    mut demand_tx: mpsc::Sender<()>,
    mut demand_rx: mpsc::Receiver<()>,
    mut candidates: CandidateSet<S>,
    connector: C,
    mut success_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
    C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
        + Clone
        + Send
        + 'static,
    C::Future: Send + 'static,
    S: Service<Request, Response = Response, Error = BoxError>,
    S::Future: Send + 'static,
{
|
Fix a deadlock between the crawler and dialer, and other hangs (#1950)
* Stop ignoring inbound message errors and handshake timeouts
To avoid hangs, Zebra needs to maintain the following invariants in the
handshake and heartbeat code:
- each handshake should run in a separate spawned task
(not yet implemented)
- every message, error, timeout, and shutdown must update the peer address state
- every await that depends on the network must have a timeout
Once the Connection is created, it should handle timeouts.
But we need to handle timeouts during handshake setup.
* Avoid hangs by adding a timeout to the candidate set update
Also increase the fanout from 1 to 2, to increase address diversity.
But only return permanent errors from `CandidateSet::update`, because
the crawler task exits if `update` returns an error.
Also log Peers response errors in the CandidateSet.
* Use the select macro in the crawler to reduce hangs
The `select` function is biased towards its first argument, risking
starvation.
As a side-benefit, this change also makes the code a lot easier to read
and maintain.
* Split CrawlerAction::Demand into separate actions
This refactor makes the code a bit easier to read, at the cost of
sometimes blocking the crawler on `candidates.next()`.
That's ok, because `next` only has a short (< 100 ms) delay. And we're
just about to spawn a separate task for each handshake.
* Spawn a separate task for each handshake
This change avoids deadlocks by letting each handshake make progress
independently.
* Move the dial task into a separate function
This refactor improves readability.
* Fix buggy future::select function usage
And document the correctness of the new code.
2021-04-07 06:25:10 -07:00
|
|
|
use CrawlerAction::*;
|
|
|
|
|
|
|
|
// CORRECTNESS
|
|
|
|
//
|
|
|
|
// To avoid hangs and starvation, the crawler must:
|
|
|
|
// - spawn a separate task for each crawl and handshake, so they can make
|
|
|
|
// progress independently (and avoid deadlocking each other)
|
|
|
|
// - use the `select!` macro for all actions, because the `select` function
|
|
|
|
// is biased towards the first ready future
    let mut handshakes = FuturesUnordered::new();
    // <FuturesUnordered as Stream> returns None when empty.
    // Keeping an unresolved future in the pool means the stream
    // never terminates.
    // We could use StreamExt::select_next_some and StreamExt::fuse, but `fuse`
    // prevents us from adding items to the stream and checking its length.
    handshakes.push(future::pending().boxed());

    let mut crawl_timer =
        tokio::time::interval(crawl_new_peer_interval).map(|tick| TimerCrawl { tick });

    loop {
        metrics::gauge!(
            "crawler.in_flight_handshakes",
            handshakes
                .len()
                .checked_sub(1)
                .expect("the pool always contains an unresolved future") as f64
        );

        let crawler_action = tokio::select! {
            a = handshakes.next() => a.expect("handshakes never terminates, because it contains a future that never resolves"),
            a = crawl_timer.next() => a.expect("timers never terminate"),
            // turn the demand into an action, based on the crawler's current state
            _ = demand_rx.next() => {
                if handshakes.len() > 50 {
                    // Too many pending handshakes already
                    DemandDrop
                } else if let Some(candidate) = candidates.next().await {
                    // candidates.next has a short delay, and briefly holds the address
                    // book lock, so it shouldn't hang
                    DemandHandshake { candidate }
                } else {
                    DemandCrawl
                }
            }
        };

        match crawler_action {
            DemandDrop => {
                // This is set to trace level because when the peerset is
                // congested it can generate a lot of demand signal very
                // rapidly.
                trace!("too many in-flight handshakes, dropping demand signal");
                continue;
            }
            DemandHandshake { candidate } => {
                // spawn each handshake into an independent task, so it can make
                // progress independently of the crawls
                let hs_join =
                    tokio::spawn(dial(candidate, connector.clone())).map(move |res| match res {
                        Ok(crawler_action) => crawler_action,
                        Err(e) => {
                            panic!("panic during handshaking with {:?}: {:?}", candidate, e);
                        }
                    });
                handshakes.push(Box::pin(hs_join));
            }
            DemandCrawl => {
                debug!("demand for peers but no available candidates");
                // update has timeouts, and briefly holds the address book
                // lock, so it shouldn't hang
                //
                // TODO: refactor candidates into a buffered service, so we can
                // spawn independent tasks to avoid deadlocks
                candidates.update().await?;
                // Try to connect to a new peer.
                let _ = demand_tx.try_send(());
            }
            TimerCrawl { tick } => {
                debug!(
                    ?tick,
                    "crawling for more peers in response to the crawl timer"
                );
                // TODO: spawn independent tasks to avoid deadlocks
                candidates.update().await?;
                // Try to connect to a new peer.
                let _ = demand_tx.try_send(());
            }
            HandshakeConnected { peer_set_change } => {
                if let Change::Insert(ref addr, _) = peer_set_change {
                    debug!(candidate.addr = ?addr, "successfully dialed new peer");
                } else {
                    unreachable!("unexpected handshake result: all changes should be Insert");
                }
                // successes are handled by an independent task, so they
                // shouldn't hang
                success_tx.send(Ok(peer_set_change)).await?;
            }
            HandshakeFailed { failed_addr } => {
                debug!(?failed_addr.addr, "marking candidate as failed");
                candidates.report_failed(&failed_addr);
                // The demand signal that was taken out of the queue
                // to attempt to connect to the failed candidate never
                // turned into a connection, so add it back:
                let _ = demand_tx.try_send(());
            }
        }
    }
}

/// Try to connect to `candidate` using `connector`.
///
/// Returns a `HandshakeConnected` action on success, and a
/// `HandshakeFailed` action on error.
#[instrument(skip(connector))]
async fn dial<C>(candidate: MetaAddr, mut connector: C) -> CrawlerAction
where
    C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
        + Clone
        + Send
        + 'static,
    C::Future: Send + 'static,
{
    // CORRECTNESS
    //
    // To avoid hangs, the dialer must only await:
    // - functions that return immediately, or
    // - functions that have a reasonable timeout

    debug!(?candidate.addr, "attempting outbound connection in response to demand");

    // the connector is always ready, so this can't hang
    let connector = connector.ready_and().await.expect("connector never errors");

    // the handshake has timeouts, so it shouldn't hang
    connector
        .call(candidate.addr)
        .map_err(|e| (candidate, e))
        .map(Into::into)
        .await
}

impl From<Result<Change<SocketAddr, Client>, (MetaAddr, BoxError)>> for CrawlerAction {
    fn from(dial_result: Result<Change<SocketAddr, Client>, (MetaAddr, BoxError)>) -> Self {
        use CrawlerAction::*;
        match dial_result {
            Ok(peer_set_change) => HandshakeConnected { peer_set_change },
            Err((candidate, e)) => {
                debug!(?candidate.addr, ?e, "failed to connect to candidate");
                HandshakeFailed {
                    failed_addr: candidate,
                }
            }
        }
    }
}