From 2ed7e3af890cf04a5275223dc4403fdc1dcdcc35 Mon Sep 17 00:00:00 2001 From: Jeff Biseda Date: Fri, 19 Nov 2021 19:17:30 -0800 Subject: [PATCH] prioritize slot repairs for unknown last index and close to completion (#21070) --- core/src/lib.rs | 1 + core/src/repair_generic_traversal.rs | 312 ++++++++++++++++++++++++++ core/src/repair_service.rs | 129 ++++++++++- core/src/repair_weight.rs | 146 +++++++++++- core/src/repair_weighted_traversal.rs | 22 +- ledger/src/blockstore_meta.rs | 8 + 6 files changed, 600 insertions(+), 18 deletions(-) create mode 100644 core/src/repair_generic_traversal.rs diff --git a/core/src/lib.rs b/core/src/lib.rs index 525a1b0d8..eea8e921a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -34,6 +34,7 @@ pub mod outstanding_requests; pub mod packet_hasher; pub mod progress_map; pub mod qos_service; +pub mod repair_generic_traversal; pub mod repair_response; pub mod repair_service; pub mod repair_weight; diff --git a/core/src/repair_generic_traversal.rs b/core/src/repair_generic_traversal.rs new file mode 100644 index 000000000..24eb36aac --- /dev/null +++ b/core/src/repair_generic_traversal.rs @@ -0,0 +1,312 @@ +use crate::{ + heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, repair_service::RepairService, + serve_repair::ShredRepairType, tree_diff::TreeDiff, +}; +use solana_ledger::{blockstore::Blockstore, blockstore_meta::SlotMeta}; +use solana_sdk::{clock::Slot, hash::Hash}; +use std::collections::{HashMap, HashSet}; + +struct GenericTraversal<'a> { + tree: &'a HeaviestSubtreeForkChoice, + pending: Vec, +} + +impl<'a> GenericTraversal<'a> { + pub fn new(tree: &'a HeaviestSubtreeForkChoice) -> Self { + Self { + tree, + pending: vec![tree.root().0], + } + } +} + +impl<'a> Iterator for GenericTraversal<'a> { + type Item = Slot; + fn next(&mut self) -> Option { + let next = self.pending.pop(); + if let Some(slot) = next { + let children: Vec<_> = self + .tree + .children(&(slot, Hash::default())) + .unwrap() + .iter() + .map(|(child_slot, _)| *child_slot) + .collect(); + self.pending.extend(children); + } + next + } +} + +pub fn get_unknown_last_index( + tree: &HeaviestSubtreeForkChoice, + blockstore: &Blockstore, + slot_meta_cache: &mut HashMap>, + processed_slots: &mut HashSet, + limit: usize, +) -> Vec { + let iter = GenericTraversal::new(tree); + let mut unknown_last = Vec::new(); + for slot in iter { + if processed_slots.contains(&slot) { + continue; + } + let slot_meta = slot_meta_cache + .entry(slot) + .or_insert_with(|| blockstore.meta(slot).unwrap()); + if let Some(slot_meta) = slot_meta { + if slot_meta.known_last_index().is_none() { + let shred_index = blockstore.get_index(slot).unwrap(); + let num_processed_shreds = if let Some(shred_index) = shred_index { + shred_index.data().num_shreds() as u64 + } else { + slot_meta.consumed + }; + unknown_last.push((slot, slot_meta.received, num_processed_shreds)); + processed_slots.insert(slot); + } + } + } + // prioritize slots with more received shreds + unknown_last.sort_by(|(_, _, count1), (_, _, count2)| count2.cmp(count1)); + unknown_last + .iter() + .take(limit) + .map(|(slot, received, _)| ShredRepairType::HighestShred(*slot, *received)) + .collect() +} + +fn get_unrepaired_path( + start_slot: Slot, + blockstore: &Blockstore, + slot_meta_cache: &mut HashMap>, + visited: &mut HashSet, +) -> Vec { + let mut path = Vec::new(); + let mut slot = start_slot; + while !visited.contains(&slot) { + visited.insert(slot); + let slot_meta = slot_meta_cache + .entry(slot) + .or_insert_with(|| blockstore.meta(slot).unwrap()); + if let Some(slot_meta) = slot_meta { + if slot_meta.is_full() { + break; + } + path.push(slot); + slot = slot_meta.parent_slot; + } + } + path.reverse(); + path +} + +pub fn get_closest_completion( + tree: &HeaviestSubtreeForkChoice, + blockstore: &Blockstore, + slot_meta_cache: &mut HashMap>, + processed_slots: &mut HashSet, + limit: usize, +) -> Vec { + let mut v: Vec<(Slot, u64)> = Vec::default(); + let iter = GenericTraversal::new(tree); + for slot in iter { + if processed_slots.contains(&slot) { + continue; + } + let slot_meta = slot_meta_cache + .entry(slot) + .or_insert_with(|| blockstore.meta(slot).unwrap()); + if let Some(slot_meta) = slot_meta { + if slot_meta.is_full() { + continue; + } + if let Some(last_index) = slot_meta.known_last_index() { + let shred_index = blockstore.get_index(slot).unwrap(); + let dist = if let Some(shred_index) = shred_index { + let shred_count = shred_index.data().num_shreds() as u64; + last_index - shred_count + } else { + last_index - slot_meta.consumed + }; + v.push((slot, dist)); + processed_slots.insert(slot); + } + } + } + v.sort_by(|(_, d1), (_, d2)| d1.cmp(d2)); + + let mut visited = HashSet::new(); + let mut repairs = Vec::new(); + for (slot, _) in v { + if repairs.len() >= limit { + break; + } + // attempt to repair heaviest slots starting with their parents + let path = get_unrepaired_path(slot, blockstore, slot_meta_cache, &mut visited); + for slot in path { + if repairs.len() >= limit { + break; + } + let slot_meta = slot_meta_cache.get(&slot).unwrap().as_ref().unwrap(); + let new_repairs = RepairService::generate_repairs_for_slot( + blockstore, + slot, + slot_meta, + limit - repairs.len(), + ); + repairs.extend(new_repairs); + } + } + + repairs +} + +#[cfg(test)] +pub mod test { + use super::*; + use solana_ledger::{ + blockstore::{Blockstore, MAX_TURBINE_PROPAGATION_IN_MS}, + get_tmp_ledger_path, + }; + use solana_sdk::hash::Hash; + use std::{thread::sleep, time::Duration}; + use trees::{tr, Tree, TreeWalk}; + + #[test] + fn test_get_unknown_last_index() { + let (blockstore, heaviest_subtree_fork_choice) = setup_forks(); + let last_shred = blockstore.meta(0).unwrap().unwrap().received; + let mut slot_meta_cache = HashMap::default(); + let mut processed_slots = HashSet::default(); + let repairs = get_unknown_last_index( + &heaviest_subtree_fork_choice, + &blockstore, + &mut slot_meta_cache, + &mut processed_slots, + 10, + ); + assert_eq!( + repairs, + [0, 1, 3, 5, 2, 4] + .iter() + .map(|slot| ShredRepairType::HighestShred(*slot, last_shred)) + .collect::>() + ); + } + + #[test] + fn test_get_closest_completion() { + let (blockstore, heaviest_subtree_fork_choice) = setup_forks(); + let mut slot_meta_cache = HashMap::default(); + let mut processed_slots = HashSet::default(); + let repairs = get_closest_completion( + &heaviest_subtree_fork_choice, + &blockstore, + &mut slot_meta_cache, + &mut processed_slots, + 10, + ); + assert_eq!(repairs, []); + + let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5)))); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + add_tree_with_missing_shreds( + &blockstore, + forks.clone(), + false, + true, + 100, + Hash::default(), + ); + let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_tree(forks); + sleep(Duration::from_millis(MAX_TURBINE_PROPAGATION_IN_MS)); + let mut slot_meta_cache = HashMap::default(); + let mut processed_slots = HashSet::default(); + let repairs = get_closest_completion( + &heaviest_subtree_fork_choice, + &blockstore, + &mut slot_meta_cache, + &mut processed_slots, + 2, + ); + assert_eq!( + repairs, + [ShredRepairType::Shred(0, 3), ShredRepairType::Shred(1, 3)] + ); + } + + fn add_tree_with_missing_shreds( + blockstore: &Blockstore, + forks: Tree, + is_orphan: bool, + is_slot_complete: bool, + num_ticks: u64, + starting_hash: Hash, + ) { + let mut walk = TreeWalk::from(forks); + let mut blockhashes = HashMap::new(); + while let Some(visit) = walk.get() { + let slot = *visit.node().data(); + if blockstore.meta(slot).unwrap().is_some() + && blockstore.orphan(slot).unwrap().is_none() + { + // If slot exists in blockstore and is not an orphan, then skip it + walk.forward(); + continue; + } + let parent = walk.get_parent().map(|n| *n.data()); + if parent.is_some() || !is_orphan { + let parent_hash = parent + // parent won't exist for first node in a tree where + // `is_orphan == true` + .and_then(|parent| blockhashes.get(&parent)) + .unwrap_or(&starting_hash); + let entries = solana_entry::entry::create_ticks( + num_ticks * (std::cmp::max(1, slot - parent.unwrap_or(slot))), + 0, + *parent_hash, + ); + blockhashes.insert(slot, entries.last().unwrap().hash); + + let mut shreds = solana_ledger::blockstore::entries_to_test_shreds( + entries.clone(), + slot, + parent.unwrap_or(slot), + is_slot_complete, + 0, + ); + + // remove next to last shred + let shred = shreds.pop().unwrap(); + shreds.pop().unwrap(); + shreds.push(shred); + + blockstore.insert_shreds(shreds, None, false).unwrap(); + } + walk.forward(); + } + } + + fn setup_forks() -> (Blockstore, HeaviestSubtreeForkChoice) { + /* + Build fork structure: + slot 0 + | + slot 1 + / \ + slot 2 | + | slot 3 + slot 4 | + slot 5 + */ + + let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5)))); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&ledger_path).unwrap(); + blockstore.add_tree(forks.clone(), false, false, 2, Hash::default()); + + (blockstore, HeaviestSubtreeForkChoice::new_from_tree(forks)) + } +} diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index d5a0303fd..3a38a9e92 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -95,6 +95,8 @@ pub struct RepairTiming { pub add_votes_elapsed: u64, pub get_best_orphans_elapsed: u64, pub get_best_shreds_elapsed: u64, + pub get_unknown_last_index_elapsed: u64, + pub get_closest_completion_elapsed: u64, pub send_repairs_elapsed: u64, pub build_repairs_batch_elapsed: u64, pub batch_send_repairs_elapsed: u64, @@ -118,11 +120,50 @@ impl RepairTiming { } } +#[derive(Default, Debug)] +pub struct BestRepairsStats { + pub call_count: u64, + pub num_orphan_slots: u64, + pub num_orphan_repairs: u64, + pub num_best_shreds_slots: u64, + pub num_best_shreds_repairs: u64, + pub num_unknown_last_index_slots: u64, + pub num_unknown_last_index_repairs: u64, + pub num_closest_completion_slots: u64, + pub num_closest_completion_repairs: u64, +} + +impl BestRepairsStats { + pub fn update( + &mut self, + num_orphan_slots: u64, + num_orphan_repairs: u64, + num_best_shreds_slots: u64, + num_best_shreds_repairs: u64, + num_unknown_last_index_slots: u64, + num_unknown_last_index_repairs: u64, + num_closest_completion_slots: u64, + num_closest_completion_repairs: u64, + ) { + self.call_count += 1; + self.num_orphan_slots += num_orphan_slots; + self.num_orphan_repairs += num_orphan_repairs; + self.num_best_shreds_slots += num_best_shreds_slots; + self.num_best_shreds_repairs += num_best_shreds_repairs; + self.num_unknown_last_index_slots += num_unknown_last_index_slots; + self.num_unknown_last_index_repairs += num_unknown_last_index_repairs; + self.num_closest_completion_slots += num_closest_completion_slots; + self.num_closest_completion_repairs += num_closest_completion_repairs; + } +} + pub const MAX_REPAIR_LENGTH: usize = 512; pub const MAX_REPAIR_PER_DUPLICATE: usize = 20; pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000; pub const REPAIR_MS: u64 = 100; pub const MAX_ORPHANS: usize = 5; +pub const MAX_UNKNOWN_LAST_INDEX_REPAIRS: usize = 10; +pub const MAX_CLOSEST_COMPLETION_REPAIRS: usize = 100; #[derive(Clone)] pub struct RepairInfo { @@ -210,6 +251,7 @@ impl RepairService { let id = repair_info.cluster_info.id(); let mut repair_stats = RepairStats::default(); let mut repair_timing = RepairTiming::default(); + let mut best_repairs_stats = BestRepairsStats::default(); let mut last_stats = Instant::now(); let duplicate_slot_repair_statuses: HashMap = HashMap::new(); @@ -257,15 +299,20 @@ impl RepairService { ); add_votes_elapsed.stop(); - repair_weight.get_best_weighted_repairs( + let repairs = repair_weight.get_best_weighted_repairs( blockstore, root_bank.epoch_stakes_map(), root_bank.epoch_schedule(), MAX_ORPHANS, MAX_REPAIR_LENGTH, + MAX_UNKNOWN_LAST_INDEX_REPAIRS, + MAX_CLOSEST_COMPLETION_REPAIRS, &duplicate_slot_repair_statuses, Some(&mut repair_timing), - ) + Some(&mut best_repairs_stats), + ); + + repairs }; let mut build_repairs_batch_elapsed = Measure::start("build_repairs_batch_elapsed"); @@ -362,6 +409,16 @@ impl RepairService { repair_timing.get_best_shreds_elapsed, i64 ), + ( + "get-unknown-last-index-elapsed", + repair_timing.get_unknown_last_index_elapsed, + i64 + ), + ( + "get-closest-completion-elapsed", + repair_timing.get_closest_completion_elapsed, + i64 + ), ( "send-repairs-elapsed", repair_timing.send_repairs_elapsed, @@ -378,8 +435,45 @@ impl RepairService { i64 ), ); + datapoint_info!( + "serve_repair-best-repairs", + ("call-count", best_repairs_stats.call_count, i64), + ("orphan-slots", best_repairs_stats.num_orphan_slots, i64), + ("orphan-repairs", best_repairs_stats.num_orphan_repairs, i64), + ( + "best-shreds-slots", + best_repairs_stats.num_best_shreds_slots, + i64 + ), + ( + "best-shreds-repairs", + best_repairs_stats.num_best_shreds_repairs, + i64 + ), + ( + "unknown-last-index-slots", + best_repairs_stats.num_unknown_last_index_slots, + i64 + ), + ( + "unknown-last-index-repairs", + best_repairs_stats.num_unknown_last_index_repairs, + i64 + ), + ( + "closest-completion-slots", + best_repairs_stats.num_closest_completion_slots, + i64 + ), + ( + "closest-completion-repairs", + best_repairs_stats.num_closest_completion_repairs, + i64 + ), + ); repair_stats = RepairStats::default(); repair_timing = RepairTiming::default(); + best_repairs_stats = BestRepairsStats::default(); last_stats = Instant::now(); } sleep(Duration::from_millis(REPAIR_MS)); @@ -474,7 +568,7 @@ impl RepairService { } } - #[allow(dead_code)] + #[cfg_attr(not(test), allow(dead_code))] fn generate_duplicate_repairs_for_slot( blockstore: &Blockstore, slot: Slot, @@ -499,7 +593,7 @@ impl RepairService { } } - #[allow(dead_code)] + #[cfg_attr(not(test), allow(dead_code))] fn generate_and_send_duplicate_repairs( duplicate_slot_repair_statuses: &mut HashMap, cluster_slots: &ClusterSlots, @@ -550,7 +644,7 @@ impl RepairService { }) } - #[allow(dead_code)] + #[cfg_attr(not(test), allow(dead_code))] fn serialize_and_send_request( repair_type: &ShredRepairType, repair_socket: &UdpSocket, @@ -566,7 +660,7 @@ impl RepairService { Ok(()) } - #[allow(dead_code)] + #[cfg_attr(not(test), allow(dead_code))] fn update_duplicate_slot_repair_addr( slot: Slot, status: &mut DuplicateSlotRepairStatus, @@ -659,8 +753,11 @@ mod test { &EpochSchedule::default(), MAX_ORPHANS, MAX_REPAIR_LENGTH, + MAX_UNKNOWN_LAST_INDEX_REPAIRS, + MAX_CLOSEST_COMPLETION_REPAIRS, &HashSet::default(), None, + None, ), vec![ ShredRepairType::Orphan(2), @@ -693,8 +790,11 @@ mod test { &EpochSchedule::default(), MAX_ORPHANS, MAX_REPAIR_LENGTH, + MAX_UNKNOWN_LAST_INDEX_REPAIRS, + MAX_CLOSEST_COMPLETION_REPAIRS, &HashSet::default(), - None + None, + None, ), vec![ShredRepairType::HighestShred(0, 0)] ); @@ -748,8 +848,11 @@ mod test { &EpochSchedule::default(), MAX_ORPHANS, MAX_REPAIR_LENGTH, + MAX_UNKNOWN_LAST_INDEX_REPAIRS, + MAX_CLOSEST_COMPLETION_REPAIRS, &HashSet::default(), - None + None, + None, ), expected ); @@ -761,8 +864,11 @@ mod test { &EpochSchedule::default(), MAX_ORPHANS, expected.len() - 2, + MAX_UNKNOWN_LAST_INDEX_REPAIRS, + MAX_CLOSEST_COMPLETION_REPAIRS, &HashSet::default(), - None + None, + None, )[..], expected[0..expected.len() - 2] ); @@ -799,8 +905,11 @@ mod test { &EpochSchedule::default(), MAX_ORPHANS, MAX_REPAIR_LENGTH, + MAX_UNKNOWN_LAST_INDEX_REPAIRS, + MAX_CLOSEST_COMPLETION_REPAIRS, &HashSet::default(), - None + None, + None, ), expected ); diff --git a/core/src/repair_weight.rs b/core/src/repair_weight.rs index e341d5cda..a167a6811 100644 --- a/core/src/repair_weight.rs +++ b/core/src/repair_weight.rs @@ -1,8 +1,14 @@ use crate::{ - heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, repair_service::RepairTiming, - repair_weighted_traversal, serve_repair::ShredRepairType, tree_diff::TreeDiff, + heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, + repair_generic_traversal::{get_closest_completion, get_unknown_last_index}, + repair_service::{BestRepairsStats, RepairTiming}, + repair_weighted_traversal, + serve_repair::ShredRepairType, + tree_diff::TreeDiff, +}; +use solana_ledger::{ + ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_meta::SlotMeta, }; -use solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore}; use solana_measure::measure::Measure; use solana_runtime::{contains::Contains, epoch_stakes::EpochStakes}; use solana_sdk::{ @@ -138,6 +144,7 @@ impl RepairWeight { } } + #[allow(clippy::too_many_arguments)] pub fn get_best_weighted_repairs<'a>( &mut self, blockstore: &Blockstore, @@ -145,29 +152,91 @@ impl RepairWeight { epoch_schedule: &EpochSchedule, max_new_orphans: usize, max_new_shreds: usize, + max_unknown_last_index_repairs: usize, + max_closest_completion_repairs: usize, ignore_slots: &impl Contains<'a, Slot>, repair_timing: Option<&mut RepairTiming>, + stats: Option<&mut BestRepairsStats>, ) -> Vec { let mut repairs = vec![]; + let mut processed_slots: HashSet = vec![self.root].into_iter().collect(); + let mut slot_meta_cache = HashMap::default(); + let mut get_best_orphans_elapsed = Measure::start("get_best_orphans"); // Update the orphans in order from heaviest to least heavy self.get_best_orphans( blockstore, + &mut processed_slots, &mut repairs, epoch_stakes, epoch_schedule, max_new_orphans, ); + let num_orphan_slots = processed_slots.len() - 1; + let num_orphan_repairs = repairs.len(); get_best_orphans_elapsed.stop(); let mut get_best_shreds_elapsed = Measure::start("get_best_shreds"); + let mut best_shreds_repairs = Vec::default(); // Find the best incomplete slots in rooted subtree - self.get_best_shreds(blockstore, &mut repairs, max_new_shreds, ignore_slots); + self.get_best_shreds( + blockstore, + &mut slot_meta_cache, + &mut best_shreds_repairs, + max_new_shreds, + ignore_slots, + ); + let num_best_shreds_repairs = best_shreds_repairs.len(); + let repair_slots_set: HashSet = + best_shreds_repairs.iter().map(|r| r.slot()).collect(); + let num_best_shreds_slots = repair_slots_set.len(); + processed_slots.extend(repair_slots_set); + repairs.extend(best_shreds_repairs); get_best_shreds_elapsed.stop(); + let mut get_unknown_last_index_elapsed = Measure::start("get_unknown_last_index"); + let pre_num_slots = processed_slots.len(); + let unknown_last_index_repairs = self.get_best_unknown_last_index( + blockstore, + &mut slot_meta_cache, + &mut processed_slots, + max_unknown_last_index_repairs, + ); + let num_unknown_last_index_repairs = unknown_last_index_repairs.len(); + let num_unknown_last_index_slots = processed_slots.len() - pre_num_slots; + repairs.extend(unknown_last_index_repairs); + get_unknown_last_index_elapsed.stop(); + + let mut get_closest_completion_elapsed = Measure::start("get_closest_completion"); + let pre_num_slots = processed_slots.len(); + let closest_completion_repairs = self.get_best_closest_completion( + blockstore, + &mut slot_meta_cache, + &mut processed_slots, + max_closest_completion_repairs, + ); + let num_closest_completion_repairs = closest_completion_repairs.len(); + let num_closest_completion_slots = processed_slots.len() - pre_num_slots; + repairs.extend(closest_completion_repairs); + get_closest_completion_elapsed.stop(); + + if let Some(stats) = stats { + stats.update( + num_orphan_slots as u64, + num_orphan_repairs as u64, + num_best_shreds_slots as u64, + num_best_shreds_repairs as u64, + num_unknown_last_index_slots as u64, + num_unknown_last_index_repairs as u64, + num_closest_completion_slots as u64, + num_closest_completion_repairs as u64, + ); + } if let Some(repair_timing) = repair_timing { repair_timing.get_best_orphans_elapsed += get_best_orphans_elapsed.as_us(); repair_timing.get_best_shreds_elapsed += get_best_shreds_elapsed.as_us(); + repair_timing.get_unknown_last_index_elapsed += get_unknown_last_index_elapsed.as_us(); + repair_timing.get_closest_completion_elapsed += get_closest_completion_elapsed.as_us(); } repairs } @@ -248,6 +317,7 @@ impl RepairWeight { fn get_best_shreds<'a>( &mut self, blockstore: &Blockstore, + slot_meta_cache: &mut HashMap>, repairs: &mut Vec, max_new_shreds: usize, ignore_slots: &impl Contains<'a, Slot>, @@ -256,6 +326,7 @@ impl RepairWeight { repair_weighted_traversal::get_best_repair_shreds( root_tree, blockstore, + slot_meta_cache, repairs, max_new_shreds, ignore_slots, @@ -265,6 +336,7 @@ impl RepairWeight { fn get_best_orphans( &mut self, blockstore: &Blockstore, + processed_slots: &mut HashSet, repairs: &mut Vec, epoch_stakes: &HashMap, epoch_schedule: &EpochSchedule, @@ -292,7 +364,7 @@ impl RepairWeight { if best_orphans.len() >= max_new_orphans { break; } - if heaviest_tree_root == self.root { + if processed_slots.contains(&heaviest_tree_root) { continue; } // Ignore trees that were merged in a previous iteration @@ -307,6 +379,7 @@ impl RepairWeight { if new_orphan_root != self.root && !best_orphans.contains(&new_orphan_root) { best_orphans.insert(new_orphan_root); repairs.push(ShredRepairType::Orphan(new_orphan_root)); + processed_slots.insert(new_orphan_root); } } } @@ -319,6 +392,7 @@ impl RepairWeight { if !best_orphans.contains(&new_orphan) { repairs.push(ShredRepairType::Orphan(new_orphan)); best_orphans.insert(new_orphan); + processed_slots.insert(new_orphan); } if best_orphans.len() == max_new_orphans { @@ -328,6 +402,54 @@ impl RepairWeight { } } + fn get_best_unknown_last_index( + &mut self, + blockstore: &Blockstore, + slot_meta_cache: &mut HashMap>, + processed_slots: &mut HashSet, + max_new_repairs: usize, + ) -> Vec { + let mut repairs = Vec::default(); + for (_slot, tree) in self.trees.iter() { + if repairs.len() >= max_new_repairs { + break; + } + let new_repairs = get_unknown_last_index( + tree, + blockstore, + slot_meta_cache, + processed_slots, + max_new_repairs - repairs.len(), + ); + repairs.extend(new_repairs); + } + repairs + } + + fn get_best_closest_completion( + &mut self, + blockstore: &Blockstore, + slot_meta_cache: &mut HashMap>, + processed_slots: &mut HashSet, + max_new_repairs: usize, + ) -> Vec { + let mut repairs = Vec::default(); + for (_slot, tree) in self.trees.iter() { + if repairs.len() >= max_new_repairs { + break; + } + let new_repairs = get_closest_completion( + tree, + blockstore, + slot_meta_cache, + processed_slots, + max_new_repairs - repairs.len(), + ); + repairs.extend(new_repairs); + } + repairs + } + // Attempts to chain the orphan subtree rooted at `orphan_tree_root` // to any earlier subtree with new any ancestry information in `blockstore`. // Returns the earliest known ancestor of `heaviest_tree_root`. @@ -852,8 +974,10 @@ mod test { // Ask for only 1 orphan. Because the orphans have the same weight, // should prioritize smaller orphan first let mut repairs = vec![]; + let mut processed_slots: HashSet = vec![repair_weight.root].into_iter().collect(); repair_weight.get_best_orphans( &blockstore, + &mut processed_slots, &mut repairs, bank.epoch_stakes_map(), bank.epoch_schedule(), @@ -879,6 +1003,7 @@ mod test { // New vote on same orphan branch, without any new slot chaining // information blockstore should not resolve the orphan repairs = vec![]; + processed_slots = vec![repair_weight.root].into_iter().collect(); let votes = vec![(10, vec![vote_pubkeys[0]])]; repair_weight.add_votes( &blockstore, @@ -888,6 +1013,7 @@ mod test { ); repair_weight.get_best_orphans( &blockstore, + &mut processed_slots, &mut repairs, bank.epoch_stakes_map(), bank.epoch_schedule(), @@ -898,8 +1024,10 @@ mod test { // Ask for 2 orphans, should return all the orphans repairs = vec![]; + processed_slots = vec![repair_weight.root].into_iter().collect(); repair_weight.get_best_orphans( &blockstore, + &mut processed_slots, &mut repairs, bank.epoch_stakes_map(), bank.epoch_schedule(), @@ -911,6 +1039,7 @@ mod test { // If one orphan gets heavier, should pick that one repairs = vec![]; + processed_slots = vec![repair_weight.root].into_iter().collect(); let votes = vec![(20, vec![vote_pubkeys[0]])]; repair_weight.add_votes( &blockstore, @@ -920,6 +1049,7 @@ mod test { ); repair_weight.get_best_orphans( &blockstore, + &mut processed_slots, &mut repairs, bank.epoch_stakes_map(), bank.epoch_schedule(), @@ -931,10 +1061,12 @@ mod test { // Resolve the orphans, should now return no // orphans repairs = vec![]; + processed_slots = vec![repair_weight.root].into_iter().collect(); blockstore.add_tree(tr(6) / (tr(8)), true, true, 2, Hash::default()); blockstore.add_tree(tr(11) / (tr(20)), true, true, 2, Hash::default()); repair_weight.get_best_orphans( &blockstore, + &mut processed_slots, &mut repairs, bank.epoch_stakes_map(), bank.epoch_schedule(), @@ -967,9 +1099,11 @@ mod test { // orphan in the `trees` map, we should search for // exactly one more of the remaining two let mut repairs = vec![]; + let mut processed_slots: HashSet = vec![repair_weight.root].into_iter().collect(); blockstore.add_tree(tr(100) / (tr(101)), true, true, 2, Hash::default()); repair_weight.get_best_orphans( &blockstore, + &mut processed_slots, &mut repairs, bank.epoch_stakes_map(), bank.epoch_schedule(), @@ -981,8 +1115,10 @@ mod test { // If we ask for 3 orphans, we should get all of them let mut repairs = vec![]; + processed_slots = vec![repair_weight.root].into_iter().collect(); repair_weight.get_best_orphans( &blockstore, + &mut processed_slots, &mut repairs, bank.epoch_stakes_map(), bank.epoch_schedule(), diff --git a/core/src/repair_weighted_traversal.rs b/core/src/repair_weighted_traversal.rs index 0ed3dc83d..a7e744d6e 100644 --- a/core/src/repair_weighted_traversal.rs +++ b/core/src/repair_weighted_traversal.rs @@ -2,7 +2,7 @@ use crate::{ heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice, repair_service::RepairService, serve_repair::ShredRepairType, tree_diff::TreeDiff, }; -use solana_ledger::blockstore::Blockstore; +use solana_ledger::{blockstore::Blockstore, blockstore_meta::SlotMeta}; use solana_runtime::contains::Contains; use solana_sdk::{clock::Slot, hash::Hash}; use std::collections::{HashMap, HashSet}; @@ -29,7 +29,7 @@ struct RepairWeightTraversal<'a> { } impl<'a> RepairWeightTraversal<'a> { - pub fn new(tree: &'a HeaviestSubtreeForkChoice) -> Self { + fn new(tree: &'a HeaviestSubtreeForkChoice) -> Self { Self { tree, pending: vec![Visit::Unvisited(tree.root().0)], @@ -73,6 +73,7 @@ impl<'a> Iterator for RepairWeightTraversal<'a> { pub fn get_best_repair_shreds<'a>( tree: &HeaviestSubtreeForkChoice, blockstore: &Blockstore, + slot_meta_cache: &mut HashMap>, repairs: &mut Vec, max_new_shreds: usize, ignore_slots: &impl Contains<'a, Slot>, @@ -81,7 +82,6 @@ pub fn get_best_repair_shreds<'a>( let max_repairs = initial_len + max_new_shreds; let weighted_iter = RepairWeightTraversal::new(tree); let mut visited_set = HashSet::new(); - let mut slot_meta_cache = HashMap::new(); for next in weighted_iter { if repairs.len() > max_repairs { break; @@ -215,10 +215,12 @@ pub mod test { // `blockstore` and `heaviest_subtree_fork_choice` match exactly, so should // return repairs for all slots (none are completed) in order of traversal let mut repairs = vec![]; + let mut slot_meta_cache = HashMap::default(); let last_shred = blockstore.meta(0).unwrap().unwrap().received; get_best_repair_shreds( &heaviest_subtree_fork_choice, &blockstore, + &mut slot_meta_cache, &mut repairs, 6, &HashSet::default(), @@ -234,6 +236,7 @@ pub mod test { // Add some leaves to blockstore, attached to the current best leaf, should prioritize // repairing those new leaves before trying other branches repairs = vec![]; + slot_meta_cache = HashMap::default(); let best_overall_slot = heaviest_subtree_fork_choice.best_overall_slot().0; assert_eq!(best_overall_slot, 4); blockstore.add_tree( @@ -246,6 +249,7 @@ pub mod test { get_best_repair_shreds( &heaviest_subtree_fork_choice, &blockstore, + &mut slot_meta_cache, &mut repairs, 6, &HashSet::default(), @@ -260,6 +264,7 @@ pub mod test { // Completing slots should remove them from the repair list repairs = vec![]; + slot_meta_cache = HashMap::default(); let completed_shreds: Vec = [0, 2, 4, 6] .iter() .map(|slot| { @@ -281,6 +286,7 @@ pub mod test { get_best_repair_shreds( &heaviest_subtree_fork_choice, &blockstore, + &mut slot_meta_cache, &mut repairs, 4, &HashSet::default(), @@ -296,10 +302,12 @@ pub mod test { // Adding incomplete children with higher weighted parents, even if // the parents are complete should still be repaired repairs = vec![]; + slot_meta_cache = HashMap::default(); blockstore.add_tree(tr(2) / (tr(8)), true, false, 2, Hash::default()); get_best_repair_shreds( &heaviest_subtree_fork_choice, &blockstore, + &mut slot_meta_cache, &mut repairs, 4, &HashSet::default(), @@ -320,9 +328,11 @@ pub mod test { // 4 again when the Unvisited(2) event happens blockstore.add_tree(tr(2) / (tr(6) / tr(7)), true, false, 2, Hash::default()); let mut repairs = vec![]; + let mut slot_meta_cache = HashMap::default(); get_best_repair_shreds( &heaviest_subtree_fork_choice, &blockstore, + &mut slot_meta_cache, &mut repairs, std::usize::MAX, &HashSet::default(), @@ -344,11 +354,13 @@ pub mod test { // Adding slots to ignore should remove them from the repair set, but // should not remove their children let mut repairs = vec![]; + let mut slot_meta_cache = HashMap::default(); let mut ignore_set: HashSet = vec![1, 3].into_iter().collect(); let last_shred = blockstore.meta(0).unwrap().unwrap().received; get_best_repair_shreds( &heaviest_subtree_fork_choice, &blockstore, + &mut slot_meta_cache, &mut repairs, std::usize::MAX, &ignore_set, @@ -364,11 +376,13 @@ pub mod test { // Adding slot 2 to ignore should not remove its unexplored children from // the repair set repairs = vec![]; + slot_meta_cache = HashMap::default(); blockstore.add_tree(tr(2) / (tr(6) / tr(7)), true, false, 2, Hash::default()); ignore_set.insert(2); get_best_repair_shreds( &heaviest_subtree_fork_choice, &blockstore, + &mut slot_meta_cache, &mut repairs, std::usize::MAX, &ignore_set, @@ -385,9 +399,11 @@ pub mod test { // child 7 from the repair set repairs = vec![]; ignore_set.insert(6); + slot_meta_cache = HashMap::default(); get_best_repair_shreds( &heaviest_subtree_fork_choice, &blockstore, + &mut slot_meta_cache, &mut repairs, std::usize::MAX, &ignore_set, diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index a1187d048..323c6f6c4 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -192,6 +192,14 @@ impl SlotMeta { self.consumed == self.last_index + 1 } + pub fn known_last_index(&self) -> Option { + if self.last_index == std::u64::MAX { + None + } else { + Some(self.last_index) + } + } + pub fn is_parent_set(&self) -> bool { self.parent_slot != std::u64::MAX }