adds an inverted index to leader schedule (#15249)
next_leader_slot performs a linear search for the slots in which a pubkey is the leader (see https://github.com/solana-labs/solana/blob/e59a24d9f/ledger/src/leader_schedule_cache.rs#L123-L157). This can be done more efficiently by adding an inverted index to the leader schedule.
This commit is contained in:
parent
5b8f046c67
commit
e403aeaf05
|
@ -1,6 +1,9 @@
|
||||||
|
use itertools::Itertools;
|
||||||
use rand::distributions::{Distribution, WeightedIndex};
|
use rand::distributions::{Distribution, WeightedIndex};
|
||||||
use rand_chacha::{rand_core::SeedableRng, ChaChaRng};
|
use rand_chacha::{rand_core::SeedableRng, ChaChaRng};
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::convert::identity;
|
||||||
use std::ops::Index;
|
use std::ops::Index;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
@ -15,6 +18,8 @@ pub struct FixedSchedule {
|
||||||
#[derive(Debug, Default, PartialEq)]
|
#[derive(Debug, Default, PartialEq)]
|
||||||
pub struct LeaderSchedule {
|
pub struct LeaderSchedule {
|
||||||
slot_leaders: Vec<Pubkey>,
|
slot_leaders: Vec<Pubkey>,
|
||||||
|
// Inverted index from pubkeys to indices where they are the leader.
|
||||||
|
index: HashMap<Pubkey, Arc<Vec<usize>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LeaderSchedule {
|
impl LeaderSchedule {
|
||||||
|
@ -34,11 +39,22 @@ impl LeaderSchedule {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
Self { slot_leaders }
|
Self::new_from_schedule(slot_leaders)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_from_schedule(slot_leaders: Vec<Pubkey>) -> Self {
|
pub fn new_from_schedule(slot_leaders: Vec<Pubkey>) -> Self {
|
||||||
Self { slot_leaders }
|
let index = slot_leaders
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(i, pk)| (*pk, i))
|
||||||
|
.into_group_map()
|
||||||
|
.into_iter()
|
||||||
|
.map(|(k, v)| (k, Arc::new(v)))
|
||||||
|
.collect();
|
||||||
|
Self {
|
||||||
|
slot_leaders,
|
||||||
|
index,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_slot_leaders(&self) -> &[Pubkey] {
|
pub fn get_slot_leaders(&self) -> &[Pubkey] {
|
||||||
|
@ -48,6 +64,33 @@ impl LeaderSchedule {
|
||||||
pub fn num_slots(&self) -> usize {
|
pub fn num_slots(&self) -> usize {
|
||||||
self.slot_leaders.len()
|
self.slot_leaders.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 'offset' is an index into the leader schedule. The function returns an
|
||||||
|
/// iterator of indices i >= offset where the given pubkey is the leader.
|
||||||
|
pub(crate) fn get_indices(
|
||||||
|
&self,
|
||||||
|
pubkey: &Pubkey,
|
||||||
|
offset: usize, // Starting index.
|
||||||
|
) -> impl Iterator<Item = usize> {
|
||||||
|
let index = self.index.get(pubkey).cloned().unwrap_or_default();
|
||||||
|
let num_slots = self.slot_leaders.len();
|
||||||
|
let size = index.len();
|
||||||
|
#[allow(clippy::reversed_empty_ranges)]
|
||||||
|
let range = if index.is_empty() {
|
||||||
|
1..=0 // Intentionally empty range of type RangeInclusive.
|
||||||
|
} else {
|
||||||
|
let offset = index
|
||||||
|
.binary_search(&(offset % num_slots))
|
||||||
|
.unwrap_or_else(identity)
|
||||||
|
+ offset / num_slots * size;
|
||||||
|
offset..=usize::MAX
|
||||||
|
};
|
||||||
|
// The modular arithmetic here and above replicates the Index implementation
|
||||||
|
// for LeaderSchedule, where the schedule keeps repeating endlessly.
|
||||||
|
// The '%' returns where in a cycle we are and the '/' returns how many
|
||||||
|
// times the schedule is repeated.
|
||||||
|
range.map(move |k| index[k % size] + k / size * num_slots)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Index<u64> for LeaderSchedule {
|
impl Index<u64> for LeaderSchedule {
|
||||||
|
@ -61,14 +104,14 @@ impl Index<u64> for LeaderSchedule {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use rand::Rng;
|
||||||
|
use std::iter::repeat_with;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_leader_schedule_index() {
|
fn test_leader_schedule_index() {
|
||||||
let pubkey0 = solana_sdk::pubkey::new_rand();
|
let pubkey0 = solana_sdk::pubkey::new_rand();
|
||||||
let pubkey1 = solana_sdk::pubkey::new_rand();
|
let pubkey1 = solana_sdk::pubkey::new_rand();
|
||||||
let leader_schedule = LeaderSchedule {
|
let leader_schedule = LeaderSchedule::new_from_schedule(vec![pubkey0, pubkey1]);
|
||||||
slot_leaders: vec![pubkey0, pubkey1],
|
|
||||||
};
|
|
||||||
assert_eq!(leader_schedule[0], pubkey0);
|
assert_eq!(leader_schedule[0], pubkey0);
|
||||||
assert_eq!(leader_schedule[1], pubkey1);
|
assert_eq!(leader_schedule[1], pubkey1);
|
||||||
assert_eq!(leader_schedule[2], pubkey0);
|
assert_eq!(leader_schedule[2], pubkey0);
|
||||||
|
@ -157,4 +200,29 @@ mod tests {
|
||||||
assert_eq!(leaders1, leaders1_expected);
|
assert_eq!(leaders1, leaders1_expected);
|
||||||
assert_eq!(leaders2, leaders2_expected);
|
assert_eq!(leaders2, leaders2_expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_get_indices() {
|
||||||
|
const NUM_SLOTS: usize = 97;
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let pubkeys: Vec<_> = repeat_with(Pubkey::new_unique).take(4).collect();
|
||||||
|
let schedule: Vec<_> = repeat_with(|| pubkeys[rng.gen_range(0, 3)])
|
||||||
|
.take(19)
|
||||||
|
.collect();
|
||||||
|
let schedule = LeaderSchedule::new_from_schedule(schedule);
|
||||||
|
let leaders = (0..NUM_SLOTS)
|
||||||
|
.map(|i| (schedule[i as u64], i))
|
||||||
|
.into_group_map();
|
||||||
|
for pubkey in &pubkeys {
|
||||||
|
let index = leaders.get(pubkey).cloned().unwrap_or_default();
|
||||||
|
for offset in 0..NUM_SLOTS {
|
||||||
|
let schedule: Vec<_> = schedule
|
||||||
|
.get_indices(pubkey, offset)
|
||||||
|
.take_while(|s| *s < NUM_SLOTS)
|
||||||
|
.collect();
|
||||||
|
let index: Vec<_> = index.iter().copied().skip_while(|s| *s < offset).collect();
|
||||||
|
assert_eq!(schedule, index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ use crate::{
|
||||||
leader_schedule::{FixedSchedule, LeaderSchedule},
|
leader_schedule::{FixedSchedule, LeaderSchedule},
|
||||||
leader_schedule_utils,
|
leader_schedule_utils,
|
||||||
};
|
};
|
||||||
|
use itertools::Itertools;
|
||||||
use log::*;
|
use log::*;
|
||||||
use solana_runtime::bank::Bank;
|
use solana_runtime::bank::Bank;
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
|
@ -99,18 +100,17 @@ impl LeaderScheduleCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the (next slot, last slot) after the given current_slot that the given node will be leader
|
/// Returns the (next slot, last slot) consecutive range of slots after
|
||||||
|
/// the given current_slot that the given node will be leader.
|
||||||
pub fn next_leader_slot(
|
pub fn next_leader_slot(
|
||||||
&self,
|
&self,
|
||||||
pubkey: &Pubkey,
|
pubkey: &Pubkey,
|
||||||
mut current_slot: Slot,
|
current_slot: Slot,
|
||||||
bank: &Bank,
|
bank: &Bank,
|
||||||
blockstore: Option<&Blockstore>,
|
blockstore: Option<&Blockstore>,
|
||||||
max_slot_range: u64,
|
max_slot_range: u64,
|
||||||
) -> Option<(Slot, Slot)> {
|
) -> Option<(Slot, Slot)> {
|
||||||
let (mut epoch, mut start_index) = bank.get_epoch_and_slot_index(current_slot + 1);
|
let (epoch, start_index) = bank.get_epoch_and_slot_index(current_slot + 1);
|
||||||
let mut first_slot = None;
|
|
||||||
let mut last_slot = current_slot;
|
|
||||||
let max_epoch = *self.max_epoch.read().unwrap();
|
let max_epoch = *self.max_epoch.read().unwrap();
|
||||||
if epoch > max_epoch {
|
if epoch > max_epoch {
|
||||||
debug!(
|
debug!(
|
||||||
|
@ -120,49 +120,40 @@ impl LeaderScheduleCache {
|
||||||
);
|
);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
while let Some(leader_schedule) = self.get_epoch_schedule_else_compute(epoch, bank) {
|
// Slots after current_slot where pubkey is the leader.
|
||||||
// clippy thinks I should do this:
|
let mut schedule = (epoch..=max_epoch)
|
||||||
// for (i, <item>) in leader_schedule
|
.map(|epoch| self.get_epoch_schedule_else_compute(epoch, bank))
|
||||||
// .iter()
|
.while_some()
|
||||||
// .enumerate()
|
.zip(epoch..)
|
||||||
// .take(bank.get_slots_in_epoch(epoch))
|
.flat_map(|(leader_schedule, k)| {
|
||||||
// .skip(from_slot_index + 1) {
|
let offset = if k == epoch { start_index as usize } else { 0 };
|
||||||
//
|
let num_slots = bank.get_slots_in_epoch(k) as usize;
|
||||||
// but leader_schedule doesn't implement Iter...
|
let first_slot = bank.epoch_schedule().get_first_slot_in_epoch(k);
|
||||||
#[allow(clippy::needless_range_loop)]
|
leader_schedule
|
||||||
for i in start_index..bank.get_slots_in_epoch(epoch) {
|
.get_indices(pubkey, offset)
|
||||||
current_slot += 1;
|
.take_while(move |i| *i < num_slots)
|
||||||
if *pubkey == leader_schedule[i] {
|
.map(move |i| i as Slot + first_slot)
|
||||||
if let Some(blockstore) = blockstore {
|
})
|
||||||
if let Some(meta) = blockstore.meta(current_slot).unwrap() {
|
.skip_while(|slot| {
|
||||||
// We have already sent a shred for this slot, so skip it
|
match blockstore {
|
||||||
if meta.received > 0 {
|
None => false,
|
||||||
continue;
|
// Skip slots we have already sent a shred for.
|
||||||
}
|
Some(blockstore) => match blockstore.meta(*slot).unwrap() {
|
||||||
}
|
Some(meta) => meta.received > 0,
|
||||||
}
|
None => false,
|
||||||
|
},
|
||||||
if let Some(first_slot) = first_slot {
|
|
||||||
if current_slot - first_slot + 1 >= max_slot_range {
|
|
||||||
return Some((first_slot, current_slot));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
first_slot = Some(current_slot);
|
|
||||||
}
|
|
||||||
|
|
||||||
last_slot = current_slot;
|
|
||||||
} else if first_slot.is_some() {
|
|
||||||
return Some((first_slot.unwrap(), last_slot));
|
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
|
let first_slot = schedule.next()?;
|
||||||
epoch += 1;
|
let max_slot = first_slot.saturating_add(max_slot_range);
|
||||||
if epoch > max_epoch {
|
let last_slot = schedule
|
||||||
break;
|
.take_while(|slot| *slot < max_slot)
|
||||||
}
|
.zip(first_slot + 1..)
|
||||||
start_index = 0;
|
.take_while(|(a, b)| a == b)
|
||||||
}
|
.map(|(s, _)| s)
|
||||||
first_slot.map(|slot| (slot, last_slot))
|
.last()
|
||||||
|
.unwrap_or(first_slot);
|
||||||
|
Some((first_slot, last_slot))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_fixed_leader_schedule(&mut self, fixed_schedule: Option<FixedSchedule>) {
|
pub fn set_fixed_leader_schedule(&mut self, fixed_schedule: Option<FixedSchedule>) {
|
||||||
|
|
Loading…
Reference in New Issue