diff --git a/zebra-network/src/peer_set.rs b/zebra-network/src/peer_set.rs index 130c429e4..52dbf59ed 100644 --- a/zebra-network/src/peer_set.rs +++ b/zebra-network/src/peer_set.rs @@ -118,15 +118,9 @@ where ); // 3. Outgoing peers we connect to in response to load. + let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone()); tokio::spawn( - crawl_and_dial( - demand_rx, - peer_set.clone(), - address_book.clone(), - peer_connector, - peerset_tx, - ) - .map(|result| { + crawl_and_dial(demand_rx, candidates, peer_connector, peerset_tx).map(|result| { if let Err(e) = result { error!(%e); } @@ -198,76 +192,15 @@ where /// Given a channel that signals a need for new peers, try to connect to a peer /// and send the resulting `PeerClient` through a channel. /// -/// ```ascii,no_run -/// ┌─────────────────┐ -/// │ PeerSet │ -/// │GetPeers Requests│ -/// └─────────────────┘ -/// │ -/// │ -/// │ -/// │ -/// ▼ -/// ┌─────────────┐ filter by Λ filter by -/// │ PeerSet │!contains_addr╱ ╲ !contains_addr -/// ┌──│ AddressBook │────────────▶▕ ▏◀───────────────────┐ -/// │ └─────────────┘ ╲ ╱ │ -/// │ │ V │ -/// │ │disconnected_peers │ │ -/// │ ▼ │ │ -/// │ Λ filter by │ │ -/// │ ╱ ╲ !contains_addr │ │ -/// │ ▕ ▏◀───────────────────┼──────────────────────┤ -/// │ ╲ ╱ │ │ -/// │ V │ │ -/// │ │ │ │ -/// │┌────────┼──────────────────────┼──────────────────────┼────────┐ -/// ││ ▼ ▼ │ │ -/// ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ -/// ││ │Disconnected │ │ Gossiped │ │Failed Peers │ │ -/// ││ │ Peers │ │ Peers │ │ AddressBook │◀┼┐ -/// ││ │ AddressBook │ │ AddressBook │ │ │ ││ -/// ││ └─────────────┘ └─────────────┘ └─────────────┘ ││ -/// ││ │ │ │ ││ -/// ││ #1 drain_oldest #2 drain_newest #3 drain_oldest ││ -/// ││ │ │ │ ││ -/// ││ ├──────────────────────┴──────────────────────┘ ││ -/// ││ │ disjoint candidate sets ││ -/// │└────────┼──────────────────────────────────────────────────────┘│ -/// │ ▼ │ -/// │ Λ │ -/// │ ╱ ╲ filter by │ -/// └──────▶▕ ▏!is_potentially_connected │ -/// ╲ ╱ │ -/// V │ -/// │ │ -/// │ │ -/// ▼ │ -/// Λ │ -/// ╱ ╲ │ -/// ▕ ▏─────────────────────────────────────────────────────┘ -/// ╲ ╱ connection failed, update last_seen to now() -/// V -/// │ -/// │ -/// ▼ -/// ┌────────────┐ -/// │ send │ -/// │ PeerClient │ -/// │to Discover │ -/// └────────────┘ -/// ``` #[instrument(skip( demand_signal, - peer_set_service, - peer_set_address_book, + candidates, peer_connector, success_tx ))] async fn crawl_and_dial( mut demand_signal: mpsc::Receiver<()>, - peer_set_service: S, - peer_set_address_book: Arc>, + mut candidates: CandidateSet, mut peer_connector: C, mut success_tx: mpsc::Sender, ) -> Result<(), BoxedStdError> @@ -277,18 +210,6 @@ where S: Service, S::Future: Send + 'static, { - use tracing::Level; - let mut candidates = CandidateSet { - disconnected: AddressBook::new(span!(Level::TRACE, "disconnected peers")), - gossiped: AddressBook::new(span!(Level::TRACE, "gossiped peers")), - failed: AddressBook::new(span!(Level::TRACE, "failed peers")), - peer_set: peer_set_address_book.clone(), - peer_service: peer_set_service, - }; - - info!("Sending initial request for peers"); - let _ = candidates.update().await; - // XXX instead of just responding to demand, we could respond to demand *or* // to a interval timer (to continuously grow the peer set). while let Some(()) = demand_signal.next().await { @@ -301,16 +222,6 @@ where None => continue, }; - // Check that we have not connected to the candidate since it was - // pulled into the candidate set. - if peer_set_address_book - .lock() - .unwrap() - .is_potentially_connected(&addr) - { - continue; - }; - if let Ok(stream) = TcpStream::connect(addr).await { peer_connector.ready().await?; if let Ok(client) = peer_connector.call((stream, addr)).await { diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index f38ff7c6d..981b24a39 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -3,9 +3,77 @@ use std::sync::{Arc, Mutex}; use chrono::{TimeZone, Utc}; use futures::stream::{FuturesUnordered, Stream, StreamExt}; use tower::{Service, ServiceExt}; +use tracing::Level; use crate::{types::MetaAddr, AddressBook, BoxedStdError, Request, Response}; +/// The `CandidateSet` maintains a pool of candidate peers. +/// +/// It divides the set of all possible candidate peers into three disjoint subsets: +/// +/// 1. Disconnected peers, which we previously connected to but are not currently connected to; +/// 2. Gossiped peers, which we learned about from other peers but have never connected to; +/// 3. Failed peers, to whom we attempted to connect but were unable to. +/// +/// ```ascii,no_run +/// ┌─────────────────┐ +/// │ PeerSet │ +/// │GetPeers Requests│ +/// └─────────────────┘ +/// │ +/// │ +/// │ +/// │ +/// ▼ +/// ┌─────────────┐ filter by Λ filter by +/// │ PeerSet │!contains_addr╱ ╲ !contains_addr +/// ┌──│ AddressBook │────────────▶▕ ▏◀───────────────────┐ +/// │ └─────────────┘ ╲ ╱ │ +/// │ │ V │ +/// │ │disconnected_peers │ │ +/// │ ▼ │ │ +/// │ Λ filter by │ │ +/// │ ╱ ╲ !contains_addr │ │ +/// │ ▕ ▏◀───────────────────┼──────────────────────┤ +/// │ ╲ ╱ │ │ +/// │ V │ │ +/// │ │ │ │ +/// │┌────────┼──────────────────────┼──────────────────────┼────────┐ +/// ││ ▼ ▼ │ │ +/// ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +/// ││ │Disconnected │ │ Gossiped │ │Failed Peers │ │ +/// ││ │ Peers │ │ Peers │ │ AddressBook │◀┼┐ +/// ││ │ AddressBook │ │ AddressBook │ │ │ ││ +/// ││ └─────────────┘ └─────────────┘ └─────────────┘ ││ +/// ││ │ │ │ ││ +/// ││ #1 drain_oldest #2 drain_newest #3 drain_oldest ││ +/// ││ │ │ │ ││ +/// ││ ├──────────────────────┴──────────────────────┘ ││ +/// ││ │ disjoint candidate sets ││ +/// │└────────┼──────────────────────────────────────────────────────┘│ +/// │ ▼ │ +/// │ Λ │ +/// │ ╱ ╲ filter by │ +/// └──────▶▕ ▏!is_potentially_connected │ +/// ╲ ╱ │ +/// V │ +/// │ │ +/// │ │ +/// ▼ │ +/// Λ │ +/// ╱ ╲ │ +/// ▕ ▏─────────────────────────────────────────────────────┘ +/// ╲ ╱ connection failed, update last_seen to now() +/// V +/// │ +/// │ +/// ▼ +/// ┌────────────┐ +/// │ send │ +/// │ PeerClient │ +/// │to Discover │ +/// └────────────┘ +/// ``` pub(super) struct CandidateSet { pub(super) disconnected: AddressBook, pub(super) gossiped: AddressBook, @@ -19,6 +87,16 @@ where S: Service, S::Future: Send + 'static, { + pub fn new(peer_set: Arc>, peer_service: S) -> CandidateSet { + CandidateSet { + disconnected: AddressBook::new(span!(Level::TRACE, "disconnected peers")), + gossiped: AddressBook::new(span!(Level::TRACE, "gossiped peers")), + failed: AddressBook::new(span!(Level::TRACE, "failed peers")), + peer_set, + peer_service, + } + } + pub async fn update(&mut self) -> Result<(), BoxedStdError> { // Opportunistically crawl the network on every update call to ensure // we're actively fetching peers. Continue independently of whether we @@ -41,8 +119,8 @@ where let peer_set = &self.peer_set; let new_addrs = addrs .into_iter() - .filter(|meta| failed.contains_addr(&meta.addr)) - .filter(|meta| peer_set.lock().unwrap().contains_addr(&meta.addr)); + .filter(|meta| !failed.contains_addr(&meta.addr)) + .filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr)); self.gossiped.extend(new_addrs); trace!( addr_len, @@ -69,10 +147,12 @@ where } pub fn next(&mut self) -> Option { + let guard = self.peer_set.lock().unwrap(); self.disconnected .drain_oldest() .chain(self.gossiped.drain_newest()) .chain(self.failed.drain_oldest()) + .filter(|meta| !guard.is_potentially_connected(&meta.addr)) .next() }