Limit initial candidate set fanout to the number of initial peers

If there is a small number of initial peers, and they are slow, the
initial candidate set update can appear to hang. To avoid this issue,
limit the initial candidate set fanout to the number of initial peers.

Once the initial peers have sent us more peer addresses, there is no need
to limit the fanouts for future updates.

Reported by Niklas Long of Equilibrium.
This commit is contained in:
teor 2021-05-14 12:15:39 +10:00
parent 679920f6b8
commit 458c26f1e3
2 changed files with 27 additions and 5 deletions

View File

@@ -131,7 +131,15 @@ where
         }
     }

-    /// Update the peer set from the network.
+    /// Update the peer set from the network, using the default fanout limit.
+    ///
+    /// See `update_initial` for details.
+    pub async fn update(&mut self) -> Result<(), BoxError> {
+        self.update_inner(None).await
+    }
+
+    /// Update the peer set from the network, limiting the fanout to
+    /// `fanout_limit`.
     ///
     /// - Ask a few live `Responded` peers to send us more peers.
     /// - Process all completed peer responses, adding new peers in the
@@ -139,6 +147,9 @@ where
     ///
     /// ## Correctness
     ///
+    /// Pass the initial peer set size as `fanout_limit` during initialization,
+    /// so that Zebra does not send duplicate requests to the same peer.
+    ///
     /// The crawler exits when update returns an error, so it must only return
     /// errors on permanent failures.
     ///
@@ -148,7 +159,15 @@ where
     /// `report_failed` puts peers into the `Failed` state.
     ///
     /// `next` puts peers into the `AttemptPending` state.
-    pub async fn update(&mut self) -> Result<(), BoxError> {
+    pub async fn update_initial(&mut self, fanout_limit: usize) -> Result<(), BoxError> {
+        self.update_inner(Some(fanout_limit)).await
+    }
+
+    /// Update the peer set from the network, limiting the fanout to
+    /// `fanout_limit`.
+    ///
+    /// See `update_initial` for details.
+    async fn update_inner(&mut self, fanout_limit: Option<usize>) -> Result<(), BoxError> {
         // Opportunistically crawl the network on every update call to ensure
         // we're actively fetching peers. Continue independently of whether we
         // actually receive any peers, but always ask the network for more.
@@ -159,7 +178,7 @@ where
         // called while the peer set is already loaded.
         let mut responses = FuturesUnordered::new();
         trace!("sending GetPeers requests");
-        for _ in 0..constants::GET_ADDR_FANOUT {
+        for _ in 0..fanout_limit.unwrap_or(constants::GET_ADDR_FANOUT) {
             // CORRECTNESS
             //
             // Use a timeout to avoid deadlocks when there are no connected

View File

@@ -137,12 +137,14 @@ where
        );

        // 2. Initial peers, specified in the config.
+        let (initial_peer_count_tx, initial_peer_count_rx) = tokio::sync::oneshot::channel();
        let initial_peers_fut = {
            let config = config.clone();
            let outbound_connector = outbound_connector.clone();
            let peerset_tx = peerset_tx.clone();
            async move {
                let initial_peers = config.initial_peers().await;
+                let _ = initial_peer_count_tx.send(initial_peers.len());
                // Connect the tx end to the 3 peer sources:
                add_initial_peers(initial_peers, outbound_connector, peerset_tx).await
            }
@@ -157,10 +159,11 @@ where
        // We need to await candidates.update() here, because zcashd only sends one
        // `addr` message per connection, and if we only have one initial peer we
        // need to ensure that its `addr` message is used by the crawler.
-        // XXX this should go in CandidateSet::new, but we need init() -> Result<_,_>
        info!("Sending initial request for peers");
-        let _ = candidates.update().await;
+        let _ = candidates
+            .update_initial(initial_peer_count_rx.await.expect("value sent before drop"))
+            .await;

        for _ in 0..config.peerset_initial_target_size {
            let _ = demand_tx.try_send(());