Fix Erasure Index (#7319)

Fix Erasure Index Check to set the erasure presence
This commit is contained in:
carllin 2019-12-09 00:13:36 -08:00 committed by GitHub
parent 43e608af47
commit b55b646f12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 286 additions and 17 deletions

View File

@ -356,10 +356,20 @@ impl Blocktree {
Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}
pub fn slot_coding_iterator<'a>(
&'a self,
slot: Slot,
) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + 'a> {
let slot_iterator = self
.db
.iter::<cf::ShredCode>(IteratorMode::From((slot, 0), IteratorDirection::Forward))?;
Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}
fn try_shred_recovery(
db: &Database,
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
index_working_set: &HashMap<u64, IndexMetaWorkingSetEntry>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
) -> Vec<Shred> {
@ -384,8 +394,8 @@ impl Blocktree {
);
};
let index_meta_entry = index_working_set.get(&slot).expect("Index");
let index = &index_meta_entry.index;
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
let index = &mut index_meta_entry.index;
match erasure_meta.status(&index) {
ErasureMetaStatus::CanRecover => {
// Find shreds for this erasure set and try recovery
@ -412,8 +422,17 @@ impl Blocktree {
});
(set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
|i| {
if let Some(shred) =
prev_inserted_codes.remove(&(slot, i)).or_else(|| {
if let Some(shred) = prev_inserted_codes
.remove(&(slot, i))
.map(|s| {
// Remove from the index so it doesn't get committed. We know
// this is safe to do because everything in
// `prev_inserted_codes` does not yet exist in blocktree
// (guaranteed by `check_cache_coding_shred`)
index.coding_mut().set_present(i, false);
s
})
.or_else(|| {
if index.coding().is_present(i) {
let some_code = code_cf
.get_bytes((slot, i))
@ -449,8 +468,14 @@ impl Blocktree {
ErasureMetaStatus::DataFull => {
(set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
|i| {
// Remove saved coding shreds. We don't need these for future recovery
let _ = prev_inserted_codes.remove(&(slot, i));
// Remove saved coding shreds. We don't need these for future recovery.
if prev_inserted_codes.remove(&(slot, i)).is_some() {
// Remove from the index so it doesn't get committed. We know
// this is safe to do because everything in
// `prev_inserted_codes` does not yet exist in blocktree
// (guaranteed by `check_cache_coding_shred`)
index.coding_mut().set_present(i, false);
}
},
);
submit_metrics(false, "complete".into(), 0);
@ -523,7 +548,7 @@ impl Blocktree {
let recovered_data = Self::try_shred_recovery(
&db,
&erasure_metas,
&index_working_set,
&mut index_working_set,
&mut just_inserted_data_shreds,
&mut just_inserted_coding_shreds,
);
@ -680,6 +705,13 @@ impl Blocktree {
);
}
// Should be safe to modify index_meta here. Two cases
// 1) Recovery happens: Then all inserted erasure metas are removed
// from just_received_coding_shreds, and nothing wll be committed by
// `check_insert_coding_shred`, so the coding index meta will not be
// committed
index_meta.coding_mut().set_present(shred_index, true);
just_received_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred);
@ -2109,10 +2141,12 @@ pub mod tests {
use crate::{
entry::{next_entry, next_entry_mut},
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule::{FixedSchedule, LeaderSchedule},
shred::{max_ticks_per_n_shreds, DataShredHeader},
};
use itertools::Itertools;
use rand::{seq::SliceRandom, thread_rng};
use solana_runtime::bank::Bank;
use solana_sdk::{
hash::{self, Hash},
instruction::CompiledInstruction,
@ -2124,11 +2158,7 @@ pub mod tests {
use std::{iter::FromIterator, time::Duration};
// used for tests only
fn make_slot_entries_with_transactions(
slot: Slot,
parent_slot: Slot,
num_entries: u64,
) -> (Vec<Shred>, Vec<Entry>) {
fn make_slot_entries_with_transactions(num_entries: u64) -> Vec<Entry> {
let mut entries: Vec<Entry> = Vec::new();
for _ in 0..num_entries {
let transaction = Transaction::new_with_compiled_instructions(
@ -2142,8 +2172,7 @@ pub mod tests {
let mut tick = create_ticks(1, 0, Hash::default());
entries.append(&mut tick);
}
let shreds = entries_to_test_shreds(entries.clone(), slot, parent_slot, true, 0);
(shreds, entries)
entries
}
#[test]
@ -4187,8 +4216,8 @@ pub mod tests {
#[test]
fn test_get_confirmed_block() {
let slot = 0;
let (shreds, entries) = make_slot_entries_with_transactions(slot, 0, 100);
let entries = make_slot_entries_with_transactions(100);
let shreds = entries_to_test_shreds(entries.clone(), slot, 0, true, 0);
let ledger_path = get_tmp_ledger_path!();
let ledger = Blocktree::open(&ledger_path).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
@ -4376,4 +4405,236 @@ pub mod tests {
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_recovery() {
let slot = 1;
let (data_shreds, coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, 100, 1.0);
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
blocktree
.insert_shreds(coding_shreds, Some(&leader_schedule_cache), false)
.unwrap();
let shred_bufs: Vec<_> = data_shreds
.iter()
.map(|shred| shred.payload.clone())
.collect();
// Check all the data shreds were recovered
for (s, buf) in data_shreds.iter().zip(shred_bufs) {
assert_eq!(
blocktree
.get_data_shred(s.slot(), s.index() as u64)
.unwrap()
.unwrap(),
buf
);
}
verify_index_integrity(&blocktree, slot);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_index_integrity() {
let slot = 1;
let num_entries = 100;
let (data_shreds, coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, num_entries, 1.0);
assert!(data_shreds.len() > 3);
assert!(coding_shreds.len() > 3);
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
// Test inserting all the shreds
let all_shreds: Vec<_> = data_shreds
.iter()
.cloned()
.chain(coding_shreds.iter().cloned())
.collect();
blocktree
.insert_shreds(all_shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));
// Test inserting just the codes, enough for recovery
blocktree
.insert_shreds(coding_shreds.clone(), Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));
// Test inserting some codes, but not enough for recovery
blocktree
.insert_shreds(
coding_shreds[..coding_shreds.len() - 1].to_vec(),
Some(&leader_schedule_cache),
false,
)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));
// Test inserting just the codes, and some data, enough for recovery
let shreds: Vec<_> = data_shreds[..data_shreds.len() - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() - 1].iter().cloned())
.collect();
blocktree
.insert_shreds(shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));
// Test inserting some codes, and some data, but enough for recovery
let shreds: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned())
.collect();
blocktree
.insert_shreds(shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));
// Test inserting all shreds in 2 rounds, make sure nothing is lost
let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned())
.collect();
let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 1..]
.iter()
.cloned()
.chain(coding_shreds[coding_shreds.len() / 2 - 1..].iter().cloned())
.collect();
blocktree
.insert_shreds(shreds1, Some(&leader_schedule_cache), false)
.unwrap();
blocktree
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));
// Test not all, but enough data and coding shreds in 2 rounds to trigger recovery,
// make sure nothing is lost
let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned())
.collect();
let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 1..data_shreds.len() / 2]
.iter()
.cloned()
.chain(
coding_shreds[coding_shreds.len() / 2 - 1..data_shreds.len() / 2]
.iter()
.cloned(),
)
.collect();
blocktree
.insert_shreds(shreds1, Some(&leader_schedule_cache), false)
.unwrap();
blocktree
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));
// Test insert shreds in 2 rounds, but not enough to trigger
// recovery, make sure nothing is lost
let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 2]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 2].iter().cloned())
.collect();
let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 2..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(
coding_shreds[coding_shreds.len() / 2 - 2..coding_shreds.len() / 2 - 1]
.iter()
.cloned(),
)
.collect();
blocktree
.insert_shreds(shreds1, Some(&leader_schedule_cache), false)
.unwrap();
blocktree
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blocktree, slot);
blocktree.purge_slots(0, Some(slot));
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
fn setup_erasure_shreds(
slot: u64,
parent_slot: u64,
num_entries: u64,
erasure_rate: f32,
) -> (Vec<Shred>, Vec<Shred>, Arc<LeaderScheduleCache>) {
let entries = make_slot_entries_with_transactions(num_entries);
let leader_keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(
slot,
parent_slot,
erasure_rate,
leader_keypair.clone(),
0,
0,
)
.expect("Failed in creating shredder");
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);
let genesis_config = create_genesis_config(2).genesis_config;
let bank = Arc::new(Bank::new(&genesis_config));
let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
let fixed_schedule = FixedSchedule {
leader_schedule: Arc::new(LeaderSchedule::new_from_schedule(vec![
leader_keypair.pubkey()
])),
start_epoch: 0,
};
leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule));
(data_shreds, coding_shreds, Arc::new(leader_schedule_cache))
}
fn verify_index_integrity(blocktree: &Blocktree, slot: u64) {
let index = blocktree.get_index(slot).unwrap().unwrap();
// Test the set of data shreds in the index and in the data column
// family are the same
let data_iter = blocktree.slot_data_iterator(slot).unwrap();
let mut num_data = 0;
for ((slot, index), _) in data_iter {
num_data += 1;
assert!(blocktree.get_data_shred(slot, index).unwrap().is_some());
}
// Test the data index doesn't have anything extra
let num_data_in_index = index.data().num_data();
assert_eq!(num_data_in_index, num_data);
// Test the set of coding shreds in the index and in the coding column
// family are the same
let coding_iter = blocktree.slot_coding_iterator(slot).unwrap();
let mut num_coding = 0;
for ((slot, index), _) in coding_iter {
num_coding += 1;
assert!(blocktree.get_coding_shred(slot, index).unwrap().is_some());
}
// Test the data index doesn't have anything extra
let num_coding_in_index = index.coding().num_coding();
assert_eq!(num_coding_in_index, num_coding);
}
}

View File

@ -97,6 +97,10 @@ impl Index {
}
impl CodingIndex {
pub fn num_coding(&self) -> usize {
self.index.len()
}
pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
self.index.range(bounds).count()
}
@ -121,6 +125,10 @@ impl CodingIndex {
}
impl DataIndex {
pub fn num_data(&self) -> usize {
self.index.len()
}
pub fn present_in_bounds(&self, bounds: impl RangeBounds<u64>) -> usize {
self.index.range(bounds).count()
}