Plumb dumps from replay_stage to repair (#29058)

* Plumb dumps from replay_stage to repair

When dumping a slot from replay_stage as a result of duplicate or
ancestor hashes, properly update repair subtrees to keep weighting and
forks view accurate.

* add test

* pr comments
Ashwin Sekar 2022-12-25 08:58:30 -08:00 committed by GitHub
parent 2681720557
commit f2ba16ee87
7 changed files with 231 additions and 54 deletions
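
Before the per-file hunks, here is a minimal, self-contained sketch of the plumbing this commit introduces: replay_stage batches the (slot, correct_hash) pairs it has dumped and sends them over an unbounded crossbeam channel, and repair_service drains that channel on each loop iteration before rebuilding its weighting. This is an illustrative sketch only, not validator code: the simplified Slot/Hash stand-ins and the helper function names are assumptions, and on the real receiver side each dumped slot is handed to repair_weight.split_off(slot).

// Sketch of the new replay -> repair notification path (illustrative only).
use crossbeam_channel::{unbounded, Receiver, Sender};

type Slot = u64;
type Hash = [u8; 32]; // stand-in for solana_sdk::hash::Hash

// Mirrors the DumpedSlotsSender / DumpedSlotsReceiver aliases added in repair_service.
type DumpedSlotsSender = Sender<Vec<(Slot, Hash)>>;
type DumpedSlotsReceiver = Receiver<Vec<(Slot, Hash)>>;

// Replay side: after dumping duplicate slots from the blockstore, notify repair
// with one batch of (slot, correct_hash) pairs.
fn notify_repair_of_dump(dumped: Vec<(Slot, Hash)>, sender: &DumpedSlotsSender) {
    sender.send(dumped).expect("repair service should be running");
}

// Repair side: drain everything replay has dumped since the last iteration.
// The real code orphans each slot in the weighting via repair_weight.split_off(slot);
// here we simply collect the slots.
fn drain_dumped_slots(receiver: &DumpedSlotsReceiver) -> Vec<Slot> {
    receiver
        .try_iter()
        .flatten()
        .map(|(slot, _correct_hash)| slot)
        .collect()
}

fn main() {
    let (dumped_slots_sender, dumped_slots_receiver) = unbounded();

    // Replay dumps slots 5 and 6 (hashes zeroed for the sketch)...
    notify_repair_of_dump(vec![(5, [0u8; 32]), (6, [0u8; 32])], &dumped_slots_sender);

    // ...and the repair loop picks them up on its next pass.
    assert_eq!(drain_dumped_slots(&dumped_slots_receiver), vec![5, 6]);
}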

View File

@@ -1534,6 +1534,7 @@ mod test {
ref cluster_slots,
..
} = repair_info;
+let (dumped_slots_sender, _dumped_slots_receiver) = unbounded();
// Add the responder to the eligible list for requests
let responder_id = responder_info.id;
@@ -1559,6 +1560,7 @@ mod test {
&requester_blockstore,
None,
&mut PurgeRepairSlotCounter::default(),
+&dumped_slots_sender,
);
// Simulate making a request

View File

@@ -87,6 +87,7 @@ impl UpdateOperation {
}
}
+#[derive(Clone)]
struct ForkInfo {
// Amount of stake that has voted for exactly this slot
stake_voted_at: ForkWeight,
@@ -413,24 +414,25 @@ impl HeaviestSubtreeForkChoice {
.map(|(slot_hash, fork_info)| (slot_hash, fork_info.stake_voted_subtree))
}
-/// Dump the node `slot_hash_key` and propagate the stake subtraction up to the root of the
-/// tree. Children of `slot_hash_key` are implicitely dumped as well, as if we were able to
-/// chain them to a defunct parent that implies that they are defunct as well (consistent with
-/// bank forks).
+/// Split off the node at `slot_hash_key` and propagate the stake subtraction up to the root of the
+/// tree.
///
-/// Returns all removed slots
-pub fn dump_node(&mut self, slot_hash_key: &SlotHashKey) -> Vec<Slot> {
-let parent = {
-let mut node_to_dump = self
+/// Assumes that `slot_hash_key` is not the `tree_root`
+/// Returns the subtree originating from `slot_hash_key`
+pub fn split_off(&mut self, slot_hash_key: &SlotHashKey) -> Self {
+assert_ne!(self.tree_root, *slot_hash_key);
+let split_tree_root = {
+let mut node_to_split_at = self
.fork_infos
.get_mut(slot_hash_key)
.expect("Slot hash key must exist in tree");
+let split_tree_fork_info = node_to_split_at.clone();
// Remove stake to be aggregated up the tree
-node_to_dump.stake_voted_subtree = 0;
-node_to_dump.stake_voted_at = 0;
+node_to_split_at.stake_voted_subtree = 0;
+node_to_split_at.stake_voted_at = 0;
// Mark this node as invalid so that it cannot be chosen as best child
-node_to_dump.latest_invalid_ancestor = Some(slot_hash_key.0);
-node_to_dump.parent
+node_to_split_at.latest_invalid_ancestor = Some(slot_hash_key.0);
+split_tree_fork_info
};
let mut update_operations: UpdateOperations = BTreeMap::new();
@@ -438,9 +440,9 @@ impl HeaviestSubtreeForkChoice {
self.insert_aggregate_operations(&mut update_operations, *slot_hash_key);
self.process_update_operations(update_operations);
-// Remove node + all children
+// Remove node + all children and add to new tree
+let mut split_tree_fork_infos = HashMap::new();
let mut to_visit = vec![*slot_hash_key];
-let mut removed = Vec::new();
while !to_visit.is_empty() {
let current_node = to_visit.pop().unwrap();
@@ -449,22 +451,34 @@ impl HeaviestSubtreeForkChoice {
.remove(&current_node)
.expect("Node must exist in tree");
-removed.push(current_node.0);
-to_visit.extend(current_fork_info.children.into_iter());
+to_visit.extend(current_fork_info.children.iter());
+split_tree_fork_infos.insert(current_node, current_fork_info);
}
-if let Some(parent) = parent {
-// Remove link from parent
-let parent_fork_info = self
-.fork_infos
-.get_mut(&parent)
-.expect("Parent must exist in fork infos");
-parent_fork_info.children.remove(slot_hash_key);
-} else {
-warn!("Dumped root of tree {:?}", slot_hash_key);
-}
-removed
+// Remove link from parent
+let parent_fork_info = self
+.fork_infos
+.get_mut(&split_tree_root.parent.expect("Cannot split off from root"))
+.expect("Parent must exist in fork infos");
+parent_fork_info.children.remove(slot_hash_key);
+// Update the root of the new tree with the proper info, now that we have finished
+// aggregating
+split_tree_fork_infos.insert(*slot_hash_key, split_tree_root);
+// Split off the relevant votes to the new tree
+let mut split_tree_latest_votes = self.latest_votes.clone();
+split_tree_latest_votes.retain(|_, node| split_tree_fork_infos.contains_key(node));
+self.latest_votes
+.retain(|_, node| self.fork_infos.contains_key(node));
+// Create a new tree from the split
+HeaviestSubtreeForkChoice {
+tree_root: *slot_hash_key,
+fork_infos: split_tree_fork_infos,
+latest_votes: split_tree_latest_votes,
+last_root_time: Instant::now(),
+}
}
#[cfg(test)]
@@ -3474,7 +3488,7 @@ mod test {
}
#[test]
-fn test_dump_node_simple() {
+fn test_split_off_simple() {
let mut heaviest_subtree_fork_choice = setup_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake);
@@ -3491,7 +3505,7 @@ mod test {
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
-heaviest_subtree_fork_choice.dump_node(&(5, Hash::default()));
+let tree = heaviest_subtree_fork_choice.split_off(&(5, Hash::default()));
assert_eq!(
3 * stake,
@@ -3519,10 +3533,18 @@ mod test {
None,
heaviest_subtree_fork_choice.stake_voted_subtree(&(6, Hash::default()))
);
+assert_eq!(
+stake,
+tree.stake_voted_subtree(&(5, Hash::default())).unwrap()
+);
+assert_eq!(
+stake,
+tree.stake_voted_subtree(&(6, Hash::default())).unwrap()
+);
}
#[test]
-fn test_dump_node_unvoted() {
+fn test_split_off_unvoted() {
let mut heaviest_subtree_fork_choice = setup_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake);
@@ -3539,7 +3561,7 @@ mod test {
bank.epoch_stakes_map(),
bank.epoch_schedule(),
);
-heaviest_subtree_fork_choice.dump_node(&(2, Hash::default()));
+let tree = heaviest_subtree_fork_choice.split_off(&(2, Hash::default()));
assert_eq!(
4 * stake,
@@ -3561,10 +3583,12 @@ mod test {
None,
heaviest_subtree_fork_choice.stake_voted_subtree(&(4, Hash::default()))
);
+assert_eq!(0, tree.stake_voted_subtree(&(2, Hash::default())).unwrap());
+assert_eq!(0, tree.stake_voted_subtree(&(4, Hash::default())).unwrap());
}
#[test]
-fn test_dump_node_on_best_path() {
+fn test_split_off_on_best_path() {
let mut heaviest_subtree_fork_choice = setup_forks();
let stake = 100;
let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(4, stake);
@@ -3584,18 +3608,21 @@ mod test {
assert_eq!(6, heaviest_subtree_fork_choice.best_overall_slot().0);
-heaviest_subtree_fork_choice.dump_node(&(6, Hash::default()));
+let tree = heaviest_subtree_fork_choice.split_off(&(6, Hash::default()));
assert_eq!(5, heaviest_subtree_fork_choice.best_overall_slot().0);
+assert_eq!(6, tree.best_overall_slot().0);
-heaviest_subtree_fork_choice.dump_node(&(3, Hash::default()));
+let tree = heaviest_subtree_fork_choice.split_off(&(3, Hash::default()));
assert_eq!(4, heaviest_subtree_fork_choice.best_overall_slot().0);
+assert_eq!(5, tree.best_overall_slot().0);
-heaviest_subtree_fork_choice.dump_node(&(1, Hash::default()));
+let tree = heaviest_subtree_fork_choice.split_off(&(1, Hash::default()));
assert_eq!(0, heaviest_subtree_fork_choice.best_overall_slot().0);
+assert_eq!(4, tree.best_overall_slot().0);
}
#[test]
-fn test_dump_with_dups() {
+fn test_split_off_with_dups() {
let (
mut heaviest_subtree_fork_choice,
duplicate_leaves_descended_from_4,
@@ -3629,16 +3656,17 @@ mod test {
heaviest_subtree_fork_choice.best_overall_slot(),
expected_best_slot_hash
);
-heaviest_subtree_fork_choice.dump_node(&expected_best_slot_hash);
+let tree = heaviest_subtree_fork_choice.split_off(&expected_best_slot_hash);
assert_eq!(
heaviest_subtree_fork_choice.best_overall_slot(),
duplicate_leaves_descended_from_4[1]
);
+assert_eq!(tree.best_overall_slot(), expected_best_slot_hash);
}
#[test]
-fn test_dump_subtree_with_dups() {
+fn test_split_off_subtree_with_dups() {
let (
mut heaviest_subtree_fork_choice,
duplicate_leaves_descended_from_4,
@@ -3672,45 +3700,43 @@ mod test {
heaviest_subtree_fork_choice.best_overall_slot(),
expected_best_slot_hash
);
-heaviest_subtree_fork_choice.dump_node(&(2, Hash::default()));
+let tree = heaviest_subtree_fork_choice.split_off(&(2, Hash::default()));
assert_eq!(
heaviest_subtree_fork_choice.best_overall_slot(),
duplicate_leaves_descended_from_5[0]
);
+assert_eq!(tree.best_overall_slot(), expected_best_slot_hash);
}
#[test]
-fn test_dump_node_complicated() {
+fn test_split_off_complicated() {
let mut heaviest_subtree_fork_choice = setup_complicated_forks();
-let dump_and_check =
+let split_and_check =
|tree: &mut HeaviestSubtreeForkChoice, node: Slot, nodes_to_check: Vec<Slot>| {
for &n in nodes_to_check.iter() {
assert!(tree.contains_block(&(n, Hash::default())));
}
-tree.dump_node(&(node, Hash::default()));
+let split_tree = tree.split_off(&(node, Hash::default()));
for &n in nodes_to_check.iter() {
assert!(!tree.contains_block(&(n, Hash::default())));
+assert!(split_tree.contains_block(&(n, Hash::default())));
}
};
-dump_and_check(
+split_and_check(
&mut heaviest_subtree_fork_choice,
14,
vec![14, 15, 16, 22, 23, 17, 21, 18, 19, 20, 24, 25],
);
-dump_and_check(&mut heaviest_subtree_fork_choice, 12, vec![12, 13]);
-dump_and_check(
+split_and_check(&mut heaviest_subtree_fork_choice, 12, vec![12, 13]);
+split_and_check(
&mut heaviest_subtree_fork_choice,
2,
vec![2, 7, 8, 9, 33, 34, 10, 31, 32],
);
-dump_and_check(
-&mut heaviest_subtree_fork_choice,
-0,
-vec![0, 1, 5, 6, 3, 11, 26],
-);
+split_and_check(&mut heaviest_subtree_fork_choice, 1, vec![1, 5, 6]);
}
fn setup_forks() -> HeaviestSubtreeForkChoice {

View File

@@ -40,6 +40,8 @@ pub type DuplicateSlotsResetSender = CrossbeamSender<Vec<(Slot, Hash)>>;
pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Vec<(Slot, Hash)>>;
pub type ConfirmedSlotsSender = CrossbeamSender<Vec<Slot>>;
pub type ConfirmedSlotsReceiver = CrossbeamReceiver<Vec<Slot>>;
+pub type DumpedSlotsSender = CrossbeamSender<Vec<(Slot, Hash)>>;
+pub type DumpedSlotsReceiver = CrossbeamReceiver<Vec<(Slot, Hash)>>;
pub type OutstandingShredRepairs = OutstandingRequests<ShredRepairType>;
#[derive(Default, Debug)]
@@ -92,6 +94,7 @@ pub struct RepairStats {
#[derive(Default, Debug)]
pub struct RepairTiming {
pub set_root_elapsed: u64,
+pub dump_slots_elapsed: u64,
pub get_votes_elapsed: u64,
pub add_votes_elapsed: u64,
pub get_best_orphans_elapsed: u64,
@@ -107,12 +110,14 @@ impl RepairTiming {
fn update(
&mut self,
set_root_elapsed: u64,
+dump_slots_elapsed: u64,
get_votes_elapsed: u64,
add_votes_elapsed: u64,
build_repairs_batch_elapsed: u64,
batch_send_repairs_elapsed: u64,
) {
self.set_root_elapsed += set_root_elapsed;
+self.dump_slots_elapsed += dump_slots_elapsed;
self.get_votes_elapsed += get_votes_elapsed;
self.add_votes_elapsed += add_votes_elapsed;
self.build_repairs_batch_elapsed += build_repairs_batch_elapsed;
@@ -208,6 +213,7 @@ impl RepairService {
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
+dumped_slots_receiver: DumpedSlotsReceiver,
) -> Self {
let t_repair = {
let blockstore = blockstore.clone();
@@ -223,6 +229,7 @@ impl RepairService {
repair_info,
verified_vote_receiver,
&outstanding_requests,
+dumped_slots_receiver,
)
})
.unwrap()
@@ -249,6 +256,7 @@ impl RepairService {
repair_info: RepairInfo,
verified_vote_receiver: VerifiedVoteReceiver,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
+dumped_slots_receiver: DumpedSlotsReceiver,
) {
let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root());
let serve_repair = ServeRepair::new(
@@ -271,6 +279,7 @@ impl RepairService {
}
let mut set_root_elapsed;
+let mut dump_slots_elapsed;
let mut get_votes_elapsed;
let mut add_votes_elapsed;
@@ -283,6 +292,23 @@ impl RepairService {
repair_weight.set_root(new_root);
set_root_elapsed.stop();
+// Remove dumped slots from the weighting heuristic
+dump_slots_elapsed = Measure::start("dump_slots_elapsed");
+dumped_slots_receiver
+.try_iter()
+.for_each(|slot_hash_keys_to_dump| {
+// Currently we don't use the correct_hash in repair. Since this dumped
+// slot is DuplicateConfirmed, we have a >= 52% chance on receiving the
+// correct version.
+for (slot, _correct_hash) in slot_hash_keys_to_dump {
+// `slot` is dumped in blockstore wanting to be repaired, we orphan it along with
+// descendants while copying the weighting heurstic so that it can be
+// repaired with correct ancestor information
+repair_weight.split_off(slot);
+}
+});
+dump_slots_elapsed.stop();
// Add new votes to the weighting heuristic
get_votes_elapsed = Measure::start("get_votes_elapsed");
let mut slot_to_vote_pubkeys: HashMap<Slot, Vec<Pubkey>> = HashMap::new();
@@ -366,6 +392,7 @@ impl RepairService {
repair_timing.update(
set_root_elapsed.as_us(),
+dump_slots_elapsed.as_us(),
get_votes_elapsed.as_us(),
add_votes_elapsed.as_us(),
build_repairs_batch_elapsed.as_us(),
@@ -401,6 +428,7 @@ impl RepairService {
datapoint_info!(
"repair_service-repair_timing",
("set-root-elapsed", repair_timing.set_root_elapsed, i64),
+("dump-slots-elapsed", repair_timing.dump_slots_elapsed, i64),
("get-votes-elapsed", repair_timing.get_votes_elapsed, i64),
("add-votes-elapsed", repair_timing.add_votes_elapsed, i64),
(

View File

@@ -248,6 +248,32 @@ impl RepairWeight {
repairs
}
+/// Split `slot` and descendants into an orphan tree in repair weighting
+/// These orphaned slots should be removed from `unrooted_slots` as on proper repair these slots might
+/// now be part of the rooted path
+pub fn split_off(&mut self, slot: Slot) {
+if slot == self.root {
+error!("Trying to orphan root of repair tree {}", slot);
+return;
+}
+if let Some(subtree_root) = self.slot_to_tree.get(&slot) {
+if *subtree_root == slot {
+info!("{} is already orphan, skipping", slot);
+return;
+}
+let subtree = self
+.trees
+.get_mut(subtree_root)
+.expect("subtree must exist");
+let orphaned_tree = subtree.split_off(&(slot, Hash::default()));
+for ((orphaned_slot, _), _) in orphaned_tree.all_slots_stake_voted_subtree() {
+self.unrooted_slots.remove(orphaned_slot);
+self.slot_to_tree.insert(*orphaned_slot, slot);
+}
+self.trees.insert(slot, orphaned_tree);
+}
+}
pub fn set_root(&mut self, new_root: Slot) {
// Roots should be monotonically increasing
assert!(self.root <= new_root);
@@ -664,6 +690,7 @@ impl RepairWeight {
mod test {
use {
super::*,
+itertools::Itertools,
solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
solana_runtime::{bank::Bank, bank_utils},
solana_sdk::hash::Hash,
@@ -1399,6 +1426,76 @@ mod test {
);
}
+#[test]
+fn test_orphan_slot_copy_weight() {
+let (blockstore, _, mut repair_weight) = setup_orphan_repair_weight();
+let stake = 100;
+let (bank, vote_pubkeys) = bank_utils::setup_bank_and_vote_pubkeys_for_tests(1, stake);
+repair_weight.add_votes(
+&blockstore,
+vec![(6, vote_pubkeys)].into_iter(),
+bank.epoch_stakes_map(),
+bank.epoch_schedule(),
+);
+// Simulate dump from replay
+blockstore.clear_unconfirmed_slot(3);
+repair_weight.split_off(3);
+blockstore.clear_unconfirmed_slot(10);
+repair_weight.split_off(10);
+// Verify orphans
+let mut orphans = repair_weight.trees.keys().copied().collect_vec();
+orphans.sort();
+assert_eq!(vec![0, 3, 8, 10, 20], orphans);
+// Verify weighting
+assert_eq!(
+0,
+repair_weight
+.trees
+.get(&8)
+.unwrap()
+.stake_voted_subtree(&(8, Hash::default()))
+.unwrap()
+);
+assert_eq!(
+stake,
+repair_weight
+.trees
+.get(&3)
+.unwrap()
+.stake_voted_subtree(&(3, Hash::default()))
+.unwrap()
+);
+assert_eq!(
+2 * stake,
+repair_weight
+.trees
+.get(&10)
+.unwrap()
+.stake_voted_subtree(&(10, Hash::default()))
+.unwrap()
+);
+// Get best orphans works as usual
+let mut repairs = vec![];
+let mut processed_slots = vec![repair_weight.root].into_iter().collect();
+repair_weight.get_best_orphans(
+&blockstore,
+&mut processed_slots,
+&mut repairs,
+bank.epoch_stakes_map(),
+bank.epoch_schedule(),
+4,
+);
+assert_eq!(repairs.len(), 4);
+assert_eq!(repairs[0].slot(), 10);
+assert_eq!(repairs[1].slot(), 20);
+assert_eq!(repairs[2].slot(), 3);
+assert_eq!(repairs[3].slot(), 8);
+}
fn setup_orphan_repair_weight() -> (Blockstore, Bank, RepairWeight) {
let blockstore = setup_orphans();
let stake = 100;

View File

@@ -20,7 +20,7 @@ use {
heaviest_subtree_fork_choice::HeaviestSubtreeForkChoice,
latest_validator_votes_for_frozen_banks::LatestValidatorVotesForFrozenBanks,
progress_map::{ForkProgress, ProgressMap, PropagatedStats, ReplaySlotStats},
-repair_service::DuplicateSlotsResetReceiver,
+repair_service::{DumpedSlotsSender, DuplicateSlotsResetReceiver},
rewards_recorder_service::RewardsRecorderSender,
tower_storage::{SavedTower, SavedTowerVersions, TowerStorage},
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
@@ -401,6 +401,7 @@ impl ReplayStage {
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
+dumped_slots_sender: DumpedSlotsSender,
) -> Result<Self, String> {
let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
let tower = process_blockstore.process_to_create_tower()?;
@@ -915,6 +916,7 @@ impl ReplayStage {
&blockstore,
poh_bank.map(|bank| bank.slot()),
&mut purge_repair_slot_counter,
+&dumped_slots_sender,
);
dump_then_repair_correct_slots_time.stop();
@@ -1166,12 +1168,14 @@ impl ReplayStage {
blockstore: &Blockstore,
poh_bank_slot: Option<Slot>,
purge_repair_slot_counter: &mut PurgeRepairSlotCounter,
+dumped_slots_sender: &DumpedSlotsSender,
) {
if duplicate_slots_to_repair.is_empty() {
return;
}
let root_bank = bank_forks.read().unwrap().root_bank();
+let mut dumped = vec![];
// TODO: handle if alternate version of descendant also got confirmed after ancestor was
// confirmed, what happens then? Should probably keep track of purged list and skip things
// in `duplicate_slots_to_repair` that have already been purged. Add test.
@@ -1234,6 +1238,9 @@ impl ReplayStage {
bank_forks,
blockstore,
);
+dumped.push((*duplicate_slot, *correct_hash));
let attempt_no = purge_repair_slot_counter
.entry(*duplicate_slot)
.and_modify(|x| *x += 1)
@@ -1246,8 +1253,6 @@ impl ReplayStage {
*duplicate_slot, *attempt_no,
);
true
-// TODO: Send signal to repair to repair the correct version of
-// `duplicate_slot` with hash == `correct_hash`
} else {
warn!(
"PoH bank for slot {} is building on duplicate slot {}",
@@ -1261,6 +1266,10 @@ impl ReplayStage {
// If we purged/repaired, then no need to keep the slot in the set of pending work
!did_purge_repair
});
+// Notify repair of the dumped slots along with the correct hash
+trace!("Dumped {} slots", dumped.len());
+dumped_slots_sender.send(dumped).unwrap();
}
#[allow(clippy::too_many_arguments)]
@@ -3594,6 +3603,7 @@ pub(crate) mod tests {
vote_simulator::{self, VoteSimulator},
},
crossbeam_channel::unbounded,
+itertools::Itertools,
solana_entry::entry::{self, Entry},
solana_gossip::{cluster_info::Node, crds::Cursor},
solana_ledger::{
@@ -6038,6 +6048,11 @@ pub(crate) mod tests {
duplicate_slots_to_repair.insert(1, Hash::new_unique());
duplicate_slots_to_repair.insert(2, Hash::new_unique());
let mut purge_repair_slot_counter = PurgeRepairSlotCounter::default();
+let (dumped_slots_sender, dumped_slots_receiver) = unbounded();
+let should_be_dumped = duplicate_slots_to_repair
+.iter()
+.map(|(&s, &h)| (s, h))
+.collect_vec();
ReplayStage::dump_then_repair_correct_slots(
&mut duplicate_slots_to_repair,
@@ -6048,7 +6063,9 @@ pub(crate) mod tests {
blockstore,
None,
&mut purge_repair_slot_counter,
+&dumped_slots_sender,
);
+assert_eq!(should_be_dumped, dumped_slots_receiver.recv().ok().unwrap());
let r_bank_forks = bank_forks.read().unwrap();
for slot in 0..=2 {
@@ -6154,6 +6171,7 @@ pub(crate) mod tests {
let mut ancestors = bank_forks.read().unwrap().ancestors();
let mut descendants = bank_forks.read().unwrap().descendants();
let old_descendants_of_2 = descendants.get(&2).unwrap().clone();
+let (dumped_slots_sender, _dumped_slots_receiver) = unbounded();
ReplayStage::dump_then_repair_correct_slots(
&mut duplicate_slots_to_repair,
@@ -6164,6 +6182,7 @@ pub(crate) mod tests {
blockstore,
None,
&mut PurgeRepairSlotCounter::default(),
+&dumped_slots_sender,
);
// Check everything was purged properly

View File

@@ -185,6 +185,7 @@ impl Tvu {
let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
unbounded();
+let (dumped_slots_sender, dumped_slots_receiver) = unbounded();
let window_service = {
let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule();
let repair_info = RepairInfo {
@@ -209,6 +210,7 @@ impl Tvu {
completed_data_sets_sender,
duplicate_slots_sender,
ancestor_hashes_replay_update_receiver,
+dumped_slots_receiver,
)
};
@@ -292,6 +294,7 @@ impl Tvu {
block_metadata_notifier,
log_messages_bytes_limit,
prioritization_fee_cache.clone(),
+dumped_slots_sender,
)?;
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {

View File

@@ -7,7 +7,7 @@ use {
cluster_info_vote_listener::VerifiedVoteReceiver,
completed_data_sets_service::CompletedDataSetsSender,
repair_response,
-repair_service::{OutstandingShredRepairs, RepairInfo, RepairService},
+repair_service::{DumpedSlotsReceiver, OutstandingShredRepairs, RepairInfo, RepairService},
result::{Error, Result},
},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
@@ -317,6 +317,7 @@ impl WindowService {
completed_data_sets_sender: CompletedDataSetsSender,
duplicate_slots_sender: DuplicateSlotSender,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
+dumped_slots_receiver: DumpedSlotsReceiver,
) -> WindowService {
let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
@@ -331,6 +332,7 @@ impl WindowService {
verified_vote_receiver,
outstanding_requests.clone(),
ancestor_hashes_replay_update_receiver,
+dumped_slots_receiver,
);
let (duplicate_sender, duplicate_receiver) = unbounded();