Mark dead forks in replay stage (#4715)
* Add DeadSlots column family * Filter dead forks from get_slots_since * Mark erroring slots as dead in replay stage, add test * Mark dead forks in progress instead of removing them * Fix logging process_entries failures in replay_stage * Unignore test_fail_entry_verification_leader
This commit is contained in:
parent
33d13a3aea
commit
aacb38864c
|
@ -84,6 +84,7 @@ pub struct Blocktree {
|
|||
db: Arc<Database>,
|
||||
meta_cf: LedgerColumn<cf::SlotMeta>,
|
||||
data_cf: LedgerColumn<cf::Data>,
|
||||
dead_slots_cf: LedgerColumn<cf::DeadSlots>,
|
||||
erasure_cf: LedgerColumn<cf::Coding>,
|
||||
erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
|
||||
orphans_cf: LedgerColumn<cf::Orphans>,
|
||||
|
@ -97,6 +98,8 @@ pub struct Blocktree {
|
|||
pub const META_CF: &str = "meta";
|
||||
// Column family for the data in a leader slot
|
||||
pub const DATA_CF: &str = "data";
|
||||
// Column family for slots that have been marked as dead
|
||||
pub const DEAD_SLOTS_CF: &str = "dead_slots";
|
||||
// Column family for erasure data
|
||||
pub const ERASURE_CF: &str = "erasure";
|
||||
pub const ERASURE_META_CF: &str = "erasure_meta";
|
||||
|
@ -124,6 +127,9 @@ impl Blocktree {
|
|||
// Create the data column family
|
||||
let data_cf = db.column();
|
||||
|
||||
// Create the dead slots column family
|
||||
let dead_slots_cf = db.column();
|
||||
|
||||
// Create the erasure column family
|
||||
let erasure_cf = db.column();
|
||||
|
||||
|
@ -143,6 +149,7 @@ impl Blocktree {
|
|||
db,
|
||||
meta_cf,
|
||||
data_cf,
|
||||
dead_slots_cf,
|
||||
erasure_cf,
|
||||
erasure_meta_cf,
|
||||
orphans_cf,
|
||||
|
@ -808,7 +815,17 @@ impl Blocktree {
|
|||
let result: HashMap<u64, Vec<u64>> = slots
|
||||
.iter()
|
||||
.zip(slot_metas)
|
||||
.filter_map(|(height, meta)| meta.map(|meta| (*height, meta.next_slots)))
|
||||
.filter_map(|(height, meta)| {
|
||||
meta.map(|meta| {
|
||||
let valid_next_slots: Vec<u64> = meta
|
||||
.next_slots
|
||||
.iter()
|
||||
.cloned()
|
||||
.filter(|s| !self.is_dead(*s))
|
||||
.collect();
|
||||
(*height, valid_next_slots)
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(result)
|
||||
|
@ -840,6 +857,22 @@ impl Blocktree {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn is_dead(&self, slot: u64) -> bool {
|
||||
if let Some(true) = self
|
||||
.db
|
||||
.get::<cf::DeadSlots>(slot)
|
||||
.expect("fetch from DeadSlots column family failed")
|
||||
{
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Records `slot` as dead by storing `true` under its key in the dead-slots column.
///
/// The result of the underlying column `put` is returned unchanged.
pub fn set_dead_slot(&self, slot: u64) -> Result<()> {
    let dead = true;
    self.dead_slots_cf.put(slot, &dead)
}
|
||||
|
||||
pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> {
|
||||
let mut results = vec![];
|
||||
|
||||
|
|
|
@ -28,6 +28,10 @@ pub mod columns {
|
|||
/// Data Column
|
||||
pub struct Data;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// The dead slots column (slots that have been marked as dead)
|
||||
pub struct DeadSlots;
|
||||
|
||||
#[derive(Debug)]
|
||||
/// The erasure meta column
|
||||
pub struct ErasureMeta;
|
||||
|
|
|
@ -100,6 +100,25 @@ impl Column<Kvs> for cf::Data {
|
|||
}
|
||||
}
|
||||
|
||||
impl Column<Kvs> for cf::DeadSlots {
|
||||
const NAME: &'static str = super::DEAD_SLOTS;
|
||||
type Index = u64;
|
||||
|
||||
fn key(slot: u64) -> Key {
|
||||
let mut key = Key::default();
|
||||
BigEndian::write_u64(&mut key.0[8..16], slot);
|
||||
key
|
||||
}
|
||||
|
||||
fn index(key: &Key) -> u64 {
|
||||
BigEndian::read_u64(&key.0[8..16])
|
||||
}
|
||||
}
|
||||
|
||||
// Values in the dead-slots column are typed as `bool` for the Kvs backend, matching the
// `bool` that is written when a slot is marked dead.
impl TypedColumn<Kvs> for cf::DeadSlots {
    type Type = bool;
}
|
||||
|
||||
impl Column<Kvs> for cf::Orphans {
|
||||
const NAME: &'static str = super::ORPHANS_CF;
|
||||
type Index = u64;
|
||||
|
|
|
@ -30,7 +30,7 @@ impl Backend for Rocks {
|
|||
type Error = rocksdb::Error;
|
||||
|
||||
fn open(path: &Path) -> Result<Rocks> {
|
||||
use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, Root, SlotMeta};
|
||||
use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta};
|
||||
|
||||
fs::create_dir_all(&path)?;
|
||||
|
||||
|
@ -40,6 +40,7 @@ impl Backend for Rocks {
|
|||
// Column family names
|
||||
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
|
||||
let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
|
||||
let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
|
||||
let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
|
||||
let erasure_meta_cf_descriptor =
|
||||
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
|
||||
|
@ -49,6 +50,7 @@ impl Backend for Rocks {
|
|||
let cfs = vec![
|
||||
meta_cf_descriptor,
|
||||
data_cf_descriptor,
|
||||
dead_slots_cf_descriptor,
|
||||
erasure_cf_descriptor,
|
||||
erasure_meta_cf_descriptor,
|
||||
orphans_cf_descriptor,
|
||||
|
@ -62,11 +64,12 @@ impl Backend for Rocks {
|
|||
}
|
||||
|
||||
fn columns(&self) -> Vec<&'static str> {
|
||||
use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, Root, SlotMeta};
|
||||
use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta};
|
||||
|
||||
vec![
|
||||
Coding::NAME,
|
||||
ErasureMeta::NAME,
|
||||
DeadSlots::NAME,
|
||||
Data::NAME,
|
||||
Orphans::NAME,
|
||||
Root::NAME,
|
||||
|
@ -161,6 +164,25 @@ impl Column<Rocks> for cf::Data {
|
|||
}
|
||||
}
|
||||
|
||||
impl Column<Rocks> for cf::DeadSlots {
|
||||
const NAME: &'static str = super::DEAD_SLOTS_CF;
|
||||
type Index = u64;
|
||||
|
||||
fn key(slot: u64) -> Vec<u8> {
|
||||
let mut key = vec![0; 8];
|
||||
BigEndian::write_u64(&mut key[..], slot);
|
||||
key
|
||||
}
|
||||
|
||||
fn index(key: &[u8]) -> u64 {
|
||||
BigEndian::read_u64(&key[..8])
|
||||
}
|
||||
}
|
||||
|
||||
// Values in the dead-slots column are typed as `bool` for the Rocks backend.
impl TypedColumn<Rocks> for cf::DeadSlots {
|
||||
type Type = bool;
|
||||
}
|
||||
|
||||
impl Column<Rocks> for cf::Orphans {
|
||||
const NAME: &'static str = super::ORPHANS_CF;
|
||||
type Index = u64;
|
||||
|
|
|
@ -13,7 +13,7 @@ use crate::poh_recorder::PohRecorder;
|
|||
use crate::result::{Error, Result};
|
||||
use crate::rpc_subscriptions::RpcSubscriptions;
|
||||
use crate::service::Service;
|
||||
use solana_metrics::{datapoint_warn, inc_new_counter_error, inc_new_counter_info};
|
||||
use solana_metrics::{datapoint_warn, inc_new_counter_info};
|
||||
use solana_runtime::bank::Bank;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
|
@ -58,6 +58,7 @@ struct ForkProgress {
|
|||
last_entry: Hash,
|
||||
num_blobs: usize,
|
||||
started_ms: u64,
|
||||
is_dead: bool,
|
||||
}
|
||||
impl ForkProgress {
|
||||
pub fn new(last_entry: Hash) -> Self {
|
||||
|
@ -65,6 +66,7 @@ impl ForkProgress {
|
|||
last_entry,
|
||||
num_blobs: 0,
|
||||
started_ms: timing::timestamp(),
|
||||
is_dead: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -271,23 +273,45 @@ impl ReplayStage {
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Returns Some(result) if the `result` is a fatal error, which is an error that will cause a
|
||||
// bank to be marked as dead/corrupted
|
||||
fn is_replay_result_fatal(result: &Result<()>) -> bool {
|
||||
match result {
|
||||
Err(Error::TransactionError(e)) => {
|
||||
// Transactions withand transaction errors mean this fork is bogus
|
||||
let tx_error = Err(e.clone());
|
||||
!Bank::can_commit(&tx_error)
|
||||
}
|
||||
Err(Error::BlobError(BlobError::VerificationFailed)) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn replay_blocktree_into_bank(
|
||||
bank: &Bank,
|
||||
blocktree: &Blocktree,
|
||||
progress: &mut HashMap<u64, ForkProgress>,
|
||||
) -> Result<()> {
|
||||
let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?;
|
||||
let len = entries.len();
|
||||
let result = Self::replay_entries_into_bank(bank, entries, progress, num);
|
||||
if result.is_ok() {
|
||||
trace!("verified entries {}", len);
|
||||
inc_new_counter_info!("replicate-stage_process_entries", len);
|
||||
} else {
|
||||
info!("debug to verify entries {}", len);
|
||||
//TODO: mark this fork as failed
|
||||
inc_new_counter_error!("replicate-stage_failed_process_entries", len);
|
||||
|
||||
if Self::is_replay_result_fatal(&result) {
|
||||
Self::mark_dead_slot(bank.slot(), blocktree, progress);
|
||||
}
|
||||
Ok(())
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
// Marks `slot` as dead in both the in-memory progress map and the blocktree.
//
// The progress entry is kept and flagged via `is_dead` rather than removed.
//
// Panics if `slot` has no entry in `progress`, or if writing the dead-slot marker to the
// blocktree fails.
fn mark_dead_slot(slot: u64, blocktree: &Blocktree, progress: &mut HashMap<u64, ForkProgress>) {
    // Flag the progress entry as dead so we no longer try to replay this bank
    progress
        .get_mut(&slot)
        .expect("Progress entry must exist after call to replay_entries_into_bank()")
        .is_dead = true;
    blocktree
        .set_dead_slot(slot)
        .expect("Failed to mark slot as dead in blocktree");
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
|
@ -387,16 +411,28 @@ impl ReplayStage {
|
|||
trace!("active banks {:?}", active_banks);
|
||||
|
||||
for bank_slot in &active_banks {
|
||||
// If the fork was marked as dead, don't replay it
|
||||
if progress.get(bank_slot).map(|p| p.is_dead).unwrap_or(false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
|
||||
*ticks_per_slot = bank.ticks_per_slot();
|
||||
if bank.collector_id() != my_pubkey {
|
||||
Self::replay_blocktree_into_bank(&bank, &blocktree, progress)?;
|
||||
if bank.collector_id() != my_pubkey
|
||||
&& Self::is_replay_result_fatal(&Self::replay_blocktree_into_bank(
|
||||
&bank, &blocktree, progress,
|
||||
))
|
||||
{
|
||||
// If the bank was corrupted, don't try to run the below logic to check if the
|
||||
// bank is completed
|
||||
continue;
|
||||
}
|
||||
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
|
||||
if bank.tick_height() == max_tick_height {
|
||||
Self::process_completed_bank(my_pubkey, bank, slot_full_sender);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -530,6 +566,7 @@ impl ReplayStage {
|
|||
if let Some(last_entry) = entries.last() {
|
||||
bank_progress.last_entry = last_entry.hash;
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
|
@ -546,6 +583,12 @@ impl ReplayStage {
|
|||
last_entry,
|
||||
bank.last_blockhash()
|
||||
);
|
||||
|
||||
datapoint_error!(
|
||||
"replay-stage-entry_verification_failure",
|
||||
("slot", bank.slot(), i64),
|
||||
("last_entry", last_entry.to_string(), String),
|
||||
);
|
||||
return Err(Error::BlobError(BlobError::VerificationFailed));
|
||||
}
|
||||
blocktree_processor::process_entries(bank, entries)?;
|
||||
|
@ -619,10 +662,16 @@ impl Service for ReplayStage {
|
|||
mod test {
|
||||
use super::*;
|
||||
use crate::blocktree::get_tmp_ledger_path;
|
||||
use crate::entry;
|
||||
use crate::genesis_utils::create_genesis_block;
|
||||
use crate::packet::Blob;
|
||||
use crate::replay_stage::ReplayStage;
|
||||
use solana_runtime::genesis_utils::GenesisBlockInfo;
|
||||
use solana_sdk::hash::hash;
|
||||
use solana_sdk::hash::Hash;
|
||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||
use solana_sdk::system_transaction;
|
||||
use solana_sdk::transaction::TransactionError;
|
||||
use std::fs::remove_dir_all;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
|
@ -681,4 +730,104 @@ mod test {
|
|||
ReplayStage::handle_new_root(&bank_forks, &mut progress);
|
||||
assert!(progress.is_empty());
|
||||
}
|
||||
|
||||
#[test]
fn test_dead_forks() {
    // Whether `slot` is flagged dead in the progress map; a missing entry counts as not dead
    fn flagged_dead(progress: &HashMap<u64, ForkProgress>, slot: u64) -> bool {
        progress.get(&slot).map(|p| p.is_dead).unwrap_or(false)
    }

    let ledger_path = get_tmp_ledger_path!();
    {
        let blocktree = Arc::new(
            Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
        );
        let GenesisBlockInfo {
            genesis_block,
            mint_keypair,
            ..
        } = create_genesis_block(1000);
        let bank0 = Arc::new(Bank::new(&genesis_block));
        let mut progress = HashMap::new();
        progress.insert(bank0.slot(), ForkProgress::new(bank0.last_blockhash()));

        let keypair1 = Keypair::new();
        let keypair2 = Keypair::new();
        let missing_keypair = Keypair::new();

        // Case 1: an entry whose replay yields TransactionError::AccountNotFound
        // First transaction: should be fine
        let create_account_tx = system_transaction::create_user_account(
            &keypair1,
            &keypair2.pubkey(),
            2,
            bank0.last_blockhash(),
        );
        // Second transaction: should cause AccountNotFound error
        let missing_payer_tx = system_transaction::transfer(
            &missing_keypair,
            &mint_keypair.pubkey(),
            2,
            bank0.last_blockhash(),
        );
        let account_not_found_blob = entry::next_entry(
            &bank0.last_blockhash(),
            1,
            vec![create_account_tx, missing_payer_tx],
        )
        .to_blob();

        blocktree
            .insert_data_blobs(&[account_not_found_blob])
            .unwrap();
        assert_matches!(
            ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress),
            Err(Error::TransactionError(TransactionError::AccountNotFound))
        );

        // The failing bank must be flagged dead in the progress map ...
        assert!(flagged_dead(&progress, bank0.slot()));

        // ... and in blocktree
        assert!(blocktree.is_dead(bank0.slot()));

        // Case 2: a child bank fed an entry that fails verification
        let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), bank0.slot() + 1);
        progress.insert(bank1.slot(), ForkProgress::new(bank0.last_blockhash()));
        let bad_hash = hash(&[2; 30]);

        let create_account_tx = system_transaction::create_user_account(
            &keypair1,
            &keypair2.pubkey(),
            2,
            bank1.last_blockhash(),
        );
        // The entry is chained off the wrong blockhash on purpose
        let mut verification_failure_blob =
            entry::next_entry(&bad_hash, 1, vec![create_account_tx]).to_blob();
        verification_failure_blob.set_slot(1);
        verification_failure_blob.set_index(0);
        verification_failure_blob.set_parent(bank0.slot());

        blocktree
            .insert_data_blobs(&[verification_failure_blob])
            .unwrap();
        assert_matches!(
            ReplayStage::replay_blocktree_into_bank(&bank1, &blocktree, &mut progress),
            Err(Error::BlobError(BlobError::VerificationFailed))
        );

        // The failing bank must be flagged dead in the progress map ...
        assert!(flagged_dead(&progress, bank1.slot()));

        // ... and in blocktree
        assert!(blocktree.is_dead(bank1.slot()));
    }

    let _ignored = remove_dir_all(&ledger_path);
}
|
||||
}
|
||||
|
|
|
@ -222,7 +222,6 @@ fn test_listener_startup() {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_fail_entry_verification_leader() {
|
||||
solana_logger::setup();
|
||||
let num_nodes = 4;
|
||||
|
|
Loading…
Reference in New Issue