network: tidy peer set implementation

- rename functions more descriptively
- create a common `take_ready_service` function
- organize poll_ functions separately
This commit is contained in:
Henry de Valence 2020-11-23 23:52:40 -08:00 committed by teor
parent f36a4800b2
commit d90e709ce1
1 changed files with 95 additions and 94 deletions

View File

@ -83,9 +83,9 @@ where
{
discover: D,
/// A preselected index for a ready service.
/// INVARIANT: If `next_idx` is `Some(i)`, `i` must be a valid index for `ready_services`.
/// This means that every change to `ready_services` must invalidate or correct `next_idx`.
next_idx: Option<usize>,
/// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`.
/// This means that every change to `ready_services` must invalidate or correct it.
preselected_p2c_index: Option<usize>,
ready_services: IndexMap<D::Key, D::Service>,
cancel_handles: HashMap<D::Key, oneshot::Sender<()>>,
unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
@ -120,7 +120,7 @@ where
) -> Self {
Self {
discover,
next_idx: None,
preselected_p2c_index: None,
ready_services: IndexMap::new(),
cancel_handles: HashMap::new(),
unready_services: FuturesUnordered::new(),
@ -131,61 +131,7 @@ where
}
}
fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
use futures::ready;
loop {
match ready!(Pin::new(&mut self.discover).poll_discover(cx))
.ok_or("discovery stream closed")?
.map_err(Into::into)?
{
Change::Remove(key) => {
trace!(?key, "got Change::Remove from Discover");
self.remove(&key);
}
Change::Insert(key, svc) => {
trace!(?key, "got Change::Insert from Discover");
self.remove(&key);
self.push_unready(key, svc);
}
}
}
}
fn remove(&mut self, key: &D::Key) {
if let Some((i, _, _)) = self.ready_services.swap_remove_full(key) {
// swap_remove perturbs the position of the last element of
// ready_services, so we may have invalidated self.next_idx, in
// which case we need to fix it. Specifically, swap_remove swaps the
// position of the removee and the last element, then drops the
// removee from the end, so we compare the active and removed indices:
//
// We just removed one element, so this was the index of the last element.
let last_idx = self.ready_services.len();
self.next_idx = match self.next_idx {
None => None, // No active index
Some(j) if j == i => None, // We removed j
Some(j) if j == last_idx => Some(i), // We swapped i and j
Some(j) => Some(j), // We swapped an unrelated service.
};
// No Heisenservices: they must be ready or unready.
assert!(!self.cancel_handles.contains_key(key));
} else if let Some(handle) = self.cancel_handles.remove(key) {
let _ = handle.send(());
}
}
fn push_unready(&mut self, key: D::Key, svc: D::Service) {
let (tx, rx) = oneshot::channel();
self.cancel_handles.insert(key, tx);
self.unready_services.push(UnreadyService {
key: Some(key),
service: Some(svc),
cancel: rx,
_req: PhantomData,
});
}
fn check_for_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> {
fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> {
if self.guards.is_empty() {
match self.handle_rx.try_recv() {
Ok(handles) => {
@ -237,8 +183,71 @@ where
}
}
fn poll_discover(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
use futures::ready;
loop {
match ready!(Pin::new(&mut self.discover).poll_discover(cx))
.ok_or("discovery stream closed")?
.map_err(Into::into)?
{
Change::Remove(key) => {
trace!(?key, "got Change::Remove from Discover");
self.remove(&key);
}
Change::Insert(key, svc) => {
trace!(?key, "got Change::Insert from Discover");
self.remove(&key);
self.push_unready(key, svc);
}
}
}
}
/// Takes a ready service by key, preserving `preselected_p2c_index` if possible.
fn take_ready_service(&mut self, key: &D::Key) -> Option<(D::Key, D::Service)> {
if let Some((i, key, svc)) = self.ready_services.swap_remove_full(key) {
// swap_remove perturbs the position of the last element of
// ready_services, so we may have invalidated self.next_idx, in
// which case we need to fix it. Specifically, swap_remove swaps the
// position of the removee and the last element, then drops the
// removee from the end, so we compare the active and removed indices:
//
// We just removed one element, so this was the index of the last element.
let last_idx = self.ready_services.len();
self.preselected_p2c_index = match self.preselected_p2c_index {
None => None, // No active index
Some(j) if j == i => None, // We removed j
Some(j) if j == last_idx => Some(i), // We swapped i and j
Some(j) => Some(j), // We swapped an unrelated service.
};
// No Heisenservices: they must be ready or unready.
assert!(!self.cancel_handles.contains_key(&key));
Some((key, svc))
} else {
None
}
}
fn remove(&mut self, key: &D::Key) {
if let Some(_) = self.take_ready_service(key) {
} else if let Some(handle) = self.cancel_handles.remove(key) {
let _ = handle.send(());
}
}
fn push_unready(&mut self, key: D::Key, svc: D::Service) {
let (tx, rx) = oneshot::channel();
self.cancel_handles.insert(key, tx);
self.unready_services.push(UnreadyService {
key: Some(key),
service: Some(svc),
cancel: rx,
_req: PhantomData,
});
}
/// Performs P2C on inner services to select a ready service.
fn select_next_ready_index(&mut self) -> Option<usize> {
fn preselect_p2c_index(&mut self) -> Option<usize> {
match self.ready_services.len() {
0 => None,
1 => Some(0),
@ -248,8 +257,8 @@ where
(idxs.index(0), idxs.index(1))
};
let a_load = self.ready_index_load(a);
let b_load = self.ready_index_load(b);
let a_load = self.query_load(a);
let b_load = self.query_load(b);
let selected = if a_load <= b_load { a } else { b };
@ -261,7 +270,7 @@ where
}
/// Accesses a ready endpoint by index and returns its current load.
fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
fn query_load(&self, index: usize) -> <D::Service as Load>::Metric {
let (_, svc) = self.ready_services.get_index(index).expect("invalid index");
svc.load()
}
@ -269,7 +278,7 @@ where
/// Routes a request using P2C load-balancing.
fn route_p2c(&mut self, req: Request) -> <Self as tower::Service<Request>>::Future {
let index = self
.next_idx
.preselected_p2c_index
.take()
.expect("ready service must have valid preselected index");
@ -290,30 +299,21 @@ where
req: Request,
hash: InventoryHash,
) -> <Self as tower::Service<Request>>::Future {
let candidate_index = self
let peer = self
.inventory_registry
.peers(&hash)
.find_map(|addr| self.ready_services.get_index_of(addr));
match candidate_index {
Some(index) => {
let (key, mut svc) = self
.ready_services
.swap_remove_index(index)
.expect("found index must be valid");
// We changed ready_services, so next_idx is invalid
self.next_idx = None;
.find(|&key| self.ready_services.contains_key(key))
.cloned();
match peer.and_then(|key| self.take_ready_service(&key)) {
Some((key, mut svc)) => {
tracing::debug!(?hash, ?key, "routing based on inventory");
let fut = svc.call(req);
self.push_unready(key, svc);
fut.map_err(Into::into).boxed()
}
None => {
tracing::debug!(
?hash,
"could not find ready peer for inventory hash, falling back to p2c"
);
tracing::debug!(?hash, "no ready peer for inventory, falling back to p2c");
self.route_p2c(req)
}
}
@ -324,7 +324,7 @@ where
// This is not needless: otherwise, we'd hold a &mut reference to self.ready_services,
// blocking us from passing &mut self to push_unready.
let ready_services = std::mem::take(&mut self.ready_services);
self.next_idx = None; // We changed ready_services, so next_idx is invalid
self.preselected_p2c_index = None; // All services are now unready.
let futs = FuturesUnordered::new();
for (key, mut svc) in ready_services {
@ -342,6 +342,15 @@ where
}
.boxed()
}
fn update_metrics(&self) {
let num_ready = self.ready_services.len();
let num_unready = self.unready_services.len();
let num_peers = num_ready + num_unready;
metrics::gauge!("pool.num_ready", num_ready.try_into().unwrap());
metrics::gauge!("pool.num_unready", num_unready.try_into().unwrap());
metrics::gauge!("pool.num_peers", num_peers.try_into().unwrap());
}
}
impl<D> Service<Request> for PeerSet<D>
@ -359,27 +368,19 @@ where
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.check_for_background_errors(cx)?;
self.poll_background_errors(cx)?;
// Process peer discovery updates.
let _ = self.poll_discover(cx)?;
self.inventory_registry.poll_inventory(cx)?;
// Poll unready services to drive them to readiness.
self.poll_unready(cx);
let num_ready = self.ready_services.len();
let num_unready = self.unready_services.len();
metrics::gauge!("pool.num_ready", num_ready.try_into().unwrap(),);
metrics::gauge!("pool.num_unready", num_unready.try_into().unwrap(),);
metrics::gauge!(
"pool.num_peers",
(num_ready + num_unready).try_into().unwrap(),
);
self.update_metrics();
loop {
// Re-check that the pre-selected service is ready, in case
// something has happened since (e.g., it failed, peer closed
// connection, ...)
if let Some(index) = self.next_idx {
if let Some(index) = self.preselected_p2c_index {
let (key, service) = self
.ready_services
.get_index_mut(index)
@ -406,9 +407,9 @@ where
}
trace!("preselected service was not ready, reselecting");
self.next_idx = self.select_next_ready_index();
self.preselected_p2c_index = self.preselect_p2c_index();
if self.next_idx.is_none() {
if self.preselected_p2c_index.is_none() {
trace!("no ready services, sending demand signal");
let _ = self.demand_signal.try_send(());
return Poll::Pending;