From bab35022605623ce95ec3f9e9e6301fee48fe34a Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 21 Apr 2020 12:54:45 -0700 Subject: [PATCH] Push down cluster_info lock (#9594) * Push down cluster_info lock * Rework budget decrement Co-authored-by: Carl --- archiver-lib/src/archiver.rs | 43 +- banking-bench/src/main.rs | 4 +- core/benches/banking_stage.rs | 2 +- core/benches/cluster_info.rs | 2 +- core/benches/retransmit_stage.rs | 2 +- core/src/accounts_hash_verifier.rs | 42 +- core/src/banking_stage.rs | 25 +- core/src/broadcast_stage.rs | 26 +- .../broadcast_fake_shreds_run.rs | 6 +- .../fail_entry_verification_broadcast_run.rs | 2 +- .../broadcast_stage/standard_broadcast_run.rs | 14 +- core/src/cluster_info.rs | 647 +++++++++++------- core/src/cluster_info_vote_listener.rs | 6 +- core/src/cluster_slots.rs | 13 +- core/src/gossip_service.rs | 47 +- core/src/repair_service.rs | 46 +- core/src/replay_stage.rs | 13 +- core/src/retransmit_stage.rs | 19 +- core/src/rpc.rs | 35 +- core/src/rpc_service.rs | 140 ++-- core/src/serve_repair.rs | 25 +- core/src/snapshot_packager_service.rs | 14 +- core/src/storage_stage.rs | 10 +- core/src/tpu.rs | 2 +- core/src/tvu.rs | 14 +- core/src/validator.rs | 28 +- core/src/window_service.rs | 10 +- core/tests/bank_forks.rs | 5 +- core/tests/cluster_info.rs | 7 +- core/tests/gossip.rs | 58 +- local-cluster/tests/archiver.rs | 9 +- validator/src/main.rs | 34 +- 32 files changed, 679 insertions(+), 671 deletions(-) diff --git a/archiver-lib/src/archiver.rs b/archiver-lib/src/archiver.rs index 901ec86cb..97c4a9e70 100644 --- a/archiver-lib/src/archiver.rs +++ b/archiver-lib/src/archiver.rs @@ -53,7 +53,7 @@ use std::{ result, sync::atomic::{AtomicBool, Ordering}, sync::mpsc::{channel, Receiver, Sender}, - sync::{Arc, RwLock}, + sync::Arc, thread::{sleep, spawn, JoinHandle}, time::Duration, }; @@ -185,9 +185,9 @@ impl Archiver { info!("Archiver: id: {}", keypair.pubkey()); info!("Creating cluster info...."); - let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone()); + let cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone()); cluster_info.set_entrypoint(cluster_entrypoint.clone()); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let cluster_slots = Arc::new(ClusterSlots::default()); // Note for now, this ledger will not contain any of the existing entries // in the ledger located at ledger_path, and will only append on newly received @@ -308,7 +308,7 @@ impl Archiver { fn run( meta: &mut ArchiverMeta, blockstore: &Arc, - cluster_info: Arc>, + cluster_info: Arc, archiver_keypair: &Arc, storage_keypair: &Arc, exit: &Arc, @@ -365,12 +365,12 @@ impl Archiver { } fn redeem_rewards( - cluster_info: &Arc>, + cluster_info: &ClusterInfo, archiver_keypair: &Arc, storage_keypair: &Arc, client_commitment: CommitmentConfig, ) { - let nodes = cluster_info.read().unwrap().tvu_peers(); + let nodes = cluster_info.tvu_peers(); let client = solana_core::gossip_service::get_client(&nodes); if let Ok(Some(account)) = @@ -405,7 +405,7 @@ impl Archiver { #[allow(clippy::too_many_arguments)] fn setup( meta: &mut ArchiverMeta, - cluster_info: Arc>, + cluster_info: Arc, blockstore: &Arc, exit: &Arc, node_info: &ContactInfo, @@ -491,7 +491,7 @@ impl Archiver { blockstore: &Arc, exit: &Arc, node_info: &ContactInfo, - cluster_info: Arc>, + cluster_info: Arc, ) { info!( "window created, waiting for ledger download starting at slot {:?}", @@ -519,11 +519,8 @@ impl Archiver { contact_info.tvu = "0.0.0.0:0".parse().unwrap(); contact_info.wallclock = timestamp(); // copy over the adopted shred_version from the entrypoint - contact_info.shred_version = cluster_info.read().unwrap().my_data().shred_version; - { - let mut cluster_info_w = cluster_info.write().unwrap(); - cluster_info_w.insert_self(contact_info); - } + contact_info.shred_version = cluster_info.my_shred_version(); + cluster_info.update_contact_info(|current| *current = contact_info); } fn encrypt_ledger(meta: &mut ArchiverMeta, blockstore: &Arc) -> Result<()> { @@ -626,12 +623,12 @@ impl Archiver { fn submit_mining_proof( meta: &ArchiverMeta, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, archiver_keypair: &Arc, storage_keypair: &Arc, ) { // No point if we've got no storage account... - let nodes = cluster_info.read().unwrap().tvu_peers(); + let nodes = cluster_info.tvu_peers(); let client = solana_core::gossip_service::get_client(&nodes); let storage_balance = client .poll_get_balance_with_commitment(&storage_keypair.pubkey(), meta.client_commitment); @@ -689,13 +686,10 @@ impl Archiver { } fn get_segment_config( - cluster_info: &Arc>, + cluster_info: &ClusterInfo, client_commitment: CommitmentConfig, ) -> Result { - let rpc_peers = { - let cluster_info = cluster_info.read().unwrap(); - cluster_info.all_rpc_peers() - }; + let rpc_peers = cluster_info.all_rpc_peers(); debug!("rpc peers: {:?}", rpc_peers); if !rpc_peers.is_empty() { let rpc_client = { @@ -721,7 +715,7 @@ impl Archiver { /// Waits until the first segment is ready, and returns the current segment fn poll_for_segment( - cluster_info: &Arc>, + cluster_info: &ClusterInfo, slots_per_segment: u64, previous_blockhash: &Hash, exit: &Arc, @@ -741,17 +735,14 @@ impl Archiver { /// Poll for a different blockhash and associated max_slot than `previous_blockhash` fn poll_for_blockhash_and_slot( - cluster_info: &Arc>, + cluster_info: &ClusterInfo, slots_per_segment: u64, previous_blockhash: &Hash, exit: &Arc, ) -> Result<(Hash, u64)> { info!("waiting for the next turn..."); loop { - let rpc_peers = { - let cluster_info = cluster_info.read().unwrap(); - cluster_info.all_rpc_peers() - }; + let rpc_peers = cluster_info.all_rpc_peers(); debug!("rpc peers: {:?}", rpc_peers); if !rpc_peers.is_empty() { let rpc_client = { diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index a9b97c08e..176117d23 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -28,7 +28,7 @@ use solana_sdk::{ transaction::Transaction, }; use std::{ - sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex, RwLock}, + sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex}, thread::sleep, time::{Duration, Instant}, }; @@ -152,7 +152,7 @@ fn main() { let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank, &blockstore, None); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index c8f3a0427..6351292d6 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -190,7 +190,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let (exit, poh_recorder, poh_service, signal_receiver) = create_test_recorder(&bank, &blockstore, None); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let _banking_stage = BankingStage::new( &cluster_info, &poh_recorder, diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 3e8b8701f..db8d2d6ec 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -36,7 +36,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64); } let stakes = Arc::new(stakes); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(stakes.clone())); let shreds = Arc::new(shreds); let last_datapoint = Arc::new(AtomicU64::new(0)); diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index ae3280fd6..067ed8440 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -45,7 +45,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { peer_sockets.push(socket); } let peer_sockets = Arc::new(peer_sockets); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000); let bank0 = Bank::new(&genesis_config); diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index c3cf4368a..4959b172b 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -15,7 +15,7 @@ use std::{ sync::{ atomic::{AtomicBool, Ordering}, mpsc::RecvTimeoutError, - Arc, RwLock, + Arc, }, thread::{self, Builder, JoinHandle}, time::Duration, @@ -30,7 +30,7 @@ impl AccountsHashVerifier { accounts_package_receiver: AccountsPackageReceiver, accounts_package_sender: Option, exit: &Arc, - cluster_info: &Arc>, + cluster_info: &Arc, trusted_validators: Option>, halt_on_trusted_validators_accounts_hash_mismatch: bool, fault_injection_rate_slots: u64, @@ -74,7 +74,7 @@ impl AccountsHashVerifier { fn process_accounts_package( accounts_package: AccountsPackage, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, trusted_validators: &Option>, halt_on_trusted_validator_accounts_hash_mismatch: bool, accounts_package_sender: &Option, @@ -117,14 +117,11 @@ impl AccountsHashVerifier { } } - cluster_info - .write() - .unwrap() - .push_accounts_hashes(hashes.clone()); + cluster_info.push_accounts_hashes(hashes.clone()); } fn should_halt( - cluster_info: &Arc>, + cluster_info: &ClusterInfo, trusted_validators: &Option>, slot_to_hash: &mut HashMap, ) -> bool { @@ -132,11 +129,9 @@ impl AccountsHashVerifier { let mut highest_slot = 0; if let Some(trusted_validators) = trusted_validators.as_ref() { for trusted_validator in trusted_validators { - let cluster_info_r = cluster_info.read().unwrap(); - if let Some(accounts_hashes) = - cluster_info_r.get_accounts_hash_for_node(trusted_validator) + let is_conflicting = cluster_info.get_accounts_hash_for_node(trusted_validator, |accounts_hashes| { - for (slot, hash) in accounts_hashes { + accounts_hashes.iter().any(|(slot, hash)| { if let Some(reference_hash) = slot_to_hash.get(slot) { if *hash != *reference_hash { error!("Trusted validator {} produced conflicting hashes for slot: {} ({} != {})", @@ -145,16 +140,21 @@ impl AccountsHashVerifier { hash, reference_hash, ); - - return true; + true } else { verified_count += 1; + false } } else { highest_slot = std::cmp::max(*slot, highest_slot); slot_to_hash.insert(*slot, *hash); + false } - } + }) + }).unwrap_or(false); + + if is_conflicting { + return true; } } } @@ -188,7 +188,7 @@ mod tests { let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let mut trusted_validators = HashSet::new(); let mut slot_to_hash = HashMap::new(); @@ -203,8 +203,7 @@ mod tests { let hash2 = hash(&[2]); { let message = make_accounts_hashes_message(&validator1, vec![(0, hash1)]).unwrap(); - let mut cluster_info_w = cluster_info.write().unwrap(); - cluster_info_w.push_message(message); + cluster_info.push_message(message); } slot_to_hash.insert(0, hash2); trusted_validators.insert(validator1.pubkey()); @@ -224,7 +223,7 @@ mod tests { let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let trusted_validators = HashSet::new(); let exit = Arc::new(AtomicBool::new(false)); @@ -254,9 +253,8 @@ mod tests { 100, ); } - let cluster_info_r = cluster_info.read().unwrap(); - let cluster_hashes = cluster_info_r - .get_accounts_hash_for_node(&keypair.pubkey()) + let cluster_hashes = cluster_info + .get_accounts_hash_for_node(&keypair.pubkey(), |c| c.clone()) .unwrap(); info!("{:?}", cluster_hashes); assert_eq!(hashes.len(), MAX_SNAPSHOT_HASHES); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 99a1a75ff..0479539db 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -41,7 +41,7 @@ use std::{ net::UdpSocket, sync::atomic::AtomicBool, sync::mpsc::Receiver, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex}, thread::{self, Builder, JoinHandle}, time::Duration, time::Instant, @@ -76,7 +76,7 @@ impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::new_ret_no_self)] pub fn new( - cluster_info: &Arc>, + cluster_info: &Arc, poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, @@ -93,7 +93,7 @@ impl BankingStage { } fn new_num_threads( - cluster_info: &Arc>, + cluster_info: &Arc, poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, @@ -104,7 +104,7 @@ impl BankingStage { // Single thread to generate entries from many banks. // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its blockhash is registered with the bank. - let my_pubkey = cluster_info.read().unwrap().id(); + let my_pubkey = cluster_info.id(); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { @@ -287,7 +287,7 @@ impl BankingStage { my_pubkey: &Pubkey, socket: &std::net::UdpSocket, poh_recorder: &Arc>, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, buffered_packets: &mut Vec, enable_forwarding: bool, batch_limit: usize, @@ -331,10 +331,7 @@ impl BankingStage { next_leader.map_or((), |leader_pubkey| { let leader_addr = { cluster_info - .read() - .unwrap() - .lookup(&leader_pubkey) - .map(|leader| leader.tpu_forwards) + .lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards) }; leader_addr.map_or((), |leader_addr| { @@ -358,7 +355,7 @@ impl BankingStage { my_pubkey: Pubkey, verified_receiver: &CrossbeamReceiver>, poh_recorder: &Arc>, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, recv_start: &mut Instant, enable_forwarding: bool, id: u32, @@ -1049,7 +1046,7 @@ mod tests { let (exit, poh_recorder, poh_service, _entry_receiever) = create_test_recorder(&bank, &blockstore, None); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, @@ -1089,7 +1086,7 @@ mod tests { let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank, &blockstore, Some(poh_config)); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, @@ -1152,7 +1149,7 @@ mod tests { let (exit, poh_recorder, poh_service, entry_receiver) = create_test_recorder(&bank, &blockstore, Some(poh_config)); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let banking_stage = BankingStage::new( &cluster_info, &poh_recorder, @@ -1293,7 +1290,7 @@ mod tests { create_test_recorder(&bank, &blockstore, Some(poh_config)); let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let _banking_stage = BankingStage::new_num_threads( &cluster_info, &poh_recorder, diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index e0800a925..568e95f65 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -29,7 +29,7 @@ use std::{ net::UdpSocket, sync::atomic::{AtomicBool, Ordering}, sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender}, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, Mutex}, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, }; @@ -62,14 +62,14 @@ impl BroadcastStageType { pub fn new_broadcast_stage( &self, sock: Vec, - cluster_info: Arc>, + cluster_info: Arc, receiver: Receiver, retransmit_slots_receiver: RetransmitSlotsReceiver, exit_sender: &Arc, blockstore: &Arc, shred_version: u16, ) -> BroadcastStage { - let keypair = cluster_info.read().unwrap().keypair.clone(); + let keypair = cluster_info.keypair.clone(); match self { BroadcastStageType::Standard => BroadcastStage::new( sock, @@ -116,7 +116,7 @@ trait BroadcastRun { fn transmit( &mut self, receiver: &Arc>, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, sock: &UdpSocket, ) -> Result<()>; fn record( @@ -205,7 +205,7 @@ impl BroadcastStage { #[allow(clippy::too_many_arguments)] fn new( socks: Vec, - cluster_info: Arc>, + cluster_info: Arc, receiver: Receiver, retransmit_slots_receiver: RetransmitSlotsReceiver, exit_sender: &Arc, @@ -357,11 +357,11 @@ fn update_peer_stats( } pub fn get_broadcast_peers( - cluster_info: &Arc>, + cluster_info: &ClusterInfo, stakes: Option>>, ) -> (Vec, Vec<(u64, usize)>) { use crate::cluster_info; - let mut peers = cluster_info.read().unwrap().tvu_peers(); + let mut peers = cluster_info.tvu_peers(); let peers_and_stakes = cluster_info::stake_weight_peers(&mut peers, stakes); (peers, peers_and_stakes) } @@ -450,11 +450,7 @@ pub mod test { signature::{Keypair, Signer}, }; use std::{ - path::Path, - sync::atomic::AtomicBool, - sync::mpsc::channel, - sync::{Arc, RwLock}, - thread::sleep, + path::Path, sync::atomic::AtomicBool, sync::mpsc::channel, sync::Arc, thread::sleep, }; pub fn make_transmit_shreds( @@ -598,16 +594,16 @@ pub mod test { let broadcast_buddy = Node::new_localhost_with_pubkey(&buddy_keypair.pubkey()); // Fill the cluster_info with the buddy's info - let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone()); + let cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone()); cluster_info.insert_info(broadcast_buddy.info); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let exit_sender = Arc::new(AtomicBool::new(false)); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Arc::new(Bank::new(&genesis_config)); - let leader_keypair = cluster_info.read().unwrap().keypair.clone(); + let leader_keypair = cluster_info.keypair.clone(); // Start up the broadcast stage let broadcast_service = BroadcastStage::new( leader_info.sockets.broadcast, diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 081e83d13..46f001fe5 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -104,11 +104,11 @@ impl BroadcastRun for BroadcastFakeShredsRun { fn transmit( &mut self, receiver: &Arc>, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, sock: &UdpSocket, ) -> Result<()> { for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() { - let peers = cluster_info.read().unwrap().tvu_peers(); + let peers = cluster_info.tvu_peers(); peers.iter().enumerate().for_each(|(i, peer)| { if i <= self.partition && stakes.is_some() { // Send fake shreds to the first N peers @@ -145,7 +145,7 @@ mod tests { #[test] fn test_tvu_peers_ordering() { - let mut cluster = ClusterInfo::new_with_invalid_keypair(ContactInfo::new_localhost( + let cluster = ClusterInfo::new_with_invalid_keypair(ContactInfo::new_localhost( &Pubkey::new_rand(), 0, )); diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 7fada6a42..870b05219 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -74,7 +74,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { fn transmit( &mut self, receiver: &Arc>, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, sock: &UdpSocket, ) -> Result<()> { let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?; diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 7794d532f..790700a67 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -120,7 +120,7 @@ impl StandardBroadcastRun { #[cfg(test)] fn test_process_receive_results( &mut self, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, sock: &UdpSocket, blockstore: &Arc, receive_results: ReceiveResults, @@ -288,7 +288,7 @@ impl StandardBroadcastRun { fn broadcast( &mut self, sock: &UdpSocket, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, stakes: Option>>, shreds: Arc>, broadcast_shred_batch_info: Option, @@ -374,7 +374,7 @@ impl BroadcastRun for StandardBroadcastRun { fn transmit( &mut self, receiver: &Arc>, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, sock: &UdpSocket, ) -> Result<()> { let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?; @@ -404,7 +404,7 @@ mod test { genesis_config::GenesisConfig, signature::{Keypair, Signer}, }; - use std::sync::{Arc, RwLock}; + use std::sync::Arc; use std::time::Duration; fn setup( @@ -412,7 +412,7 @@ mod test { ) -> ( Arc, GenesisConfig, - Arc>, + Arc, Arc, Arc, UdpSocket, @@ -425,9 +425,9 @@ mod test { let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey(); let leader_info = Node::new_localhost_with_pubkey(&leader_pubkey); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair( leader_info.info.clone(), - ))); + )); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut genesis_config = create_genesis_config(10_000).genesis_config; genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot) + 1; diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index aa825b495..16c86b7f8 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -105,16 +105,17 @@ pub struct DataBudget { last_timestamp_ms: u64, // Last time that we upped the bytes count, // used to detect when to up the bytes budget again } -#[derive(Clone)] + pub struct ClusterInfo { /// The network - pub gossip: CrdsGossip, + pub gossip: RwLock, /// set the keypair that will be used to sign crds values generated. It is unset only in tests. pub(crate) keypair: Arc, /// The network entrypoint - entrypoint: Option, - - outbound_budget: DataBudget, + entrypoint: RwLock>, + outbound_budget: RwLock, + my_contact_info: RwLock, + id: Pubkey, } #[derive(Default, Clone)] @@ -229,68 +230,101 @@ impl ClusterInfo { } pub fn new(contact_info: ContactInfo, keypair: Arc) -> Self { - let mut me = Self { - gossip: CrdsGossip::default(), + let id = contact_info.id; + let me = Self { + gossip: RwLock::new(CrdsGossip::default()), keypair, - entrypoint: None, - outbound_budget: DataBudget { + entrypoint: RwLock::new(None), + outbound_budget: RwLock::new(DataBudget { bytes: 0, last_timestamp_ms: 0, - }, + }), + my_contact_info: RwLock::new(contact_info), + id, }; - let id = contact_info.id; - me.gossip.set_self(&id); - me.insert_self(contact_info); + me.gossip.write().unwrap().set_self(&id); + me.insert_self(); me.push_self(&HashMap::new()); me } - pub fn insert_self(&mut self, contact_info: ContactInfo) { - if self.id() == contact_info.id { - let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair); - let _ = self.gossip.crds.insert(value, timestamp()); + // Should only be used by tests and simulations + pub fn clone_with_id(&self, new_id: &Pubkey) -> Self { + let mut gossip = self.gossip.read().unwrap().clone(); + gossip.id = *new_id; + let mut my_contact_info = self.my_contact_info.read().unwrap().clone(); + my_contact_info.id = *new_id; + ClusterInfo { + gossip: RwLock::new(gossip), + keypair: self.keypair.clone(), + entrypoint: RwLock::new(self.entrypoint.read().unwrap().clone()), + outbound_budget: RwLock::new(self.outbound_budget.read().unwrap().clone()), + my_contact_info: RwLock::new(my_contact_info), + id: *new_id, } } - fn push_self(&mut self, stakes: &HashMap) { - let mut my_data = self.my_data(); + pub fn update_contact_info(&self, modify: F) + where + F: FnOnce(&mut ContactInfo) -> (), + { + let my_id = self.id(); + modify(&mut self.my_contact_info.write().unwrap()); + assert_eq!(self.my_contact_info.read().unwrap().id, my_id); + self.insert_self() + } + + fn push_self(&self, stakes: &HashMap) { let now = timestamp(); - my_data.wallclock = now; - let entry = CrdsValue::new_signed(CrdsData::ContactInfo(my_data), &self.keypair); - self.gossip.refresh_push_active_set(stakes); - self.gossip - .process_push_message(&self.id(), vec![entry], now); + self.my_contact_info.write().unwrap().wallclock = now; + let entry = + CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair); + let mut w_gossip = self.gossip.write().unwrap(); + w_gossip.refresh_push_active_set(stakes); + w_gossip.process_push_message(&self.id(), vec![entry], now); } // TODO kill insert_info, only used by tests - pub fn insert_info(&mut self, contact_info: ContactInfo) { + pub fn insert_info(&self, contact_info: ContactInfo) { let value = CrdsValue::new_signed(CrdsData::ContactInfo(contact_info), &self.keypair); - let _ = self.gossip.crds.insert(value, timestamp()); + let _ = self.gossip.write().unwrap().crds.insert(value, timestamp()); } - pub fn set_entrypoint(&mut self, entrypoint: ContactInfo) { - self.entrypoint = Some(entrypoint) + pub fn set_entrypoint(&self, entrypoint: ContactInfo) { + *self.entrypoint.write().unwrap() = Some(entrypoint) } pub fn id(&self) -> Pubkey { - self.gossip.id + self.id } - pub fn lookup(&self, id: &Pubkey) -> Option<&ContactInfo> { + pub fn lookup_contact_info(&self, id: &Pubkey, map: F) -> Option + where + F: FnOnce(&ContactInfo) -> Y, + { let entry = CrdsValueLabel::ContactInfo(*id); self.gossip + .read() + .unwrap() .crds .lookup(&entry) .and_then(CrdsValue::contact_info) + .map(map) } - pub fn my_data(&self) -> ContactInfo { - self.lookup(&self.id()).cloned().unwrap() + pub fn my_contact_info(&self) -> ContactInfo { + self.my_contact_info.read().unwrap().clone() + } + + pub fn my_shred_version(&self) -> u16 { + self.my_contact_info.read().unwrap().shred_version } pub fn lookup_epoch_slots(&self, ix: EpochSlotsIndex) -> EpochSlots { let entry = CrdsValueLabel::EpochSlots(ix, self.id()); self.gossip + .read() + .unwrap() .crds .lookup(&entry) .and_then(CrdsValue::epoch_slots) @@ -302,7 +336,7 @@ impl ClusterInfo { let now = timestamp(); let mut spy_nodes = 0; let mut archivers = 0; - let my_pubkey = self.my_data().id; + let my_pubkey = self.id(); let nodes: Vec<_> = self .all_peers() .into_iter() @@ -372,10 +406,12 @@ impl ClusterInfo { ) } - pub fn push_lowest_slot(&mut self, id: Pubkey, min: Slot) { + pub fn push_lowest_slot(&self, id: Pubkey, min: Slot) { let now = timestamp(); let last = self .gossip + .read() + .unwrap() .crds .lookup(&CrdsValueLabel::LowestSlot(self.id())) .and_then(|x| x.lowest_slot()) @@ -387,16 +423,20 @@ impl ClusterInfo { &self.keypair, ); self.gossip + .write() + .unwrap() .process_push_message(&self.id(), vec![entry], now); } } - pub fn push_epoch_slots(&mut self, update: &[Slot]) { + pub fn push_epoch_slots(&self, update: &[Slot]) { let mut num = 0; let mut current_slots: Vec<_> = (0..crds_value::MAX_EPOCH_SLOTS) .filter_map(|ix| { Some(( self.gossip + .read() + .unwrap() .crds .lookup(&CrdsValueLabel::EpochSlots(ix, self.id())) .and_then(CrdsValue::epoch_slots) @@ -438,6 +478,8 @@ impl ClusterInfo { if n > 0 { let entry = CrdsValue::new_signed(CrdsData::EpochSlots(ix, slots), &self.keypair); self.gossip + .write() + .unwrap() .process_push_message(&self.id(), vec![entry], now); } num += n; @@ -448,13 +490,16 @@ impl ClusterInfo { } } - pub fn push_message(&mut self, message: CrdsValue) { + pub fn push_message(&self, message: CrdsValue) { let now = message.wallclock(); let id = message.pubkey(); - self.gossip.process_push_message(&id, vec![message], now); + self.gossip + .write() + .unwrap() + .process_push_message(&id, vec![message], now); } - pub fn push_accounts_hashes(&mut self, accounts_hashes: Vec<(Slot, Hash)>) { + pub fn push_accounts_hashes(&self, accounts_hashes: Vec<(Slot, Hash)>) { if accounts_hashes.len() > MAX_SNAPSHOT_HASHES { warn!( "accounts hashes too large, ignored: {}", @@ -467,7 +512,7 @@ impl ClusterInfo { self.push_message(CrdsValue::new_signed(message, &self.keypair)); } - pub fn push_snapshot_hashes(&mut self, snapshot_hashes: Vec<(Slot, Hash)>) { + pub fn push_snapshot_hashes(&self, snapshot_hashes: Vec<(Slot, Hash)>) { if snapshot_hashes.len() > MAX_SNAPSHOT_HASHES { warn!( "snapshot hashes too large, ignored: {}", @@ -480,19 +525,20 @@ impl ClusterInfo { self.push_message(CrdsValue::new_signed(message, &self.keypair)); } - pub fn push_vote(&mut self, tower_index: usize, vote: Transaction) { + pub fn push_vote(&self, tower_index: usize, vote: Transaction) { let now = timestamp(); let vote = Vote::new(&self.id(), vote, now); - let current_votes: Vec<_> = (0..crds_value::MAX_VOTES) - .filter_map(|ix| { - self.gossip - .crds - .lookup(&CrdsValueLabel::Vote(ix, self.id())) - }) - .collect(); - let vote_ix = CrdsValue::compute_vote_index(tower_index, current_votes); + let vote_ix = { + let r_gossip = self.gossip.read().unwrap(); + let current_votes: Vec<_> = (0..crds_value::MAX_VOTES) + .filter_map(|ix| r_gossip.crds.lookup(&CrdsValueLabel::Vote(ix, self.id()))) + .collect(); + CrdsValue::compute_vote_index(tower_index, current_votes) + }; let entry = CrdsValue::new_signed(CrdsData::Vote(vote_ix, vote), &self.keypair); self.gossip + .write() + .unwrap() .process_push_message(&self.id(), vec![entry], now); } @@ -505,6 +551,8 @@ impl ClusterInfo { let mut max_ts = since; let (labels, txs): (Vec, Vec) = self .gossip + .read() + .unwrap() .crds .table .iter() @@ -522,6 +570,8 @@ impl ClusterInfo { pub fn get_snapshot_hash(&self, slot: Slot) -> Vec<(Pubkey, Hash)> { self.gossip + .read() + .unwrap() .crds .table .values() @@ -537,28 +587,46 @@ impl ClusterInfo { .collect() } - pub fn get_accounts_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> { + pub fn get_accounts_hash_for_node(&self, pubkey: &Pubkey, map: F) -> Option + where + F: FnOnce(&Vec<(Slot, Hash)>) -> Y, + { self.gossip + .read() + .unwrap() .crds .table .get(&CrdsValueLabel::AccountsHashes(*pubkey)) .map(|x| &x.value.accounts_hash().unwrap().hashes) + .map(map) } - pub fn get_snapshot_hash_for_node(&self, pubkey: &Pubkey) -> Option<&Vec<(Slot, Hash)>> { + pub fn get_snapshot_hash_for_node(&self, pubkey: &Pubkey, map: F) -> Option + where + F: FnOnce(&Vec<(Slot, Hash)>) -> Y, + { self.gossip + .read() + .unwrap() .crds .table .get(&CrdsValueLabel::SnapshotHashes(*pubkey)) .map(|x| &x.value.snapshot_hash().unwrap().hashes) + .map(map) } - pub fn get_lowest_slot_for_node( + pub fn get_lowest_slot_for_node( &self, pubkey: &Pubkey, since: Option, - ) -> Option<(&LowestSlot, u64)> { + map: F, + ) -> Option + where + F: FnOnce(&LowestSlot, u64) -> Y, + { self.gossip + .read() + .unwrap() .crds .table .get(&CrdsValueLabel::LowestSlot(*pubkey)) @@ -567,12 +635,14 @@ impl ClusterInfo { .map(|since| x.insert_timestamp > since) .unwrap_or(true) }) - .map(|x| (x.value.lowest_slot().unwrap(), x.insert_timestamp)) + .map(|x| map(x.value.lowest_slot().unwrap(), x.insert_timestamp)) } pub fn get_epoch_slots_since(&self, since: Option) -> (Vec, Option) { let vals: Vec<_> = self .gossip + .read() + .unwrap() .crds .table .values() @@ -581,30 +651,23 @@ impl ClusterInfo { .map(|since| x.insert_timestamp > since) .unwrap_or(true) }) - .filter_map(|x| Some((x.value.epoch_slots()?, x.insert_timestamp))) + .filter_map(|x| Some((x.value.epoch_slots()?.clone(), x.insert_timestamp))) .collect(); let max = vals.iter().map(|x| x.1).max().or(since); - let vec = vals.into_iter().map(|x| x.0).cloned().collect(); + let vec = vals.into_iter().map(|x| x.0).collect(); (vec, max) } - pub fn get_contact_info_for_node(&self, pubkey: &Pubkey) -> Option<&ContactInfo> { - self.gossip - .crds - .table - .get(&CrdsValueLabel::ContactInfo(*pubkey)) - .map(|x| x.value.contact_info().unwrap()) - } - /// all validators that have a valid rpc port regardless of `shred_version`. pub fn all_rpc_peers(&self) -> Vec { - let me = self.my_data(); self.gossip + .read() + .unwrap() .crds .table .values() .filter_map(|x| x.value.contact_info()) - .filter(|x| x.id != me.id && ContactInfo::is_valid_address(&x.rpc)) + .filter(|x| x.id != self.id() && ContactInfo::is_valid_address(&x.rpc)) .cloned() .collect() } @@ -612,6 +675,8 @@ impl ClusterInfo { // All nodes in gossip (including spy nodes) and the last time we heard about them pub(crate) fn all_peers(&self) -> Vec<(ContactInfo, u64)> { self.gossip + .read() + .unwrap() .crds .table .values() @@ -624,8 +689,10 @@ impl ClusterInfo { } pub fn gossip_peers(&self) -> Vec { - let me = self.my_data().id; + let me = self.id(); self.gossip + .read() + .unwrap() .crds .table .values() @@ -638,8 +705,9 @@ impl ClusterInfo { /// all validators that have a valid tvu port regardless of `shred_version`. pub fn all_tvu_peers(&self) -> Vec { - let me = self.my_data(); self.gossip + .read() + .unwrap() .crds .table .values() @@ -647,7 +715,7 @@ impl ClusterInfo { .filter(|x| { ContactInfo::is_valid_address(&x.tvu) && !ClusterInfo::is_archiver(x) - && x.id != me.id + && x.id != self.id() }) .cloned() .collect() @@ -655,8 +723,9 @@ impl ClusterInfo { /// all validators that have a valid tvu port and are on the same `shred_version`. pub fn tvu_peers(&self) -> Vec { - let me = self.my_data(); self.gossip + .read() + .unwrap() .crds .table .values() @@ -664,8 +733,8 @@ impl ClusterInfo { .filter(|x| { ContactInfo::is_valid_address(&x.tvu) && !ClusterInfo::is_archiver(x) - && x.id != me.id - && x.shred_version == me.shred_version + && x.id != self.id() + && x.shred_version == self.my_shred_version() }) .cloned() .collect() @@ -673,29 +742,31 @@ impl ClusterInfo { /// all peers that have a valid storage addr regardless of `shred_version`. pub fn all_storage_peers(&self) -> Vec { - let me = self.my_data(); self.gossip + .read() + .unwrap() .crds .table .values() .filter_map(|x| x.value.contact_info()) - .filter(|x| ContactInfo::is_valid_address(&x.storage_addr) && x.id != me.id) + .filter(|x| ContactInfo::is_valid_address(&x.storage_addr) && x.id != self.id()) .cloned() .collect() } /// all peers that have a valid storage addr and are on the same `shred_version`. pub fn storage_peers(&self) -> Vec { - let me = self.my_data(); self.gossip + .read() + .unwrap() .crds .table .values() .filter_map(|x| x.value.contact_info()) .filter(|x| { ContactInfo::is_valid_address(&x.storage_addr) - && x.id != me.id - && x.shred_version == me.shred_version + && x.id != self.id() + && x.shred_version == self.my_shred_version() }) .cloned() .collect() @@ -703,15 +774,16 @@ impl ClusterInfo { /// all peers that have a valid tvu pub fn retransmit_peers(&self) -> Vec { - let me = self.my_data(); self.gossip + .read() + .unwrap() .crds .table .values() .filter_map(|x| x.value.contact_info()) .filter(|x| { - x.id != me.id - && x.shred_version == me.shred_version + x.id != self.id() + && x.shred_version == self.my_shred_version() && ContactInfo::is_valid_address(&x.tvu) && ContactInfo::is_valid_address(&x.tvu_forwards) }) @@ -721,17 +793,17 @@ impl ClusterInfo { /// all tvu peers with valid gossip addrs that likely have the slot being requested pub fn repair_peers(&self, slot: Slot) -> Vec { - let me = self.my_data(); ClusterInfo::tvu_peers(self) .into_iter() .filter(|x| { - x.id != me.id - && x.shred_version == me.shred_version + x.id != self.id() + && x.shred_version == self.my_shred_version() && ContactInfo::is_valid_address(&x.serve_repair) && { - self.get_lowest_slot_for_node(&x.id, None) - .map(|(lowest_slot, _)| lowest_slot.lowest <= slot) - .unwrap_or_else(|| /* fallback to legacy behavior */ true) + self.get_lowest_slot_for_node(&x.id, None, |lowest_slot, _| { + lowest_slot.lowest <= slot + }) + .unwrap_or_else(|| /* fallback to legacy behavior */ true) } }) .collect() @@ -796,7 +868,7 @@ impl ClusterInfo { ) -> (Vec, Vec<(u64, usize)>) { let mut peers = self.retransmit_peers(); // insert "self" into this list for the layer and neighborhood computation - peers.push(self.lookup(&self.id()).unwrap().clone()); + peers.push(self.my_contact_info()); let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes); (peers, stakes_and_index) } @@ -823,13 +895,14 @@ impl ClusterInfo { /// compute broadcast table pub fn tpu_peers(&self) -> Vec { - let me = self.my_data().id; self.gossip + .read() + .unwrap() .crds .table .values() .filter_map(|x| x.value.contact_info()) - .filter(|x| x.id != me && ContactInfo::is_valid_address(&x.tpu)) + .filter(|x| x.id != self.id() && ContactInfo::is_valid_address(&x.tpu)) .cloned() .collect() } @@ -998,48 +1071,64 @@ impl ClusterInfo { Ok(()) } + fn insert_self(&self) { + let value = + CrdsValue::new_signed(CrdsData::ContactInfo(self.my_contact_info()), &self.keypair); + let _ = self.gossip.write().unwrap().crds.insert(value, timestamp()); + } + // If the network entrypoint hasn't been discovered yet, add it to the crds table - fn add_entrypoint(&mut self, pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>) { - let pull_from_entrypoint = if let Some(entrypoint) = &mut self.entrypoint { - if pulls.is_empty() { - // Nobody else to pull from, try the entrypoint - true - } else { - let now = timestamp(); - // Only consider pulling from the entrypoint periodically to avoid spamming it - if timestamp() - entrypoint.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { - false + fn append_entrypoint_to_pulls( + &self, + pulls: &mut Vec<(Pubkey, CrdsFilter, SocketAddr, CrdsValue)>, + ) { + let pull_from_entrypoint = { + let mut w_entrypoint = self.entrypoint.write().unwrap(); + if let Some(ref mut entrypoint) = &mut *w_entrypoint { + if pulls.is_empty() { + // Nobody else to pull from, try the entrypoint + true } else { - entrypoint.wallclock = now; - let found_entrypoint = self.gossip.crds.table.iter().any(|(_, v)| { - v.value - .contact_info() - .map(|ci| ci.gossip == entrypoint.gossip) - .unwrap_or(false) - }); - !found_entrypoint + let now = timestamp(); + // Only consider pulling from the entrypoint periodically to avoid spamming it + if timestamp() - entrypoint.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { + false + } else { + entrypoint.wallclock = now; + let found_entrypoint = + self.gossip.read().unwrap().crds.table.iter().any(|(_, v)| { + v.value + .contact_info() + .map(|ci| ci.gossip == entrypoint.gossip) + .unwrap_or(false) + }); + !found_entrypoint + } } + } else { + false } - } else { - false }; if pull_from_entrypoint { - if let Some(entrypoint) = &self.entrypoint { - let self_info = self - .gossip + let id_and_gossip = { + self.entrypoint + .read() + .unwrap() + .as_ref() + .map(|e| (e.id, e.gossip)) + }; + if let Some((id, gossip)) = id_and_gossip { + let r_gossip = self.gossip.read().unwrap(); + let self_info = r_gossip .crds .lookup(&CrdsValueLabel::ContactInfo(self.id())) .unwrap_or_else(|| panic!("self_id invalid {}", self.id())); - - return self - .gossip + return r_gossip .pull - .build_crds_filters(&self.gossip.crds, MAX_BLOOM_SIZE) + .build_crds_filters(&r_gossip.crds, MAX_BLOOM_SIZE) .into_iter() - .for_each(|filter| { - pulls.push((entrypoint.id, filter, entrypoint.gossip, self_info.clone())) - }); + .for_each(|filter| pulls.push((id, filter, gossip, self_info.clone()))); } } } @@ -1084,44 +1173,52 @@ impl ClusterInfo { messages } - fn new_pull_requests(&mut self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { + fn new_pull_requests(&self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { let now = timestamp(); - let mut pulls: Vec<_> = self - .gossip - .new_pull_request(now, stakes, MAX_BLOOM_SIZE) - .ok() - .into_iter() - .filter_map(|(peer, filters, me)| { - let peer_label = CrdsValueLabel::ContactInfo(peer); - self.gossip - .crds - .lookup(&peer_label) - .and_then(CrdsValue::contact_info) - .map(move |peer_info| { - filters - .into_iter() - .map(move |f| (peer, f, peer_info.gossip, me.clone())) - }) - }) - .flatten() - .collect(); - self.add_entrypoint(&mut pulls); + let mut pulls: Vec<_> = { + let r_gossip = self.gossip.read().unwrap(); + + r_gossip + .new_pull_request(now, stakes, MAX_BLOOM_SIZE) + .ok() + .into_iter() + .filter_map(|(peer, filters, me)| { + let peer_label = CrdsValueLabel::ContactInfo(peer); + r_gossip + .crds + .lookup(&peer_label) + .and_then(CrdsValue::contact_info) + .map(move |peer_info| { + filters + .into_iter() + .map(move |f| (peer, f, peer_info.gossip, me.clone())) + }) + }) + .flatten() + .collect() + }; + self.append_entrypoint_to_pulls(&mut pulls); pulls .into_iter() .map(|(peer, filter, gossip, self_info)| { - self.gossip.mark_pull_request_creation_time(&peer, now); + self.gossip + .write() + .unwrap() + .mark_pull_request_creation_time(&peer, now); (gossip, Protocol::PullRequest(filter, self_info)) }) .collect() } - fn new_push_requests(&mut self) -> Vec<(SocketAddr, Protocol)> { - let self_id = self.gossip.id; - let (_, push_messages) = self.gossip.new_push_messages(timestamp()); + fn new_push_requests(&self) -> Vec<(SocketAddr, Protocol)> { + let self_id = self.id(); + let (_, push_messages) = self.gossip.write().unwrap().new_push_messages(timestamp()); push_messages .into_iter() .filter_map(|(peer, messages)| { let peer_label = CrdsValueLabel::ContactInfo(peer); self.gossip + .read() + .unwrap() .crds .lookup(&peer_label) .and_then(CrdsValue::contact_info) @@ -1135,7 +1232,7 @@ impl ClusterInfo { .collect() } - fn gossip_request(&mut self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { + fn gossip_request(&self, stakes: &HashMap) -> Vec<(SocketAddr, Protocol)> { let pulls: Vec<_> = self.new_pull_requests(stakes); let pushes: Vec<_> = self.new_push_requests(); vec![pulls, pushes].into_iter().flatten().collect() @@ -1143,12 +1240,12 @@ impl ClusterInfo { /// At random pick a node and try to get updated changes from them fn run_gossip( - obj: &Arc>, + obj: &Self, recycler: &PacketsRecycler, stakes: &HashMap, sender: &PacketSender, ) -> Result<()> { - let reqs = obj.write().unwrap().gossip_request(&stakes); + let reqs = obj.gossip_request(&stakes); if !reqs.is_empty() { let packets = to_packets_with_destination(recycler.clone(), &reqs); sender.send(packets)?; @@ -1158,7 +1255,7 @@ impl ClusterInfo { /// randomly pick a node and ask them for updates asynchronously pub fn gossip( - obj: Arc>, + obj: Arc, bank_forks: Option>>, sender: PacketSender, exit: &Arc, @@ -1169,14 +1266,14 @@ impl ClusterInfo { .spawn(move || { let mut last_push = timestamp(); let mut last_contact_info_trace = timestamp(); - let mut adopt_shred_version = obj.read().unwrap().my_data().shred_version == 0; + let mut adopt_shred_version = obj.my_shred_version() == 0; let recycler = PacketsRecycler::default(); loop { let start = timestamp(); thread_mem_usage::datapoint("solana-gossip"); if start - last_contact_info_trace > 10000 { // Log contact info every 10 seconds - info!("\n{}", obj.read().unwrap().contact_info_trace()); + info!("\n{}", obj.contact_info_trace()); last_contact_info_trace = start; } @@ -1201,10 +1298,10 @@ impl ClusterInfo { CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS } }; - let timeouts = obj.read().unwrap().gossip.make_timeouts(&stakes, timeout); - let num_purged = obj.write().unwrap().gossip.purge(timestamp(), &timeouts); + let timeouts = obj.gossip.read().unwrap().make_timeouts(&stakes, timeout); + let num_purged = obj.gossip.write().unwrap().purge(timestamp(), &timeouts); inc_new_counter_info!("cluster_info-purge-count", num_purged); - let table_size = obj.read().unwrap().gossip.crds.table.len(); + let table_size = obj.gossip.read().unwrap().crds.table.len(); datapoint_debug!( "cluster_info-purge", ("table_size", table_size as i64, i64), @@ -1213,12 +1310,12 @@ impl ClusterInfo { // Adopt the entrypoint's `shred_version` if ours is unset if adopt_shred_version { // If gossip was given an entrypoint, lookup its id - let entrypoint_id = obj.read().unwrap().entrypoint.as_ref().map(|e| e.id); + let entrypoint_id = obj.entrypoint.read().unwrap().as_ref().map(|e| e.id); if let Some(entrypoint_id) = entrypoint_id { // If a pull from the entrypoint was successful, it should exist in the crds table - let entrypoint = obj.read().unwrap().lookup(&entrypoint_id).cloned(); + let entrypoint = + obj.lookup_contact_info(&entrypoint_id, |ci| ci.clone()); if let Some(entrypoint) = entrypoint { - let mut self_info = obj.read().unwrap().my_data(); if entrypoint.shred_version == 0 { info!("Unable to adopt entrypoint's shred version"); } else { @@ -1226,8 +1323,9 @@ impl ClusterInfo { "Setting shred version to {:?} from entrypoint {:?}", entrypoint.shred_version, entrypoint.id ); - self_info.shred_version = entrypoint.shred_version; - obj.write().unwrap().insert_self(self_info); + obj.my_contact_info.write().unwrap().shred_version = + entrypoint.shred_version; + obj.insert_self(); adopt_shred_version = false; } } @@ -1236,7 +1334,7 @@ impl ClusterInfo { //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep if start - last_push > CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS / 2 { - obj.write().unwrap().push_self(&stakes); + obj.push_self(&stakes); last_push = timestamp(); } let elapsed = timestamp() - start; @@ -1251,7 +1349,7 @@ impl ClusterInfo { #[allow(clippy::cognitive_complexity)] fn handle_packets( - me: &Arc>, + me: &Self, recycler: &PacketsRecycler, stakes: &HashMap, packets: Packets, @@ -1261,7 +1359,7 @@ impl ClusterInfo { // iter over the packets, collect pulls separately and process everything else let allocated = thread_mem_usage::Allocatedp::default(); let mut gossip_pull_data: Vec = vec![]; - let timeouts = me.read().unwrap().gossip.make_timeouts(&stakes, epoch_ms); + let timeouts = me.gossip.read().unwrap().make_timeouts(&stakes, epoch_ms); packets.packets.iter().for_each(|packet| { let from_addr = packet.meta.addr(); limited_deserialize(&packet.data[..packet.meta.size]) @@ -1275,7 +1373,7 @@ impl ClusterInfo { 1 ); } else if let Some(contact_info) = caller.contact_info() { - if contact_info.id == me.read().unwrap().gossip.id { + if contact_info.id == me.id() { warn!("PullRequest ignored, I'm talking to myself"); inc_new_counter_debug!("cluster_info-window-request-loopback", 1); } else { @@ -1338,7 +1436,7 @@ impl ClusterInfo { "cluster_info-prune_message-size", data.prunes.len() ); - match me.write().unwrap().gossip.process_prune_msg( + match me.gossip.write().unwrap().process_prune_msg( &from, &data.destination, &data.prunes, @@ -1373,7 +1471,7 @@ impl ClusterInfo { // Pull requests take an incoming bloom filter of contained entries from a node // and tries to send back to them the values it detects are missing. fn handle_pull_requests( - me: &Arc>, + me: &Self, recycler: &PacketsRecycler, requests: Vec, stakes: &HashMap, @@ -1383,7 +1481,7 @@ impl ClusterInfo { let mut addrs = vec![]; let mut time = Measure::start("handle_pull_requests"); { - let mut cluster_info = me.write().unwrap(); + let mut w_outbound_budget = me.outbound_budget.write().unwrap(); let now = timestamp(); const INTERVAL_MS: u64 = 100; @@ -1391,14 +1489,14 @@ impl ClusterInfo { const BYTES_PER_INTERVAL: usize = 5000; const MAX_BUDGET_MULTIPLE: usize = 5; // allow budget build-up to 5x the interval default - if now - cluster_info.outbound_budget.last_timestamp_ms > INTERVAL_MS { + if now - w_outbound_budget.last_timestamp_ms > INTERVAL_MS { let len = std::cmp::max(stakes.len(), 2); - cluster_info.outbound_budget.bytes += len * BYTES_PER_INTERVAL; - cluster_info.outbound_budget.bytes = std::cmp::min( - cluster_info.outbound_budget.bytes, + w_outbound_budget.bytes += len * BYTES_PER_INTERVAL; + w_outbound_budget.bytes = std::cmp::min( + w_outbound_budget.bytes, MAX_BUDGET_MULTIPLE * len * BYTES_PER_INTERVAL, ); - cluster_info.outbound_budget.last_timestamp_ms = now; + w_outbound_budget.last_timestamp_ms = now; } } for pull_data in requests { @@ -1406,11 +1504,11 @@ impl ClusterInfo { addrs.push(pull_data.from_addr); } let now = timestamp(); - let self_id = me.read().unwrap().id(); + let self_id = me.id(); let pull_responses = me + .gossip .write() .unwrap() - .gossip .process_pull_requests(caller_and_filters, now); // Filter bad to addresses @@ -1464,36 +1562,30 @@ impl ClusterInfo { let mut packets = Packets::new_with_recycler(recycler.clone(), 64, "handle_pull_requests"); let mut total_bytes = 0; - let outbound_budget = me.read().unwrap().outbound_budget.bytes; let mut sent = HashSet::new(); while sent.len() < stats.len() { let index = weighted_index.sample(rng); if sent.contains(&index) { continue; } - sent.insert(index); let stat = &stats[index]; let from_addr = pull_responses[stat.to].1; let response = pull_responses[stat.to].0[stat.responses_index].clone(); let protocol = Protocol::PullResponse(self_id, vec![response]); - packets - .packets - .push(Packet::from_data(&from_addr, protocol)); - let len = packets.packets.len(); - total_bytes += packets.packets[len - 1].meta.size; - - if total_bytes > outbound_budget { - inc_new_counter_info!("gossip_pull_request-no_budget", 1); - break; + let new_packet = Packet::from_data(&from_addr, protocol); + { + let mut w_outbound_budget = me.outbound_budget.write().unwrap(); + if w_outbound_budget.bytes > new_packet.meta.size { + sent.insert(index); + w_outbound_budget.bytes -= new_packet.meta.size; + total_bytes += new_packet.meta.size; + packets.packets.push(new_packet) + } else { + inc_new_counter_info!("gossip_pull_request-no_budget", 1); + break; + } } } - { - let mut cluster_info = me.write().unwrap(); - cluster_info.outbound_budget.bytes = cluster_info - .outbound_budget - .bytes - .saturating_sub(total_bytes); - } time.stop(); inc_new_counter_info!("gossip_pull_request-sent_requests", sent.len()); inc_new_counter_info!( @@ -1514,18 +1606,18 @@ impl ClusterInfo { } fn handle_pull_response( - me: &Arc>, + me: &Self, from: &Pubkey, data: Vec, timeouts: &HashMap, ) { let len = data.len(); let now = Instant::now(); - let self_id = me.read().unwrap().gossip.id; + let self_id = me.gossip.read().unwrap().id; trace!("PullResponse me: {} from: {} len={}", self_id, from, len); - me.write() + me.gossip + .write() .unwrap() - .gossip .process_pull_response(from, timeouts, data, timestamp()); inc_new_counter_debug!("cluster_info-pull_request_response", 1); inc_new_counter_debug!("cluster_info-pull_request_response-size", len); @@ -1534,44 +1626,45 @@ impl ClusterInfo { } fn handle_push_message( - me: &Arc>, + me: &Self, recycler: &PacketsRecycler, from: &Pubkey, data: Vec, stakes: &HashMap, ) -> Option { - let self_id = me.read().unwrap().gossip.id; + let self_id = me.id(); inc_new_counter_debug!("cluster_info-push_message", 1); let updated: Vec<_> = - me.write() + me.gossip + .write() .unwrap() - .gossip .process_push_message(from, data, timestamp()); let updated_labels: Vec<_> = updated.into_iter().map(|u| u.value.label()).collect(); let prunes_map: HashMap> = me + .gossip .write() .unwrap() - .gossip .prune_received_cache(updated_labels, stakes); let rsp: Vec<_> = prunes_map .into_iter() .filter_map(|(from, prune_set)| { inc_new_counter_debug!("cluster_info-push_message-prunes", prune_set.len()); - me.read().unwrap().lookup(&from).cloned().and_then(|ci| { - let mut prune_msg = PruneData { - pubkey: self_id, - prunes: prune_set.into_iter().collect(), - signature: Signature::default(), - destination: from, - wallclock: timestamp(), - }; - prune_msg.sign(&me.read().unwrap().keypair); - let rsp = Protocol::PruneMessage(self_id, prune_msg); - Some((ci.gossip, rsp)) - }) + me.lookup_contact_info(&from, |ci| ci.clone()) + .and_then(|ci| { + let mut prune_msg = PruneData { + pubkey: self_id, + prunes: prune_set.into_iter().collect(), + signature: Signature::default(), + destination: from, + wallclock: timestamp(), + }; + prune_msg.sign(&me.keypair); + let rsp = Protocol::PruneMessage(self_id, prune_msg); + Some((ci.gossip, rsp)) + }) }) .collect(); if rsp.is_empty() { @@ -1579,7 +1672,7 @@ impl ClusterInfo { } let mut packets = to_packets_with_destination(recycler.clone(), &rsp); if !packets.is_empty() { - let pushes: Vec<_> = me.write().unwrap().new_push_requests(); + let pushes: Vec<_> = me.new_push_requests(); inc_new_counter_debug!("cluster_info-push_message-pushes", pushes.len()); pushes.into_iter().for_each(|(remote_gossip_addr, req)| { if !remote_gossip_addr.ip().is_unspecified() && remote_gossip_addr.port() != 0 { @@ -1597,7 +1690,7 @@ impl ClusterInfo { /// Process messages from the network fn run_listen( - obj: &Arc>, + obj: &Self, recycler: &PacketsRecycler, bank_forks: Option<&Arc>>, requests_receiver: &PacketReceiver, @@ -1643,7 +1736,7 @@ impl ClusterInfo { Ok(()) } pub fn listen( - me: Arc>, + me: Arc, bank_forks: Option>>, requests_receiver: PacketReceiver, response_sender: PacketSender, @@ -1671,11 +1764,11 @@ impl ClusterInfo { return; } if e.is_err() { - let me = me.read().unwrap(); + let r_gossip = me.gossip.read().unwrap(); debug!( "{}: run_listen timeout, table size: {}", - me.gossip.id, - me.gossip.crds.table.len() + me.id(), + r_gossip.crds.table.len() ); } thread_mem_usage::datapoint("solana-listen"); @@ -2011,7 +2104,7 @@ mod tests { use solana_sdk::signature::{Keypair, Signer}; use std::collections::HashSet; use std::net::{IpAddr, Ipv4Addr}; - use std::sync::{Arc, RwLock}; + use std::sync::Arc; #[test] fn test_gossip_node() { @@ -2028,19 +2121,14 @@ mod tests { //check that gossip doesn't try to push to invalid addresses let node = Node::new_localhost(); let (spy, _, _) = ClusterInfo::spy_node(&Pubkey::new_rand()); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( - node.info, - ))); - cluster_info.write().unwrap().insert_info(spy); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(node.info)); + cluster_info.insert_info(spy); cluster_info - .write() - .unwrap() .gossip - .refresh_push_active_set(&HashMap::new()); - let reqs = cluster_info .write() .unwrap() - .gossip_request(&HashMap::new()); + .refresh_push_active_set(&HashMap::new()); + let reqs = cluster_info.gossip_request(&HashMap::new()); //assert none of the addrs are invalid. reqs.iter().all(|(addr, _)| { let res = ContactInfo::is_valid_address(addr); @@ -2053,30 +2141,57 @@ mod tests { fn test_cluster_info_new() { let d = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); let cluster_info = ClusterInfo::new_with_invalid_keypair(d.clone()); - assert_eq!(d.id, cluster_info.my_data().id); + assert_eq!(d.id, cluster_info.id()); } #[test] fn insert_info_test() { let d = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); - let mut cluster_info = ClusterInfo::new_with_invalid_keypair(d); + let cluster_info = ClusterInfo::new_with_invalid_keypair(d); let d = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); let label = CrdsValueLabel::ContactInfo(d.id); cluster_info.insert_info(d); - assert!(cluster_info.gossip.crds.lookup(&label).is_some()); + assert!(cluster_info + .gossip + .read() + .unwrap() + .crds + .lookup(&label) + .is_some()); } #[test] - fn test_insert_self() { + #[should_panic] + fn test_update_contact_info() { let d = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); - let mut cluster_info = ClusterInfo::new_with_invalid_keypair(d.clone()); + let cluster_info = ClusterInfo::new_with_invalid_keypair(d.clone()); let entry_label = CrdsValueLabel::ContactInfo(cluster_info.id()); - assert!(cluster_info.gossip.crds.lookup(&entry_label).is_some()); + assert!(cluster_info + .gossip + .read() + .unwrap() + .crds + .lookup(&entry_label) + .is_some()); - // inserting something else shouldn't work - let d = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); - cluster_info.insert_self(d.clone()); - let label = CrdsValueLabel::ContactInfo(d.id); - assert!(cluster_info.gossip.crds.lookup(&label).is_none()); + let now = timestamp(); + cluster_info.update_contact_info(|ci| ci.wallclock = now); + assert_eq!( + cluster_info + .gossip + .read() + .unwrap() + .crds + .lookup(&entry_label) + .unwrap() + .contact_info() + .unwrap() + .wallclock, + now + ); + + // Inserting Contactinfo with different pubkey should panic, + // and update should fail + cluster_info.update_contact_info(|ci| ci.id = Pubkey::new_rand()) } fn assert_in_range(x: u16, range: (u16, u16)) { @@ -2166,11 +2281,19 @@ mod tests { let peer_keypair = Keypair::new(); let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); let peer = ContactInfo::new_localhost(&peer_keypair.pubkey(), 0); - let mut cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair)); + let cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair)); cluster_info.insert_info(peer.clone()); - cluster_info.gossip.refresh_push_active_set(&HashMap::new()); + cluster_info + .gossip + .write() + .unwrap() + .refresh_push_active_set(&HashMap::new()); //check that all types of gossip messages are signed correctly - let (_, push_messages) = cluster_info.gossip.new_push_messages(timestamp()); + let (_, push_messages) = cluster_info + .gossip + .write() + .unwrap() + .new_push_messages(timestamp()); // there should be some pushes ready assert_eq!(push_messages.len() > 0, true); push_messages @@ -2179,6 +2302,8 @@ mod tests { let (_, _, val) = cluster_info .gossip + .write() + .unwrap() .new_pull_request(timestamp(), &HashMap::new(), MAX_BLOOM_SIZE) .ok() .unwrap(); @@ -2333,7 +2458,7 @@ mod tests { fn test_push_vote() { let keys = Keypair::new(); let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0); - let mut cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); + let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); // make sure empty crds is handled correctly let now = timestamp(); @@ -2369,7 +2494,7 @@ mod tests { fn test_push_epoch_slots() { let keys = Keypair::new(); let contact_info = ContactInfo::new_localhost(&keys.pubkey(), 0); - let mut cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); + let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); let (slots, since) = cluster_info.get_epoch_slots_since(None); assert!(slots.is_empty()); assert!(since.is_none()); @@ -2389,9 +2514,9 @@ mod tests { } #[test] - fn test_add_entrypoint() { + fn test_append_entrypoint_to_pulls() { let node_keypair = Arc::new(Keypair::new()); - let mut cluster_info = ClusterInfo::new( + let cluster_info = ClusterInfo::new( ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), node_keypair, ); @@ -2417,20 +2542,17 @@ mod tests { // now add this message back to the table and make sure after the next pull, the entrypoint is unset let entrypoint_crdsvalue = CrdsValue::new_unsigned(CrdsData::ContactInfo(entrypoint.clone())); - let cluster_info = Arc::new(RwLock::new(cluster_info)); - let timeouts = cluster_info.read().unwrap().gossip.make_timeouts_test(); + let cluster_info = Arc::new(cluster_info); + let timeouts = cluster_info.gossip.read().unwrap().make_timeouts_test(); ClusterInfo::handle_pull_response( &cluster_info, &entrypoint_pubkey, vec![entrypoint_crdsvalue], &timeouts, ); - let pulls = cluster_info - .write() - .unwrap() - .new_pull_requests(&HashMap::new()); + let pulls = cluster_info.new_pull_requests(&HashMap::new()); assert_eq!(1, pulls.len() as u64); - assert_eq!(cluster_info.read().unwrap().entrypoint, Some(entrypoint)); + assert_eq!(*cluster_info.entrypoint.read().unwrap(), Some(entrypoint)); } #[test] @@ -2505,7 +2627,7 @@ mod tests { #[test] fn test_tvu_peers_and_stakes() { let d = ContactInfo::new_localhost(&Pubkey::new(&[0; 32]), timestamp()); - let mut cluster_info = ClusterInfo::new_with_invalid_keypair(d.clone()); + let cluster_info = ClusterInfo::new_with_invalid_keypair(d.clone()); let mut stakes = HashMap::new(); // no stake @@ -2552,7 +2674,7 @@ mod tests { #[test] fn test_pull_from_entrypoint_if_not_present() { let node_keypair = Arc::new(Keypair::new()); - let mut cluster_info = ClusterInfo::new( + let cluster_info = ClusterInfo::new( ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), node_keypair, ); @@ -2577,7 +2699,13 @@ mod tests { // Pull request 2: pretend it's been a while since we've pulled from `entrypoint`. There should // now be two pull requests - cluster_info.entrypoint.as_mut().unwrap().wallclock = 0; + cluster_info + .entrypoint + .write() + .unwrap() + .as_mut() + .unwrap() + .wallclock = 0; let pulls = cluster_info.new_pull_requests(&stakes); assert_eq!(2, pulls.len() as u64); assert_eq!(pulls.get(0).unwrap().0, other_node.gossip); @@ -2593,7 +2721,7 @@ mod tests { #[test] fn test_repair_peers() { let node_keypair = Arc::new(Keypair::new()); - let mut cluster_info = ClusterInfo::new( + let cluster_info = ClusterInfo::new( ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), node_keypair, ); @@ -2610,7 +2738,12 @@ mod tests { 0, LowestSlot::new(other_node_pubkey, peer_lowest, timestamp()), )); - let _ = cluster_info.gossip.crds.insert(value, timestamp()); + let _ = cluster_info + .gossip + .write() + .unwrap() + .crds + .insert(value, timestamp()); } // only half the visible peers should be eligible to serve this repair assert_eq!(cluster_info.repair_peers(5).len(), 5); @@ -2675,7 +2808,7 @@ mod tests { fn test_push_epoch_slots_large() { use rand::Rng; let node_keypair = Arc::new(Keypair::new()); - let mut cluster_info = ClusterInfo::new( + let cluster_info = ClusterInfo::new( ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()), node_keypair, ); diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index bff892cc9..0c6c6f5fc 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -197,7 +197,7 @@ pub struct ClusterInfoVoteListener { impl ClusterInfoVoteListener { pub fn new( exit: &Arc, - cluster_info: Arc>, + cluster_info: Arc, sigverify_disabled: bool, sender: CrossbeamSender>, poh_recorder: &Arc>, @@ -262,7 +262,7 @@ impl ClusterInfoVoteListener { fn recv_loop( exit: Arc, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, sigverify_disabled: bool, verified_vote_packets_sender: VerifiedVotePacketsSender, verified_vote_transactions_sender: VerifiedVoteTransactionsSender, @@ -272,7 +272,7 @@ impl ClusterInfoVoteListener { if exit.load(Ordering::Relaxed) { return Ok(()); } - let (labels, votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts); + let (labels, votes, new_ts) = cluster_info.get_votes(last_ts); inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len()); last_ts = new_ts; diff --git a/core/src/cluster_slots.rs b/core/src/cluster_slots.rs index 8dce5a974..de8d26a89 100644 --- a/core/src/cluster_slots.rs +++ b/core/src/cluster_slots.rs @@ -27,15 +27,10 @@ impl ClusterSlots { pub fn lookup(&self, slot: Slot) -> Option>> { self.cluster_slots.read().unwrap().get(&slot).cloned() } - pub fn update( - &self, - root: Slot, - cluster_info: &RwLock, - bank_forks: &RwLock, - ) { + pub fn update(&self, root: Slot, cluster_info: &ClusterInfo, bank_forks: &RwLock) { self.update_peers(cluster_info, bank_forks); let since = *self.since.read().unwrap(); - let epoch_slots = cluster_info.read().unwrap().get_epoch_slots_since(since); + let epoch_slots = cluster_info.get_epoch_slots_since(since); self.update_internal(root, epoch_slots); } fn update_internal(&self, root: Slot, epoch_slots: (Vec, Option)) { @@ -95,7 +90,7 @@ impl ClusterSlots { .collect() } - fn update_peers(&self, cluster_info: &RwLock, bank_forks: &RwLock) { + fn update_peers(&self, cluster_info: &ClusterInfo, bank_forks: &RwLock) { let root_bank = bank_forks.read().unwrap().root_bank().clone(); let root_epoch = root_bank.epoch(); let my_epoch = *self.epoch.read().unwrap(); @@ -111,7 +106,7 @@ impl ClusterSlots { .clone(); *self.validator_stakes.write().unwrap() = validator_stakes; - let id = cluster_info.read().unwrap().id(); + let id = cluster_info.id(); *self.self_id.write().unwrap() = id; *self.epoch.write().unwrap() = Some(root_epoch); } diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 08e9add23..8464fdd9a 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -22,7 +22,7 @@ pub struct GossipService { impl GossipService { pub fn new( - cluster_info: &Arc>, + cluster_info: &Arc, bank_forks: Option>>, gossip_socket: UdpSocket, exit: &Arc, @@ -31,7 +31,7 @@ impl GossipService { let gossip_socket = Arc::new(gossip_socket); trace!( "GossipService: id: {}, listening on: {:?}", - &cluster_info.read().unwrap().my_data().id, + &cluster_info.id(), gossip_socket.local_addr().unwrap() ); let t_receiver = streamer::receiver( @@ -89,7 +89,7 @@ pub fn discover( let exit = Arc::new(AtomicBool::new(false)); let (gossip_service, ip_echo, spy_ref) = make_gossip_node(entrypoint, &exit, my_gossip_addr); - let id = spy_ref.read().unwrap().keypair.pubkey(); + let id = spy_ref.id(); info!("Entrypoint: {:?}", entrypoint); info!("Node Id: {:?}", id); if let Some(my_gossip_addr) = my_gossip_addr { @@ -113,7 +113,7 @@ pub fn discover( info!( "discover success in {}s...\n{}", secs, - spy_ref.read().unwrap().contact_info_trace() + spy_ref.contact_info_trace() ); return Ok((tvu_peers, storage_peers)); } @@ -121,15 +121,12 @@ pub fn discover( if !tvu_peers.is_empty() { info!( "discover failed to match criteria by timeout...\n{}", - spy_ref.read().unwrap().contact_info_trace() + spy_ref.contact_info_trace() ); return Ok((tvu_peers, storage_peers)); } - info!( - "discover failed...\n{}", - spy_ref.read().unwrap().contact_info_trace() - ); + info!("discover failed...\n{}", spy_ref.contact_info_trace()); Err(std::io::Error::new( std::io::ErrorKind::Other, "Discover failed", @@ -176,7 +173,7 @@ pub fn get_multi_client(nodes: &[ContactInfo]) -> (ThinClient, usize) { } fn spy( - spy_ref: Arc>, + spy_ref: Arc, num_nodes: Option, timeout: Option, find_node_by_pubkey: Option, @@ -194,13 +191,8 @@ fn spy( } } - tvu_peers = spy_ref - .read() - .unwrap() - .all_tvu_peers() - .into_iter() - .collect::>(); - storage_peers = spy_ref.read().unwrap().all_storage_peers(); + tvu_peers = spy_ref.all_tvu_peers().into_iter().collect::>(); + storage_peers = spy_ref.all_storage_peers(); let mut nodes: Vec<_> = tvu_peers.iter().chain(storage_peers.iter()).collect(); nodes.sort(); @@ -232,10 +224,7 @@ fn spy( met_criteria = true; } if i % 20 == 0 { - info!( - "discovering...\n{}", - spy_ref.read().unwrap().contact_info_trace() - ); + info!("discovering...\n{}", spy_ref.contact_info_trace()); } sleep(Duration::from_millis( crate::cluster_info::GOSSIP_SLEEP_MILLIS, @@ -256,18 +245,18 @@ fn make_gossip_node( entrypoint: Option<&SocketAddr>, exit: &Arc, gossip_addr: Option<&SocketAddr>, -) -> (GossipService, Option, Arc>) { +) -> (GossipService, Option, Arc) { let keypair = Arc::new(Keypair::new()); let (node, gossip_socket, ip_echo) = if let Some(gossip_addr) = gossip_addr { ClusterInfo::gossip_node(&keypair.pubkey(), gossip_addr) } else { ClusterInfo::spy_node(&keypair.pubkey()) }; - let mut cluster_info = ClusterInfo::new(node, keypair); + let cluster_info = ClusterInfo::new(node, keypair); if let Some(entrypoint) = entrypoint { cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint)); } - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let gossip_service = GossipService::new(&cluster_info.clone(), None, gossip_socket, &exit); (gossip_service, ip_echo, cluster_info) } @@ -277,7 +266,7 @@ mod tests { use super::*; use crate::cluster_info::{ClusterInfo, Node}; use std::sync::atomic::AtomicBool; - use std::sync::{Arc, RwLock}; + use std::sync::Arc; #[test] #[ignore] @@ -286,7 +275,7 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let tn = Node::new_localhost(); let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone()); - let c = Arc::new(RwLock::new(cluster_info)); + let c = Arc::new(cluster_info); let d = GossipService::new(&c, None, tn.sockets.gossip, &exit); exit.store(true, Ordering::Relaxed); d.join().unwrap(); @@ -300,16 +289,16 @@ mod tests { let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0); let peer0_info = ContactInfo::new_localhost(&peer0, 0); let peer1_info = ContactInfo::new_localhost(&peer1, 0); - let mut cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair)); + let cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair)); cluster_info.insert_info(peer0_info.clone()); cluster_info.insert_info(peer1_info); - let spy_ref = Arc::new(RwLock::new(cluster_info)); + let spy_ref = Arc::new(cluster_info); let (met_criteria, secs, tvu_peers, _) = spy(spy_ref.clone(), None, Some(1), None, None); assert_eq!(met_criteria, false); assert_eq!(secs, 1); - assert_eq!(tvu_peers, spy_ref.read().unwrap().tvu_peers()); + assert_eq!(tvu_peers, spy_ref.tvu_peers()); // Find num_nodes let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, None, None); diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index a18781b85..d75d4133b 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -81,7 +81,7 @@ impl RepairService { blockstore: Arc, exit: Arc, repair_socket: Arc, - cluster_info: Arc>, + cluster_info: Arc, repair_strategy: RepairStrategy, cluster_slots: Arc, ) -> Self { @@ -106,12 +106,12 @@ impl RepairService { blockstore: &Blockstore, exit: &AtomicBool, repair_socket: &UdpSocket, - cluster_info: &Arc>, + cluster_info: &Arc, repair_strategy: RepairStrategy, cluster_slots: &Arc, ) { let serve_repair = ServeRepair::new(cluster_info.clone()); - let id = cluster_info.read().unwrap().id(); + let id = cluster_info.id(); if let RepairStrategy::RepairAll { .. } = repair_strategy { Self::initialize_lowest_slot(id, blockstore, cluster_info); } @@ -308,24 +308,17 @@ impl RepairService { } } - fn initialize_lowest_slot( - id: Pubkey, - blockstore: &Blockstore, - cluster_info: &RwLock, - ) { + fn initialize_lowest_slot(id: Pubkey, blockstore: &Blockstore, cluster_info: &ClusterInfo) { // Safe to set into gossip because by this time, the leader schedule cache should // also be updated with the latest root (done in blockstore_processor) and thus // will provide a schedule to window_service for any incoming shreds up to the // last_confirmed_epoch. - cluster_info - .write() - .unwrap() - .push_lowest_slot(id, blockstore.lowest_slot()); + cluster_info.push_lowest_slot(id, blockstore.lowest_slot()); } fn update_completed_slots( completed_slots_receiver: &CompletedSlotsReceiver, - cluster_info: &RwLock, + cluster_info: &ClusterInfo, ) { let mut slots: Vec = vec![]; while let Ok(mut more) = completed_slots_receiver.try_recv() { @@ -333,20 +326,17 @@ impl RepairService { } slots.sort(); if !slots.is_empty() { - cluster_info.write().unwrap().push_epoch_slots(&slots); + cluster_info.push_epoch_slots(&slots); } } - fn update_lowest_slot(id: &Pubkey, lowest_slot: Slot, cluster_info: &RwLock) { - cluster_info - .write() - .unwrap() - .push_lowest_slot(*id, lowest_slot); + fn update_lowest_slot(id: &Pubkey, lowest_slot: Slot, cluster_info: &ClusterInfo) { + cluster_info.push_lowest_slot(*id, lowest_slot); } fn initialize_epoch_slots( blockstore: &Blockstore, - cluster_info: &RwLock, + cluster_info: &ClusterInfo, completed_slots_receiver: &CompletedSlotsReceiver, ) { let root = blockstore.last_root(); @@ -367,7 +357,7 @@ impl RepairService { slots.sort(); slots.dedup(); if !slots.is_empty() { - cluster_info.write().unwrap().push_epoch_slots(&slots); + cluster_info.push_epoch_slots(&slots); } } @@ -602,17 +592,13 @@ mod test { #[test] pub fn test_update_lowest_slot() { let node_info = Node::new_localhost_with_pubkey(&Pubkey::default()); - let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair( - node_info.info.clone(), - )); + let cluster_info = ClusterInfo::new_with_invalid_keypair(node_info.info.clone()); RepairService::update_lowest_slot(&Pubkey::default(), 5, &cluster_info); let lowest = cluster_info - .read() - .unwrap() - .get_lowest_slot_for_node(&Pubkey::default(), None) - .unwrap() - .0 - .clone(); + .get_lowest_slot_for_node(&Pubkey::default(), None, |lowest_slot, _| { + lowest_slot.clone() + }) + .unwrap(); assert_eq!(lowest.lowest, 5); } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 0ba38e20a..02acc9858 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -114,7 +114,7 @@ impl ReplayStage { config: ReplayStageConfig, blockstore: Arc, bank_forks: Arc>, - cluster_info: Arc>, + cluster_info: Arc, ledger_signal_receiver: Receiver, poh_recorder: Arc>, vote_tracker: Arc, @@ -689,7 +689,7 @@ impl ReplayStage { progress: &mut ProgressMap, vote_account_pubkey: &Pubkey, authorized_voter_keypairs: &[Arc], - cluster_info: &Arc>, + cluster_info: &Arc, blockstore: &Arc, leader_schedule_cache: &Arc, root_bank_sender: &Sender>>, @@ -762,7 +762,7 @@ impl ReplayStage { } fn push_vote( - cluster_info: &Arc>, + cluster_info: &ClusterInfo, bank: &Arc, vote_account_pubkey: &Pubkey, authorized_voter_keypairs: &[Arc], @@ -815,7 +815,7 @@ impl ReplayStage { } Some(authorized_voter_keypair) => authorized_voter_keypair, }; - let node_keypair = cluster_info.read().unwrap().keypair.clone(); + let node_keypair = cluster_info.keypair.clone(); // Send our last few votes along with the new one let vote_ix = vote_instruction::vote( @@ -829,10 +829,7 @@ impl ReplayStage { let blockhash = bank.last_blockhash(); vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash); vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash); - cluster_info - .write() - .unwrap() - .push_vote(tower_index, vote_tx); + cluster_info.push_vote(tower_index, vote_tx); } fn update_commitment_cache( diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 4eb5d969e..3cf766644 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -38,7 +38,7 @@ const MAX_PACKET_BATCH_SIZE: usize = 100; fn retransmit( bank_forks: &Arc>, leader_schedule_cache: &Arc, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, r: &Arc>, sock: &UdpSocket, id: u32, @@ -63,11 +63,8 @@ fn retransmit( let mut peers_len = 0; let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch); let stakes = stakes.map(Arc::new); - let (peers, stakes_and_index) = cluster_info - .read() - .unwrap() - .sorted_retransmit_peers_and_stakes(stakes); - let me = cluster_info.read().unwrap().my_data(); + let (peers, stakes_and_index) = cluster_info.sorted_retransmit_peers_and_stakes(stakes); + let my_id = cluster_info.id(); let mut discard_total = 0; let mut repair_total = 0; let mut retransmit_total = 0; @@ -88,7 +85,7 @@ fn retransmit( let mut compute_turbine_peers = Measure::start("turbine_start"); let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index( - &me.id, + &my_id, &peers, &stakes_and_index, packet.meta.seed, @@ -154,7 +151,7 @@ pub fn retransmitter( sockets: Arc>, bank_forks: Arc>, leader_schedule_cache: &Arc, - cluster_info: Arc>, + cluster_info: Arc, r: Arc>, ) -> Vec> { (0..sockets.len()) @@ -206,7 +203,7 @@ impl RetransmitStage { bank_forks: Arc>, leader_schedule_cache: &Arc, blockstore: Arc, - cluster_info: &Arc>, + cluster_info: &Arc, retransmit_sockets: Arc>, repair_socket: Arc, verified_receiver: CrossbeamReceiver>, @@ -316,11 +313,11 @@ mod tests { .unwrap(); let other = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); - let mut cluster_info = ClusterInfo::new_with_invalid_keypair(other); + let cluster_info = ClusterInfo::new_with_invalid_keypair(other); cluster_info.insert_info(me); let retransmit_socket = Arc::new(vec![UdpSocket::bind("0.0.0.0:0").unwrap()]); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let (retransmit_sender, retransmit_receiver) = channel(); let t_retransmit = retransmitter( diff --git a/core/src/rpc.rs b/core/src/rpc.rs index c01f7ab21..d51d2888e 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -540,8 +540,8 @@ impl JsonRpcRequestProcessor { } } -fn get_tpu_addr(cluster_info: &Arc>) -> Result { - let contact_info = cluster_info.read().unwrap().my_data(); +fn get_tpu_addr(cluster_info: &ClusterInfo) -> Result { + let contact_info = cluster_info.my_contact_info(); Ok(contact_info.tpu) } @@ -556,7 +556,7 @@ fn verify_signature(input: &str) -> Result { #[derive(Clone)] pub struct Meta { pub request_processor: Arc>, - pub cluster_info: Arc>, + pub cluster_info: Arc, pub genesis_hash: Hash, } impl Metadata for Meta {} @@ -902,7 +902,7 @@ impl RpcSol for RpcSolImpl { } fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result> { - let cluster_info = meta.cluster_info.read().unwrap(); + let cluster_info = &meta.cluster_info; fn valid_address_or_none(addr: &SocketAddr) -> Option { if ContactInfo::is_valid_address(addr) { Some(*addr) @@ -910,12 +910,12 @@ impl RpcSol for RpcSolImpl { None } } - let shred_version = cluster_info.my_data().shred_version; + let my_shred_version = cluster_info.my_shred_version(); Ok(cluster_info .all_peers() .iter() .filter_map(|(contact_info, _)| { - if shred_version == contact_info.shred_version + if my_shred_version == contact_info.shred_version && ContactInfo::is_valid_address(&contact_info.gossip) { Some(RpcContactInfo { @@ -1555,17 +1555,12 @@ pub mod tests { StorageState::default(), validator_exit, ))); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( - ContactInfo::default(), - ))); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); - cluster_info - .write() - .unwrap() - .insert_info(ContactInfo::new_with_pubkey_socketaddr( - &leader_pubkey, - &socketaddr!("127.0.0.1:1234"), - )); + cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr( + &leader_pubkey, + &socketaddr!("127.0.0.1:1234"), + )); let mut io = MetaIoHandler::default(); let rpc = RpcSolImpl; @@ -2258,9 +2253,7 @@ pub mod tests { ); Arc::new(RwLock::new(request_processor)) }, - cluster_info: Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( - ContactInfo::default(), - ))), + cluster_info: Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())), genesis_hash: Hash::default(), }; @@ -2277,9 +2270,9 @@ pub mod tests { #[test] fn test_rpc_get_tpu_addr() { - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair( ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")), - ))); + )); assert_eq!( get_tpu_addr(&cluster_info), Ok(socketaddr!("127.0.0.1:1234")) diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index e5ed21f8c..7e0197ac4 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -42,7 +42,7 @@ struct RpcRequestMiddleware { ledger_path: PathBuf, snapshot_archive_path_regex: Regex, snapshot_config: Option, - cluster_info: Arc>, + cluster_info: Arc, trusted_validators: Option>, } @@ -50,7 +50,7 @@ impl RpcRequestMiddleware { pub fn new( ledger_path: PathBuf, snapshot_config: Option, - cluster_info: Arc>, + cluster_info: Arc, trusted_validators: Option>, ) -> Self { Self { @@ -134,22 +134,27 @@ impl RpcRequestMiddleware { fn health_check(&self) -> &'static str { let response = if let Some(trusted_validators) = &self.trusted_validators { let (latest_account_hash_slot, latest_trusted_validator_account_hash_slot) = { - let cluster_info = self.cluster_info.read().unwrap(); ( - cluster_info - .get_accounts_hash_for_node(&cluster_info.id()) - .map(|hashes| hashes.iter().max_by(|a, b| a.0.cmp(&b.0))) + self.cluster_info + .get_accounts_hash_for_node(&self.cluster_info.id(), |hashes| { + hashes + .iter() + .max_by(|a, b| a.0.cmp(&b.0)) + .map(|slot_hash| slot_hash.0) + }) .flatten() - .map(|slot_hash| slot_hash.0) .unwrap_or(0), trusted_validators .iter() .map(|trusted_validator| { - cluster_info - .get_accounts_hash_for_node(&trusted_validator) - .map(|hashes| hashes.iter().max_by(|a, b| a.0.cmp(&b.0))) + self.cluster_info + .get_accounts_hash_for_node(&trusted_validator, |hashes| { + hashes + .iter() + .max_by(|a, b| a.0.cmp(&b.0)) + .map(|slot_hash| slot_hash.0) + }) .flatten() - .map(|slot_hash| slot_hash.0) .unwrap_or(0) }) .max() @@ -244,7 +249,7 @@ impl JsonRpcService { bank_forks: Arc>, block_commitment_cache: Arc>, blockstore: Arc, - cluster_info: Arc>, + cluster_info: Arc, genesis_hash: Hash, ledger_path: &Path, storage_state: StorageState, @@ -367,9 +372,7 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(&exit); let bank = Bank::new(&genesis_config); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( - ContactInfo::default(), - ))); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); let rpc_addr = SocketAddr::new( ip_addr, @@ -412,9 +415,7 @@ mod tests { #[test] fn test_is_get_path() { - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( - ContactInfo::default(), - ))); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); let rrm = RpcRequestMiddleware::new(PathBuf::from("/"), None, cluster_info.clone(), None); let rrm_with_snapshot_config = RpcRequestMiddleware::new( @@ -451,9 +452,7 @@ mod tests { #[test] fn test_health_check_with_no_trusted_validators() { - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( - ContactInfo::default(), - ))); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); let rm = RpcRequestMiddleware::new(PathBuf::from("/"), None, cluster_info.clone(), None); assert_eq!(rm.health_check(), "ok"); @@ -461,9 +460,7 @@ mod tests { #[test] fn test_health_check_with_trusted_validators() { - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( - ContactInfo::default(), - ))); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); let trusted_validators = vec![Pubkey::new_rand(), Pubkey::new_rand(), Pubkey::new_rand()]; let rm = RpcRequestMiddleware::new( @@ -477,66 +474,59 @@ mod tests { assert_eq!(rm.health_check(), "behind"); // No account hashes for any trusted validators == "behind" - { - let mut cluster_info = cluster_info.write().unwrap(); - cluster_info - .push_accounts_hashes(vec![(1000, Hash::default()), (900, Hash::default())]); - } + cluster_info.push_accounts_hashes(vec![(1000, Hash::default()), (900, Hash::default())]); assert_eq!(rm.health_check(), "behind"); // This node is ahead of the trusted validators == "ok" - { - let mut cluster_info = cluster_info.write().unwrap(); - cluster_info - .gossip - .crds - .insert( - CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( - trusted_validators[0].clone(), - vec![ - (1, Hash::default()), - (1001, Hash::default()), - (2, Hash::default()), - ], - ))), - 1, - ) - .unwrap(); - } + cluster_info + .gossip + .write() + .unwrap() + .crds + .insert( + CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( + trusted_validators[0].clone(), + vec![ + (1, Hash::default()), + (1001, Hash::default()), + (2, Hash::default()), + ], + ))), + 1, + ) + .unwrap(); assert_eq!(rm.health_check(), "ok"); // Node is slightly behind the trusted validators == "ok" - { - let mut cluster_info = cluster_info.write().unwrap(); - cluster_info - .gossip - .crds - .insert( - CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( - trusted_validators[1].clone(), - vec![(1000 + HEALTH_CHECK_SLOT_DISTANCE - 1, Hash::default())], - ))), - 1, - ) - .unwrap(); - } + cluster_info + .gossip + .write() + .unwrap() + .crds + .insert( + CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( + trusted_validators[1].clone(), + vec![(1000 + HEALTH_CHECK_SLOT_DISTANCE - 1, Hash::default())], + ))), + 1, + ) + .unwrap(); assert_eq!(rm.health_check(), "ok"); // Node is far behind the trusted validators == "behind" - { - let mut cluster_info = cluster_info.write().unwrap(); - cluster_info - .gossip - .crds - .insert( - CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( - trusted_validators[2].clone(), - vec![(1000 + HEALTH_CHECK_SLOT_DISTANCE, Hash::default())], - ))), - 1, - ) - .unwrap(); - } + cluster_info + .gossip + .write() + .unwrap() + .crds + .insert( + CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new( + trusted_validators[2].clone(), + vec![(1000 + HEALTH_CHECK_SLOT_DISTANCE, Hash::default())], + ))), + 1, + ) + .unwrap(); assert_eq!(rm.health_check(), "behind"); } } diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 2cb873da0..0ad12b420 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -72,7 +72,7 @@ pub struct ServeRepair { /// set the keypair that will be used to sign repair responses keypair: Arc, my_info: ContactInfo, - cluster_info: Arc>, + cluster_info: Arc, } type RepairCache = HashMap, Vec<(u64, usize)>)>; @@ -80,16 +80,13 @@ type RepairCache = HashMap, Vec<(u64, usize)>)>; impl ServeRepair { /// Without a valid keypair gossip will not function. Only useful for tests. pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self { - Self::new(Arc::new(RwLock::new( - ClusterInfo::new_with_invalid_keypair(contact_info), + Self::new(Arc::new(ClusterInfo::new_with_invalid_keypair( + contact_info, ))) } - pub fn new(cluster_info: Arc>) -> Self { - let (keypair, my_info) = { - let r_cluster_info = cluster_info.read().unwrap(); - (r_cluster_info.keypair.clone(), r_cluster_info.my_data()) - }; + pub fn new(cluster_info: Arc) -> Self { + let (keypair, my_info) = { (cluster_info.keypair.clone(), cluster_info.my_contact_info()) }; Self { keypair, my_info, @@ -362,11 +359,7 @@ impl ServeRepair { // find a peer that appears to be accepting replication and has the desired slot, as indicated // by a valid tvu port location if cache.get(&repair_request.slot()).is_none() { - let repair_peers: Vec<_> = self - .cluster_info - .read() - .unwrap() - .repair_peers(repair_request.slot()); + let repair_peers: Vec<_> = self.cluster_info.repair_peers(repair_request.slot()); if repair_peers.is_empty() { return Err(ClusterInfoError::NoPeers.into()); } @@ -654,7 +647,7 @@ mod tests { fn window_index_request() { let cluster_slots = ClusterSlots::default(); let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp()); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me))); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(me)); let serve_repair = ServeRepair::new(cluster_info.clone()); let rv = serve_repair.repair_request( &cluster_slots, @@ -680,7 +673,7 @@ mod tests { wallclock: 0, shred_version: 0, }; - cluster_info.write().unwrap().insert_info(nxt.clone()); + cluster_info.insert_info(nxt.clone()); let rv = serve_repair .repair_request( &cluster_slots, @@ -708,7 +701,7 @@ mod tests { wallclock: 0, shred_version: 0, }; - cluster_info.write().unwrap().insert_info(nxt); + cluster_info.insert_info(nxt); let mut one = false; let mut two = false; while !one || !two { diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index cedd30488..3a7445ed6 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -6,7 +6,7 @@ use std::{ sync::{ atomic::{AtomicBool, Ordering}, mpsc::RecvTimeoutError, - Arc, RwLock, + Arc, }, thread::{self, Builder, JoinHandle}, time::Duration, @@ -21,7 +21,7 @@ impl SnapshotPackagerService { snapshot_package_receiver: AccountsPackageReceiver, starting_snapshot_hash: Option<(Slot, Hash)>, exit: &Arc, - cluster_info: &Arc>, + cluster_info: &Arc, blockstore: Option>, ) -> Self { let exit = exit.clone(); @@ -34,10 +34,7 @@ impl SnapshotPackagerService { if let Some(starting_snapshot_hash) = starting_snapshot_hash { hashes.push(starting_snapshot_hash); } - cluster_info - .write() - .unwrap() - .push_snapshot_hashes(hashes.clone()); + cluster_info.push_snapshot_hashes(hashes.clone()); loop { if exit.load(Ordering::Relaxed) { break; @@ -60,10 +57,7 @@ impl SnapshotPackagerService { while hashes.len() > MAX_SNAPSHOT_HASHES { hashes.remove(0); } - cluster_info - .write() - .unwrap() - .push_snapshot_hashes(hashes.clone()); + cluster_info.push_snapshot_hashes(hashes.clone()); } if let Some(ref blockstore) = blockstore { let _ = blockstore.tar_shreds(snapshot_package.root); diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 4b4c25d95..38ab4d53a 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -184,7 +184,7 @@ impl StorageStage { storage_keypair: &Arc, exit: &Arc, bank_forks: &Arc>, - cluster_info: &Arc>, + cluster_info: &Arc, block_commitment_cache: Arc>, ) -> Self { let (instruction_sender, instruction_receiver) = channel(); @@ -286,7 +286,7 @@ impl StorageStage { fn send_transaction( bank_forks: &Arc>, - cluster_info: &Arc>, + cluster_info: &ClusterInfo, instruction: Instruction, keypair: &Arc, storage_keypair: &Arc, @@ -323,7 +323,7 @@ impl StorageStage { for _ in 0..5 { transactions_socket.send_to( &bincode::serialize(&transaction).unwrap(), - cluster_info.read().unwrap().my_data().tpu, + cluster_info.my_contact_info().tpu, )?; sleep(Duration::from_millis(100)); if Self::poll_for_signature_confirmation( @@ -652,10 +652,10 @@ impl StorageStage { } } -pub fn test_cluster_info(id: &Pubkey) -> Arc> { +pub fn test_cluster_info(id: &Pubkey) -> Arc { let contact_info = ContactInfo::new_localhost(id, 0); let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); - Arc::new(RwLock::new(cluster_info)) + Arc::new(cluster_info) } #[cfg(test)] diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 09e7a5915..2d484e054 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -36,7 +36,7 @@ pub struct Tpu { impl Tpu { #[allow(clippy::too_many_arguments)] pub fn new( - cluster_info: &Arc>, + cluster_info: &Arc, poh_recorder: &Arc>, entry_receiver: Receiver, retransmit_slots_receiver: RetransmitSlotsReceiver, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 4f121d5f6..bfca12168 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -84,7 +84,7 @@ impl Tvu { authorized_voter_keypairs: Vec>, storage_keypair: &Arc, bank_forks: &Arc>, - cluster_info: &Arc>, + cluster_info: &Arc, sockets: Sockets, blockstore: Arc, storage_state: &StorageState, @@ -103,11 +103,7 @@ impl Tvu { retransmit_slots_sender: RetransmitSlotsSender, tvu_config: TvuConfig, ) -> Self { - let keypair: Arc = cluster_info - .read() - .expect("Unable to read from cluster_info during Tvu creation") - .keypair - .clone(); + let keypair: Arc = cluster_info.keypair.clone(); let Sockets { repair: repair_socket, @@ -178,7 +174,7 @@ impl Tvu { accounts_hash_receiver, snapshot_package_sender, exit, - cluster_info, + &cluster_info, tvu_config.trusted_validators.clone(), tvu_config.halt_on_trusted_validators_accounts_hash_mismatch, tvu_config.accounts_hash_fault_injection_slots, @@ -286,9 +282,9 @@ pub mod tests { let bank_forks = BankForks::new(0, Bank::new(&genesis_config)); //start cluster_info1 - let mut cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone()); + let cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone()); cluster_info1.insert_info(leader.info.clone()); - let cref1 = Arc::new(RwLock::new(cluster_info1)); + let cref1 = Arc::new(cluster_info1); let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config); let (blockstore, l_receiver, completed_slots_receiver) = diff --git a/core/src/validator.rs b/core/src/validator.rs index 18d258ff6..aaf22c943 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -235,10 +235,7 @@ impl Validator { } } - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new( - node.info.clone(), - keypair.clone(), - ))); + let cluster_info = Arc::new(ClusterInfo::new(node.info.clone(), keypair.clone())); let storage_state = StorageState::new( &bank.last_blockhash(), @@ -370,10 +367,7 @@ impl Validator { // Insert the entrypoint info, should only be None if this node // is the bootstrap validator if let Some(entrypoint_info) = entrypoint_info_option { - cluster_info - .write() - .unwrap() - .set_entrypoint(entrypoint_info.clone()); + cluster_info.set_entrypoint(entrypoint_info.clone()); } let (snapshot_packager_service, snapshot_package_sender) = @@ -647,11 +641,7 @@ fn new_banks_from_blockstore( ) } -fn wait_for_supermajority( - config: &ValidatorConfig, - bank: &Arc, - cluster_info: &Arc>, -) { +fn wait_for_supermajority(config: &ValidatorConfig, bank: &Bank, cluster_info: &ClusterInfo) { if config.wait_for_supermajority != Some(bank.slot()) { return; } @@ -796,11 +786,7 @@ fn report_target_features() { } // Get the activated stake percentage (based on the provided bank) that is visible in gossip -fn get_stake_percent_in_gossip( - bank: &Arc, - cluster_info: &Arc>, - log: bool, -) -> u64 { +fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 { let mut online_stake = 0; let mut wrong_shred_stake = 0; let mut wrong_shred_nodes = vec![]; @@ -808,9 +794,9 @@ fn get_stake_percent_in_gossip( let mut offline_nodes = vec![]; let mut total_activated_stake = 0; - let all_tvu_peers = cluster_info.read().unwrap().all_tvu_peers(); - let my_shred_version = cluster_info.read().unwrap().my_data().shred_version; - let my_id = cluster_info.read().unwrap().id(); + let all_tvu_peers = cluster_info.all_tvu_peers(); + let my_shred_version = cluster_info.my_shred_version(); + let my_id = cluster_info.id(); for (activated_stake, vote_account) in bank.vote_accounts().values() { let vote_state = diff --git a/core/src/window_service.rs b/core/src/window_service.rs index eb830f73a..5ae3b2aa4 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -249,7 +249,7 @@ impl WindowService { #[allow(clippy::too_many_arguments)] pub fn new( blockstore: Arc, - cluster_info: Arc>, + cluster_info: Arc, verified_receiver: CrossbeamReceiver>, retransmit: PacketSender, repair_socket: Arc, @@ -294,7 +294,7 @@ impl WindowService { ); let t_window = Self::start_recv_window_thread( - cluster_info.read().unwrap().id(), + cluster_info.id(), exit, &blockstore, insert_sender, @@ -514,7 +514,7 @@ mod test { net::UdpSocket, sync::atomic::{AtomicBool, Ordering}, sync::mpsc::channel, - sync::{Arc, RwLock}, + sync::Arc, thread::sleep, time::Duration, }; @@ -630,9 +630,9 @@ mod test { let blockstore = Arc::new(blockstore); let (retransmit_sender, _retransmit_receiver) = channel(); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair( ContactInfo::new_localhost(&Pubkey::default(), 0), - ))); + )); let cluster_slots = Arc::new(ClusterSlots::default()); let repair_sock = Arc::new(UdpSocket::bind(socketaddr_any!()).unwrap()); let window = WindowService::new( diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs index 01ebfaece..06cd06bea 100644 --- a/core/tests/bank_forks.rs +++ b/core/tests/bank_forks.rs @@ -25,7 +25,6 @@ mod tests { signature::{Keypair, Signer}, system_transaction, }; - use std::sync::RwLock; use std::{fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel, sync::Arc}; use tempfile::TempDir; @@ -319,9 +318,7 @@ mod tests { // channel hold hard links to these deleted snapshots. We verify this is the case below. let exit = Arc::new(AtomicBool::new(false)); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( - ContactInfo::default(), - ))); + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())); let snapshot_packager_service = SnapshotPackagerService::new(receiver, None, &exit, &cluster_info, None); diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index f209bed33..42f138f39 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -72,7 +72,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) { // describe the leader let leader_info = ContactInfo::new_localhost(&Pubkey::new_rand(), 0); - let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.clone()); + let cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.clone()); // setup staked nodes let mut staked_nodes = HashMap::new(); @@ -105,7 +105,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) { senders.lock().unwrap().insert(node.id, s); }) }); - let c_info = cluster_info.clone(); + let c_info = cluster_info.clone_with_id(&cluster_info.id()); let staked_nodes = Arc::new(staked_nodes); let shreds_len = 100; @@ -140,7 +140,6 @@ fn run_simulation(stakes: &[u64], fanout: usize) { // start turbine simulation let now = Instant::now(); batches.par_iter_mut().for_each(|batch| { - let mut cluster = c_info.clone(); let mut remaining = batch.len(); let senders: HashMap<_, _> = senders.lock().unwrap().clone(); while remaining > 0 { @@ -150,7 +149,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) { "Timed out with {:?} remaining nodes", remaining ); - cluster.gossip.set_self(&*id); + let cluster = c_info.clone_with_id(id); if !*layer1_done { recv.iter().for_each(|i| { retransmit( diff --git a/core/tests/gossip.rs b/core/tests/gossip.rs index 14bfe424a..fd0d05049 100644 --- a/core/tests/gossip.rs +++ b/core/tests/gossip.rs @@ -10,19 +10,16 @@ use solana_sdk::signature::{Keypair, Signer}; use solana_sdk::timing::timestamp; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; -fn test_node(exit: &Arc) -> (Arc>, GossipService, UdpSocket) { +fn test_node(exit: &Arc) -> (Arc, GossipService, UdpSocket) { let keypair = Arc::new(Keypair::new()); let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey()); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new( - test_node.info.clone(), - keypair, - ))); + let cluster_info = Arc::new(ClusterInfo::new(test_node.info.clone(), keypair)); let gossip_service = GossipService::new(&cluster_info, None, test_node.sockets.gossip, exit); - let _ = cluster_info.read().unwrap().my_data(); + let _ = cluster_info.my_contact_info(); ( cluster_info, gossip_service, @@ -36,7 +33,7 @@ fn test_node(exit: &Arc) -> (Arc>, GossipService /// tests that actually use this function are below fn run_gossip_topo(num: usize, topo: F) where - F: Fn(&Vec<(Arc>, GossipService, UdpSocket)>) -> (), + F: Fn(&Vec<(Arc, GossipService, UdpSocket)>) -> (), { let exit = Arc::new(AtomicBool::new(false)); let listen: Vec<_> = (0..num).map(|_| test_node(&exit)).collect(); @@ -44,10 +41,7 @@ where let mut done = true; for i in 0..(num * 32) { done = true; - let total: usize = listen - .iter() - .map(|v| v.0.read().unwrap().gossip_peers().len()) - .sum(); + let total: usize = listen.iter().map(|v| v.0.gossip_peers().len()).sum(); if (total + num) * 10 > num * num * 9 { done = true; break; @@ -71,11 +65,10 @@ fn gossip_ring() { for n in 0..num { let y = n % listen.len(); let x = (n + 1) % listen.len(); - let mut xv = listen[x].0.write().unwrap(); - let yv = listen[y].0.read().unwrap(); - let mut d = yv.lookup(&yv.id()).unwrap().clone(); + let yv = &listen[y].0; + let mut d = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap(); d.wallclock = timestamp(); - xv.insert_info(d); + listen[x].0.insert_info(d); } }); } @@ -90,11 +83,10 @@ fn gossip_ring_large() { for n in 0..num { let y = n % listen.len(); let x = (n + 1) % listen.len(); - let mut xv = listen[x].0.write().unwrap(); - let yv = listen[y].0.read().unwrap(); - let mut d = yv.lookup(&yv.id()).unwrap().clone(); + let yv = &listen[y].0; + let mut d = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap(); d.wallclock = timestamp(); - xv.insert_info(d); + listen[x].0.insert_info(d); } }); } @@ -107,10 +99,10 @@ fn gossip_star() { for n in 0..(num - 1) { let x = 0; let y = (n + 1) % listen.len(); - let mut xv = listen[x].0.write().unwrap(); - let yv = listen[y].0.read().unwrap(); - let mut yd = yv.lookup(&yv.id()).unwrap().clone(); + let yv = &listen[y].0; + let mut yd = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap(); yd.wallclock = timestamp(); + let xv = &listen[x].0; xv.insert_info(yd); trace!("star leader {}", &xv.id()); } @@ -124,13 +116,13 @@ fn gossip_rstar() { run_gossip_topo(10, |listen| { let num = listen.len(); let xd = { - let xv = listen[0].0.read().unwrap(); - xv.lookup(&xv.id()).unwrap().clone() + let xv = &listen[0].0; + xv.lookup_contact_info(&xv.id(), |ci| ci.clone()).unwrap() }; trace!("rstar leader {}", xd.id); for n in 0..(num - 1) { let y = (n + 1) % listen.len(); - let mut yv = listen[y].0.write().unwrap(); + let yv = &listen[y].0; yv.insert_info(xd.clone()); trace!("rstar insert {} into {}", xd.id, yv.id()); } @@ -147,10 +139,10 @@ pub fn cluster_info_retransmit() { let (c2, dr2, tn2) = test_node(&exit); trace!("c3:"); let (c3, dr3, tn3) = test_node(&exit); - let c1_data = c1.read().unwrap().my_data().clone(); + let c1_contact_info = c1.my_contact_info(); - c2.write().unwrap().insert_info(c1_data.clone()); - c3.write().unwrap().insert_info(c1_data.clone()); + c2.insert_info(c1_contact_info.clone()); + c3.insert_info(c1_contact_info); let num = 3; @@ -158,9 +150,9 @@ pub fn cluster_info_retransmit() { trace!("waiting to converge:"); let mut done = false; for _ in 0..30 { - done = c1.read().unwrap().gossip_peers().len() == num - 1 - && c2.read().unwrap().gossip_peers().len() == num - 1 - && c3.read().unwrap().gossip_peers().len() == num - 1; + done = c1.gossip_peers().len() == num - 1 + && c2.gossip_peers().len() == num - 1 + && c3.gossip_peers().len() == num - 1; if done { break; } @@ -169,7 +161,7 @@ pub fn cluster_info_retransmit() { assert!(done); let mut p = Packet::default(); p.meta.size = 10; - let peers = c1.read().unwrap().retransmit_peers(); + let peers = c1.retransmit_peers(); let retransmit_peers: Vec<_> = peers.iter().collect(); ClusterInfo::retransmit_to(&retransmit_peers, &mut p, None, &tn1, false).unwrap(); let res: Vec<_> = [tn1, tn2, tn3] diff --git a/local-cluster/tests/archiver.rs b/local-cluster/tests/archiver.rs index 55e4ed88e..de219af0a 100644 --- a/local-cluster/tests/archiver.rs +++ b/local-cluster/tests/archiver.rs @@ -17,10 +17,7 @@ use solana_sdk::{ genesis_config::create_genesis_config, signature::{Keypair, Signer}, }; -use std::{ - fs::remove_dir_all, - sync::{Arc, RwLock}, -}; +use std::{fs::remove_dir_all, sync::Arc}; /// Start the cluster with the given configuration and wait till the archivers are discovered /// Then download shreds from one of them. @@ -59,9 +56,9 @@ fn run_archiver_startup_basic(num_nodes: usize, num_archivers: usize) { } assert_eq!(archiver_count, num_archivers); - let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( + let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair( cluster_nodes[0].clone(), - ))); + )); let serve_repair = ServeRepair::new(cluster_info); let path = get_tmp_ledger_path!(); let blockstore = Arc::new(Blockstore::open(&path).unwrap()); diff --git a/validator/src/main.rs b/validator/src/main.rs index af2c2cf01..89847fa9c 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -43,7 +43,7 @@ use std::{ str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, RwLock, + Arc, }, thread::sleep, time::{Duration, Instant}, @@ -78,10 +78,10 @@ fn hash_validator(hash: String) -> Result<(), String> { } fn get_shred_rpc_peers( - cluster_info: &Arc>, + cluster_info: &ClusterInfo, expected_shred_version: Option, ) -> Vec { - let rpc_peers = cluster_info.read().unwrap().all_rpc_peers(); + let rpc_peers = cluster_info.all_rpc_peers(); match expected_shred_version { Some(expected_shred_version) => { // Filter out rpc peers that don't match the expected shred version @@ -114,21 +114,17 @@ fn is_trusted_validator(id: &Pubkey, trusted_validators: &Option } fn get_trusted_snapshot_hashes( - cluster_info: &Arc>, + cluster_info: &ClusterInfo, trusted_validators: &Option>, ) -> Option> { if let Some(trusted_validators) = trusted_validators { let mut trusted_snapshot_hashes = HashSet::new(); for trusted_validator in trusted_validators { - if let Some(snapshot_hashes) = cluster_info - .read() - .unwrap() - .get_snapshot_hash_for_node(trusted_validator) - { + cluster_info.get_snapshot_hash_for_node(trusted_validator, |snapshot_hashes| { for snapshot_hash in snapshot_hashes { trusted_snapshot_hashes.insert(*snapshot_hash); } - } + }); } Some(trusted_snapshot_hashes) } else { @@ -141,13 +137,13 @@ fn start_gossip_node( entrypoint_gossip: &SocketAddr, gossip_addr: &SocketAddr, gossip_socket: UdpSocket, -) -> (Arc>, Arc, GossipService) { - let mut cluster_info = ClusterInfo::new( +) -> (Arc, Arc, GossipService) { + let cluster_info = ClusterInfo::new( ClusterInfo::gossip_contact_info(&identity_keypair.pubkey(), *gossip_addr), identity_keypair.clone(), ); cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint_gossip)); - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(cluster_info); let gossip_exit_flag = Arc::new(AtomicBool::new(false)); let gossip_service = GossipService::new( @@ -160,7 +156,7 @@ fn start_gossip_node( } fn get_rpc_node( - cluster_info: &Arc>, + cluster_info: &ClusterInfo, validator_config: &ValidatorConfig, blacklisted_rpc_nodes: &mut HashSet, snapshot_not_required: bool, @@ -173,7 +169,7 @@ fn get_rpc_node( validator_config.expected_shred_version ); sleep(Duration::from_secs(1)); - info!("\n{}", cluster_info.read().unwrap().contact_info_trace()); + info!("\n{}", cluster_info.contact_info_trace()); let rpc_peers = get_shred_rpc_peers(&cluster_info, validator_config.expected_shred_version); let rpc_peers_total = rpc_peers.len(); @@ -222,11 +218,7 @@ fn get_rpc_node( { continue; } - if let Some(snapshot_hashes) = cluster_info - .read() - .unwrap() - .get_snapshot_hash_for_node(&rpc_peer.id) - { + cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |snapshot_hashes| { for snapshot_hash in snapshot_hashes { if let Some(ref trusted_snapshot_hashes) = trusted_snapshot_hashes { if !trusted_snapshot_hashes.contains(snapshot_hash) { @@ -247,7 +239,7 @@ fn get_rpc_node( eligible_rpc_peers.push(rpc_peer.clone()); } } - } + }); } match highest_snapshot_hash {