Security: Stop routing inventory requests by peer address (#3090)
* Rewrite PeerSet comments to split long sentences * Replace peer set integer indexes with address-based indexes Also improve documentation and logging. * Security: Stop using peer addresses to choose inventory routing order * Minor doc and code cleanups * Stop re-using a drained HashSet * Replace used `_cancel` with `cancel` * Reword a comment * Replace cloned with copied
This commit is contained in:
parent
0fafa30e5d
commit
f6abb15778
|
@ -4394,7 +4394,6 @@ dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"futures",
|
"futures",
|
||||||
"hex",
|
"hex",
|
||||||
"indexmap",
|
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
"metrics",
|
"metrics",
|
||||||
"pin-project 1.0.7",
|
"pin-project 1.0.7",
|
||||||
|
|
|
@ -13,9 +13,6 @@ byteorder = "1.4"
|
||||||
bytes = "1.1.0"
|
bytes = "1.1.0"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
# indexmap has rayon support for parallel iteration,
|
|
||||||
# which we don't use, so disable it to drop the dependencies.
|
|
||||||
indexmap = { version = "1.7", default-features = false }
|
|
||||||
lazy_static = "1.4.0"
|
lazy_static = "1.4.0"
|
||||||
pin-project = "1.0.7"
|
pin-project = "1.0.7"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
|
|
|
@ -2,8 +2,8 @@
|
||||||
//!
|
//!
|
||||||
//! # Implementation
|
//! # Implementation
|
||||||
//!
|
//!
|
||||||
//! The [`PeerSet`] implementation is adapted from the one in the [Tower Balance][tower-balance] crate, and as
|
//! The [`PeerSet`] implementation is adapted from the one in the [Tower Balance][tower-balance] crate.
|
||||||
//! described in that crate's documentation, it
|
//! As described in that crate's documentation, it:
|
||||||
//!
|
//!
|
||||||
//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
|
//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
|
||||||
//! >
|
//! >
|
||||||
|
@ -16,11 +16,11 @@
|
||||||
//! > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
|
//! > > The maximum load variance between any two servers is bound by `ln(ln(n))` where
|
||||||
//! > > `n` is the number of servers in the cluster.
|
//! > > `n` is the number of servers in the cluster.
|
||||||
//!
|
//!
|
||||||
//! This should work well for many network requests, but not all of them: some
|
//! The Power of Two Choices should work well for many network requests, but not all of them.
|
||||||
//! requests, e.g., a request for some particular inventory item, can only be
|
//! Some requests should only be made to a subset of connected peers.
|
||||||
//! made to a subset of connected peers, e.g., the ones that have recently
|
//! For example, a request for a particular inventory item
|
||||||
//! advertised that inventory hash, and other requests require specialized logic
|
//! should be made to a peer that has recently advertised that inventory hash.
|
||||||
//! (e.g., transaction diffusion).
|
//! Other requests require broadcasts, such as transaction diffusion.
|
||||||
//!
|
//!
|
||||||
//! Implementing this specialized routing logic inside the `PeerSet` -- so that
|
//! Implementing this specialized routing logic inside the `PeerSet` -- so that
|
||||||
//! it continues to abstract away "the rest of the network" into one endpoint --
|
//! it continues to abstract away "the rest of the network" into one endpoint --
|
||||||
|
@ -43,7 +43,7 @@
|
||||||
//! [tower-balance]: https://crates.io/crates/tower-balance
|
//! [tower-balance]: https://crates.io/crates/tower-balance
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::{HashMap, HashSet},
|
||||||
convert,
|
convert,
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
future::Future,
|
future::Future,
|
||||||
|
@ -61,7 +61,6 @@ use futures::{
|
||||||
prelude::*,
|
prelude::*,
|
||||||
stream::FuturesUnordered,
|
stream::FuturesUnordered,
|
||||||
};
|
};
|
||||||
use indexmap::IndexMap;
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{broadcast, oneshot::error::TryRecvError},
|
sync::{broadcast, oneshot::error::TryRecvError},
|
||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
|
@ -121,14 +120,20 @@ where
|
||||||
|
|
||||||
/// Connected peers that are ready to receive requests from Zebra,
|
/// Connected peers that are ready to receive requests from Zebra,
|
||||||
/// or send requests to Zebra.
|
/// or send requests to Zebra.
|
||||||
ready_services: IndexMap<D::Key, D::Service>,
|
ready_services: HashMap<D::Key, D::Service>,
|
||||||
|
|
||||||
/// A preselected index for a ready service.
|
/// A preselected ready service.
|
||||||
/// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`.
|
///
|
||||||
/// This means that every change to `ready_services` must invalidate or correct it.
|
/// # Correctness
|
||||||
preselected_p2c_index: Option<usize>,
|
///
|
||||||
|
/// If this is `Some(addr)`, `addr` must be a key for a peer in `ready_services`.
|
||||||
|
/// If that peer is removed from `ready_services`, we must set the preselected peer to `None`.
|
||||||
|
///
|
||||||
|
/// This is handled by [`PeerSet::take_ready_service`] and [`PeerSet::route_all`].
|
||||||
|
preselected_p2c_peer: Option<D::Key>,
|
||||||
|
|
||||||
/// Stores gossiped inventory from connected peers.
|
/// Stores gossiped inventory hashes from connected peers.
|
||||||
|
///
|
||||||
/// Used to route inventory requests to peers that are likely to have it.
|
/// Used to route inventory requests to peers that are likely to have it.
|
||||||
inventory_registry: InventoryRegistry,
|
inventory_registry: InventoryRegistry,
|
||||||
|
|
||||||
|
@ -214,8 +219,8 @@ where
|
||||||
Self {
|
Self {
|
||||||
// Ready peers
|
// Ready peers
|
||||||
discover,
|
discover,
|
||||||
ready_services: IndexMap::new(),
|
ready_services: HashMap::new(),
|
||||||
preselected_p2c_index: None,
|
preselected_p2c_peer: None,
|
||||||
inventory_registry: InventoryRegistry::new(inv_stream),
|
inventory_registry: InventoryRegistry::new(inv_stream),
|
||||||
|
|
||||||
// Unready peers
|
// Unready peers
|
||||||
|
@ -292,9 +297,7 @@ where
|
||||||
"the peer set requires at least one background task"
|
"the peer set requires at least one background task"
|
||||||
);
|
);
|
||||||
|
|
||||||
for handle in handles {
|
self.guards.extend(handles);
|
||||||
self.guards.push(handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
@ -320,8 +323,8 @@ where
|
||||||
/// - channels by closing the channel
|
/// - channels by closing the channel
|
||||||
fn shut_down_tasks_and_channels(&mut self) {
|
fn shut_down_tasks_and_channels(&mut self) {
|
||||||
// Drop services and cancel their background tasks.
|
// Drop services and cancel their background tasks.
|
||||||
self.preselected_p2c_index = None;
|
self.preselected_p2c_peer = None;
|
||||||
self.ready_services = IndexMap::new();
|
self.ready_services = HashMap::new();
|
||||||
|
|
||||||
for (_peer_key, handle) in self.cancel_handles.drain() {
|
for (_peer_key, handle) in self.cancel_handles.drain() {
|
||||||
let _ = handle.send(CancelClientWork);
|
let _ = handle.send(CancelClientWork);
|
||||||
|
@ -342,34 +345,51 @@ where
|
||||||
// TODO: implement graceful shutdown for InventoryRegistry (#1678)
|
// TODO: implement graceful shutdown for InventoryRegistry (#1678)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check busy peer services for request completion or errors.
|
||||||
|
///
|
||||||
|
/// Move newly ready services to the ready list, and drop failed services.
|
||||||
fn poll_unready(&mut self, cx: &mut Context<'_>) {
|
fn poll_unready(&mut self, cx: &mut Context<'_>) {
|
||||||
loop {
|
loop {
|
||||||
match Pin::new(&mut self.unready_services).poll_next(cx) {
|
match Pin::new(&mut self.unready_services).poll_next(cx) {
|
||||||
|
// No unready service changes, or empty unready services
|
||||||
Poll::Pending | Poll::Ready(None) => return,
|
Poll::Pending | Poll::Ready(None) => return,
|
||||||
|
|
||||||
|
// Unready -> Ready
|
||||||
Poll::Ready(Some(Ok((key, svc)))) => {
|
Poll::Ready(Some(Ok((key, svc)))) => {
|
||||||
trace!(?key, "service became ready");
|
trace!(?key, "service became ready");
|
||||||
let _cancel = self.cancel_handles.remove(&key);
|
let cancel = self.cancel_handles.remove(&key);
|
||||||
assert!(_cancel.is_some(), "missing cancel handle");
|
assert!(cancel.is_some(), "missing cancel handle");
|
||||||
self.ready_services.insert(key, svc);
|
self.ready_services.insert(key, svc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unready -> Canceled
|
||||||
Poll::Ready(Some(Err((key, UnreadyError::Canceled)))) => {
|
Poll::Ready(Some(Err((key, UnreadyError::Canceled)))) => {
|
||||||
trace!(?key, "service was canceled");
|
// A service be canceled because we've connected to the same service twice.
|
||||||
// This debug assert is invalid because we can have a
|
// In that case, there is a cancel handle for the peer address,
|
||||||
// service be canceled due us connecting to the same service
|
// but it belongs to the service for the newer connection.
|
||||||
// twice.
|
trace!(
|
||||||
//
|
?key,
|
||||||
// assert!(!self.cancel_handles.contains_key(&key))
|
duplicate_connection = self.cancel_handles.contains_key(&key),
|
||||||
|
"service was canceled, dropping service"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unready -> Errored
|
||||||
Poll::Ready(Some(Err((key, UnreadyError::Inner(e))))) => {
|
Poll::Ready(Some(Err((key, UnreadyError::Inner(e))))) => {
|
||||||
let error = e.into();
|
let error = e.into();
|
||||||
debug!(%error, "service failed while unready, dropped");
|
debug!(%error, "service failed while unready, dropping service");
|
||||||
let _cancel = self.cancel_handles.remove(&key);
|
|
||||||
assert!(_cancel.is_some(), "missing cancel handle");
|
let cancel = self.cancel_handles.remove(&key);
|
||||||
|
assert!(cancel.is_some(), "missing cancel handle");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Checks for newly inserted or removed services.
|
||||||
|
///
|
||||||
|
/// Puts inserted services in the unready list.
|
||||||
|
/// Drops removed services, after cancelling any pending requests.
|
||||||
fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
|
fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
|
||||||
use futures::ready;
|
use futures::ready;
|
||||||
loop {
|
loop {
|
||||||
|
@ -390,38 +410,42 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Takes a ready service by key, preserving `preselected_p2c_index` if possible.
|
/// Takes a ready service by key, invalidating `preselected_p2c_peer` if needed.
|
||||||
fn take_ready_service(&mut self, key: &D::Key) -> Option<(D::Key, D::Service)> {
|
fn take_ready_service(&mut self, key: &D::Key) -> Option<D::Service> {
|
||||||
if let Some((i, key, svc)) = self.ready_services.swap_remove_full(key) {
|
if let Some(svc) = self.ready_services.remove(key) {
|
||||||
// swap_remove perturbs the position of the last element of
|
if Some(*key) == self.preselected_p2c_peer {
|
||||||
// ready_services, so we may have invalidated self.next_idx, in
|
self.preselected_p2c_peer = None;
|
||||||
// which case we need to fix it. Specifically, swap_remove swaps the
|
}
|
||||||
// position of the removee and the last element, then drops the
|
|
||||||
// removee from the end, so we compare the active and removed indices:
|
assert!(
|
||||||
//
|
!self.cancel_handles.contains_key(key),
|
||||||
// We just removed one element, so this was the index of the last element.
|
"cancel handles are only used for unready service work"
|
||||||
let last_idx = self.ready_services.len();
|
);
|
||||||
self.preselected_p2c_index = match self.preselected_p2c_index {
|
|
||||||
None => None, // No active index
|
Some(svc)
|
||||||
Some(j) if j == i => None, // We removed j
|
|
||||||
Some(j) if j == last_idx => Some(i), // We swapped i and j
|
|
||||||
Some(j) => Some(j), // We swapped an unrelated service.
|
|
||||||
};
|
|
||||||
// No Heisenservices: they must be ready or unready.
|
|
||||||
assert!(!self.cancel_handles.contains_key(&key));
|
|
||||||
Some((key, svc))
|
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Remove the service corresponding to `key` from the peer set.
|
||||||
|
///
|
||||||
|
/// Drops the service, cancelling any pending request or response to that peer.
|
||||||
|
/// If the peer does not exist, does nothing.
|
||||||
fn remove(&mut self, key: &D::Key) {
|
fn remove(&mut self, key: &D::Key) {
|
||||||
if self.take_ready_service(key).is_some() {
|
if let Some(ready_service) = self.take_ready_service(key) {
|
||||||
|
// A ready service has no work to cancel, so just drop it.
|
||||||
|
std::mem::drop(ready_service);
|
||||||
} else if let Some(handle) = self.cancel_handles.remove(key) {
|
} else if let Some(handle) = self.cancel_handles.remove(key) {
|
||||||
|
// Cancel the work, implicitly dropping the cancel handle.
|
||||||
|
// The service future returns a `Canceled` error,
|
||||||
|
// making `poll_unready` drop the service.
|
||||||
let _ = handle.send(CancelClientWork);
|
let _ = handle.send(CancelClientWork);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Adds a busy service to the unready list,
|
||||||
|
/// and adds a cancel handle for the service's current request.
|
||||||
fn push_unready(&mut self, key: D::Key, svc: D::Service) {
|
fn push_unready(&mut self, key: D::Key, svc: D::Service) {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
self.cancel_handles.insert(key, tx);
|
self.cancel_handles.insert(key, tx);
|
||||||
|
@ -433,49 +457,83 @@ where
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Performs P2C on inner services to select a ready service.
|
/// Performs P2C on `self.ready_services` to randomly select a less-loaded ready service.
|
||||||
fn preselect_p2c_index(&mut self) -> Option<usize> {
|
fn preselect_p2c_peer(&self) -> Option<D::Key> {
|
||||||
match self.ready_services.len() {
|
self.select_p2c_peer_from_list(self.ready_services.keys().copied().collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Performs P2C on `ready_service_list` to randomly select a less-loaded ready service.
|
||||||
|
fn select_p2c_peer_from_list(&self, ready_service_list: HashSet<D::Key>) -> Option<D::Key> {
|
||||||
|
match ready_service_list.len() {
|
||||||
0 => None,
|
0 => None,
|
||||||
1 => Some(0),
|
1 => Some(
|
||||||
|
ready_service_list
|
||||||
|
.into_iter()
|
||||||
|
.next()
|
||||||
|
.expect("just checked there is one service"),
|
||||||
|
),
|
||||||
len => {
|
len => {
|
||||||
|
// If there are only 2 peers, randomise their order.
|
||||||
|
// Otherwise, choose 2 random peers in a random order.
|
||||||
let (a, b) = {
|
let (a, b) = {
|
||||||
let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
|
let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
|
||||||
(idxs.index(0), idxs.index(1))
|
let a = idxs.index(0);
|
||||||
|
let b = idxs.index(1);
|
||||||
|
|
||||||
|
let a = *ready_service_list
|
||||||
|
.iter()
|
||||||
|
.nth(a)
|
||||||
|
.expect("sample returns valid indexes");
|
||||||
|
let b = *ready_service_list
|
||||||
|
.iter()
|
||||||
|
.nth(b)
|
||||||
|
.expect("sample returns valid indexes");
|
||||||
|
|
||||||
|
(a, b)
|
||||||
};
|
};
|
||||||
|
|
||||||
let a_load = self.query_load(a);
|
let a_load = self.query_load(&a).expect("supplied services are ready");
|
||||||
let b_load = self.query_load(b);
|
let b_load = self.query_load(&b).expect("supplied services are ready");
|
||||||
|
|
||||||
let selected = if a_load <= b_load { a } else { b };
|
let selected = if a_load <= b_load { a } else { b };
|
||||||
|
|
||||||
trace!(a.idx = a, a.load = ?a_load, b.idx = b, b.load = ?b_load, selected, "selected service by p2c");
|
trace!(
|
||||||
|
a.key = ?a,
|
||||||
|
a.load = ?a_load,
|
||||||
|
b.key = ?b,
|
||||||
|
b.load = ?b_load,
|
||||||
|
selected = ?selected,
|
||||||
|
?len,
|
||||||
|
"selected service by p2c"
|
||||||
|
);
|
||||||
|
|
||||||
Some(selected)
|
Some(selected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Accesses a ready endpoint by index and returns its current load.
|
/// Accesses a ready endpoint by `key` and returns its current load.
|
||||||
fn query_load(&self, index: usize) -> <D::Service as Load>::Metric {
|
///
|
||||||
let (_, svc) = self.ready_services.get_index(index).expect("invalid index");
|
/// Returns `None` if the service is not in the ready service list.
|
||||||
svc.load()
|
fn query_load(&self, key: &D::Key) -> Option<<D::Service as Load>::Metric> {
|
||||||
|
let svc = self.ready_services.get(key);
|
||||||
|
svc.map(|svc| svc.load())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Routes a request using P2C load-balancing.
|
/// Routes a request using P2C load-balancing.
|
||||||
fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
|
fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
|
||||||
let index = self
|
let preselected_key = self
|
||||||
.preselected_p2c_index
|
.preselected_p2c_peer
|
||||||
.take()
|
.expect("ready peer service must have a preselected peer");
|
||||||
.expect("ready service must have valid preselected index");
|
|
||||||
|
|
||||||
let (key, mut svc) = self
|
tracing::trace!(?preselected_key, "routing based on p2c");
|
||||||
.ready_services
|
|
||||||
.swap_remove_index(index)
|
let mut svc = self
|
||||||
.expect("preselected index must be valid");
|
.take_ready_service(&preselected_key)
|
||||||
|
.expect("ready peer set must have preselected a ready peer");
|
||||||
|
|
||||||
let fut = svc.call(req);
|
let fut = svc.call(req);
|
||||||
self.push_unready(key, svc);
|
self.push_unready(preselected_key, svc);
|
||||||
fut.map_err(Into::into).boxed()
|
fut.map_err(Into::into).boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -486,32 +544,44 @@ where
|
||||||
req: Request,
|
req: Request,
|
||||||
hash: InventoryHash,
|
hash: InventoryHash,
|
||||||
) -> <Self as tower::Service<Request>>::Future {
|
) -> <Self as tower::Service<Request>>::Future {
|
||||||
let peer = self
|
let inventory_peer_list = self
|
||||||
.inventory_registry
|
.inventory_registry
|
||||||
.peers(&hash)
|
.peers(&hash)
|
||||||
.find(|&key| self.ready_services.contains_key(key))
|
.filter(|&key| self.ready_services.contains_key(key))
|
||||||
.cloned();
|
.copied()
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// # Security
|
||||||
|
//
|
||||||
|
// Choose a random, less-loaded peer with the inventory.
|
||||||
|
//
|
||||||
|
// If we chose the first peer in HashMap order,
|
||||||
|
// peers would be able to influence our choice by switching addresses.
|
||||||
|
// But we need the choice to be random,
|
||||||
|
// so that a peer can't provide all our inventory responses.
|
||||||
|
let peer = self.select_p2c_peer_from_list(inventory_peer_list);
|
||||||
|
|
||||||
match peer.and_then(|key| self.take_ready_service(&key)) {
|
match peer.and_then(|key| self.take_ready_service(&key)) {
|
||||||
Some((key, mut svc)) => {
|
Some(mut svc) => {
|
||||||
tracing::debug!(?hash, ?key, "routing based on inventory");
|
let peer = peer.expect("just checked peer is Some");
|
||||||
|
tracing::trace!(?hash, ?peer, "routing based on inventory");
|
||||||
let fut = svc.call(req);
|
let fut = svc.call(req);
|
||||||
self.push_unready(key, svc);
|
self.push_unready(peer, svc);
|
||||||
fut.map_err(Into::into).boxed()
|
fut.map_err(Into::into).boxed()
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
tracing::debug!(?hash, "no ready peer for inventory, falling back to p2c");
|
tracing::trace!(?hash, "no ready peer for inventory, falling back to p2c");
|
||||||
self.route_p2c(req)
|
self.route_p2c(req)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Routes a request to all ready peers, ignoring return values.
|
/// Routes a request to all ready peers, ignoring return values.
|
||||||
fn route_all(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
|
fn route_all(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
|
||||||
// This is not needless: otherwise, we'd hold a &mut reference to self.ready_services,
|
// This is not needless: otherwise, we'd hold a &mut reference to self.ready_services,
|
||||||
// blocking us from passing &mut self to push_unready.
|
// blocking us from passing &mut self to push_unready.
|
||||||
let ready_services = std::mem::take(&mut self.ready_services);
|
let ready_services = std::mem::take(&mut self.ready_services);
|
||||||
self.preselected_p2c_index = None; // All services are now unready.
|
self.preselected_p2c_peer = None; // All services are now unready.
|
||||||
|
|
||||||
let futs = FuturesUnordered::new();
|
let futs = FuturesUnordered::new();
|
||||||
for (key, mut svc) in ready_services {
|
for (key, mut svc) in ready_services {
|
||||||
|
@ -524,12 +594,14 @@ where
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
ok.len = results.iter().filter(|r| r.is_ok()).count(),
|
ok.len = results.iter().filter(|r| r.is_ok()).count(),
|
||||||
err.len = results.iter().filter(|r| r.is_err()).count(),
|
err.len = results.iter().filter(|r| r.is_err()).count(),
|
||||||
|
"sent peer request broadcast"
|
||||||
);
|
);
|
||||||
Ok(Response::Nil)
|
Ok(Response::Nil)
|
||||||
}
|
}
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Logs the peer set size.
|
||||||
fn log_peer_set_size(&mut self) {
|
fn log_peer_set_size(&mut self) {
|
||||||
let ready_services_len = self.ready_services.len();
|
let ready_services_len = self.ready_services.len();
|
||||||
let unready_services_len = self.unready_services.len();
|
let unready_services_len = self.unready_services.len();
|
||||||
|
@ -575,6 +647,11 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Updates the peer set metrics.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// If the peer set size exceeds the connection limit.
|
||||||
fn update_metrics(&self) {
|
fn update_metrics(&self) {
|
||||||
let num_ready = self.ready_services.len();
|
let num_ready = self.ready_services.len();
|
||||||
let num_unready = self.unready_services.len();
|
let num_unready = self.unready_services.len();
|
||||||
|
@ -612,7 +689,8 @@ where
|
||||||
|
|
||||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
self.poll_background_errors(cx)?;
|
self.poll_background_errors(cx)?;
|
||||||
// Process peer discovery updates.
|
|
||||||
|
// Update peer statuses
|
||||||
let _ = self.poll_discover(cx)?;
|
let _ = self.poll_discover(cx)?;
|
||||||
self.inventory_registry.poll_inventory(cx)?;
|
self.inventory_registry.poll_inventory(cx)?;
|
||||||
self.poll_unready(cx);
|
self.poll_unready(cx);
|
||||||
|
@ -624,37 +702,35 @@ where
|
||||||
// Re-check that the pre-selected service is ready, in case
|
// Re-check that the pre-selected service is ready, in case
|
||||||
// something has happened since (e.g., it failed, peer closed
|
// something has happened since (e.g., it failed, peer closed
|
||||||
// connection, ...)
|
// connection, ...)
|
||||||
if let Some(index) = self.preselected_p2c_index {
|
if let Some(key) = self.preselected_p2c_peer {
|
||||||
let (key, service) = self
|
trace!(preselected_key = ?key);
|
||||||
.ready_services
|
let mut service = self
|
||||||
.get_index_mut(index)
|
.take_ready_service(&key)
|
||||||
.expect("preselected index must be valid");
|
.expect("preselected peer must be in the ready list");
|
||||||
trace!(preselected_index = index, ?key);
|
|
||||||
match service.poll_ready(cx) {
|
match service.poll_ready(cx) {
|
||||||
Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
|
Poll::Ready(Ok(())) => {
|
||||||
|
trace!("preselected service is still ready, keeping it selected");
|
||||||
|
self.preselected_p2c_peer = Some(key);
|
||||||
|
self.ready_services.insert(key, service);
|
||||||
|
return Poll::Ready(Ok(()));
|
||||||
|
}
|
||||||
Poll::Pending => {
|
Poll::Pending => {
|
||||||
trace!("preselected service is no longer ready");
|
trace!("preselected service is no longer ready, moving to unready list");
|
||||||
let (key, service) = self
|
|
||||||
.ready_services
|
|
||||||
.swap_remove_index(index)
|
|
||||||
.expect("preselected index must be valid");
|
|
||||||
self.push_unready(key, service);
|
self.push_unready(key, service);
|
||||||
}
|
}
|
||||||
Poll::Ready(Err(e)) => {
|
Poll::Ready(Err(e)) => {
|
||||||
let error = e.into();
|
let error = e.into();
|
||||||
trace!(%error, "preselected service failed, dropping it");
|
trace!(%error, "preselected service failed, dropping it");
|
||||||
self.ready_services
|
std::mem::drop(service);
|
||||||
.swap_remove_index(index)
|
|
||||||
.expect("preselected index must be valid");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("preselected service was not ready, reselecting");
|
trace!("preselected service was not ready, preselecting another ready service");
|
||||||
self.preselected_p2c_index = self.preselect_p2c_index();
|
self.preselected_p2c_peer = self.preselect_p2c_peer();
|
||||||
self.update_metrics();
|
self.update_metrics();
|
||||||
|
|
||||||
if self.preselected_p2c_index.is_none() {
|
if self.preselected_p2c_peer.is_none() {
|
||||||
// CORRECTNESS
|
// CORRECTNESS
|
||||||
//
|
//
|
||||||
// If the channel is full, drop the demand signal rather than waiting.
|
// If the channel is full, drop the demand signal rather than waiting.
|
||||||
|
|
Loading…
Reference in New Issue