network: fix invariant violation in peer set
Closes #1183. The peer set maintains a preselected ready service that it can use to perform power-of-two-choices (p2c) routing of requests. Ready services are stored by key (socket address) in an `IndexMap`, and the preselected service is represented by an `Option<usize>` indexing that map. This means that whenever the set of ready services changes (e.g., a service is removed from the peer set, or a service is taken to be used to process a request), the preselected index is invalidated. The original P2C-only implementation maintained this invariant but did not document it. The change to inventory-based routing introduced a bug by failing to maintain this invariant and appropriately invalidate the preselected index. However, this was only noticeable approximately 1/N of the time on the next request after an inventory-directed request, so the bug occurred infrequently. Luckily, the use of `.expect` caused the bug to be an immediate panic, making it possible to identify by inspecting all uses of the ready service map.
This commit is contained in:
parent
f1155297a3
commit
f36a4800b2
|
@ -82,10 +82,13 @@ where
|
|||
D: Discover<Key = SocketAddr>,
|
||||
{
|
||||
discover: D,
|
||||
/// A preselected index for a ready service.
|
||||
/// INVARIANT: If `next_idx` is `Some(i)`, `i` must be a valid index for `ready_services`.
|
||||
/// This means that every change to `ready_services` must invalidate or correct `next_idx`.
|
||||
next_idx: Option<usize>,
|
||||
ready_services: IndexMap<D::Key, D::Service>,
|
||||
cancel_handles: HashMap<D::Key, oneshot::Sender<()>>,
|
||||
unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,
|
||||
next_idx: Option<usize>,
|
||||
demand_signal: mpsc::Sender<()>,
|
||||
/// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
|
||||
///
|
||||
|
@ -117,10 +120,10 @@ where
|
|||
) -> Self {
|
||||
Self {
|
||||
discover,
|
||||
next_idx: None,
|
||||
ready_services: IndexMap::new(),
|
||||
cancel_handles: HashMap::new(),
|
||||
unready_services: FuturesUnordered::new(),
|
||||
next_idx: None,
|
||||
demand_signal,
|
||||
guards: futures::stream::FuturesUnordered::new(),
|
||||
handle_rx,
|
||||
|
@ -149,20 +152,20 @@ where
|
|||
}
|
||||
|
||||
fn remove(&mut self, key: &D::Key) {
|
||||
// Remove key from either the set of ready services,
|
||||
// or else from the set of unready services.
|
||||
if let Some((i, _, _)) = self.ready_services.swap_remove_full(key) {
|
||||
// swap_remove perturbs the position of the last element of
|
||||
// ready_services, so we may have invalidated self.next_idx, in
|
||||
// which case we need to fix it. Specifically, swap_remove swaps the
|
||||
// position of the removee and the last element, then drops the
|
||||
// removee from the end, so we compare the active and removed indices:
|
||||
let len = self.ready_services.len();
|
||||
//
|
||||
// We just removed one element, so this was the index of the last element.
|
||||
let last_idx = self.ready_services.len();
|
||||
self.next_idx = match self.next_idx {
|
||||
None => None, // No active index
|
||||
Some(j) if j == i => None, // We removed j
|
||||
Some(j) if j == len => Some(i), // We swapped i and j
|
||||
Some(j) => Some(j), // We swapped an unrelated service.
|
||||
None => None, // No active index
|
||||
Some(j) if j == i => None, // We removed j
|
||||
Some(j) if j == last_idx => Some(i), // We swapped i and j
|
||||
Some(j) => Some(j), // We swapped an unrelated service.
|
||||
};
|
||||
// No Heisenservices: they must be ready or unready.
|
||||
assert!(!self.cancel_handles.contains_key(key));
|
||||
|
@ -240,7 +243,6 @@ where
|
|||
0 => None,
|
||||
1 => Some(0),
|
||||
len => {
|
||||
// XXX avoid relying on rand complexity
|
||||
let (a, b) = {
|
||||
let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2);
|
||||
(idxs.index(0), idxs.index(1))
|
||||
|
@ -299,8 +301,10 @@ where
|
|||
.ready_services
|
||||
.swap_remove_index(index)
|
||||
.expect("found index must be valid");
|
||||
tracing::debug!(?hash, ?key, "routing based on inventory");
|
||||
// We changed ready_services, so next_idx is invalid
|
||||
self.next_idx = None;
|
||||
|
||||
tracing::debug!(?hash, ?key, "routing based on inventory");
|
||||
let fut = svc.call(req);
|
||||
self.push_unready(key, svc);
|
||||
fut.map_err(Into::into).boxed()
|
||||
|
@ -320,6 +324,7 @@ where
|
|||
// This is not needless: otherwise, we'd hold a &mut reference to self.ready_services,
|
||||
// blocking us from passing &mut self to push_unready.
|
||||
let ready_services = std::mem::take(&mut self.ready_services);
|
||||
self.next_idx = None; // We changed ready_services, so next_idx is invalid
|
||||
|
||||
let futs = FuturesUnordered::new();
|
||||
for (key, mut svc) in ready_services {
|
||||
|
|
Loading…
Reference in New Issue