From b6792a332868622a4e2547606b00153b43b27f65 Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Thu, 17 Jun 2021 13:51:06 -0700
Subject: [PATCH] Add ability to change the validator identity at runtime

---
 core/src/banking_stage.rs                  |  4 +-
 core/src/broadcast_stage.rs                | 36 +++++-----
 core/src/consensus.rs                      |  7 +-
 core/src/replay_stage.rs                   | 34 ++++++++--
 core/src/test_validator.rs                 | 12 +++-
 core/src/validator.rs                      | 12 ++--
 gossip/src/cluster_info.rs                 | 76 ++++++++++++--------
 gossip/src/crds_value.rs                   | 16 +++--
 gossip/src/gossip_service.rs               | 12 ++--
 gossip/src/main.rs                         |  3 +-
 local-cluster/src/local_cluster.rs         |  6 +-
 validator/src/admin_rpc_service.rs         | 23 +++++++
 validator/src/bin/solana-test-validator.rs |  5 +-
 validator/src/main.rs                      | 70 ++++++++++++++----
 14 files changed, 223 insertions(+), 93 deletions(-)

diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index cd7bc1ea0..a497f5b96 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -331,7 +331,6 @@ impl BankingStage {
         // Single thread to generate entries from many banks.
         // This thread talks to poh_service and broadcasts the entries once they have been recorded.
         // Once an entry has been recorded, its blockhash is registered with the bank.
-        let my_pubkey = cluster_info.id();
         let duplicates = Arc::new(Mutex::new((
             LruCache::new(DEFAULT_LRU_SIZE),
             PacketHasher::default(),
@@ -358,7 +357,6 @@ impl BankingStage {
                 .name("solana-banking-stage-tx".to_string())
                 .spawn(move || {
                     Self::process_loop(
-                        my_pubkey,
                         &verified_receiver,
                         &poh_recorder,
                         &cluster_info,
@@ -699,7 +697,6 @@ impl BankingStage {
     #[allow(clippy::too_many_arguments)]
     fn process_loop(
-        my_pubkey: Pubkey,
         verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         cluster_info: &ClusterInfo,
@@ -718,6 +715,7 @@ impl BankingStage {
         let mut buffered_packets = VecDeque::with_capacity(batch_limit);
         let banking_stage_stats = BankingStageStats::new(id);
         loop {
+            let my_pubkey = cluster_info.id();
             while !buffered_packets.is_empty() {
                 let decision = Self::process_buffered_packets(
                     &my_pubkey,
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 7e5f83b82..a6e1792b6 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -173,7 +173,7 @@ pub struct BroadcastStage {
 impl BroadcastStage {
     #[allow(clippy::too_many_arguments)]
     fn run(
-        keypair: &Keypair,
+        cluster_info: Arc<ClusterInfo>,
         blockstore: &Arc<Blockstore>,
         receiver: &Receiver<WorkingBankEntry>,
         socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
@@ -182,7 +182,7 @@ impl BroadcastStage {
     ) -> BroadcastStageReturnType {
         loop {
             let res = broadcast_stage_run.run(
-                keypair,
+                &cluster_info.keypair(),
                 blockstore,
                 receiver,
                 socket_sender,
@@ -248,21 +248,23 @@ impl BroadcastStage {
             let bs_run = broadcast_stage_run.clone();

             let socket_sender_ = socket_sender.clone();
-            let keypair = cluster_info.keypair.clone();
-            let thread_hdl = Builder::new()
-                .name("solana-broadcaster".to_string())
-                .spawn(move || {
-                    let _finalizer = Finalizer::new(exit);
-                    Self::run(
-                        &keypair,
-                        &btree,
-                        &receiver,
-                        &socket_sender_,
-                        &blockstore_sender,
-                        bs_run,
-                    )
-                })
-                .unwrap();
+            let thread_hdl = {
+                let cluster_info = cluster_info.clone();
+                Builder::new()
+                    .name("solana-broadcaster".to_string())
+                    .spawn(move || {
+                        let _finalizer = Finalizer::new(exit);
+                        Self::run(
+                            cluster_info,
+                            &btree,
+                            &receiver,
+                            &socket_sender_,
+                            &blockstore_sender,
+                            bs_run,
+                        )
+                    })
+                    .unwrap()
+            };
             let mut thread_hdls = vec![thread_hdl];
             let socket_receiver = Arc::new(Mutex::new(socket_receiver));
             for sock in socks.into_iter() {
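Both stages above stop capturing the identity when their threads are spawned: the banking loop now re-resolves `cluster_info.id()` at the top of each iteration, and the broadcaster re-reads `cluster_info.keypair()` on every `run` call. A minimal, self-contained sketch of that pattern — a toy `Identity` type stands in for `solana_sdk::signature::Keypair`, everything else is std:

```rust
use std::{
    sync::{Arc, RwLock},
    thread,
    time::Duration,
};

// Toy stand-in for the validator identity keypair; the real code keeps a
// solana_sdk Keypair behind the same RwLock<Arc<...>> shape.
struct Identity {
    name: String,
}

fn main() {
    let identity = Arc::new(RwLock::new(Arc::new(Identity { name: "old".into() })));

    // Worker mirrors the broadcast loop: it re-reads the identity on every
    // iteration instead of capturing it once at spawn time.
    let worker_identity = Arc::clone(&identity);
    let worker = thread::spawn(move || {
        for _ in 0..5 {
            let current = worker_identity.read().unwrap().clone(); // brief read lock
            println!("broadcasting as {}", current.name);
            thread::sleep(Duration::from_millis(50));
        }
    });

    thread::sleep(Duration::from_millis(120));
    // Hot swap: replace the inner Arc. Iterations already holding a clone
    // finish with the old identity; the next iteration picks up the new one.
    *identity.write().unwrap() = Arc::new(Identity { name: "new".into() });

    worker.join().unwrap();
}
```

Swapping the inner `Arc` is cheap and never invalidates in-flight users, which is presumably why the patch reaches for `RwLock<Arc<Keypair>>` rather than a plain `Mutex<Keypair>`.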
diff --git a/core/src/consensus.rs b/core/src/consensus.rs
index dce829e41..727b0d281 100644
--- a/core/src/consensus.rs
+++ b/core/src/consensus.rs
@@ -32,7 +32,6 @@ use std::{
         Deref,
     },
     path::{Path, PathBuf},
-    sync::Arc,
 };
 use thiserror::Error;

@@ -1190,7 +1189,7 @@ impl Tower {
         path.with_extension("bin.new")
     }

-    pub fn save(&self, node_keypair: &Arc<Keypair>) -> Result<()> {
+    pub fn save(&self, node_keypair: &Keypair) -> Result<()> {
         let mut measure = Measure::start("tower_save-ms");
         if self.node_pubkey != node_keypair.pubkey() {
@@ -1293,7 +1292,7 @@ pub struct SavedTower {
 }

 impl SavedTower {
-    pub fn new<T: Signer>(tower: &Tower, keypair: &Arc<T>) -> Result<Self> {
+    pub fn new<T: Signer>(tower: &Tower, keypair: &T) -> Result<Self> {
         let data = bincode::serialize(tower)?;
         let signature = keypair.sign_message(&data);
         Ok(Self { signature, data })
@@ -1391,7 +1390,7 @@ pub mod test {
         collections::HashMap,
         fs::{remove_file, OpenOptions},
         io::{Read, Seek, SeekFrom, Write},
-        sync::RwLock,
+        sync::{Arc, RwLock},
     };
     use tempfile::TempDir;
     use trees::{tr, Tree, TreeWalk};
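Dropping the `&Arc<Keypair>` parameters means the tower code no longer cares how the caller stores the key; anything that can sign suffices. A toy illustration of why the generic bound is the looser contract — local `Signer`/`Keypair` stand-ins, not the actual solana_sdk items:

```rust
use std::sync::Arc;

// Local stand-ins for solana_sdk's Signer and Keypair, only to show why
// `keypair: &T where T: Signer` is looser than the old `&Arc<Keypair>`.
trait Signer {
    fn sign_message(&self, data: &[u8]) -> Vec<u8>;
}

struct Keypair(u8);

impl Signer for Keypair {
    fn sign_message(&self, data: &[u8]) -> Vec<u8> {
        data.iter().map(|b| b ^ self.0).collect() // toy "signature"
    }
}

// Mirrors the new SavedTower::new shape: any Signer works, no Arc required.
fn sign_tower_bytes<T: Signer>(serialized_tower: &[u8], keypair: &T) -> Vec<u8> {
    keypair.sign_message(serialized_tower)
}

fn main() {
    // A plain borrow works ...
    let kp = Keypair(0x5a);
    println!("{:02x?}", sign_tower_bytes(b"tower-bytes", &kp));

    // ... and callers that still hold an Arc<Keypair> just pass &*arc.
    let arc = Arc::new(Keypair(0x21));
    println!("{:02x?}", sign_tower_bytes(b"tower-bytes", &*arc));
}
```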
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index bb3549df2..b9aaefce1 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -339,7 +339,8 @@ impl ReplayStage {
             .spawn(move || {
                 let verify_recyclers = VerifyRecyclers::default();
                 let _exit = Finalizer::new(exit.clone());
-                let my_pubkey = cluster_info.id();
+                let mut identity_keypair = cluster_info.keypair().clone();
+                let mut my_pubkey = identity_keypair.pubkey();
                 let (
                     mut progress,
                     mut heaviest_subtree_fork_choice,
@@ -515,6 +516,7 @@ impl ReplayStage {
                         heaviest_bank_on_same_voted_fork,
                         &poh_recorder,
                         my_latest_landed_vote,
                         &vote_account,
+                        &identity_keypair,
                         &authorized_voter_keypairs.read().unwrap(),
                         &mut voted_signatures,
                         has_new_vote_been_rooted,
                         &mut last_vote_refresh_time,
@@ -582,6 +584,7 @@ impl ReplayStage {
                         &mut tower,
                         &mut progress,
                         &vote_account,
+                        &identity_keypair,
                         &authorized_voter_keypairs.read().unwrap(),
                         &cluster_info,
                         &blockstore,
@@ -627,6 +630,14 @@ impl ReplayStage {
                                 i64
                             ),
                         );
+
+                        if my_pubkey != cluster_info.id() {
+                            identity_keypair = cluster_info.keypair().clone();
+                            let my_old_pubkey = my_pubkey;
+                            my_pubkey = identity_keypair.pubkey();
+                            warn!("Identity changed from {} to {}", my_old_pubkey, my_pubkey);
+                        }
+
                         Self::reset_poh_recorder(
                             &my_pubkey,
                             &blockstore,
@@ -1290,6 +1301,7 @@ impl ReplayStage {
         tower: &mut Tower,
         progress: &mut ProgressMap,
         vote_account_pubkey: &Pubkey,
+        identity_keypair: &Keypair,
         authorized_voter_keypairs: &[Arc<Keypair>],
         cluster_info: &Arc<ClusterInfo>,
         blockstore: &Arc<Blockstore>,
@@ -1314,7 +1326,7 @@ impl ReplayStage {
         trace!("handle votable bank {}", bank.slot());
         let new_root = tower.record_bank_vote(bank, vote_account_pubkey);

-        if let Err(err) = tower.save(&cluster_info.keypair) {
+        if let Err(err) = tower.save(identity_keypair) {
             error!("Unable to save tower: {:?}", err);
             std::process::exit(1);
         }
@@ -1386,6 +1398,7 @@ impl ReplayStage {
             bank,
             poh_recorder,
             vote_account_pubkey,
+            identity_keypair,
             authorized_voter_keypairs,
             tower,
             switch_fork_decision,
@@ -1396,7 +1409,7 @@ impl ReplayStage {
     }

     fn generate_vote_tx(
-        node_keypair: &Arc<Keypair>,
+        node_keypair: &Keypair,
         bank: &Bank,
         vote_account_pubkey: &Pubkey,
         authorized_voter_keypairs: &[Arc<Keypair>],
@@ -1465,7 +1478,7 @@ impl ReplayStage {
         let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey()));

         let blockhash = bank.last_blockhash();
-        vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
+        vote_tx.partial_sign(&[node_keypair], blockhash);
         vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash);

         if !has_new_vote_been_rooted {
@@ -1488,6 +1501,7 @@ impl ReplayStage {
         poh_recorder: &Mutex<PohRecorder>,
         my_latest_landed_vote: Slot,
         vote_account_pubkey: &Pubkey,
+        identity_keypair: &Keypair,
         authorized_voter_keypairs: &[Arc<Keypair>],
         vote_signatures: &mut Vec<Signature>,
         has_new_vote_been_rooted: bool,
@@ -1526,7 +1540,7 @@ impl ReplayStage {
         // TODO: check the timestamp in this vote is correct, i.e. it shouldn't
         // have changed from the original timestamp of the vote.
         let vote_tx = Self::generate_vote_tx(
-            &cluster_info.keypair,
+            identity_keypair,
             heaviest_bank_on_same_fork,
             vote_account_pubkey,
             authorized_voter_keypairs,
@@ -1563,6 +1577,7 @@ impl ReplayStage {
         bank: &Bank,
         poh_recorder: &Mutex<PohRecorder>,
         vote_account_pubkey: &Pubkey,
+        identity_keypair: &Keypair,
         authorized_voter_keypairs: &[Arc<Keypair>],
         tower: &mut Tower,
         switch_fork_decision: &SwitchForkDecision,
@@ -1572,7 +1587,7 @@ impl ReplayStage {
     ) {
         let mut generate_time = Measure::start("generate_vote");
         let vote_tx = Self::generate_vote_tx(
-            &cluster_info.keypair,
+            identity_keypair,
             bank,
             vote_account_pubkey,
             authorized_voter_keypairs,
@@ -4668,6 +4683,7 @@ mod tests {
         let has_new_vote_been_rooted = false;
         let mut voted_signatures = vec![];

+        let identity_keypair = cluster_info.keypair().clone();
         let my_vote_keypair = vec![Arc::new(
             validator_keypairs.remove(&my_pubkey).unwrap().vote_keypair,
         )];
@@ -4693,6 +4709,7 @@ mod tests {
             &bank0,
             &poh_recorder,
             &my_vote_pubkey,
+            &identity_keypair,
             &my_vote_keypair,
             &mut tower,
             &SwitchForkDecision::SameFork,
@@ -4723,6 +4740,7 @@ mod tests {
             &poh_recorder,
             Tower::last_voted_slot_in_bank(refresh_bank, &my_vote_pubkey).unwrap(),
             &my_vote_pubkey,
+            &identity_keypair,
             &my_vote_keypair,
             &mut voted_signatures,
             has_new_vote_been_rooted,
@@ -4745,6 +4763,7 @@ mod tests {
             &bank1,
             &poh_recorder,
             &my_vote_pubkey,
+            &identity_keypair,
             &my_vote_keypair,
             &mut tower,
             &SwitchForkDecision::SameFork,
@@ -4768,6 +4787,7 @@ mod tests {
             &poh_recorder,
             Tower::last_voted_slot_in_bank(&bank2, &my_vote_pubkey).unwrap(),
             &my_vote_pubkey,
+            &identity_keypair,
             &my_vote_keypair,
             &mut voted_signatures,
             has_new_vote_been_rooted,
@@ -4804,6 +4824,7 @@ mod tests {
             &poh_recorder,
             Tower::last_voted_slot_in_bank(&expired_bank, &my_vote_pubkey).unwrap(),
             &my_vote_pubkey,
+            &identity_keypair,
             &my_vote_keypair,
             &mut voted_signatures,
             has_new_vote_been_rooted,
@@ -4860,6 +4881,7 @@ mod tests {
             &poh_recorder,
             Tower::last_voted_slot_in_bank(&expired_bank_sibling, &my_vote_pubkey).unwrap(),
             &my_vote_pubkey,
+            &identity_keypair,
             &my_vote_keypair,
             &mut voted_signatures,
             has_new_vote_been_rooted,
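The identity swap itself happens on the admin RPC thread, so replay only observes it: once per loop iteration it compares its cached pubkey against `cluster_info.id()` and refreshes its cached keypair on a mismatch, just before resetting the PoH recorder. A condensed sketch of that polling handoff (simplified types; the real check is the `my_pubkey != cluster_info.id()` block added above):

```rust
use std::sync::{Arc, RwLock};

// Simplified stand-ins for Pubkey / Keypair / ClusterInfo.
#[derive(Clone, Copy, PartialEq, Debug)]
struct Pubkey(u64);

struct Keypair {
    pubkey: Pubkey,
}

struct ClusterInfo {
    keypair: RwLock<Arc<Keypair>>,
}

impl ClusterInfo {
    fn id(&self) -> Pubkey {
        self.keypair.read().unwrap().pubkey
    }
    fn keypair(&self) -> Arc<Keypair> {
        self.keypair.read().unwrap().clone()
    }
    fn set_keypair(&self, new_keypair: Arc<Keypair>) {
        *self.keypair.write().unwrap() = new_keypair;
    }
}

fn main() {
    let cluster_info = ClusterInfo {
        keypair: RwLock::new(Arc::new(Keypair { pubkey: Pubkey(1) })),
    };

    // Replay-loop state: a cached keypair plus the pubkey it was cached under.
    let mut identity_keypair = cluster_info.keypair();
    let mut my_pubkey = identity_keypair.pubkey;

    for tick in 0..3 {
        if tick == 1 {
            // Elsewhere (the admin RPC thread) the identity gets swapped.
            cluster_info.set_keypair(Arc::new(Keypair { pubkey: Pubkey(2) }));
        }
        // Once per loop iteration, the same check the patch adds to ReplayStage:
        if my_pubkey != cluster_info.id() {
            identity_keypair = cluster_info.keypair();
            let my_old_pubkey = my_pubkey;
            my_pubkey = identity_keypair.pubkey;
            println!("Identity changed from {:?} to {:?}", my_old_pubkey, my_pubkey);
        }
    }
}
```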
diff --git a/core/src/test_validator.rs b/core/src/test_validator.rs
index e84ac0f23..14a97cc0d 100644
--- a/core/src/test_validator.rs
+++ b/core/src/test_validator.rs
@@ -1,7 +1,11 @@
 use {
     crate::validator::{Validator, ValidatorConfig, ValidatorStartProgress},
     solana_client::rpc_client::RpcClient,
-    solana_gossip::{cluster_info::Node, gossip_service::discover_cluster, socketaddr},
+    solana_gossip::{
+        cluster_info::{ClusterInfo, Node},
+        gossip_service::discover_cluster,
+        socketaddr,
+    },
     solana_ledger::{blockstore::create_new_ledger, create_new_tmp_ledger},
     solana_net_utils::PortRange,
     solana_rpc::rpc::JsonRpcConfig,
@@ -502,7 +506,7 @@ impl TestValidator {

         let validator = Some(Validator::new(
             node,
-            &Arc::new(validator_identity),
+            Arc::new(validator_identity),
             &ledger_path,
             &vote_account_address,
             config.authorized_voter_keypairs.clone(),
@@ -599,6 +603,10 @@ impl TestValidator {
             validator.join();
         }
     }
+
+    pub fn cluster_info(&self) -> Arc<ClusterInfo> {
+        self.validator.as_ref().unwrap().cluster_info.clone()
+    }
 }

 impl Drop for TestValidator {
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 62eaf4661..12b055aff 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -260,6 +260,7 @@ pub struct Validator {
     tpu: Tpu,
     tvu: Tvu,
     ip_echo_server: Option<solana_net_utils::IpEchoServer>,
+    pub cluster_info: Arc<ClusterInfo>,
 }

// in the distant future, get rid of ::new()/exit() and use Result properly...
@@ -279,7 +280,7 @@ pub(crate) fn abort() -> ! {
 impl Validator {
     pub fn new(
         mut node: Node,
-        identity_keypair: &Arc<Keypair>,
+        identity_keypair: Arc<Keypair>,
         ledger_path: &Path,
         vote_account: &Pubkey,
         authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
@@ -437,7 +438,7 @@ impl Validator {
             }
         }

-        let mut cluster_info = ClusterInfo::new(node.info.clone(), identity_keypair.clone());
+        let mut cluster_info = ClusterInfo::new(node.info.clone(), identity_keypair);
         cluster_info.set_contact_debug_interval(config.contact_debug_interval);
         cluster_info.set_entrypoints(cluster_entrypoints);
         cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
@@ -790,6 +791,7 @@ impl Validator {
             poh_recorder,
             ip_echo_server,
             validator_exit: config.validator_exit.clone(),
+            cluster_info,
         }
     }
@@ -829,6 +831,8 @@ impl Validator {
     }

     pub fn join(self) {
+        drop(self.cluster_info);
+
         self.poh_service.join().expect("poh_service");
         drop(self.poh_recorder);
@@ -1628,7 +1632,7 @@ mod tests {
         let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
         let validator = Validator::new(
             validator_node,
-            &Arc::new(validator_keypair),
+            Arc::new(validator_keypair),
             &validator_ledger_path,
             &voting_keypair.pubkey(),
             Arc::new(RwLock::new(vec![voting_keypair.clone()])),
@@ -1706,7 +1710,7 @@ mod tests {
         };
         Validator::new(
             validator_node,
-            &Arc::new(validator_keypair),
+            Arc::new(validator_keypair),
             &validator_ledger_path,
             &vote_account_keypair.pubkey(),
             Arc::new(RwLock::new(vec![Arc::new(vote_account_keypair)])),
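`Validator` now exposes `cluster_info` as a public field (and `TestValidator::cluster_info()` forwards it) so the admin service, which starts before the validator exists, can be handed the live gossip handle after construction. A sketch of that late-bound slot, mirroring the `Arc<RwLock<Option<Arc<ClusterInfo>>>>` wiring that appears in `validator/src/main.rs` further down (stand-in `ClusterInfo` type):

```rust
use std::sync::{Arc, RwLock};

struct ClusterInfo; // stand-in for solana_gossip's ClusterInfo

// The admin RPC service starts before the Validator exists, so it receives
// an initially-empty, shared slot that main() fills in after construction.
type ClusterInfoSlot = Arc<RwLock<Option<Arc<ClusterInfo>>>>;

fn handle_set_identity(slot: &ClusterInfoSlot) -> Result<(), String> {
    match slot.read().unwrap().as_ref() {
        Some(_cluster_info) => Ok(()), // would call cluster_info.set_keypair(...)
        None => Err("Retry once validator start up is complete".to_string()),
    }
}

fn main() {
    let slot: ClusterInfoSlot = Arc::new(RwLock::new(None));

    // Early request: the validator is not constructed yet.
    assert!(handle_set_identity(&slot).is_err());

    // ... Validator::new(...) runs, exposing its cluster_info handle ...
    *slot.write().unwrap() = Some(Arc::new(ClusterInfo));

    // Later requests can act on the live ClusterInfo.
    assert!(handle_set_identity(&slot).is_ok());
}
```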
diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs
index 4875cb700..46f3fb3f2 100644
--- a/gossip/src/cluster_info.rs
+++ b/gossip/src/cluster_info.rs
@@ -213,7 +213,7 @@ pub struct ClusterInfo {
     /// The network
     pub gossip: RwLock<CrdsGossip>,
     /// set the keypair that will be used to sign crds values generated. It is unset only in tests.
-    pub keypair: Arc<Keypair>,
+    keypair: RwLock<Arc<Keypair>>,
     /// Network entrypoints
     entrypoints: RwLock<Vec<ContactInfo>>,
     outbound_budget: DataBudget,
@@ -224,7 +224,7 @@ pub struct ClusterInfo {
     local_message_pending_push_queue: Mutex<Vec<CrdsValue>>,
     contact_debug_interval: u64, // milliseconds, 0 = disabled
     contact_save_interval: u64,  // milliseconds, 0 = disabled
-    instance: NodeInstance,
+    instance: RwLock<NodeInstance>,
     contact_info_path: PathBuf,
 }
@@ -469,7 +469,7 @@ impl ClusterInfo {
         let id = contact_info.id;
         let me = Self {
             gossip: RwLock::new(CrdsGossip::default()),
-            keypair,
+            keypair: RwLock::new(keypair),
             entrypoints: RwLock::new(vec![]),
             outbound_budget: DataBudget::default(),
             my_contact_info: RwLock::new(contact_info),
@@ -481,7 +481,7 @@ impl ClusterInfo {
             socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
             local_message_pending_push_queue: Mutex::default(),
             contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
-            instance: NodeInstance::new(&mut thread_rng(), id, timestamp()),
+            instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())),
             contact_info_path: PathBuf::default(),
             contact_save_interval: 0, // disabled
         };
@@ -503,7 +503,7 @@ impl ClusterInfo {
         my_contact_info.id = *new_id;
         ClusterInfo {
             gossip: RwLock::new(gossip),
-            keypair: self.keypair.clone(),
+            keypair: RwLock::new(self.keypair.read().unwrap().clone()),
             entrypoints: RwLock::new(self.entrypoints.read().unwrap().clone()),
             outbound_budget: self.outbound_budget.clone_non_atomic(),
             my_contact_info: RwLock::new(my_contact_info),
@@ -517,7 +517,7 @@ impl ClusterInfo {
                     .clone(),
             ),
             contact_debug_interval: self.contact_debug_interval,
-            instance: NodeInstance::new(&mut thread_rng(), *new_id, timestamp()),
+            instance: RwLock::new(NodeInstance::new(&mut thread_rng(), *new_id, timestamp())),
             contact_info_path: PathBuf::default(),
             contact_save_interval: 0, // disabled
         }
@@ -536,10 +536,10 @@ impl ClusterInfo {
         self.my_contact_info.write().unwrap().wallclock = now;
         let entries: Vec<_> = vec![
             CrdsData::ContactInfo(self.my_contact_info()),
-            CrdsData::NodeInstance(self.instance.with_wallclock(now)),
+            CrdsData::NodeInstance(self.instance.read().unwrap().with_wallclock(now)),
         ]
         .into_iter()
-        .map(|v| CrdsValue::new_signed(v, &self.keypair))
+        .map(|v| CrdsValue::new_signed(v, &self.keypair()))
         .collect();
         self.local_message_pending_push_queue
             .lock()
@@ -553,7 +553,7 @@ impl ClusterInfo {

     // TODO kill insert_info, only used by tests
     pub fn insert_info(&self, contact_info: ContactInfo) {
-        let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair);
+        let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair());
         let _ = self.gossip.write().unwrap().crds.insert(value, timestamp());
     }
@@ -680,6 +680,25 @@ impl ClusterInfo {
         self.my_contact_info.read().unwrap().id
     }

+    pub fn keypair(&self) -> RwLockReadGuard<Arc<Keypair>> {
+        self.keypair.read().unwrap()
+    }
+
+    pub fn set_keypair(&self, new_keypair: Arc<Keypair>) {
+        let id = new_keypair.pubkey();
+
+        self.gossip.write().unwrap().set_self(&id);
+        {
+            let mut instance = self.instance.write().unwrap();
+            *instance = instance.with_id(id);
+        }
+        *self.keypair.write().unwrap() = new_keypair;
+        self.my_contact_info.write().unwrap().id = id;
+
+        self.insert_self();
+        self.push_self(&HashMap::new(), None);
+    }
+
     pub fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
     where
         F: FnOnce(&ContactInfo) -> Y,
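This is the heart of the change: the keypair moves behind a `RwLock`, readers get a short-lived guard from `keypair()`, and `set_keypair()` swaps the `Arc` along with everything derived from the pubkey (gossip self-id, `NodeInstance`, contact info) before re-inserting and re-pushing self. A reduced sketch of the accessor pair and its locking discipline, with stand-in types:

```rust
use std::sync::{Arc, RwLock, RwLockReadGuard};

struct Keypair {
    id: u32, // stand-in for the real identity pubkey
}

struct Node {
    keypair: RwLock<Arc<Keypair>>,
}

impl Node {
    // Like the new ClusterInfo::keypair(): hand out a read guard so callers
    // sign with a consistent identity without cloning the Arc every time.
    fn keypair(&self) -> RwLockReadGuard<Arc<Keypair>> {
        self.keypair.read().unwrap()
    }

    // Like ClusterInfo::set_keypair(): the write lock is held only for the
    // swap itself; clones already handed out keep the old keypair alive.
    fn set_keypair(&self, new_keypair: Arc<Keypair>) {
        *self.keypair.write().unwrap() = new_keypair;
    }
}

fn main() {
    let node = Node {
        keypair: RwLock::new(Arc::new(Keypair { id: 1 })),
    };
    {
        let kp = node.keypair(); // read guard scoped to this block
        println!("signing as {}", kp.id);
    } // guard dropped here; holding it across set_keypair() would deadlock
    node.set_keypair(Arc::new(Keypair { id: 2 }));
    println!("now signing as {}", node.keypair().id);
}
```

The one rule callers must respect is visible in the sketch: drop the read guard before anything that might take the write lock, or the swap deadlocks.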
@@ -887,7 +906,7 @@ impl ClusterInfo {
         if min > last {
             let entry = CrdsValue::new_signed(
                 CrdsData::LowestSlot(0, LowestSlot::new(id, min, now)),
-                &self.keypair,
+                &self.keypair(),
             );
             self.local_message_pending_push_queue
                 .lock()
@@ -948,7 +967,7 @@ impl ClusterInfo {
             update = &update[n..];
             if n > 0 {
                 let epoch_slots = CrdsData::EpochSlots(ix, slots);
-                let entry = CrdsValue::new_signed(epoch_slots, &self.keypair);
+                let entry = CrdsValue::new_signed(epoch_slots, &self.keypair());
                 entries.push(entry);
             }
             epoch_slot_index += 1;
@@ -991,7 +1010,7 @@ impl ClusterInfo {
         }
         let message = CrdsData::AccountsHashes(SnapshotHash::new(self.id(), accounts_hashes));
-        self.push_message(CrdsValue::new_signed(message, &self.keypair));
+        self.push_message(CrdsValue::new_signed(message, &self.keypair()));
     }

     pub fn push_snapshot_hashes(&self, snapshot_hashes: Vec<(Slot, Hash)>) {
@@ -1004,7 +1023,7 @@ impl ClusterInfo {
         }
         let message = CrdsData::SnapshotHashes(SnapshotHash::new(self.id(), snapshot_hashes));
-        self.push_message(CrdsValue::new_signed(message, &self.keypair));
+        self.push_message(CrdsValue::new_signed(message, &self.keypair()));
     }

     fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) {
@@ -1013,7 +1032,7 @@ impl ClusterInfo {
         let now = timestamp();
         let vote = Vote::new(self_pubkey, vote, now);
         let vote = CrdsData::Vote(vote_index, vote);
-        let vote = CrdsValue::new_signed(vote, &self.keypair);
+        let vote = CrdsValue::new_signed(vote, &self.keypair());
         self.gossip
             .write()
             .unwrap()
@@ -1136,7 +1155,7 @@ impl ClusterInfo {
         other_payload: &[u8],
     ) -> Result<(), GossipError> {
         self.gossip.write().unwrap().push_duplicate_shred(
-            &self.keypair,
+            &self.keypair(),
             shred,
             other_payload,
             None::<fn(Slot) -> Option<Pubkey>>, // Leader schedule
@@ -1431,8 +1450,10 @@ impl ClusterInfo {
     }

     fn insert_self(&self) {
-        let value =
-            CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair);
+        let value = CrdsValue::new_signed(
+            CrdsData::ContactInfo(self.my_contact_info()),
+            &self.keypair(),
+        );
         let _ = self.gossip.write().unwrap().crds.insert(value, timestamp());
     }
@@ -1545,7 +1566,7 @@ impl ClusterInfo {
         let gossip = self.time_gossip_read_lock("new_pull_reqs", &self.stats.new_pull_requests);
         match gossip.new_pull_request(
             thread_pool,
-            self.keypair.deref(),
+            self.keypair().deref(),
             now,
             gossip_validators,
             stakes,
@@ -1568,7 +1589,7 @@ impl ClusterInfo {
             }
         }
         let self_info = CrdsData::ContactInfo(self.my_contact_info());
-        let self_info = CrdsValue::new_signed(self_info, &self.keypair);
+        let self_info = CrdsValue::new_signed(self_info, &self.keypair());
         let pulls = pulls
             .into_iter()
             .flat_map(|(peer, filters)| repeat(peer.gossip).zip(filters))
@@ -1804,10 +1825,12 @@ impl ClusterInfo {
         let recycler = PacketsRecycler::default();
         let crds_data = vec![
             CrdsData::Version(Version::new(self.id())),
-            CrdsData::NodeInstance(self.instance.with_wallclock(timestamp())),
+            CrdsData::NodeInstance(
+                self.instance.read().unwrap().with_wallclock(timestamp()),
+            ),
         ];
         for value in crds_data {
-            let value = CrdsValue::new_signed(value, &self.keypair);
+            let value = CrdsValue::new_signed(value, &self.keypair());
             self.push_message(value);
         }
         let mut generate_pull_requests = true;
@@ -1997,7 +2020,7 @@ impl ClusterInfo {
         R: Rng + CryptoRng,
     {
         let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new();
-        let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair).ok();
+        let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair()).ok();
         let mut ping_cache = self.ping_cache.lock().unwrap();
         let mut hard_check = move |node| {
             let (check, ping) = ping_cache.check(now, node, &mut pingf);
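All of the mechanical `&self.keypair` → `&self.keypair()` call-site edits above lean on deref coercion: the `RwLockReadGuard<Arc<Keypair>>` dereferences through the `Arc` down to `&Keypair`, and the read lock is released when the statement's temporaries are dropped. A compact demonstration with toy types:

```rust
use std::ops::Deref;
use std::sync::{Arc, RwLock, RwLockReadGuard};

struct Keypair(u8);

struct Node {
    keypair: RwLock<Arc<Keypair>>,
}

impl Node {
    fn keypair(&self) -> RwLockReadGuard<Arc<Keypair>> {
        self.keypair.read().unwrap()
    }
}

// Call sites like CrdsValue::new_signed(...) only need a &Keypair.
fn sign(keypair: &Keypair, data: u8) -> u8 {
    data ^ keypair.0 // toy signature
}

fn main() {
    let node = Node {
        keypair: RwLock::new(Arc::new(Keypair(7))),
    };
    // `&self.keypair()` / `self.keypair().deref()` in the patch rely on deref
    // coercion: guard -> Arc<Keypair> -> Keypair. The read lock is held only
    // until the end of the enclosing statement.
    let sig = sign(node.keypair().deref(), 0b1010);
    println!("{:#010b}", sig);
}
```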
@@ -2266,7 +2289,7 @@ impl ClusterInfo {
         let packets: Vec<_> = pings
             .into_iter()
             .filter_map(|(addr, ping)| {
-                let pong = Pong::new(&ping, &self.keypair).ok()?;
+                let pong = Pong::new(&ping, &self.keypair()).ok()?;
                 let pong = Protocol::PongMessage(pong);
                 match Packet::from_data(Some(&addr), pong) {
                     Ok(packet) => Some(packet),
@@ -2368,7 +2391,7 @@ impl ClusterInfo {
                     destination: from,
                     wallclock,
                 };
-                prune_data.sign(&self.keypair);
+                prune_data.sign(&self.keypair());
                 let prune_message = Protocol::PruneMessage(self_pubkey, prune_data);
                 Some((peer.gossip, prune_message))
             })
@@ -2469,8 +2492,9 @@ impl ClusterInfo {
         // this node with more recent timestamp.
         let check_duplicate_instance = |values: &[CrdsValue]| {
             if should_check_duplicate_instance {
+                let instance = self.instance.read().unwrap();
                 for value in values {
-                    if self.instance.check_duplicate(value) {
+                    if instance.check_duplicate(value) {
                         return Err(GossipError::DuplicateNodeInstance);
                     }
                 }
@@ -3567,7 +3591,7 @@ mod tests {
             .unwrap()
             .new_pull_request(
                 &thread_pool,
-                cluster_info.keypair.deref(),
+                cluster_info.keypair().deref(),
                 timestamp(),
                 None,
                 &HashMap::new(),
diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs
index b725579e9..7670c74ba 100644
--- a/gossip/src/crds_value.rs
+++ b/gossip/src/crds_value.rs
@@ -395,24 +395,26 @@ pub struct NodeInstance {
 }

 impl NodeInstance {
-    pub fn new<R>(rng: &mut R, pubkey: Pubkey, now: u64) -> Self
+    pub fn new<R>(rng: &mut R, from: Pubkey, now: u64) -> Self
     where
         R: Rng + CryptoRng,
     {
         Self {
-            from: pubkey,
+            from,
             wallclock: now,
             timestamp: now,
             token: rng.gen(),
         }
     }

+    // Clones the value with an updated id.
+    pub(crate) fn with_id(&self, from: Pubkey) -> Self {
+        Self { from, ..*self }
+    }
+
     // Clones the value with an updated wallclock.
-    pub(crate) fn with_wallclock(&self, now: u64) -> Self {
-        Self {
-            wallclock: now,
-            ..*self
-        }
+    pub(crate) fn with_wallclock(&self, wallclock: u64) -> Self {
+        Self { wallclock, ..*self }
     }

     // Returns true if the crds-value is a duplicate instance
diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs
index 73ea8b607..f50f78cf9 100644
--- a/gossip/src/gossip_service.rs
+++ b/gossip/src/gossip_service.rs
@@ -121,7 +121,7 @@ pub fn discover_cluster(
 }

 pub fn discover(
-    keypair: Option<Arc<Keypair>>,
+    keypair: Option<Keypair>,
     entrypoint: Option<&SocketAddr>,
     num_nodes: Option<usize>, // num_nodes only counts validators, excludes spy nodes
     timeout: Duration,
@@ -133,8 +133,10 @@ pub fn discover(
     Vec<ContactInfo>, // all gossip peers
     Vec<ContactInfo>, // tvu peers (validators)
 )> {
-    let keypair = keypair.unwrap_or_else(|| Arc::new(Keypair::new()));
-
+    let keypair = {
+        #[allow(clippy::redundant_closure)]
+        keypair.unwrap_or_else(|| Keypair::new())
+    };
     let exit = Arc::new(AtomicBool::new(false));
     let (gossip_service, ip_echo, spy_ref) = make_gossip_node(
         keypair,
@@ -295,7 +297,7 @@ fn spy(
 /// Makes a spy or gossip node based on whether or not a gossip_addr was passed in
 /// Pass in a gossip addr to fully participate in gossip instead of relying on just pulls
 fn make_gossip_node(
-    keypair: Arc<Keypair>,
+    keypair: Keypair,
     entrypoint: Option<&SocketAddr>,
     exit: &Arc<AtomicBool>,
     gossip_addr: Option<&SocketAddr>,
@@ -307,7 +309,7 @@ fn make_gossip_node(
     } else {
         ClusterInfo::spy_node(keypair.pubkey(), shred_version)
     };
-    let cluster_info = ClusterInfo::new(node, keypair);
+    let cluster_info = ClusterInfo::new(node, Arc::new(keypair));
     if let Some(entrypoint) = entrypoint {
         cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint));
     }
diff --git a/gossip/src/main.rs b/gossip/src/main.rs
index 6eeeef1a1..ff1d06ea1 100644
--- a/gossip/src/main.rs
+++ b/gossip/src/main.rs
@@ -15,7 +15,6 @@ use {
         error,
         net::{IpAddr, Ipv4Addr, SocketAddr},
         process::exit,
-        sync::Arc,
         time::Duration,
     },
 };
@@ -225,7 +224,7 @@ fn process_spy(matches: &ArgMatches) -> std::io::Result<()> {
         .value_of("node_pubkey")
         .map(|pubkey_str| pubkey_str.parse::<Pubkey>().unwrap());
     let shred_version = value_t_or_exit!(matches, "shred_version", u16);
-    let identity_keypair = keypair_of(matches, "identity").map(Arc::new);
+    let identity_keypair = keypair_of(matches, "identity");

     let entrypoint_addr = parse_entrypoint(matches);
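Back in `crds_value.rs`, the new `NodeInstance::with_id()` clones the record with only `from` replaced; notably the random `token` survives the identity change. A simplified mirror of the struct-update pattern, with `u64` in place of the real `Pubkey` (the `Copy` fields are what make `..*self` through `&self` legal):

```rust
// Simplified mirror of gossip's NodeInstance to show the `..*self` clone
// pattern used by with_id()/with_wallclock().
#[derive(Clone, Copy, Debug)]
struct NodeInstance {
    from: u64,      // node pubkey (simplified to u64 here)
    wallclock: u64, // last gossip update time
    timestamp: u64, // when this instance was created
    token: u64,     // random token distinguishing concurrent instances
}

impl NodeInstance {
    // Clones the value with an updated id; the random token is deliberately
    // kept, so only the identity changes, not the instance itself.
    fn with_id(&self, from: u64) -> Self {
        Self { from, ..*self }
    }

    // Clones the value with an updated wallclock.
    fn with_wallclock(&self, wallclock: u64) -> Self {
        Self { wallclock, ..*self }
    }
}

fn main() {
    let a = NodeInstance { from: 1, wallclock: 10, timestamp: 10, token: 42 };
    let b = a.with_id(2);
    assert_eq!(b.token, a.token); // same instance token
    assert_eq!(b.from, 2);        // new identity
    println!("{:?}", b.with_wallclock(11));
}
```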
diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs
index db496e3e7..6f7beaf3e 100644
--- a/local-cluster/src/local_cluster.rs
+++ b/local-cluster/src/local_cluster.rs
@@ -211,7 +211,7 @@ impl LocalCluster {

         let leader_server = Validator::new(
             leader_node,
-            &leader_keypair,
+            leader_keypair.clone(),
             &leader_ledger_path,
             &leader_vote_keypair.pubkey(),
             Arc::new(RwLock::new(vec![leader_vote_keypair.clone()])),
@@ -353,7 +353,7 @@ impl LocalCluster {
         let voting_keypair = voting_keypair.unwrap();
         let validator_server = Validator::new(
             validator_node,
-            &validator_keypair,
+            validator_keypair.clone(),
             &ledger_path,
             &voting_keypair.pubkey(),
             Arc::new(RwLock::new(vec![voting_keypair.clone()])),
@@ -667,7 +667,7 @@ impl Cluster for LocalCluster {
             vec![validator_info.ledger_path.join("accounts")];
         let restarted_node = Validator::new(
             node,
-            &validator_info.keypair,
+            validator_info.keypair.clone(),
             &validator_info.ledger_path,
             &validator_info.voting_keypair.pubkey(),
             Arc::new(RwLock::new(vec![validator_info.voting_keypair.clone()])),
diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs
index e173ead79..b8f7c15f3 100644
--- a/validator/src/admin_rpc_service.rs
+++ b/validator/src/admin_rpc_service.rs
@@ -6,6 +6,7 @@ use {
     jsonrpc_server_utils::tokio,
     log::*,
     solana_core::validator::ValidatorStartProgress,
+    solana_gossip::cluster_info::ClusterInfo,
     solana_sdk::{
         exit::Exit,
         signature::{read_keypair_file, Keypair, Signer},
@@ -26,6 +27,7 @@ pub struct AdminRpcRequestMetadata {
     pub start_progress: Arc<RwLock<ValidatorStartProgress>>,
     pub validator_exit: Arc<RwLock<Exit>>,
     pub authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
+    pub cluster_info: Arc<RwLock<Option<Arc<ClusterInfo>>>>,
 }

 impl Metadata for AdminRpcRequestMetadata {}
@@ -53,6 +55,9 @@ pub trait AdminRpc {

     #[rpc(meta, name = "removeAllAuthorizedVoters")]
     fn remove_all_authorized_voters(&self, meta: Self::Metadata) -> Result<()>;
+
+    #[rpc(meta, name = "setIdentity")]
+    fn set_identity(&self, meta: Self::Metadata, keypair_file: String) -> Result<()>;
 }

 pub struct AdminRpcImpl;
@@ -128,6 +133,24 @@ impl AdminRpc for AdminRpcImpl {
         meta.authorized_voter_keypairs.write().unwrap().clear();
         Ok(())
     }
+
+    fn set_identity(&self, meta: Self::Metadata, keypair_file: String) -> Result<()> {
+        debug!("set_identity request received");
+
+        let identity_keypair = read_keypair_file(keypair_file)
+            .map_err(|err| jsonrpc_core::error::Error::invalid_params(format!("{}", err)))?;
+
+        if let Some(cluster_info) = meta.cluster_info.read().unwrap().as_ref() {
+            solana_metrics::set_host_id(identity_keypair.pubkey().to_string());
+            cluster_info.set_keypair(Arc::new(identity_keypair));
+            warn!("Identity set to {}", cluster_info.id());
+            Ok(())
+        } else {
+            Err(jsonrpc_core::error::Error::invalid_params(
+                "Retry once validator start up is complete",
+            ))
+        }
+    }
 }

 // Start the Admin RPC interface
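Because `set_identity` is declared through the `#[rpc(meta, name = "setIdentity")]` attribute, it rides the existing admin JSON-RPC transport; a raw request would look roughly like `{"jsonrpc": "2.0", "id": 1, "method": "setIdentity", "params": ["/path/to/new-identity.json"]}` (illustrative path — the file is read server-side via `read_keypair_file`, so it must exist on the validator host). Note the deliberate failure mode: until the `cluster_info` slot is populated, the call returns an invalid-params error asking the operator to retry once startup completes.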
diff --git a/validator/src/bin/solana-test-validator.rs b/validator/src/bin/solana-test-validator.rs
index 43cb4245f..efe5c6073 100644
--- a/validator/src/bin/solana-test-validator.rs
+++ b/validator/src/bin/solana-test-validator.rs
@@ -32,7 +32,7 @@ use {
         net::{IpAddr, Ipv4Addr, SocketAddr},
         path::{Path, PathBuf},
         process::exit,
-        sync::mpsc::channel,
+        sync::{mpsc::channel, Arc, RwLock},
         time::{Duration, SystemTime, UNIX_EPOCH},
     },
 };
@@ -501,6 +501,7 @@ fn main() {
     let mut genesis = TestValidatorGenesis::default();
     genesis.max_ledger_shreds = value_of(&matches, "limit_ledger_size");

+    let admin_service_cluster_info = Arc::new(RwLock::new(None));
     admin_rpc_service::run(
         &ledger_path,
         admin_rpc_service::AdminRpcRequestMetadata {
@@ -512,6 +513,7 @@ fn main() {
             start_time: std::time::SystemTime::now(),
             validator_exit: genesis.validator_exit.clone(),
             authorized_voter_keypairs: genesis.authorized_voter_keypairs.clone(),
+            cluster_info: admin_service_cluster_info.clone(),
         },
     );
     let dashboard = if output == Output::Dashboard {
@@ -584,6 +586,7 @@ fn main() {

     match genesis.start_with_mint_address(mint_address) {
         Ok(test_validator) => {
+            *admin_service_cluster_info.write().unwrap() = Some(test_validator.cluster_info());
             if let Some(dashboard) = dashboard {
                 dashboard.run(Duration::from_millis(250));
             }
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 54ebc61a2..05d64802c 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -349,7 +349,7 @@ fn get_trusted_snapshot_hashes(
 }

 fn start_gossip_node(
-    identity_keypair: &Arc<Keypair>,
+    identity_keypair: Arc<Keypair>,
     cluster_entrypoints: &[ContactInfo],
     ledger_path: &Path,
     gossip_addr: &SocketAddr,
@@ -358,14 +358,12 @@ fn start_gossip_node(
     gossip_validators: Option<HashSet<Pubkey>>,
     should_check_duplicate_instance: bool,
 ) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
-    let mut cluster_info = ClusterInfo::new(
-        ClusterInfo::gossip_contact_info(
-            identity_keypair.pubkey(),
-            *gossip_addr,
-            expected_shred_version.unwrap_or(0),
-        ),
-        identity_keypair.clone(),
+    let contact_info = ClusterInfo::gossip_contact_info(
+        identity_keypair.pubkey(),
+        *gossip_addr,
+        expected_shred_version.unwrap_or(0),
     );
+    let mut cluster_info = ClusterInfo::new(contact_info, identity_keypair);
     cluster_info.set_entrypoints(cluster_entrypoints.to_vec());
     cluster_info.restore_contact_info(ledger_path, 0);
     let cluster_info = Arc::new(cluster_info);
@@ -776,7 +774,7 @@ fn rpc_bootstrap(
             *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService;

             gossip = Some(start_gossip_node(
-                identity_keypair,
+                identity_keypair.clone(),
                 cluster_entrypoints,
                 ledger_path,
                 &node.info.gossip,
@@ -1914,6 +1912,20 @@ pub fn main() {
             SubCommand::with_name("run")
                 .about("Run the validator")
         )
+        .subcommand(
+            SubCommand::with_name("set-identity")
+                .about("Set the validator identity")
+                .arg(
+                    Arg::with_name("identity")
+                        .index(1)
+                        .value_name("KEYPAIR")
+                        .takes_value(true)
+                        .validator(is_keypair)
+                        .help("Validator identity keypair")
+                )
+                .after_help("Note: the new identity only applies to the \
+                             currently running validator instance")
+        )
         .subcommand(
             SubCommand::with_name("set-log-filter")
                 .about("Adjust the validator log filter")
@@ -2028,6 +2040,29 @@ pub fn main() {
             monitor_validator(&ledger_path);
             return;
         }
+        ("set-identity", Some(subcommand_matches)) => {
+            let identity_keypair = value_t_or_exit!(subcommand_matches, "identity", String);
+
+            let identity_keypair = fs::canonicalize(&identity_keypair).unwrap_or_else(|err| {
+                println!("Unable to access path: {}: {:?}", identity_keypair, err);
+                exit(1);
+            });
+            println!("Validator identity: {}", identity_keypair.display());
+
+            let admin_client = admin_rpc_service::connect(&ledger_path);
+            admin_rpc_service::runtime()
+                .block_on(async move {
+                    admin_client
+                        .await?
+                        .set_identity(identity_keypair.display().to_string())
+                        .await
+                })
+                .unwrap_or_else(|err| {
+                    println!("setIdentity request failed: {}", err);
+                    exit(1);
+                });
+            return;
+        }
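Operationally, the new subcommand targets the running instance's admin socket, located via the same `--ledger` argument the other subcommands use, so the invocation is expected to look like `solana-validator --ledger <LEDGER_PATH> set-identity /path/to/new-identity.json`. As the `after_help` text warns, the swap is not persisted; the `--identity` passed at the next restart must be updated separately.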
        ("set-log-filter", Some(subcommand_matches)) => {
            let filter = value_t_or_exit!(subcommand_matches, "filter", String);
            let admin_client = admin_rpc_service::connect(&ledger_path);
@@ -2050,17 +2085,21 @@ pub fn main() {
         _ => unreachable!(),
     };

-    let identity_keypair = Arc::new(keypair_of(&matches, "identity").unwrap_or_else(|| {
+    let identity_keypair = keypair_of(&matches, "identity").unwrap_or_else(|| {
         clap::Error::with_description(
             "The --identity argument is required",
             clap::ErrorKind::ArgumentNotFound,
         )
         .exit();
-    }));
+    });

     let authorized_voter_keypairs = keypairs_of(&matches, "authorized_voter_keypairs")
         .map(|keypairs| keypairs.into_iter().map(Arc::new).collect())
-        .unwrap_or_else(|| vec![identity_keypair.clone()]);
+        .unwrap_or_else(|| {
+            vec![Arc::new(
+                keypair_of(&matches, "identity").expect("identity"),
+            )]
+        });
     let authorized_voter_keypairs = Arc::new(RwLock::new(authorized_voter_keypairs));

     let init_complete_file = matches.value_of("init_complete_file");
@@ -2472,6 +2511,7 @@ pub fn main() {
     info!("Starting validator with: {:#?}", std::env::args_os());

     let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
+    let admin_service_cluster_info = Arc::new(RwLock::new(None));
     admin_rpc_service::run(
         &ledger_path,
         admin_rpc_service::AdminRpcRequestMetadata {
@@ -2480,6 +2520,7 @@ pub fn main() {
             validator_exit: validator_config.validator_exit.clone(),
             start_progress: start_progress.clone(),
             authorized_voter_keypairs: authorized_voter_keypairs.clone(),
+            cluster_info: admin_service_cluster_info.clone(),
         },
     );
@@ -2582,6 +2623,8 @@ pub fn main() {
     solana_ledger::entry::init_poh();
     solana_runtime::snapshot_utils::remove_tmp_snapshot_archives(&snapshot_output_dir);

+    let identity_keypair = Arc::new(identity_keypair);
+
     let should_check_duplicate_instance = !matches.is_present("no_duplicate_instance_check");
     if !cluster_entrypoints.is_empty() {
         rpc_bootstrap(
@@ -2612,7 +2655,7 @@ pub fn main() {

     let validator = Validator::new(
         node,
-        &identity_keypair,
+        identity_keypair,
         &ledger_path,
         &vote_account,
         authorized_voter_keypairs,
@@ -2621,6 +2664,7 @@ pub fn main() {
         should_check_duplicate_instance,
         start_progress,
     );
+    *admin_service_cluster_info.write().unwrap() = Some(validator.cluster_info.clone());

     if let Some(filename) = init_complete_file {
         File::create(filename).unwrap_or_else(|_| {