Add Accounts hash consistency halting (#8772)
* Accounts hash consistency halting * Add option to inject account hash faults for testing. Enable option in local cluster test to see that node halts.
This commit is contained in:
parent
eab4fe50a3
commit
dc347dd3d7
|
@ -0,0 +1,196 @@
|
|||
// Service to verify accounts hashes with other trusted validator nodes.
|
||||
//
|
||||
// Each interval, publish the snapshot hash which is the full accounts state
|
||||
// hash on gossip. Monitor gossip for messages from validators in the --trusted-validators
|
||||
// set and halt the node if a mismatch is detected.
|
||||
|
||||
use crate::cluster_info::ClusterInfo;
|
||||
use solana_ledger::{
|
||||
snapshot_package::SnapshotPackage, snapshot_package::SnapshotPackageReceiver,
|
||||
snapshot_package::SnapshotPackageSender,
|
||||
};
|
||||
use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::RecvTimeoutError,
|
||||
Arc, RwLock,
|
||||
},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
/// Service handle for the background thread that processes snapshot packages,
/// publishes this node's accounts hashes to gossip and can request a halt when a
/// trusted validator's hash disagrees with ours.
pub struct AccountsHashVerifier {
|
||||
/// Join handle of the `solana-accounts-hash` worker thread spawned by `new`.
t_accounts_hash_verifier: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl AccountsHashVerifier {
|
||||
pub fn new(
|
||||
snapshot_package_receiver: SnapshotPackageReceiver,
|
||||
snapshot_package_sender: Option<SnapshotPackageSender>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
trusted_validators: Option<HashSet<Pubkey>>,
|
||||
halt_on_trusted_validators_accounts_hash_mismatch: bool,
|
||||
fault_injection_rate_slots: u64,
|
||||
) -> Self {
|
||||
let exit = exit.clone();
|
||||
let cluster_info = cluster_info.clone();
|
||||
let t_accounts_hash_verifier = Builder::new()
|
||||
.name("solana-accounts-hash".to_string())
|
||||
.spawn(move || {
|
||||
let mut hashes = vec![];
|
||||
loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
|
||||
match snapshot_package_receiver.recv_timeout(Duration::from_secs(1)) {
|
||||
Ok(snapshot_package) => {
|
||||
Self::process_snapshot(
|
||||
snapshot_package,
|
||||
&cluster_info,
|
||||
&trusted_validators,
|
||||
halt_on_trusted_validators_accounts_hash_mismatch,
|
||||
&snapshot_package_sender,
|
||||
&mut hashes,
|
||||
&exit,
|
||||
fault_injection_rate_slots,
|
||||
);
|
||||
}
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
Err(RecvTimeoutError::Timeout) => (),
|
||||
}
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
Self {
|
||||
t_accounts_hash_verifier,
|
||||
}
|
||||
}
|
||||
|
||||
fn process_snapshot(
|
||||
snapshot_package: SnapshotPackage,
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
trusted_validators: &Option<HashSet<Pubkey>>,
|
||||
halt_on_trusted_validator_accounts_hash_mismatch: bool,
|
||||
snapshot_package_sender: &Option<SnapshotPackageSender>,
|
||||
hashes: &mut Vec<(Slot, Hash)>,
|
||||
exit: &Arc<AtomicBool>,
|
||||
fault_injection_rate_slots: u64,
|
||||
) {
|
||||
if fault_injection_rate_slots != 0
|
||||
&& snapshot_package.root % fault_injection_rate_slots == 0
|
||||
{
|
||||
// For testing, publish an invalid hash to gossip.
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_sdk::hash::extend_and_hash;
|
||||
warn!("inserting fault at slot: {}", snapshot_package.root);
|
||||
let rand = thread_rng().gen_range(0, 10);
|
||||
let hash = extend_and_hash(&snapshot_package.hash, &[rand]);
|
||||
hashes.push((snapshot_package.root, hash));
|
||||
} else {
|
||||
hashes.push((snapshot_package.root, snapshot_package.hash));
|
||||
}
|
||||
|
||||
if halt_on_trusted_validator_accounts_hash_mismatch {
|
||||
let mut slot_to_hash = HashMap::new();
|
||||
for (slot, hash) in hashes.iter() {
|
||||
slot_to_hash.insert(*slot, *hash);
|
||||
}
|
||||
if Self::should_halt(&cluster_info, trusted_validators, &mut slot_to_hash) {
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
if let Some(sender) = snapshot_package_sender.as_ref() {
|
||||
if sender.send(snapshot_package).is_err() {}
|
||||
}
|
||||
|
||||
cluster_info
|
||||
.write()
|
||||
.unwrap()
|
||||
.push_accounts_hashes(hashes.clone());
|
||||
}
|
||||
|
||||
fn should_halt(
|
||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||
trusted_validators: &Option<HashSet<Pubkey>>,
|
||||
slot_to_hash: &mut HashMap<Slot, Hash>,
|
||||
) -> bool {
|
||||
if let Some(trusted_validators) = trusted_validators.as_ref() {
|
||||
for trusted_validator in trusted_validators {
|
||||
let cluster_info_r = cluster_info.read().unwrap();
|
||||
if let Some(accounts_hashes) =
|
||||
cluster_info_r.get_accounts_hash_for_node(trusted_validator)
|
||||
{
|
||||
for (slot, hash) in accounts_hashes {
|
||||
if let Some(reference_hash) = slot_to_hash.get(slot) {
|
||||
if *hash != *reference_hash {
|
||||
error!("Trusted validator {} produced conflicting hashes for slot: {} ({} != {})",
|
||||
trusted_validator,
|
||||
slot,
|
||||
hash,
|
||||
reference_hash,
|
||||
);
|
||||
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
slot_to_hash.insert(*slot, *hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Consumes the service and blocks until the worker thread has finished,
/// returning the thread's join result.
pub fn join(self) -> thread::Result<()> {
|
||||
self.t_accounts_hash_verifier.join()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::cluster_info::make_accounts_hashes_message;
|
||||
use crate::contact_info::ContactInfo;
|
||||
use solana_sdk::{
|
||||
hash::hash,
|
||||
signature::{Keypair, Signer},
|
||||
};
|
||||
|
||||
#[test]
fn test_should_halt() {
    let node_keypair = Keypair::new();
    let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
        ContactInfo::new_localhost(&node_keypair.pubkey(), 0),
    )));

    let mut trusted_validators = HashSet::new();
    let mut slot_to_hash = HashMap::new();

    // An empty trusted set gives nothing to disagree with.
    let halt_with_no_trusted = AccountsHashVerifier::should_halt(
        &cluster_info,
        &Some(trusted_validators.clone()),
        &mut slot_to_hash,
    );
    assert!(!halt_with_no_trusted);

    let validator1 = Keypair::new();
    let hash1 = hash(&[1]);
    let hash2 = hash(&[2]);

    // The trusted validator gossips hash1 for slot 0 ...
    let message = make_accounts_hashes_message(&validator1, vec![(0, hash1)]).unwrap();
    cluster_info.write().unwrap().push_message(message);

    // ... while our reference for slot 0 is hash2, so a halt is expected.
    slot_to_hash.insert(0, hash2);
    trusted_validators.insert(validator1.pubkey());
    let halt_on_conflict = AccountsHashVerifier::should_halt(
        &cluster_info,
        &Some(trusted_validators.clone()),
        &mut slot_to_hash,
    );
    assert!(halt_on_conflict);
}
|
||||
}
|
|
@ -48,7 +48,7 @@ use solana_sdk::timing::duration_as_s;
|
|||
use solana_sdk::{
|
||||
clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signable, Signature},
|
||||
signature::{Keypair, Signable, Signature, Signer},
|
||||
timing::{duration_as_ms, timestamp},
|
||||
transaction::Transaction,
|
||||
};
|
||||
|
@ -178,6 +178,14 @@ struct PullData {
|
|||
pub filter: CrdsFilter,
|
||||
}
|
||||
|
||||
pub fn make_accounts_hashes_message(
|
||||
keypair: &Keypair,
|
||||
accounts_hashes: Vec<(Slot, Hash)>,
|
||||
) -> Option<CrdsValue> {
|
||||
let message = CrdsData::AccountsHashes(SnapshotHash::new(keypair.pubkey(), accounts_hashes));
|
||||
Some(CrdsValue::new_signed(message, keypair))
|
||||
}
|
||||
|
||||
// TODO These messages should go through the gpu pipeline for spam filtering
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
|
@ -411,22 +419,37 @@ impl ClusterInfo {
|
|||
}
|
||||
}
|
||||
}
|
||||
pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) {
|
||||
if snapshot_hashes.len() > MAX_SNAPSHOT_HASHES {
|
||||
|
||||
/// Hands `message` to the local gossip instance as a push message, using the
/// message's own pubkey as the sender and its own wallclock as the timestamp.
pub fn push_message(&mut self, message: CrdsValue) {
    let (now, id) = (message.wallclock(), message.pubkey());
    let batch = vec![message];
    self.gossip.process_push_message(&id, batch, now);
}
|
||||
|
||||
pub fn push_accounts_hashes(&mut self, accounts_hashes: Vec<(Slot, Hash)>) {
|
||||
if accounts_hashes.len() > MAX_SNAPSHOT_HASHES {
|
||||
warn!(
|
||||
"snapshot_hashes too large, ignored: {}",
|
||||
snapshot_hashes.len()
|
||||
"accounts hashes too large, ignored: {}",
|
||||
accounts_hashes.len(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let now = timestamp();
|
||||
let entry = CrdsValue::new_signed(
|
||||
CrdsData::SnapshotHash(SnapshotHash::new(self.id(), snapshot_hashes, now)),
|
||||
&self.keypair,
|
||||
);
|
||||
self.gossip
|
||||
.process_push_message(&self.id(), vec![entry], now);
|
||||
let message = CrdsData::AccountsHashes(SnapshotHash::new(self.id(), accounts_hashes));
|
||||
self.push_message(CrdsValue::new_signed(message, &self.keypair));
|
||||
}
|
||||
|
||||
pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) {
|
||||
if snapshot_hashes.len() > MAX_SNAPSHOT_HASHES {
|
||||
warn!(
|
||||
"snapshot hashes too large, ignored: {}",
|
||||
snapshot_hashes.len(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let message = CrdsData::SnapshotHashes(SnapshotHash::new(self.id(), snapshot_hashes));
|
||||
self.push_message(CrdsValue::new_signed(message, &self.keypair));
|
||||
}
|
||||
|
||||
pub fn push_vote(&mut self, tower_index: usize, vote: Transaction) {
|
||||
|
@ -486,11 +509,19 @@ impl ClusterInfo {
|
|||
.collect()
|
||||
}
|
||||
|
||||
pub fn get_accounts_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> {
|
||||
self.gossip
|
||||
.crds
|
||||
.table
|
||||
.get(&CrdsValueLabel::AccountsHashes(*pubkey))
|
||||
.map(|x| &x.value.accounts_hash().unwrap().hashes)
|
||||
}
|
||||
|
||||
pub fn get_snapshot_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> {
|
||||
self.gossip
|
||||
.crds
|
||||
.table
|
||||
.get(&CrdsValueLabel::SnapshotHash(*pubkey))
|
||||
.get(&CrdsValueLabel::SnapshotHashes(*pubkey))
|
||||
.map(|x| &x.value.snapshot_hash().unwrap().hashes)
|
||||
}
|
||||
|
||||
|
@ -2365,7 +2396,7 @@ mod tests {
|
|||
let payload: Vec<CrdsValue> = vec![];
|
||||
let vec_size = serialized_size(&payload).unwrap();
|
||||
let desired_size = MAX_PROTOCOL_PAYLOAD_SIZE - vec_size;
|
||||
let mut value = CrdsValue::new_unsigned(CrdsData::SnapshotHash(SnapshotHash {
|
||||
let mut value = CrdsValue::new_unsigned(CrdsData::SnapshotHashes(SnapshotHash {
|
||||
from: Pubkey::default(),
|
||||
hashes: vec![],
|
||||
wallclock: 0,
|
||||
|
@ -2373,7 +2404,7 @@ mod tests {
|
|||
|
||||
let mut i = 0;
|
||||
while value.size() <= desired_size {
|
||||
value.data = CrdsData::SnapshotHash(SnapshotHash {
|
||||
value.data = CrdsData::SnapshotHashes(SnapshotHash {
|
||||
from: Pubkey::default(),
|
||||
hashes: vec![(0, Hash::default()); i],
|
||||
wallclock: 0,
|
||||
|
|
|
@ -2,6 +2,7 @@ use crate::contact_info::ContactInfo;
|
|||
use crate::deprecated;
|
||||
use crate::epoch_slots::EpochSlots;
|
||||
use bincode::{serialize, serialized_size};
|
||||
use solana_sdk::timing::timestamp;
|
||||
use solana_sdk::{
|
||||
clock::Slot,
|
||||
hash::Hash,
|
||||
|
@ -67,8 +68,9 @@ pub enum CrdsData {
|
|||
ContactInfo(ContactInfo),
|
||||
Vote(VoteIndex, Vote),
|
||||
LowestSlot(u8, LowestSlot),
|
||||
SnapshotHash(SnapshotHash),
|
||||
SnapshotHashes(SnapshotHash),
|
||||
EpochSlots(EpochSlotsIndex, EpochSlots),
|
||||
AccountsHashes(SnapshotHash),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
|
@ -79,11 +81,11 @@ pub struct SnapshotHash {
|
|||
}
|
||||
|
||||
impl SnapshotHash {
|
||||
pub fn new(from: Pubkey, hashes: Vec<(Slot, Hash)>, wallclock: u64) -> Self {
|
||||
pub fn new(from: Pubkey, hashes: Vec<(Slot, Hash)>) -> Self {
|
||||
Self {
|
||||
from,
|
||||
hashes,
|
||||
wallclock,
|
||||
wallclock: timestamp(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -134,8 +136,9 @@ pub enum CrdsValueLabel {
|
|||
ContactInfo(Pubkey),
|
||||
Vote(VoteIndex, Pubkey),
|
||||
LowestSlot(Pubkey),
|
||||
SnapshotHash(Pubkey),
|
||||
SnapshotHashes(Pubkey),
|
||||
EpochSlots(EpochSlotsIndex, Pubkey),
|
||||
AccountsHashes(Pubkey),
|
||||
}
|
||||
|
||||
impl fmt::Display for CrdsValueLabel {
|
||||
|
@ -144,8 +147,9 @@ impl fmt::Display for CrdsValueLabel {
|
|||
CrdsValueLabel::ContactInfo(_) => write!(f, "ContactInfo({})", self.pubkey()),
|
||||
CrdsValueLabel::Vote(ix, _) => write!(f, "Vote({}, {})", ix, self.pubkey()),
|
||||
CrdsValueLabel::LowestSlot(_) => write!(f, "LowestSlot({})", self.pubkey()),
|
||||
CrdsValueLabel::SnapshotHash(_) => write!(f, "SnapshotHash({})", self.pubkey()),
|
||||
CrdsValueLabel::SnapshotHashes(_) => write!(f, "SnapshotHash({})", self.pubkey()),
|
||||
CrdsValueLabel::EpochSlots(ix, _) => write!(f, "EpochSlots({}, {})", ix, self.pubkey()),
|
||||
CrdsValueLabel::AccountsHashes(_) => write!(f, "AccountsHashes({})", self.pubkey()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -156,8 +160,9 @@ impl CrdsValueLabel {
|
|||
CrdsValueLabel::ContactInfo(p) => *p,
|
||||
CrdsValueLabel::Vote(_, p) => *p,
|
||||
CrdsValueLabel::LowestSlot(p) => *p,
|
||||
CrdsValueLabel::SnapshotHash(p) => *p,
|
||||
CrdsValueLabel::SnapshotHashes(p) => *p,
|
||||
CrdsValueLabel::EpochSlots(_, p) => *p,
|
||||
CrdsValueLabel::AccountsHashes(p) => *p,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -183,8 +188,9 @@ impl CrdsValue {
|
|||
CrdsData::ContactInfo(contact_info) => contact_info.wallclock,
|
||||
CrdsData::Vote(_, vote) => vote.wallclock,
|
||||
CrdsData::LowestSlot(_, obj) => obj.wallclock,
|
||||
CrdsData::SnapshotHash(hash) => hash.wallclock,
|
||||
CrdsData::SnapshotHashes(hash) => hash.wallclock,
|
||||
CrdsData::EpochSlots(_, p) => p.wallclock,
|
||||
CrdsData::AccountsHashes(hash) => hash.wallclock,
|
||||
}
|
||||
}
|
||||
pub fn pubkey(&self) -> Pubkey {
|
||||
|
@ -192,8 +198,9 @@ impl CrdsValue {
|
|||
CrdsData::ContactInfo(contact_info) => contact_info.id,
|
||||
CrdsData::Vote(_, vote) => vote.from,
|
||||
CrdsData::LowestSlot(_, slots) => slots.from,
|
||||
CrdsData::SnapshotHash(hash) => hash.from,
|
||||
CrdsData::SnapshotHashes(hash) => hash.from,
|
||||
CrdsData::EpochSlots(_, p) => p.from,
|
||||
CrdsData::AccountsHashes(hash) => hash.from,
|
||||
}
|
||||
}
|
||||
pub fn label(&self) -> CrdsValueLabel {
|
||||
|
@ -201,8 +208,9 @@ impl CrdsValue {
|
|||
CrdsData::ContactInfo(_) => CrdsValueLabel::ContactInfo(self.pubkey()),
|
||||
CrdsData::Vote(ix, _) => CrdsValueLabel::Vote(*ix, self.pubkey()),
|
||||
CrdsData::LowestSlot(_, _) => CrdsValueLabel::LowestSlot(self.pubkey()),
|
||||
CrdsData::SnapshotHash(_) => CrdsValueLabel::SnapshotHash(self.pubkey()),
|
||||
CrdsData::SnapshotHashes(_) => CrdsValueLabel::SnapshotHashes(self.pubkey()),
|
||||
CrdsData::EpochSlots(ix, _) => CrdsValueLabel::EpochSlots(*ix, self.pubkey()),
|
||||
CrdsData::AccountsHashes(_) => CrdsValueLabel::AccountsHashes(self.pubkey()),
|
||||
}
|
||||
}
|
||||
pub fn contact_info(&self) -> Option<&ContactInfo> {
|
||||
|
@ -234,7 +242,14 @@ impl CrdsValue {
|
|||
|
||||
pub fn snapshot_hash(&self) -> Option<&SnapshotHash> {
|
||||
match &self.data {
|
||||
CrdsData::SnapshotHash(slots) => Some(slots),
|
||||
CrdsData::SnapshotHashes(slots) => Some(slots),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn accounts_hash(&self) -> Option<&SnapshotHash> {
|
||||
match &self.data {
|
||||
CrdsData::AccountsHashes(slots) => Some(slots),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
@ -251,7 +266,8 @@ impl CrdsValue {
|
|||
let mut labels = vec![
|
||||
CrdsValueLabel::ContactInfo(*key),
|
||||
CrdsValueLabel::LowestSlot(*key),
|
||||
CrdsValueLabel::SnapshotHash(*key),
|
||||
CrdsValueLabel::SnapshotHashes(*key),
|
||||
CrdsValueLabel::AccountsHashes(*key),
|
||||
];
|
||||
labels.extend((0..MAX_VOTES).map(|ix| CrdsValueLabel::Vote(ix, *key)));
|
||||
labels.extend((0..MAX_EPOCH_SLOTS).map(|ix| CrdsValueLabel::EpochSlots(ix, *key)));
|
||||
|
@ -302,16 +318,17 @@ mod test {
|
|||
|
||||
#[test]
|
||||
fn test_labels() {
|
||||
let mut hits = [false; 3 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
|
||||
let mut hits = [false; 4 + MAX_VOTES as usize + MAX_EPOCH_SLOTS as usize];
|
||||
// this method should cover all the possible labels
|
||||
for v in &CrdsValue::record_labels(&Pubkey::default()) {
|
||||
match v {
|
||||
CrdsValueLabel::ContactInfo(_) => hits[0] = true,
|
||||
CrdsValueLabel::LowestSlot(_) => hits[1] = true,
|
||||
CrdsValueLabel::SnapshotHash(_) => hits[2] = true,
|
||||
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 3] = true,
|
||||
CrdsValueLabel::SnapshotHashes(_) => hits[2] = true,
|
||||
CrdsValueLabel::AccountsHashes(_) => hits[3] = true,
|
||||
CrdsValueLabel::Vote(ix, _) => hits[*ix as usize + 4] = true,
|
||||
CrdsValueLabel::EpochSlots(ix, _) => {
|
||||
hits[*ix as usize + MAX_VOTES as usize + 3] = true
|
||||
hits[*ix as usize + MAX_VOTES as usize + 4] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
//! command-line tools to spin up validators and a Rust library
|
||||
//!
|
||||
|
||||
pub mod accounts_hash_verifier;
|
||||
pub mod banking_stage;
|
||||
pub mod broadcast_stage;
|
||||
pub mod cluster_info_vote_listener;
|
||||
|
|
|
@ -85,7 +85,7 @@ pub struct ReplayStageConfig {
|
|||
pub leader_schedule_cache: Arc<LeaderScheduleCache>,
|
||||
pub slot_full_senders: Vec<Sender<(u64, Pubkey)>>,
|
||||
pub latest_root_senders: Vec<Sender<Slot>>,
|
||||
pub snapshot_package_sender: Option<SnapshotPackageSender>,
|
||||
pub accounts_hash_sender: Option<SnapshotPackageSender>,
|
||||
pub block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||
pub transaction_status_sender: Option<TransactionStatusSender>,
|
||||
pub rewards_recorder_sender: Option<RewardsRecorderSender>,
|
||||
|
@ -189,7 +189,7 @@ impl ReplayStage {
|
|||
leader_schedule_cache,
|
||||
slot_full_senders,
|
||||
latest_root_senders,
|
||||
snapshot_package_sender,
|
||||
accounts_hash_sender,
|
||||
block_commitment_cache,
|
||||
transaction_status_sender,
|
||||
rewards_recorder_sender,
|
||||
|
@ -350,7 +350,7 @@ impl ReplayStage {
|
|||
&leader_schedule_cache,
|
||||
&root_bank_sender,
|
||||
&lockouts_sender,
|
||||
&snapshot_package_sender,
|
||||
&accounts_hash_sender,
|
||||
&latest_root_senders,
|
||||
&mut earliest_vote_on_fork,
|
||||
)?;
|
||||
|
@ -666,7 +666,7 @@ impl ReplayStage {
|
|||
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
||||
root_bank_sender: &Sender<Vec<Arc<Bank>>>,
|
||||
lockouts_sender: &Sender<CommitmentAggregationData>,
|
||||
snapshot_package_sender: &Option<SnapshotPackageSender>,
|
||||
accounts_hash_sender: &Option<SnapshotPackageSender>,
|
||||
latest_root_senders: &[Sender<Slot>],
|
||||
earliest_vote_on_fork: &mut Slot,
|
||||
) -> Result<()> {
|
||||
|
@ -698,7 +698,7 @@ impl ReplayStage {
|
|||
new_root,
|
||||
&bank_forks,
|
||||
progress,
|
||||
snapshot_package_sender,
|
||||
accounts_hash_sender,
|
||||
earliest_vote_on_fork,
|
||||
);
|
||||
latest_root_senders.iter().for_each(|s| {
|
||||
|
@ -1167,13 +1167,13 @@ impl ReplayStage {
|
|||
new_root: u64,
|
||||
bank_forks: &RwLock<BankForks>,
|
||||
progress: &mut ProgressMap,
|
||||
snapshot_package_sender: &Option<SnapshotPackageSender>,
|
||||
accounts_hash_sender: &Option<SnapshotPackageSender>,
|
||||
earliest_vote_on_fork: &mut u64,
|
||||
) {
|
||||
bank_forks
|
||||
.write()
|
||||
.unwrap()
|
||||
.set_root(new_root, snapshot_package_sender);
|
||||
.set_root(new_root, accounts_hash_sender);
|
||||
let r_bank_forks = bank_forks.read().unwrap();
|
||||
*earliest_vote_on_fork = std::cmp::max(new_root, *earliest_vote_on_fork);
|
||||
progress.retain(|k, _| r_bank_forks.get(*k).is_some());
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
//! validation pipeline in software.
|
||||
|
||||
use crate::{
|
||||
accounts_hash_verifier::AccountsHashVerifier,
|
||||
blockstream_service::BlockstreamService,
|
||||
cluster_info::ClusterInfo,
|
||||
cluster_info_vote_listener::VoteTracker,
|
||||
|
@ -29,6 +30,7 @@ use solana_sdk::{
|
|||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signer},
|
||||
};
|
||||
use std::collections::HashSet;
|
||||
use std::{
|
||||
net::UdpSocket,
|
||||
path::PathBuf,
|
||||
|
@ -48,6 +50,7 @@ pub struct Tvu {
|
|||
blockstream_service: Option<BlockstreamService>,
|
||||
ledger_cleanup_service: Option<LedgerCleanupService>,
|
||||
storage_stage: StorageStage,
|
||||
accounts_hash_verifier: AccountsHashVerifier,
|
||||
}
|
||||
|
||||
pub struct Sockets {
|
||||
|
@ -57,6 +60,16 @@ pub struct Sockets {
|
|||
pub forwards: Vec<UdpSocket>,
|
||||
}
|
||||
|
||||
/// Optional settings passed to `Tvu::new` as a single value instead of as
/// individual positional arguments.
#[derive(Default)]
|
||||
pub struct TvuConfig {
|
||||
/// When `Some`, a `LedgerCleanupService` is created with this slot limit.
pub max_ledger_slots: Option<u64>,
|
||||
/// When true, the regular `SigVerifyStage::new` construction path is not taken.
pub sigverify_disabled: bool,
|
||||
/// Shred version forwarded to the stage constructed inside `Tvu::new`.
pub shred_version: u16,
|
||||
/// Forwarded to `AccountsHashVerifier::new`; enables halting on a hash mismatch.
pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
|
||||
/// Forwarded to `AccountsHashVerifier::new`; validators whose hashes are compared.
pub trusted_validators: Option<HashSet<Pubkey>>,
|
||||
/// Forwarded to `AccountsHashVerifier::new` as the fault injection rate in slots.
pub accounts_hash_fault_injection_slots: u64,
|
||||
}
|
||||
|
||||
impl Tvu {
|
||||
/// This service receives messages from a leader in the network and processes the transactions
|
||||
/// on the bank state.
|
||||
|
@ -75,7 +88,6 @@ impl Tvu {
|
|||
blockstore: Arc<Blockstore>,
|
||||
storage_state: &StorageState,
|
||||
blockstream_unix_socket: Option<&PathBuf>,
|
||||
max_ledger_slots: Option<u64>,
|
||||
ledger_signal_receiver: Receiver<bool>,
|
||||
subscriptions: &Arc<RpcSubscriptions>,
|
||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||
|
@ -83,13 +95,12 @@ impl Tvu {
|
|||
exit: &Arc<AtomicBool>,
|
||||
completed_slots_receiver: CompletedSlotsReceiver,
|
||||
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
|
||||
sigverify_disabled: bool,
|
||||
cfg: Option<Arc<AtomicBool>>,
|
||||
shred_version: u16,
|
||||
transaction_status_sender: Option<TransactionStatusSender>,
|
||||
rewards_recorder_sender: Option<RewardsRecorderSender>,
|
||||
snapshot_package_sender: Option<SnapshotPackageSender>,
|
||||
vote_tracker: Arc<VoteTracker>,
|
||||
tvu_config: TvuConfig,
|
||||
) -> Self {
|
||||
let keypair: Arc<Keypair> = cluster_info
|
||||
.read()
|
||||
|
@ -119,7 +130,7 @@ impl Tvu {
|
|||
);
|
||||
|
||||
let (verified_sender, verified_receiver) = unbounded();
|
||||
let sigverify_stage = if !sigverify_disabled {
|
||||
let sigverify_stage = if !tvu_config.sigverify_disabled {
|
||||
SigVerifyStage::new(
|
||||
fetch_receiver,
|
||||
verified_sender,
|
||||
|
@ -145,12 +156,23 @@ impl Tvu {
|
|||
completed_slots_receiver,
|
||||
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
|
||||
cfg,
|
||||
shred_version,
|
||||
tvu_config.shred_version,
|
||||
);
|
||||
|
||||
let (blockstream_slot_sender, blockstream_slot_receiver) = channel();
|
||||
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
|
||||
|
||||
let (accounts_hash_sender, accounts_hash_receiver) = channel();
|
||||
let accounts_hash_verifier = AccountsHashVerifier::new(
|
||||
accounts_hash_receiver,
|
||||
snapshot_package_sender,
|
||||
exit,
|
||||
cluster_info,
|
||||
tvu_config.trusted_validators.clone(),
|
||||
tvu_config.halt_on_trusted_validators_accounts_hash_mismatch,
|
||||
tvu_config.accounts_hash_fault_injection_slots,
|
||||
);
|
||||
|
||||
let replay_stage_config = ReplayStageConfig {
|
||||
my_pubkey: keypair.pubkey(),
|
||||
vote_account: *vote_account,
|
||||
|
@ -160,7 +182,7 @@ impl Tvu {
|
|||
leader_schedule_cache: leader_schedule_cache.clone(),
|
||||
slot_full_senders: vec![blockstream_slot_sender],
|
||||
latest_root_senders: vec![ledger_cleanup_slot_sender],
|
||||
snapshot_package_sender,
|
||||
accounts_hash_sender: Some(accounts_hash_sender),
|
||||
block_commitment_cache,
|
||||
transaction_status_sender,
|
||||
rewards_recorder_sender,
|
||||
|
@ -188,7 +210,7 @@ impl Tvu {
|
|||
None
|
||||
};
|
||||
|
||||
let ledger_cleanup_service = max_ledger_slots.map(|max_ledger_slots| {
|
||||
let ledger_cleanup_service = tvu_config.max_ledger_slots.map(|max_ledger_slots| {
|
||||
LedgerCleanupService::new(
|
||||
ledger_cleanup_slot_receiver,
|
||||
blockstore.clone(),
|
||||
|
@ -216,6 +238,7 @@ impl Tvu {
|
|||
blockstream_service,
|
||||
ledger_cleanup_service,
|
||||
storage_stage,
|
||||
accounts_hash_verifier,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -231,6 +254,7 @@ impl Tvu {
|
|||
self.ledger_cleanup_service.unwrap().join()?;
|
||||
}
|
||||
self.replay_stage.join()?;
|
||||
self.accounts_hash_verifier.join()?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -291,7 +315,6 @@ pub mod tests {
|
|||
blockstore,
|
||||
&StorageState::default(),
|
||||
None,
|
||||
None,
|
||||
l_receiver,
|
||||
&Arc::new(RpcSubscriptions::new(&exit)),
|
||||
&poh_recorder,
|
||||
|
@ -299,13 +322,12 @@ pub mod tests {
|
|||
&exit,
|
||||
completed_slots_receiver,
|
||||
block_commitment_cache,
|
||||
false,
|
||||
None,
|
||||
0,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
Arc::new(VoteTracker::new(&bank)),
|
||||
TvuConfig::default(),
|
||||
);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
tvu.join().unwrap();
|
||||
|
|
|
@ -21,7 +21,7 @@ use crate::{
|
|||
storage_stage::StorageState,
|
||||
tpu::Tpu,
|
||||
transaction_status_service::TransactionStatusService,
|
||||
tvu::{Sockets, Tvu},
|
||||
tvu::{Sockets, Tvu, TvuConfig},
|
||||
};
|
||||
use crossbeam_channel::unbounded;
|
||||
use solana_ledger::{
|
||||
|
@ -77,6 +77,8 @@ pub struct ValidatorConfig {
|
|||
pub wait_for_supermajority: Option<Slot>,
|
||||
pub new_hard_forks: Option<Vec<Slot>>,
|
||||
pub trusted_validators: Option<HashSet<Pubkey>>, // None = trust all
|
||||
pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
|
||||
pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
|
||||
}
|
||||
|
||||
impl Default for ValidatorConfig {
|
||||
|
@ -100,6 +102,8 @@ impl Default for ValidatorConfig {
|
|||
wait_for_supermajority: None,
|
||||
new_hard_forks: None,
|
||||
trusted_validators: None,
|
||||
halt_on_trusted_validators_accounts_hash_mismatch: false,
|
||||
accounts_hash_fault_injection_slots: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -414,7 +418,6 @@ impl Validator {
|
|||
blockstore.clone(),
|
||||
&storage_state,
|
||||
config.blockstream_unix_socket.as_ref(),
|
||||
config.max_ledger_slots,
|
||||
ledger_signal_receiver,
|
||||
&subscriptions,
|
||||
&poh_recorder,
|
||||
|
@ -422,13 +425,20 @@ impl Validator {
|
|||
&exit,
|
||||
completed_slots_receiver,
|
||||
block_commitment_cache,
|
||||
config.dev_sigverify_disabled,
|
||||
config.enable_partition.clone(),
|
||||
node.info.shred_version,
|
||||
transaction_status_sender.clone(),
|
||||
rewards_recorder_sender,
|
||||
snapshot_package_sender,
|
||||
vote_tracker.clone(),
|
||||
TvuConfig {
|
||||
max_ledger_slots: config.max_ledger_slots,
|
||||
sigverify_disabled: config.dev_sigverify_disabled,
|
||||
halt_on_trusted_validators_accounts_hash_mismatch: config
|
||||
.halt_on_trusted_validators_accounts_hash_mismatch,
|
||||
shred_version: node.info.shred_version,
|
||||
trusted_validators: config.trusted_validators.clone(),
|
||||
accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots,
|
||||
},
|
||||
);
|
||||
|
||||
if config.dev_sigverify_disabled {
|
||||
|
|
|
@ -607,6 +607,95 @@ fn test_softlaunch_operating_mode() {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
#[serial]
fn test_consistency_halt() {
    solana_logger::setup();
    let snapshot_interval_slots = 20;
    let num_account_paths = 1;

    // The leader injects a bad accounts hash every 40 slots.
    let mut leader_snapshot_test_config =
        setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
    leader_snapshot_test_config
        .validator_config
        .accounts_hash_fault_injection_slots = 40;

    let validator_stake = 10_000;
    let config = ClusterConfig {
        node_stakes: vec![validator_stake],
        cluster_lamports: 100_000,
        validator_configs: vec![leader_snapshot_test_config.validator_config.clone()],
        ..ClusterConfig::default()
    };
    let mut cluster = LocalCluster::new(&config);

    sleep(Duration::from_millis(5000));
    let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, 1).unwrap();
    info!("num_nodes: {}", cluster_nodes.len());

    // The second validator trusts the leader and is configured to halt as soon as
    // it sees a hash that differs from its own.
    let mut validator_snapshot_test_config =
        setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
    validator_snapshot_test_config
        .validator_config
        .trusted_validators = Some(vec![cluster_nodes[0].id].into_iter().collect());
    validator_snapshot_test_config
        .validator_config
        .halt_on_trusted_validators_accounts_hash_mismatch = true;

    warn!("adding a validator");
    cluster.add_validator(
        &validator_snapshot_test_config.validator_config,
        validator_stake as u64,
        Arc::new(Keypair::new()),
    );
    let num_nodes = 2;
    let (discovered, _) = discover_cluster(&cluster.entry_point_info.gossip, num_nodes).unwrap();
    assert_eq!(discovered.len(), num_nodes);

    // Poll until the halted validator is no longer discoverable, or give up once the
    // leader has passed slot 210.
    let encountered_error = loop {
        let visible = discover_cluster(&cluster.entry_point_info.gossip, 2)
            .map(|(nodes, _)| nodes.len());
        match visible {
            Ok(count) if count >= 2 => {
                info!("checking cluster for fewer nodes.. {:?}", count);
            }
            // Discovery failed or found fewer than two nodes.
            _ => break true,
        }

        let client = cluster
            .get_validator_client(&cluster.entry_point_info.id)
            .unwrap();
        if let Ok(slot) = client.get_slot() {
            if slot > 210 {
                break false;
            }
            info!("slot: {}", slot);
        }
        sleep(Duration::from_millis(1000));
    };
    assert!(encountered_error);
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_snapshot_download() {
|
||||
|
|
|
@ -145,6 +145,9 @@ while [[ -n $1 ]]; do
|
|||
elif [[ $1 = --trusted-validator ]]; then
|
||||
args+=("$1" "$2")
|
||||
shift 2
|
||||
elif [[ $1 = --halt-on-trusted-validators-accounts-hash-mismatch ]]; then
|
||||
args+=("$1")
|
||||
shift
|
||||
elif [[ $1 = -h ]]; then
|
||||
usage "$@"
|
||||
else
|
||||
|
|
|
@ -663,6 +663,13 @@ pub fn main() {
|
|||
.validator(solana_net_utils::is_host)
|
||||
.help("IP address to bind the RPC port [default: use --bind-address]"),
|
||||
)
|
||||
.arg(
|
||||
clap::Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch")
|
||||
.long("halt-on-trusted-validators-accounts-hash-mismatch")
|
||||
.requires("trusted_validators")
|
||||
.takes_value(false)
|
||||
.help("Abort the validator if a bank hash mismatch is detected within trusted validator set"),
|
||||
)
|
||||
.get_matches();
|
||||
|
||||
let identity_keypair = Arc::new(keypair_of(&matches, "identity").unwrap_or_else(Keypair::new));
|
||||
|
@ -821,6 +828,10 @@ pub fn main() {
|
|||
validator_config.max_ledger_slots = Some(limit_ledger_size);
|
||||
}
|
||||
|
||||
if matches.is_present("halt_on_trusted_validators_accounts_hash_mismatch") {
|
||||
validator_config.halt_on_trusted_validators_accounts_hash_mismatch = true;
|
||||
}
|
||||
|
||||
if matches.value_of("signer_addr").is_some() {
|
||||
warn!("--vote-signer-address ignored");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue