diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index d2a3ab9b4..f1f8fead9 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -356,10 +356,20 @@ impl Blocktree { Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot)) } + pub fn slot_coding_iterator<'a>( + &'a self, + slot: Slot, + ) -> Result)> + 'a> { + let slot_iterator = self + .db + .iter::(IteratorMode::From((slot, 0), IteratorDirection::Forward))?; + Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot)) + } + fn try_shred_recovery( db: &Database, erasure_metas: &HashMap<(u64, u64), ErasureMeta>, - index_working_set: &HashMap, + index_working_set: &mut HashMap, prev_inserted_datas: &mut HashMap<(u64, u64), Shred>, prev_inserted_codes: &mut HashMap<(u64, u64), Shred>, ) -> Vec { @@ -384,8 +394,8 @@ impl Blocktree { ); }; - let index_meta_entry = index_working_set.get(&slot).expect("Index"); - let index = &index_meta_entry.index; + let index_meta_entry = index_working_set.get_mut(&slot).expect("Index"); + let index = &mut index_meta_entry.index; match erasure_meta.status(&index) { ErasureMetaStatus::CanRecover => { // Find shreds for this erasure set and try recovery @@ -412,8 +422,17 @@ impl Blocktree { }); (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each( |i| { - if let Some(shred) = - prev_inserted_codes.remove(&(slot, i)).or_else(|| { + if let Some(shred) = prev_inserted_codes + .remove(&(slot, i)) + .map(|s| { + // Remove from the index so it doesn't get committed. 
We know + // this is safe to do because everything in + // `prev_inserted_codes` does not yet exist in blocktree + // (guaranteed by `check_cache_coding_shred`) + index.coding_mut().set_present(i, false); + s + }) + .or_else(|| { if index.coding().is_present(i) { let some_code = code_cf .get_bytes((slot, i)) @@ -449,8 +468,14 @@ impl Blocktree { ErasureMetaStatus::DataFull => { (set_index..set_index + erasure_meta.config.num_coding() as u64).for_each( |i| { - // Remove saved coding shreds. We don't need these for future recovery - let _ = prev_inserted_codes.remove(&(slot, i)); + // Remove saved coding shreds. We don't need these for future recovery. + if prev_inserted_codes.remove(&(slot, i)).is_some() { + // Remove from the index so it doesn't get committed. We know + // this is safe to do because everything in + // `prev_inserted_codes` does not yet exist in blocktree + // (guaranteed by `check_cache_coding_shred`) + index.coding_mut().set_present(i, false); + } }, ); submit_metrics(false, "complete".into(), 0); @@ -523,7 +548,7 @@ impl Blocktree { let recovered_data = Self::try_shred_recovery( &db, &erasure_metas, - &index_working_set, + &mut index_working_set, &mut just_inserted_data_shreds, &mut just_inserted_coding_shreds, ); @@ -680,6 +705,13 @@ impl Blocktree { ); } + // Should be safe to modify index_meta here. 
Two cases + // 1) Recovery happens: Then all inserted erasure metas are removed + // from just_received_coding_shreds, and nothing will be committed by + // `check_insert_coding_shred`, so the coding index meta will not be + // committed + index_meta.coding_mut().set_present(shred_index, true); + + just_received_coding_shreds .entry((slot, shred_index)) .or_insert_with(|| shred); @@ -2109,10 +2141,12 @@ pub mod tests { use crate::{ entry::{next_entry, next_entry_mut}, genesis_utils::{create_genesis_config, GenesisConfigInfo}, + leader_schedule::{FixedSchedule, LeaderSchedule}, shred::{max_ticks_per_n_shreds, DataShredHeader}, }; use itertools::Itertools; use rand::{seq::SliceRandom, thread_rng}; + use solana_runtime::bank::Bank; use solana_sdk::{ hash::{self, Hash}, instruction::CompiledInstruction, @@ -2124,11 +2158,7 @@ pub mod tests { use std::{iter::FromIterator, time::Duration}; // used for tests only - fn make_slot_entries_with_transactions( - slot: Slot, - parent_slot: Slot, - num_entries: u64, - ) -> (Vec, Vec) { + fn make_slot_entries_with_transactions(num_entries: u64) -> Vec { let mut entries: Vec = Vec::new(); for _ in 0..num_entries { let transaction = Transaction::new_with_compiled_instructions( @@ -2142,8 +2172,7 @@ pub mod tests { let mut tick = create_ticks(1, 0, Hash::default()); entries.append(&mut tick); } - let shreds = entries_to_test_shreds(entries.clone(), slot, parent_slot, true, 0); - (shreds, entries) + entries } #[test] @@ -4187,8 +4216,8 @@ pub mod tests { #[test] fn test_get_confirmed_block() { let slot = 0; - let (shreds, entries) = make_slot_entries_with_transactions(slot, 0, 100); - + let entries = make_slot_entries_with_transactions(100); + let shreds = entries_to_test_shreds(entries.clone(), slot, 0, true, 0); let ledger_path = get_tmp_ledger_path!(); let ledger = Blocktree::open(&ledger_path).unwrap(); ledger.insert_shreds(shreds, None, false).unwrap(); @@ -4376,4 +4405,236 @@ pub mod tests { }
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + + #[test] + fn test_recovery() { + let slot = 1; + let (data_shreds, coding_shreds, leader_schedule_cache) = + setup_erasure_shreds(slot, 0, 100, 1.0); + let blocktree_path = get_tmp_ledger_path!(); + { + let blocktree = Blocktree::open(&blocktree_path).unwrap(); + blocktree + .insert_shreds(coding_shreds, Some(&leader_schedule_cache), false) + .unwrap(); + let shred_bufs: Vec<_> = data_shreds + .iter() + .map(|shred| shred.payload.clone()) + .collect(); + + // Check all the data shreds were recovered + for (s, buf) in data_shreds.iter().zip(shred_bufs) { + assert_eq!( + blocktree + .get_data_shred(s.slot(), s.index() as u64) + .unwrap() + .unwrap(), + buf + ); + } + + verify_index_integrity(&blocktree, slot); + } + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + + #[test] + fn test_index_integrity() { + let slot = 1; + let num_entries = 100; + let (data_shreds, coding_shreds, leader_schedule_cache) = + setup_erasure_shreds(slot, 0, num_entries, 1.0); + assert!(data_shreds.len() > 3); + assert!(coding_shreds.len() > 3); + let blocktree_path = get_tmp_ledger_path!(); + { + let blocktree = Blocktree::open(&blocktree_path).unwrap(); + // Test inserting all the shreds + let all_shreds: Vec<_> = data_shreds + .iter() + .cloned() + .chain(coding_shreds.iter().cloned()) + .collect(); + blocktree + .insert_shreds(all_shreds, Some(&leader_schedule_cache), false) + .unwrap(); + verify_index_integrity(&blocktree, slot); + blocktree.purge_slots(0, Some(slot)); + + // Test inserting just the codes, enough for recovery + blocktree + .insert_shreds(coding_shreds.clone(), Some(&leader_schedule_cache), false) + .unwrap(); + verify_index_integrity(&blocktree, slot); + blocktree.purge_slots(0, Some(slot)); + + // Test inserting some codes, but not enough for recovery + blocktree + .insert_shreds( + coding_shreds[..coding_shreds.len() - 1].to_vec(), 
+ Some(&leader_schedule_cache), + false, + ) + .unwrap(); + verify_index_integrity(&blocktree, slot); + blocktree.purge_slots(0, Some(slot)); + + // Test inserting just the codes, and some data, enough for recovery + let shreds: Vec<_> = data_shreds[..data_shreds.len() - 1] + .iter() + .cloned() + .chain(coding_shreds[..coding_shreds.len() - 1].iter().cloned()) + .collect(); + blocktree + .insert_shreds(shreds, Some(&leader_schedule_cache), false) + .unwrap(); + verify_index_integrity(&blocktree, slot); + blocktree.purge_slots(0, Some(slot)); + + // Test inserting some codes, and some data, but enough for recovery + let shreds: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1] + .iter() + .cloned() + .chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned()) + .collect(); + blocktree + .insert_shreds(shreds, Some(&leader_schedule_cache), false) + .unwrap(); + verify_index_integrity(&blocktree, slot); + blocktree.purge_slots(0, Some(slot)); + + // Test inserting all shreds in 2 rounds, make sure nothing is lost + let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1] + .iter() + .cloned() + .chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned()) + .collect(); + let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 1..] 
+ .iter() + .cloned() + .chain(coding_shreds[coding_shreds.len() / 2 - 1..].iter().cloned()) + .collect(); + blocktree + .insert_shreds(shreds1, Some(&leader_schedule_cache), false) + .unwrap(); + blocktree + .insert_shreds(shreds2, Some(&leader_schedule_cache), false) + .unwrap(); + verify_index_integrity(&blocktree, slot); + blocktree.purge_slots(0, Some(slot)); + + // Test not all, but enough data and coding shreds in 2 rounds to trigger recovery, + // make sure nothing is lost + let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1] + .iter() + .cloned() + .chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned()) + .collect(); + let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 1..data_shreds.len() / 2] + .iter() + .cloned() + .chain( + coding_shreds[coding_shreds.len() / 2 - 1..data_shreds.len() / 2] + .iter() + .cloned(), + ) + .collect(); + blocktree + .insert_shreds(shreds1, Some(&leader_schedule_cache), false) + .unwrap(); + blocktree + .insert_shreds(shreds2, Some(&leader_schedule_cache), false) + .unwrap(); + verify_index_integrity(&blocktree, slot); + blocktree.purge_slots(0, Some(slot)); + + // Test insert shreds in 2 rounds, but not enough to trigger + // recovery, make sure nothing is lost + let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 2] + .iter() + .cloned() + .chain(coding_shreds[..coding_shreds.len() / 2 - 2].iter().cloned()) + .collect(); + let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 2..data_shreds.len() / 2 - 1] + .iter() + .cloned() + .chain( + coding_shreds[coding_shreds.len() / 2 - 2..coding_shreds.len() / 2 - 1] + .iter() + .cloned(), + ) + .collect(); + blocktree + .insert_shreds(shreds1, Some(&leader_schedule_cache), false) + .unwrap(); + blocktree + .insert_shreds(shreds2, Some(&leader_schedule_cache), false) + .unwrap(); + verify_index_integrity(&blocktree, slot); + blocktree.purge_slots(0, Some(slot)); + } + Blocktree::destroy(&blocktree_path).expect("Expected successful database 
destruction"); + } + + fn setup_erasure_shreds( + slot: u64, + parent_slot: u64, + num_entries: u64, + erasure_rate: f32, + ) -> (Vec, Vec, Arc) { + let entries = make_slot_entries_with_transactions(num_entries); + let leader_keypair = Arc::new(Keypair::new()); + let shredder = Shredder::new( + slot, + parent_slot, + erasure_rate, + leader_keypair.clone(), + 0, + 0, + ) + .expect("Failed in creating shredder"); + let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0); + + let genesis_config = create_genesis_config(2).genesis_config; + let bank = Arc::new(Bank::new(&genesis_config)); + let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank); + let fixed_schedule = FixedSchedule { + leader_schedule: Arc::new(LeaderSchedule::new_from_schedule(vec![ + leader_keypair.pubkey() + ])), + start_epoch: 0, + }; + leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule)); + + (data_shreds, coding_shreds, Arc::new(leader_schedule_cache)) + } + + fn verify_index_integrity(blocktree: &Blocktree, slot: u64) { + let index = blocktree.get_index(slot).unwrap().unwrap(); + // Test the set of data shreds in the index and in the data column + // family are the same + let data_iter = blocktree.slot_data_iterator(slot).unwrap(); + let mut num_data = 0; + for ((slot, index), _) in data_iter { + num_data += 1; + assert!(blocktree.get_data_shred(slot, index).unwrap().is_some()); + } + + // Test the data index doesn't have anything extra + let num_data_in_index = index.data().num_data(); + assert_eq!(num_data_in_index, num_data); + + // Test the set of coding shreds in the index and in the coding column + // family are the same + let coding_iter = blocktree.slot_coding_iterator(slot).unwrap(); + let mut num_coding = 0; + for ((slot, index), _) in coding_iter { + num_coding += 1; + assert!(blocktree.get_coding_shred(slot, index).unwrap().is_some()); + } + + // Test the data index doesn't have anything extra + let num_coding_in_index = 
index.coding().num_coding(); + assert_eq!(num_coding_in_index, num_coding); + } } diff --git a/ledger/src/blocktree_meta.rs b/ledger/src/blocktree_meta.rs index b27a14384..5e5e9734b 100644 --- a/ledger/src/blocktree_meta.rs +++ b/ledger/src/blocktree_meta.rs @@ -97,6 +97,10 @@ impl Index { } impl CodingIndex { + pub fn num_coding(&self) -> usize { + self.index.len() + } + pub fn present_in_bounds(&self, bounds: impl RangeBounds) -> usize { self.index.range(bounds).count() } @@ -121,6 +125,10 @@ impl CodingIndex { } impl DataIndex { + pub fn num_data(&self) -> usize { + self.index.len() + } + pub fn present_in_bounds(&self, bounds: impl RangeBounds) -> usize { self.index.range(bounds).count() }