Handle errors on replaying ledger properly (#7741)

This commit is contained in:
carllin 2020-01-10 12:16:44 -08:00 committed by GitHub
parent b714a4be63
commit 27d2c0aaf3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 202 additions and 72 deletions

View File

@ -28,6 +28,7 @@ use solana_sdk::{
};
use std::{
cell::RefCell,
collections::HashMap,
path::PathBuf,
result,
sync::Arc,
@ -254,7 +255,7 @@ pub enum BlocktreeProcessorError {
/// Callback for accessing bank state while processing the blocktree
pub type ProcessCallback = Arc<dyn Fn(&Bank) -> () + Sync + Send>;
#[derive(Default)]
#[derive(Default, Clone)]
pub struct ProcessOptions {
pub poh_verify: bool,
pub full_leader_cache: bool,
@ -328,7 +329,8 @@ pub fn process_blocktree_from_root(
&mut rooted_path,
opts,
)?;
let (banks, bank_forks_info): (Vec<_>, Vec<_>) = fork_info.into_iter().unzip();
let (banks, bank_forks_info): (Vec<_>, Vec<_>) =
fork_info.into_iter().map(|(_, v)| v).unzip();
if banks.is_empty() {
return Err(BlocktreeProcessorError::NoValidForksFound);
}
@ -427,29 +429,33 @@ fn process_bank_0(
BlocktreeProcessorError::FailedToLoadEntries
})?;
verify_and_process_slot_entries(bank0, &entries, bank0.last_blockhash(), opts)?;
verify_and_process_slot_entries(bank0, &entries, bank0.last_blockhash(), opts)
.expect("processing for bank 0 must succceed");
bank0.freeze();
Ok(())
}
// Given a slot, add its children to the pending slots queue if those children slots are
// Given a bank, add its children to the pending slots queue if those children slots are
// complete
fn process_next_slots(
bank: &Arc<Bank>,
meta: &SlotMeta,
blocktree: &Blocktree,
leader_schedule_cache: &LeaderScheduleCache,
pending_slots: &mut Vec<(u64, SlotMeta, Arc<Bank>, Hash)>,
fork_info: &mut Vec<(Arc<Bank>, BankForksInfo)>,
pending_slots: &mut Vec<(SlotMeta, Arc<Bank>, Hash)>,
fork_info: &mut HashMap<u64, (Arc<Bank>, BankForksInfo)>,
) -> result::Result<(), BlocktreeProcessorError> {
if let Some(parent) = bank.parent() {
fork_info.remove(&parent.slot());
}
let bfi = BankForksInfo {
bank_slot: bank.slot(),
};
fork_info.insert(bank.slot(), (bank.clone(), bfi));
if meta.next_slots.is_empty() {
// Reached the end of this fork. Record the final entry height and last entry.hash
let bfi = BankForksInfo {
bank_slot: bank.slot(),
};
fork_info.push((bank.clone(), bfi));
return Ok(());
}
@ -482,17 +488,12 @@ fn process_next_slots(
bank.slot(),
allocated.since(initial_allocation)
);
pending_slots.push((*next_slot, next_meta, next_bank, bank.last_blockhash()));
} else {
let bfi = BankForksInfo {
bank_slot: bank.slot(),
};
fork_info.push((bank.clone(), bfi));
pending_slots.push((next_meta, next_bank, bank.last_blockhash()));
}
}
// Reverse sort by slot, so the next slot to be processed can be popped
pending_slots.sort_by(|a, b| b.0.cmp(&a.0));
pending_slots.sort_by(|a, b| b.1.slot().cmp(&a.1.slot()));
Ok(())
}
@ -505,8 +506,8 @@ fn process_pending_slots(
leader_schedule_cache: &mut LeaderScheduleCache,
rooted_path: &mut Vec<u64>,
opts: &ProcessOptions,
) -> result::Result<Vec<(Arc<Bank>, BankForksInfo)>, BlocktreeProcessorError> {
let mut fork_info = vec![];
) -> result::Result<HashMap<u64, (Arc<Bank>, BankForksInfo)>, BlocktreeProcessorError> {
let mut fork_info = HashMap::new();
let mut last_status_report = Instant::now();
let mut pending_slots = vec![];
let mut last_root_slot = root_bank.slot();
@ -521,8 +522,8 @@ fn process_pending_slots(
let dev_halt_at_slot = opts.dev_halt_at_slot.unwrap_or(std::u64::MAX);
while !pending_slots.is_empty() {
let (slot, meta, bank, last_entry_hash) = pending_slots.pop().unwrap();
let (meta, bank, last_entry_hash) = pending_slots.pop().unwrap();
let slot = bank.slot();
if last_status_report.elapsed() > Duration::from_secs(2) {
info!(
"processing ledger: slot={}, last root slot={}",
@ -531,27 +532,13 @@ fn process_pending_slots(
last_status_report = Instant::now();
}
if blocktree.is_dead(slot) {
warn!("slot {} is dead", slot);
continue;
}
let allocated = thread_mem_usage::Allocatedp::default();
let initial_allocation = allocated.get();
// Fetch all entries for this slot
let entries = blocktree.get_slot_entries(slot, 0, None).map_err(|err| {
warn!("Failed to load entries for slot {}: {:?}", slot, err);
BlocktreeProcessorError::FailedToLoadEntries
})?;
if let Err(err) = verify_and_process_slot_entries(&bank, &entries, last_entry_hash, opts) {
warn!("slot {} failed to verify: {}", slot, err);
if process_single_slot(blocktree, &bank, &last_entry_hash, opts).is_err() {
continue;
}
bank.freeze(); // all banks handled by this routine are created from complete slots
if blocktree.is_root(slot) {
let parents = bank.parents().into_iter().map(|b| b.slot()).rev().skip(1);
let parents: Vec<_> = parents.collect();
@ -572,8 +559,6 @@ fn process_pending_slots(
);
if slot >= dev_halt_at_slot {
let bfi = BankForksInfo { bank_slot: slot };
fork_info.push((bank, bfi));
break;
}
@ -590,6 +575,34 @@ fn process_pending_slots(
Ok(fork_info)
}
// Processes and replays the contents of a single slot, returns Error
// if failed to play the slot
fn process_single_slot(
blocktree: &Blocktree,
bank: &Arc<Bank>,
last_entry_hash: &Hash,
opts: &ProcessOptions,
) -> result::Result<(), BlocktreeProcessorError> {
let slot = bank.slot();
// Fetch all entries for this slot
let entries = blocktree.get_slot_entries(slot, 0, None).map_err(|err| {
warn!("Failed to load entries for slot {}: {:?}", slot, err);
BlocktreeProcessorError::FailedToLoadEntries
})?;
// If this errors with a fatal error, should mark the slot as dead so
// validators don't replay this slot and see DuplicateSignature errors
// later in ReplayStage
verify_and_process_slot_entries(&bank, &entries, *last_entry_hash, opts).map_err(|err| {
warn!("slot {} failed to verify: {}", slot, err);
err
})?;
bank.freeze(); // all banks handled by this routine are created from complete slots
Ok(())
}
pub struct TransactionStatusBatch {
pub bank: Arc<Bank>,
pub transactions: Vec<Transaction>,
@ -707,19 +720,17 @@ pub mod tests {
Ok(_)
);
assert_eq!(
process_blocktree(
&genesis_config,
&blocktree,
Vec::new(),
ProcessOptions {
poh_verify: true,
..ProcessOptions::default()
}
)
.err(),
Some(BlocktreeProcessorError::NoValidForksFound)
);
let (_bank_forks, bank_forks_info, _) = process_blocktree(
&genesis_config,
&blocktree,
Vec::new(),
ProcessOptions {
poh_verify: true,
..ProcessOptions::default()
},
)
.unwrap();
assert_eq!(bank_forks_info, vec![BankForksInfo { bank_slot: 0 }]);
}
#[test]
@ -752,20 +763,18 @@ pub mod tests {
Ok(_)
);
// No valid forks in blocktree, expect a failure
assert_eq!(
process_blocktree(
&genesis_config,
&blocktree,
Vec::new(),
ProcessOptions {
poh_verify: true,
..ProcessOptions::default()
}
)
.err(),
Some(BlocktreeProcessorError::NoValidForksFound)
);
// Should return slot 0, the last slot on the fork that is valid
let (_bank_forks, bank_forks_info, _) = process_blocktree(
&genesis_config,
&blocktree,
Vec::new(),
ProcessOptions {
poh_verify: true,
..ProcessOptions::default()
},
)
.unwrap();
assert_eq!(bank_forks_info, vec![BankForksInfo { bank_slot: 0 }]);
// Write slot 2 fully
let _last_slot2_entry_hash =
@ -831,10 +840,9 @@ pub mod tests {
poh_verify: true,
..ProcessOptions::default()
};
assert_eq!(
process_blocktree(&genesis_config, &blocktree, Vec::new(), opts).err(),
Some(BlocktreeProcessorError::NoValidForksFound)
);
let (_bank_forks, bank_forks_info, _) =
process_blocktree(&genesis_config, &blocktree, Vec::new(), opts).unwrap();
assert_eq!(bank_forks_info, vec![BankForksInfo { bank_slot: 0 }]);
}
#[test]
@ -898,7 +906,7 @@ pub mod tests {
..ProcessOptions::default()
};
let (mut _bank_forks, bank_forks_info, _) =
process_blocktree(&genesis_config, &blocktree, Vec::new(), opts).unwrap();
process_blocktree(&genesis_config, &blocktree, Vec::new(), opts.clone()).unwrap();
assert_eq!(bank_forks_info.len(), 1);
assert_eq!(
@ -907,6 +915,31 @@ pub mod tests {
bank_slot: 0, // slot 1 isn't "full", we stop at slot zero
}
);
/* Add a complete slot such that the tree looks like:
slot 0 (all ticks)
/ \
slot 1 (all ticks but one) slot 3 (all ticks)
|
slot 2 (all ticks)
*/
let opts = ProcessOptions {
poh_verify: true,
..ProcessOptions::default()
};
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 3, 0, blockhash);
// Slot 0 should not show up in the ending bank_forks_info
let (mut _bank_forks, bank_forks_info, _) =
process_blocktree(&genesis_config, &blocktree, Vec::new(), opts).unwrap();
assert_eq!(bank_forks_info.len(), 1);
assert_eq!(
bank_forks_info[0],
BankForksInfo {
bank_slot: 3, // slot 1 isn't "full", we stop at slot zero
}
);
}
#[test]
@ -1033,9 +1066,10 @@ pub mod tests {
poh_verify: true,
..ProcessOptions::default()
};
let (bank_forks, bank_forks_info, _) =
let (bank_forks, mut bank_forks_info, _) =
process_blocktree(&genesis_config, &blocktree, Vec::new(), opts).unwrap();
bank_forks_info.sort_by(|a, b| a.bank_slot.cmp(&b.bank_slot));
assert_eq!(bank_forks_info.len(), 2); // There are two forks
assert_eq!(
bank_forks_info[0],
@ -1119,6 +1153,102 @@ pub mod tests {
verify_fork_infos(&bank_forks, &bank_forks_info);
}
#[test]
fn test_process_blocktree_with_dead_child() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let ticks_per_slot = genesis_config.ticks_per_slot;
let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_config);
debug!("ledger_path: {:?}", ledger_path);
/*
slot 0
|
slot 1
/ \
/ \
slot 2 \
/ \
slot 4 (dead) slot 3
*/
let blocktree = Blocktree::open(&ledger_path).unwrap();
let slot1_blockhash =
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 1, 0, blockhash);
let slot2_blockhash =
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 2, 1, slot1_blockhash);
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 4, 2, slot2_blockhash);
blocktree.set_dead_slot(4).unwrap();
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 3, 1, slot1_blockhash);
let (bank_forks, mut bank_forks_info, _) = process_blocktree(
&genesis_config,
&blocktree,
Vec::new(),
ProcessOptions::default(),
)
.unwrap();
bank_forks_info.sort_by(|a, b| a.bank_slot.cmp(&b.bank_slot));
assert_eq!(bank_forks_info.len(), 2);
// Should see the parent of the dead child
assert_eq!(bank_forks_info[0], BankForksInfo { bank_slot: 2 },);
assert_eq!(bank_forks_info[1], BankForksInfo { bank_slot: 3 },);
assert_eq!(
&bank_forks[3]
.parents()
.iter()
.map(|bank| bank.slot())
.collect::<Vec<_>>(),
&[1, 0]
);
assert_eq!(
&bank_forks[2]
.parents()
.iter()
.map(|bank| bank.slot())
.collect::<Vec<_>>(),
&[1, 0]
);
verify_fork_infos(&bank_forks, &bank_forks_info);
}
#[test]
fn test_root_with_all_dead_children() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let ticks_per_slot = genesis_config.ticks_per_slot;
let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_config);
debug!("ledger_path: {:?}", ledger_path);
/*
slot 0
/ \
/ \
slot 1 (dead) slot 2 (dead)
*/
let blocktree = Blocktree::open(&ledger_path).unwrap();
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 1, 0, blockhash);
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 2, 0, blockhash);
blocktree.set_dead_slot(1).unwrap();
blocktree.set_dead_slot(2).unwrap();
let (bank_forks, bank_forks_info, _) = process_blocktree(
&genesis_config,
&blocktree,
Vec::new(),
ProcessOptions::default(),
)
.unwrap();
// Should see only the parent of the dead children
println!("{:?}", bank_forks_info);
assert_eq!(bank_forks_info.len(), 1);
assert_eq!(bank_forks_info[0], BankForksInfo { bank_slot: 0 },);
verify_fork_infos(&bank_forks, &bank_forks_info);
}
#[test]
fn test_process_blocktree_epoch_boundary_root() {
solana_logger::setup();