Add option for repairing only from trusted validators (#11752)

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin 2020-08-21 00:35:11 -07:00 committed by GitHub
parent f7adb68599
commit c8d67aa8eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 178 additions and 24 deletions

View File

@ -18,7 +18,7 @@ use solana_measure::measure::Measure;
use solana_runtime::{bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE}; use solana_runtime::{bank::Bank, bank_forks::BankForks, commitment::VOTE_THRESHOLD_SIZE};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}; use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
use std::{ use std::{
collections::HashMap, collections::{HashMap, HashSet},
iter::Iterator, iter::Iterator,
net::SocketAddr, net::SocketAddr,
net::UdpSocket, net::UdpSocket,
@ -108,6 +108,7 @@ pub struct RepairInfo {
pub bank_forks: Arc<RwLock<BankForks>>, pub bank_forks: Arc<RwLock<BankForks>>,
pub epoch_schedule: EpochSchedule, pub epoch_schedule: EpochSchedule,
pub duplicate_slots_reset_sender: DuplicateSlotsResetSender, pub duplicate_slots_reset_sender: DuplicateSlotsResetSender,
pub repair_validators: Option<HashSet<Pubkey>>,
} }
pub struct RepairSlotRange { pub struct RepairSlotRange {
@ -234,6 +235,7 @@ impl RepairService {
blockstore, blockstore,
&serve_repair, &serve_repair,
&repair_info.duplicate_slots_reset_sender, &repair_info.duplicate_slots_reset_sender,
&repair_info.repair_validators,
); );
Self::generate_and_send_duplicate_repairs( Self::generate_and_send_duplicate_repairs(
&mut duplicate_slot_repair_statuses, &mut duplicate_slot_repair_statuses,
@ -242,6 +244,7 @@ impl RepairService {
&serve_repair, &serve_repair,
&mut repair_stats, &mut repair_stats,
&repair_socket, &repair_socket,
&repair_info.repair_validators,
);*/ );*/
repair_weight.get_best_weighted_repairs( repair_weight.get_best_weighted_repairs(
@ -263,6 +266,7 @@ impl RepairService {
repair_request, repair_request,
&mut cache, &mut cache,
&mut repair_stats, &mut repair_stats,
&repair_info.repair_validators,
) { ) {
repair_socket.send_to(&req, to).unwrap_or_else(|e| { repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e); info!("{} repair req send_to({}) error {:?}", id, to, e);
@ -444,9 +448,16 @@ impl RepairService {
serve_repair: &ServeRepair, serve_repair: &ServeRepair,
repair_stats: &mut RepairStats, repair_stats: &mut RepairStats,
repair_socket: &UdpSocket, repair_socket: &UdpSocket,
repair_validators: &Option<HashSet<Pubkey>>,
) { ) {
duplicate_slot_repair_statuses.retain(|slot, status| { duplicate_slot_repair_statuses.retain(|slot, status| {
Self::update_duplicate_slot_repair_addr(*slot, status, cluster_slots, serve_repair); Self::update_duplicate_slot_repair_addr(
*slot,
status,
cluster_slots,
serve_repair,
repair_validators,
);
if let Some((repair_pubkey, repair_addr)) = status.repair_pubkey_and_addr { if let Some((repair_pubkey, repair_addr)) = status.repair_pubkey_and_addr {
let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot); let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot);
@ -499,13 +510,17 @@ impl RepairService {
status: &mut DuplicateSlotRepairStatus, status: &mut DuplicateSlotRepairStatus,
cluster_slots: &ClusterSlots, cluster_slots: &ClusterSlots,
serve_repair: &ServeRepair, serve_repair: &ServeRepair,
repair_validators: &Option<HashSet<Pubkey>>,
) { ) {
let now = timestamp(); let now = timestamp();
if status.repair_pubkey_and_addr.is_none() if status.repair_pubkey_and_addr.is_none()
|| now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64 || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64
{ {
let repair_pubkey_and_addr = let repair_pubkey_and_addr = serve_repair.repair_request_duplicate_compute_best_peer(
serve_repair.repair_request_duplicate_compute_best_peer(slot, cluster_slots); slot,
cluster_slots,
repair_validators,
);
status.repair_pubkey_and_addr = repair_pubkey_and_addr.ok(); status.repair_pubkey_and_addr = repair_pubkey_and_addr.ok();
status.start = timestamp(); status.start = timestamp();
} }
@ -520,6 +535,7 @@ impl RepairService {
blockstore: &Blockstore, blockstore: &Blockstore,
serve_repair: &ServeRepair, serve_repair: &ServeRepair,
duplicate_slots_reset_sender: &DuplicateSlotsResetSender, duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
repair_validators: &Option<HashSet<Pubkey>>,
) { ) {
for slot in new_duplicate_slots { for slot in new_duplicate_slots {
warn!( warn!(
@ -545,7 +561,7 @@ impl RepairService {
// Mark this slot as special repair, try to download from single // Mark this slot as special repair, try to download from single
// validator to avoid corruption // validator to avoid corruption
let repair_pubkey_and_addr = serve_repair let repair_pubkey_and_addr = serve_repair
.repair_request_duplicate_compute_best_peer(*slot, cluster_slots) .repair_request_duplicate_compute_best_peer(*slot, cluster_slots, repair_validators)
.ok(); .ok();
let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus { let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
start: timestamp(), start: timestamp(),
@ -953,6 +969,7 @@ mod test {
&serve_repair, &serve_repair,
&mut RepairStats::default(), &mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(), &UdpSocket::bind("0.0.0.0:0").unwrap(),
&None,
); );
assert!(duplicate_slot_repair_statuses assert!(duplicate_slot_repair_statuses
.get(&dead_slot) .get(&dead_slot)
@ -976,6 +993,7 @@ mod test {
&serve_repair, &serve_repair,
&mut RepairStats::default(), &mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(), &UdpSocket::bind("0.0.0.0:0").unwrap(),
&None,
); );
assert_eq!(duplicate_slot_repair_statuses.len(), 1); assert_eq!(duplicate_slot_repair_statuses.len(), 1);
assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some()); assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
@ -992,6 +1010,7 @@ mod test {
&serve_repair, &serve_repair,
&mut RepairStats::default(), &mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(), &UdpSocket::bind("0.0.0.0:0").unwrap(),
&None,
); );
assert!(duplicate_slot_repair_statuses.is_empty()); assert!(duplicate_slot_repair_statuses.is_empty());
} }
@ -1026,6 +1045,7 @@ mod test {
&mut duplicate_status, &mut duplicate_status,
&cluster_slots, &cluster_slots,
&serve_repair, &serve_repair,
&None,
); );
assert_eq!(duplicate_status.repair_pubkey_and_addr, dummy_addr); assert_eq!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
@ -1039,6 +1059,7 @@ mod test {
&mut duplicate_status, &mut duplicate_status,
&cluster_slots, &cluster_slots,
&serve_repair, &serve_repair,
&None,
); );
assert!(duplicate_status.repair_pubkey_and_addr.is_some()); assert!(duplicate_status.repair_pubkey_and_addr.is_some());
@ -1052,6 +1073,7 @@ mod test {
&mut duplicate_status, &mut duplicate_status,
&cluster_slots, &cluster_slots,
&serve_repair, &serve_repair,
&None,
); );
assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr); assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
} }
@ -1108,6 +1130,7 @@ mod test {
&blockstore, &blockstore,
&serve_repair, &serve_repair,
&reset_sender, &reset_sender,
&None,
); );
// Blockstore should have been cleared // Blockstore should have been cleared

View File

@ -28,6 +28,7 @@ use solana_sdk::timing::timestamp;
use solana_streamer::streamer::PacketReceiver; use solana_streamer::streamer::PacketReceiver;
use std::{ use std::{
cmp, cmp,
collections::hash_set::HashSet,
collections::{BTreeMap, HashMap}, collections::{BTreeMap, HashMap},
net::UdpSocket, net::UdpSocket,
sync::atomic::{AtomicBool, AtomicU64, Ordering}, sync::atomic::{AtomicBool, AtomicU64, Ordering},
@ -417,6 +418,7 @@ impl RetransmitStage {
cluster_slots: Arc<ClusterSlots>, cluster_slots: Arc<ClusterSlots>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender, duplicate_slots_reset_sender: DuplicateSlotsResetSender,
verified_vote_receiver: VerifiedVoteReceiver, verified_vote_receiver: VerifiedVoteReceiver,
repair_validators: Option<HashSet<Pubkey>>,
) -> Self { ) -> Self {
let (retransmit_sender, retransmit_receiver) = channel(); let (retransmit_sender, retransmit_receiver) = channel();
@ -442,6 +444,7 @@ impl RetransmitStage {
bank_forks, bank_forks,
epoch_schedule, epoch_schedule,
duplicate_slots_reset_sender, duplicate_slots_reset_sender,
repair_validators,
}; };
let window_service = WindowService::new( let window_service = WindowService::new(
blockstore, blockstore,

View File

@ -21,7 +21,7 @@ use solana_sdk::{
}; };
use solana_streamer::streamer::{PacketReceiver, PacketSender}; use solana_streamer::streamer::{PacketReceiver, PacketSender};
use std::{ use std::{
collections::HashMap, collections::{HashMap, HashSet},
net::SocketAddr, net::SocketAddr,
sync::atomic::{AtomicBool, Ordering}, sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock}, sync::{Arc, RwLock},
@ -382,12 +382,13 @@ impl ServeRepair {
repair_request: RepairType, repair_request: RepairType,
cache: &mut RepairCache, cache: &mut RepairCache,
repair_stats: &mut RepairStats, repair_stats: &mut RepairStats,
repair_validators: &Option<HashSet<Pubkey>>,
) -> Result<(SocketAddr, Vec<u8>)> { ) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated // find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location // by a valid tvu port location
let slot = repair_request.slot(); let slot = repair_request.slot();
if cache.get(&slot).is_none() { if cache.get(&slot).is_none() {
let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot); let repair_peers = self.repair_peers(&repair_validators, slot);
if repair_peers.is_empty() { if repair_peers.is_empty() {
return Err(ClusterInfoError::NoPeers.into()); return Err(ClusterInfoError::NoPeers.into());
} }
@ -411,8 +412,9 @@ impl ServeRepair {
&self, &self,
slot: Slot, slot: Slot,
cluster_slots: &ClusterSlots, cluster_slots: &ClusterSlots,
repair_validators: &Option<HashSet<Pubkey>>,
) -> Result<(Pubkey, SocketAddr)> { ) -> Result<(Pubkey, SocketAddr)> {
let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot); let repair_peers: Vec<_> = self.repair_peers(repair_validators, slot);
if repair_peers.is_empty() { if repair_peers.is_empty() {
return Err(ClusterInfoError::NoPeers.into()); return Err(ClusterInfoError::NoPeers.into());
} }
@ -448,6 +450,27 @@ impl ServeRepair {
} }
} }
/// Resolve the set of peers eligible to serve repairs for `slot`.
///
/// When `repair_validators` is `Some`, only those pubkeys are considered:
/// each one (other than our own identity) is looked up in gossip and
/// included only if a contact info is found there. When `None`, falls back
/// to the full set of repair peers gossip knows for `slot`.
fn repair_peers(
    &self,
    repair_validators: &Option<HashSet<Pubkey>>,
    slot: Slot,
) -> Vec<ContactInfo> {
    match repair_validators {
        Some(validators) => validators
            .iter()
            // Never repair from ourselves.
            .filter(|key| **key != self.my_info.id)
            // Drop pubkeys that gossip has no contact info for.
            .filter_map(|key| self.cluster_info.lookup_contact_info(key, |ci| ci.clone()))
            .collect(),
        None => self.cluster_info.repair_peers(slot),
    }
}
fn run_window_request( fn run_window_request(
recycler: &PacketsRecycler, recycler: &PacketsRecycler,
from: &ContactInfo, from: &ContactInfo,
@ -733,6 +756,7 @@ mod tests {
RepairType::Shred(0, 0), RepairType::Shred(0, 0),
&mut HashMap::new(), &mut HashMap::new(),
&mut RepairStats::default(), &mut RepairStats::default(),
&None,
); );
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers))); assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));
@ -759,6 +783,7 @@ mod tests {
RepairType::Shred(0, 0), RepairType::Shred(0, 0),
&mut HashMap::new(), &mut HashMap::new(),
&mut RepairStats::default(), &mut RepairStats::default(),
&None,
) )
.unwrap(); .unwrap();
assert_eq!(nxt.serve_repair, serve_repair_addr); assert_eq!(nxt.serve_repair, serve_repair_addr);
@ -791,6 +816,7 @@ mod tests {
RepairType::Shred(0, 0), RepairType::Shred(0, 0),
&mut HashMap::new(), &mut HashMap::new(),
&mut RepairStats::default(), &mut RepairStats::default(),
&None,
) )
.unwrap(); .unwrap();
if rv.0 == serve_repair_addr { if rv.0 == serve_repair_addr {
@ -937,4 +963,71 @@ mod tests {
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction"); Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
} }
#[test]
fn test_repair_with_repair_validators() {
    let cluster_slots = ClusterSlots::default();
    let self_info = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
    let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(self_info.clone()));

    // Seed gossip with two peers besides ourselves.
    let peer_a = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
    let peer_b = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
    cluster_info.insert_info(peer_a.clone());
    cluster_info.insert_info(peer_b.clone());
    let serve_repair = ServeRepair::new(cluster_info);

    // A restriction set that either names a pubkey unknown to gossip or
    // contains only our own identity must yield zero peers, and any repair
    // request against it must fail.
    for pubkey in &[Pubkey::new_rand(), self_info.id] {
        let trusted_validators = Some(std::iter::once(*pubkey).collect());
        assert!(serve_repair.repair_peers(&trusted_validators, 1).is_empty());
        assert!(serve_repair
            .repair_request(
                &cluster_slots,
                RepairType::Shred(0, 0),
                &mut HashMap::new(),
                &mut RepairStats::default(),
                &trusted_validators,
            )
            .is_err());
    }

    // A restriction set naming a peer that IS in gossip should resolve to
    // exactly that peer, and the repair request should succeed.
    let trusted_validators = Some(std::iter::once(peer_a.id).collect());
    let repair_peers = serve_repair.repair_peers(&trusted_validators, 1);
    assert_eq!(repair_peers.len(), 1);
    assert_eq!(repair_peers[0].id, peer_a.id);
    assert!(serve_repair
        .repair_request(
            &cluster_slots,
            RepairType::Shred(0, 0),
            &mut HashMap::new(),
            &mut RepairStats::default(),
            &trusted_validators,
        )
        .is_ok());

    // With no restriction set, every gossip peer except ourselves is
    // eligible for repair.
    let repair_peers: HashSet<Pubkey> = serve_repair
        .repair_peers(&None, 1)
        .into_iter()
        .map(|c| c.id)
        .collect();
    assert_eq!(repair_peers.len(), 2);
    assert!(repair_peers.contains(&peer_a.id));
    assert!(repair_peers.contains(&peer_b.id));
    assert!(serve_repair
        .repair_request(
            &cluster_slots,
            RepairType::Shred(0, 0),
            &mut HashMap::new(),
            &mut RepairStats::default(),
            &None,
        )
        .is_ok());
}
} }

View File

@ -66,6 +66,7 @@ pub struct TvuConfig {
pub shred_version: u16, pub shred_version: u16,
pub halt_on_trusted_validators_accounts_hash_mismatch: bool, pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
pub trusted_validators: Option<HashSet<Pubkey>>, pub trusted_validators: Option<HashSet<Pubkey>>,
pub repair_validators: Option<HashSet<Pubkey>>,
pub accounts_hash_fault_injection_slots: u64, pub accounts_hash_fault_injection_slots: u64,
} }
@ -150,6 +151,7 @@ impl Tvu {
cluster_slots.clone(), cluster_slots.clone(),
duplicate_slots_reset_sender, duplicate_slots_reset_sender,
verified_vote_receiver, verified_vote_receiver,
tvu_config.repair_validators,
); );
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel(); let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = channel();

View File

@ -82,6 +82,7 @@ pub struct ValidatorConfig {
pub wait_for_supermajority: Option<Slot>, pub wait_for_supermajority: Option<Slot>,
pub new_hard_forks: Option<Vec<Slot>>, pub new_hard_forks: Option<Vec<Slot>>,
pub trusted_validators: Option<HashSet<Pubkey>>, // None = trust all pub trusted_validators: Option<HashSet<Pubkey>>, // None = trust all
pub repair_validators: Option<HashSet<Pubkey>>, // None = repair from all
pub halt_on_trusted_validators_accounts_hash_mismatch: bool, pub halt_on_trusted_validators_accounts_hash_mismatch: bool,
pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
pub frozen_accounts: Vec<Pubkey>, pub frozen_accounts: Vec<Pubkey>,
@ -110,6 +111,7 @@ impl Default for ValidatorConfig {
wait_for_supermajority: None, wait_for_supermajority: None,
new_hard_forks: None, new_hard_forks: None,
trusted_validators: None, trusted_validators: None,
repair_validators: None,
halt_on_trusted_validators_accounts_hash_mismatch: false, halt_on_trusted_validators_accounts_hash_mismatch: false,
accounts_hash_fault_injection_slots: 0, accounts_hash_fault_injection_slots: 0,
frozen_accounts: vec![], frozen_accounts: vec![],
@ -472,6 +474,7 @@ impl Validator {
.halt_on_trusted_validators_accounts_hash_mismatch, .halt_on_trusted_validators_accounts_hash_mismatch,
shred_version: node.info.shred_version, shred_version: node.info.shred_version,
trusted_validators: config.trusted_validators.clone(), trusted_validators: config.trusted_validators.clone(),
repair_validators: config.repair_validators.clone(),
accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots, accounts_hash_fault_injection_slots: config.accounts_hash_fault_injection_slots,
}, },
); );

