Improve documentation and types in the PeerSet (#2925)

* Replace some unit tuples with named unit structs

This helps distinguish generic channels and make them type-safe.

Also tidy imports and documentation in `peer_set::set`.

* Link to the tower balance crate from docs

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
This commit is contained in:
teor 2021-10-22 11:26:04 +10:00 committed by GitHub
parent ad5f5ff24a
commit 424edfa4d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 63 deletions

View File

@ -27,7 +27,7 @@ use crate::{
constants, constants,
meta_addr::MetaAddr, meta_addr::MetaAddr,
peer::{self, HandshakeRequest, OutboundConnectorRequest}, peer::{self, HandshakeRequest, OutboundConnectorRequest},
peer_set::{ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet}, peer_set::{set::MorePeers, ActiveConnectionCounter, CandidateSet, ConnectionTracker, PeerSet},
timestamp_collector::TimestampCollector, timestamp_collector::TimestampCollector,
AddressBook, BoxError, Config, Request, Response, AddressBook, BoxError, Config, Request, Response,
}; };
@ -109,7 +109,9 @@ where
// Create an mpsc channel for peer changes, with a generous buffer. // Create an mpsc channel for peer changes, with a generous buffer.
let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100); let (peerset_tx, peerset_rx) = mpsc::channel::<PeerChange>(100);
// Create an mpsc channel for peerset demand signaling. // Create an mpsc channel for peerset demand signaling.
let (mut demand_tx, demand_rx) = mpsc::channel::<()>(100); let (mut demand_tx, demand_rx) = mpsc::channel::<MorePeers>(100);
// Create a oneshot to send background task JoinHandles to the peer set
let (handle_tx, handle_rx) = tokio::sync::oneshot::channel(); let (handle_tx, handle_rx) = tokio::sync::oneshot::channel();
// Connect the rx end to a PeerSet, wrapping new peers in load instruments. // Connect the rx end to a PeerSet, wrapping new peers in load instruments.
@ -167,8 +169,9 @@ where
); );
let _ = candidates.update_initial(active_initial_peer_count).await; let _ = candidates.update_initial(active_initial_peer_count).await;
// TODO: reduce demand by `active_outbound_connections.update_count()` (#2902)
for _ in 0..config.peerset_initial_target_size { for _ in 0..config.peerset_initial_target_size {
let _ = demand_tx.try_send(()); let _ = demand_tx.try_send(MorePeers);
} }
let crawl_guard = tokio::spawn( let crawl_guard = tokio::spawn(
@ -469,8 +472,8 @@ enum CrawlerAction {
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, peerset_tx,))] #[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, peerset_tx,))]
async fn crawl_and_dial<C, S>( async fn crawl_and_dial<C, S>(
crawl_new_peer_interval: std::time::Duration, crawl_new_peer_interval: std::time::Duration,
mut demand_tx: mpsc::Sender<()>, mut demand_tx: mpsc::Sender<MorePeers>,
mut demand_rx: mpsc::Receiver<()>, mut demand_rx: mpsc::Receiver<MorePeers>,
mut candidates: CandidateSet<S>, mut candidates: CandidateSet<S>,
outbound_connector: C, outbound_connector: C,
mut peerset_tx: mpsc::Sender<PeerChange>, mut peerset_tx: mpsc::Sender<PeerChange>,
@ -579,7 +582,7 @@ where
// spawn independent tasks to avoid deadlocks // spawn independent tasks to avoid deadlocks
candidates.update().await?; candidates.update().await?;
// Try to connect to a new peer. // Try to connect to a new peer.
let _ = demand_tx.try_send(()); let _ = demand_tx.try_send(MorePeers);
} }
TimerCrawl { tick } => { TimerCrawl { tick } => {
debug!( debug!(
@ -589,7 +592,7 @@ where
// TODO: spawn independent tasks to avoid deadlocks // TODO: spawn independent tasks to avoid deadlocks
candidates.update().await?; candidates.update().await?;
// Try to connect to a new peer. // Try to connect to a new peer.
let _ = demand_tx.try_send(()); let _ = demand_tx.try_send(MorePeers);
} }
HandshakeConnected { peer_set_change } => { HandshakeConnected { peer_set_change } => {
if let Change::Insert(ref addr, _) = peer_set_change { if let Change::Insert(ref addr, _) = peer_set_change {
@ -609,7 +612,7 @@ where
// The demand signal that was taken out of the queue // The demand signal that was taken out of the queue
// to attempt to connect to the failed candidate never // to attempt to connect to the failed candidate never
// turned into a connection, so add it back: // turned into a connection, so add it back:
let _ = demand_tx.try_send(()); let _ = demand_tx.try_send(MorePeers);
} }
} }
} }

