use std::{
    collections::HashMap,
    convert::TryInto,
    fmt::Debug,
    future::Future,
    marker::PhantomData,
    pin::Pin,
    task::{Context, Poll},
};

use futures::{
    channel::{mpsc, oneshot},
    prelude::*,
    stream::FuturesUnordered,
};
use indexmap::IndexMap;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tower::{
    discover::{Change, Discover},
    Service,
};
use tower_load::Load;

use crate::{
    protocol::internal::{Request, Response},
    BoxedStdError,
};

use super::unready_service::{Error as UnreadyError, UnreadyService};

/// A [`tower::Service`] that abstractly represents "the rest of the network".
///
/// This implementation is adapted from the one in `tower-balance`, and as
/// described in that crate's documentation, it
///
/// > Distributes requests across inner services using the [Power of Two Choices][p2c].
/// >
/// > As described in the [Finagle Guide][finagle]:
/// >
/// > > The algorithm randomly picks two services from the set of ready endpoints and
/// > > selects the least loaded of the two. By repeatedly using this strategy, we can
/// > > expect a manageable upper bound on the maximum load of any server.
/// > >
/// > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
/// > > `n` is the number of servers in the cluster.
///
/// This should work well for many network requests, but not all of them: some
/// requests, e.g., a request for some particular inventory item, can only be
/// made to a subset of connected peers, e.g., the ones that have recently
/// advertised that inventory hash, and other requests require specialized logic
/// (e.g., transaction diffusion).
///
/// Implementing this specialized routing logic inside the `PeerSet` -- so that
/// it continues to abstract away "the rest of the network" into one endpoint --
/// is not a problem, as the `PeerSet` can simply maintain more information on
/// its peers and route requests appropriately. However, there is a problem with
/// maintaining accurate backpressure information, because the `Service` trait
/// requires that service readiness is independent of the data in the request.
///
/// For this reason, in the future, this code will probably be refactored to
/// address this backpressure mismatch. One possibility is to refactor the code
/// so that one entity holds and maintains the peer set and metadata on the
/// peers, and each "backpressure category" of request is assigned to different
/// `Service` impls with specialized `poll_ready()` implementations. Another
/// less-elegant solution (which might be useful as an intermediate step for the
/// inventory case) is to provide a way to borrow a particular backing service,
/// say by address.
///
/// [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
/// [p2c]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
pub struct PeerSet<D>
where
    D: Discover,
{
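    /// Provides peer set changes (`Change::Insert` and `Change::Remove`)
    /// via the `Discover` trait.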
    discover: D,
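    /// Peers whose services are currently ready for requests, indexed by key.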
    ready_services: IndexMap<D::Key, D::Service>,
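    /// A cancel handle for each unready peer's pending `UnreadyService` future,
    /// used to drop the service if the peer is removed before it becomes ready.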
    cancel_handles: HashMap<D::Key, oneshot::Sender<()>>,
    unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
    next_idx: Option<usize>,
    demand_signal: mpsc::Sender<()>,
    /// Channel for passing ownership of tokio `JoinHandle`s from the `PeerSet`'s background tasks.
    ///
    /// The join handles passed into the `PeerSet` are used to populate the `guards` member.
    handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxedStdError>>>>,
    /// Unordered set of handles to background tasks associated with the `PeerSet`.
    ///
    /// These guards are checked for errors as part of `poll_ready`, which lets
    /// the `PeerSet` propagate errors from background tasks back to the user.
    guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxedStdError>>>,
}

impl<D> PeerSet<D>
where
    D: Discover + Unpin,
    D::Key: Clone + Debug,
    D::Service: Service<Request, Response = Response> + Load,
    D::Error: Into<BoxedStdError>,
    <D::Service as Service<Request>>::Error: Into<BoxedStdError> + 'static,
    <D::Service as Service<Request>>::Future: Send + 'static,
    <D::Service as Load>::Metric: Debug,
{
    /// Construct a peerset which uses `discover` internally.
    pub fn new(
        discover: D,
        demand_signal: mpsc::Sender<()>,
        handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxedStdError>>>>,
    ) -> Self {
        Self {
            discover,
            ready_services: IndexMap::new(),
            cancel_handles: HashMap::new(),
            unready_services: FuturesUnordered::new(),
            next_idx: None,
            demand_signal,
            guards: futures::stream::FuturesUnordered::new(),
            handle_rx,
        }
    }
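
    /// Processes peer set changes from the `Discover` stream: removed peers are
    /// dropped from the peer set, and newly inserted peers are registered as
    /// unready services.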
    fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxedStdError>> {
        use futures::ready;
        loop {
            match ready!(Pin::new(&mut self.discover).poll_discover(cx)).map_err(Into::into)? {
                Change::Remove(key) => {
                    trace!(?key, "got Change::Remove from Discover");
                    self.remove(&key);
                }
                Change::Insert(key, svc) => {
                    trace!(?key, "got Change::Insert from Discover");
                    self.remove(&key);
                    self.push_unready(key, svc);
                }
            }
        }
    }
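
    /// Removes the service corresponding to `key` from the peer set, whether it
    /// is currently ready or unready. If the service is unready, it is canceled
    /// via its cancel handle.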
    fn remove(&mut self, key: &D::Key) {
        // Remove key from either the set of ready services,
        // or else from the set of unready services.
        if let Some((i, _, _)) = self.ready_services.swap_remove_full(key) {
            // swap_remove perturbs the position of the last element of
            // ready_services, so we may have invalidated self.next_idx, in
            // which case we need to fix it. Specifically, swap_remove swaps the
            // position of the removee and the last element, then drops the
            // removee from the end, so we compare the active and removed indices:
            let len = self.ready_services.len();
            self.next_idx = match self.next_idx {
                None => None,                   // No active index
                Some(j) if j == i => None,      // We removed j
                Some(j) if j == len => Some(i), // We swapped i and j
                Some(j) => Some(j),             // We swapped an unrelated service.
            };
            // No Heisenservices: they must be ready or unready.
            debug_assert!(!self.cancel_handles.contains_key(key));
        } else if let Some(handle) = self.cancel_handles.remove(key) {
            let _ = handle.send(());
        }
    }
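
    /// Adds `svc` as an unready service under `key`, registering a cancel handle
    /// so the pending service can be dropped if the peer is removed before it
    /// becomes ready.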
    fn push_unready(&mut self, key: D::Key, svc: D::Service) {
        let (tx, rx) = oneshot::channel();
        self.cancel_handles.insert(key.clone(), tx);
        self.unready_services.push(UnreadyService {
            key: Some(key),
            service: Some(svc),
            cancel: rx,
            _req: PhantomData,
        });
    }
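
    /// Checks background task handles for errors, receiving the handles from
    /// `handle_rx` on first use. Returns an error if a background task has
    /// failed, or if all background tasks have exited.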
    fn check_for_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxedStdError> {
        if self.guards.is_empty() {
            match self.handle_rx.try_recv() {
                Ok(handles) => {
                    for handle in handles {
                        self.guards.push(handle);
                    }
                }
                Err(TryRecvError::Closed) => unreachable!(
                    "try_recv will never be called if the futures have already been received"
                ),
                Err(TryRecvError::Empty) => return Ok(()),
            }
        }

        match Pin::new(&mut self.guards).poll_next(cx) {
            Poll::Pending => {}
            Poll::Ready(Some(res)) => res??,
            Poll::Ready(None) => Err("all background tasks have exited")?,
        }

        Ok(())
    }
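
    /// Polls unready services, moving services that become ready into
    /// `ready_services` and dropping services that fail or are canceled.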
    fn poll_unready(&mut self, cx: &mut Context<'_>) {
        loop {
            match Pin::new(&mut self.unready_services).poll_next(cx) {
                Poll::Pending | Poll::Ready(None) => return,
                Poll::Ready(Some(Ok((key, svc)))) => {
                    trace!(?key, "service became ready");
                    let _cancel = self.cancel_handles.remove(&key);
                    debug_assert!(_cancel.is_some(), "missing cancel handle");
                    self.ready_services.insert(key, svc);
                }
                Poll::Ready(Some(Err((key, UnreadyError::Canceled)))) => {
trace!(?key, "service was canceled");
|
2019-10-10 18:15:24 -07:00
|
|
|
debug_assert!(!self.cancel_handles.contains_key(&key))
|
|
|
|
}
|
|
|
|
Poll::Ready(Some(Err((key, UnreadyError::Inner(e))))) => {
|
|
|
|
let error = e.into();
|
|
|
|
debug!(%error, "service failed while unready, dropped");
|
|
|
|
let _cancel = self.cancel_handles.remove(&key);
|
|
|
|
debug_assert!(_cancel.is_some(), "missing cancel handle");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}

    /// Performs P2C on inner services to select a ready service.
    fn select_next_ready_index(&mut self) -> Option<usize> {
        match self.ready_services.len() {
            0 => None,
            1 => Some(0),
            len => {
                // XXX avoid relying on rand complexity
                let (a, b) = {
                    let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
                    (idxs.index(0), idxs.index(1))
                };

                let a_load = self.ready_index_load(a);
                let b_load = self.ready_index_load(b);

                let selected = if a_load <= b_load { a } else { b };

                trace!(a.idx = a, a.load = ?a_load, b.idx = b, b.load = ?b_load, selected, "selected service by p2c");

                Some(selected)
            }
        }
    }

    /// Accesses a ready endpoint by index and returns its current load.
    fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
        let (_, svc) = self.ready_services.get_index(index).expect("invalid index");
        svc.load()
    }
}

impl<D> Service<Request> for PeerSet<D>
where
    D: Discover + Unpin,
    D::Key: Clone + Debug + ToString,
    D::Service: Service<Request, Response = Response> + Load,
    D::Error: Into<BoxedStdError>,
    <D::Service as Service<Request>>::Error: Into<BoxedStdError> + 'static,
    <D::Service as Service<Request>>::Future: Send + 'static,
    <D::Service as Load>::Metric: Debug,
{
    type Response = Response;
    type Error = BoxedStdError;
    type Future =
        Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.check_for_background_errors(cx)?;

        // Process peer discovery updates.
        let _ = self.poll_discover(cx)?;

        // Poll unready services to drive them to readiness.
        self.poll_unready(cx);

        let num_ready = self.ready_services.len();
        let num_unready = self.unready_services.len();
        metrics::gauge!("pool.num_ready", num_ready.try_into().unwrap());
        metrics::gauge!("pool.num_unready", num_unready.try_into().unwrap());
        metrics::gauge!(
            "pool.num_peers",
            (num_ready + num_unready).try_into().unwrap(),
        );

        loop {
            // Re-check that the pre-selected service is ready, in case
            // something has happened since (e.g., it failed, peer closed
            // connection, ...)
            if let Some(index) = self.next_idx {
                let (key, service) = self
                    .ready_services
                    .get_index_mut(index)
                    .expect("preselected index must be valid");
                trace!(preselected_index = index, ?key);
                match service.poll_ready(cx) {
                    Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
                    Poll::Pending => {
                        trace!("preselected service is no longer ready");
                        let (key, service) = self
                            .ready_services
                            .swap_remove_index(index)
                            .expect("preselected index must be valid");
                        self.push_unready(key, service);
                    }
                    Poll::Ready(Err(e)) => {
                        let error = e.into();
                        trace!(%error, "preselected service failed, dropping it");
                        self.ready_services
                            .swap_remove_index(index)
                            .expect("preselected index must be valid");
                    }
                }
            }

            trace!("preselected service was not ready, reselecting");
            self.next_idx = self.select_next_ready_index();

            if self.next_idx.is_none() {
                trace!("no ready services, sending demand signal");
                let _ = self.demand_signal.try_send(());
                return Poll::Pending;
            }
        }
    }

    fn call(&mut self, req: Request) -> Self::Future {
        let index = self
            .next_idx
            .take()
            .expect("ready service must have valid preselected index");
        let (key, mut svc) = self
            .ready_services
            .swap_remove_index(index)
            .expect("preselected index must be valid");

        // XXX add a dimension tagging request metrics by type
        metrics::counter!(
            "outbound_requests",
            1,
            "key" => key.to_string(),
        );

        let fut = svc.call(req);
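        // Return the service to the unready set, so its readiness is re-checked
        // before it is selected again.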
        self.push_unready(key, svc);

        use futures::future::TryFutureExt;
        fut.map_err(Into::into).boxed()
    }
}