View File

@ -352,6 +352,29 @@ fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
} }
} }
/// Parse a repeatable pubkey CLI option (`matches_name`) into a set.
///
/// Returns `None` when the option was not given. Exits the process with an
/// error message if the set contains the validator's own identity, since a
/// node must never list itself under `arg_name` (the user-facing flag name,
/// used only for the error text).
fn validators_set(
    identity_pubkey: &Pubkey,
    matches: &ArgMatches<'_>,
    matches_name: &str,
    arg_name: &str,
) -> Option<HashSet<Pubkey>> {
    // Absent flag means "no restriction".
    if !matches.is_present(matches_name) {
        return None;
    }
    let set: HashSet<Pubkey> = values_t_or_exit!(matches, matches_name, Pubkey)
        .into_iter()
        .collect();
    if set.contains(identity_pubkey) {
        eprintln!(
            "The validator's identity pubkey cannot be a {}: {}",
            arg_name, identity_pubkey
        );
        exit(1);
    }
    Some(set)
}
fn check_genesis_hash( fn check_genesis_hash(
genesis_config: &GenesisConfig, genesis_config: &GenesisConfig,
expected_genesis_hash: Option<Hash>, expected_genesis_hash: Option<Hash>,
@ -812,6 +835,16 @@ pub fn main() {
.takes_value(false) .takes_value(false)
.help("Use the RPC service of trusted validators only") .help("Use the RPC service of trusted validators only")
) )
.arg(
Arg::with_name("repair_validators")
.long("repair-validator")
.validator(is_pubkey)
.value_name("PUBKEY")
.multiple(true)
.takes_value(true)
.help("A list of validators to request repairs from. If specified, repair will not \
request from validators outside this set [default: request repairs from all validators]")
)
.arg( .arg(
Arg::with_name("no_rocksdb_compaction") Arg::with_name("no_rocksdb_compaction")
.long("no-rocksdb-compaction") .long("no-rocksdb-compaction")
@ -914,22 +947,18 @@ pub fn main() {
}); });
let no_untrusted_rpc = matches.is_present("no_untrusted_rpc"); let no_untrusted_rpc = matches.is_present("no_untrusted_rpc");
let trusted_validators = if matches.is_present("trusted_validators") { let trusted_validators = validators_set(
let trusted_validators: HashSet<_> = &identity_keypair.pubkey(),
values_t_or_exit!(matches, "trusted_validators", Pubkey) &matches,
.into_iter() "trusted_validators",
.collect(); "--trusted-validator",
if trusted_validators.contains(&identity_keypair.pubkey()) { );
eprintln!( let repair_validators = validators_set(
"The validator's identity pubkey cannot be a --trusted-validator: {}", &identity_keypair.pubkey(),
identity_keypair.pubkey() &matches,
); "repair_validators",
exit(1); "--repair-validator",
} );
Some(trusted_validators)
} else {
None
};
let mut validator_config = ValidatorConfig { let mut validator_config = ValidatorConfig {
dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(), dev_halt_at_slot: value_t!(matches, "dev_halt_at_slot", Slot).ok(),
@ -963,6 +992,7 @@ pub fn main() {
voting_disabled: matches.is_present("no_voting"), voting_disabled: matches.is_present("no_voting"),
wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(), wait_for_supermajority: value_t!(matches, "wait_for_supermajority", Slot).ok(),
trusted_validators, trusted_validators,
repair_validators,
frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(), frozen_accounts: values_t!(matches, "frozen_accounts", Pubkey).unwrap_or_default(),
no_rocksdb_compaction, no_rocksdb_compaction,
wal_recovery_mode, wal_recovery_mode,