View File

@ -1,9 +1,53 @@
use std::net::SocketAddr; //! Abstractions that represent "the rest of the network".
//!
//! # Implementation
//!
//! The [`PeerSet`] implementation is adapted from the one in the [Tower Balance][tower-balance] crate, and as
//! described in that crate's documentation, it
//!
//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
//! >
//! > As described in the [Finagle Guide][finagle]:
//! >
//! > > The algorithm randomly picks two services from the set of ready endpoints and
//! > > selects the least loaded of the two. By repeatedly using this strategy, we can
//! > > expect a manageable upper bound on the maximum load of any server.
//! > >
//! > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
//! > > `n` is the number of servers in the cluster.
//!
//! This should work well for many network requests, but not all of them: some
//! requests, e.g., a request for some particular inventory item, can only be
//! made to a subset of connected peers, e.g., the ones that have recently
//! advertised that inventory hash, and other requests require specialized logic
//! (e.g., transaction diffusion).
//!
//! Implementing this specialized routing logic inside the `PeerSet` -- so that
//! it continues to abstract away "the rest of the network" into one endpoint --
//! is not a problem, as the `PeerSet` can simply maintain more information on
//! its peers and route requests appropriately. However, there is a problem with
//! maintaining accurate backpressure information, because the `Service` trait
//! requires that service readiness is independent of the data in the request.
//!
//! For this reason, in the future, this code will probably be refactored to
//! address this backpressure mismatch. One possibility is to refactor the code
//! so that one entity holds and maintains the peer set and metadata on the
//! peers, and each "backpressure category" of request is assigned to different
//! `Service` impls with specialized `poll_ready()` implementations. Another
//! less-elegant solution (which might be useful as an intermediate step for the
//! inventory case) is to provide a way to borrow a particular backing service,
//! say by address.
//!
//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
//! [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//! [tower-balance]: https://crates.io/crates/tower-balance
use std::{ use std::{
collections::HashMap, collections::HashMap,
fmt::Debug, fmt::Debug,
future::Future, future::Future,
marker::PhantomData, marker::PhantomData,
net::SocketAddr,
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
@ -17,8 +61,10 @@ use futures::{
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
use indexmap::IndexMap; use indexmap::IndexMap;
use tokio::sync::{broadcast, oneshot::error::TryRecvError}; use tokio::{
use tokio::task::JoinHandle; sync::{broadcast, oneshot::error::TryRecvError},
task::JoinHandle,
};
use tower::{ use tower::{
discover::{Change, Discover}, discover::{Change, Discover},
load::Load, load::Load,
@ -26,6 +72,10 @@ use tower::{
}; };
use crate::{ use crate::{
peer_set::{
unready_service::{Error as UnreadyError, UnreadyService},
InventoryRegistry,
},
protocol::{ protocol::{
external::InventoryHash, external::InventoryHash,
internal::{Request, Response}, internal::{Request, Response},
@ -33,10 +83,17 @@ use crate::{
AddressBook, BoxError, AddressBook, BoxError,
}; };
use super::{ /// A signal sent by the [`PeerSet`] when it has no ready peers, and gets a request from Zebra.
unready_service::{Error as UnreadyError, UnreadyService}, ///
InventoryRegistry, /// In response to this signal, the crawler tries to open more peer connections.
}; #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct MorePeers;
/// A signal sent by the [`PeerSet`] to cancel a [`Client`]'s current request or response.
///
/// When it receives this signal, the [`Client`] stops processing and exits.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct CancelClientWork;
/// A [`tower::Service`] that abstractly represents "the rest of the network". /// A [`tower::Service`] that abstractly represents "the rest of the network".
/// ///
@ -48,47 +105,6 @@ use super::{
/// connections have an ephemeral local or proxy port.) /// connections have an ephemeral local or proxy port.)
/// ///
/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state. /// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
///
/// # Implementation
///
/// This implementation is adapted from the one in `tower-balance`, and as
/// described in that crate's documentation, it
///
/// > Distributes requests across inner services using the [Power of Two Choices][p2c].
/// >
/// > As described in the [Finagle Guide][finagle]:
/// >
/// > > The algorithm randomly picks two services from the set of ready endpoints and
/// > > selects the least loaded of the two. By repeatedly using this strategy, we can
/// > > expect a manageable upper bound on the maximum load of any server.
/// > >
/// > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
/// > > `n` is the number of servers in the cluster.
///
/// This should work well for many network requests, but not all of them: some
/// requests, e.g., a request for some particular inventory item, can only be
/// made to a subset of connected peers, e.g., the ones that have recently
/// advertised that inventory hash, and other requests require specialized logic
/// (e.g., transaction diffusion).
///
/// Implementing this specialized routing logic inside the `PeerSet` -- so that
/// it continues to abstract away "the rest of the network" into one endpoint --
/// is not a problem, as the `PeerSet` can simply maintain more information on
/// its peers and route requests appropriately. However, there is a problem with
/// maintaining accurate backpressure information, because the `Service` trait
/// requires that service readiness is independent of the data in the request.
///
/// For this reason, in the future, this code will probably be refactored to
/// address this backpressure mismatch. One possibility is to refactor the code
/// so that one entity holds and maintains the peer set and metadata on the
/// peers, and each "backpressure category" of request is assigned to different
/// `Service` impls with specialized `poll_ready()` implementations. Another
/// less-elegant solution (which might be useful as an intermediate step for the
/// inventory case) is to provide a way to borrow a particular backing service,
/// say by address.
///
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
pub struct PeerSet<D> pub struct PeerSet<D>
where where
D: Discover<Key = SocketAddr>, D: Discover<Key = SocketAddr>,
@ -99,9 +115,9 @@ where
/// This means that every change to `ready_services` must invalidate or correct it. /// This means that every change to `ready_services` must invalidate or correct it.
preselected_p2c_index: Option<usize>, preselected_p2c_index: Option<usize>,
ready_services: IndexMap<D::Key, D::Service>, ready_services: IndexMap<D::Key, D::Service>,
cancel_handles: HashMap<D::Key, oneshot::Sender<()>>, cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,
unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>, unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
demand_signal: mpsc::Sender<()>, demand_signal: mpsc::Sender<MorePeers>,
/// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks /// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
/// ///
/// The join handles passed into the PeerSet are used populate the `guards` member /// The join handles passed into the PeerSet are used populate the `guards` member
@ -132,7 +148,7 @@ where
/// Construct a peerset which uses `discover` internally. /// Construct a peerset which uses `discover` internally.
pub fn new( pub fn new(
discover: D, discover: D,
demand_signal: mpsc::Sender<()>, demand_signal: mpsc::Sender<MorePeers>,
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>, handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,
inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>, inv_stream: broadcast::Receiver<(InventoryHash, SocketAddr)>,
address_book: Arc<std::sync::Mutex<AddressBook>>, address_book: Arc<std::sync::Mutex<AddressBook>>,
@ -252,7 +268,7 @@ where
fn remove(&mut self, key: &D::Key) { fn remove(&mut self, key: &D::Key) {
if self.take_ready_service(key).is_some() { if self.take_ready_service(key).is_some() {
} else if let Some(handle) = self.cancel_handles.remove(key) { } else if let Some(handle) = self.cancel_handles.remove(key) {
let _ = handle.send(()); let _ = handle.send(CancelClientWork);
} }
} }
@ -484,7 +500,7 @@ where
// If we waited here, the crawler could deadlock sending a request to // If we waited here, the crawler could deadlock sending a request to
// fetch more peers, because it also empties the channel. // fetch more peers, because it also empties the channel.
trace!("no ready services, sending demand signal"); trace!("no ready services, sending demand signal");
let _ = self.demand_signal.try_send(()); let _ = self.demand_signal.try_send(MorePeers);
// CORRECTNESS // CORRECTNESS
// //

View File

@ -10,6 +10,8 @@ use std::{
use futures::{channel::oneshot, ready}; use futures::{channel::oneshot, ready};
use tower::Service; use tower::Service;
use crate::peer_set::set::CancelClientWork;
/// A Future that becomes satisfied when an `S`-typed service is ready. /// A Future that becomes satisfied when an `S`-typed service is ready.
/// ///
/// May fail due to cancelation, i.e. if the service is removed from discovery. /// May fail due to cancelation, i.e. if the service is removed from discovery.
@ -18,7 +20,7 @@ use tower::Service;
pub(super) struct UnreadyService<K, S, Req> { pub(super) struct UnreadyService<K, S, Req> {
pub(super) key: Option<K>, pub(super) key: Option<K>,
#[pin] #[pin]
pub(super) cancel: oneshot::Receiver<()>, pub(super) cancel: oneshot::Receiver<CancelClientWork>,
pub(super) service: Option<S>, pub(super) service: Option<S>,
pub(super) _req: PhantomData<Req>, pub(super) _req: PhantomData<Req>,
@ -35,7 +37,7 @@ impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project(); let this = self.project();
if let Poll::Ready(Ok(())) = this.cancel.poll(cx) { if let Poll::Ready(Ok(CancelClientWork)) = this.cancel.poll(cx) {
let key = this.key.take().expect("polled after ready"); let key = this.key.take().expect("polled after ready");
return Poll::Ready(Err((key, Error::Canceled))); return Poll::Ready(Err((key, Error::Canceled)));
} }