diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs new file mode 100644 index 000000000..7824d4d41 --- /dev/null +++ b/core/src/accounts_hash_verifier.rs @@ -0,0 +1,196 @@ +// Service to verify accounts hashes with other trusted validator nodes. +// +// Each interval, publish the snapshat hash which is the full accounts state +// hash on gossip. Monitor gossip for messages from validators in the --trusted-validators +// set and halt the node if a mismatch is detected. + +use crate::cluster_info::ClusterInfo; +use solana_ledger::{ + snapshot_package::SnapshotPackage, snapshot_package::SnapshotPackageReceiver, + snapshot_package::SnapshotPackageSender, +}; +use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; +use std::collections::{HashMap, HashSet}; +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::RecvTimeoutError, + Arc, RwLock, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, +}; + +pub struct AccountsHashVerifier { + t_accounts_hash_verifier: JoinHandle<()>, +} + +impl AccountsHashVerifier { + pub fn new( + snapshot_package_receiver: SnapshotPackageReceiver, + snapshot_package_sender: Option, + exit: &Arc, + cluster_info: &Arc>, + trusted_validators: Option>, + halt_on_trusted_validators_accounts_hash_mismatch: bool, + fault_injection_rate_slots: u64, + ) -> Self { + let exit = exit.clone(); + let cluster_info = cluster_info.clone(); + let t_accounts_hash_verifier = Builder::new() + .name("solana-accounts-hash".to_string()) + .spawn(move || { + let mut hashes = vec![]; + loop { + if exit.load(Ordering::Relaxed) { + break; + } + + match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) { + Ok(snapshot_package) => { + Self::process_snapshot( + snapshot_package, + &cluster_info, + &trusted_validators, + halt_on_trusted_validators_accounts_hash_mismatch, + &snapshot_package_sender, + &mut hashes, + &exit, + fault_injection_rate_slots, + ); + } + Err(RecvTimeoutError::Disconnected) => break, + Err(RecvTimeoutError::Timeout) => (), + } + } + }) + .unwrap(); + Self { + t_accounts_hash_verifier, + } + } + + fn process_snapshot( + snapshot_package: SnapshotPackage, + cluster_info: &Arc>, + trusted_validators: &Option>, + halt_on_trusted_validator_accounts_hash_mismatch: bool, + snapshot_package_sender: &Option, + hashes: &mut Vec<(Slot, Hash)>, + exit: &Arc, + fault_injection_rate_slots: u64, + ) { + if fault_injection_rate_slots != 0 + && snapshot_package.root % fault_injection_rate_slots == 0 + { + // For testing, publish an invalid hash to gossip. + use rand::{thread_rng, Rng}; + use solana_sdk::hash::extend_and_hash; + warn!("inserting fault at slot: {}", snapshot_package.root); + let rand = thread_rng().gen_range(0, 10); + let hash = extend_and_hash(&snapshot_package.hash, &[rand]); + hashes.push((snapshot_package.root, hash)); + } else { + hashes.push((snapshot_package.root, snapshot_package.hash)); + } + + if halt_on_trusted_validator_accounts_hash_mismatch { + let mut slot_to_hash = HashMap::new(); + for (slot, hash) in hashes.iter() { + slot_to_hash.insert(*slot, *hash); + } + if Self::should_halt(&cluster_info, trusted_validators, &mut slot_to_hash) { + exit.store(true, Ordering::Relaxed); + } + } + if let Some(sender) = snapshot_package_sender.as_ref() { + if sender.send(snapshot_package).is_err() {} + } + + cluster_info + .write() + .unwrap() + .push_accounts_hashes(hashes.clone()); + } + + fn should_halt( + cluster_info: &Arc>, + trusted_validators: &Option>, + slot_to_hash: &mut HashMap, + ) -> bool { + if let Some(trusted_validators) = trusted_validators.as_ref() { + for trusted_validator in trusted_validators { + let cluster_info_r = cluster_info.read().unwrap(); + if let Some(accounts_hashes) = + cluster_info_r.get_accounts_hash_for_node(trusted_validator) + { + for (slot, hash) in accounts_hashes { + if let Some(reference_hash) = slot_to_hash.get(slot) { + if *hash != *reference_hash { + error!("Trusted validator {} produced conflicting hashes for slot: {} ({} != {})", + trusted_validator, + slot, + hash, + reference_hash, + ); + + return true; + } + } else { + slot_to_hash.insert(*slot, *hash); + } + } + } + } + } + false + } + + pub fn join(self) -> thread::Result<()> { + self.t_accounts_hash_verifier.join() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cluster_info::make_accounts_hashes_message; + use crate::contact_info::ContactInfo; + use solana_sdk::{ + hash::hash, + signature::{Keypair, Signer}, + }; + + #[test] + fn test_should_halt() { + let keypair = Keypair::new(); + + let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); + let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); + let cluster_info = Arc::new(RwLock::new(cluster_info)); + + let mut trusted_validators = HashSet::new(); + let mut slot_to_hash = HashMap::new(); + assert!(!AccountsHashVerifier::should_halt( + &cluster_info, + &Some(trusted_validators.clone()), + &mut slot_to_hash, + )); + + let validator1 = Keypair::new(); + let hash1 = hash(&[1]); + let hash2 = hash(&[2]); + { + let message = make_accounts_hashes_message(&validator1, vec![(0, hash1)]).unwrap(); + let mut cluster_info_w = cluster_info.write().unwrap(); + cluster_info_w.push_message(message); + } + slot_to_hash.insert(0, hash2); + trusted_validators.insert(validator1.pubkey()); + assert!(AccountsHashVerifier::should_halt( + &cluster_info, + &Some(trusted_validators.clone()), + &mut slot_to_hash, + )); + } +} diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 113a7dfdd..42db2e4b1 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -48,7 +48,7 @@ use solana_sdk::timing::duration_as_s; use solana_sdk::{ clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH}, pubkey::Pubkey, - signature::{Keypair, Signable, Signature}, + signature::{Keypair, Signable, Signature, Signer}, timing::{duration_as_ms, timestamp}, transaction::Transaction, }; @@ -178,6 +178,14 @@ struct PullData { pub filter: CrdsFilter, } +pub fn make_accounts_hashes_message( + keypair: &Keypair, + accounts_hashes: Vec<(Slot, Hash)>, +) -> Option { + let message = CrdsData::AccountsHashes(SnapshotHash::new(keypair.pubkey(), accounts_hashes)); + Some(CrdsValue::new_signed(message, keypair)) +} + // TODO These messages should go through the gpu pipeline for spam filtering #[derive(Serialize, Deserialize, Debug)] #[allow(clippy::large_enum_variant)] @@ -411,22 +419,37 @@ impl ClusterInfo { } } } - pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) { - if snapshot_hashes.len() > MAX_SNAPSHOT_HASHES { + + pub fn push_message(&mut self, message: CrdsValue) { + let now = message.wallclock(); + let id = message.pubkey(); + self.gossip.process_push_message(&id, vec![message], now); + } + + pub fn push_accounts_hashes(&mut self, accounts_hashes: Vec<(Slot, Hash)>) { + if accounts_hashes.len() > MAX_SNAPSHOT_HASHES { warn!( - "snapshot_hashes too large, ignored: {}", - snapshot_hashes.len() + "accounts hashes too large, ignored: {}", + accounts_hashes.len(), ); return; } - let now = timestamp(); - let entry = CrdsValue::new_signed( - CrdsData::SnapshotHash(SnapshotHash::new(self.id(), snapshot_hashes, now)), - &self.keypair, - ); - self.gossip - .process_push_message(&self.id(), vec![entry], now); + let message = CrdsData::AccountsHashes(SnapshotHash::new(self.id(), accounts_hashes)); + self.push_message(CrdsValue::new_signed(message, &self.keypair)); + } + + pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) { + if snapshot_hashes.len() > MAX_SNAPSHOT_HASHES { + warn!( + "snapshot hashes too large, ignored: {}", + snapshot_hashes.len(), + ); + return; + } + + let message = CrdsData::SnapshotHashes(SnapshotHash::new(self.id(), snapshot_hashes)); + self.push_message(CrdsValue::new_signed(message, &self.keypair)); } pub fn push_vote(&mut self, tower_index: usize, vote: Transaction) { @@ -486,11 +509,19 @@ impl ClusterInfo { .collect() } + pub fn get_accounts_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> { + self.gossip + .crds + .table + .get(&CrdsValueLabel::AccountsHashes(*pubkey)) + .map(|x| &x.value.accounts_hash().unwrap().hashes) + } + pub fn get_snapshot_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> { self.gossip .crds .table - .get(&CrdsValueLabel::SnapshotHash(*pubkey)) + .get(&CrdsValueLabel::SnapshotHashes(*pubkey)) .map(|x| &x.value.snapshot_hash().unwrap().hashes) } @@ -2365,7 +2396,7 @@ mod tests { let payload: Vec = vec![]; let vec_size = serialized_size(&payload).unwrap(); let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size; - let mut value = CrdsValue::new_unsigned(CrdsData::SnapshotHash(SnapshotHash { + let mut value = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash { from: Pubkey::default(), hashes: vec![], wallclock: 0, @@ -2373,7 +2404,7 @@ mod tests { let mut i = 0; while value.size() <= desired_size { - value.data = CrdsData::SnapshotHash(SnapshotHash { + value.data = CrdsData::SnapshotHashes(SnapshotHash { from: Pubkey::default(), hashes: vec![(0, Hash::default()); i], wallclock: 0, diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs index ffae1c751..f7e2ca200 100644 --- a/core/src/crds_value.rs +++ b/core/src/crds_value.rs @@ -2,6 +2,7 @@ use crate::contact_info::ContactInfo; use crate::deprecated; use crate::epoch_slots::EpochSlots; use bincode::{serialize, serialized_size}; +use solana_sdk::timing::timestamp; use solana_sdk::{ clock::Slot, hash::Hash, @@ -67,8 +68,9 @@ pub enum CrdsData { ContactInfo(ContactInfo), Vote(VoteIndex, Vote), LowestSlot(u8, LowestSlot), - SnapshotHash(SnapshotHash), + SnapshotHashes(SnapshotHash), EpochSlots(EpochSlotsIndex, EpochSlots), + AccountsHashes(SnapshotHash), } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -79,11 +81,11 @@ pub struct SnapshotHash { } impl SnapshotHash { - pub fn new(from: Pubkey, hashes: Vec<(Slot, Hash)>, wallclock: u64) -> Self { + pub fn new(from: Pubkey, hashes: Vec<(Slot, Hash)>) -> Self { Self { from, hashes, - wallclock, + wallclock: timestamp(), } } } @@ -134,8 +136,9 @@ pub enum CrdsValueLabel { ContactInfo(Pubkey), Vote(VoteIndex, Pubkey), LowestSlot(Pubkey), - SnapshotHash(Pubkey), + SnapshotHashes(Pubkey), EpochSlots(EpochSlotsIndex, Pubkey), + AccountsHashes(Pubkey), } impl fmt::Display for CrdsValueLabel { @@ -144,8 +147,9 @@ impl fmt::Display for CrdsValueLabel { CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()), CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()), CrdsValueLabel::LowestSlot(_) => write!(f, "LowestSlot({})", self.pubkey()), - CrdsValueLabel::SnapshotHash(_) => write!(f, "SnapshotHash({})", self.pubkey()), + CrdsValueLabel::SnapshotHashes(_) => write!(f, "SnapshotHash({})", self.pubkey()), CrdsValueLabel::EpochSlots(ix, _) => write!(f, "EpochSlots({}, {})", ix, self.pubkey()), + CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()), } } } @@ -156,8 +160,9 @@ impl CrdsValueLabel { CrdsValueLabel::ContactInfo(p) => *p, CrdsValueLabel::Vote(_, p) => *p, CrdsValueLabel::LowestSlot(p) => *p, - CrdsValueLabel::SnapshotHash(p) => *p, + CrdsValueLabel::SnapshotHashes(p) => *p, CrdsValueLabel::EpochSlots(_, p) => *p, + CrdsValueLabel::AccountsHashes(p) => *p, } } } @@ -183,8 +188,9 @@ impl CrdsValue { CrdsData::ContactInfo(contact_info) => contact_info.wallclock, CrdsData::Vote(_, vote) => vote.wallclock, CrdsData::LowestSlot(_, obj) => obj.wallclock, - CrdsData::SnapshotHash(hash) => hash.wallclock, + CrdsData::SnapshotHashes(hash) => hash.wallclock, CrdsData::EpochSlots(_, p) => p.wallclock, + CrdsData::AccountsHashes(hash) => hash.wallclock, } } pub fn pubkey(&self) -> Pubkey { @@ -192,8 +198,9 @@ impl CrdsValue { CrdsData::ContactInfo(contact_info) => contact_info.id, CrdsData::Vote(_, vote) => vote.from, CrdsData::LowestSlot(_, slots) => slots.from, - CrdsData::SnapshotHash(hash) => hash.from, + CrdsData::SnapshotHashes(hash) => hash.from, CrdsData::EpochSlots(_, p) => p.from, + CrdsData::AccountsHashes(hash) => hash.from, } } pub fn label(&self) -> CrdsValueLabel { @@ -201,8 +208,9 @@ impl CrdsValue { CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()), CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()), CrdsData::LowestSlot(_, _) => CrdsValueLabel::LowestSlot(self.pubkey()), - CrdsData::SnapshotHash(_) => CrdsValueLabel::SnapshotHash(self.pubkey()), + CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()), CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()), + CrdsData::AccountsHashes(_) => CrdsValueLabel::AccountsHashes(self.pubkey()), } } pub fn contact_info(&self) -> Option<&ContactInfo> { @@ -234,7 +242,14 @@ impl CrdsValue { pub fn snapshot_hash(&self) -> Option<&SnapshotHash> { match &self.data { - CrdsData::SnapshotHash(slots) => Some(slots), + CrdsData::SnapshotHashes(slots) => Some(slots), + _ => None, + } + } + + pub fn accounts_hash(&self) -> Option<&SnapshotHash> { + match &self.data { + CrdsData::AccountsHashes(slots) => Some(slots), _ => None, } } @@ -251,7 +266,8 @@ impl CrdsValue { let mut labels = vec![ CrdsValueLabel::ContactInfo(*key), CrdsValueLabel::LowestSlot(*key), - CrdsValueLabel::SnapshotHash(*key), + CrdsValueLabel::SnapshotHashes(*key), + CrdsValueLabel::AccountsHashes(*key), ]; labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key))); labels.extend((0..MAX_EPOCH_SLOTS).map(|ix| CrdsValueLabel::EpochSlots(ix, *key))); @@ -302,16 +318,17 @@ mod test { #[test] fn test_labels() { - let mut hits = [false; 3 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize]; + let mut hits = [false; 4 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize]; // this method should cover all the possible labels for v in &CrdsValue::record_labels(&Pubkey::default()) { match v { CrdsValueLabel::ContactInfo(_) => hits[0] = true, CrdsValueLabel::LowestSlot(_) => hits[1] = true, - CrdsValueLabel::SnapshotHash(_) => hits[2] = true, - CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 3] = true, + CrdsValueLabel::SnapshotHashes(_) => hits[2] = true, + CrdsValueLabel::AccountsHashes(_) => hits[3] = true, + CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 4] = true, CrdsValueLabel::EpochSlots(ix, _) => { - hits[*ix as usize + MAX_VOTES as usize + 3] = true + hits[*ix as usize + MAX_VOTES as usize + 4] = true } } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 7139e3ddf..75e355219 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -5,6 +5,7 @@ //! command-line tools to spin up validators and a Rust library //! +pub mod accounts_hash_verifier; pub mod banking_stage; pub mod broadcast_stage; pub mod cluster_info_vote_listener; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 3f46b95fa..f31a543e2 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -85,7 +85,7 @@ pub struct ReplayStageConfig { pub leader_schedule_cache: Arc, pub slot_full_senders: Vec>, pub latest_root_senders: Vec>, - pub snapshot_package_sender: Option, + pub accounts_hash_sender: Option, pub block_commitment_cache: Arc>, pub transaction_status_sender: Option, pub rewards_recorder_sender: Option, @@ -189,7 +189,7 @@ impl ReplayStage { leader_schedule_cache, slot_full_senders, latest_root_senders, - snapshot_package_sender, + accounts_hash_sender, block_commitment_cache, transaction_status_sender, rewards_recorder_sender, @@ -350,7 +350,7 @@ impl ReplayStage { &leader_schedule_cache, &root_bank_sender, &lockouts_sender, - &snapshot_package_sender, + &accounts_hash_sender, &latest_root_senders, &mut earliest_vote_on_fork, )?; @@ -666,7 +666,7 @@ impl ReplayStage { leader_schedule_cache: &Arc, root_bank_sender: &Sender>>, lockouts_sender: &Sender, - snapshot_package_sender: &Option, + accounts_hash_sender: &Option, latest_root_senders: &[Sender], earliest_vote_on_fork: &mut Slot, ) -> Result<()> { @@ -698,7 +698,7 @@ impl ReplayStage { new_root, &bank_forks, progress, - snapshot_package_sender, + accounts_hash_sender, earliest_vote_on_fork, ); latest_root_senders.iter().for_each(|s| { @@ -1167,13 +1167,13 @@ impl ReplayStage { new_root: u64, bank_forks: &RwLock, progress: &mut ProgressMap, - snapshot_package_sender: &Option, + accounts_hash_sender: &Option, earliest_vote_on_fork: &mut u64, ) { bank_forks .write() .unwrap() - .set_root(new_root, snapshot_package_sender); + .set_root(new_root, accounts_hash_sender); let r_bank_forks = bank_forks.read().unwrap(); *earliest_vote_on_fork = std::cmp::max(new_root, *earliest_vote_on_fork); progress.retain(|k, _| r_bank_forks.get(*k).is_some()); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 6814926ee..275a2b32d 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -2,6 +2,7 @@ //! validation pipeline in software. use crate::{ + accounts_hash_verifier::AccountsHashVerifier, blockstream_service::BlockstreamService, cluster_info::ClusterInfo, cluster_info_vote_listener::VoteTracker, @@ -29,6 +30,7 @@ use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signer}, }; +use std::collections::HashSet; use std::{ net::UdpSocket, path::PathBuf, @@ -48,6 +50,7 @@ pub struct Tvu { blockstream_service: Option, ledger_cleanup_service: Option, storage_stage: StorageStage, + accounts_hash_verifier: AccountsHashVerifier, } pub struct Sockets { @@ -57,6 +60,16 @@ pub struct Sockets { pub forwards: Vec, } +#[derive(Default)] +pub struct TvuConfig { + pub max_ledger_slots: Option, + pub sigverify_disabled: bool, + pub shred_version: u16, + pub halt_on_trusted_validators_accounts_hash_mismatch: bool, + pub trusted_validators: Option>, + pub accounts_hash_fault_injection_slots: u64, +} + impl Tvu { /// This service receives messages from a leader in the network and processes the transactions /// on the bank state. @@ -75,7 +88,6 @@ impl Tvu { blockstore: Arc, storage_state: &StorageState, blockstream_unix_socket: Option<&PathBuf>, - max_ledger_slots: Option, ledger_signal_receiver: Receiver, subscriptions: &Arc, poh_recorder: &Arc>, @@ -83,13 +95,12 @@ impl Tvu { exit: &Arc, completed_slots_receiver: CompletedSlotsReceiver, block_commitment_cache: Arc>, - sigverify_disabled: bool, cfg: Option>, - shred_version: u16, transaction_status_sender: Option, rewards_recorder_sender: Option, snapshot_package_sender: Option, vote_tracker: Arc, + tvu_config: TvuConfig, ) -> Self { let keypair: Arc = cluster_info .read() @@ -119,7 +130,7 @@ impl Tvu { ); let (verified_sender, verified_receiver) = unbounded(); - let sigverify_stage = if !sigverify_disabled { + let sigverify_stage = if !tvu_config.sigverify_disabled { SigVerifyStage::new( fetch_receiver, verified_sender, @@ -145,12 +156,23 @@ impl Tvu { completed_slots_receiver, *bank_forks.read().unwrap().working_bank().epoch_schedule(), cfg, - shred_version, + tvu_config.shred_version, ); let (blockstream_slot_sender, blockstream_slot_receiver) = channel(); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); + let (accounts_hash_sender, accounts_hash_receiver) = channel(); + let accounts_hash_verifier = AccountsHashVerifier::new( + accounts_hash_receiver, + snapshot_package_sender, + exit, + cluster_info, + tvu_config.trusted_validators.clone(), + tvu_config.halt_on_trusted_validators_accounts_hash_mismatch, + tvu_config.accounts_hash_fault_injection_slots, + ); + let replay_stage_config = ReplayStageConfig { my_pubkey: keypair.pubkey(), vote_account: *vote_account, @@ -160,7 +182,7 @@ impl Tvu { leader_schedule_cache: leader_schedule_cache.clone(), slot_full_senders: vec![blockstream_slot_sender], latest_root_senders: vec![ledger_cleanup_slot_sender], - snapshot_package_sender, + accounts_hash_sender: Some(accounts_hash_sender), block_commitment_cache, transaction_status_sender, rewards_recorder_sender, @@ -188,7 +210,7 @@ impl Tvu { None }; - let ledger_cleanup_service = max_ledger_slots.map(|max_ledger_slots| { + let ledger_cleanup_service = tvu_config.max_ledger_slots.map(|max_ledger_slots| { LedgerCleanupService::new( ledger_cleanup_slot_receiver, blockstore.clone(), @@ -216,6 +238,7 @@ impl Tvu { blockstream_service, ledger_cleanup_service, storage_stage, + accounts_hash_verifier, } } @@ -231,6 +254,7 @@ impl Tvu { self.ledger_cleanup_service.unwrap().join()?; } self.replay_stage.join()?; + self.accounts_hash_verifier.join()?; Ok(()) } } @@ -291,7 +315,6 @@ pub mod tests { blockstore, &StorageState::default(), None, - None, l_receiver, &Arc::new(RpcSubscriptions::new(&exit)), &poh_recorder, @@ -299,13 +322,12 @@ pub mod tests { &exit, completed_slots_receiver, block_commitment_cache, - false, None, - 0, None, None, None, Arc::new(VoteTracker::new(&bank)), + TvuConfig::default(), ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index e6b271592..374bdd6d1 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -21,7 +21,7 @@ use crate::{ storage_stage::StorageState, tpu::Tpu, transaction_status_service::TransactionStatusService, - tvu::{Sockets, Tvu}, + tvu::{Sockets, Tvu, TvuConfig}, }; use crossbeam_channel::unbounded; use solana_ledger::{ @@ -77,6 +77,8 @@ pub struct ValidatorConfig { pub wait_for_supermajority: Option, pub new_hard_forks: Option>, pub trusted_validators: Option>, // None = trust all + pub halt_on_trusted_validators_accounts_hash_mismatch: bool, + pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection } impl Default for ValidatorConfig { @@ -100,6 +102,8 @@ impl Default for ValidatorConfig { wait_for_supermajority: None, new_hard_forks: None, trusted_validators: None, + halt_on_trusted_validators_accounts_hash_mismatch: false, + accounts_hash_fault_injection_slots: 0, } } } @@ -414,7 +418,6 @@ impl Validator { blockstore.clone(), &storage_state, config.blockstream_unix_socket.as_ref(), - config.max_ledger_slots, ledger_signal_receiver, &subscriptions, &poh_recorder, @@ -422,13 +425,20 @@ impl Validator { &exit, completed_slots_receiver, block_commitment_cache, - config.dev_sigverify_disabled, config.enable_partition.clone(), - node.info.shred_version, transaction_status_sender.clone(), rewards_recorder_sender, snapshot_package_sender, vote_tracker.clone(), + TvuConfig { + max_ledger_slots: config.max_ledger_slots, + sigverify_disabled: config.dev_sigverify_disabled, + halt_on_trusted_validators_accounts_hash_mismatch: config + .halt_on_trusted_validators_accounts_hash_mismatch, + shred_version: node.info.shred_version, + trusted_validators: config.trusted_validators.clone(), + accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots, + }, ); if config.dev_sigverify_disabled { diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 2fb1239f8..d49e3c190 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -607,6 +607,95 @@ fn test_softlaunch_operating_mode() { } } +#[test] +#[serial] +fn test_consistency_halt() { + solana_logger::setup(); + let snapshot_interval_slots = 20; + let num_account_paths = 1; + + // Create cluster with a leader producing bad snapshot hashes. + let mut leader_snapshot_test_config = + setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); + leader_snapshot_test_config + .validator_config + .accounts_hash_fault_injection_slots = 40; + + let validator_stake = 10_000; + let config = ClusterConfig { + node_stakes: vec![validator_stake], + cluster_lamports: 100_000, + validator_configs: vec![leader_snapshot_test_config.validator_config.clone()], + ..ClusterConfig::default() + }; + + let mut cluster = LocalCluster::new(&config); + + sleep(Duration::from_millis(5000)); + let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, 1).unwrap(); + info!("num_nodes: {}", cluster_nodes.len()); + + // Add a validator with the leader as trusted, it should halt when it detects + // mismatch. + let mut validator_snapshot_test_config = + setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); + + let mut trusted_validators = HashSet::new(); + trusted_validators.insert(cluster_nodes[0].id); + + validator_snapshot_test_config + .validator_config + .trusted_validators = Some(trusted_validators); + validator_snapshot_test_config + .validator_config + .halt_on_trusted_validators_accounts_hash_mismatch = true; + + warn!("adding a validator"); + cluster.add_validator( + &validator_snapshot_test_config.validator_config, + validator_stake as u64, + Arc::new(Keypair::new()), + ); + let num_nodes = 2; + assert_eq!( + discover_cluster(&cluster.entry_point_info.gossip, num_nodes) + .unwrap() + .0 + .len(), + num_nodes + ); + + // Check for only 1 node on the network. + let mut encountered_error = false; + loop { + let discover = discover_cluster(&cluster.entry_point_info.gossip, 2); + match discover { + Err(_) => { + encountered_error = true; + break; + } + Ok(nodes) => { + if nodes.0.len() < 2 { + encountered_error = true; + break; + } + info!("checking cluster for fewer nodes.. {:?}", nodes.0.len()); + } + } + let client = cluster + .get_validator_client(&cluster.entry_point_info.id) + .unwrap(); + if let Ok(slot) = client.get_slot() { + if slot > 210 { + break; + } + info!("slot: {}", slot); + } + sleep(Duration::from_millis(1000)); + } + assert!(encountered_error); +} + #[test] #[serial] fn test_snapshot_download() { diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 951bc47f8..407c7187c 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -145,6 +145,9 @@ while [[ -n $1 ]]; do elif [[ $1 = --trusted-validator ]]; then args+=("$1" "$2") shift 2 + elif [[ $1 = --halt-on-trusted-validators-accounts-hash-mismatch ]]; then + args+=("$1") + shift elif [[ $1 = -h ]]; then usage "$@" else diff --git a/validator/src/main.rs b/validator/src/main.rs index a5ace42a7..9bdba70b9 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -663,6 +663,13 @@ pub fn main() { .validator(solana_net_utils::is_host) .help("IP address to bind the RPC port [default: use --bind-address]"), ) + .arg( + clap::Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch") + .long("halt-on-trusted-validators-accounts-hash-mismatch") + .requires("trusted_validators") + .takes_value(false) + .help("Abort the validator if a bank hash mismatch is detected within trusted validator set"), + ) .get_matches(); let identity_keypair = Arc::new(keypair_of(&matches, "identity").unwrap_or_else(Keypair::new)); @@ -821,6 +828,10 @@ pub fn main() { validator_config.max_ledger_slots = Some(limit_ledger_size); } + if matches.is_present("halt_on_trusted_validators_accounts_hash_mismatch") { + validator_config.halt_on_trusted_validators_accounts_hash_mismatch = true; + } + if matches.value_of("signer_addr").is_some() { warn!("--vote-signer-address ignored"); }