Avoid failing to root local slots before the hard fork (#19912)

* Make sure to root local slots even with a hard fork

* Address review comments

* Cleanup a bit

* Further clean up

* Further clean up a bit

* Add comment

* Tweak hard fork reconciliation code placement
Ryo Onodera 2022-06-26 15:14:17 +09:00 committed by GitHub
parent bf97a99dca
commit cd2878acf9
12 changed files with 559 additions and 142 deletions
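
In short, startup now reconciles blockstore roots against two external root sources instead of one: the restored tower's root (as before) and, on a cluster restart, the hard-fork slot. Below is a condensed sketch of the new flow, adapted from the load_blockstore() and ProcessBlockStore::process() changes further down; error handling that aborts the validator on failure is elided, and the `?` shorthand is illustrative only:

    // Stashed in load_blockstore() before BankForks can move the blockstore root.
    let mut original_blockstore_root = blockstore.last_root();

    // Reconciliation attempt 1 of 2: root the ancestors of the restored tower's root,
    // as the old reconcile_blockstore_roots_with_tower() did.
    if let Ok(tower) = &restored_tower {
        reconcile_blockstore_roots_with_external_source(
            ExternalRootSource::Tower(tower.root()),
            &blockstore,
            &mut original_blockstore_root,
        )?;
    }

    // Reconciliation attempt 2 of 2 (new): when this restart is a cluster restart with a
    // hard fork (detected via --wait-for-supermajority matching the root slot), also root
    // the local slots leading up to the hard-fork slot.
    if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
        &config,
        bank_forks.read().unwrap().root_bank().slot(),
    ) {
        reconcile_blockstore_roots_with_external_source(
            ExternalRootSource::HardFork(hard_fork_restart_slot),
            &blockstore,
            &mut original_blockstore_root,
        )?;
    }

Because both calls thread the same &mut original_blockstore_root, the second attempt is a no-op for the range already rooted by the first one.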


@ -1072,7 +1072,7 @@ impl Tower {
if let Some(last_voted_slot) = self.last_voted_slot() {
if tower_root <= replayed_root {
// Normally, we go into this clause with the possible help of
// reconcile_blockstore_roots_with_tower()
// reconcile_blockstore_roots_with_external_source()
if slot_history.check(last_voted_slot) == Check::TooOld {
// We could try hard to anchor with other older votes, but opt to simplify the
// following logic
@ -1320,45 +1320,77 @@ impl TowerError {
}
}
#[derive(Debug)]
pub enum ExternalRootSource {
Tower(Slot),
HardFork(Slot),
}
impl ExternalRootSource {
fn root(&self) -> Slot {
match self {
ExternalRootSource::Tower(slot) => *slot,
ExternalRootSource::HardFork(slot) => *slot,
}
}
}
// Given an untimely crash, the tower may have roots that are not reflected in the blockstore,
// or vice versa.
// That's because we don't impose any ordering guarantee or any kind of write barrier
// between the tower (plain old POSIX fs calls) and the blockstore (through RocksDB) when
// `ReplayState::handle_votable_bank()` saves the tower before setting blockstore roots.
pub fn reconcile_blockstore_roots_with_tower(
tower: &Tower,
pub fn reconcile_blockstore_roots_with_external_source(
external_source: ExternalRootSource,
blockstore: &Blockstore,
// blockstore.last_root() might have been updated already,
// so take a &mut param to use as input (and as output iff we update the root).
last_blockstore_root: &mut Slot,
) -> blockstore_db::Result<()> {
let tower_root = tower.root();
let last_blockstore_root = blockstore.last_root();
if last_blockstore_root < tower_root {
// Ensure tower_root itself to exist and be marked as rooted in the blockstore
let external_root = external_source.root();
if *last_blockstore_root < external_root {
// Ensure that external_root itself exists and is marked as rooted in the blockstore
// in addition to its ancestors.
let new_roots: Vec<_> = AncestorIterator::new_inclusive(tower_root, blockstore)
.take_while(|current| match current.cmp(&last_blockstore_root) {
let new_roots: Vec<_> = AncestorIterator::new_inclusive(external_root, blockstore)
.take_while(|current| match current.cmp(last_blockstore_root) {
Ordering::Greater => true,
Ordering::Equal => false,
Ordering::Less => panic!(
"couldn't find a last_blockstore_root upwards from: {}!?",
tower_root
"last_blockstore_root({}) is skipped while traversing \
blockstore (currently at {}) from external root ({:?})!?",
last_blockstore_root, current, external_source,
),
})
.collect();
if !new_roots.is_empty() {
info!(
"Reconciling slots as root based on tower root: {:?} ({}..{}) ",
new_roots, tower_root, last_blockstore_root
"Reconciling slots as root based on external root: {:?} (external: {:?}, blockstore: {})",
new_roots, external_source, last_blockstore_root
);
blockstore.set_roots(new_roots.iter())?;
// Unfortunately, we can't supply duplicate-confirmed hashes,
// because there is no guarantee that these slots can be replayed
// under this code path's limited conditions (i.e. those shreds
// might not be available, etc.). Also, correctly overcoming this
// limitation is hard...
blockstore.mark_slots_as_if_rooted_normally_at_startup(
new_roots.into_iter().map(|root| (root, None)).collect(),
false,
)?;
// Update the caller-managed state of last root in blockstore.
// Repeated calls of this function should result in a no-op for
// the range of `new_roots`.
*last_blockstore_root = blockstore.last_root();
} else {
// This indicates we're in a bad state, but still don't panic here.
// That's because we might have a chance of recovering properly with a
// newer snapshot.
warn!(
"Couldn't find any ancestor slots from tower root ({}) \
"Couldn't find any ancestor slots from external source ({:?}) \
towards blockstore root ({}); blockstore pruned or only \
tower moved into new ledger?",
tower_root, last_blockstore_root,
tower moved into new ledger or just hard fork?",
external_source, last_blockstore_root,
);
}
}
@ -2814,7 +2846,12 @@ pub mod test {
let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap();
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();
assert!(!blockstore.is_root(0));
assert!(blockstore.is_root(1));
@ -2825,7 +2862,9 @@ pub mod test {
}
#[test]
#[should_panic(expected = "couldn't find a last_blockstore_root upwards from: 4!?")]
#[should_panic(expected = "last_blockstore_root(3) is skipped while \
traversing blockstore (currently at 1) from \
external root (Tower(4))!?")]
fn test_reconcile_blockstore_roots_with_tower_panic_no_common_root() {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
@ -2846,7 +2885,12 @@ pub mod test {
let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap();
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
@ -2869,7 +2913,12 @@ pub mod test {
let mut tower = Tower::default();
tower.vote_state.root_slot = Some(4);
assert_eq!(blockstore.last_root(), 0);
reconcile_blockstore_roots_with_tower(&tower, &blockstore).unwrap();
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
&blockstore,
&mut blockstore.last_root(),
)
.unwrap();
assert_eq!(blockstore.last_root(), 0);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");


@ -23,6 +23,7 @@ use {
rewards_recorder_service::RewardsRecorderSender,
tower_storage::{SavedTower, SavedTowerVersions, TowerStorage},
unfrozen_gossip_verified_vote_hashes::UnfrozenGossipVerifiedVoteHashes,
validator::ProcessBlockStore,
voting_service::VoteOp,
window_service::DuplicateSlotReceiver,
},
@ -352,7 +353,7 @@ pub struct ReplayStage {
impl ReplayStage {
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new<T: Into<Tower> + Sized>(
pub fn new(
config: ReplayStageConfig,
blockstore: Arc<Blockstore>,
bank_forks: Arc<RwLock<BankForks>>,
@ -360,7 +361,7 @@ impl ReplayStage {
ledger_signal_receiver: Receiver<bool>,
duplicate_slots_receiver: DuplicateSlotReceiver,
poh_recorder: Arc<Mutex<PohRecorder>>,
tower: T,
maybe_process_blockstore: Option<ProcessBlockStore>,
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: RetransmitSlotsSender,
@ -375,8 +376,14 @@ impl ReplayStage {
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
transaction_cost_metrics_sender: Option<TransactionCostMetricsSender>,
) -> Self {
let mut tower = tower.into();
info!("Tower state: {:?}", tower);
let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
let tower = process_blockstore.process_to_create_tower();
info!("Tower state: {:?}", tower);
tower
} else {
warn!("creating default tower....");
Tower::default()
};
let ReplayStageConfig {
vote_account,


@ -11,7 +11,6 @@ use {
},
cluster_slots::ClusterSlots,
completed_data_sets_service::CompletedDataSetsSender,
consensus::Tower,
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
ledger_cleanup_service::LedgerCleanupService,
@ -22,6 +21,7 @@ use {
sigverify_shreds::ShredSigVerifier,
sigverify_stage::SigVerifyStage,
tower_storage::TowerStorage,
validator::ProcessBlockStore,
voting_service::VotingService,
warm_quic_cache_service::WarmQuicCacheService,
},
@ -96,7 +96,7 @@ impl Tvu {
/// * `sockets` - fetch, repair, and retransmit sockets
/// * `blockstore` - the ledger itself
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new<T: Into<Tower> + Sized>(
pub fn new(
vote_account: &Pubkey,
authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
bank_forks: &Arc<RwLock<BankForks>>,
@ -106,7 +106,7 @@ impl Tvu {
ledger_signal_receiver: Receiver<bool>,
rpc_subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
tower: T,
maybe_process_block_store: Option<ProcessBlockStore>,
tower_storage: Arc<dyn TowerStorage>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
@ -260,7 +260,7 @@ impl Tvu {
ledger_signal_receiver,
duplicate_slots_receiver,
poh_recorder.clone(),
tower,
maybe_process_block_store,
vote_tracker,
cluster_slots,
retransmit_slots_sender,
@ -384,7 +384,6 @@ pub mod tests {
let (completed_data_sets_sender, _completed_data_sets_receiver) = unbounded();
let (_, gossip_confirmed_slots_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let tower = Tower::default();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let tvu = Tvu::new(
&vote_keypair.pubkey(),
@ -410,7 +409,7 @@ pub mod tests {
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)),
&poh_recorder,
tower,
None,
Arc::new(crate::tower_storage::FileTowerStorage::default()),
&leader_schedule_cache,
&exit,


@ -8,7 +8,7 @@ use {
cache_block_meta_service::{CacheBlockMetaSender, CacheBlockMetaService},
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
consensus::{reconcile_blockstore_roots_with_tower, Tower},
consensus::{reconcile_blockstore_roots_with_external_source, ExternalRootSource, Tower},
ledger_metric_report_service::LedgerMetricReportService,
poh_timing_report_service::PohTimingReportService,
rewards_recorder_service::{RewardsRecorderSender, RewardsRecorderService},
@ -514,6 +514,7 @@ impl Validator {
genesis_config,
bank_forks,
blockstore,
original_blockstore_root,
ledger_signal_receiver,
completed_slots_receiver,
leader_schedule_cache,
@ -669,6 +670,7 @@ impl Validator {
vote_account,
&start_progress,
&blockstore,
original_blockstore_root,
&bank_forks,
&leader_schedule_cache,
&blockstore_process_options,
@ -945,7 +947,7 @@ impl Validator {
ledger_signal_receiver,
&rpc_subscriptions,
&poh_recorder,
process_blockstore,
Some(process_blockstore),
config.tower_storage.clone(),
&leader_schedule_cache,
&exit,
@ -1235,6 +1237,17 @@ fn check_poh_speed(genesis_config: &GenesisConfig, maybe_hash_samples: Option<u6
}
}
fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
// detect cluster restart (hard fork) indirectly via wait_for_supermajority...
if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
if wait_slot_for_supermajority == root_slot {
return Some(wait_slot_for_supermajority);
}
}
None
}
fn post_process_restored_tower(
restored_tower: crate::consensus::Result<Tower>,
validator_identity: &Pubkey,
@ -1248,29 +1261,28 @@ fn post_process_restored_tower(
.and_then(|tower| {
let root_bank = bank_forks.root_bank();
let slot_history = root_bank.get_slot_history();
// make sure the tower isn't corrupted first, before the following hard fork check
let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
if let Some(wait_slot_for_supermajority) = config.wait_for_supermajority {
if root_bank.slot() == wait_slot_for_supermajority {
// intentionally fail to restore tower; we're supposedly in a new hard fork; past
// out-of-chain vote state doesn't make sense at all
// what if --wait-for-supermajority again if the validator restarted?
let message = format!("Hardfork is detected; discarding tower restoration result: {:?}", tower);
datapoint_error!(
"tower_error",
(
"error",
message,
String
),
);
error!("{}", message);
if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(config, root_bank.slot()) {
// intentionally fail to restore the tower; we're supposedly in a new hard fork, so past
// out-of-chain vote state doesn't make sense at all
// what if --wait-for-supermajority is given again when the validator is restarted?
let message = format!("Hard fork is detected; discarding tower restoration result: {:?}", tower);
datapoint_error!(
"tower_error",
(
"error",
message,
String
),
);
error!("{}", message);
// unconditionally relax tower requirement so that we can always restore tower
// from root bank.
should_require_tower = false;
return Err(crate::consensus::TowerError::HardFork(wait_slot_for_supermajority));
}
// unconditionally relax tower requirement so that we can always restore tower
// from root bank.
should_require_tower = false;
return Err(crate::consensus::TowerError::HardFork(hard_fork_restart_slot));
}
if let Some(warp_slot) = config.warp_slot {
@ -1337,6 +1349,7 @@ fn load_blockstore(
GenesisConfig,
Arc<RwLock<BankForks>>,
Arc<Blockstore>,
Slot,
Receiver<bool>,
CompletedSlotsReceiver,
LeaderScheduleCache,
@ -1389,6 +1402,9 @@ fn load_blockstore(
.expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction);
blockstore.shred_timing_point_sender = poh_timing_point_sender;
// The following boot sequence (esp. BankForks) could set a root, so stash the original value
// of the blockstore root here as soon as possible.
let original_blockstore_root = blockstore.last_root();
let blockstore = Arc::new(blockstore);
let blockstore_root_scan = BlockstoreRootScan::new(config, &blockstore, exit);
@ -1493,6 +1509,7 @@ fn load_blockstore(
genesis_config,
bank_forks,
blockstore,
original_blockstore_root,
ledger_signal_receiver,
completed_slots_receiver,
leader_schedule_cache,
@ -1527,11 +1544,12 @@ fn highest_slot(blockstore: &Blockstore) -> Option<Slot> {
highest_slot
}
struct ProcessBlockStore<'a> {
pub struct ProcessBlockStore<'a> {
id: &'a Pubkey,
vote_account: &'a Pubkey,
start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
blockstore: &'a Blockstore,
original_blockstore_root: Slot,
bank_forks: &'a Arc<RwLock<BankForks>>,
leader_schedule_cache: &'a LeaderScheduleCache,
process_options: &'a blockstore_processor::ProcessOptions,
@ -1550,6 +1568,7 @@ impl<'a> ProcessBlockStore<'a> {
vote_account: &'a Pubkey,
start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
blockstore: &'a Blockstore,
original_blockstore_root: Slot,
bank_forks: &'a Arc<RwLock<BankForks>>,
leader_schedule_cache: &'a LeaderScheduleCache,
process_options: &'a blockstore_processor::ProcessOptions,
@ -1564,6 +1583,7 @@ impl<'a> ProcessBlockStore<'a> {
vote_account,
start_progress,
blockstore,
original_blockstore_root,
bank_forks,
leader_schedule_cache,
process_options,
@ -1576,7 +1596,7 @@ impl<'a> ProcessBlockStore<'a> {
}
}
fn process(&mut self) {
pub(crate) fn process(&mut self) {
if self.tower.is_none() {
let previous_start_process = *self.start_progress.read().unwrap();
*self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
@ -1633,12 +1653,16 @@ impl<'a> ProcessBlockStore<'a> {
self.tower = Some({
let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
if let Ok(tower) = &restored_tower {
reconcile_blockstore_roots_with_tower(tower, self.blockstore).unwrap_or_else(
|err| {
error!("Failed to reconcile blockstore with tower: {:?}", err);
abort()
},
);
// reconciliation attempt 1 of 2 with tower
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::Tower(tower.root()),
self.blockstore,
&mut self.original_blockstore_root,
)
.unwrap_or_else(|err| {
error!("Failed to reconcile blockstore with tower: {:?}", err);
abort()
});
}
post_process_restored_tower(
@ -1650,15 +1674,30 @@ impl<'a> ProcessBlockStore<'a> {
)
});
if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
self.config,
self.bank_forks.read().unwrap().root_bank().slot(),
) {
// reconciliation attempt 2 of 2 with hard fork
// this should be #2 because hard fork root > tower root in almost all cases
reconcile_blockstore_roots_with_external_source(
ExternalRootSource::HardFork(hard_fork_restart_slot),
self.blockstore,
&mut self.original_blockstore_root,
)
.unwrap_or_else(|err| {
error!("Failed to reconcile blockstore with hard fork: {:?}", err);
abort()
});
}
*self.start_progress.write().unwrap() = previous_start_process;
}
}
}
impl<'a> From<ProcessBlockStore<'a>> for Tower {
fn from(mut process_blockstore: ProcessBlockStore<'a>) -> Self {
process_blockstore.process();
process_blockstore.tower.expect("valid tower")
pub(crate) fn process_to_create_tower(mut self) -> Tower {
self.process();
self.tower.unwrap()
}
}


@ -525,13 +525,28 @@ impl Blockstore {
Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}
pub fn rooted_slot_iterator(&self, slot: Slot) -> Result<impl Iterator<Item = u64> + '_> {
fn prepare_rooted_slot_iterator(
&self,
slot: Slot,
direction: IteratorDirection,
) -> Result<impl Iterator<Item = Slot> + '_> {
let slot_iterator = self
.db
.iter::<cf::Root>(IteratorMode::From(slot, IteratorDirection::Forward))?;
.iter::<cf::Root>(IteratorMode::From(slot, direction))?;
Ok(slot_iterator.map(move |(rooted_slot, _)| rooted_slot))
}
pub fn rooted_slot_iterator(&self, slot: Slot) -> Result<impl Iterator<Item = Slot> + '_> {
self.prepare_rooted_slot_iterator(slot, IteratorDirection::Forward)
}
pub fn reversed_rooted_slot_iterator(
&self,
slot: Slot,
) -> Result<impl Iterator<Item = Slot> + '_> {
self.prepare_rooted_slot_iterator(slot, IteratorDirection::Reverse)
}
/// Determines if starting_slot and ending_slot are connected
pub fn slots_connected(&self, starting_slot: Slot, ending_slot: Slot) -> bool {
let mut next_slots: VecDeque<_> = vec![starting_slot].into();
@ -1745,6 +1760,13 @@ impl Blockstore {
self.meta_cf.put_bytes(slot, bytes)
}
/// Manually update the meta for a slot.
/// Can interfere with automatic meta update and potentially break chaining.
/// Dangerous. Use with care.
pub fn put_meta(&self, slot: Slot, meta: &SlotMeta) -> Result<()> {
self.put_meta_bytes(slot, &bincode::serialize(meta)?)
}
// Given a start and end entry index, find all the missing
// indexes in the ledger in the range [start_index, end_index)
// for the slot with the specified slot
@ -3043,6 +3065,22 @@ impl Blockstore {
Ok(())
}
pub fn mark_slots_as_if_rooted_normally_at_startup(
&self,
slots: Vec<(Slot, Option<Hash>)>,
with_hash: bool,
) -> Result<()> {
self.set_roots(slots.iter().map(|(slot, _hash)| slot))?;
if with_hash {
self.set_duplicate_confirmed_slots_and_hashes(
slots
.into_iter()
.map(|(slot, maybe_hash)| (slot, maybe_hash.unwrap())),
)?;
}
Ok(())
}
pub fn is_dead(&self, slot: Slot) -> bool {
matches!(
self.db


@ -214,6 +214,11 @@ impl SlotMeta {
Some(self.consumed) == self.last_index.map(|ix| ix + 1)
}
/// Dangerous. Currently only needed for a local-cluster test
pub fn unset_parent(&mut self) {
self.parent_slot = None;
}
pub fn clear_unconfirmed_slot(&mut self) {
let mut new_self = SlotMeta::new_orphan(self.slot);
std::mem::swap(&mut new_self.next_slots, &mut self.next_slots);


@ -746,8 +746,11 @@ pub fn process_blockstore_from_root(
// ensure start_slot is rooted for correct replay
if blockstore.is_primary_access() {
blockstore
.set_roots(std::iter::once(&start_slot))
.expect("Couldn't set root slot on startup");
.mark_slots_as_if_rooted_normally_at_startup(
vec![(bank.slot(), Some(bank.hash()))],
true,
)
.expect("Couldn't mark start_slot as root on startup");
} else {
info!(
"Starting slot {} isn't root and won't be updated due to being secondary blockstore access",
@ -1361,17 +1364,16 @@ fn load_frozen_forks(
if new_root_bank.slot() == root { break; } // Found the last root in the chain, yay!
assert!(new_root_bank.slot() > root);
rooted_slots.push((new_root_bank.slot(), new_root_bank.hash()));
rooted_slots.push((new_root_bank.slot(), Some(new_root_bank.hash())));
// As noted, the cluster confirmed root should be descended from
// our last root; therefore parent should be set
new_root_bank = new_root_bank.parent().unwrap();
}
inc_new_counter_info!("load_frozen_forks-cluster-confirmed-root", rooted_slots.len());
if blockstore.is_primary_access() {
blockstore.set_roots(rooted_slots.iter().map(|(slot, _hash)| slot))
.expect("Blockstore::set_roots should succeed");
blockstore.set_duplicate_confirmed_slots_and_hashes(rooted_slots.into_iter())
.expect("Blockstore::set_duplicate_confirmed should succeed");
blockstore
.mark_slots_as_if_rooted_normally_at_startup(rooted_slots, true)
.expect("Blockstore::mark_slots_as_if_rooted_normally_at_startup() should succeed");
}
Some(cluster_root_bank)
} else {


@ -20,14 +20,17 @@ use {
cluster_info::Node, contact_info::ContactInfo, gossip_service::discover_cluster,
},
solana_ledger::create_new_tmp_ledger,
solana_runtime::genesis_utils::{
create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo,
ValidatorVoteKeypairs,
solana_runtime::{
genesis_utils::{
create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo,
ValidatorVoteKeypairs,
},
snapshot_config::SnapshotConfig,
},
solana_sdk::{
account::{Account, AccountSharedData},
client::SyncClient,
clock::{DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
commitment_config::CommitmentConfig,
epoch_schedule::EpochSchedule,
genesis_config::{ClusterType, GenesisConfig},
@ -52,10 +55,13 @@ use {
collections::HashMap,
io::{Error, ErrorKind, Result},
iter,
path::{Path, PathBuf},
sync::{Arc, RwLock},
},
};
const DUMMY_SNAPSHOT_CONFIG_PATH_MARKER: &str = "dummy";
pub struct ClusterConfig {
/// The validator config that should be applied to every node in the cluster
pub validator_configs: Vec<ValidatorConfig>,
@ -138,6 +144,23 @@ impl LocalCluster {
Self::new(&mut config, socket_addr_space)
}
fn sync_ledger_path_across_nested_config_fields(
config: &mut ValidatorConfig,
ledger_path: &Path,
) {
config.account_paths = vec![ledger_path.join("accounts")];
config.tower_storage = Arc::new(FileTowerStorage::new(ledger_path.to_path_buf()));
if let Some(snapshot_config) = &mut config.snapshot_config {
let dummy: PathBuf = DUMMY_SNAPSHOT_CONFIG_PATH_MARKER.into();
if snapshot_config.full_snapshot_archives_dir == dummy {
snapshot_config.full_snapshot_archives_dir = ledger_path.to_path_buf();
}
if snapshot_config.bank_snapshots_dir == dummy {
snapshot_config.bank_snapshots_dir = ledger_path.join("snapshot");
}
}
}
pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self {
assert_eq!(config.validator_configs.len(), config.node_stakes.len());
@ -241,8 +264,7 @@ impl LocalCluster {
let leader_contact_info = leader_node.info.clone();
let mut leader_config = safe_clone_config(&config.validator_configs[0]);
leader_config.rpc_addrs = Some((leader_node.info.rpc, leader_node.info.rpc_pubsub));
leader_config.account_paths = vec![leader_ledger_path.join("accounts")];
leader_config.tower_storage = Arc::new(FileTowerStorage::new(leader_ledger_path.clone()));
Self::sync_ledger_path_across_nested_config_fields(&mut leader_config, &leader_ledger_path);
let leader_keypair = Arc::new(Keypair::from_bytes(&leader_keypair.to_bytes()).unwrap());
let leader_vote_keypair =
Arc::new(Keypair::from_bytes(&leader_vote_keypair.to_bytes()).unwrap());
@ -445,8 +467,7 @@ impl LocalCluster {
let mut config = safe_clone_config(validator_config);
config.rpc_addrs = Some((validator_node.info.rpc, validator_node.info.rpc_pubsub));
config.account_paths = vec![ledger_path.join("accounts")];
config.tower_storage = Arc::new(FileTowerStorage::new(ledger_path.clone()));
Self::sync_ledger_path_across_nested_config_fields(&mut config, &ledger_path);
let voting_keypair = voting_keypair.unwrap();
let validator_server = Validator::new(
validator_node,
@ -479,7 +500,7 @@ impl LocalCluster {
validator_pubkey
}
pub fn ledger_path(&self, validator_pubkey: &Pubkey) -> std::path::PathBuf {
pub fn ledger_path(&self, validator_pubkey: &Pubkey) -> PathBuf {
self.validators
.get(validator_pubkey)
.unwrap()
@ -718,6 +739,19 @@ impl LocalCluster {
)),
}
}
pub fn create_dummy_load_only_snapshot_config() -> SnapshotConfig {
// DUMMY_SNAPSHOT_CONFIG_PATH_MARKER will be replaced with a real value as part of the cluster
// node lifecycle.
// There must be some placeholder for now...
SnapshotConfig {
full_snapshot_archive_interval_slots: Slot::MAX,
incremental_snapshot_archive_interval_slots: Slot::MAX,
full_snapshot_archives_dir: DUMMY_SNAPSHOT_CONFIG_PATH_MARKER.into(),
bank_snapshots_dir: DUMMY_SNAPSHOT_CONFIG_PATH_MARKER.into(),
..SnapshotConfig::default()
}
}
}
impl Cluster for LocalCluster {
@ -790,10 +824,10 @@ impl Cluster for LocalCluster {
) -> ClusterValidatorInfo {
// Restart the node
let validator_info = &cluster_validator_info.info;
cluster_validator_info.config.account_paths =
vec![validator_info.ledger_path.join("accounts")];
cluster_validator_info.config.tower_storage =
Arc::new(FileTowerStorage::new(validator_info.ledger_path.clone()));
LocalCluster::sync_ledger_path_across_nested_config_fields(
&mut cluster_validator_info.config,
&validator_info.ledger_path,
);
let restarted_node = Validator::new(
node,
validator_info.keypair.clone(),


@ -103,9 +103,9 @@ pub fn open_blockstore(ledger_path: &Path) -> Blockstore {
})
}
pub fn purge_slots(blockstore: &Blockstore, start_slot: Slot, slot_count: Slot) {
blockstore.purge_from_next_slots(start_slot, start_slot + slot_count);
blockstore.purge_slots(start_slot, start_slot + slot_count, PurgeType::Exact);
pub fn purge_slots_with_count(blockstore: &Blockstore, start_slot: Slot, slot_count: Slot) {
blockstore.purge_from_next_slots(start_slot, start_slot + slot_count - 1);
blockstore.purge_slots(start_slot, start_slot + slot_count - 1, PurgeType::Exact);
}
// Fetches the last vote in the tower, blocking until it has also appeared in blockstore.


@ -23,7 +23,10 @@ use {
},
solana_download_utils::download_snapshot_archive,
solana_gossip::gossip_service::discover_cluster,
solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore},
solana_ledger::{
ancestor_iterator::AncestorIterator, bank_forks_utils, blockstore::Blockstore,
blockstore_processor::ProcessOptions,
},
solana_local_cluster::{
cluster::{Cluster, ClusterValidatorInfo},
cluster_tests,
@ -31,9 +34,11 @@ use {
validator_configs::*,
},
solana_runtime::{
hardened_unpack::open_genesis_config,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_package::SnapshotType,
snapshot_utils::{self, ArchiveFormat},
snapshot_utils::{self, ArchiveFormat, SnapshotVersion},
},
solana_sdk::{
account::AccountSharedData,
@ -42,6 +47,7 @@ use {
commitment_config::CommitmentConfig,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
genesis_config::ClusterType,
hard_forks::HardForks,
poh_config::PohConfig,
pubkey::Pubkey,
signature::{Keypair, Signer},
@ -57,7 +63,7 @@ use {
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Arc, Mutex,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
@ -1927,7 +1933,7 @@ fn do_test_future_tower(cluster_mode: ClusterMode) {
purged_slot_before_restart,
);
let blockstore = open_blockstore(&val_a_ledger_path);
purge_slots(&blockstore, purged_slot_before_restart, 100);
purge_slots_with_count(&blockstore, purged_slot_before_restart, 100);
}
cluster.restart_node(
@ -1979,6 +1985,59 @@ fn test_future_tower_master_slave() {
do_test_future_tower(ClusterMode::MasterSlave);
}
fn restart_whole_cluster_after_hard_fork(
cluster: &Arc<Mutex<LocalCluster>>,
validator_a_pubkey: Pubkey,
validator_b_pubkey: Pubkey,
mut validator_a_info: ClusterValidatorInfo,
validator_b_info: ClusterValidatorInfo,
) {
// restart validator A first
let cluster_for_a = cluster.clone();
let val_a_ledger_path = validator_a_info.info.ledger_path.clone();
// Spawn a thread because wait_for_supermajority blocks in Validator::new()!
let thread = std::thread::spawn(move || {
let restart_context = cluster_for_a
.lock()
.unwrap()
.create_restart_context(&validator_a_pubkey, &mut validator_a_info);
let restarted_validator_info = LocalCluster::restart_node_with_context(
validator_a_info,
restart_context,
SocketAddrSpace::Unspecified,
);
cluster_for_a
.lock()
.unwrap()
.add_node(&validator_a_pubkey, restarted_validator_info);
});
// test that validator A actually waits for supermajority
let mut last_vote = None;
for _ in 0..10 {
sleep(Duration::from_millis(1000));
let (new_last_vote, _) =
last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap();
if let Some(last_vote) = last_vote {
assert_eq!(last_vote, new_last_vote);
} else {
last_vote = Some(new_last_vote);
}
}
// restart validator B normally
cluster.lock().unwrap().restart_node(
&validator_b_pubkey,
validator_b_info,
SocketAddrSpace::Unspecified,
);
// validator A should now start so join its thread here
thread.join().unwrap();
}
#[test]
fn test_hard_fork_invalidates_tower() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
@ -2038,6 +2097,8 @@ fn test_hard_fork_invalidates_tower() {
let mut validator_b_info = cluster.lock().unwrap().exit_node(&validator_b_pubkey);
// setup hard fork at slot < a previously rooted slot!
// a hard fork earlier than the root is very unrealistic in the wild, but it's handy for
// testing the persistent tower's lockout behavior...
let hard_fork_slot = min_root - 5;
let hard_fork_slots = Some(vec![hard_fork_slot]);
let mut hard_forks = solana_sdk::hard_forks::HardForks::default();
@ -2060,53 +2121,18 @@ fn test_hard_fork_invalidates_tower() {
{
let blockstore_a = open_blockstore(&validator_a_info.info.ledger_path);
let blockstore_b = open_blockstore(&validator_b_info.info.ledger_path);
purge_slots(&blockstore_a, hard_fork_slot + 1, 100);
purge_slots(&blockstore_b, hard_fork_slot + 1, 100);
purge_slots_with_count(&blockstore_a, hard_fork_slot + 1, 100);
purge_slots_with_count(&blockstore_b, hard_fork_slot + 1, 100);
}
// restart validator A first
let cluster_for_a = cluster.clone();
// Spawn a thread because wait_for_supermajority blocks in Validator::new()!
let thread = std::thread::spawn(move || {
let restart_context = cluster_for_a
.lock()
.unwrap()
.create_restart_context(&validator_a_pubkey, &mut validator_a_info);
let restarted_validator_info = LocalCluster::restart_node_with_context(
validator_a_info,
restart_context,
SocketAddrSpace::Unspecified,
);
cluster_for_a
.lock()
.unwrap()
.add_node(&validator_a_pubkey, restarted_validator_info);
});
// test validator A actually to wait for supermajority
let mut last_vote = None;
for _ in 0..10 {
sleep(Duration::from_millis(1000));
let (new_last_vote, _) =
last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap();
if let Some(last_vote) = last_vote {
assert_eq!(last_vote, new_last_vote);
} else {
last_vote = Some(new_last_vote);
}
}
// restart validator B normally
cluster.lock().unwrap().restart_node(
&validator_b_pubkey,
restart_whole_cluster_after_hard_fork(
&cluster,
validator_a_pubkey,
validator_b_pubkey,
validator_a_info,
validator_b_info,
SocketAddrSpace::Unspecified,
);
// validator A should now start so join its thread here
thread.join().unwrap();
// new slots should be rooted after hard-fork cluster relaunch
cluster
.lock()
@ -2120,6 +2146,224 @@ fn test_run_test_load_program_accounts_root() {
run_test_load_program_accounts(CommitmentConfig::finalized());
}
fn create_simple_snapshot_config(ledger_path: &Path) -> SnapshotConfig {
SnapshotConfig {
full_snapshot_archives_dir: ledger_path.to_path_buf(),
bank_snapshots_dir: ledger_path.join("snapshot"),
..SnapshotConfig::default()
}
}
fn create_snapshot_to_hard_fork(
blockstore: &Blockstore,
snapshot_slot: Slot,
hard_forks: Vec<Slot>,
) {
let process_options = ProcessOptions {
halt_at_slot: Some(snapshot_slot),
new_hard_forks: Some(hard_forks),
poh_verify: false,
..ProcessOptions::default()
};
let ledger_path = blockstore.ledger_path();
let genesis_config = open_genesis_config(ledger_path, u64::max_value());
let snapshot_config = Some(create_simple_snapshot_config(ledger_path));
let (bank_forks, ..) = bank_forks_utils::load(
&genesis_config,
blockstore,
vec![ledger_path.join("accounts")],
None,
snapshot_config.as_ref(),
process_options,
None,
None,
None,
)
.unwrap();
let bank = bank_forks.read().unwrap().get(snapshot_slot).unwrap();
let full_snapshot_archive_info = snapshot_utils::bank_to_full_snapshot_archive(
ledger_path,
&bank,
Some(SnapshotVersion::default()),
ledger_path,
ledger_path,
ArchiveFormat::TarZstd,
1,
1,
)
.unwrap();
info!(
"Successfully created snapshot for slot {}, hash {}: {}",
bank.slot(),
bank.hash(),
full_snapshot_archive_info.path().display(),
);
}
#[test]
#[serial]
fn test_hard_fork_with_gap_in_roots() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// First set up the cluster with 2 nodes
let slots_per_epoch = 2048;
let node_stakes = vec![60, 40];
let validator_keys = vec![
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
"2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
]
.iter()
.map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
.take(node_stakes.len())
.collect::<Vec<_>>();
let validators = validator_keys
.iter()
.map(|(kp, _)| kp.pubkey())
.collect::<Vec<_>>();
let validator_a_pubkey = validators[0];
let validator_b_pubkey = validators[1];
let validator_config = ValidatorConfig {
snapshot_config: Some(LocalCluster::create_dummy_load_only_snapshot_config()),
..ValidatorConfig::default()
};
let mut config = ClusterConfig {
cluster_lamports: 100_000,
node_stakes: node_stakes.clone(),
validator_configs: make_identical_validator_configs(&validator_config, node_stakes.len()),
validator_keys: Some(validator_keys),
slots_per_epoch,
stakers_slot_offset: slots_per_epoch,
skip_warmup_slots: true,
..ClusterConfig::default()
};
let cluster = std::sync::Arc::new(std::sync::Mutex::new(LocalCluster::new(
&mut config,
SocketAddrSpace::Unspecified,
)));
let val_a_ledger_path = cluster.lock().unwrap().ledger_path(&validator_a_pubkey);
let val_b_ledger_path = cluster.lock().unwrap().ledger_path(&validator_b_pubkey);
let min_last_vote = 45;
let min_root = 10;
loop {
sleep(Duration::from_millis(100));
if let Some((last_vote, _)) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) {
if last_vote >= min_last_vote
&& root_in_tower(&val_a_ledger_path, &validator_a_pubkey) > Some(min_root)
{
break;
}
}
}
// stop all nodes of the cluster
let mut validator_a_info = cluster.lock().unwrap().exit_node(&validator_a_pubkey);
let mut validator_b_info = cluster.lock().unwrap().exit_node(&validator_b_pubkey);
// The hard fork slot is effectively a (possibly skipping) new root.
// Assert the precondition on validator A so that we actually test a gap between
// the blockstore root and the hard fork...
let hard_fork_slot = min_last_vote - 5;
assert!(hard_fork_slot > root_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap());
let hard_fork_slots = Some(vec![hard_fork_slot]);
let mut hard_forks = HardForks::default();
hard_forks.register(hard_fork_slot);
let expected_shred_version = solana_sdk::shred_version::compute_shred_version(
&cluster.lock().unwrap().genesis_config.hash(),
Some(&hard_forks),
);
// create hard-forked snapshot only for validator a, emulating the manual cluster restart
// procedure with `solana-ledger-tool create-snapshot`
let genesis_slot = 0;
{
let blockstore_a = Blockstore::open(&val_a_ledger_path).unwrap();
create_snapshot_to_hard_fork(&blockstore_a, hard_fork_slot, vec![hard_fork_slot]);
// Intentionally make solana-validator unable to boot by replaying blocks from genesis, to
// ensure the hard-forked snapshot is always used. Otherwise, we couldn't reliably create a gap
// in the ledger roots column family.
// There was a bug which caused the hard-forked snapshot at an unrooted slot to fail
// to root some slots (thus creating a gap in roots, which shouldn't happen).
purge_slots_with_count(&blockstore_a, genesis_slot, 1);
let next_slot = genesis_slot + 1;
let mut meta = blockstore_a.meta(next_slot).unwrap().unwrap();
meta.unset_parent();
blockstore_a.put_meta(next_slot, &meta).unwrap();
}
// strictly speaking, new_hard_forks isn't needed for validator a.
// but when snapshot loading isn't working, you might see:
// shred version mismatch: expected NNNN found: MMMM
//validator_a_info.config.new_hard_forks = hard_fork_slots.clone();
// effectively pass the --hard-fork parameter to validator b
validator_b_info.config.new_hard_forks = hard_fork_slots;
validator_a_info.config.wait_for_supermajority = Some(hard_fork_slot);
validator_a_info.config.expected_shred_version = Some(expected_shred_version);
validator_b_info.config.wait_for_supermajority = Some(hard_fork_slot);
validator_b_info.config.expected_shred_version = Some(expected_shred_version);
restart_whole_cluster_after_hard_fork(
&cluster,
validator_a_pubkey,
validator_b_pubkey,
validator_a_info,
validator_b_info,
);
// new slots should be rooted after hard-fork cluster relaunch
cluster
.lock()
.unwrap()
.check_for_new_roots(16, "hard fork", SocketAddrSpace::Unspecified);
// drop everything to open blockstores below
drop(cluster);
let (common_last_vote, common_root) = {
let (last_vote_a, _) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap();
let (last_vote_b, _) = last_vote_in_tower(&val_b_ledger_path, &validator_b_pubkey).unwrap();
let root_a = root_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap();
let root_b = root_in_tower(&val_b_ledger_path, &validator_b_pubkey).unwrap();
(last_vote_a.min(last_vote_b), root_a.min(root_b))
};
let blockstore_a = Blockstore::open(&val_a_ledger_path).unwrap();
let blockstore_b = Blockstore::open(&val_b_ledger_path).unwrap();
// collect all slot/root parents
let mut slots_a = AncestorIterator::new(common_last_vote, &blockstore_a).collect::<Vec<_>>();
let mut roots_a = blockstore_a
.reversed_rooted_slot_iterator(common_root)
.unwrap()
.collect::<Vec<_>>();
// artificially restore the forcibly purged genesis only for validator A, just for the sake of
// the final assertions.
slots_a.push(genesis_slot);
roots_a.push(genesis_slot);
let slots_b = AncestorIterator::new(common_last_vote, &blockstore_b).collect::<Vec<_>>();
let roots_b = blockstore_b
.reversed_rooted_slot_iterator(common_root)
.unwrap()
.collect::<Vec<_>>();
// compare them all!
assert_eq!((&slots_a, &roots_a), (&slots_b, &roots_b));
assert_eq!(&slots_a[slots_a.len() - roots_a.len()..].to_vec(), &roots_a);
assert_eq!(&slots_b[slots_b.len() - roots_b.len()..].to_vec(), &roots_b);
}
#[test]
#[serial]
fn test_restart_tower_rollback() {


@ -243,7 +243,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
remove_tower(&val_c_ledger_path, &validator_b_pubkey);
let blockstore = open_blockstore(&val_c_ledger_path);
purge_slots(&blockstore, base_slot + 1, truncated_slots);
purge_slots_with_count(&blockstore, base_slot + 1, truncated_slots);
}
info!("Create validator A's ledger");
{
@ -257,7 +257,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
copy_blocks(b_last_vote, &b_blockstore, &a_blockstore);
// Purge unnecessary slots
purge_slots(&a_blockstore, next_slot_on_a + 1, truncated_slots);
purge_slots_with_count(&a_blockstore, next_slot_on_a + 1, truncated_slots);
}
// This should be guaranteed because we waited for validator `A` to vote on a slot > `next_slot_on_a`
@ -270,7 +270,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
{
let blockstore = open_blockstore(&val_a_ledger_path);
purge_slots(&blockstore, next_slot_on_a + 1, truncated_slots);
purge_slots_with_count(&blockstore, next_slot_on_a + 1, truncated_slots);
if !with_tower {
info!("Removing tower!");
remove_tower(&val_a_ledger_path, &validator_a_pubkey);
@ -281,7 +281,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
// hasn't gotten the heavier fork from validator C yet.
// Then it will be stuck on 27 unable to switch because C doesn't
// have enough stake to generate a switching proof
purge_slots(&blockstore, next_slot_on_a, truncated_slots);
purge_slots_with_count(&blockstore, next_slot_on_a, truncated_slots);
} else {
info!("Not removing tower!");
}


@ -367,7 +367,7 @@ fn test_slot_hash_expiry() {
// Get rid of any slots past common_ancestor_slot
info!("Removing extra slots from B's blockstore");
let blockstore = open_blockstore(&b_ledger_path);
purge_slots(&blockstore, common_ancestor_slot + 1, 100);
purge_slots_with_count(&blockstore, common_ancestor_slot + 1, 100);
}
info!(