diff --git a/core/src/rpc.rs b/core/src/rpc.rs index f6e05b89e6..78521da701 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -139,6 +139,7 @@ pub struct JsonRpcConfig { pub rpc_bigtable_timeout: Option, pub minimal_api: bool, pub obsolete_v1_7_api: bool, + pub rpc_scan_and_fix_roots: bool, } #[derive(Clone)] diff --git a/core/src/validator.rs b/core/src/validator.rs index 5f7f3c1c1e..8f27fac35b 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -75,7 +75,7 @@ use std::{ sync::atomic::{AtomicBool, AtomicU64, Ordering}, sync::mpsc::Receiver, sync::{Arc, Mutex, RwLock}, - thread::sleep, + thread::{sleep, Builder}, time::Duration, }; @@ -1091,6 +1091,23 @@ fn new_banks_from_ledger( }); } + let blockstore = Arc::new(blockstore); + let blockstore_root_scan = if config.rpc_addrs.is_some() + && config.rpc_config.enable_rpc_transaction_history + && config.rpc_config.rpc_scan_and_fix_roots + { + let blockstore = blockstore.clone(); + let exit = exit.clone(); + Some( + Builder::new() + .name("blockstore-root-scan".to_string()) + .spawn(move || blockstore.scan_and_fix_roots(&exit)) + .unwrap(), + ) + } else { + None + }; + let process_options = blockstore_processor::ProcessOptions { bpf_jit: config.bpf_jit, poh_verify, @@ -1103,7 +1120,6 @@ fn new_banks_from_ledger( ..blockstore_processor::ProcessOptions::default() }; - let blockstore = Arc::new(blockstore); let transaction_history_services = if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history { initialize_rpc_transaction_history_services( @@ -1196,6 +1212,12 @@ fn new_banks_from_ledger( bank_forks.set_snapshot_config(config.snapshot_config.clone()); bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots); + if let Some(blockstore_root_scan) = blockstore_root_scan { + if let Err(err) = blockstore_root_scan.join() { + warn!("blockstore_root_scan failed to join {:?}", err); + } + } + ( genesis_config, bank_forks, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 8d5921c7c1..2c207249fe 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -52,6 +52,7 @@ use std::{ path::{Path, PathBuf}, rc::Rc, sync::{ + atomic::{AtomicBool, Ordering}, mpsc::{sync_channel, Receiver, SyncSender, TrySendError}, Arc, Mutex, RwLock, }, @@ -2923,6 +2924,10 @@ impl Blockstore { self.last_root() } + pub fn lowest_cleanup_slot(&self) -> Slot { + *self.lowest_cleanup_slot.read().unwrap() + } + pub fn storage_size(&self) -> Result { self.db.storage_size() } @@ -2930,6 +2935,50 @@ impl Blockstore { pub fn is_primary_access(&self) -> bool { self.db.is_primary_access() } + + pub fn scan_and_fix_roots(&self, exit: &Arc) -> Result<()> { + let ancestor_iterator = AncestorIterator::new(self.last_root(), &self) + .take_while(|&slot| slot >= self.lowest_cleanup_slot()); + + let mut find_missing_roots = Measure::start("find_missing_roots"); + let mut roots_to_fix = vec![]; + for slot in ancestor_iterator.filter(|slot| !self.is_root(*slot)) { + if exit.load(Ordering::Relaxed) { + return Ok(()); + } + roots_to_fix.push(slot); + } + find_missing_roots.stop(); + let mut fix_roots = Measure::start("fix_roots"); + if !roots_to_fix.is_empty() { + info!("{} slots to be rooted", roots_to_fix.len()); + for chunk in roots_to_fix.chunks(100) { + if exit.load(Ordering::Relaxed) { + return Ok(()); + } + trace!("{:?}", chunk); + self.set_roots(&roots_to_fix)?; + } + } else { + debug!( + "No missing roots found in range {} to {}", + self.lowest_cleanup_slot(), + self.last_root() + ); + } + fix_roots.stop(); + datapoint_info!( + "blockstore-scan_and_fix_roots", + ( + "find_missing_roots_us", + find_missing_roots.as_us() as i64, + i64 + ), + ("num_roots_to_fix", roots_to_fix.len() as i64, i64), + ("fix_roots_us", fix_roots.as_us() as i64, i64), + ); + Ok(()) + } } // Update the `completed_data_indexes` with a new shred `new_shred_index`. If a diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index a2b8b97918..1a3de33f34 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -976,7 +976,7 @@ fn load_frozen_forks( ).and_then(|supermajority_root| { if supermajority_root > *root { // If there's a cluster confirmed root greater than our last - // replayed root, then beccause the cluster confirmed root should + // replayed root, then because the cluster confirmed root should // be descended from our last root, it must exist in `all_banks` let cluster_root_bank = all_banks.get(&supermajority_root).unwrap(); @@ -984,6 +984,21 @@ fn load_frozen_forks( // is drastically wrong assert!(cluster_root_bank.ancestors.contains_key(root)); info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot()); + + // Ensure cluster-confirmed root and parents are set as root in blockstore + let mut rooted_slots = vec![]; + let mut new_root_bank = cluster_root_bank.clone(); + loop { + if new_root_bank.slot() == *root { break; } // Found the last root in the chain, yay! + assert!(new_root_bank.slot() > *root); + + rooted_slots.push(new_root_bank.slot()); + // As noted, the cluster confirmed root should be descended from + // our last root; therefore parent should be set + new_root_bank = new_root_bank.parent().unwrap(); + } + inc_new_counter_info!("load_frozen_forks-cluster-confirmed-root", rooted_slots.len()); + blockstore.set_roots(&rooted_slots).expect("Blockstore::set_roots should succeed"); Some(cluster_root_bank) } else { None diff --git a/validator/src/main.rs b/validator/src/main.rs index 777c7e27d9..65cdf5edd2 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1563,6 +1563,13 @@ pub fn main() { .default_value(&default_rpc_send_transaction_leader_forward_count) .help("The number of upcoming leaders to which to forward transactions sent via rpc service."), ) + .arg( + Arg::with_name("rpc_scan_and_fix_roots") + .long("rpc-scan-and-fix-roots") + .takes_value(false) + .requires("enable_rpc_transaction_history") + .help("Verifies blockstore roots on boot and fixes any gaps"), + ) .arg( Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch") .long("halt-on-trusted-validators-accounts-hash-mismatch") @@ -2050,6 +2057,7 @@ pub fn main() { .ok() .map(Duration::from_secs), account_indexes: account_indexes.clone(), + rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"), }, rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { (