diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index a8df59e5b..fe9d50d71 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -82,10 +82,13 @@ where D: Discover, { discover: D, + /// A preselected index for a ready service. + /// INVARIANT: If `next_idx` is `Some(i)`, `i` must be a valid index for `ready_services`. + /// This means that every change to `ready_services` must invalidate or correct `next_idx`. + next_idx: Option, ready_services: IndexMap, cancel_handles: HashMap>, unready_services: FuturesUnordered>, - next_idx: Option, demand_signal: mpsc::Sender<()>, /// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks /// @@ -117,10 +120,10 @@ where ) -> Self { Self { discover, + next_idx: None, ready_services: IndexMap::new(), cancel_handles: HashMap::new(), unready_services: FuturesUnordered::new(), - next_idx: None, demand_signal, guards: futures::stream::FuturesUnordered::new(), handle_rx, @@ -149,20 +152,20 @@ where } fn remove(&mut self, key: &D::Key) { - // Remove key from either the set of ready services, - // or else from the set of unready services. if let Some((i, _, _)) = self.ready_services.swap_remove_full(key) { // swap_remove perturbs the position of the last element of // ready_services, so we may have invalidated self.next_idx, in // which case we need to fix it. Specifically, swap_remove swaps the // position of the removee and the last element, then drops the // removee from the end, so we compare the active and removed indices: - let len = self.ready_services.len(); + // + // We just removed one element, so this was the index of the last element. + let last_idx = self.ready_services.len(); self.next_idx = match self.next_idx { - None => None, // No active index - Some(j) if j == i => None, // We removed j - Some(j) if j == len => Some(i), // We swapped i and j - Some(j) => Some(j), // We swapped an unrelated service. + None => None, // No active index + Some(j) if j == i => None, // We removed j + Some(j) if j == last_idx => Some(i), // We swapped i and j + Some(j) => Some(j), // We swapped an unrelated service. }; // No Heisenservices: they must be ready or unready. assert!(!self.cancel_handles.contains_key(key)); @@ -240,7 +243,6 @@ where 0 => None, 1 => Some(0), len => { - // XXX avoid relying on rand complexity let (a, b) = { let idxs = rand::seq::index::sample(&mut rand::thread_rng(), len, 2); (idxs.index(0), idxs.index(1)) @@ -299,8 +301,10 @@ where .ready_services .swap_remove_index(index) .expect("found index must be valid"); - tracing::debug!(?hash, ?key, "routing based on inventory"); + // We changed ready_services, so next_idx is invalid + self.next_idx = None; + tracing::debug!(?hash, ?key, "routing based on inventory"); let fut = svc.call(req); self.push_unready(key, svc); fut.map_err(Into::into).boxed() @@ -320,6 +324,7 @@ where // This is not needless: otherwise, we'd hold a &mut reference to self.ready_services, // blocking us from passing &mut self to push_unready. let ready_services = std::mem::take(&mut self.ready_services); + self.next_idx = None; // We changed ready_services, so next_idx is invalid let futs = FuturesUnordered::new(); for (key, mut svc) in ready_services {