diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index 318fd8148..2536aaca5 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -84,6 +84,7 @@ pub struct Blocktree {
     db: Arc<Database>,
     meta_cf: LedgerColumn<cf::SlotMeta>,
     data_cf: LedgerColumn<cf::Data>,
+    dead_slots_cf: LedgerColumn<cf::DeadSlots>,
     erasure_cf: LedgerColumn<cf::Coding>,
     erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
     orphans_cf: LedgerColumn<cf::Orphans>,
@@ -97,6 +98,8 @@ pub struct Blocktree {
 pub const META_CF: &str = "meta";
 // Column family for the data in a leader slot
 pub const DATA_CF: &str = "data";
+// Column family for slots that have been marked as dead
+pub const DEAD_SLOTS_CF: &str = "dead_slots";
 // Column family for erasure data
 pub const ERASURE_CF: &str = "erasure";
 pub const ERASURE_META_CF: &str = "erasure_meta";
@@ -124,6 +127,9 @@ impl Blocktree {
         // Create the data column family
         let data_cf = db.column();

+        // Create the dead slots column family
+        let dead_slots_cf = db.column();
+
         // Create the erasure column family
         let erasure_cf = db.column();

@@ -143,6 +149,7 @@ impl Blocktree {
             db,
             meta_cf,
             data_cf,
+            dead_slots_cf,
             erasure_cf,
             erasure_meta_cf,
             orphans_cf,
@@ -808,7 +815,17 @@ impl Blocktree {
         let result: HashMap<u64, Vec<u64>> = slots
             .iter()
             .zip(slot_metas)
-            .filter_map(|(height, meta)| meta.map(|meta| (*height, meta.next_slots)))
+            .filter_map(|(height, meta)| {
+                meta.map(|meta| {
+                    let valid_next_slots: Vec<u64> = meta
+                        .next_slots
+                        .iter()
+                        .cloned()
+                        .filter(|s| !self.is_dead(*s))
+                        .collect();
+                    (*height, valid_next_slots)
+                })
+            })
             .collect();

         Ok(result)
@@ -840,6 +857,22 @@ impl Blocktree {
         Ok(())
     }

+    pub fn is_dead(&self, slot: u64) -> bool {
+        if let Some(true) = self
+            .db
+            .get::<cf::DeadSlots>(slot)
+            .expect("fetch from DeadSlots column family failed")
+        {
+            true
+        } else {
+            false
+        }
+    }
+
+    pub fn set_dead_slot(&self, slot: u64) -> Result<()> {
+        self.dead_slots_cf.put(slot, &true)
+    }
+
     pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> {
         let mut results = vec![];

diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs
index 1ae0b104a..412f8bd72 100644
--- a/core/src/blocktree/db.rs
+++ b/core/src/blocktree/db.rs
@@ -28,6 +28,10 @@ pub mod columns {
     /// Data Column
     pub struct Data;

+    #[derive(Debug)]
+    /// The dead slots column
+    pub struct DeadSlots;
+
     #[derive(Debug)]
     /// The erasure meta column
     pub struct ErasureMeta;
diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs
index f64c92ab6..db2768da0 100644
--- a/core/src/blocktree/kvs.rs
+++ b/core/src/blocktree/kvs.rs
@@ -100,6 +100,25 @@ impl Column for cf::Data {
     }
 }

+impl Column for cf::DeadSlots {
+    const NAME: &'static str = super::DEAD_SLOTS_CF;
+    type Index = u64;
+
+    fn key(slot: u64) -> Key {
+        let mut key = Key::default();
+        BigEndian::write_u64(&mut key.0[8..16], slot);
+        key
+    }
+
+    fn index(key: &Key) -> u64 {
+        BigEndian::read_u64(&key.0[8..16])
+    }
+}
+
+impl TypedColumn for cf::DeadSlots {
+    type Type = bool;
+}
+
 impl Column for cf::Orphans {
     const NAME: &'static str = super::ORPHANS_CF;
     type Index = u64;
diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs
index 81a2483c0..ef696d773 100644
--- a/core/src/blocktree/rocks.rs
+++ b/core/src/blocktree/rocks.rs
@@ -30,7 +30,7 @@ impl Backend for Rocks {
     type Error = rocksdb::Error;

     fn open(path: &Path) -> Result<Rocks> {
-        use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, Root, SlotMeta};
+        use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta};

         fs::create_dir_all(&path)?;

@@ -40,6 +40,7 @@ impl Backend for Rocks {
         // Column family names
         let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
         let data_cf_descriptor = ColumnFamilyDescriptor::new(Data::NAME, get_cf_options());
+        let dead_slots_cf_descriptor = ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
         let erasure_cf_descriptor = ColumnFamilyDescriptor::new(Coding::NAME, get_cf_options());
         let erasure_meta_cf_descriptor =
             ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
@@ -49,6 +50,7 @@ impl Backend for Rocks {
         let cfs = vec![
             meta_cf_descriptor,
             data_cf_descriptor,
+            dead_slots_cf_descriptor,
             erasure_cf_descriptor,
             erasure_meta_cf_descriptor,
             orphans_cf_descriptor,
@@ -62,11 +64,12 @@ impl Backend for Rocks {
     }

     fn columns(&self) -> Vec<&'static str> {
-        use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, Root, SlotMeta};
+        use crate::blocktree::db::columns::{Coding, Data, DeadSlots, ErasureMeta, Orphans, Root, SlotMeta};

         vec![
             Coding::NAME,
             ErasureMeta::NAME,
+            DeadSlots::NAME,
             Data::NAME,
             Orphans::NAME,
             Root::NAME,
@@ -161,6 +164,25 @@ impl Column for cf::Data {
     }
 }

+impl Column for cf::DeadSlots {
+    const NAME: &'static str = super::DEAD_SLOTS_CF;
+    type Index = u64;
+
+    fn key(slot: u64) -> Vec<u8> {
+        let mut key = vec![0; 8];
+        BigEndian::write_u64(&mut key[..], slot);
+        key
+    }
+
+    fn index(key: &[u8]) -> u64 {
+        BigEndian::read_u64(&key[..8])
+    }
+}
+
+impl TypedColumn for cf::DeadSlots {
+    type Type = bool;
+}
+
 impl Column for cf::Orphans {
     const NAME: &'static str = super::ORPHANS_CF;
     type Index = u64;
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index f71bbbd65..fe4943788 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -13,7 +13,7 @@ use crate::poh_recorder::PohRecorder;
 use crate::result::{Error, Result};
 use crate::rpc_subscriptions::RpcSubscriptions;
 use crate::service::Service;
-use solana_metrics::{datapoint_warn, inc_new_counter_error, inc_new_counter_info};
+use solana_metrics::{datapoint_error, datapoint_warn, inc_new_counter_info};
 use solana_runtime::bank::Bank;
 use solana_sdk::hash::Hash;
 use solana_sdk::pubkey::Pubkey;
@@ -58,6 +58,7 @@ struct ForkProgress {
     last_entry: Hash,
     num_blobs: usize,
     started_ms: u64,
+    is_dead: bool,
 }
 impl ForkProgress {
     pub fn new(last_entry: Hash) -> Self {
@@ -65,6 +66,7 @@ impl ForkProgress {
             last_entry,
             num_blobs: 0,
             started_ms: timing::timestamp(),
+            is_dead: false,
         }
     }
 }
@@ -271,23 +273,45 @@ impl ReplayStage {
             });
         }
     }
+
+    // Returns true if `result` is a fatal error, i.e. an error that will cause a
+    // bank to be marked as dead/corrupted
+    fn is_replay_result_fatal(result: &Result<()>) -> bool {
+        match result {
+            Err(Error::TransactionError(e)) => {
+                // Transactions with errors that can't be committed mean this fork is bogus
+                let tx_error = Err(e.clone());
+                !Bank::can_commit(&tx_error)
+            }
+            Err(Error::BlobError(BlobError::VerificationFailed)) => true,
+            _ => false,
+        }
+    }
+
     fn replay_blocktree_into_bank(
         bank: &Bank,
         blocktree: &Blocktree,
         progress: &mut HashMap<u64, ForkProgress>,
     ) -> Result<()> {
         let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?;
-        let len = entries.len();
         let result = Self::replay_entries_into_bank(bank, entries, progress, num);
-        if result.is_ok() {
-            trace!("verified entries {}", len);
-            inc_new_counter_info!("replicate-stage_process_entries", len);
-        } else {
-            info!("debug to verify entries {}", len);
-            //TODO: mark this fork as failed
-            inc_new_counter_error!("replicate-stage_failed_process_entries", len);
+
+        if Self::is_replay_result_fatal(&result) {
+            Self::mark_dead_slot(bank.slot(), blocktree, progress);
         }
-        Ok(())
+
+        result
+    }
+
+    fn mark_dead_slot(slot: u64, blocktree: &Blocktree, progress: &mut HashMap<u64, ForkProgress>) {
+        // Mark this fork as dead in the progress map so we no longer try to replay it
+        let mut progress_entry = progress
+            .get_mut(&slot)
+            .expect("Progress entry must exist after call to replay_entries_into_bank()");
+        progress_entry.is_dead = true;
+        blocktree
+            .set_dead_slot(slot)
+            .expect("Failed to mark slot as dead in blocktree");
     }

     #[allow(clippy::too_many_arguments)]
@@ -387,16 +411,28 @@ impl ReplayStage {
         trace!("active banks {:?}", active_banks);

         for bank_slot in &active_banks {
+            // If the fork was marked as dead, don't replay it
+            if progress.get(bank_slot).map(|p| p.is_dead).unwrap_or(false) {
+                continue;
+            }
+
             let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
             *ticks_per_slot = bank.ticks_per_slot();
-            if bank.collector_id() != my_pubkey {
-                Self::replay_blocktree_into_bank(&bank, &blocktree, progress)?;
+            if bank.collector_id() != my_pubkey
+                && Self::is_replay_result_fatal(&Self::replay_blocktree_into_bank(
+                    &bank, &blocktree, progress,
+                ))
+            {
+                // If the bank was corrupted, don't try to run the below logic to check if the
+                // bank is completed
+                continue;
             }
             let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
             if bank.tick_height() == max_tick_height {
                 Self::process_completed_bank(my_pubkey, bank, slot_full_sender);
             }
         }
+
         Ok(())
     }

@@ -530,6 +566,7 @@ impl ReplayStage {
         if let Some(last_entry) = entries.last() {
             bank_progress.last_entry = last_entry.hash;
         }
+
         result
     }

@@ -546,6 +583,12 @@ impl ReplayStage {
                 last_entry,
                 bank.last_blockhash()
             );
+
+            datapoint_error!(
+                "replay-stage-entry_verification_failure",
+                ("slot", bank.slot(), i64),
+                ("last_entry", last_entry.to_string(), String),
+            );
             return Err(Error::BlobError(BlobError::VerificationFailed));
         }
         blocktree_processor::process_entries(bank, entries)?;
@@ -619,10 +662,16 @@ impl Service for ReplayStage {
 mod test {
     use super::*;
     use crate::blocktree::get_tmp_ledger_path;
+    use crate::entry;
     use crate::genesis_utils::create_genesis_block;
     use crate::packet::Blob;
     use crate::replay_stage::ReplayStage;
+    use solana_runtime::genesis_utils::GenesisBlockInfo;
+    use solana_sdk::hash::hash;
     use solana_sdk::hash::Hash;
+    use solana_sdk::signature::{Keypair, KeypairUtil};
+    use solana_sdk::system_transaction;
+    use solana_sdk::transaction::TransactionError;
     use std::fs::remove_dir_all;
     use std::sync::{Arc, RwLock};

@@ -681,4 +730,104 @@ mod test {
         ReplayStage::handle_new_root(&bank_forks, &mut progress);
         assert!(progress.is_empty());
     }
+
+    #[test]
+    fn test_dead_forks() {
+        let ledger_path = get_tmp_ledger_path!();
+        {
+            let blocktree = Arc::new(
+                Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
+            );
+            let GenesisBlockInfo {
+                genesis_block,
+                mint_keypair,
+                ..
+            } = create_genesis_block(1000);
+            let bank0 = Arc::new(Bank::new(&genesis_block));
+            let mut progress = HashMap::new();
+            progress.insert(bank0.slot(), ForkProgress::new(bank0.last_blockhash()));
+
+            let keypair1 = Keypair::new();
+            let keypair2 = Keypair::new();
+            let missing_keypair = Keypair::new();
+
+            // Insert entry with TransactionError::AccountNotFound error
+            let account_not_found_blob = entry::next_entry(
+                &bank0.last_blockhash(),
+                1,
+                vec![
+                    system_transaction::create_user_account(
+                        &keypair1,
+                        &keypair2.pubkey(),
+                        2,
+                        bank0.last_blockhash(),
+                    ), // should be fine,
+                    system_transaction::transfer(
+                        &missing_keypair,
+                        &mint_keypair.pubkey(),
+                        2,
+                        bank0.last_blockhash(),
+                    ), // should cause AccountNotFound error
+                ],
+            )
+            .to_blob();
+
+            blocktree
+                .insert_data_blobs(&[account_not_found_blob])
+                .unwrap();
+            assert_matches!(
+                ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress),
+                Err(Error::TransactionError(TransactionError::AccountNotFound))
+            );
+
+            // Check that the erroring bank was marked as dead in the progress map
+            assert!(progress
+                .get(&bank0.slot())
+                .map(|b| b.is_dead)
+                .unwrap_or(false));
+
+            // Check that the erroring bank was marked as dead in blocktree
+            assert!(blocktree.is_dead(bank0.slot()));
+
+            // Create new bank
+            let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), bank0.slot() + 1);
+            progress.insert(bank1.slot(), ForkProgress::new(bank0.last_blockhash()));
+            let bad_hash = hash(&[2; 30]);
+
+            // Insert entry that causes verification failure
+            let mut verification_failure_blob = entry::next_entry(
+                // use wrong blockhash
+                &bad_hash,
+                1,
+                vec![system_transaction::create_user_account(
+                    &keypair1,
+                    &keypair2.pubkey(),
+                    2,
+                    bank1.last_blockhash(),
+                )],
+            )
+            .to_blob();
+            verification_failure_blob.set_slot(1);
+            verification_failure_blob.set_index(0);
+            verification_failure_blob.set_parent(bank0.slot());
+
+            blocktree
+                .insert_data_blobs(&[verification_failure_blob])
+                .unwrap();
+            assert_matches!(
+                ReplayStage::replay_blocktree_into_bank(&bank1, &blocktree, &mut progress),
+                Err(Error::BlobError(BlobError::VerificationFailed))
+            );
+            // Check that the erroring bank was marked as dead in the progress map
+            assert!(progress
+                .get(&bank1.slot())
+                .map(|b| b.is_dead)
+                .unwrap_or(false));
+
+            // Check that the erroring bank was marked as dead in blocktree
+            assert!(blocktree.is_dead(bank1.slot()));
+        }
+
+        let _ignored = remove_dir_all(&ledger_path);
+    }
 }
diff --git a/core/tests/local_cluster.rs b/core/tests/local_cluster.rs
index bfa18ee06..fd52318dc 100644
--- a/core/tests/local_cluster.rs
+++ b/core/tests/local_cluster.rs
@@ -222,7 +222,6 @@ fn test_listener_startup() {
 }

 #[test]
-#[ignore]
 fn test_fail_entry_verification_leader() {
     solana_logger::setup();
     let num_nodes = 4;
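
For context on how the pieces above fit together, here is a minimal, self-contained sketch (not taken from the patch) of the bookkeeping these changes introduce: a fatal replay error marks a slot dead both in the in-memory progress map and in the persistent store, and child-slot queries then filter dead slots out. `DeadSlotStore` and `Progress` are hypothetical stand-ins for the new `dead_slots` column family and the `ForkProgress` map; the real Blocktree/ReplayStage types are not used here.

use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for the `dead_slots` column family: a persistent
/// slot -> bool mapping, modeled here as an in-memory set.
#[derive(Default)]
struct DeadSlotStore {
    dead: HashSet<u64>,
}

impl DeadSlotStore {
    fn set_dead_slot(&mut self, slot: u64) {
        self.dead.insert(slot);
    }
    fn is_dead(&self, slot: u64) -> bool {
        self.dead.contains(&slot)
    }
}

/// Stand-in for the per-fork progress entry kept by ReplayStage.
#[derive(Default)]
struct Progress {
    is_dead: bool,
}

/// Mirrors `mark_dead_slot`: record the failure both in memory and "on disk".
fn mark_dead_slot(slot: u64, store: &mut DeadSlotStore, progress: &mut HashMap<u64, Progress>) {
    progress.entry(slot).or_default().is_dead = true;
    store.set_dead_slot(slot);
}

/// Mirrors the `get_slots_since` change: children of a slot are reported only
/// if they have not been marked dead.
fn live_children(children: &[u64], store: &DeadSlotStore) -> Vec<u64> {
    children.iter().cloned().filter(|s| !store.is_dead(*s)).collect()
}

fn main() {
    let mut store = DeadSlotStore::default();
    let mut progress: HashMap<u64, Progress> = HashMap::new();

    // Slot 2 fails replay with a fatal error, so it is marked dead.
    mark_dead_slot(2, &mut store, &mut progress);

    // Fork discovery no longer advertises slot 2 as a valid child of slot 1.
    assert_eq!(live_children(&[2, 3], &store), vec![3u64]);
    // And replay skips it because the progress entry says so.
    assert!(progress[&2].is_dead);
}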