Add blockstore-root-scan for api nodes on boot (#17402)
* Add blockstore-root-scan for api nodes on boot * Ensure cluster-confirmed root and parents are set as root in blockstore in load_frozen_forks() * Plumb rpc-scan-and-fix-roots validator flag
This commit is contained in:
parent
30b60a976b
commit
41ec1c8d50
|
@ -139,6 +139,7 @@ pub struct JsonRpcConfig {
|
||||||
pub rpc_bigtable_timeout: Option<Duration>,
|
pub rpc_bigtable_timeout: Option<Duration>,
|
||||||
pub minimal_api: bool,
|
pub minimal_api: bool,
|
||||||
pub obsolete_v1_7_api: bool,
|
pub obsolete_v1_7_api: bool,
|
||||||
|
pub rpc_scan_and_fix_roots: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
|
|
@ -75,7 +75,7 @@ use std::{
|
||||||
sync::atomic::{AtomicBool, AtomicU64, Ordering},
|
sync::atomic::{AtomicBool, AtomicU64, Ordering},
|
||||||
sync::mpsc::Receiver,
|
sync::mpsc::Receiver,
|
||||||
sync::{Arc, Mutex, RwLock},
|
sync::{Arc, Mutex, RwLock},
|
||||||
thread::sleep,
|
thread::{sleep, Builder},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -1091,6 +1091,23 @@ fn new_banks_from_ledger(
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let blockstore = Arc::new(blockstore);
|
||||||
|
let blockstore_root_scan = if config.rpc_addrs.is_some()
|
||||||
|
&& config.rpc_config.enable_rpc_transaction_history
|
||||||
|
&& config.rpc_config.rpc_scan_and_fix_roots
|
||||||
|
{
|
||||||
|
let blockstore = blockstore.clone();
|
||||||
|
let exit = exit.clone();
|
||||||
|
Some(
|
||||||
|
Builder::new()
|
||||||
|
.name("blockstore-root-scan".to_string())
|
||||||
|
.spawn(move || blockstore.scan_and_fix_roots(&exit))
|
||||||
|
.unwrap(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
let process_options = blockstore_processor::ProcessOptions {
|
let process_options = blockstore_processor::ProcessOptions {
|
||||||
bpf_jit: config.bpf_jit,
|
bpf_jit: config.bpf_jit,
|
||||||
poh_verify,
|
poh_verify,
|
||||||
|
@ -1103,7 +1120,6 @@ fn new_banks_from_ledger(
|
||||||
..blockstore_processor::ProcessOptions::default()
|
..blockstore_processor::ProcessOptions::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
let blockstore = Arc::new(blockstore);
|
|
||||||
let transaction_history_services =
|
let transaction_history_services =
|
||||||
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
|
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
|
||||||
initialize_rpc_transaction_history_services(
|
initialize_rpc_transaction_history_services(
|
||||||
|
@ -1196,6 +1212,12 @@ fn new_banks_from_ledger(
|
||||||
bank_forks.set_snapshot_config(config.snapshot_config.clone());
|
bank_forks.set_snapshot_config(config.snapshot_config.clone());
|
||||||
bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);
|
bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);
|
||||||
|
|
||||||
|
if let Some(blockstore_root_scan) = blockstore_root_scan {
|
||||||
|
if let Err(err) = blockstore_root_scan.join() {
|
||||||
|
warn!("blockstore_root_scan failed to join {:?}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
(
|
(
|
||||||
genesis_config,
|
genesis_config,
|
||||||
bank_forks,
|
bank_forks,
|
||||||
|
|
|
@ -52,6 +52,7 @@ use std::{
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
rc::Rc,
|
rc::Rc,
|
||||||
sync::{
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
|
mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
|
||||||
Arc, Mutex, RwLock,
|
Arc, Mutex, RwLock,
|
||||||
},
|
},
|
||||||
|
@ -2923,6 +2924,10 @@ impl Blockstore {
|
||||||
self.last_root()
|
self.last_root()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn lowest_cleanup_slot(&self) -> Slot {
|
||||||
|
*self.lowest_cleanup_slot.read().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn storage_size(&self) -> Result<u64> {
|
pub fn storage_size(&self) -> Result<u64> {
|
||||||
self.db.storage_size()
|
self.db.storage_size()
|
||||||
}
|
}
|
||||||
|
@ -2930,6 +2935,50 @@ impl Blockstore {
|
||||||
pub fn is_primary_access(&self) -> bool {
|
pub fn is_primary_access(&self) -> bool {
|
||||||
self.db.is_primary_access()
|
self.db.is_primary_access()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn scan_and_fix_roots(&self, exit: &Arc<AtomicBool>) -> Result<()> {
|
||||||
|
let ancestor_iterator = AncestorIterator::new(self.last_root(), &self)
|
||||||
|
.take_while(|&slot| slot >= self.lowest_cleanup_slot());
|
||||||
|
|
||||||
|
let mut find_missing_roots = Measure::start("find_missing_roots");
|
||||||
|
let mut roots_to_fix = vec![];
|
||||||
|
for slot in ancestor_iterator.filter(|slot| !self.is_root(*slot)) {
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
roots_to_fix.push(slot);
|
||||||
|
}
|
||||||
|
find_missing_roots.stop();
|
||||||
|
let mut fix_roots = Measure::start("fix_roots");
|
||||||
|
if !roots_to_fix.is_empty() {
|
||||||
|
info!("{} slots to be rooted", roots_to_fix.len());
|
||||||
|
for chunk in roots_to_fix.chunks(100) {
|
||||||
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
trace!("{:?}", chunk);
|
||||||
|
self.set_roots(&roots_to_fix)?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
"No missing roots found in range {} to {}",
|
||||||
|
self.lowest_cleanup_slot(),
|
||||||
|
self.last_root()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
fix_roots.stop();
|
||||||
|
datapoint_info!(
|
||||||
|
"blockstore-scan_and_fix_roots",
|
||||||
|
(
|
||||||
|
"find_missing_roots_us",
|
||||||
|
find_missing_roots.as_us() as i64,
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
("num_roots_to_fix", roots_to_fix.len() as i64, i64),
|
||||||
|
("fix_roots_us", fix_roots.as_us() as i64, i64),
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
|
// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
|
||||||
|
|
|
@ -976,7 +976,7 @@ fn load_frozen_forks(
|
||||||
).and_then(|supermajority_root| {
|
).and_then(|supermajority_root| {
|
||||||
if supermajority_root > *root {
|
if supermajority_root > *root {
|
||||||
// If there's a cluster confirmed root greater than our last
|
// If there's a cluster confirmed root greater than our last
|
||||||
// replayed root, then beccause the cluster confirmed root should
|
// replayed root, then because the cluster confirmed root should
|
||||||
// be descended from our last root, it must exist in `all_banks`
|
// be descended from our last root, it must exist in `all_banks`
|
||||||
let cluster_root_bank = all_banks.get(&supermajority_root).unwrap();
|
let cluster_root_bank = all_banks.get(&supermajority_root).unwrap();
|
||||||
|
|
||||||
|
@ -984,6 +984,21 @@ fn load_frozen_forks(
|
||||||
// is drastically wrong
|
// is drastically wrong
|
||||||
assert!(cluster_root_bank.ancestors.contains_key(root));
|
assert!(cluster_root_bank.ancestors.contains_key(root));
|
||||||
info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot());
|
info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot());
|
||||||
|
|
||||||
|
// Ensure cluster-confirmed root and parents are set as root in blockstore
|
||||||
|
let mut rooted_slots = vec![];
|
||||||
|
let mut new_root_bank = cluster_root_bank.clone();
|
||||||
|
loop {
|
||||||
|
if new_root_bank.slot() == *root { break; } // Found the last root in the chain, yay!
|
||||||
|
assert!(new_root_bank.slot() > *root);
|
||||||
|
|
||||||
|
rooted_slots.push(new_root_bank.slot());
|
||||||
|
// As noted, the cluster confirmed root should be descended from
|
||||||
|
// our last root; therefore parent should be set
|
||||||
|
new_root_bank = new_root_bank.parent().unwrap();
|
||||||
|
}
|
||||||
|
inc_new_counter_info!("load_frozen_forks-cluster-confirmed-root", rooted_slots.len());
|
||||||
|
blockstore.set_roots(&rooted_slots).expect("Blockstore::set_roots should succeed");
|
||||||
Some(cluster_root_bank)
|
Some(cluster_root_bank)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
|
|
@ -1563,6 +1563,13 @@ pub fn main() {
|
||||||
.default_value(&default_rpc_send_transaction_leader_forward_count)
|
.default_value(&default_rpc_send_transaction_leader_forward_count)
|
||||||
.help("The number of upcoming leaders to which to forward transactions sent via rpc service."),
|
.help("The number of upcoming leaders to which to forward transactions sent via rpc service."),
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("rpc_scan_and_fix_roots")
|
||||||
|
.long("rpc-scan-and-fix-roots")
|
||||||
|
.takes_value(false)
|
||||||
|
.requires("enable_rpc_transaction_history")
|
||||||
|
.help("Verifies blockstore roots on boot and fixes any gaps"),
|
||||||
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch")
|
Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch")
|
||||||
.long("halt-on-trusted-validators-accounts-hash-mismatch")
|
.long("halt-on-trusted-validators-accounts-hash-mismatch")
|
||||||
|
@ -2050,6 +2057,7 @@ pub fn main() {
|
||||||
.ok()
|
.ok()
|
||||||
.map(Duration::from_secs),
|
.map(Duration::from_secs),
|
||||||
account_indexes: account_indexes.clone(),
|
account_indexes: account_indexes.clone(),
|
||||||
|
rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"),
|
||||||
},
|
},
|
||||||
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
|
rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| {
|
||||||
(
|
(
|
||||||
|
|
Loading…
Reference in New Issue