From c8d67aa8ebbf5f3e1e75ff69f380b2ae26d1fd0b Mon Sep 17 00:00:00 2001
From: carllin
Date: Fri, 21 Aug 2020 00:35:11 -0700
Subject: [PATCH] Add option for repairing only from trusted validators
 (#11752)

Co-authored-by: Carl
---
 core/src/repair_service.rs   | 33 ++++++++++--
 core/src/retransmit_stage.rs |  3 ++
 core/src/serve_repair.rs     | 99 ++++++++++++++++++++++++++++++++++--
 core/src/tvu.rs              |  2 +
 core/src/validator.rs        |  3 ++
 validator/src/main.rs        | 62 ++++++++++++++++------
 6 files changed, 178 insertions(+), 24 deletions(-)

diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 7bc73d61e6..f38d1432e0 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -18,7 +18,7 @@ use solana_measure::measure::Measure;
 use solana_runtime::{bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE};
 use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     iter::Iterator,
     net::SocketAddr,
     net::UdpSocket,
@@ -108,6 +108,7 @@ pub struct RepairInfo {
     pub bank_forks: Arc<RwLock<BankForks>>,
     pub epoch_schedule: EpochSchedule,
     pub duplicate_slots_reset_sender: DuplicateSlotsResetSender,
+    pub repair_validators: Option<HashSet<Pubkey>>,
 }
 
 pub struct RepairSlotRange {
@@ -234,6 +235,7 @@ impl RepairService {
                         blockstore,
                         &serve_repair,
                         &repair_info.duplicate_slots_reset_sender,
+                        &repair_info.repair_validators,
                     );
                     Self::generate_and_send_duplicate_repairs(
                         &mut duplicate_slot_repair_statuses,
@@ -242,6 +244,7 @@ impl RepairService {
                         &serve_repair,
                         &mut repair_stats,
                         &repair_socket,
+                        &repair_info.repair_validators,
                     );*/
 
                     repair_weight.get_best_weighted_repairs(
@@ -263,6 +266,7 @@ impl RepairService {
                         repair_request,
                         &mut cache,
                         &mut repair_stats,
+                        &repair_info.repair_validators,
                     ) {
                         repair_socket.send_to(&req, to).unwrap_or_else(|e| {
                             info!("{} repair req send_to({}) error {:?}", id, to, e);
@@ -444,9 +448,16 @@ impl RepairService {
         serve_repair: &ServeRepair,
         repair_stats: &mut RepairStats,
         repair_socket: &UdpSocket,
+        repair_validators: &Option<HashSet<Pubkey>>,
     ) {
         duplicate_slot_repair_statuses.retain(|slot, status| {
-            Self::update_duplicate_slot_repair_addr(*slot, status, cluster_slots, serve_repair);
+            Self::update_duplicate_slot_repair_addr(
+                *slot,
+                status,
+                cluster_slots,
+                serve_repair,
+                repair_validators,
+            );
             if let Some((repair_pubkey, repair_addr)) = status.repair_pubkey_and_addr {
                 let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot);
 
@@ -499,13 +510,17 @@ impl RepairService {
         status: &mut DuplicateSlotRepairStatus,
         cluster_slots: &ClusterSlots,
         serve_repair: &ServeRepair,
+        repair_validators: &Option<HashSet<Pubkey>>,
     ) {
         let now = timestamp();
         if status.repair_pubkey_and_addr.is_none()
             || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64
         {
-            let repair_pubkey_and_addr =
-                serve_repair.repair_request_duplicate_compute_best_peer(slot, cluster_slots);
+            let repair_pubkey_and_addr = serve_repair.repair_request_duplicate_compute_best_peer(
+                slot,
+                cluster_slots,
+                repair_validators,
+            );
             status.repair_pubkey_and_addr = repair_pubkey_and_addr.ok();
             status.start = timestamp();
         }
@@ -520,6 +535,7 @@ impl RepairService {
         blockstore: &Blockstore,
         serve_repair: &ServeRepair,
         duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
+        repair_validators: &Option<HashSet<Pubkey>>,
     ) {
         for slot in new_duplicate_slots {
             warn!(
@@ -545,7 +561,7 @@ impl RepairService {
             // Mark this slot as special repair, try to download from single
             // validator to avoid corruption
             let repair_pubkey_and_addr = serve_repair
-                .repair_request_duplicate_compute_best_peer(*slot, cluster_slots)
+                .repair_request_duplicate_compute_best_peer(*slot, cluster_slots, repair_validators)
                 .ok();
             let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
                 start: timestamp(),
@@ -953,6 +969,7 @@ mod test {
             &serve_repair,
             &mut RepairStats::default(),
             &UdpSocket::bind("0.0.0.0:0").unwrap(),
+            &None,
         );
         assert!(duplicate_slot_repair_statuses
             .get(&dead_slot)
@@ -976,6 +993,7 @@ mod test {
             &serve_repair,
             &mut RepairStats::default(),
             &UdpSocket::bind("0.0.0.0:0").unwrap(),
+            &None,
         );
         assert_eq!(duplicate_slot_repair_statuses.len(), 1);
         assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
@@ -992,6 +1010,7 @@ mod test {
             &serve_repair,
             &mut RepairStats::default(),
             &UdpSocket::bind("0.0.0.0:0").unwrap(),
+            &None,
         );
         assert!(duplicate_slot_repair_statuses.is_empty());
     }
@@ -1026,6 +1045,7 @@ mod test {
             &mut duplicate_status,
             &cluster_slots,
             &serve_repair,
+            &None,
         );
         assert_eq!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
 
@@ -1039,6 +1059,7 @@ mod test {
             &mut duplicate_status,
             &cluster_slots,
             &serve_repair,
+            &None,
         );
         assert!(duplicate_status.repair_pubkey_and_addr.is_some());
 
@@ -1052,6 +1073,7 @@ mod test {
             &mut duplicate_status,
             &cluster_slots,
             &serve_repair,
+            &None,
         );
         assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
     }
@@ -1108,6 +1130,7 @@ mod test {
             &blockstore,
             &serve_repair,
             &reset_sender,
+            &None,
         );
 
         // Blockstore should have been cleared
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 0c0c77bd36..478db61832 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -28,6 +28,7 @@ use solana_sdk::timing::timestamp;
 use solana_streamer::streamer::PacketReceiver;
 use std::{
     cmp,
+    collections::hash_set::HashSet,
     collections::{BTreeMap, HashMap},
     net::UdpSocket,
     sync::atomic::{AtomicBool, AtomicU64, Ordering},
@@ -417,6 +418,7 @@ impl RetransmitStage {
         cluster_slots: Arc<ClusterSlots>,
         duplicate_slots_reset_sender: DuplicateSlotsResetSender,
         verified_vote_receiver: VerifiedVoteReceiver,
+        repair_validators: Option<HashSet<Pubkey>>,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
 
@@ -442,6 +444,7 @@ impl RetransmitStage {
             bank_forks,
             epoch_schedule,
             duplicate_slots_reset_sender,
+            repair_validators,
         };
         let window_service = WindowService::new(
             blockstore,
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index 57f5f7f09f..2f2a210f0f 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -21,7 +21,7 @@ use solana_sdk::{
 };
 use solana_streamer::streamer::{PacketReceiver, PacketSender};
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     net::SocketAddr,
     sync::atomic::{AtomicBool, Ordering},
     sync::{Arc, RwLock},
@@ -382,12 +382,13 @@ impl ServeRepair {
         repair_request: RepairType,
         cache: &mut RepairCache,
         repair_stats: &mut RepairStats,
+        repair_validators: &Option<HashSet<Pubkey>>,
     ) -> Result<(SocketAddr, Vec<u8>)> {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
         let slot = repair_request.slot();
         if cache.get(&slot).is_none() {
-            let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot);
+            let repair_peers = self.repair_peers(&repair_validators, slot);
             if repair_peers.is_empty() {
                 return Err(ClusterInfoError::NoPeers.into());
             }
@@ -411,8 +412,9 @@ impl ServeRepair {
         &self,
         slot: Slot,
         cluster_slots: &ClusterSlots,
+        repair_validators: &Option<HashSet<Pubkey>>,
     ) -> Result<(Pubkey, SocketAddr)> {
-        let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot);
+        let repair_peers: Vec<_> = self.repair_peers(repair_validators, slot);
         if repair_peers.is_empty() {
             return Err(ClusterInfoError::NoPeers.into());
         }
@@ -448,6 +450,27 @@ impl ServeRepair {
         }
     }
 
+    fn repair_peers(
+        &self,
+        repair_validators: &Option<HashSet<Pubkey>>,
+        slot: Slot,
+    ) -> Vec<ContactInfo> {
+        if let Some(repair_validators) = repair_validators {
+            repair_validators
+                .iter()
+                .filter_map(|key| {
+                    if *key != self.my_info.id {
+                        self.cluster_info.lookup_contact_info(key, |ci| ci.clone())
+                    } else {
+                        None
+                    }
+                })
+                .collect()
+        } else {
+            self.cluster_info.repair_peers(slot)
+        }
+    }
+
     fn run_window_request(
         recycler: &PacketsRecycler,
         from: &ContactInfo,
@@ -733,6 +756,7 @@ mod tests {
             RepairType::Shred(0, 0),
             &mut HashMap::new(),
             &mut RepairStats::default(),
+            &None,
         );
         assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
 
@@ -759,6 +783,7 @@ mod tests {
                 RepairType::Shred(0, 0),
                 &mut HashMap::new(),
                 &mut RepairStats::default(),
+                &None,
             )
             .unwrap();
         assert_eq!(nxt.serve_repair, serve_repair_addr);
@@ -791,6 +816,7 @@ mod tests {
                 RepairType::Shred(0, 0),
                 &mut HashMap::new(),
                 &mut RepairStats::default(),
+                &None,
             )
             .unwrap();
         if rv.0 == serve_repair_addr {
@@ -937,4 +963,71 @@ mod tests {
 
         Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
     }
+
+    #[test]
+    fn test_repair_with_repair_validators() {
+        let cluster_slots = ClusterSlots::default();
+        let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(me.clone()));
+
+        // Insert two peers on the network
+        let contact_info2 = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
+        let contact_info3 = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
+        cluster_info.insert_info(contact_info2.clone());
+        cluster_info.insert_info(contact_info3.clone());
+        let serve_repair = ServeRepair::new(cluster_info);
+
+        // If:
+        // 1) the repair validator set doesn't exist in gossip, or
+        // 2) the repair validator set only includes our own id,
+        // then no repairs should be generated
+        for pubkey in &[Pubkey::new_rand(), me.id] {
+            let trusted_validators = Some(vec![*pubkey].into_iter().collect());
+            assert!(serve_repair.repair_peers(&trusted_validators, 1).is_empty());
+            assert!(serve_repair
+                .repair_request(
+                    &cluster_slots,
+                    RepairType::Shred(0, 0),
+                    &mut HashMap::new(),
+                    &mut RepairStats::default(),
+                    &trusted_validators,
+                )
+                .is_err());
+        }
+
+        // If a trusted validator exists in gossip, the repair request should succeed
+        let trusted_validators = Some(vec![contact_info2.id].into_iter().collect());
+        let repair_peers = serve_repair.repair_peers(&trusted_validators, 1);
+        assert_eq!(repair_peers.len(), 1);
+        assert_eq!(repair_peers[0].id, contact_info2.id);
+        assert!(serve_repair
+            .repair_request(
+                &cluster_slots,
+                RepairType::Shred(0, 0),
+                &mut HashMap::new(),
+                &mut RepairStats::default(),
+                &trusted_validators,
+            )
+            .is_ok());
+
+        // Using no trusted validators should default to all
+        // validators available in gossip, excluding myself
+        let repair_peers: HashSet<Pubkey> = serve_repair
+            .repair_peers(&None, 1)
+            .into_iter()
+            .map(|c| c.id)
+            .collect();
+        assert_eq!(repair_peers.len(), 2);
+        assert!(repair_peers.contains(&contact_info2.id));
+        assert!(repair_peers.contains(&contact_info3.id));
+        assert!(serve_repair
+            .repair_request(
+                &cluster_slots,
+                RepairType::Shred(0, 0),
+                &mut HashMap::new(),
+                &mut RepairStats::default(),
+                &None,
+            )
+            .is_ok());
+    }
 }
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index ca04e302ee..5764da83a1 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -66,6 +66,7 @@ pub struct TvuConfig {
     pub shred_version: u16,
     pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
     pub trusted_validators: Option<HashSet<Pubkey>>,
+    pub repair_validators: Option<HashSet<Pubkey>>,
     pub accounts_hash_fault_injection_slots: u64,
 }
 
@@ -150,6 +151,7 @@ impl Tvu {
             cluster_slots.clone(),
             duplicate_slots_reset_sender,
             verified_vote_receiver,
+            tvu_config.repair_validators,
         );
 
         let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();
diff --git a/core/src/validator.rs b/core/src/validator.rs
index db126f8cb3..023fa6254e 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -82,6 +82,7 @@ pub struct ValidatorConfig {
     pub wait_for_supermajority: Option<Slot>,
     pub new_hard_forks: Option<Vec<Slot>>,
     pub trusted_validators: Option<HashSet<Pubkey>>, // None = trust all
+    pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
    pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
     pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
     pub frozen_accounts: Vec<Pubkey>,
@@ -110,6 +111,7 @@ impl Default for ValidatorConfig {
             wait_for_supermajority: None,
             new_hard_forks: None,
             trusted_validators: None,
+            repair_validators: None,
             halt_on_trusted_validators_accounts_hash_mismatch: false,
             accounts_hash_fault_injection_slots: 0,
             frozen_accounts: vec![],
@@ -472,6 +474,7 @@ impl Validator {
                     .halt_on_trusted_validators_accounts_hash_mismatch,
                 shred_version: node.info.shred_version,
                 trusted_validators: config.trusted_validators.clone(),
+                repair_validators: config.repair_validators.clone(),
                 accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots,
             },
         );
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 183391786d..a9d11c96d1 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -352,6 +352,29 @@ fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
     }
 }
 
+fn validators_set(
+    identity_pubkey: &Pubkey,
+    matches: &ArgMatches<'_>,
+    matches_name: &str,
+    arg_name: &str,
+) -> Option<HashSet<Pubkey>> {
+    if matches.is_present(matches_name) {
+        let validators_set: HashSet<_> = values_t_or_exit!(matches, matches_name, Pubkey)
+            .into_iter()
+            .collect();
+        if validators_set.contains(identity_pubkey) {
+            eprintln!(
+                "The validator's identity pubkey cannot be a {}: {}",
+                arg_name, identity_pubkey
+            );
+            exit(1);
+        }
+        Some(validators_set)
+    } else {
+        None
+    }
+}
+
 fn check_genesis_hash(
     genesis_config: &GenesisConfig,
     expected_genesis_hash: Option<Hash>,
@@ -812,6 +835,16 @@ pub fn main() {
                 .takes_value(false)
                 .help("Use the RPC service of trusted validators only")
         )
+        .arg(
+            Arg::with_name("repair_validators")
+                .long("repair-validator")
+                .validator(is_pubkey)
+                .value_name("PUBKEY")
+                .multiple(true)
+                .takes_value(true)
+                .help("A list of validators to request repairs from. If specified, repair will not \
+                       request from validators outside this set [default: request repairs from all validators]")
+        )
         .arg(
             Arg::with_name("no_rocksdb_compaction")
                 .long("no-rocksdb-compaction")
@@ -914,22 +947,18 @@ pub fn main() {
         });
 
     let no_untrusted_rpc = matches.is_present("no_untrusted_rpc");
-    let trusted_validators = if matches.is_present("trusted_validators") {
-        let trusted_validators: HashSet<_> =
-            values_t_or_exit!(matches, "trusted_validators", Pubkey)
-                .into_iter()
-                .collect();
-        if trusted_validators.contains(&identity_keypair.pubkey()) {
-            eprintln!(
-                "The validator's identity pubkey cannot be a --trusted-validator: {}",
-                identity_keypair.pubkey()
-            );
-            exit(1);
-        }
-        Some(trusted_validators)
-    } else {
-        None
-    };
+    let trusted_validators = validators_set(
+        &identity_keypair.pubkey(),
+        &matches,
+        "trusted_validators",
+        "--trusted-validator",
+    );
+    let repair_validators = validators_set(
+        &identity_keypair.pubkey(),
+        &matches,
+        "repair_validators",
+        "--repair-validator",
+    );
 
     let mut validator_config = ValidatorConfig {
         dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
@@ -963,6 +992,7 @@ pub fn main() {
         voting_disabled: matches.is_present("no_voting"),
         wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
         trusted_validators,
+        repair_validators,
        frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(),
         no_rocksdb_compaction,
         wal_recovery_mode,
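
The heart of this change is the new `ServeRepair::repair_peers` helper above: when a
`--repair-validator` set is configured, repair requests only consider validators in that
set, excluding the node's own identity and anyone not currently visible in gossip;
otherwise the pre-patch behavior of repairing from any gossip peer is kept. Below is a
minimal standalone sketch of that selection rule, using simplified stand-in types (u64
ids and a bare struct instead of solana-core's Pubkey/ContactInfo); it is an
illustration, not the actual implementation:

    use std::collections::HashSet;

    #[derive(Clone, Debug, PartialEq)]
    struct ContactInfo {
        id: u64, // stand-in for the real Pubkey-keyed ContactInfo
    }

    // A configured set restricts repair peers to its members that are present
    // in gossip and are not ourselves; no set means "all gossip peers".
    fn repair_peers(
        my_id: u64,
        repair_validators: &Option<HashSet<u64>>,
        gossip_peers: &[ContactInfo],
    ) -> Vec<ContactInfo> {
        match repair_validators {
            Some(validators) => gossip_peers
                .iter()
                .filter(|ci| ci.id != my_id && validators.contains(&ci.id))
                .cloned()
                .collect(),
            None => gossip_peers.to_vec(),
        }
    }

    fn main() {
        let peers = vec![ContactInfo { id: 2 }, ContactInfo { id: 3 }];

        // Restricting to validator 2 leaves exactly one candidate peer.
        let trusted: Option<HashSet<u64>> = Some([2].iter().copied().collect());
        assert_eq!(repair_peers(1, &trusted, &peers).len(), 1);

        // A set with only unknown ids (or our own id, here 1) yields no
        // candidates; the real code surfaces this as ClusterInfoError::NoPeers.
        let unknown: Option<HashSet<u64>> = Some([1, 42].iter().copied().collect());
        assert!(repair_peers(1, &unknown, &peers).is_empty());

        // No configured set falls back to every peer known via gossip.
        assert_eq!(repair_peers(1, &None, &peers).len(), 2);
    }

With the patch applied, the set is built by repeating the flag, along the lines of
`solana-validator ... --repair-validator <PUBKEY1> --repair-validator <PUBKEY2>`
(the pubkeys here are placeholders, not values from this patch).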