From 7501ed65e502a2d978af4bad81687f0403c1bfcf Mon Sep 17 00:00:00 2001
From: carllin
Date: Mon, 13 May 2019 15:37:50 -0700
Subject: [PATCH] Initialize and Update EpochSlots in RepairService (#4255)

* Initialize EpochSlots in RepairService

* Fix flaky test
---
 core/src/repair_service.rs   | 244 +++++++++++++++++++++++++++++++++--
 core/src/retransmit_stage.rs |   3 +
 core/src/tvu.rs              |   1 +
 core/src/window_service.rs   |  10 +-
 runtime/src/accounts.rs      |   4 +-
 5 files changed, 247 insertions(+), 15 deletions(-)

diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 1b0b90dd7..61bcd5043 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -7,6 +7,7 @@ use crate::cluster_info::ClusterInfo;
 use crate::result::Result;
 use crate::service::Service;
 use solana_metrics::datapoint;
+use solana_runtime::bank::EpochSchedule;
 use solana_sdk::pubkey::Pubkey;
 use std::collections::HashSet;
 use std::net::UdpSocket;
@@ -27,6 +28,7 @@ pub enum RepairStrategy {
     RepairAll {
         bank_forks: Arc<RwLock<BankForks>>,
         completed_slots_receiver: CompletedSlotsReceiver,
+        epoch_schedule: EpochSchedule,
     },
 }
@@ -103,8 +105,24 @@ impl RepairService {
         repair_strategy: RepairStrategy,
     ) {
         let mut repair_info = RepairInfo::new();
-        let epoch_slots: HashSet<u64> = HashSet::new();
+        let mut epoch_slots: HashSet<u64> = HashSet::new();
         let id = cluster_info.read().unwrap().id();
+        if let RepairStrategy::RepairAll {
+            ref bank_forks,
+            ref epoch_schedule,
+            ..
+        } = repair_strategy
+        {
+            let root = bank_forks.read().unwrap().root();
+            Self::initialize_epoch_slots(
+                id,
+                blocktree,
+                &mut epoch_slots,
+                root,
+                epoch_schedule,
+                cluster_info,
+            );
+        }
         loop {
             if exit.load(Ordering::Relaxed) {
                 break;
             }
@@ -125,12 +143,14 @@ impl RepairService {
                     RepairStrategy::RepairAll {
                         ref bank_forks,
                         ref completed_slots_receiver,
+                        ..
                     } => {
+                        let root = bank_forks.read().unwrap().root();
                         Self::update_epoch_slots(
                             id,
-                            &epoch_slots,
+                            root,
+                            &mut epoch_slots,
                             &cluster_info,
-                            bank_forks,
                             completed_slots_receiver,
                         );
                         Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH)
@@ -289,20 +309,76 @@ impl RepairService {
         }
     }

+    fn get_completed_slots_past_root(
+        blocktree: &Blocktree,
+        slots_in_gossip: &mut HashSet<u64>,
+        root: u64,
+        epoch_schedule: &EpochSchedule,
+    ) {
+        let last_confirmed_epoch = epoch_schedule.get_stakers_epoch(root);
+        let last_epoch_slot = epoch_schedule.get_last_slot_in_epoch(last_confirmed_epoch);
+
+        let mut meta_iter = blocktree
+            .slot_meta_iterator(root + 1)
+            .expect("Couldn't get db iterator");
+
+        while meta_iter.valid() && meta_iter.key().unwrap() <= last_epoch_slot {
+            let current_slot = meta_iter.key().unwrap();
+            let meta = meta_iter.value().unwrap();
+            if meta.is_full() {
+                slots_in_gossip.insert(current_slot);
+            }
+            meta_iter.next();
+        }
+    }
+
+    fn initialize_epoch_slots(
+        id: Pubkey,
+        blocktree: &Blocktree,
+        slots_in_gossip: &mut HashSet<u64>,
+        root: u64,
+        epoch_schedule: &EpochSchedule,
+        cluster_info: &RwLock<ClusterInfo>,
+    ) {
+        Self::get_completed_slots_past_root(blocktree, slots_in_gossip, root, epoch_schedule);
+
+        // Safe to set into gossip because by this time, the leader schedule cache should
+        // also be updated with the latest root (done in blocktree_processor) and thus
+        // will provide a schedule to window_service for any incoming blobs up to the
+        // last_confirmed_epoch.
+        cluster_info
+            .write()
+            .unwrap()
+            .push_epoch_slots(id, root, slots_in_gossip.clone());
+    }
+
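// Illustrative sketch, not part of the patch: how get_completed_slots_past_root above
// bounds its scan. It assumes the EpochSchedule API imported in this diff and the same
// constructor arguments the new test uses, read here as 32 slots per epoch, a stakers
// offset of 32, and no warmup (an assumption about the argument meaning).
use solana_runtime::bank::EpochSchedule;

fn main() {
    let epoch_schedule = EpochSchedule::new(32, 32, false);
    let root = 10;
    // The leader schedule is only confirmed up to the stakers epoch of the root,
    // so completed slots are only advertised up to that epoch's last slot.
    let last_confirmed_epoch = epoch_schedule.get_stakers_epoch(root);
    let last_epoch_slot = epoch_schedule.get_last_slot_in_epoch(last_confirmed_epoch);
    println!("advertise completed slots in ({}, {}]", root, last_epoch_slot);
}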
     // Update the gossiped structure used for the "Repairmen" repair protocol. See book
     // for details.
     fn update_epoch_slots(
         id: Pubkey,
-        slots: &HashSet<u64>,
+        root: u64,
+        slots_in_gossip: &mut HashSet<u64>,
         cluster_info: &RwLock<ClusterInfo>,
-        bank_forks: &Arc<RwLock<BankForks>>,
-        _completed_slots_receiver: &CompletedSlotsReceiver,
+        completed_slots_receiver: &CompletedSlotsReceiver,
     ) {
-        let root = bank_forks.read().unwrap().root();
-        cluster_info
-            .write()
-            .unwrap()
-            .push_epoch_slots(id, root, slots.clone());
+        let mut should_update = false;
+        while let Ok(completed_slots) = completed_slots_receiver.try_recv() {
+            for slot in completed_slots {
+                // If the newly completed slot > root, and the set did not contain this value
+                // before, we should update gossip.
+                if slot > root && slots_in_gossip.insert(slot) {
+                    should_update = true;
+                }
+            }
+        }
+
+        if should_update {
+            slots_in_gossip.retain(|x| *x > root);
+            cluster_info
+                .write()
+                .unwrap()
+                .push_epoch_slots(id, root, slots_in_gossip.clone());
+        }
     }
 }
@@ -321,6 +397,11 @@ mod test {
         make_chaining_slot_entries, make_many_slot_entries, make_slot_entries,
     };
     use crate::blocktree::{get_tmp_ledger_path, Blocktree};
+    use crate::cluster_info::Node;
+    use rand::seq::SliceRandom;
+    use rand::{thread_rng, Rng};
+    use std::cmp::min;
+    use std::thread::Builder;

     #[test]
     pub fn test_repair_orphan() {
@@ -523,4 +604,145 @@ mod test {
         }
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
     }
+
+    #[test]
+    pub fn test_get_completed_slots_past_root() {
+        let blocktree_path = get_tmp_ledger_path!();
+        {
+            let blocktree = Blocktree::open(&blocktree_path).unwrap();
+            let num_entries_per_slot = 10;
+            let root = 10;
+
+            let fork1 = vec![5, 7, root, 15, 20, 21];
+            let fork1_blobs: Vec<_> = make_chaining_slot_entries(&fork1, num_entries_per_slot)
+                .into_iter()
+                .flat_map(|(blobs, _)| blobs)
+                .collect();
+            let fork2 = vec![8, 12];
+            let fork2_blobs = make_chaining_slot_entries(&fork2, num_entries_per_slot);
+
+            // Remove the last blob from each slot to make an incomplete slot
+            let fork2_incomplete_blobs: Vec<_> = fork2_blobs
+                .into_iter()
+                .flat_map(|(mut blobs, _)| {
+                    blobs.pop();
+                    blobs
+                })
+                .collect();
+            let mut full_slots = HashSet::new();
+
+            blocktree.write_blobs(&fork1_blobs).unwrap();
+            blocktree.write_blobs(&fork2_incomplete_blobs).unwrap();
+
+            // Test that only slots > root from fork1 were included
+            let epoch_schedule = EpochSchedule::new(32, 32, false);
+
+            RepairService::get_completed_slots_past_root(
+                &blocktree,
+                &mut full_slots,
+                root,
+                &epoch_schedule,
+            );
+
+            let mut expected: HashSet<_> = fork1.into_iter().filter(|x| *x > root).collect();
+            assert_eq!(full_slots, expected);
+
+            // Test that slots past the last confirmed epoch boundary don't get included
+            let last_epoch = epoch_schedule.get_stakers_epoch(root);
+            let last_slot = epoch_schedule.get_last_slot_in_epoch(last_epoch);
+            let fork3 = vec![last_slot, last_slot + 1];
+            let fork3_blobs: Vec<_> = make_chaining_slot_entries(&fork3, num_entries_per_slot)
+                .into_iter()
+                .flat_map(|(blobs, _)| blobs)
+                .collect();
+            blocktree.write_blobs(&fork3_blobs).unwrap();
+            RepairService::get_completed_slots_past_root(
+                &blocktree,
+                &mut full_slots,
+                root,
+                &epoch_schedule,
+            );
+            expected.insert(last_slot);
+            assert_eq!(full_slots, expected);
+        }
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
+
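// Std-only sketch, not part of the patch, of the set bookkeeping that update_epoch_slots
// relies on and that test_update_epoch_slots below exercises: HashSet::insert returns true
// only for slots not already advertised, so gossip is pushed only when the set actually
// changes, and retain drops slots at or below the root before the push.
use std::collections::HashSet;

fn main() {
    let root = 10u64;
    let mut slots_in_gossip: HashSet<u64> = [11u64, 12].iter().copied().collect();

    let newly_completed = vec![9u64, 12, 13]; // e.g. slots drained from the channel
    let mut should_update = false;
    for slot in newly_completed {
        if slot > root && slots_in_gossip.insert(slot) {
            should_update = true; // only slot 13 is both past the root and new
        }
    }

    if should_update {
        slots_in_gossip.retain(|s| *s > root);
        assert_eq!(
            slots_in_gossip,
            [11u64, 12, 13].iter().copied().collect::<HashSet<u64>>()
        );
    }
}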
+    #[test]
+    pub fn test_update_epoch_slots() {
+        let blocktree_path = get_tmp_ledger_path!();
+        {
+            // Create blocktree
+            let (blocktree, _, completed_slots_receiver) =
+                Blocktree::open_with_signal(&blocktree_path).unwrap();
+
+            let blocktree = Arc::new(blocktree);
+
+            let mut root = 0;
+            let num_slots = 100;
+            let entries_per_slot = 5;
+            let blocktree_ = blocktree.clone();
+
+            // Spin up thread to write to blocktree
+            let writer = Builder::new()
+                .name("writer".to_string())
+                .spawn(move || {
+                    let slots: Vec<_> = (1..num_slots + 1).collect();
+                    let mut blobs: Vec<_> = make_chaining_slot_entries(&slots, entries_per_slot)
+                        .into_iter()
+                        .flat_map(|(blobs, _)| blobs)
+                        .collect();
+                    blobs.shuffle(&mut thread_rng());
+                    let mut i = 0;
+                    let max_step = entries_per_slot * 4;
+                    let repair_interval_ms = 10;
+                    let mut rng = rand::thread_rng();
+                    while i < blobs.len() as usize {
+                        let step = rng.gen_range(1, max_step + 1);
+                        blocktree_
+                            .insert_data_blobs(&blobs[i..min(i + max_step as usize, blobs.len())])
+                            .unwrap();
+                        sleep(Duration::from_millis(repair_interval_ms));
+                        i += step as usize;
+                    }
+                })
+                .unwrap();
+
+            let mut completed_slots = HashSet::new();
+            let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
+            let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair(
+                node_info.info.clone(),
+            ));
+
+            while completed_slots.len() < num_slots as usize {
+                RepairService::update_epoch_slots(
+                    Pubkey::default(),
+                    root,
+                    &mut completed_slots,
+                    &cluster_info,
+                    &completed_slots_receiver,
+                );
+            }
+
+            let mut expected: HashSet<_> = (1..num_slots + 1).collect();
+            assert_eq!(completed_slots, expected);
+
+            // Update with new root, should filter out the slots <= root
+            root = num_slots / 2;
+            let (blobs, _) = make_slot_entries(num_slots + 2, num_slots + 1, entries_per_slot);
+            blocktree.insert_data_blobs(&blobs).unwrap();
+            RepairService::update_epoch_slots(
+                Pubkey::default(),
+                root,
+                &mut completed_slots,
+                &cluster_info,
+                &completed_slots_receiver,
+            );
+            expected.insert(num_slots + 2);
+            expected.retain(|x| *x > root);
+            assert_eq!(completed_slots, expected);
+            writer.join().unwrap();
+        }
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
 }
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 8db788c69..d5fad2063 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -11,6 +11,7 @@ use crate::staking_utils;
 use crate::streamer::BlobReceiver;
 use crate::window_service::WindowService;
 use solana_metrics::{datapoint, inc_new_counter_info};
+use solana_runtime::bank::EpochSchedule;
 use solana_sdk::hash::Hash;
 use std::net::UdpSocket;
 use std::sync::atomic::AtomicBool;
@@ -117,6 +118,7 @@ impl RetransmitStage {
         exit: &Arc<AtomicBool>,
         genesis_blockhash: &Hash,
         completed_slots_receiver: CompletedSlotsReceiver,
+        epoch_schedule: EpochSchedule,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
@@ -131,6 +133,7 @@ impl RetransmitStage {
         let repair_strategy = RepairStrategy::RepairAll {
             bank_forks,
             completed_slots_receiver,
+            epoch_schedule,
         };
         let window_service = WindowService::new(
             Some(leader_schedule_cache.clone()),
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index dac132691..42c14785b 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -110,6 +110,7 @@ impl Tvu {
             &exit,
             genesis_blockhash,
             completed_slots_receiver,
+            *bank_forks.read().unwrap().working_bank().epoch_schedule(),
         );
         let (replay_stage, slot_full_receiver, root_slot_receiver) = ReplayStage::new(
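Collated from the hunks in this patch, the call-site shape after the change looks like the
sketch below (not a standalone program: RepairStrategy, BankForks, CompletedSlotsReceiver,
and EpochSchedule all come from this tree); the EpochSchedule is copied out of the working
bank and handed to the repair path alongside the existing fields:

    let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule();
    let repair_strategy = RepairStrategy::RepairAll {
        bank_forks,
        completed_slots_receiver,
        epoch_schedule,
    };
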
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 007cf0620..e1b446fd8 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -364,8 +364,14 @@ mod test {
         let leader_schedule_cache =
             Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
         let repair_strategy = RepairStrategy::RepairAll {
-            bank_forks,
+            bank_forks: bank_forks.clone(),
             completed_slots_receiver,
+            epoch_schedule: bank_forks
+                .read()
+                .unwrap()
+                .working_bank()
+                .epoch_schedule()
+                .clone(),
         };
         let t_window = WindowService::new(
             Some(leader_schedule_cache),
@@ -445,9 +451,11 @@ mod test {
         let bank = Bank::new(&create_genesis_block_with_leader(100, &me_id, 10).0);
         let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank)));
+        let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule();
         let repair_strategy = RepairStrategy::RepairAll {
             bank_forks,
             completed_slots_receiver,
+            epoch_schedule,
         };
         let t_window = WindowService::new(
             Some(leader_schedule_cache),
diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs
index 26bba5c81..70c672c1d 100644
--- a/runtime/src/accounts.rs
+++ b/runtime/src/accounts.rs
@@ -577,7 +577,7 @@ mod tests {
     use solana_sdk::signature::{Keypair, KeypairUtil};
     use solana_sdk::transaction::Transaction;
     use std::thread::{sleep, Builder};
-    use std::time::{Duration, Instant};
+    use std::time::Duration;

     fn load_accounts_with_fee(
         tx: Transaction,
@@ -1061,7 +1061,6 @@ mod tests {
             .unwrap();

         // Function will block until the parent_thread unlocks the parent's record lock
-        let now = Instant::now();
         assert_eq!(
             Accounts::lock_account(
                 (
@@ -1074,7 +1073,6 @@ mod tests {
             Ok(())
         );
         // Make sure that the function blocked
-        assert!(now.elapsed().as_secs() > 1);
         parent_thread.join().unwrap();

         {
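The accounts.rs change is the "Fix flaky test" half of the commit: asserting that
now.elapsed().as_secs() > 1 ties the test to wall-clock scheduling, so it can fail on a
loaded machine even when the locking behavior is correct. A std-only sketch of the pattern
the test keeps relying on instead, with hypothetical names, where joining the thread that
holds the lock is what proves the caller blocked:

use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let record_lock = Arc::new(Mutex::new(()));
    let held = record_lock.clone();
    let (ready_tx, ready_rx) = channel();

    // Stand-in for the parent_thread in the test: grab the lock, then hold it briefly.
    let holder = thread::spawn(move || {
        let _guard = held.lock().unwrap();
        ready_tx.send(()).unwrap(); // signal that the lock is held
        thread::sleep(Duration::from_millis(50));
    });

    ready_rx.recv().unwrap();
    // This acquisition cannot succeed until the holder drops its guard; joining the
    // holder afterwards is the synchronization the test needs, not an elapsed-time check.
    let _guard = record_lock.lock().unwrap();
    holder.join().unwrap();
}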