From f17ac70bb2ac1fea9948d02c17a99d6ec08e4879 Mon Sep 17 00:00:00 2001
From: carllin
Date: Thu, 2 Jul 2020 14:33:04 -0700
Subject: [PATCH] Add weighted traversal (#10877)

Co-authored-by: Carl
---
 core/src/lib.rs                       |   1 +
 core/src/repair_service.rs            |  91 +++--
 core/src/repair_weighted_traversal.rs | 436 ++++++++++++++++++++++++++
 ledger/src/blockstore.rs              |  11 +-
 4 files changed, 511 insertions(+), 28 deletions(-)
 create mode 100644 core/src/repair_weighted_traversal.rs

diff --git a/core/src/lib.rs b/core/src/lib.rs
index c65a821260..019d37a46d 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -40,6 +40,7 @@ pub mod progress_map;
 pub mod pubkey_references;
 pub mod repair_response;
 pub mod repair_service;
+pub mod repair_weighted_traversal;
 pub mod replay_stage;
 mod result;
 pub mod retransmit_stage;
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index b29a085de9..bc9ee7db4a 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -4,6 +4,7 @@ use crate::{
     cluster_info::ClusterInfo,
     cluster_info_vote_listener::VoteTracker,
     cluster_slots::ClusterSlots,
+    repair_weighted_traversal::Contains,
     result::Result,
     serve_repair::{RepairType, ServeRepair, DEFAULT_NONCE},
 };
@@ -273,6 +274,68 @@ impl RepairService {
         Ok(repairs)
     }
 
+    /// Generates repair requests for a single slot, bounded by `max_repairs`.
+    ///
+    /// Returns nothing when `max_repairs` is zero or the slot is already full,
+    /// a single `HighestShred` request when `consumed == received`, and otherwise
+    /// one `Shred` request per missing data index reported by the blockstore.
+    pub fn generate_repairs_for_slot(
+        blockstore: &Blockstore,
+        slot: Slot,
+        slot_meta: &SlotMeta,
+        max_repairs: usize,
+    ) -> Vec<RepairType> {
+        if max_repairs == 0 || slot_meta.is_full() {
+            vec![]
+        } else if slot_meta.consumed == slot_meta.received {
+            vec![RepairType::HighestShred(slot, slot_meta.received)]
+        } else {
+            let reqs = blockstore.find_missing_data_indexes(
+                slot,
+                slot_meta.first_shred_timestamp,
+                slot_meta.consumed,
+                slot_meta.received,
+                max_repairs,
+            );
+            reqs.into_iter()
+                .map(|i| RepairType::Shred(slot, i))
+                .collect()
+        }
+    }
+
+    /// Repairs any fork starting at the input slot
+    /// Slots in `duplicate_slot_repair_statuses` are skipped, and since their
+    /// `next_slots` are never queued, so are their descendants.
+    pub fn generate_repairs_for_fork(
+        blockstore: &Blockstore,
+        repairs: &mut Vec<RepairType>,
+        max_repairs: usize,
+        slot: Slot,
+        duplicate_slot_repair_statuses: &dyn Contains<Slot>,
+    ) {
+        let mut pending_slots = vec![slot];
+        while repairs.len() < max_repairs && !pending_slots.is_empty() {
+            let slot = pending_slots.pop().unwrap();
+            if duplicate_slot_repair_statuses.contains(&slot) {
+                // These are repaired through a different path
+                continue;
+            }
+            if let Some(slot_meta) = blockstore.meta(slot).unwrap() {
+                let new_repairs = Self::generate_repairs_for_slot(
+                    blockstore,
+                    slot,
+                    &slot_meta,
+                    max_repairs - repairs.len(),
+                );
+                repairs.extend(new_repairs);
+                let next_slots = slot_meta.next_slots;
+                pending_slots.extend(next_slots);
+            } else {
+                break;
+            }
+        }
+    }
+
     fn generate_repairs(
         blockstore: &Blockstore,
         root: Slot,
@@ -282,7 +345,7 @@ impl RepairService {
     ) -> Result<Vec<RepairType>> {
         // Slot height and shred indexes for shreds we want to repair
         let mut repairs: Vec<RepairType> = vec![];
-        Self::generate_repairs_for_fork(
+        Self::generate_repairs_by_level(
             blockstore,
             &mut repairs,
             max_repairs,
@@ -501,30 +564,6 @@ impl RepairService {
             .collect()
     }
 
-    fn generate_repairs_for_slot(
-        blockstore: &Blockstore,
-        slot: Slot,
-        slot_meta: &SlotMeta,
-        max_repairs: usize,
-    ) -> Vec<RepairType> {
-        if slot_meta.is_full() {
-            vec![]
-        } else if slot_meta.consumed == slot_meta.received {
-            vec![RepairType::HighestShred(slot, slot_meta.received)]
-        } else {
-            let reqs = blockstore.find_missing_data_indexes(
-                slot,
-                slot_meta.first_shred_timestamp,
-                slot_meta.consumed,
-                slot_meta.received,
-                max_repairs,
-            );
-            reqs.into_iter()
-                .map(|i| RepairType::Shred(slot, i))
-                .collect()
-        }
-    }
-
     fn generate_repairs_for_orphans(
         orphans: impl Iterator<Item = u64>,
         repairs: &mut Vec<RepairType>,
@@ -533,7 +572,7 @@ impl RepairService {
     }
 
     /// Repairs any fork starting at the input slot
-    fn generate_repairs_for_fork(
+    fn generate_repairs_by_level(
         blockstore: &Blockstore,
         repairs: &mut Vec<RepairType>,
         max_repairs: usize,
diff --git a/core/src/repair_weighted_traversal.rs b/core/src/repair_weighted_traversal.rs
new file mode 100644
index 0000000000..24b5cd8c84
--- /dev/null
+++ b/core/src/repair_weighted_traversal.rs
@@ -0,0 +1,436 @@
+use crate::{
+    heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, repair_service::RepairService,
+    serve_repair::RepairType,
+};
+use solana_ledger::blockstore::Blockstore;
+use solana_sdk::clock::Slot;
+use std::{
+    cmp::Eq,
+    collections::{HashMap, HashSet},
+    hash::Hash,
+};
+
+/// Minimal membership-test abstraction so callers can pass either a `HashMap`
+/// (checked by key) or a `HashSet` as the set of slots to skip.
+pub trait Contains<T> {
+    fn contains(&self, key: &T) -> bool;
+}
+
+impl<T: Eq + Hash, U> Contains<T> for HashMap<T, U> {
+    fn contains(&self, key: &T) -> bool {
+        self.contains_key(key)
+    }
+}
+impl<T: Eq + Hash> Contains<T> for HashSet<T> {
+    fn contains(&self, key: &T) -> bool {
+        self.contains(key)
+    }
+}
+
+/// A traversal event: `Unvisited` is emitted when a slot is first reached,
+/// `Visited` once everything below it in the tree has been emitted.
+#[derive(Debug, PartialEq)]
+enum Visit {
+    Visited(Slot),
+    Unvisited(Slot),
+}
+
+impl Visit {
+    pub fn slot(&self) -> Slot {
+        match self {
+            Visit::Visited(slot) => *slot,
+            Visit::Unvisited(slot) => *slot,
+        }
+    }
+}
+
+// Iterates through slots in order of weight
+struct RepairWeightTraversal<'a> {
+    tree: &'a HeaviestSubtreeForkChoice,
+    pending: Vec<Visit>,
+}
+
+impl<'a> RepairWeightTraversal<'a> {
+    pub fn new(tree: &'a HeaviestSubtreeForkChoice) -> Self {
+        Self {
+            tree,
+            pending: vec![Visit::Unvisited(tree.root())],
+        }
+    }
+}
+
+impl<'a> Iterator for RepairWeightTraversal<'a> {
+    type Item = Visit;
+    fn next(&mut self) -> Option<Self::Item> {
+        let next = self.pending.pop();
+        next.map(|next| {
+            if let Visit::Unvisited(slot) = next {
+                // Add a bookmark to communicate all child
+                // slots have been visited
+                self.pending.push(Visit::Visited(slot));
+                let mut children: Vec<_> = self
+                    .tree
+                    .children(slot)
+                    .unwrap()
+                    .iter()
+                    .map(|child_slot| Visit::Unvisited(*child_slot))
+                    .collect();
+
+                // Sort children by weight to prioritize visiting the heaviest
+                // ones first
+                children
+                    .sort_by(|slot1, slot2| self.tree.max_by_weight(slot1.slot(), slot2.slot()));
+                self.pending.extend(children);
+            }
+            next
+        })
+    }
+}
+
+/// Generate shred repairs for the subtree of `tree` rooted at `tree.root()`,
+/// visiting heavier forks first.
+///
+/// The budget is `max_new_shreds` entries appended to `repairs`. Slots found in
+/// `ignore_slots` are skipped; children that exist only in `blockstore` are
+/// repaired once the traversal has finished with their parent's tree children.
+pub fn get_best_repair_shreds(
+    tree: &HeaviestSubtreeForkChoice,
+    blockstore: &Blockstore,
+    repairs: &mut Vec<RepairType>,
+    max_new_shreds: usize,
+    ignore_slots: &dyn Contains<Slot>,
+) {
+    let initial_len = repairs.len();
+    let max_repairs = initial_len.saturating_add(max_new_shreds);
+    let weighted_iter = RepairWeightTraversal::new(tree);
+    let mut visited_set = HashSet::new();
+    let mut slot_meta_cache = HashMap::new();
+    for next in weighted_iter {
+        if repairs.len() >= max_repairs {
+            break;
+        }
+
+        let slot_meta = slot_meta_cache
+            .entry(next.slot())
+            .or_insert_with(|| blockstore.meta(next.slot()).unwrap());
+
+        if let Some(slot_meta) = slot_meta {
+            match next {
+                Visit::Unvisited(slot) => {
+                    if !ignore_slots.contains(&slot) {
+                        let new_repairs = RepairService::generate_repairs_for_slot(
+                            blockstore,
+                            slot,
+                            &slot_meta,
+                            max_repairs - repairs.len(),
+                        );
+                        repairs.extend(new_repairs);
+                    }
+                    visited_set.insert(slot);
+                }
+                Visit::Visited(_) => {
+                    // By the time we reach here, this means all the children of this slot
+                    // have been explored/repaired. Although this slot has already been visited,
+                    // this slot is still the heaviest slot left in the traversal. Thus any
+                    // remaining children that have not been explored should now be repaired.
+                    for new_child_slot in &slot_meta.next_slots {
+                        // If the `new_child_slot` has not been visited by now, it must
+                        // not exist in `tree`
+                        if !visited_set.contains(new_child_slot) {
+                            // Generate repairs for entire subtree rooted at `new_child_slot`
+                            RepairService::generate_repairs_for_fork(
+                                blockstore,
+                                repairs,
+                                max_repairs,
+                                *new_child_slot,
+                                ignore_slots,
+                            );
+                        }
+                        visited_set.insert(*new_child_slot);
+                    }
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+pub mod test {
+    use super::*;
+    use solana_ledger::{get_tmp_ledger_path, shred::Shred};
+    use solana_runtime::bank_utils;
+    use trees::tr;
+
+    #[test]
+    fn test_weighted_repair_traversal_single() {
+        let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new(42);
+        let weighted_traversal = RepairWeightTraversal::new(&heaviest_subtree_fork_choice);
+        let steps: Vec<_> = weighted_traversal.collect();
+        assert_eq!(steps, vec![Visit::Unvisited(42), Visit::Visited(42)]);
+    }
+
+    #[test]
+    fn test_weighted_repair_traversal() {
+        let stake = 100;
+        let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys(1, stake);
+        let (_, mut heaviest_subtree_fork_choice) = setup_forks();
+        let weighted_traversal = RepairWeightTraversal::new(&heaviest_subtree_fork_choice);
+        let steps: Vec<_> = weighted_traversal.collect();
+
+        // When every node has a weight of zero, visit
+        // smallest children first
+        assert_eq!(
+            steps,
+            vec![
+                Visit::Unvisited(0),
+                Visit::Unvisited(1),
+                Visit::Unvisited(2),
+                Visit::Unvisited(4),
+                Visit::Visited(4),
+                Visit::Visited(2),
+                Visit::Unvisited(3),
+                Visit::Unvisited(5),
+                Visit::Visited(5),
+                Visit::Visited(3),
+                Visit::Visited(1),
+                Visit::Visited(0)
+            ]
+        );
+
+        // Add a vote to branch with slot 5,
+        // should prioritize that branch
+        heaviest_subtree_fork_choice.add_votes(
+            &[(vote_pubkeys[0], 5)],
+            bank.epoch_stakes_map(),
+            bank.epoch_schedule(),
+        );
+
+        let weighted_traversal = RepairWeightTraversal::new(&heaviest_subtree_fork_choice);
+        let steps: Vec<_> = weighted_traversal.collect();
+        assert_eq!(
+            steps,
+            vec![
+                Visit::Unvisited(0),
+                Visit::Unvisited(1),
+                Visit::Unvisited(3),
+                Visit::Unvisited(5),
+                Visit::Visited(5),
+                // Prioritizes heavier child 3 over 2
+                Visit::Visited(3),
+                Visit::Unvisited(2),
+                Visit::Unvisited(4),
+                Visit::Visited(4),
+                Visit::Visited(2),
+                Visit::Visited(1),
+                Visit::Visited(0)
+            ]
+        );
+    }
+
+    #[test]
+    fn test_get_best_repair_shreds() {
+        let (blockstore, heaviest_subtree_fork_choice) = setup_forks();
+
+        // `blockstore` and `heaviest_subtree_fork_choice` match exactly, so should
+        // return repairs for all slots (none are completed) in order of traversal
+        let mut repairs = vec![];
+        let last_shred = blockstore.meta(0).unwrap().unwrap().received;
+        get_best_repair_shreds(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            &mut repairs,
+            6,
+            &HashSet::default(),
+        );
+        assert_eq!(
+            repairs,
+            [0, 1, 2, 4, 3, 5]
+                .iter()
+                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .collect::<Vec<_>>()
+        );
+
+        // Add some leaves to blockstore, attached to the current best leaf, should prioritize
+        // repairing those new leaves before trying other branches
+        repairs = vec![];
+        let best_overall_slot = heaviest_subtree_fork_choice.best_overall_slot();
+        assert_eq!(heaviest_subtree_fork_choice.best_overall_slot(), 4);
+        blockstore.add_tree(tr(best_overall_slot) / (tr(6) / tr(7)), true, false);
+        get_best_repair_shreds(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            &mut repairs,
+            6,
+            &HashSet::default(),
+        );
+        assert_eq!(
+            repairs,
+            [0, 1, 2, 4, 6, 7]
+                .iter()
+                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .collect::<Vec<_>>()
+        );
+
+        // Completing slots should remove them from the repair list
+        repairs = vec![];
+        let completed_shreds: Vec<Shred> = [0, 2, 4, 6]
+            .iter()
+            .map(|slot| {
+                let mut shred = Shred::new_from_serialized_shred(
+                    blockstore
+                        .get_data_shred(*slot, last_shred - 1)
+                        .unwrap()
+                        .unwrap(),
+                )
+                .unwrap();
+                shred.set_index(last_shred as u32);
+                shred.set_last_in_slot();
+                shred
+            })
+            .collect();
+        blockstore
+            .insert_shreds(completed_shreds, None, false)
+            .unwrap();
+        get_best_repair_shreds(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            &mut repairs,
+            4,
+            &HashSet::default(),
+        );
+        assert_eq!(
+            repairs,
+            [1, 7, 3, 5]
+                .iter()
+                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .collect::<Vec<_>>()
+        );
+
+        // Adding incomplete children with higher weighted parents, even if
+        // the parents are complete should still be repaired
+        repairs = vec![];
+        blockstore.add_tree(tr(2) / (tr(8)), true, false);
+        get_best_repair_shreds(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            &mut repairs,
+            4,
+            &HashSet::default(),
+        );
+        assert_eq!(
+            repairs,
+            [1, 7, 8, 3]
+                .iter()
+                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .collect::<Vec<_>>()
+        );
+    }
+
+    #[test]
+    fn test_get_best_repair_shreds_no_duplicates() {
+        let (blockstore, heaviest_subtree_fork_choice) = setup_forks();
+        // Add a branch to slot 2, make sure it doesn't repair child
+        // 4 again when the Unvisited(2) event happens
+        blockstore.add_tree(tr(2) / (tr(6) / tr(7)), true, false);
+        let mut repairs = vec![];
+        get_best_repair_shreds(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            &mut repairs,
+            std::usize::MAX,
+            &HashSet::default(),
+        );
+        let last_shred = blockstore.meta(0).unwrap().unwrap().received;
+        assert_eq!(
+            repairs,
+            [0, 1, 2, 4, 6, 7, 3, 5]
+                .iter()
+                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .collect::<Vec<_>>()
+        );
+    }
+
+    #[test]
+    fn test_get_best_repair_shreds_ignore() {
+        let (blockstore, heaviest_subtree_fork_choice) = setup_forks();
+
+        // Adding slots to ignore should remove them from the repair set, but
+        // should not remove their children
+        let mut repairs = vec![];
+        let mut ignore_set: HashSet<Slot> = vec![1, 3].into_iter().collect();
+        let last_shred = blockstore.meta(0).unwrap().unwrap().received;
+        get_best_repair_shreds(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            &mut repairs,
+            std::usize::MAX,
+            &ignore_set,
+        );
+        assert_eq!(
+            repairs,
+            [0, 2, 4, 5]
+                .iter()
+                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .collect::<Vec<_>>()
+        );
+
+        // Adding slot 2 to ignore should not remove its unexplored children from
+        // the repair set
+        repairs = vec![];
+        blockstore.add_tree(tr(2) / (tr(6) / tr(7)), true, false);
+        ignore_set.insert(2);
+        get_best_repair_shreds(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            &mut repairs,
+            std::usize::MAX,
+            &ignore_set,
+        );
+        assert_eq!(
+            repairs,
+            [0, 4, 6, 7, 5]
+                .iter()
+                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .collect::<Vec<_>>()
+        );
+
+        // Adding unexplored child 6 to ignore set should remove it and its
+        // child 7 from the repair set
+        repairs = vec![];
+        ignore_set.insert(6);
+        get_best_repair_shreds(
+            &heaviest_subtree_fork_choice,
+            &blockstore,
+            &mut repairs,
+            std::usize::MAX,
+            &ignore_set,
+        );
+        assert_eq!(
+            repairs,
+            [0, 4, 5]
+                .iter()
+                .map(|slot| RepairType::HighestShred(*slot, last_shred))
+                .collect::<Vec<_>>()
+        );
+    }
+
+    fn setup_forks() -> (Blockstore, HeaviestSubtreeForkChoice) {
+        /*
+            Build fork structure:
+                 slot 0
+                   |
+                 slot 1
+                 /    \
+            slot 2    |
+               |    slot 3
+            slot 4    |
+                    slot 5
+        */
+        let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5))));
+        let ledger_path = get_tmp_ledger_path!();
+        let blockstore = Blockstore::open(&ledger_path).unwrap();
+        blockstore.add_tree(forks.clone(), false, false);
+
+        (blockstore, HeaviestSubtreeForkChoice::new_from_tree(forks))
+    }
+}
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index c08a55c431..21508def97 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -304,7 +304,7 @@ impl Blockstore {
         Ok((blockstore, signal_receiver, completed_slots_receiver))
     }
 
-    pub fn add_tree(&self, forks: Tree<Slot>, is_orphan: bool) {
+    pub fn add_tree(&self, forks: Tree<Slot>, is_orphan: bool, is_slot_complete: bool) {
         let mut walk = TreeWalk::from(forks);
         while let Some(visit) = walk.get() {
             let slot = visit.node().data;
@@ -314,7 +314,14 @@ impl Blockstore {
             }
             let parent = walk.get_parent().map(|n| n.data);
             if parent.is_some() || !is_orphan {
-                let (shreds, _) = make_slot_entries(slot, parent.unwrap_or(slot), 1);
+                let entries = create_ticks(2, 0, Hash::default());
+                let shreds = entries_to_test_shreds(
+                    entries,
+                    slot,
+                    parent.unwrap_or(slot),
+                    is_slot_complete,
+                    0,
+                );
                 self.insert_shreds(shreds, None, false).unwrap();
             }
             walk.forward();