diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index 870b05219a..1b57f4d694 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -1,12 +1,19 @@
 use super::*;
-use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE};
+use solana_ledger::shred::Shredder;
 use solana_sdk::hash::Hash;
 use solana_sdk::signature::Keypair;
+use std::{thread::sleep, time::Duration};
+
+pub const NUM_BAD_SLOTS: u64 = 10;
+pub const SLOT_TO_RESOLVE: u64 = 32;
 #[derive(Clone)]
 pub(super) struct FailEntryVerificationBroadcastRun {
     shred_version: u16,
     keypair: Arc<Keypair>,
+    good_shreds: Vec<Shred>,
+    current_slot: Slot,
+    next_shred_index: u32,
 }
 impl FailEntryVerificationBroadcastRun {
@@ -14,6 +21,9 @@ impl FailEntryVerificationBroadcastRun {
         Self {
             shred_version,
             keypair,
+            good_shreds: vec![],
+            current_slot: 0,
+            next_shred_index: 0,
         }
     }
 }
@@ -31,44 +41,90 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         let bank = receive_results.bank.clone();
         let last_tick_height = receive_results.last_tick_height;
-        // 2) Convert entries to shreds + generate coding shreds. Set a garbage PoH on the last entry
-        // in the slot to make verification fail on validators
-        if last_tick_height == bank.max_tick_height() {
-            let mut last_entry = receive_results.entries.last_mut().unwrap();
-            last_entry.hash = Hash::default();
+        if bank.slot() != self.current_slot {
+            self.next_shred_index = 0;
+            self.current_slot = bank.slot();
         }
-        let next_shred_index = blockstore
-            .meta(bank.slot())
-            .expect("Database error")
-            .map(|meta| meta.consumed)
-            .unwrap_or(0) as u32;
+        // 2) If we're past SLOT_TO_RESOLVE, insert the correct shreds so validators can repair
+        // and make progress
+        if bank.slot() > SLOT_TO_RESOLVE && !self.good_shreds.is_empty() {
+            info!("Resolving bad shreds");
+            let mut shreds = vec![];
+            std::mem::swap(&mut shreds, &mut self.good_shreds);
+            blockstore_sender.send((Arc::new(shreds), None))?;
+        }
+
+        // 3) Convert entries to shreds + generate coding shreds. 
Set a garbage PoH on the last entry + // in the slot to make verification fail on validators + let last_entries = { + if last_tick_height == bank.max_tick_height() && bank.slot() < NUM_BAD_SLOTS { + let good_last_entry = receive_results.entries.pop().unwrap(); + let mut bad_last_entry = good_last_entry.clone(); + bad_last_entry.hash = Hash::default(); + Some((good_last_entry, bad_last_entry)) + } else { + None + } + }; let shredder = Shredder::new( bank.slot(), bank.parent().unwrap().slot(), - RECOMMENDED_FEC_RATE, + 0.0, self.keypair.clone(), (bank.tick_height() % bank.ticks_per_slot()) as u8, self.shred_version, ) .expect("Expected to create a new shredder"); - let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds( + let (data_shreds, _, _) = shredder.entries_to_shreds( &receive_results.entries, - last_tick_height == bank.max_tick_height(), - next_shred_index, + last_tick_height == bank.max_tick_height() && last_entries.is_none(), + self.next_shred_index, ); + self.next_shred_index += data_shreds.len() as u32; + let last_shreds = last_entries.map(|(good_last_entry, bad_last_entry)| { + let (good_last_data_shred, _, _) = + shredder.entries_to_shreds(&[good_last_entry], true, self.next_shred_index); + + let (bad_last_data_shred, _, _) = + // Don't mark the last shred as last so that validators won't know that + // they've gotten all the shreds, and will continue trying to repair + shredder.entries_to_shreds(&[bad_last_entry], false, self.next_shred_index); + + self.next_shred_index += 1; + (good_last_data_shred, bad_last_data_shred) + }); + let data_shreds = Arc::new(data_shreds); blockstore_sender.send((data_shreds.clone(), None))?; - // 3) Start broadcast step + // 4) Start broadcast step let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); - let stakes = stakes.map(Arc::new); socket_sender.send(((stakes.clone(), data_shreds), None))?; - socket_sender.send(((stakes, Arc::new(coding_shreds)), None))?; + if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds { + // Stash away the good shred so we can rewrite them later + self.good_shreds.extend(good_last_data_shred.clone()); + let good_last_data_shred = Arc::new(good_last_data_shred); + let bad_last_data_shred = Arc::new(bad_last_data_shred); + // Store the good shred so that blockstore will signal ClusterSlots + // that the slot is complete + blockstore_sender.send((good_last_data_shred, None))?; + loop { + // Wait for slot to be complete + if blockstore.is_full(bank.slot()) { + break; + } + sleep(Duration::from_millis(10)); + } + // Store the bad shred so we serve bad repairs to validators catching up + blockstore_sender.send((bad_last_data_shred.clone(), None))?; + // Send bad shreds to rest of network + socket_sender.send(((stakes, bad_last_data_shred), None))?; + } Ok(()) } fn transmit( diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs index de8d26a899..f37e28c31a 100644 --- a/core/src/cluster_slots.rs +++ b/core/src/cluster_slots.rs @@ -47,29 +47,7 @@ impl ClusterSlots { self.keys.write().unwrap().insert(pubkey.clone()); } let from = self.keys.read().unwrap().get(&pubkey).unwrap().clone(); - let balance = self - .validator_stakes - .read() - .unwrap() - .get(&from) - .map(|v| v.total_stake) - .unwrap_or(0); - - let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(slot).cloned(); - if slot_pubkeys.is_none() { - let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default())); - self.cluster_slots - 
.write() - .unwrap() - .insert(*slot, new_slot_pubkeys.clone()); - slot_pubkeys = Some(new_slot_pubkeys); - } - - slot_pubkeys - .unwrap() - .write() - .unwrap() - .insert(from.clone(), balance); + self.insert_node_id(*slot, from); } } self.cluster_slots.write().unwrap().retain(|x, _| *x > root); @@ -79,6 +57,7 @@ impl ClusterSlots { .retain(|x| Arc::strong_count(x) > 1); *self.since.write().unwrap() = since; } + pub fn collect(&self, id: &Pubkey) -> HashSet { self.cluster_slots .read() @@ -90,6 +69,30 @@ impl ClusterSlots { .collect() } + pub fn insert_node_id(&self, slot: Slot, node_id: Arc) { + let balance = self + .validator_stakes + .read() + .unwrap() + .get(&node_id) + .map(|v| v.total_stake) + .unwrap_or(0); + let mut slot_pubkeys = self.cluster_slots.read().unwrap().get(&slot).cloned(); + if slot_pubkeys.is_none() { + let new_slot_pubkeys = Arc::new(RwLock::new(HashMap::default())); + self.cluster_slots + .write() + .unwrap() + .insert(slot, new_slot_pubkeys.clone()); + slot_pubkeys = Some(new_slot_pubkeys); + } + slot_pubkeys + .unwrap() + .write() + .unwrap() + .insert(node_id, balance); + } + fn update_peers(&self, cluster_info: &ClusterInfo, bank_forks: &RwLock) { let root_bank = bank_forks.read().unwrap().root_bank().clone(); let root_epoch = root_bank.epoch(); @@ -137,6 +140,23 @@ impl ClusterSlots { .collect() } + pub fn compute_weights_exclude_noncomplete( + &self, + slot: Slot, + repair_peers: &[ContactInfo], + ) -> Vec<(u64, usize)> { + let slot_peers = self.lookup(slot); + repair_peers + .iter() + .enumerate() + .filter_map(|(i, x)| { + slot_peers + .as_ref() + .and_then(|v| v.read().unwrap().get(&x.id).map(|stake| (*stake + 1, i))) + }) + .collect() + } + pub fn generate_repairs_for_missing_slots( &self, self_id: &Pubkey, @@ -274,6 +294,43 @@ mod tests { ); } + #[test] + fn test_best_completed_slot_peer() { + let cs = ClusterSlots::default(); + let mut contact_infos = vec![ContactInfo::default(); 2]; + for ci in contact_infos.iter_mut() { + ci.id = Pubkey::new_rand(); + } + let slot = 9; + + // None of these validators have completed slot 9, so should + // return nothing + assert!(cs + .compute_weights_exclude_noncomplete(slot, &contact_infos) + .is_empty()); + + // Give second validator max stake + let validator_stakes: HashMap<_, _> = vec![( + *Arc::new(contact_infos[1].id), + NodeVoteAccounts { + total_stake: std::u64::MAX / 2, + vote_accounts: vec![Pubkey::default()], + }, + )] + .into_iter() + .collect(); + *cs.validator_stakes.write().unwrap() = Arc::new(validator_stakes); + + // Mark the first validator as completed slot 9, should pick that validator, + // even though it only has default stake, while the other validator has + // max stake + cs.insert_node_id(slot, Arc::new(contact_infos[0].id)); + assert_eq!( + cs.compute_weights_exclude_noncomplete(slot, &contact_infos), + vec![(1, 0)] + ); + } + #[test] fn test_update_new_staked_slot() { let cs = ClusterSlots::default(); diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 7c8f7c1d7f..068173f20b 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -3,14 +3,17 @@ use crate::{ cluster_info::ClusterInfo, cluster_slots::ClusterSlots, + consensus::VOTE_THRESHOLD_SIZE, result::Result, serve_repair::{RepairType, ServeRepair}, }; +use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver, SlotMeta}, }; -use solana_sdk::{clock::Slot, 
epoch_schedule::EpochSchedule, pubkey::Pubkey}; +use solana_runtime::bank::Bank; +use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}; use std::{ collections::HashMap, iter::Iterator, @@ -23,6 +26,9 @@ use std::{ time::{Duration, Instant}, }; +pub type DuplicateSlotsResetSender = CrossbeamSender; +pub type DuplicateSlotsResetReceiver = CrossbeamReceiver; + #[derive(Default)] pub struct RepairStatsGroup { pub count: u64, @@ -46,6 +52,8 @@ pub struct RepairStats { } pub const MAX_REPAIR_LENGTH: usize = 512; +pub const MAX_REPAIR_PER_DUPLICATE: usize = 20; +pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000; pub const REPAIR_MS: u64 = 100; pub const MAX_ORPHANS: usize = 5; @@ -55,6 +63,7 @@ pub enum RepairStrategy { bank_forks: Arc>, completed_slots_receiver: CompletedSlotsReceiver, epoch_schedule: EpochSchedule, + duplicate_slots_reset_sender: DuplicateSlotsResetSender, }, } @@ -72,6 +81,12 @@ impl Default for RepairSlotRange { } } +#[derive(Default, Clone)] +pub struct DuplicateSlotRepairStatus { + start: u64, + repair_addr: Option, +} + pub struct RepairService { t_repair: JoinHandle<()>, } @@ -117,6 +132,8 @@ impl RepairService { } let mut repair_stats = RepairStats::default(); let mut last_stats = Instant::now(); + let mut duplicate_slot_repair_statuses = HashMap::new(); + if let RepairStrategy::RepairAll { ref completed_slots_receiver, .. @@ -143,14 +160,44 @@ impl RepairService { RepairStrategy::RepairAll { ref completed_slots_receiver, ref bank_forks, + ref duplicate_slots_reset_sender, .. } => { - let new_root = blockstore.last_root(); + let root_bank = bank_forks.read().unwrap().root_bank().clone(); + let new_root = root_bank.slot(); let lowest_slot = blockstore.lowest_slot(); Self::update_lowest_slot(&id, lowest_slot, &cluster_info); Self::update_completed_slots(completed_slots_receiver, &cluster_info); cluster_slots.update(new_root, cluster_info, bank_forks); - Self::generate_repairs(blockstore, new_root, MAX_REPAIR_LENGTH) + let new_duplicate_slots = Self::find_new_duplicate_slots( + &duplicate_slot_repair_statuses, + blockstore, + cluster_slots, + &root_bank, + ); + Self::process_new_duplicate_slots( + &new_duplicate_slots, + &mut duplicate_slot_repair_statuses, + cluster_slots, + &root_bank, + blockstore, + &serve_repair, + &duplicate_slots_reset_sender, + ); + Self::generate_and_send_duplicate_repairs( + &mut duplicate_slot_repair_statuses, + cluster_slots, + blockstore, + &serve_repair, + &mut repair_stats, + &repair_socket, + ); + Self::generate_repairs( + blockstore, + root_bank.slot(), + MAX_REPAIR_LENGTH, + &duplicate_slot_repair_statuses, + ) } } }; @@ -179,6 +226,7 @@ impl RepairService { }); } } + if last_stats.elapsed().as_secs() > 1 { let repair_total = repair_stats.shred.count + repair_stats.highest_shred.count @@ -238,19 +286,216 @@ impl RepairService { blockstore: &Blockstore, root: Slot, max_repairs: usize, + duplicate_slot_repair_statuses: &HashMap, ) -> Result> { // Slot height and shred indexes for shreds we want to repair let mut repairs: Vec = vec![]; - Self::generate_repairs_for_fork(blockstore, &mut repairs, max_repairs, root); + Self::generate_repairs_for_fork( + blockstore, + &mut repairs, + max_repairs, + root, + duplicate_slot_repair_statuses, + ); // TODO: Incorporate gossip to determine priorities for repair? 
// Try to resolve orphans in blockstore let orphans = blockstore.orphans_iterator(root + 1).unwrap(); Self::generate_repairs_for_orphans(orphans, &mut repairs); + Ok(repairs) } + fn generate_duplicate_repairs_for_slot( + blockstore: &Blockstore, + slot: Slot, + ) -> Option> { + if let Some(slot_meta) = blockstore.meta(slot).unwrap() { + if slot_meta.is_full() { + // If the slot is full, no further need to repair this slot + None + } else { + Some(Self::generate_repairs_for_slot( + blockstore, + slot, + &slot_meta, + MAX_REPAIR_PER_DUPLICATE, + )) + } + } else { + error!("Slot meta for duplicate slot does not exist, cannot generate repairs"); + // Filter out this slot from the set of duplicates to be repaired as + // the SlotMeta has to exist for duplicates to be generated + None + } + } + + fn generate_and_send_duplicate_repairs( + duplicate_slot_repair_statuses: &mut HashMap, + cluster_slots: &ClusterSlots, + blockstore: &Blockstore, + serve_repair: &ServeRepair, + repair_stats: &mut RepairStats, + repair_socket: &UdpSocket, + ) { + duplicate_slot_repair_statuses.retain(|slot, status| { + Self::update_duplicate_slot_repair_addr(*slot, status, cluster_slots, serve_repair); + if let Some(repair_addr) = status.repair_addr { + let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot); + + if let Some(repairs) = repairs { + for repair_type in repairs { + if let Err(e) = Self::serialize_and_send_request( + &repair_type, + repair_socket, + &repair_addr, + serve_repair, + repair_stats, + ) { + info!("repair req send_to({}) error {:?}", repair_addr, e); + } + } + true + } else { + false + } + } else { + true + } + }) + } + + fn serialize_and_send_request( + repair_type: &RepairType, + repair_socket: &UdpSocket, + to: &SocketAddr, + serve_repair: &ServeRepair, + repair_stats: &mut RepairStats, + ) -> Result<()> { + let req = serve_repair.map_repair_request(&repair_type, repair_stats)?; + repair_socket.send_to(&req, to)?; + Ok(()) + } + + fn update_duplicate_slot_repair_addr( + slot: Slot, + status: &mut DuplicateSlotRepairStatus, + cluster_slots: &ClusterSlots, + serve_repair: &ServeRepair, + ) { + let now = timestamp(); + if status.repair_addr.is_none() + || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64 + { + let repair_addr = + serve_repair.repair_request_duplicate_compute_best_peer(slot, cluster_slots); + status.repair_addr = repair_addr.ok(); + status.start = timestamp(); + } + } + + fn process_new_duplicate_slots( + new_duplicate_slots: &[Slot], + duplicate_slot_repair_statuses: &mut HashMap, + cluster_slots: &ClusterSlots, + root_bank: &Bank, + blockstore: &Blockstore, + serve_repair: &ServeRepair, + duplicate_slots_reset_sender: &DuplicateSlotsResetSender, + ) { + for slot in new_duplicate_slots { + warn!( + "Cluster completed slot: {}, dumping our current version and repairing", + slot + ); + // Clear the slot signatures from status cache for this slot + root_bank.clear_slot_signatures(*slot); + + // Clear the accounts for this slot + root_bank.remove_unrooted_slot(*slot); + + // Clear the slot-related data in blockstore. 
This will: + // 1) Clear old shreds allowing new ones to be inserted + // 2) Clear the "dead" flag allowing ReplayStage to start replaying + // this slot + blockstore.clear_unconfirmed_slot(*slot); + + // Signal ReplayStage to clear its progress map so that a different + // version of this slot can be replayed + let _ = duplicate_slots_reset_sender.send(*slot); + + // Mark this slot as special repair, try to download from single + // validator to avoid corruption + let repair_addr = serve_repair + .repair_request_duplicate_compute_best_peer(*slot, cluster_slots) + .ok(); + let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus { + start: timestamp(), + repair_addr, + }; + duplicate_slot_repair_statuses.insert(*slot, new_duplicate_slot_repair_status); + } + } + + fn find_new_duplicate_slots( + duplicate_slot_repair_statuses: &HashMap, + blockstore: &Blockstore, + cluster_slots: &ClusterSlots, + root_bank: &Bank, + ) -> Vec { + let dead_slots_iter = blockstore + .dead_slots_iterator(root_bank.slot() + 1) + .expect("Couldn't get dead slots iterator from blockstore"); + dead_slots_iter + .filter_map(|dead_slot| { + if let Some(status) = duplicate_slot_repair_statuses.get(&dead_slot) { + // Newly repaired version of this slot has been marked dead again, + // time to purge again + warn!( + "Repaired version of slot {} most recently (but maybe not entirely) + from {:?} has failed again", + dead_slot, status.repair_addr + ); + } + cluster_slots + .lookup(dead_slot) + .and_then(|completed_dead_slot_pubkeys| { + let epoch = root_bank.get_epoch_and_slot_index(dead_slot).0; + if let Some(epoch_stakes) = root_bank.epoch_stakes(epoch) { + let total_stake = epoch_stakes.total_stake(); + let node_id_to_vote_accounts = epoch_stakes.node_id_to_vote_accounts(); + let total_completed_slot_stake: u64 = completed_dead_slot_pubkeys + .read() + .unwrap() + .iter() + .map(|(node_key, _)| { + node_id_to_vote_accounts + .get(node_key) + .map(|v| v.total_stake) + .unwrap_or(0) + }) + .sum(); + if total_completed_slot_stake as f64 / total_stake as f64 + > VOTE_THRESHOLD_SIZE + { + Some(dead_slot) + } else { + None + } + } else { + error!( + "Dead slot {} is too far ahead of root bank {}", + dead_slot, + root_bank.slot() + ); + None + } + }) + }) + .collect() + } + fn generate_repairs_for_slot( blockstore: &Blockstore, slot: Slot, @@ -288,10 +533,15 @@ impl RepairService { repairs: &mut Vec, max_repairs: usize, slot: Slot, + duplicate_slot_repair_statuses: &HashMap, ) { let mut pending_slots = vec![slot]; while repairs.len() < max_repairs && !pending_slots.is_empty() { let slot = pending_slots.pop().unwrap(); + if duplicate_slot_repair_statuses.contains_key(&slot) { + // These are repaired through a different path + continue; + } if let Some(slot_meta) = blockstore.meta(slot).unwrap() { let new_repairs = Self::generate_repairs_for_slot( blockstore, @@ -370,11 +620,15 @@ impl RepairService { mod test { use super::*; use crate::cluster_info::Node; + use crossbeam_channel::unbounded; use solana_ledger::blockstore::{ make_chaining_slot_entries, make_many_slot_entries, make_slot_entries, }; use solana_ledger::shred::max_ticks_per_n_shreds; use solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}; + use solana_runtime::genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}; + use solana_sdk::signature::Signer; + use solana_vote_program::vote_transaction; #[test] pub fn test_repair_orphan() { @@ -388,7 +642,7 @@ mod test { shreds.extend(shreds2); blockstore.insert_shreds(shreds, None, 
false).unwrap(); assert_eq!( - RepairService::generate_repairs(&blockstore, 0, 2).unwrap(), + RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(), vec![RepairType::HighestShred(0, 0), RepairType::Orphan(2)] ); } @@ -410,7 +664,7 @@ mod test { // Check that repair tries to patch the empty slot assert_eq!( - RepairService::generate_repairs(&blockstore, 0, 2).unwrap(), + RepairService::generate_repairs(&blockstore, 0, 2, &HashMap::new()).unwrap(), vec![RepairType::HighestShred(0, 0)] ); } @@ -456,12 +710,19 @@ mod test { .collect(); assert_eq!( - RepairService::generate_repairs(&blockstore, 0, std::usize::MAX).unwrap(), + RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new()) + .unwrap(), expected ); assert_eq!( - RepairService::generate_repairs(&blockstore, 0, expected.len() - 2).unwrap()[..], + RepairService::generate_repairs( + &blockstore, + 0, + expected.len() - 2, + &HashMap::new() + ) + .unwrap()[..], expected[0..expected.len() - 2] ); } @@ -490,7 +751,8 @@ mod test { vec![RepairType::HighestShred(0, num_shreds_per_slot - 1)]; assert_eq!( - RepairService::generate_repairs(&blockstore, 0, std::usize::MAX).unwrap(), + RepairService::generate_repairs(&blockstore, 0, std::usize::MAX, &HashMap::new()) + .unwrap(), expected ); } @@ -535,7 +797,7 @@ mod test { RepairService::generate_repairs_in_range( &blockstore, std::usize::MAX, - &repair_slot_range + &repair_slot_range, ) .unwrap(), expected @@ -580,7 +842,7 @@ mod test { RepairService::generate_repairs_in_range( &blockstore, std::usize::MAX, - &repair_slot_range + &repair_slot_range, ) .unwrap(), expected @@ -601,4 +863,290 @@ mod test { .unwrap(); assert_eq!(lowest.lowest, 5); } + + #[test] + pub fn test_generate_duplicate_repairs_for_slot() { + let blockstore_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let dead_slot = 9; + + // SlotMeta doesn't exist, should make no repairs + assert!( + RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot,).is_none() + ); + + // Insert some shreds to create a SlotMeta, should make repairs + let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1; + let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot); + blockstore + .insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false) + .unwrap(); + assert!( + RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot,).is_some() + ); + + // SlotMeta is full, should make no repairs + blockstore + .insert_shreds(vec![shreds.pop().unwrap()], None, false) + .unwrap(); + assert!( + RepairService::generate_duplicate_repairs_for_slot(&blockstore, dead_slot,).is_none() + ); + } + + #[test] + pub fn test_generate_and_send_duplicate_repairs() { + let blockstore_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let cluster_slots = ClusterSlots::default(); + let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info); + let mut duplicate_slot_repair_statuses = HashMap::new(); + let dead_slot = 9; + let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap(); + let duplicate_status = DuplicateSlotRepairStatus { + start: std::u64::MAX, + repair_addr: None, + }; + + // Insert some shreds to create a SlotMeta, + let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1; + let (mut shreds, _) = make_slot_entries(dead_slot, dead_slot - 1, num_entries_per_slot); + blockstore + .insert_shreds(shreds[..shreds.len() - 1].to_vec(), 
None, false)
+            .unwrap();
+
+        duplicate_slot_repair_statuses.insert(dead_slot, duplicate_status.clone());
+
+        // There is no repair_addr, so should not get filtered because the timeout
+        // `std::u64::MAX` has not expired
+        RepairService::generate_and_send_duplicate_repairs(
+            &mut duplicate_slot_repair_statuses,
+            &cluster_slots,
+            &blockstore,
+            &serve_repair,
+            &mut RepairStats::default(),
+            &UdpSocket::bind("0.0.0.0:0").unwrap(),
+        );
+        assert!(duplicate_slot_repair_statuses
+            .get(&dead_slot)
+            .unwrap()
+            .repair_addr
+            .is_none());
+        assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
+
+        // Give the slot a repair address
+        duplicate_slot_repair_statuses
+            .get_mut(&dead_slot)
+            .unwrap()
+            .repair_addr = Some(receive_socket.local_addr().unwrap());
+
+        // Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses`
+        RepairService::generate_and_send_duplicate_repairs(
+            &mut duplicate_slot_repair_statuses,
+            &cluster_slots,
+            &blockstore,
+            &serve_repair,
+            &mut RepairStats::default(),
+            &UdpSocket::bind("0.0.0.0:0").unwrap(),
+        );
+        assert_eq!(duplicate_slot_repair_statuses.len(), 1);
+        assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
+
+        // Insert rest of shreds. Slot is full, should get filtered from
+        // `duplicate_slot_repair_statuses`
+        blockstore
+            .insert_shreds(vec![shreds.pop().unwrap()], None, false)
+            .unwrap();
+        RepairService::generate_and_send_duplicate_repairs(
+            &mut duplicate_slot_repair_statuses,
+            &cluster_slots,
+            &blockstore,
+            &serve_repair,
+            &mut RepairStats::default(),
+            &UdpSocket::bind("0.0.0.0:0").unwrap(),
+        );
+        assert!(duplicate_slot_repair_statuses.is_empty());
+    }
+
+    #[test]
+    pub fn test_update_duplicate_slot_repair_addr() {
+        let dummy_addr = Some(UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap());
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
+            Node::new_localhost().info,
+        ));
+        let serve_repair = ServeRepair::new(cluster_info.clone());
+        let valid_repair_peer = Node::new_localhost().info;
+
+        // Signal that this peer has completed the dead slot, and is thus
+        // a valid target for repair
+        let dead_slot = 9;
+        let cluster_slots = ClusterSlots::default();
+        cluster_slots.insert_node_id(dead_slot, Arc::new(valid_repair_peer.id));
+        cluster_info.insert_info(valid_repair_peer);
+
+        // Not enough time has passed, should not update the
+        // address
+        let mut duplicate_status = DuplicateSlotRepairStatus {
+            start: std::u64::MAX,
+            repair_addr: dummy_addr,
+        };
+        RepairService::update_duplicate_slot_repair_addr(
+            dead_slot,
+            &mut duplicate_status,
+            &cluster_slots,
+            &serve_repair,
+        );
+        assert_eq!(duplicate_status.repair_addr, dummy_addr);
+
+        // If the repair address is None, should try to update
+        let mut duplicate_status = DuplicateSlotRepairStatus {
+            start: std::u64::MAX,
+            repair_addr: None,
+        };
+        RepairService::update_duplicate_slot_repair_addr(
+            dead_slot,
+            &mut duplicate_status,
+            &cluster_slots,
+            &serve_repair,
+        );
+        assert!(duplicate_status.repair_addr.is_some());
+
+        // If sufficient time has passed, should try to update
+        let mut duplicate_status = DuplicateSlotRepairStatus {
+            start: timestamp() - MAX_DUPLICATE_WAIT_MS as u64,
+            repair_addr: dummy_addr,
+        };
+        RepairService::update_duplicate_slot_repair_addr(
+            dead_slot,
+            &mut duplicate_status,
+            &cluster_slots,
+            &serve_repair,
+        );
+        assert_ne!(duplicate_status.repair_addr, dummy_addr);
+    }
+
+    #[test]
+    pub fn test_process_new_duplicate_slots() {
+        let blockstore_path = 
get_tmp_ledger_path!();
+        let blockstore = Blockstore::open(&blockstore_path).unwrap();
+        let cluster_slots = ClusterSlots::default();
+        let serve_repair = ServeRepair::new_with_invalid_keypair(Node::new_localhost().info);
+        let mut duplicate_slot_repair_statuses = HashMap::new();
+        let duplicate_slot = 9;
+
+        // Fill blockstore for dead slot
+        blockstore.set_dead_slot(duplicate_slot).unwrap();
+        assert!(blockstore.is_dead(duplicate_slot));
+        let (shreds, _) = make_slot_entries(duplicate_slot, 0, 1);
+        blockstore.insert_shreds(shreds, None, false).unwrap();
+
+        let keypairs = ValidatorVoteKeypairs::new_rand();
+        let (reset_sender, reset_receiver) = unbounded();
+        let GenesisConfigInfo {
+            genesis_config,
+            mint_keypair,
+            ..
+        } = genesis_utils::create_genesis_config_with_vote_accounts(
+            1_000_000_000,
+            &[&keypairs],
+            10000,
+        );
+        let bank0 = Arc::new(Bank::new(&genesis_config));
+        let bank9 = Bank::new_from_parent(&bank0, &Pubkey::default(), duplicate_slot);
+        let old_balance = bank9.get_balance(&keypairs.node_keypair.pubkey());
+        bank9
+            .transfer(10_000, &mint_keypair, &keypairs.node_keypair.pubkey())
+            .unwrap();
+        let vote_tx = vote_transaction::new_vote_transaction(
+            vec![0],
+            bank0.hash(),
+            bank0.last_blockhash(),
+            &keypairs.node_keypair,
+            &keypairs.vote_keypair,
+            &keypairs.vote_keypair,
+        );
+        bank9.process_transaction(&vote_tx).unwrap();
+        assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_some());
+
+        RepairService::process_new_duplicate_slots(
+            &[duplicate_slot],
+            &mut duplicate_slot_repair_statuses,
+            &cluster_slots,
+            &bank9,
+            &blockstore,
+            &serve_repair,
+            &reset_sender,
+        );
+
+        // Blockstore should have been cleared
+        assert!(!blockstore.is_dead(duplicate_slot));
+
+        // Should not be able to find signature for slot 9 for the tx
+        assert!(bank9.get_signature_status(&vote_tx.signatures[0]).is_none());
+
+        // Getting balance should return the old balance (accounts were cleared)
+        assert_eq!(
+            bank9.get_balance(&keypairs.node_keypair.pubkey()),
+            old_balance
+        );
+
+        // Should add the duplicate slot to the tracker
+        assert!(duplicate_slot_repair_statuses
+            .get(&duplicate_slot)
+            .is_some());
+
+        // A signal should be sent to clear ReplayStage
+        assert!(reset_receiver.try_recv().is_ok());
+    }
+
+    #[test]
+    pub fn test_find_new_duplicate_slots() {
+        let blockstore_path = get_tmp_ledger_path!();
+        let blockstore = Blockstore::open(&blockstore_path).unwrap();
+        let cluster_slots = ClusterSlots::default();
+        let duplicate_slot_repair_statuses = HashMap::new();
+        let keypairs = ValidatorVoteKeypairs::new_rand();
+        let only_node_id = Arc::new(keypairs.node_keypair.pubkey());
+        let GenesisConfigInfo { genesis_config, .. 
} = + genesis_utils::create_genesis_config_with_vote_accounts( + 1_000_000_000, + &[keypairs], + 100, + ); + let bank0 = Bank::new(&genesis_config); + + // Empty blockstore should have no duplicates + assert!(RepairService::find_new_duplicate_slots( + &duplicate_slot_repair_statuses, + &blockstore, + &cluster_slots, + &bank0, + ) + .is_empty()); + + // Insert a dead slot, but is not confirmed by network so should not + // be marked as duplicate + let dead_slot = 9; + blockstore.set_dead_slot(dead_slot).unwrap(); + assert!(RepairService::find_new_duplicate_slots( + &duplicate_slot_repair_statuses, + &blockstore, + &cluster_slots, + &bank0, + ) + .is_empty()); + + // If supermajority confirms the slot, then dead slot should be + // marked as a duplicate that needs to be repaired + cluster_slots.insert_node_id(dead_slot, only_node_id); + assert_eq!( + RepairService::find_new_duplicate_slots( + &duplicate_slot_repair_statuses, + &blockstore, + &cluster_slots, + &bank0, + ), + vec![dead_slot] + ); + } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 0e5e0a5e10..5155b4a678 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -9,6 +9,7 @@ use crate::{ consensus::{StakeLockout, Tower}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, progress_map::{ForkProgress, ForkStats, ProgressMap, PropagatedStats}, + repair_service::DuplicateSlotsResetReceiver, result::Result, rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, @@ -108,7 +109,7 @@ pub struct ReplayStage { } impl ReplayStage { - #[allow(clippy::new_ret_no_self)] + #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)] pub fn new( config: ReplayStageConfig, blockstore: Arc, @@ -119,6 +120,7 @@ impl ReplayStage { vote_tracker: Arc, cluster_slots: Arc, retransmit_slots_sender: RetransmitSlotsSender, + duplicate_slots_reset_receiver: DuplicateSlotsResetReceiver, ) -> (Self, Receiver>>) { let ReplayStageConfig { my_pubkey, @@ -216,9 +218,18 @@ impl ReplayStage { Self::report_memory(&allocated, "replay_active_banks", start); let ancestors = Arc::new(bank_forks.read().unwrap().ancestors()); - let descendants = HashMap::new(); + let descendants = bank_forks.read().unwrap().descendants(); let forks_root = bank_forks.read().unwrap().root(); let start = allocated.get(); + // Reset any duplicate slots that have been confirmed + // by the network in anticipation of the confirmed version of + // the slot + Self::reset_duplicate_slots( + &duplicate_slots_reset_receiver, + &descendants, + &mut progress, + &bank_forks, + ); let mut frozen_banks: Vec<_> = bank_forks .read() .unwrap() @@ -462,6 +473,57 @@ impl ReplayStage { ); } + fn reset_duplicate_slots( + duplicate_slots_reset_receiver: &DuplicateSlotsResetReceiver, + descendants: &HashMap>, + progress: &mut ProgressMap, + bank_forks: &RwLock, + ) { + for duplicate_slot in duplicate_slots_reset_receiver.try_iter() { + Self::purge_unconfirmed_duplicate_slot( + duplicate_slot, + descendants, + progress, + bank_forks, + ); + } + } + + fn purge_unconfirmed_duplicate_slot( + duplicate_slot: Slot, + descendants: &HashMap>, + progress: &mut ProgressMap, + bank_forks: &RwLock, + ) { + error!("purging slot {}", duplicate_slot); + let empty = HashSet::new(); + let slot_descendants = descendants.get(&duplicate_slot).unwrap_or(&empty); + + for d in slot_descendants + .iter() + .chain(std::iter::once(&duplicate_slot)) + { + // Clear the progress map of these forks + let _ = progress.remove(d); + + // Clear the duplicate 
banks from BankForks + { + let mut w_bank_forks = bank_forks.write().unwrap(); + // Purging should have already been taken care of by logic + // in repair_service, so make sure drop implementation doesn't + // run + w_bank_forks + .get(*d) + .expect("Bank in descendants map must exist in BankForks") + .skip_drop + .store(true, Ordering::Relaxed); + w_bank_forks + .remove(*d) + .expect("Bank in descendants map must exist in BankForks"); + } + } + } + fn log_leader_change( my_pubkey: &Pubkey, bank_slot: Slot, @@ -746,7 +808,7 @@ impl ReplayStage { trace!("latest root send failed: {:?}", e); } }); - trace!("new root {}", new_root); + info!("new root {}", new_root); if let Err(e) = root_bank_sender.send(rooted_banks) { trace!("root_bank_sender failed: {:?}", e); return Err(e.into()); @@ -3541,4 +3603,65 @@ pub(crate) mod tests { &progress_map, )); } + + #[test] + fn test_purge_unconfirmed_duplicate_slot() { + let (bank_forks, mut progress) = setup_forks(); + let descendants = bank_forks.read().unwrap().descendants(); + + // Purging slot 5 should purge only slots 5 and its descendant 6 + ReplayStage::purge_unconfirmed_duplicate_slot(5, &descendants, &mut progress, &bank_forks); + for i in 5..=6 { + assert!(bank_forks.read().unwrap().get(i).is_none()); + assert!(progress.get(&i).is_none()); + } + for i in 0..=4 { + assert!(bank_forks.read().unwrap().get(i).is_some()); + assert!(progress.get(&i).is_some()); + } + + // Purging slot 4 should purge only slot 4 + let descendants = bank_forks.read().unwrap().descendants(); + ReplayStage::purge_unconfirmed_duplicate_slot(4, &descendants, &mut progress, &bank_forks); + for i in 4..=6 { + assert!(bank_forks.read().unwrap().get(i).is_none()); + assert!(progress.get(&i).is_none()); + } + for i in 0..=3 { + assert!(bank_forks.read().unwrap().get(i).is_some()); + assert!(progress.get(&i).is_some()); + } + + // Purging slot 1 should purge both forks 2 and 3 + let descendants = bank_forks.read().unwrap().descendants(); + ReplayStage::purge_unconfirmed_duplicate_slot(1, &descendants, &mut progress, &bank_forks); + for i in 1..=6 { + assert!(bank_forks.read().unwrap().get(i).is_none()); + assert!(progress.get(&i).is_none()); + } + assert!(bank_forks.read().unwrap().get(0).is_some()); + assert!(progress.get(&0).is_some()); + } + + fn setup_forks() -> (RwLock, ProgressMap) { + /* + Build fork structure: + slot 0 + | + slot 1 + / \ + slot 2 | + | slot 3 + slot 4 | + slot 5 + | + slot 6 + */ + let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / (tr(3) / (tr(5) / (tr(6))))); + + let mut vote_simulator = VoteSimulator::new(1); + vote_simulator.fill_bank_forks(forks, &HashMap::new()); + + (vote_simulator.bank_forks, vote_simulator.progress) + } } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 3cf7666441..2ceeeb8f65 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -3,11 +3,12 @@ use crate::{ cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, cluster_slots::ClusterSlots, + repair_service::DuplicateSlotsResetSender, repair_service::RepairStrategy, result::{Error, Result}, window_service::{should_retransmit_and_persist, WindowService}, }; -use crossbeam_channel::Receiver as CrossbeamReceiver; +use crossbeam_channel::Receiver; use solana_ledger::{ bank_forks::BankForks, blockstore::{Blockstore, CompletedSlotsReceiver}, @@ -206,13 +207,14 @@ impl RetransmitStage { cluster_info: &Arc, retransmit_sockets: Arc>, repair_socket: Arc, - verified_receiver: CrossbeamReceiver>, + verified_receiver: 
Receiver>, exit: &Arc, completed_slots_receiver: CompletedSlotsReceiver, epoch_schedule: EpochSchedule, cfg: Option>, shred_version: u16, cluster_slots: Arc, + duplicate_slots_reset_sender: DuplicateSlotsResetSender, ) -> Self { let (retransmit_sender, retransmit_receiver) = channel(); @@ -229,6 +231,7 @@ impl RetransmitStage { bank_forks, completed_slots_receiver, epoch_schedule, + duplicate_slots_reset_sender, }; let leader_schedule_cache = leader_schedule_cache.clone(); let window_service = WindowService::new( diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 899ad2ff15..b19d5e3b17 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -373,6 +373,20 @@ impl ServeRepair { Ok((addr, out)) } + pub fn repair_request_duplicate_compute_best_peer( + &self, + slot: Slot, + cluster_slots: &ClusterSlots, + ) -> Result { + let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot); + if repair_peers.is_empty() { + return Err(ClusterInfoError::NoPeers.into()); + } + let weights = cluster_slots.compute_weights_exclude_noncomplete(slot, &repair_peers); + let n = weighted_best(&weights, Pubkey::new_rand().to_bytes()); + Ok(repair_peers[n].serve_repair) + } + pub fn map_repair_request( &self, repair_request: &RepairType, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 83b37e9052..c5a270c58c 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -143,6 +143,7 @@ impl Tvu { }; let cluster_slots = Arc::new(ClusterSlots::default()); + let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded(); let retransmit_stage = RetransmitStage::new( bank_forks.clone(), leader_schedule_cache, @@ -157,6 +158,7 @@ impl Tvu { cfg, tvu_config.shred_version, cluster_slots.clone(), + duplicate_slots_reset_sender, ); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); @@ -205,6 +207,7 @@ impl Tvu { vote_tracker, cluster_slots, retransmit_slots_sender, + duplicate_slots_reset_receiver, ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { diff --git a/ledger/src/bank_forks.rs b/ledger/src/bank_forks.rs index 278e6be0c8..0f2bfd73ca 100644 --- a/ledger/src/bank_forks.rs +++ b/ledger/src/bank_forks.rs @@ -178,6 +178,10 @@ impl BankForks { bank } + pub fn remove(&mut self, slot: Slot) -> Option> { + self.banks.remove(&slot) + } + pub fn working_bank(&self) -> Arc { self.working_bank.clone() } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index fdbd9b14a9..466093ebe0 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -836,6 +836,30 @@ impl Blockstore { Ok(()) } + pub fn clear_unconfirmed_slot(&self, slot: Slot) { + let _lock = self.insert_shreds_lock.lock().unwrap(); + if let Some(mut slot_meta) = self + .meta(slot) + .expect("Couldn't fetch from SlotMeta column family") + { + // Clear all slot related information + self.run_purge(slot, slot) + .expect("Purge database operations failed"); + + // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` + // field. 
+ slot_meta.clear_unconfirmed_slot(); + self.meta_cf + .put(slot, &slot_meta) + .expect("Couldn't insert into SlotMeta column family"); + } else { + error!( + "clear_unconfirmed_slot() called on slot {} with no SlotMeta", + slot + ); + } + } + pub fn insert_shreds( &self, shreds: Vec, @@ -2139,6 +2163,16 @@ impl Blockstore { Ok(orphans_iter.map(|(slot, _)| slot)) } + pub fn dead_slots_iterator<'a>( + &'a self, + slot: Slot, + ) -> Result + 'a> { + let dead_slots_iterator = self + .db + .iter::(IteratorMode::From(slot, IteratorDirection::Forward))?; + Ok(dead_slots_iterator.map(|(slot, _)| slot)) + } + /// Prune blockstore such that slots higher than `target_slot` are deleted and all references to /// higher slots are removed pub fn prune(&self, target_slot: Slot) { @@ -2397,10 +2431,7 @@ fn find_slot_meta_in_db_else_create<'a>( // If this slot doesn't exist, make a orphan slot. This way we // remember which slots chained to this one when we eventually get a real shred // for this slot - insert_map.insert( - slot, - Rc::new(RefCell::new(SlotMeta::new(slot, std::u64::MAX))), - ); + insert_map.insert(slot, Rc::new(RefCell::new(SlotMeta::new_orphan(slot)))); Ok(insert_map.get(&slot).unwrap().clone()) } } @@ -6695,4 +6726,49 @@ pub mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } + + #[test] + fn test_clear_unconfirmed_slot() { + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let unconfirmed_slot = 9; + let unconfirmed_child_slot = 10; + let slots = vec![2, unconfirmed_slot, unconfirmed_child_slot]; + + // Insert into slot 9, mark it as dead + let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1) + .into_iter() + .flat_map(|x| x.0) + .collect(); + blockstore.insert_shreds(shreds, None, false).unwrap(); + // Should only be one shred in slot 9 + assert!(blockstore + .get_data_shred(unconfirmed_slot, 0) + .unwrap() + .is_some()); + assert!(blockstore + .get_data_shred(unconfirmed_slot, 1) + .unwrap() + .is_none()); + blockstore.set_dead_slot(unconfirmed_slot).unwrap(); + + // Purge the slot + blockstore.clear_unconfirmed_slot(unconfirmed_slot); + assert!(!blockstore.is_dead(unconfirmed_slot)); + assert_eq!( + blockstore + .meta(unconfirmed_slot) + .unwrap() + .unwrap() + .next_slots, + vec![unconfirmed_child_slot] + ); + assert!(blockstore + .get_data_shred(unconfirmed_slot, 0) + .unwrap() + .is_none()); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } } diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index 0488f24c30..911ac375fe 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -158,6 +158,12 @@ impl SlotMeta { self.parent_slot != std::u64::MAX } + pub fn clear_unconfirmed_slot(&mut self) { + let mut new_self = SlotMeta::new_orphan(self.slot); + std::mem::swap(&mut new_self.next_slots, &mut self.next_slots); + std::mem::swap(self, &mut new_self); + } + pub(crate) fn new(slot: Slot, parent_slot: Slot) -> Self { SlotMeta { slot, @@ -171,6 +177,10 @@ impl SlotMeta { completed_data_indexes: vec![], } } + + pub(crate) fn new_orphan(slot: Slot) -> Self { + Self::new(slot, std::u64::MAX) + } } impl ErasureMeta { @@ -289,4 +299,17 @@ mod test { assert_eq!(e_meta.status(&index), DataFull); } } + + #[test] + fn test_clear_unconfirmed_slot() { + let mut slot_meta = SlotMeta::new_orphan(5); + slot_meta.consumed = 5; + slot_meta.received = 5; + slot_meta.next_slots = vec![6, 
7]; + slot_meta.clear_unconfirmed_slot(); + + let mut expected = SlotMeta::new_orphan(5); + expected.next_slots = vec![6, 7]; + assert_eq!(slot_meta, expected); + } } diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index afad770c51..913e77c93d 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -16,7 +16,7 @@ use solana_ledger::{ use solana_sdk::{ client::SyncClient, clock::{ - Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, + self, Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS, }, commitment_config::CommitmentConfig, @@ -33,7 +33,7 @@ use std::{ collections::{HashMap, HashSet}, path::Path, thread::sleep, - time::Duration, + time::{Duration, Instant}, }; const DEFAULT_SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / DEFAULT_TICKS_PER_SECOND; @@ -284,6 +284,26 @@ pub fn kill_entry_and_spend_and_verify_rest( } } +pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo]) { + let mut roots = vec![HashSet::new(); contact_infos.len()]; + let mut done = false; + let mut last_print = Instant::now(); + while !done { + for (i, ingress_node) in contact_infos.iter().enumerate() { + let client = create_client(ingress_node.client_facing_addr(), VALIDATOR_PORT_RANGE); + let slot = client.get_slot().unwrap_or(0); + roots[i].insert(slot); + let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0); + if last_print.elapsed().as_secs() > 3 { + info!("PARTITION_TEST min observed roots {}/16", min_node); + last_print = Instant::now(); + } + done = min_node >= num_new_roots; + } + sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2)); + } +} + fn poll_all_nodes_for_signature( entry_point_info: &ContactInfo, cluster_nodes: &[ContactInfo], diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 8b687de76b..34f4fda9c5 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -22,7 +22,7 @@ use solana_sdk::{ client::{AsyncClient, SyncClient}, clock::{self, Slot}, commitment_config::CommitmentConfig, - epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH}, + epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, genesis_config::OperatingMode, hash::Hash, poh_config::PohConfig, @@ -36,7 +36,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, thread::sleep, - time::{Duration, Instant}, + time::Duration, }; use tempfile::TempDir; @@ -345,26 +345,7 @@ fn run_cluster_partition( .unwrap(); info!("PARTITION_TEST discovered {} nodes", cluster_nodes.len()); info!("PARTITION_TEST looking for new roots on all nodes"); - let mut roots = vec![HashSet::new(); alive_node_contact_infos.len()]; - let mut done = false; - let mut last_print = Instant::now(); - while !done { - for (i, ingress_node) in alive_node_contact_infos.iter().enumerate() { - let client = create_client( - ingress_node.client_facing_addr(), - solana_core::cluster_info::VALIDATOR_PORT_RANGE, - ); - let slot = client.get_slot().unwrap_or(0); - roots[i].insert(slot); - let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0); - if last_print.elapsed().as_secs() > 3 { - info!("PARTITION_TEST min observed roots {}/16", min_node); - last_print = Instant::now(); - } - done = min_node >= 16; - } - sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2)); - } + cluster_tests::check_for_new_roots(16, &alive_node_contact_infos); info!("PARTITION_TEST done waiting for roots"); } @@ -1104,7 +1085,6 @@ 
fn test_snapshots_restart_validity() {
 #[test]
 #[serial]
 #[allow(unused_attributes)]
-#[ignore]
 fn test_fail_entry_verification_leader() {
     test_faulty_node(BroadcastStageType::FailEntryVerification);
 }
@@ -1118,14 +1098,15 @@ fn test_fake_shreds_broadcast_leader() {
 fn test_faulty_node(faulty_node_type: BroadcastStageType) {
     solana_logger::setup();
-    let num_nodes = 4;
+    let num_nodes = 2;
     let validator_config = ValidatorConfig::default();
     let mut error_validator_config = ValidatorConfig::default();
     error_validator_config.broadcast_stage_type = faulty_node_type.clone();
     let mut validator_configs = vec![validator_config; num_nodes - 1];
-    validator_configs.push(error_validator_config);
-    let mut node_stakes = vec![100; num_nodes - 1];
-    node_stakes.push(50);
+    // Make the faulty node the bootstrap validator so that it produces the faulty blocks
+    validator_configs.insert(0, error_validator_config);
+    let node_stakes = vec![300, 100];
+    assert_eq!(node_stakes.len(), num_nodes);
     let cluster_config = ClusterConfig {
         cluster_lamports: 10_000,
         node_stakes,
@@ -1136,37 +1117,14 @@ fn test_faulty_node(faulty_node_type: BroadcastStageType) {
     };
 
     let cluster = LocalCluster::new(&cluster_config);
-    let epoch_schedule = EpochSchedule::custom(
-        cluster_config.slots_per_epoch,
-        cluster_config.stakers_slot_offset,
-        true,
-    );
-    let num_warmup_epochs = epoch_schedule.get_leader_schedule_epoch(0) + 1;
-    // Wait for the corrupted leader to be scheduled afer the warmup epochs expire
-    cluster_tests::sleep_n_epochs(
-        (num_warmup_epochs + 1) as f64,
-        &cluster.genesis_config.poh_config,
-        cluster_config.ticks_per_slot,
-        cluster_config.slots_per_epoch,
-    );
-
-    let corrupt_node = cluster
+    // Check for new roots
+    let alive_node_contact_infos: Vec<_> = cluster
         .validators
-        .iter()
-        .find(|(_, v)| v.config.broadcast_stage_type == faulty_node_type)
-        .unwrap()
-        .0;
-    let mut ignore = HashSet::new();
-    ignore.insert(*corrupt_node);
-
-    // Verify that we can still spend and verify even in the presence of corrupt nodes
-    cluster_tests::spend_and_verify_all_nodes(
-        &cluster.entry_point_info,
-        &cluster.funding_keypair,
-        num_nodes,
-        ignore,
-    );
+        .values()
+        .map(|v| v.info.contact_info.clone())
+        .collect();
+    cluster_tests::check_for_new_roots(16, &alive_node_contact_infos);
 }
 
 #[test]
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index 57e9b1d18f..1de4f45d84 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -478,7 +478,7 @@ pub struct AccountsDB {
     pub bank_hashes: RwLock<HashMap<Slot, BankHashInfo>>,
 
-    pub dead_slots: RwLock<HashSet<Slot>>,
+    dead_slots: RwLock<HashSet<Slot>>,
 
     stats: AccountsStats,
 }
@@ -1266,6 +1266,39 @@ impl AccountsDB {
         }
     }
 
+    pub fn remove_unrooted_slot(&self, remove_slot: Slot) {
+        if self.accounts_index.read().unwrap().is_root(remove_slot) {
+            panic!("Trying to remove accounts for rooted slot {}", remove_slot);
+        }
+
+        let pubkey_sets: Vec<HashSet<Pubkey>> = self.scan_account_storage(
+            remove_slot,
+            |stored_account: &StoredAccount, _, accum: &mut HashSet<Pubkey>| {
+                accum.insert(stored_account.meta.pubkey);
+            },
+        );
+
+        // Purge this slot from the accounts index
+        let mut reclaims = vec![];
+        {
+            let pubkeys = pubkey_sets.iter().flatten();
+            let accounts_index = self.accounts_index.read().unwrap();
+
+            for pubkey in pubkeys {
+                accounts_index.clean_unrooted_entries_by_slot(remove_slot, pubkey, &mut reclaims);
+            }
+        }
+
+        self.handle_reclaims(&reclaims);
+
+        // 1) Remove old bank hash from self.bank_hashes
+        // 2) Purge this slot's storage entries from self.storage
+        self.process_dead_slots();
+
+        // Sanity check storage entries are 
removed from the index + assert!(self.storage.read().unwrap().0.get(&remove_slot).is_none()); + } + pub fn hash_stored_account(slot: Slot, account: &StoredAccount) -> Hash { Self::hash_account_data( slot, @@ -2199,6 +2232,80 @@ pub mod tests { assert_eq!(db0.load_slow(&ancestors, &key), Some((account0, 0))); } + #[test] + fn test_remove_unrooted_slot() { + let unrooted_slot = 9; + let db = AccountsDB::new(Vec::new()); + let key = Pubkey::default(); + let account0 = Account::new(1, 0, &key); + let ancestors: HashMap<_, _> = vec![(unrooted_slot, 1)].into_iter().collect(); + db.store(unrooted_slot, &[(&key, &account0)]); + db.bank_hashes + .write() + .unwrap() + .insert(unrooted_slot, BankHashInfo::default()); + assert!(db + .accounts_index + .read() + .unwrap() + .get(&key, &ancestors) + .is_some()); + assert_load_account(&db, unrooted_slot, key, 1); + + // Purge the slot + db.remove_unrooted_slot(unrooted_slot); + assert!(db.load_slow(&ancestors, &key).is_none()); + assert!(db.bank_hashes.read().unwrap().get(&unrooted_slot).is_none()); + assert!(db.storage.read().unwrap().0.get(&unrooted_slot).is_none()); + assert!(db + .accounts_index + .read() + .unwrap() + .account_maps + .get(&key) + .map(|pubkey_entry| pubkey_entry.1.read().unwrap().is_empty()) + .unwrap_or(true)); + assert!(db + .accounts_index + .read() + .unwrap() + .get(&key, &ancestors) + .is_none()); + + // Test we can store for the same slot again and get the right information + let account0 = Account::new(2, 0, &key); + db.store(unrooted_slot, &[(&key, &account0)]); + assert_load_account(&db, unrooted_slot, key, 2); + } + + #[test] + fn test_remove_unrooted_slot_snapshot() { + let unrooted_slot = 9; + let db = AccountsDB::new(Vec::new()); + let key = Pubkey::new_rand(); + let account0 = Account::new(1, 0, &key); + db.store(unrooted_slot, &[(&key, &account0)]); + + // Purge the slot + db.remove_unrooted_slot(unrooted_slot); + + // Add a new root + let key2 = Pubkey::new_rand(); + let new_root = unrooted_slot + 1; + db.store(new_root, &[(&key2, &account0)]); + db.add_root(new_root); + + // Simulate reconstruction from snapshot + let db = reconstruct_accounts_db_via_serialization(&db, new_root); + + // Check root account exists + assert_load_account(&db, new_root, key2, 1); + + // Check purged account stays gone + let unrooted_slot_ancestors: HashMap<_, _> = vec![(unrooted_slot, 1)].into_iter().collect(); + assert!(db.load_slow(&unrooted_slot_ancestors, &key).is_none()); + } + fn create_account( accounts: &AccountsDB, pubkeys: &mut Vec, diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index d24cdfff52..beba960baa 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -172,6 +172,23 @@ impl AccountsIndex { } } + pub fn clean_unrooted_entries_by_slot( + &self, + purge_slot: Slot, + pubkey: &Pubkey, + reclaims: &mut SlotList, + ) { + if let Some(entry) = self.account_maps.get(pubkey) { + let mut list = entry.1.write().unwrap(); + list.retain(|(slot, entry)| { + if *slot == purge_slot { + reclaims.push((*slot, entry.clone())); + } + *slot != purge_slot + }); + } + } + pub fn add_index(&mut self, slot: Slot, pubkey: &Pubkey, account_info: T) { let entry = self .account_maps diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index db20367705..0edafebb20 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -355,6 +355,9 @@ pub struct Bank { /// Rewards that were paid out immediately after this bank was created #[serde(skip)] pub rewards: Option>, + + #[serde(skip)] + pub 
skip_drop: AtomicBool,
 }
 
 impl Default for BlockhashQueue {
@@ -466,6 +469,7 @@ impl Bank {
             hard_forks: parent.hard_forks.clone(),
             last_vote_sync: AtomicU64::new(parent.last_vote_sync.load(Ordering::Relaxed)),
             rewards: None,
+            skip_drop: AtomicBool::new(false),
         };
 
         datapoint_info!(
@@ -966,6 +970,14 @@ impl Bank {
         self.src.status_cache.write().unwrap().clear_signatures();
     }
 
+    pub fn clear_slot_signatures(&self, slot: Slot) {
+        self.src
+            .status_cache
+            .write()
+            .unwrap()
+            .clear_slot_signatures(slot);
+    }
+
     pub fn can_commit(result: &Result<()>) -> bool {
         match result {
             Ok(_) => true,
@@ -1056,6 +1068,10 @@ impl Bank {
         }
     }
 
+    pub fn remove_unrooted_slot(&self, slot: Slot) {
+        self.rc.accounts.accounts_db.remove_unrooted_slot(slot)
+    }
+
     fn load_accounts(
         &self,
         txs: &[Transaction],
@@ -2259,7 +2275,9 @@ impl Bank {
 impl Drop for Bank {
     fn drop(&mut self) {
         // For root slots this is a noop
-        self.rc.accounts.purge_slot(self.slot());
+        if !self.skip_drop.load(Ordering::Relaxed) {
+            self.rc.accounts.purge_slot(self.slot());
+        }
     }
 }
diff --git a/runtime/src/genesis_utils.rs b/runtime/src/genesis_utils.rs
index 2094be6fae..51a581e725 100644
--- a/runtime/src/genesis_utils.rs
+++ b/runtime/src/genesis_utils.rs
@@ -28,6 +28,14 @@ impl ValidatorVoteKeypairs {
             stake_keypair,
         }
     }
+
+    pub fn new_rand() -> Self {
+        Self {
+            node_keypair: Keypair::new(),
+            vote_keypair: Keypair::new(),
+            stake_keypair: Keypair::new(),
+        }
+    }
 }
 
 pub struct GenesisConfigInfo {
diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs
index 4885e5c5c5..732f076dda 100644
--- a/runtime/src/status_cache.rs
+++ b/runtime/src/status_cache.rs
@@ -9,7 +9,7 @@ use solana_sdk::{
     signature::Signature,
 };
 use std::{
-    collections::{HashMap, HashSet},
+    collections::{hash_map::Entry, HashMap, HashSet},
     sync::{Arc, Mutex},
 };
@@ -82,6 +82,46 @@ impl<T: Serialize + Clone + PartialEq> PartialEq for StatusCache<T> {
 }
 
 impl<T: Serialize + Clone> StatusCache<T> {
+    pub fn clear_slot_signatures(&mut self, slot: Slot) {
+        let slot_deltas = self.slot_deltas.remove(&slot);
+        if let Some(slot_deltas) = slot_deltas {
+            let slot_deltas = slot_deltas.lock().unwrap();
+            for (blockhash, (_, signature_list)) in slot_deltas.iter() {
+                // Any blockhash that exists in self.slot_deltas must also exist
+                // in self.cache, because in self.purge_roots(), when an entry
+                // (b, (max_slot, _, _)) is removed from self.cache, this implies
+                // all entries in self.slot_deltas < max_slot are also removed
+                if let Entry::Occupied(mut o_blockhash_entries) = self.cache.entry(*blockhash) {
+                    let (_, _, all_sig_maps) = o_blockhash_entries.get_mut();
+
+                    for (sig_slice, _) in signature_list {
+                        if let Entry::Occupied(mut o_sig_list) = all_sig_maps.entry(*sig_slice) {
+                            let sig_list = o_sig_list.get_mut();
+                            sig_list.retain(|(updated_slot, _)| *updated_slot != slot);
+                            if sig_list.is_empty() {
+                                o_sig_list.remove_entry();
+                            }
+                        } else {
+                            panic!(
+                                "Map for signature must exist if signature exists in self.slot_deltas, slot: {}",
+                                slot
+                            )
+                        }
+                    }
+
+                    if all_sig_maps.is_empty() {
+                        o_blockhash_entries.remove_entry();
+                    }
+                } else {
+                    panic!(
+                        "Blockhash must exist if it exists in self.slot_deltas, slot: {}",
+                        slot
+                    )
+                }
+            }
+        }
+    }
+
     /// Check if the signature from a transaction is in any of the forks in the ancestors set. 
pub fn get_signature_status( &self, @@ -408,6 +448,8 @@ mod tests { status_cache.add_root(i as u64); } let slots: Vec<_> = (0_u64..MAX_CACHE_ENTRIES as u64 + 1).collect(); + assert_eq!(status_cache.slot_deltas.len(), 1); + assert!(status_cache.slot_deltas.get(&1).is_some()); let slot_deltas = status_cache.slot_deltas(&slots); let cache = StatusCache::from_slot_deltas(&slot_deltas); assert_eq!(cache, status_cache); @@ -417,4 +459,51 @@ mod tests { fn test_age_sanity() { assert!(MAX_CACHE_ENTRIES <= MAX_RECENT_BLOCKHASHES); } + + #[test] + fn test_clear_slot_signatures() { + let sig = Signature::default(); + let mut status_cache = BankStatusCache::default(); + let blockhash = hash(Hash::default().as_ref()); + let blockhash2 = hash(blockhash.as_ref()); + status_cache.insert(&blockhash, &sig, 0, ()); + status_cache.insert(&blockhash, &sig, 1, ()); + status_cache.insert(&blockhash2, &sig, 1, ()); + + let mut ancestors0 = HashMap::new(); + ancestors0.insert(0, 0); + let mut ancestors1 = HashMap::new(); + ancestors1.insert(1, 0); + + // Clear slot 0 related data + assert!(status_cache + .get_signature_status(&sig, &blockhash, &ancestors0) + .is_some()); + status_cache.clear_slot_signatures(0); + assert!(status_cache + .get_signature_status(&sig, &blockhash, &ancestors0) + .is_none()); + assert!(status_cache + .get_signature_status(&sig, &blockhash, &ancestors1) + .is_some()); + assert!(status_cache + .get_signature_status(&sig, &blockhash2, &ancestors1) + .is_some()); + + // Check that the slot delta for slot 0 is gone, but slot 1 still + // exists + assert!(status_cache.slot_deltas.get(&0).is_none()); + assert!(status_cache.slot_deltas.get(&1).is_some()); + + // Clear slot 1 related data + status_cache.clear_slot_signatures(1); + assert!(status_cache.slot_deltas.is_empty()); + assert!(status_cache + .get_signature_status(&sig, &blockhash, &ancestors1) + .is_none()); + assert!(status_cache + .get_signature_status(&sig, &blockhash2, &ancestors1) + .is_none()); + assert!(status_cache.cache.is_empty()); + } }
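Note (illustration only, not part of the patch): the decision `find_new_duplicate_slots` makes above is a stake-weighted threshold check. A dead slot is only purged and re-repaired once the nodes that advertise it as completed in ClusterSlots hold more than VOTE_THRESHOLD_SIZE (2/3) of the epoch's total stake. The standalone sketch below restates just that check with hypothetical names and plain types.

// Illustrative sketch of the duplicate-slot confirmation threshold used by
// RepairService::find_new_duplicate_slots. All names here are hypothetical.
use std::collections::HashMap;

/// Mirrors consensus::VOTE_THRESHOLD_SIZE (2/3 of total stake).
const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;

type NodeId = u64; // stand-in for a validator Pubkey

fn is_duplicate_confirmed(
    completed_nodes: &[NodeId],          // nodes whose ClusterSlots entry marks the slot complete
    epoch_stakes: &HashMap<NodeId, u64>, // stake per node id for the slot's epoch
    total_stake: u64,
) -> bool {
    // Sum the stake of every node that claims to have completed the dead slot ...
    let completed_stake: u64 = completed_nodes
        .iter()
        .map(|node| epoch_stakes.get(node).copied().unwrap_or(0))
        .sum();
    // ... and only treat the slot as cluster-confirmed past the 2/3 threshold.
    completed_stake as f64 / total_stake as f64 > VOTE_THRESHOLD_SIZE
}

fn main() {
    let stakes: HashMap<NodeId, u64> = vec![(1, 75), (2, 25)].into_iter().collect();
    // 75/100 > 2/3: the dead slot would be scheduled for duplicate repair.
    assert!(is_duplicate_confirmed(&[1], &stakes, 100));
    // 25/100 <= 2/3: the dead slot is left alone.
    assert!(!is_duplicate_confirmed(&[2], &stakes, 100));
}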