//! A peer set whose size is dynamically determined by resource constraints.

// Portions of this submodule were adapted from tower-balance,
// which is (c) 2019 Tower Contributors (MIT licensed).

use std::{
    net::SocketAddr,
    sync::{Arc, Mutex},
};

use futures::{
    channel::mpsc,
    future::{self, FutureExt},
    sink::SinkExt,
    stream::{FuturesUnordered, StreamExt},
};

use tokio::{
    net::{TcpListener, TcpStream},
    sync::broadcast,
};

use tower::{
    buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
    util::BoxService, Service, ServiceExt,
};

use tracing::Span;
use tracing_futures::Instrument;

use crate::{
    constants, peer, timestamp_collector::TimestampCollector, AddressBook, BoxError, Config,
    Request, Response,
};

use zebra_chain::parameters::Network;

use super::CandidateSet;
use super::PeerSet;

type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;

/// Initialize a peer set.
///
/// The peer set abstracts away peer management to provide a
/// [`tower::Service`] representing "the network" that load-balances requests
/// over available peers. The peer set automatically crawls the network to
/// find more peer addresses and opportunistically connects to new peers.
///
/// Each peer connection's message handling is isolated from other
/// connections, unlike in `zcashd`. The peer connection first attempts to
/// interpret inbound messages as part of a response to a previously-issued
/// request. Otherwise, inbound messages are interpreted as requests and sent
/// to the supplied `inbound_service`.
///
/// Wrapping the `inbound_service` in [`tower::load_shed`] middleware will
/// cause the peer set to shrink when the inbound service is unable to keep up
/// with the volume of inbound requests.
///
/// In addition to returning a service for outbound requests, this method
/// returns a shared [`AddressBook`] updated with last-seen timestamps for
/// connected peers.
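///
/// # Examples
///
/// A minimal sketch of wiring up the peer set. The trivial inbound service
/// below, which answers every request with [`Response::Nil`], is a
/// placeholder for illustration, not something this crate provides:
///
/// ```no_run
/// # use zebra_network::{init, BoxError, Config, Request, Response};
/// # async fn example() {
/// // A stub inbound service; a real node would handle peer requests here.
/// let inbound_service =
///     tower::service_fn(|_req: Request| async { Ok::<Response, BoxError>(Response::Nil) });
///
/// // `init` returns "the network" as a buffered, load-balanced service,
/// // plus a shared handle to the address book.
/// let (peer_set, address_book) = init(Config::default(), inbound_service).await;
/// # drop((peer_set, address_book));
/// # }
/// ```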
pub async fn init<S>(
    config: Config,
    inbound_service: S,
) -> (
    Buffer<BoxService<Request, Response, BoxError>, Request>,
    Arc<Mutex<AddressBook>>,
)
where
    S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
    S::Future: Send + 'static,
{
    let (address_book, timestamp_collector) = TimestampCollector::spawn();
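    // Each connected peer registers the inventory hashes it advertises (via
    // `inv` messages) with this broadcast channel, so the peer set can route
    // a request for a hash to a peer that recently advertised it.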
    let (inv_sender, inv_receiver) = broadcast::channel(100);

    // Construct services that handle inbound handshakes and perform outbound
    // handshakes. These use the same handshake service internally to detect
    // self-connection attempts. Both are decorated with a tower TimeoutLayer to
    // enforce timeouts as specified in the Config.
    let (listener, connector) = {
        use tower::timeout::TimeoutLayer;
        let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
        use crate::protocol::external::types::PeerServices;
        let hs = peer::Handshake::builder()
            .with_config(config.clone())
            .with_inbound_service(inbound_service)
            .with_inventory_collector(inv_sender)
            .with_timestamp_collector(timestamp_collector)
            .with_advertised_services(PeerServices::NODE_NETWORK)
            .with_user_agent(crate::constants::USER_AGENT.to_string())
            .want_transactions(true)
            .finish()
            .expect("configured all required parameters");
        (
            hs_timeout.layer(hs.clone()),
            hs_timeout.layer(peer::Connector::new(hs)),
        )
    };

    // Create an mpsc channel for peer changes, with a generous buffer.
    let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
    // Create an mpsc channel for peerset demand signaling.
    let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100);
    let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();

    // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
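    // `PeakEwmaDiscover` instruments each new `peer::Client` with a peak-EWMA
    // load estimate: an exponentially-weighted moving average of response
    // latency, biased towards recent peaks. The peer set compares these
    // estimates to prefer less-loaded peers when dispatching requests.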
    let peer_set = PeerSet::new(
        PeakEwmaDiscover::new(
            // Discover interprets an error as stream termination,
            // so discard any errored connections...
            peerset_rx.filter(|result| future::ready(result.is_ok())),
            constants::EWMA_DEFAULT_RTT,
            constants::EWMA_DECAY_TIME,
            tower::load::CompleteOnResponse::default(),
        ),
        demand_tx.clone(),
        handle_rx,
        inv_receiver,
    );
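    // `Buffer` puts a bounded request queue in front of the peer set and makes
    // it cloneable, so it can be shared between the candidate crawler below
    // and the rest of the application.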
    let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);

    // 1. Incoming peer connections, via a listener.

    // Warn if we're configured using the wrong network port.
    // TODO: use the right port if the port is unspecified
    //       split the address and port configs?
    let (wrong_net, wrong_net_port) = match config.network {
        Network::Mainnet => (Network::Testnet, 18233),
        Network::Testnet => (Network::Mainnet, 8233),
    };
    if config.listen_addr.port() == wrong_net_port {
        warn!(
            "We are configured with port {} for {:?}, but that port is the default port for {:?}",
            config.listen_addr.port(),
            config.network,
            wrong_net
        );
    }
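
    // For example, a mainnet node would normally listen on the mainnet
    // network protocol port:
    //
    //     [network]
    //     listen_addr = "0.0.0.0:8233"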

    let listen_guard = tokio::spawn(
        listen(config.listen_addr, listener, peerset_tx.clone()).instrument(Span::current()),
    );

    // 2. Initial peers, specified in the config.
    let initial_peers_fut = {
        let config = config.clone();
        let connector = connector.clone();
        let peerset_tx = peerset_tx.clone();
        async move {
            let initial_peers = config.initial_peers().await;
            // Connect the tx end to the 3 peer sources:
            add_initial_peers(initial_peers, connector, peerset_tx).await
        }
        .boxed()
    };

    let add_guard = tokio::spawn(initial_peers_fut.instrument(Span::current()));

    // 3. Outgoing peers we connect to in response to load.
    let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());

    // We need to await candidates.update() here, because zcashd only sends one
    // `addr` message per connection, and if we only have one initial peer we
    // need to ensure that its `addr` message is used by the crawler.
    // XXX this should go in CandidateSet::new, but we need init() -> Result<_,_>

    info!("Sending initial request for peers");
    let _ = candidates.update().await;
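
    // Each `()` sent below is one unit of demand: `crawl_and_dial` turns each
    // signal into at most one outbound connection attempt, so this seeds the
    // initial batch of outbound connections, up to the configured target size.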
    for _ in 0..config.peerset_initial_target_size {
        let _ = demand_tx.try_send(());
    }

    let crawl_guard = tokio::spawn(
        crawl_and_dial(
            config.crawl_new_peer_interval,
            demand_tx,
            demand_rx,
            candidates,
            connector,
            peerset_tx,
        )
        .instrument(Span::current()),
    );
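
    // Hand the background task handles to the peer set, so that a panicked or
    // failed background task is detected, rather than failing silently.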
    handle_tx
        .send(vec![add_guard, listen_guard, crawl_guard])
        .unwrap();

    (peer_set, address_book)
}

/// Use the provided `connector` to connect to `initial_peers`, then send
/// the results over `tx`.
#[instrument(skip(initial_peers, connector, tx))]
async fn add_initial_peers<S>(
    initial_peers: std::collections::HashSet<SocketAddr>,
    connector: S,
    mut tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
    S: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError> + Clone,
    S::Future: Send + 'static,
{
    info!(?initial_peers, "connecting to initial peer set");
    // ## Correctness:
    //
    // Each `CallAll` can hold one `Buffer` or `Batch` reservation for
    // an indefinite period. We can use `CallAllUnordered` without filling
    // the underlying `Inbound` buffer, because we immediately drive this
    // single `CallAll` to completion, and handshakes have a short timeout.
    use tower::util::CallAllUnordered;
    let addr_stream = futures::stream::iter(initial_peers.into_iter());
    let mut handshakes = CallAllUnordered::new(connector, addr_stream);

    while let Some(handshake_result) = handshakes.next().await {
        // this is verbose, but it's better than just hanging with no output
        if let Err(ref e) = handshake_result {
            info!(?e, "an initial peer connection failed");
        }
        tx.send(handshake_result).await?;
    }

    Ok(())
}

/// Bind to `addr`, listen for peers using `handshaker`, then send the
/// results over `tx`.
#[instrument(skip(tx, handshaker))]
async fn listen<S>(
    addr: SocketAddr,
    mut handshaker: S,
    tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
    S: Service<(TcpStream, SocketAddr), Response = peer::Client, Error = BoxError> + Clone,
    S::Future: Send + 'static,
{
    info!("Trying to open Zcash protocol endpoint at {}...", addr);
    let listener_result = TcpListener::bind(addr).await;

    let listener = match listener_result {
        Ok(l) => l,
        Err(e) => panic!(
            "Opening Zcash network protocol listener {:?} failed: {:?}. \
             Hint: Check if another zebrad or zcashd process is running. \
             Try changing the network listen_addr in the Zebra config.",
            addr, e,
        ),
    };

    let local_addr = listener.local_addr()?;
    info!("Opened Zcash protocol endpoint at {}", local_addr);
    loop {
        if let Ok((tcp_stream, addr)) = listener.accept().await {
            debug!(?addr, "got incoming connection");
            handshaker.ready_and().await?;
            // Construct a handshake future but do not drive it yet...
            let handshake = handshaker.call((tcp_stream, addr));
            // ... instead, spawn a new task to handle this connection.
            let mut tx2 = tx.clone();
            tokio::spawn(async move {
                if let Ok(client) = handshake.await {
                    let _ = tx2.send(Ok(Change::Insert(addr, client))).await;
                }
            });
        }
    }
}

/// Given a channel that signals a need for new peers, try to connect to a peer
/// and send the resulting `peer::Client` through a channel.
#[instrument(skip(
    crawl_new_peer_interval,
    demand_tx,
    demand_rx,
    candidates,
    connector,
    success_tx
))]
async fn crawl_and_dial<C, S>(
    crawl_new_peer_interval: std::time::Duration,
    mut demand_tx: mpsc::Sender<()>,
    mut demand_rx: mpsc::Receiver<()>,
    mut candidates: CandidateSet<S>,
    mut connector: C,
    mut success_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
    C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError> + Clone,
    C::Future: Send + 'static,
    S: Service<Request, Response = Response, Error = BoxError>,
    S::Future: Send + 'static,
{
    use futures::{
        future::{
            select,
            Either::{Left, Right},
        },
        TryFutureExt,
    };

    let mut handshakes = FuturesUnordered::new();
    // <FuturesUnordered as Stream> returns None when empty.
    // Keeping an unresolved future in the pool means the stream
    // never terminates.
    handshakes.push(future::pending().boxed());

    let mut crawl_timer = tokio::time::interval(crawl_new_peer_interval);

    loop {
        metrics::gauge!(
            "crawler.in_flight_handshakes",
            handshakes
                .len()
                .checked_sub(1)
                .expect("the pool always contains an unresolved future") as f64
        );
        // This is a little awkward because there's no select3.
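        // The nested select yields:
        //   Left(Left(..))  => a demand signal from `demand_rx`
        //   Left(Right(..)) => a crawl timer tick
        //   Right(..)       => a completed handshake from `handshakes`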
        match select(
            select(demand_rx.next(), crawl_timer.next()),
            handshakes.next(),
        )
        .await
        {
            Left((Left((Some(_demand), _)), _)) => {
                if handshakes.len() > 50 {
                    // This is set to trace level because when the peerset is
                    // congested it can generate a lot of demand signal very rapidly.
                    trace!("too many in-flight handshakes, dropping demand signal");
                    continue;
                }
                if let Some(candidate) = candidates.next().await {
                    debug!(?candidate.addr, "attempting outbound connection in response to demand");
                    connector.ready_and().await?;
                    handshakes.push(
                        connector
                            .call(candidate.addr)
                            .map_err(move |e| {
                                debug!(?candidate.addr, ?e, "failed to connect to candidate");
                                candidate
                            })
                            .boxed(),
                    );
                } else {
                    debug!("demand for peers but no available candidates");
                    candidates.update().await?;
                    // Try to connect to a new peer.
                    let _ = demand_tx.try_send(());
                }
            }
            // did a drill sergeant write this? no, there's just no Either3
            Left((Right((Some(_timer), _)), _)) => {
                debug!("crawling for more peers");
                candidates.update().await?;
                // Try to connect to a new peer.
                let _ = demand_tx.try_send(());
            }
            Right((Some(Ok(change)), _)) => {
                if let Change::Insert(ref addr, _) = change {
                    debug!(candidate.addr = ?addr, "successfully dialed new peer");
                } else {
                    unreachable!("unexpected handshake result: all changes should be Insert");
                }
                success_tx.send(Ok(change)).await?;
            }
            Right((Some(Err(candidate)), _)) => {
                debug!(?candidate.addr, "marking candidate as failed");
                candidates.report_failed(candidate);
                // The demand signal that was taken out of the queue
                // to attempt to connect to the failed candidate never
                // turned into a connection, so add it back:
                let _ = demand_tx.try_send(());
            }
            // We don't expect to see these patterns during normal operation
            Left((Left((None, _)), _)) => {
                unreachable!("demand_rx never fails, because we hold demand_tx");
            }
            Left((Right((None, _)), _)) => {
                unreachable!("crawl_timer never terminates");
            }
            Right((None, _)) => {
                unreachable!(
                    "handshakes never terminates, because it contains a future that never resolves"
                );
            }
        }
    }
}