Move the CandidateSet to its own file.

Co-authored-by: Deirdre Connolly <deirdre@zfnd.org>
This commit is contained in:
Henry de Valence 2019-10-21 21:25:49 -07:00
parent 2f3292759f
commit e1a35490af
2 changed files with 86 additions and 95 deletions

View File

@ -118,15 +118,9 @@ where
);
// 3. Outgoing peers we connect to in response to load.
let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
tokio::spawn(
crawl_and_dial(
demand_rx,
peer_set.clone(),
address_book.clone(),
peer_connector,
peerset_tx,
)
.map(|result| {
crawl_and_dial(demand_rx, candidates, peer_connector, peerset_tx).map(|result| {
if let Err(e) = result {
error!(%e);
}
@ -198,76 +192,15 @@ where
/// Given a channel that signals a need for new peers, try to connect to a peer
/// and send the resulting `PeerClient` through a channel.
///
/// ```ascii,no_run
/// ┌─────────────────┐
/// │ PeerSet │
/// │GetPeers Requests│
/// └─────────────────┘
/// │
/// │
/// │
/// │
/// ▼
/// ┌─────────────┐ filter by Λ filter by
/// │ PeerSet │!contains_addr ╲ !contains_addr
/// ┌──│ AddressBook │────────────▶▕ ▏◀───────────────────┐
/// │ └─────────────┘ ╲
/// │ │ V │
/// │ │disconnected_peers │ │
/// │ ▼ │ │
/// │ Λ filter by │ │
/// │ ╲ !contains_addr │ │
/// │ ▕ ▏◀───────────────────┼──────────────────────┤
/// │ ╲ │ │
/// │ V │ │
/// │ │ │ │
/// │┌────────┼──────────────────────┼──────────────────────┼────────┐
/// ││ ▼ ▼ │ │
/// ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
/// ││ │Disconnected │ │ Gossiped │ │Failed Peers │ │
/// ││ │ Peers │ │ Peers │ │ AddressBook │◀┼┐
/// ││ │ AddressBook │ │ AddressBook │ │ │ ││
/// ││ └─────────────┘ └─────────────┘ └─────────────┘ ││
/// ││ │ │ │ ││
/// ││ #1 drain_oldest #2 drain_newest #3 drain_oldest ││
/// ││ │ │ │ ││
/// ││ ├──────────────────────┴──────────────────────┘ ││
/// ││ │ disjoint candidate sets ││
/// │└────────┼──────────────────────────────────────────────────────┘│
/// │ ▼ │
/// │ Λ │
/// │ ╲ filter by │
/// └──────▶▕ ▏!is_potentially_connected │
/// ╲
/// V │
/// │ │
/// │ │
/// ▼ │
/// Λ │
/// ╲ │
/// ▕ ▏─────────────────────────────────────────────────────┘
/// ╲ connection failed, update last_seen to now()
/// V
/// │
/// │
/// ▼
/// ┌────────────┐
/// │ send │
/// │ PeerClient │
/// │to Discover │
/// └────────────┘
/// ```
#[instrument(skip(
demand_signal,
peer_set_service,
peer_set_address_book,
candidates,
peer_connector,
success_tx
))]
async fn crawl_and_dial<C, S>(
mut demand_signal: mpsc::Receiver<()>,
peer_set_service: S,
peer_set_address_book: Arc<Mutex<AddressBook>>,
mut candidates: CandidateSet<S>,
mut peer_connector: C,
mut success_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxedStdError>
@ -277,18 +210,6 @@ where
S: Service<Request, Response = Response, Error = BoxedStdError>,
S::Future: Send + 'static,
{
use tracing::Level;
let mut candidates = CandidateSet {
disconnected: AddressBook::new(span!(Level::TRACE, "disconnected peers")),
gossiped: AddressBook::new(span!(Level::TRACE, "gossiped peers")),
failed: AddressBook::new(span!(Level::TRACE, "failed peers")),
peer_set: peer_set_address_book.clone(),
peer_service: peer_set_service,
};
info!("Sending initial request for peers");
let _ = candidates.update().await;
// XXX instead of just responding to demand, we could respond to demand *or*
// to a interval timer (to continuously grow the peer set).
while let Some(()) = demand_signal.next().await {
@ -301,16 +222,6 @@ where
None => continue,
};
// Check that we have not connected to the candidate since it was
// pulled into the candidate set.
if peer_set_address_book
.lock()
.unwrap()
.is_potentially_connected(&addr)
{
continue;
};
if let Ok(stream) = TcpStream::connect(addr).await {
peer_connector.ready().await?;
if let Ok(client) = peer_connector.call((stream, addr)).await {

View File

@ -3,9 +3,77 @@ use std::sync::{Arc, Mutex};
use chrono::{TimeZone, Utc};
use futures::stream::{FuturesUnordered, Stream, StreamExt};
use tower::{Service, ServiceExt};
use tracing::Level;
use crate::{types::MetaAddr, AddressBook, BoxedStdError, Request, Response};
/// The `CandidateSet` maintains a pool of candidate peers.
///
/// It divides the set of all possible candidate peers into three disjoint subsets:
///
/// 1. Disconnected peers, which we previously connected to but are not currently connected to;
/// 2. Gossiped peers, which we learned about from other peers but have never connected to;
/// 3. Failed peers, to whom we attempted to connect but were unable to.
///
/// ```ascii,no_run
/// ┌─────────────────┐
/// │ PeerSet │
/// │GetPeers Requests│
/// └─────────────────┘
/// │
/// │
/// │
/// │
/// ▼
/// ┌─────────────┐ filter by Λ filter by
/// │ PeerSet │!contains_addr ╲ !contains_addr
/// ┌──│ AddressBook │────────────▶▕ ▏◀───────────────────┐
/// │ └─────────────┘ ╲
/// │ │ V │
/// │ │disconnected_peers │ │
/// │ ▼ │ │
/// │ Λ filter by │ │
/// │ ╲ !contains_addr │ │
/// │ ▕ ▏◀───────────────────┼──────────────────────┤
/// │ ╲ │ │
/// │ V │ │
/// │ │ │ │
/// │┌────────┼──────────────────────┼──────────────────────┼────────┐
/// ││ ▼ ▼ │ │
/// ││ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
/// ││ │Disconnected │ │ Gossiped │ │Failed Peers │ │
/// ││ │ Peers │ │ Peers │ │ AddressBook │◀┼┐
/// ││ │ AddressBook │ │ AddressBook │ │ │ ││
/// ││ └─────────────┘ └─────────────┘ └─────────────┘ ││
/// ││ │ │ │ ││
/// ││ #1 drain_oldest #2 drain_newest #3 drain_oldest ││
/// ││ │ │ │ ││
/// ││ ├──────────────────────┴──────────────────────┘ ││
/// ││ │ disjoint candidate sets ││
/// │└────────┼──────────────────────────────────────────────────────┘│
/// │ ▼ │
/// │ Λ │
/// │ ╲ filter by │
/// └──────▶▕ ▏!is_potentially_connected │
/// ╲
/// V │
/// │ │
/// │ │
/// ▼ │
/// Λ │
/// ╲ │
/// ▕ ▏─────────────────────────────────────────────────────┘
/// ╲ connection failed, update last_seen to now()
/// V
/// │
/// │
/// ▼
/// ┌────────────┐
/// │ send │
/// │ PeerClient │
/// │to Discover │
/// └────────────┘
/// ```
pub(super) struct CandidateSet<S> {
pub(super) disconnected: AddressBook,
pub(super) gossiped: AddressBook,
@ -19,6 +87,16 @@ where
S: Service<Request, Response = Response, Error = BoxedStdError>,
S::Future: Send + 'static,
{
pub fn new(peer_set: Arc<Mutex<AddressBook>>, peer_service: S) -> CandidateSet<S> {
CandidateSet {
disconnected: AddressBook::new(span!(Level::TRACE, "disconnected peers")),
gossiped: AddressBook::new(span!(Level::TRACE, "gossiped peers")),
failed: AddressBook::new(span!(Level::TRACE, "failed peers")),
peer_set,
peer_service,
}
}
pub async fn update(&mut self) -> Result<(), BoxedStdError> {
// Opportunistically crawl the network on every update call to ensure
// we're actively fetching peers. Continue independently of whether we
@ -41,8 +119,8 @@ where
let peer_set = &self.peer_set;
let new_addrs = addrs
.into_iter()
.filter(|meta| failed.contains_addr(&meta.addr))
.filter(|meta| peer_set.lock().unwrap().contains_addr(&meta.addr));
.filter(|meta| !failed.contains_addr(&meta.addr))
.filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr));
self.gossiped.extend(new_addrs);
trace!(
addr_len,
@ -69,10 +147,12 @@ where
}
pub fn next(&mut self) -> Option<MetaAddr> {
let guard = self.peer_set.lock().unwrap();
self.disconnected
.drain_oldest()
.chain(self.gossiped.drain_newest())
.chain(self.failed.drain_oldest())
.filter(|meta| !guard.is_potentially_connected(&meta.addr))
.next()
}