From e403aeaf0566841ddcca1f9ec6b83c21929bc793 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Mon, 15 Feb 2021 00:52:52 +0000
Subject: [PATCH] adds an inverted index to leader schedule (#15249)

next_leader_slot is doing a linear search for slots in which a pubkey is
the leader:
https://github.com/solana-labs/solana/blob/e59a24d9f/ledger/src/leader_schedule_cache.rs#L123-L157
This can be done more efficiently by adding an inverted index to leader
schedule.
---
 ledger/src/leader_schedule.rs       | 78 ++++++++++++++++++++++++--
 ledger/src/leader_schedule_cache.rs | 85 +++++++++++++----------------
 2 files changed, 111 insertions(+), 52 deletions(-)

diff --git a/ledger/src/leader_schedule.rs b/ledger/src/leader_schedule.rs
index e2ca2662bf..d2cc19e256 100644
--- a/ledger/src/leader_schedule.rs
+++ b/ledger/src/leader_schedule.rs
@@ -1,6 +1,9 @@
+use itertools::Itertools;
 use rand::distributions::{Distribution, WeightedIndex};
 use rand_chacha::{rand_core::SeedableRng, ChaChaRng};
 use solana_sdk::pubkey::Pubkey;
+use std::collections::HashMap;
+use std::convert::identity;
 use std::ops::Index;
 use std::sync::Arc;
 
@@ -15,6 +18,8 @@ pub struct FixedSchedule {
 #[derive(Debug, Default, PartialEq)]
 pub struct LeaderSchedule {
     slot_leaders: Vec<Pubkey>,
+    // Inverted index from pubkeys to indices where they are the leader.
+    index: HashMap<Pubkey, Arc<Vec<usize>>>,
 }
 
 impl LeaderSchedule {
@@ -34,11 +39,22 @@ impl LeaderSchedule {
                 }
             })
             .collect();
-        Self { slot_leaders }
+        Self::new_from_schedule(slot_leaders)
     }
 
     pub fn new_from_schedule(slot_leaders: Vec<Pubkey>) -> Self {
-        Self { slot_leaders }
+        let index = slot_leaders
+            .iter()
+            .enumerate()
+            .map(|(i, pk)| (*pk, i))
+            .into_group_map()
+            .into_iter()
+            .map(|(k, v)| (k, Arc::new(v)))
+            .collect();
+        Self {
+            slot_leaders,
+            index,
+        }
     }
 
     pub fn get_slot_leaders(&self) -> &[Pubkey] {
@@ -48,6 +64,33 @@ impl LeaderSchedule {
     pub fn num_slots(&self) -> usize {
         self.slot_leaders.len()
     }
+
+    /// 'offset' is an index into the leader schedule. The function returns an
+    /// iterator of indices i >= offset where the given pubkey is the leader.
+    pub(crate) fn get_indices(
+        &self,
+        pubkey: &Pubkey,
+        offset: usize, // Starting index.
+    ) -> impl Iterator<Item = usize> {
+        let index = self.index.get(pubkey).cloned().unwrap_or_default();
+        let num_slots = self.slot_leaders.len();
+        let size = index.len();
+        #[allow(clippy::reversed_empty_ranges)]
+        let range = if index.is_empty() {
+            1..=0 // Intentionally empty range of type RangeInclusive<usize>.
+        } else {
+            let offset = index
+                .binary_search(&(offset % num_slots))
+                .unwrap_or_else(identity)
+                + offset / num_slots * size;
+            offset..=usize::MAX
+        };
+        // The modular arithmetic here and above replicate Index implementation
+        // for LeaderSchedule, where the schedule keeps repeating endlessly.
+        // The '%' returns where in a cycle we are and the '/' returns how many
+        // times the schedule is repeated.
+        range.map(move |k| index[k % size] + k / size * num_slots)
+    }
 }
 
 impl Index<u64> for LeaderSchedule {
@@ -61,14 +104,14 @@ impl Index<u64> for LeaderSchedule {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use rand::Rng;
+    use std::iter::repeat_with;
 
     #[test]
     fn test_leader_schedule_index() {
         let pubkey0 = solana_sdk::pubkey::new_rand();
         let pubkey1 = solana_sdk::pubkey::new_rand();
-        let leader_schedule = LeaderSchedule {
-            slot_leaders: vec![pubkey0, pubkey1],
-        };
+        let leader_schedule = LeaderSchedule::new_from_schedule(vec![pubkey0, pubkey1]);
         assert_eq!(leader_schedule[0], pubkey0);
         assert_eq!(leader_schedule[1], pubkey1);
         assert_eq!(leader_schedule[2], pubkey0);
@@ -157,4 +200,29 @@ mod tests {
         assert_eq!(leaders1, leaders1_expected);
         assert_eq!(leaders2, leaders2_expected);
     }
+
+    #[test]
+    fn test_get_indices() {
+        const NUM_SLOTS: usize = 97;
+        let mut rng = rand::thread_rng();
+        let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(4).collect();
+        let schedule: Vec<_> = repeat_with(|| pubkeys[rng.gen_range(0, 3)])
+            .take(19)
+            .collect();
+        let schedule = LeaderSchedule::new_from_schedule(schedule);
+        let leaders = (0..NUM_SLOTS)
+            .map(|i| (schedule[i as u64], i))
+            .into_group_map();
+        for pubkey in &pubkeys {
+            let index = leaders.get(pubkey).cloned().unwrap_or_default();
+            for offset in 0..NUM_SLOTS {
+                let schedule: Vec<_> = schedule
+                    .get_indices(pubkey, offset)
+                    .take_while(|s| *s < NUM_SLOTS)
+                    .collect();
+                let index: Vec<_> = index.iter().copied().skip_while(|s| *s < offset).collect();
+                assert_eq!(schedule, index);
+            }
+        }
+    }
 }
diff --git a/ledger/src/leader_schedule_cache.rs b/ledger/src/leader_schedule_cache.rs
index 084989eac5..5752750c29 100644
--- a/ledger/src/leader_schedule_cache.rs
+++ b/ledger/src/leader_schedule_cache.rs
@@ -3,6 +3,7 @@ use crate::{
     leader_schedule::{FixedSchedule, LeaderSchedule},
     leader_schedule_utils,
 };
+use itertools::Itertools;
 use log::*;
 use solana_runtime::bank::Bank;
 use solana_sdk::{
@@ -99,18 +100,17 @@ impl LeaderScheduleCache {
         }
     }
 
-    /// Return the (next slot, last slot) after the given current_slot that the given node will be leader
+    /// Returns the (next slot, last slot) consecutive range of slots after
+    /// the given current_slot that the given node will be leader.
     pub fn next_leader_slot(
         &self,
         pubkey: &Pubkey,
-        mut current_slot: Slot,
+        current_slot: Slot,
         bank: &Bank,
         blockstore: Option<&Blockstore>,
         max_slot_range: u64,
     ) -> Option<(Slot, Slot)> {
-        let (mut epoch, mut start_index) = bank.get_epoch_and_slot_index(current_slot + 1);
-        let mut first_slot = None;
-        let mut last_slot = current_slot;
+        let (epoch, start_index) = bank.get_epoch_and_slot_index(current_slot + 1);
         let max_epoch = *self.max_epoch.read().unwrap();
         if epoch > max_epoch {
             debug!(
@@ -120,49 +120,40 @@ impl LeaderScheduleCache {
             );
             return None;
         }
-        while let Some(leader_schedule) = self.get_epoch_schedule_else_compute(epoch, bank) {
-            // clippy thinks I should do this:
-            //  for (i, <item>) in leader_schedule
-            //   .iter()
-            //   .enumerate()
-            //   .take(bank.get_slots_in_epoch(epoch))
-            //   .skip(from_slot_index + 1) {
-            //
-            // but leader_schedule doesn't implement Iter...
-            #[allow(clippy::needless_range_loop)]
-            for i in start_index..bank.get_slots_in_epoch(epoch) {
-                current_slot += 1;
-                if *pubkey == leader_schedule[i] {
-                    if let Some(blockstore) = blockstore {
-                        if let Some(meta) = blockstore.meta(current_slot).unwrap() {
-                            // We have already sent a shred for this slot, so skip it
-                            if meta.received > 0 {
-                                continue;
-                            }
-                        }
-                    }
-
-                    if let Some(first_slot) = first_slot {
-                        if current_slot - first_slot + 1 >= max_slot_range {
-                            return Some((first_slot, current_slot));
-                        }
-                    } else {
-                        first_slot = Some(current_slot);
-                    }
-
-                    last_slot = current_slot;
-                } else if first_slot.is_some() {
-                    return Some((first_slot.unwrap(), last_slot));
+        // Slots after current_slot where pubkey is the leader.
+        let mut schedule = (epoch..=max_epoch)
+            .map(|epoch| self.get_epoch_schedule_else_compute(epoch, bank))
+            .while_some()
+            .zip(epoch..)
+            .flat_map(|(leader_schedule, k)| {
+                let offset = if k == epoch { start_index as usize } else { 0 };
+                let num_slots = bank.get_slots_in_epoch(k) as usize;
+                let first_slot = bank.epoch_schedule().get_first_slot_in_epoch(k);
+                leader_schedule
+                    .get_indices(pubkey, offset)
+                    .take_while(move |i| *i < num_slots)
+                    .map(move |i| i as Slot + first_slot)
+            })
+            .skip_while(|slot| {
+                match blockstore {
+                    None => false,
+                    // Skip slots we have already sent a shred for.
+                    Some(blockstore) => match blockstore.meta(*slot).unwrap() {
+                        Some(meta) => meta.received > 0,
+                        None => false,
+                    },
                 }
-            }
-
-            epoch += 1;
-            if epoch > max_epoch {
-                break;
-            }
-            start_index = 0;
-        }
-        first_slot.map(|slot| (slot, last_slot))
+            });
+        let first_slot = schedule.next()?;
+        let max_slot = first_slot.saturating_add(max_slot_range);
+        let last_slot = schedule
+            .take_while(|slot| *slot < max_slot)
+            .zip(first_slot + 1..)
+            .take_while(|(a, b)| a == b)
+            .map(|(s, _)| s)
+            .last()
+            .unwrap_or(first_slot);
+        Some((first_slot, last_slot))
     }
 
     pub fn set_fixed_leader_schedule(&mut self, fixed_schedule: Option<FixedSchedule>) {
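
For reference, the following is a minimal, self-contained sketch of the inverted-index lookup the patch introduces, written against plain std types rather than the solana-ledger crate; it is not part of the patch, and the Schedule and get_indices names are illustrative only. It shows how a binary search into the per-leader index, combined with the '%' and '/' cycle arithmetic, replaces the linear scan over the endlessly repeating schedule.

    // Hypothetical, simplified model of LeaderSchedule::get_indices.
    use std::collections::HashMap;

    struct Schedule {
        slots: Vec<&'static str>,                 // one cycle of leaders
        index: HashMap<&'static str, Vec<usize>>, // leader -> sorted positions within the cycle
    }

    impl Schedule {
        fn new(slots: Vec<&'static str>) -> Self {
            let mut index: HashMap<&'static str, Vec<usize>> = HashMap::new();
            for (i, leader) in slots.iter().enumerate() {
                index.entry(*leader).or_default().push(i);
            }
            Self { slots, index }
        }

        // Yields indices i >= offset, in increasing order, where `leader` is the
        // leader, treating the schedule as repeating forever.
        fn get_indices(&self, leader: &str, offset: usize) -> impl Iterator<Item = usize> {
            let positions = self.index.get(leader).cloned().unwrap_or_default();
            let num_slots = self.slots.len();
            let size = positions.len();
            let start = if size == 0 {
                0 // the take_while below makes the iterator empty
            } else {
                // Position inside the current cycle (binary search), plus
                // `size` hits for every full cycle already passed.
                positions
                    .binary_search(&(offset % num_slots))
                    .unwrap_or_else(|i| i)
                    + offset / num_slots * size
            };
            (start..)
                .take_while(move |_| size != 0)
                .map(move |k| positions[k % size] + k / size * num_slots)
        }
    }

    fn main() {
        let schedule = Schedule::new(vec!["a", "b", "a", "c"]);
        // "a" leads cycle positions 0 and 2; with a cycle length of 4 and
        // offset 3, the next occurrences are 4, 6, 8, 10, ...
        let hits: Vec<_> = schedule.get_indices("a", 3).take(4).collect();
        assert_eq!(hits, vec![4, 6, 8, 10]);
        println!("{:?}", hits);
    }

The cost of each lookup is O(log n) in the number of slots the pubkey leads within one cycle, instead of a scan over every slot of the epoch, which is what motivates the patch's change to next_leader_slot.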