Push down cluster_info lock (#9594)

* Push down cluster_info lock

* Rework budget decrement

Co-authored-by: Carl <carl@solana.com>
carllin 2020-04-21 12:54:45 -07:00 committed by GitHub
parent ad186b8652
commit bab3502260
32 changed files with 679 additions and 671 deletions
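
The pattern repeated across these diffs: the RwLock moves from the callers (Arc<RwLock<ClusterInfo>>) into ClusterInfo itself (Arc<ClusterInfo>), so accessors such as id(), tvu_peers(), and my_shred_version() take &self and lock internally, while data that lives behind the lock is handed out through closures (get_accounts_hash_for_node(pubkey, |hashes| ...), lookup_contact_info(pubkey, |info| ...)) rather than through lock guards. A minimal sketch of the idea, using hypothetical names (Node, peers, get_stake_for_node) rather than the real ClusterInfo internals:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for ClusterInfo: the lock lives inside the struct,
// so callers share a plain Arc<Node> and never lock it themselves.
struct Node {
    // Illustrative field, not the real gossip state.
    peers: RwLock<HashMap<String, u64>>,
}

impl Node {
    // Reads take &self and lock internally.
    fn peer_count(&self) -> usize {
        self.peers.read().unwrap().len()
    }

    // Data behind the lock is exposed through a closure, so no reference
    // to the guarded map can escape the critical section.
    fn get_stake_for_node<F, Y>(&self, node: &str, map: F) -> Option<Y>
    where
        F: FnOnce(&u64) -> Y,
    {
        self.peers.read().unwrap().get(node).map(map)
    }

    // Writes also go through &self; interior mutability replaces &mut.
    fn insert_info(&self, node: &str, stake: u64) {
        self.peers.write().unwrap().insert(node.to_string(), stake);
    }
}

fn main() {
    // Before this commit callers held Arc<RwLock<...>> and wrote
    // cluster_info.read().unwrap().peer_count(); now it is simply:
    let cluster_info = Arc::new(Node {
        peers: RwLock::new(HashMap::new()),
    });
    cluster_info.insert_info("validator-a", 42);
    let doubled = cluster_info.get_stake_for_node("validator-a", |stake| stake * 2);
    assert_eq!(doubled, Some(84));
    println!("peers: {}", cluster_info.peer_count());
}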


@@ -53,7 +53,7 @@ use std::{
     result,
     sync::atomic::{AtomicBool, Ordering},
     sync::mpsc::{channel, Receiver, Sender},
-    sync::{Arc, RwLock},
+    sync::Arc,
     thread::{sleep, spawn, JoinHandle},
     time::Duration,
 };
@@ -185,9 +185,9 @@ impl Archiver {
         info!("Archiver: id: {}", keypair.pubkey());
         info!("Creating cluster info....");
-        let mut cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone());
+        let cluster_info = ClusterInfo::new(node.info.clone(), keypair.clone());
         cluster_info.set_entrypoint(cluster_entrypoint.clone());
-        let cluster_info = Arc::new(RwLock::new(cluster_info));
+        let cluster_info = Arc::new(cluster_info);
         let cluster_slots = Arc::new(ClusterSlots::default());
         // Note for now, this ledger will not contain any of the existing entries
         // in the ledger located at ledger_path, and will only append on newly received
@@ -308,7 +308,7 @@ impl Archiver {
     fn run(
         meta: &mut ArchiverMeta,
         blockstore: &Arc<Blockstore>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
         archiver_keypair: &Arc<Keypair>,
         storage_keypair: &Arc<Keypair>,
         exit: &Arc<AtomicBool>,
@@ -365,12 +365,12 @@ impl Archiver {
     }

     fn redeem_rewards(
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         archiver_keypair: &Arc<Keypair>,
         storage_keypair: &Arc<Keypair>,
         client_commitment: CommitmentConfig,
     ) {
-        let nodes = cluster_info.read().unwrap().tvu_peers();
+        let nodes = cluster_info.tvu_peers();
         let client = solana_core::gossip_service::get_client(&nodes);

         if let Ok(Some(account)) =
@@ -405,7 +405,7 @@ impl Archiver {
     #[allow(clippy::too_many_arguments)]
     fn setup(
         meta: &mut ArchiverMeta,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
         blockstore: &Arc<Blockstore>,
         exit: &Arc<AtomicBool>,
         node_info: &ContactInfo,
@@ -491,7 +491,7 @@ impl Archiver {
         blockstore: &Arc<Blockstore>,
         exit: &Arc<AtomicBool>,
         node_info: &ContactInfo,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
     ) {
         info!(
             "window created, waiting for ledger download starting at slot {:?}",
@@ -519,11 +519,8 @@ impl Archiver {
         contact_info.tvu = "0.0.0.0:0".parse().unwrap();
         contact_info.wallclock = timestamp();
         // copy over the adopted shred_version from the entrypoint
-        contact_info.shred_version = cluster_info.read().unwrap().my_data().shred_version;
-        {
-            let mut cluster_info_w = cluster_info.write().unwrap();
-            cluster_info_w.insert_self(contact_info);
-        }
+        contact_info.shred_version = cluster_info.my_shred_version();
+        cluster_info.update_contact_info(|current| *current = contact_info);
     }

     fn encrypt_ledger(meta: &mut ArchiverMeta, blockstore: &Arc<Blockstore>) -> Result<()> {
@@ -626,12 +623,12 @@ impl Archiver {
     fn submit_mining_proof(
         meta: &ArchiverMeta,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         archiver_keypair: &Arc<Keypair>,
         storage_keypair: &Arc<Keypair>,
     ) {
         // No point if we've got no storage account...
-        let nodes = cluster_info.read().unwrap().tvu_peers();
+        let nodes = cluster_info.tvu_peers();
         let client = solana_core::gossip_service::get_client(&nodes);
         let storage_balance = client
             .poll_get_balance_with_commitment(&storage_keypair.pubkey(), meta.client_commitment);
@@ -689,13 +686,10 @@ impl Archiver {
     }

     fn get_segment_config(
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         client_commitment: CommitmentConfig,
     ) -> Result<u64> {
-        let rpc_peers = {
-            let cluster_info = cluster_info.read().unwrap();
-            cluster_info.all_rpc_peers()
-        };
+        let rpc_peers = cluster_info.all_rpc_peers();
         debug!("rpc peers: {:?}", rpc_peers);
         if !rpc_peers.is_empty() {
             let rpc_client = {
@@ -721,7 +715,7 @@ impl Archiver {
     /// Waits until the first segment is ready, and returns the current segment
     fn poll_for_segment(
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         slots_per_segment: u64,
         previous_blockhash: &Hash,
         exit: &Arc<AtomicBool>,
@@ -741,17 +735,14 @@ impl Archiver {
     /// Poll for a different blockhash and associated max_slot than `previous_blockhash`
     fn poll_for_blockhash_and_slot(
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         slots_per_segment: u64,
         previous_blockhash: &Hash,
         exit: &Arc<AtomicBool>,
     ) -> Result<(Hash, u64)> {
         info!("waiting for the next turn...");
         loop {
-            let rpc_peers = {
-                let cluster_info = cluster_info.read().unwrap();
-                cluster_info.all_rpc_peers()
-            };
+            let rpc_peers = cluster_info.all_rpc_peers();
             debug!("rpc peers: {:?}", rpc_peers);
             if !rpc_peers.is_empty() {
                 let rpc_client = {


@@ -28,7 +28,7 @@ use solana_sdk::{
     transaction::Transaction,
 };
 use std::{
-    sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex, RwLock},
+    sync::{atomic::Ordering, mpsc::Receiver, Arc, Mutex},
     thread::sleep,
     time::{Duration, Instant},
 };
@@ -152,7 +152,7 @@ fn main() {
     let (exit, poh_recorder, poh_service, signal_receiver) =
         create_test_recorder(&bank, &blockstore, None);
     let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
-    let cluster_info = Arc::new(RwLock::new(cluster_info));
+    let cluster_info = Arc::new(cluster_info);
     let banking_stage = BankingStage::new(
         &cluster_info,
         &poh_recorder,


@@ -190,7 +190,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
     let (exit, poh_recorder, poh_service, signal_receiver) =
         create_test_recorder(&bank, &blockstore, None);
     let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
-    let cluster_info = Arc::new(RwLock::new(cluster_info));
+    let cluster_info = Arc::new(cluster_info);
     let _banking_stage = BankingStage::new(
         &cluster_info,
         &poh_recorder,


@@ -36,7 +36,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
         stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
     }
     let stakes = Arc::new(stakes);
-    let cluster_info = Arc::new(RwLock::new(cluster_info));
+    let cluster_info = Arc::new(cluster_info);
     let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(stakes.clone()));
     let shreds = Arc::new(shreds);
     let last_datapoint = Arc::new(AtomicU64::new(0));


@@ -45,7 +45,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
         peer_sockets.push(socket);
     }
     let peer_sockets = Arc::new(peer_sockets);
-    let cluster_info = Arc::new(RwLock::new(cluster_info));
+    let cluster_info = Arc::new(cluster_info);
     let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100_000);
     let bank0 = Bank::new(&genesis_config);


@@ -15,7 +15,7 @@ use std::{
     sync::{
         atomic::{AtomicBool, Ordering},
         mpsc::RecvTimeoutError,
-        Arc, RwLock,
+        Arc,
     },
     thread::{self, Builder, JoinHandle},
     time::Duration,
@@ -30,7 +30,7 @@ impl AccountsHashVerifier {
        accounts_package_receiver: AccountsPackageReceiver,
        accounts_package_sender: Option<AccountsPackageSender>,
        exit: &Arc<AtomicBool>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
        trusted_validators: Option<HashSet<Pubkey>>,
        halt_on_trusted_validators_accounts_hash_mismatch: bool,
        fault_injection_rate_slots: u64,
@@ -74,7 +74,7 @@ impl AccountsHashVerifier {
    fn process_accounts_package(
        accounts_package: AccountsPackage,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
        trusted_validators: &Option<HashSet<Pubkey>>,
        halt_on_trusted_validator_accounts_hash_mismatch: bool,
        accounts_package_sender: &Option<AccountsPackageSender>,
@@ -117,14 +117,11 @@ impl AccountsHashVerifier {
            }
        }
-        cluster_info
-            .write()
-            .unwrap()
-            .push_accounts_hashes(hashes.clone());
+        cluster_info.push_accounts_hashes(hashes.clone());
    }

    fn should_halt(
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
        trusted_validators: &Option<HashSet<Pubkey>>,
        slot_to_hash: &mut HashMap<Slot, Hash>,
    ) -> bool {
@@ -132,11 +129,9 @@ impl AccountsHashVerifier {
        let mut highest_slot = 0;
        if let Some(trusted_validators) = trusted_validators.as_ref() {
            for trusted_validator in trusted_validators {
-                let cluster_info_r = cluster_info.read().unwrap();
-                if let Some(accounts_hashes) =
-                    cluster_info_r.get_accounts_hash_for_node(trusted_validator)
+                let is_conflicting = cluster_info.get_accounts_hash_for_node(trusted_validator, |accounts_hashes|
                {
-                    for (slot, hash) in accounts_hashes {
+                    accounts_hashes.iter().any(|(slot, hash)| {
                        if let Some(reference_hash) = slot_to_hash.get(slot) {
                            if *hash != *reference_hash {
                                error!("Trusted validator {} produced conflicting hashes for slot: {} ({} != {})",
@@ -145,16 +140,21 @@ impl AccountsHashVerifier {
                                    hash,
                                    reference_hash,
                                );
-                                return true;
+                                true
                            } else {
                                verified_count += 1;
+                                false
                            }
                        } else {
                            highest_slot = std::cmp::max(*slot, highest_slot);
                            slot_to_hash.insert(*slot, *hash);
+                            false
                        }
-                    }
-                }
+                    })
+                }).unwrap_or(false);
+                if is_conflicting {
+                    return true;
+                }
            }
        }
    }
@@ -188,7 +188,7 @@ mod tests {
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
        let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
-        let cluster_info = Arc::new(RwLock::new(cluster_info));
+        let cluster_info = Arc::new(cluster_info);
        let mut trusted_validators = HashSet::new();
        let mut slot_to_hash = HashMap::new();
@@ -203,8 +203,7 @@ mod tests {
        let hash2 = hash(&[2]);
        {
            let message = make_accounts_hashes_message(&validator1, vec![(0, hash1)]).unwrap();
-            let mut cluster_info_w = cluster_info.write().unwrap();
-            cluster_info_w.push_message(message);
+            cluster_info.push_message(message);
        }
        slot_to_hash.insert(0, hash2);
        trusted_validators.insert(validator1.pubkey());
@@ -224,7 +223,7 @@ mod tests {
        let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
        let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
-        let cluster_info = Arc::new(RwLock::new(cluster_info));
+        let cluster_info = Arc::new(cluster_info);
        let trusted_validators = HashSet::new();
        let exit = Arc::new(AtomicBool::new(false));
@@ -254,9 +253,8 @@ mod tests {
                100,
            );
        }
-        let cluster_info_r = cluster_info.read().unwrap();
-        let cluster_hashes = cluster_info_r
-            .get_accounts_hash_for_node(&keypair.pubkey())
+        let cluster_hashes = cluster_info
+            .get_accounts_hash_for_node(&keypair.pubkey(), |c| c.clone())
            .unwrap();
        info!("{:?}", cluster_hashes);
        assert_eq!(hashes.len(), MAX_SNAPSHOT_HASHES);


@@ -41,7 +41,7 @@ use std::{
     net::UdpSocket,
     sync::atomic::AtomicBool,
     sync::mpsc::Receiver,
-    sync::{Arc, Mutex, RwLock},
+    sync::{Arc, Mutex},
     thread::{self, Builder, JoinHandle},
     time::Duration,
     time::Instant,
@@ -76,7 +76,7 @@ impl BankingStage {
     /// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
     #[allow(clippy::new_ret_no_self)]
     pub fn new(
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
@@ -93,7 +93,7 @@ impl BankingStage {
     }

     fn new_num_threads(
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         verified_vote_receiver: CrossbeamReceiver<Vec<Packets>>,
@@ -104,7 +104,7 @@ impl BankingStage {
         // Single thread to generate entries from many banks.
         // This thread talks to poh_service and broadcasts the entries once they have been recorded.
         // Once an entry has been recorded, its blockhash is registered with the bank.
-        let my_pubkey = cluster_info.read().unwrap().id();
+        let my_pubkey = cluster_info.id();
         // Many banks that process transactions in parallel.
         let bank_thread_hdls: Vec<JoinHandle<()>> = (0..num_threads)
             .map(|i| {
@@ -287,7 +287,7 @@ impl BankingStage {
         my_pubkey: &Pubkey,
         socket: &std::net::UdpSocket,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         buffered_packets: &mut Vec<PacketsAndOffsets>,
         enable_forwarding: bool,
         batch_limit: usize,
@@ -331,10 +331,7 @@ impl BankingStage {
             next_leader.map_or((), |leader_pubkey| {
                 let leader_addr = {
                     cluster_info
-                        .read()
-                        .unwrap()
-                        .lookup(&leader_pubkey)
-                        .map(|leader| leader.tpu_forwards)
+                        .lookup_contact_info(&leader_pubkey, |leader| leader.tpu_forwards)
                 };

                 leader_addr.map_or((), |leader_addr| {
@@ -358,7 +355,7 @@ impl BankingStage {
         my_pubkey: Pubkey,
         verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         recv_start: &mut Instant,
         enable_forwarding: bool,
         id: u32,
@@ -1049,7 +1046,7 @@ mod tests {
         let (exit, poh_recorder, poh_service, _entry_receiever) =
             create_test_recorder(&bank, &blockstore, None);
         let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
-        let cluster_info = Arc::new(RwLock::new(cluster_info));
+        let cluster_info = Arc::new(cluster_info);
         let banking_stage = BankingStage::new(
             &cluster_info,
             &poh_recorder,
@@ -1089,7 +1086,7 @@ mod tests {
         let (exit, poh_recorder, poh_service, entry_receiver) =
             create_test_recorder(&bank, &blockstore, Some(poh_config));
         let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
-        let cluster_info = Arc::new(RwLock::new(cluster_info));
+        let cluster_info = Arc::new(cluster_info);
         let banking_stage = BankingStage::new(
             &cluster_info,
             &poh_recorder,
@@ -1152,7 +1149,7 @@ mod tests {
         let (exit, poh_recorder, poh_service, entry_receiver) =
             create_test_recorder(&bank, &blockstore, Some(poh_config));
         let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
-        let cluster_info = Arc::new(RwLock::new(cluster_info));
+        let cluster_info = Arc::new(cluster_info);
         let banking_stage = BankingStage::new(
             &cluster_info,
             &poh_recorder,
@@ -1293,7 +1290,7 @@ mod tests {
             create_test_recorder(&bank, &blockstore, Some(poh_config));
         let cluster_info =
             ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
-        let cluster_info = Arc::new(RwLock::new(cluster_info));
+        let cluster_info = Arc::new(cluster_info);
         let _banking_stage = BankingStage::new_num_threads(
             &cluster_info,
             &poh_recorder,


@@ -29,7 +29,7 @@ use std::{
     net::UdpSocket,
     sync::atomic::{AtomicBool, Ordering},
     sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender},
-    sync::{Arc, Mutex, RwLock},
+    sync::{Arc, Mutex},
     thread::{self, Builder, JoinHandle},
     time::{Duration, Instant},
 };
@@ -62,14 +62,14 @@ impl BroadcastStageType {
     pub fn new_broadcast_stage(
         &self,
         sock: Vec<UdpSocket>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
         receiver: Receiver<WorkingBankEntry>,
         retransmit_slots_receiver: RetransmitSlotsReceiver,
         exit_sender: &Arc<AtomicBool>,
         blockstore: &Arc<Blockstore>,
         shred_version: u16,
     ) -> BroadcastStage {
-        let keypair = cluster_info.read().unwrap().keypair.clone();
+        let keypair = cluster_info.keypair.clone();
         match self {
             BroadcastStageType::Standard => BroadcastStage::new(
                 sock,
@@ -116,7 +116,7 @@ trait BroadcastRun {
     fn transmit(
         &mut self,
         receiver: &Arc<Mutex<TransmitReceiver>>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         sock: &UdpSocket,
     ) -> Result<()>;
     fn record(
@@ -205,7 +205,7 @@ impl BroadcastStage {
     #[allow(clippy::too_many_arguments)]
     fn new(
         socks: Vec<UdpSocket>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
         receiver: Receiver<WorkingBankEntry>,
         retransmit_slots_receiver: RetransmitSlotsReceiver,
         exit_sender: &Arc<AtomicBool>,
@@ -357,11 +357,11 @@ fn update_peer_stats(
 }

 pub fn get_broadcast_peers<S: std::hash::BuildHasher>(
-    cluster_info: &Arc<RwLock<ClusterInfo>>,
+    cluster_info: &ClusterInfo,
     stakes: Option<Arc<HashMap<Pubkey, u64, S>>>,
 ) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
     use crate::cluster_info;
-    let mut peers = cluster_info.read().unwrap().tvu_peers();
+    let mut peers = cluster_info.tvu_peers();
     let peers_and_stakes = cluster_info::stake_weight_peers(&mut peers, stakes);
     (peers, peers_and_stakes)
 }
@@ -450,11 +450,7 @@ pub mod test {
         signature::{Keypair, Signer},
     };
     use std::{
-        path::Path,
-        sync::atomic::AtomicBool,
-        sync::mpsc::channel,
-        sync::{Arc, RwLock},
-        thread::sleep,
+        path::Path, sync::atomic::AtomicBool, sync::mpsc::channel, sync::Arc, thread::sleep,
     };

     pub fn make_transmit_shreds(
@@ -598,16 +594,16 @@ pub mod test {
         let broadcast_buddy = Node::new_localhost_with_pubkey(&buddy_keypair.pubkey());
         // Fill the cluster_info with the buddy's info
-        let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone());
+        let cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone());
         cluster_info.insert_info(broadcast_buddy.info);
-        let cluster_info = Arc::new(RwLock::new(cluster_info));
+        let cluster_info = Arc::new(cluster_info);
         let exit_sender = Arc::new(AtomicBool::new(false));
         let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
         let bank = Arc::new(Bank::new(&genesis_config));
-        let leader_keypair = cluster_info.read().unwrap().keypair.clone();
+        let leader_keypair = cluster_info.keypair.clone();
         // Start up the broadcast stage
         let broadcast_service = BroadcastStage::new(
             leader_info.sockets.broadcast,


@@ -104,11 +104,11 @@ impl BroadcastRun for BroadcastFakeShredsRun {
     fn transmit(
         &mut self,
         receiver: &Arc<Mutex<TransmitReceiver>>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         sock: &UdpSocket,
     ) -> Result<()> {
         for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() {
-            let peers = cluster_info.read().unwrap().tvu_peers();
+            let peers = cluster_info.tvu_peers();
             peers.iter().enumerate().for_each(|(i, peer)| {
                 if i <= self.partition && stakes.is_some() {
                     // Send fake shreds to the first N peers
@@ -145,7 +145,7 @@ mod tests {
     #[test]
     fn test_tvu_peers_ordering() {
-        let mut cluster = ClusterInfo::new_with_invalid_keypair(ContactInfo::new_localhost(
+        let cluster = ClusterInfo::new_with_invalid_keypair(ContactInfo::new_localhost(
             &Pubkey::new_rand(),
             0,
         ));


@@ -74,7 +74,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
     fn transmit(
         &mut self,
         receiver: &Arc<Mutex<TransmitReceiver>>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         sock: &UdpSocket,
     ) -> Result<()> {
         let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;


@@ -120,7 +120,7 @@ impl StandardBroadcastRun {
     #[cfg(test)]
     fn test_process_receive_results(
         &mut self,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         sock: &UdpSocket,
         blockstore: &Arc<Blockstore>,
         receive_results: ReceiveResults,
@@ -288,7 +288,7 @@ impl StandardBroadcastRun {
     fn broadcast(
         &mut self,
         sock: &UdpSocket,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         stakes: Option<Arc<HashMap<Pubkey, u64>>>,
         shreds: Arc<Vec<Shred>>,
         broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
@@ -374,7 +374,7 @@ impl BroadcastRun for StandardBroadcastRun {
     fn transmit(
         &mut self,
         receiver: &Arc<Mutex<TransmitReceiver>>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         sock: &UdpSocket,
     ) -> Result<()> {
         let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
@@ -404,7 +404,7 @@ mod test {
         genesis_config::GenesisConfig,
         signature::{Keypair, Signer},
     };
-    use std::sync::{Arc, RwLock};
+    use std::sync::Arc;
     use std::time::Duration;

     fn setup(
@@ -412,7 +412,7 @@ mod test {
     ) -> (
         Arc<Blockstore>,
         GenesisConfig,
-        Arc<RwLock<ClusterInfo>>,
+        Arc<ClusterInfo>,
         Arc<Bank>,
         Arc<Keypair>,
         UdpSocket,
@@ -425,9 +425,9 @@ mod test {
         let leader_keypair = Arc::new(Keypair::new());
         let leader_pubkey = leader_keypair.pubkey();
         let leader_info = Node::new_localhost_with_pubkey(&leader_pubkey);
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
             leader_info.info.clone(),
-        )));
+        ));
         let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
         let mut genesis_config = create_genesis_config(10_000).genesis_config;
         genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot) + 1;

File diff suppressed because it is too large


@@ -197,7 +197,7 @@ pub struct ClusterInfoVoteListener {
 impl ClusterInfoVoteListener {
     pub fn new(
         exit: &Arc<AtomicBool>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
         sigverify_disabled: bool,
         sender: CrossbeamSender<Vec<Packets>>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
@@ -262,7 +262,7 @@ impl ClusterInfoVoteListener {
     fn recv_loop(
         exit: Arc<AtomicBool>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         sigverify_disabled: bool,
         verified_vote_packets_sender: VerifiedVotePacketsSender,
         verified_vote_transactions_sender: VerifiedVoteTransactionsSender,
@@ -272,7 +272,7 @@ impl ClusterInfoVoteListener {
             if exit.load(Ordering::Relaxed) {
                 return Ok(());
             }
-            let (labels, votes, new_ts) = cluster_info.read().unwrap().get_votes(last_ts);
+            let (labels, votes, new_ts) = cluster_info.get_votes(last_ts);
             inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
             last_ts = new_ts;


@@ -27,15 +27,10 @@ impl ClusterSlots {
     pub fn lookup(&self, slot: Slot) -> Option<Arc<RwLock<SlotPubkeys>>> {
         self.cluster_slots.read().unwrap().get(&slot).cloned()
     }
-    pub fn update(
-        &self,
-        root: Slot,
-        cluster_info: &RwLock<ClusterInfo>,
-        bank_forks: &RwLock<BankForks>,
-    ) {
+    pub fn update(&self, root: Slot, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
         self.update_peers(cluster_info, bank_forks);
         let since = *self.since.read().unwrap();
-        let epoch_slots = cluster_info.read().unwrap().get_epoch_slots_since(since);
+        let epoch_slots = cluster_info.get_epoch_slots_since(since);
         self.update_internal(root, epoch_slots);
     }
     fn update_internal(&self, root: Slot, epoch_slots: (Vec<EpochSlots>, Option<u64>)) {
@@ -95,7 +90,7 @@ impl ClusterSlots {
             .collect()
     }
-    fn update_peers(&self, cluster_info: &RwLock<ClusterInfo>, bank_forks: &RwLock<BankForks>) {
+    fn update_peers(&self, cluster_info: &ClusterInfo, bank_forks: &RwLock<BankForks>) {
         let root_bank = bank_forks.read().unwrap().root_bank().clone();
         let root_epoch = root_bank.epoch();
         let my_epoch = *self.epoch.read().unwrap();
@@ -111,7 +106,7 @@ impl ClusterSlots {
             .clone();
         *self.validator_stakes.write().unwrap() = validator_stakes;
-        let id = cluster_info.read().unwrap().id();
+        let id = cluster_info.id();
         *self.self_id.write().unwrap() = id;
         *self.epoch.write().unwrap() = Some(root_epoch);
     }


@@ -22,7 +22,7 @@ pub struct GossipService {
 impl GossipService {
     pub fn new(
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
         bank_forks: Option<Arc<RwLock<BankForks>>>,
         gossip_socket: UdpSocket,
         exit: &Arc<AtomicBool>,
@@ -31,7 +31,7 @@ impl GossipService {
         let gossip_socket = Arc::new(gossip_socket);
         trace!(
             "GossipService: id: {}, listening on: {:?}",
-            &cluster_info.read().unwrap().my_data().id,
+            &cluster_info.id(),
             gossip_socket.local_addr().unwrap()
         );
         let t_receiver = streamer::receiver(
@@ -89,7 +89,7 @@ pub fn discover(
     let exit = Arc::new(AtomicBool::new(false));
     let (gossip_service, ip_echo, spy_ref) = make_gossip_node(entrypoint, &exit, my_gossip_addr);
-    let id = spy_ref.read().unwrap().keypair.pubkey();
+    let id = spy_ref.id();
     info!("Entrypoint: {:?}", entrypoint);
     info!("Node Id: {:?}", id);
     if let Some(my_gossip_addr) = my_gossip_addr {
@@ -113,7 +113,7 @@ pub fn discover(
         info!(
             "discover success in {}s...\n{}",
             secs,
-            spy_ref.read().unwrap().contact_info_trace()
+            spy_ref.contact_info_trace()
         );
         return Ok((tvu_peers, storage_peers));
     }
@@ -121,15 +121,12 @@ pub fn discover(
     if !tvu_peers.is_empty() {
         info!(
             "discover failed to match criteria by timeout...\n{}",
-            spy_ref.read().unwrap().contact_info_trace()
+            spy_ref.contact_info_trace()
         );
         return Ok((tvu_peers, storage_peers));
     }
-    info!(
-        "discover failed...\n{}",
-        spy_ref.read().unwrap().contact_info_trace()
-    );
+    info!("discover failed...\n{}", spy_ref.contact_info_trace());
     Err(std::io::Error::new(
         std::io::ErrorKind::Other,
         "Discover failed",
@@ -176,7 +173,7 @@ pub fn get_multi_client(nodes: &[ContactInfo]) -> (ThinClient, usize) {
 }

 fn spy(
-    spy_ref: Arc<RwLock<ClusterInfo>>,
+    spy_ref: Arc<ClusterInfo>,
     num_nodes: Option<usize>,
     timeout: Option<u64>,
     find_node_by_pubkey: Option<Pubkey>,
@@ -194,13 +191,8 @@ fn spy(
             }
         }
-        tvu_peers = spy_ref
-            .read()
-            .unwrap()
-            .all_tvu_peers()
-            .into_iter()
-            .collect::<Vec<_>>();
-        storage_peers = spy_ref.read().unwrap().all_storage_peers();
+        tvu_peers = spy_ref.all_tvu_peers().into_iter().collect::<Vec<_>>();
+        storage_peers = spy_ref.all_storage_peers();
         let mut nodes: Vec<_> = tvu_peers.iter().chain(storage_peers.iter()).collect();
         nodes.sort();
@@ -232,10 +224,7 @@ fn spy(
             met_criteria = true;
         }
         if i % 20 == 0 {
-            info!(
-                "discovering...\n{}",
-                spy_ref.read().unwrap().contact_info_trace()
-            );
+            info!("discovering...\n{}", spy_ref.contact_info_trace());
         }
         sleep(Duration::from_millis(
             crate::cluster_info::GOSSIP_SLEEP_MILLIS,
@@ -256,18 +245,18 @@ fn make_gossip_node(
     entrypoint: Option<&SocketAddr>,
     exit: &Arc<AtomicBool>,
     gossip_addr: Option<&SocketAddr>,
-) -> (GossipService, Option<TcpListener>, Arc<RwLock<ClusterInfo>>) {
+) -> (GossipService, Option<TcpListener>, Arc<ClusterInfo>) {
     let keypair = Arc::new(Keypair::new());
     let (node, gossip_socket, ip_echo) = if let Some(gossip_addr) = gossip_addr {
         ClusterInfo::gossip_node(&keypair.pubkey(), gossip_addr)
     } else {
         ClusterInfo::spy_node(&keypair.pubkey())
     };
-    let mut cluster_info = ClusterInfo::new(node, keypair);
+    let cluster_info = ClusterInfo::new(node, keypair);
     if let Some(entrypoint) = entrypoint {
         cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint));
     }
-    let cluster_info = Arc::new(RwLock::new(cluster_info));
+    let cluster_info = Arc::new(cluster_info);
     let gossip_service = GossipService::new(&cluster_info.clone(), None, gossip_socket, &exit);
     (gossip_service, ip_echo, cluster_info)
 }
@@ -277,7 +266,7 @@ mod tests {
     use super::*;
     use crate::cluster_info::{ClusterInfo, Node};
     use std::sync::atomic::AtomicBool;
-    use std::sync::{Arc, RwLock};
+    use std::sync::Arc;

     #[test]
     #[ignore]
@@ -286,7 +275,7 @@ mod tests {
         let exit = Arc::new(AtomicBool::new(false));
         let tn = Node::new_localhost();
         let cluster_info = ClusterInfo::new_with_invalid_keypair(tn.info.clone());
-        let c = Arc::new(RwLock::new(cluster_info));
+        let c = Arc::new(cluster_info);
         let d = GossipService::new(&c, None, tn.sockets.gossip, &exit);
         exit.store(true, Ordering::Relaxed);
         d.join().unwrap();
@@ -300,16 +289,16 @@ mod tests {
         let contact_info = ContactInfo::new_localhost(&keypair.pubkey(), 0);
         let peer0_info = ContactInfo::new_localhost(&peer0, 0);
         let peer1_info = ContactInfo::new_localhost(&peer1, 0);
-        let mut cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair));
+        let cluster_info = ClusterInfo::new(contact_info.clone(), Arc::new(keypair));
         cluster_info.insert_info(peer0_info.clone());
         cluster_info.insert_info(peer1_info);
-        let spy_ref = Arc::new(RwLock::new(cluster_info));
+        let spy_ref = Arc::new(cluster_info);
         let (met_criteria, secs, tvu_peers, _) = spy(spy_ref.clone(), None, Some(1), None, None);
         assert_eq!(met_criteria, false);
         assert_eq!(secs, 1);
-        assert_eq!(tvu_peers, spy_ref.read().unwrap().tvu_peers());
+        assert_eq!(tvu_peers, spy_ref.tvu_peers());

         // Find num_nodes
         let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, None, None);


@@ -81,7 +81,7 @@ impl RepairService {
         blockstore: Arc<Blockstore>,
         exit: Arc<AtomicBool>,
         repair_socket: Arc<UdpSocket>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
         repair_strategy: RepairStrategy,
         cluster_slots: Arc<ClusterSlots>,
     ) -> Self {
@@ -106,12 +106,12 @@ impl RepairService {
         blockstore: &Blockstore,
         exit: &AtomicBool,
         repair_socket: &UdpSocket,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
         repair_strategy: RepairStrategy,
         cluster_slots: &Arc<ClusterSlots>,
     ) {
         let serve_repair = ServeRepair::new(cluster_info.clone());
-        let id = cluster_info.read().unwrap().id();
+        let id = cluster_info.id();
         if let RepairStrategy::RepairAll { .. } = repair_strategy {
             Self::initialize_lowest_slot(id, blockstore, cluster_info);
         }
@@ -308,24 +308,17 @@ impl RepairService {
         }
     }

-    fn initialize_lowest_slot(
-        id: Pubkey,
-        blockstore: &Blockstore,
-        cluster_info: &RwLock<ClusterInfo>,
-    ) {
+    fn initialize_lowest_slot(id: Pubkey, blockstore: &Blockstore, cluster_info: &ClusterInfo) {
         // Safe to set into gossip because by this time, the leader schedule cache should
         // also be updated with the latest root (done in blockstore_processor) and thus
         // will provide a schedule to window_service for any incoming shreds up to the
         // last_confirmed_epoch.
-        cluster_info
-            .write()
-            .unwrap()
-            .push_lowest_slot(id, blockstore.lowest_slot());
+        cluster_info.push_lowest_slot(id, blockstore.lowest_slot());
     }

     fn update_completed_slots(
         completed_slots_receiver: &CompletedSlotsReceiver,
-        cluster_info: &RwLock<ClusterInfo>,
+        cluster_info: &ClusterInfo,
     ) {
         let mut slots: Vec<Slot> = vec![];
         while let Ok(mut more) = completed_slots_receiver.try_recv() {
@@ -333,20 +326,17 @@ impl RepairService {
         }
         slots.sort();
         if !slots.is_empty() {
-            cluster_info.write().unwrap().push_epoch_slots(&slots);
+            cluster_info.push_epoch_slots(&slots);
         }
     }

-    fn update_lowest_slot(id: &Pubkey, lowest_slot: Slot, cluster_info: &RwLock<ClusterInfo>) {
-        cluster_info
-            .write()
-            .unwrap()
-            .push_lowest_slot(*id, lowest_slot);
+    fn update_lowest_slot(id: &Pubkey, lowest_slot: Slot, cluster_info: &ClusterInfo) {
+        cluster_info.push_lowest_slot(*id, lowest_slot);
     }

     fn initialize_epoch_slots(
         blockstore: &Blockstore,
-        cluster_info: &RwLock<ClusterInfo>,
+        cluster_info: &ClusterInfo,
         completed_slots_receiver: &CompletedSlotsReceiver,
     ) {
         let root = blockstore.last_root();
@@ -367,7 +357,7 @@ impl RepairService {
         slots.sort();
         slots.dedup();
         if !slots.is_empty() {
-            cluster_info.write().unwrap().push_epoch_slots(&slots);
+            cluster_info.push_epoch_slots(&slots);
         }
     }
@@ -602,17 +592,13 @@ mod test {
     #[test]
     pub fn test_update_lowest_slot() {
         let node_info = Node::new_localhost_with_pubkey(&Pubkey::default());
-        let cluster_info = RwLock::new(ClusterInfo::new_with_invalid_keypair(
-            node_info.info.clone(),
-        ));
+        let cluster_info = ClusterInfo::new_with_invalid_keypair(node_info.info.clone());
         RepairService::update_lowest_slot(&Pubkey::default(), 5, &cluster_info);
         let lowest = cluster_info
-            .read()
-            .unwrap()
-            .get_lowest_slot_for_node(&Pubkey::default(), None)
-            .unwrap()
-            .0
-            .clone();
+            .get_lowest_slot_for_node(&Pubkey::default(), None, |lowest_slot, _| {
+                lowest_slot.clone()
+            })
+            .unwrap();
         assert_eq!(lowest.lowest, 5);
     }
 }


@@ -114,7 +114,7 @@ impl ReplayStage {
         config: ReplayStageConfig,
         blockstore: Arc<Blockstore>,
         bank_forks: Arc<RwLock<BankForks>>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
         ledger_signal_receiver: Receiver<bool>,
         poh_recorder: Arc<Mutex<PohRecorder>>,
         vote_tracker: Arc<VoteTracker>,
@@ -689,7 +689,7 @@ impl ReplayStage {
         progress: &mut ProgressMap,
         vote_account_pubkey: &Pubkey,
         authorized_voter_keypairs: &[Arc<Keypair>],
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
         blockstore: &Arc<Blockstore>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         root_bank_sender: &Sender<Vec<Arc<Bank>>>,
@@ -762,7 +762,7 @@ impl ReplayStage {
     }

     fn push_vote(
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         bank: &Arc<Bank>,
         vote_account_pubkey: &Pubkey,
         authorized_voter_keypairs: &[Arc<Keypair>],
@@ -815,7 +815,7 @@ impl ReplayStage {
             }
             Some(authorized_voter_keypair) => authorized_voter_keypair,
         };
-        let node_keypair = cluster_info.read().unwrap().keypair.clone();
+        let node_keypair = cluster_info.keypair.clone();

         // Send our last few votes along with the new one
         let vote_ix = vote_instruction::vote(
@@ -829,10 +829,7 @@ impl ReplayStage {
         let blockhash = bank.last_blockhash();
         vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
         vote_tx.partial_sign(&[authorized_voter_keypair.as_ref()], blockhash);
-        cluster_info
-            .write()
-            .unwrap()
-            .push_vote(tower_index, vote_tx);
+        cluster_info.push_vote(tower_index, vote_tx);
     }

     fn update_commitment_cache(


@@ -38,7 +38,7 @@ const MAX_PACKET_BATCH_SIZE: usize = 100;
 fn retransmit(
     bank_forks: &Arc<RwLock<BankForks>>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
-    cluster_info: &Arc<RwLock<ClusterInfo>>,
+    cluster_info: &ClusterInfo,
     r: &Arc<Mutex<PacketReceiver>>,
     sock: &UdpSocket,
     id: u32,
@@ -63,11 +63,8 @@ fn retransmit(
     let mut peers_len = 0;
     let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
     let stakes = stakes.map(Arc::new);
-    let (peers, stakes_and_index) = cluster_info
-        .read()
-        .unwrap()
-        .sorted_retransmit_peers_and_stakes(stakes);
-    let me = cluster_info.read().unwrap().my_data();
+    let (peers, stakes_and_index) = cluster_info.sorted_retransmit_peers_and_stakes(stakes);
+    let my_id = cluster_info.id();
     let mut discard_total = 0;
     let mut repair_total = 0;
     let mut retransmit_total = 0;
@@ -88,7 +85,7 @@ fn retransmit(
         let mut compute_turbine_peers = Measure::start("turbine_start");
         let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
-            &me.id,
+            &my_id,
             &peers,
             &stakes_and_index,
             packet.meta.seed,
@@ -154,7 +151,7 @@ pub fn retransmitter(
     sockets: Arc<Vec<UdpSocket>>,
     bank_forks: Arc<RwLock<BankForks>>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
-    cluster_info: Arc<RwLock<ClusterInfo>>,
+    cluster_info: Arc<ClusterInfo>,
     r: Arc<Mutex<PacketReceiver>>,
 ) -> Vec<JoinHandle<()>> {
     (0..sockets.len())
@@ -206,7 +203,7 @@ impl RetransmitStage {
         bank_forks: Arc<RwLock<BankForks>>,
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         blockstore: Arc<Blockstore>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
         retransmit_sockets: Arc<Vec<UdpSocket>>,
         repair_socket: Arc<UdpSocket>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
@@ -316,11 +313,11 @@ mod tests {
             .unwrap();
         let other = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
-        let mut cluster_info = ClusterInfo::new_with_invalid_keypair(other);
+        let cluster_info = ClusterInfo::new_with_invalid_keypair(other);
         cluster_info.insert_info(me);
         let retransmit_socket = Arc::new(vec![UdpSocket::bind("0.0.0.0:0").unwrap()]);
-        let cluster_info = Arc::new(RwLock::new(cluster_info));
+        let cluster_info = Arc::new(cluster_info);
         let (retransmit_sender, retransmit_receiver) = channel();
         let t_retransmit = retransmitter(


@@ -540,8 +540,8 @@ impl JsonRpcRequestProcessor {
     }
 }

-fn get_tpu_addr(cluster_info: &Arc<RwLock<ClusterInfo>>) -> Result<SocketAddr> {
-    let contact_info = cluster_info.read().unwrap().my_data();
+fn get_tpu_addr(cluster_info: &ClusterInfo) -> Result<SocketAddr> {
+    let contact_info = cluster_info.my_contact_info();
     Ok(contact_info.tpu)
 }
@@ -556,7 +556,7 @@ fn verify_signature(input: &str) -> Result<Signature> {
 #[derive(Clone)]
 pub struct Meta {
     pub request_processor: Arc<RwLock<JsonRpcRequestProcessor>>,
-    pub cluster_info: Arc<RwLock<ClusterInfo>>,
+    pub cluster_info: Arc<ClusterInfo>,
     pub genesis_hash: Hash,
 }
 impl Metadata for Meta {}
@@ -902,7 +902,7 @@ impl RpcSol for RpcSolImpl {
     }

     fn get_cluster_nodes(&self, meta: Self::Metadata) -> Result<Vec<RpcContactInfo>> {
-        let cluster_info = meta.cluster_info.read().unwrap();
+        let cluster_info = &meta.cluster_info;
         fn valid_address_or_none(addr: &SocketAddr) -> Option<SocketAddr> {
             if ContactInfo::is_valid_address(addr) {
                 Some(*addr)
@@ -910,12 +910,12 @@ impl RpcSol for RpcSolImpl {
                 None
             }
         }
-        let shred_version = cluster_info.my_data().shred_version;
+        let my_shred_version = cluster_info.my_shred_version();
         Ok(cluster_info
             .all_peers()
             .iter()
             .filter_map(|(contact_info, _)| {
-                if shred_version == contact_info.shred_version
+                if my_shred_version == contact_info.shred_version
                     && ContactInfo::is_valid_address(&contact_info.gossip)
                 {
                     Some(RpcContactInfo {
@@ -1555,17 +1555,12 @@ pub mod tests {
             StorageState::default(),
             validator_exit,
         )));
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
-            ContactInfo::default(),
-        )));
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));

-        cluster_info
-            .write()
-            .unwrap()
-            .insert_info(ContactInfo::new_with_pubkey_socketaddr(
-                &leader_pubkey,
-                &socketaddr!("127.0.0.1:1234"),
-            ));
+        cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr(
+            &leader_pubkey,
+            &socketaddr!("127.0.0.1:1234"),
+        ));

         let mut io = MetaIoHandler::default();
         let rpc = RpcSolImpl;
@@ -2258,9 +2253,7 @@ pub mod tests {
                 );
                 Arc::new(RwLock::new(request_processor))
             },
-            cluster_info: Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
-                ContactInfo::default(),
-            ))),
+            cluster_info: Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default())),
             genesis_hash: Hash::default(),
         };
@@ -2277,9 +2270,9 @@ pub mod tests {
     #[test]
     fn test_rpc_get_tpu_addr() {
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
             ContactInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")),
-        )));
+        ));
         assert_eq!(
             get_tpu_addr(&cluster_info),
             Ok(socketaddr!("127.0.0.1:1234"))


@@ -42,7 +42,7 @@ struct RpcRequestMiddleware {
     ledger_path: PathBuf,
     snapshot_archive_path_regex: Regex,
     snapshot_config: Option<SnapshotConfig>,
-    cluster_info: Arc<RwLock<ClusterInfo>>,
+    cluster_info: Arc<ClusterInfo>,
     trusted_validators: Option<HashSet<Pubkey>>,
 }
@@ -50,7 +50,7 @@ impl RpcRequestMiddleware {
     pub fn new(
         ledger_path: PathBuf,
         snapshot_config: Option<SnapshotConfig>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
         trusted_validators: Option<HashSet<Pubkey>>,
     ) -> Self {
         Self {
@@ -134,22 +134,27 @@ impl RpcRequestMiddleware {
     fn health_check(&self) -> &'static str {
         let response = if let Some(trusted_validators) = &self.trusted_validators {
             let (latest_account_hash_slot, latest_trusted_validator_account_hash_slot) = {
-                let cluster_info = self.cluster_info.read().unwrap();
                 (
-                    cluster_info
-                        .get_accounts_hash_for_node(&cluster_info.id())
-                        .map(|hashes| hashes.iter().max_by(|a, b| a.0.cmp(&b.0)))
+                    self.cluster_info
+                        .get_accounts_hash_for_node(&self.cluster_info.id(), |hashes| {
+                            hashes
+                                .iter()
+                                .max_by(|a, b| a.0.cmp(&b.0))
+                                .map(|slot_hash| slot_hash.0)
+                        })
                         .flatten()
-                        .map(|slot_hash| slot_hash.0)
                         .unwrap_or(0),
                     trusted_validators
                         .iter()
                         .map(|trusted_validator| {
-                            cluster_info
-                                .get_accounts_hash_for_node(&trusted_validator)
-                                .map(|hashes| hashes.iter().max_by(|a, b| a.0.cmp(&b.0)))
+                            self.cluster_info
+                                .get_accounts_hash_for_node(&trusted_validator, |hashes| {
+                                    hashes
+                                        .iter()
+                                        .max_by(|a, b| a.0.cmp(&b.0))
+                                        .map(|slot_hash| slot_hash.0)
+                                })
                                 .flatten()
-                                .map(|slot_hash| slot_hash.0)
                                 .unwrap_or(0)
                         })
                         .max()
@@ -244,7 +249,7 @@ impl JsonRpcService {
         bank_forks: Arc<RwLock<BankForks>>,
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
         blockstore: Arc<Blockstore>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
         genesis_hash: Hash,
         ledger_path: &Path,
         storage_state: StorageState,
@@ -367,9 +372,7 @@ mod tests {
         let exit = Arc::new(AtomicBool::new(false));
         let validator_exit = create_validator_exit(&exit);
         let bank = Bank::new(&genesis_config);
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
-            ContactInfo::default(),
-        )));
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));
         let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
         let rpc_addr = SocketAddr::new(
             ip_addr,
@@ -412,9 +415,7 @@ mod tests {
     #[test]
     fn test_is_get_path() {
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
-            ContactInfo::default(),
-        )));
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));

         let rrm = RpcRequestMiddleware::new(PathBuf::from("/"), None, cluster_info.clone(), None);
         let rrm_with_snapshot_config = RpcRequestMiddleware::new(
@@ -451,9 +452,7 @@ mod tests {
     #[test]
     fn test_health_check_with_no_trusted_validators() {
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
-            ContactInfo::default(),
-        )));
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));

         let rm = RpcRequestMiddleware::new(PathBuf::from("/"), None, cluster_info.clone(), None);
         assert_eq!(rm.health_check(), "ok");
@@ -461,9 +460,7 @@ mod tests {
     #[test]
     fn test_health_check_with_trusted_validators() {
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
-            ContactInfo::default(),
-        )));
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));

         let trusted_validators = vec![Pubkey::new_rand(), Pubkey::new_rand(), Pubkey::new_rand()];
         let rm = RpcRequestMiddleware::new(
@@ -477,66 +474,59 @@ mod tests {
         assert_eq!(rm.health_check(), "behind");

         // No account hashes for any trusted validators == "behind"
-        {
-            let mut cluster_info = cluster_info.write().unwrap();
-            cluster_info
-                .push_accounts_hashes(vec![(1000, Hash::default()), (900, Hash::default())]);
-        }
+        cluster_info.push_accounts_hashes(vec![(1000, Hash::default()), (900, Hash::default())]);
         assert_eq!(rm.health_check(), "behind");

         // This node is ahead of the trusted validators == "ok"
-        {
-            let mut cluster_info = cluster_info.write().unwrap();
-            cluster_info
-                .gossip
+        cluster_info
+            .gossip
+            .write()
+            .unwrap()
             .crds
             .insert(
                 CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new(
                     trusted_validators[0].clone(),
                     vec![
                         (1, Hash::default()),
                         (1001, Hash::default()),
                         (2, Hash::default()),
                     ],
                 ))),
                 1,
             )
             .unwrap();
-        }
         assert_eq!(rm.health_check(), "ok");

         // Node is slightly behind the trusted validators == "ok"
-        {
-            let mut cluster_info = cluster_info.write().unwrap();
-            cluster_info
-                .gossip
+        cluster_info
+            .gossip
+            .write()
+            .unwrap()
             .crds
             .insert(
                 CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new(
                     trusted_validators[1].clone(),
                     vec![(1000 + HEALTH_CHECK_SLOT_DISTANCE - 1, Hash::default())],
                 ))),
                 1,
             )
             .unwrap();
-        }
         assert_eq!(rm.health_check(), "ok");

         // Node is far behind the trusted validators == "behind"
-        {
-            let mut cluster_info = cluster_info.write().unwrap();
-            cluster_info
-                .gossip
+        cluster_info
+            .gossip
+            .write()
+            .unwrap()
             .crds
             .insert(
                 CrdsValue::new_unsigned(CrdsData::AccountsHashes(SnapshotHash::new(
                     trusted_validators[2].clone(),
                     vec![(1000 + HEALTH_CHECK_SLOT_DISTANCE, Hash::default())],
                 ))),
                 1,
             )
             .unwrap();
-        }
         assert_eq!(rm.health_check(), "behind");
     }
 }
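get_accounts_hash_for_node illustrates the second idiom introduced here: accessors take a closure and run it while the internal lock is held, so the caller gets a computed value out instead of a reference into the locked table. A hedged sketch of one plausible signature, with toy types standing in for the real crds plumbing:

    use std::collections::HashMap;
    use std::sync::RwLock;

    type Slot = u64;
    type Hash = u64; // stand-in for solana_sdk::hash::Hash

    struct CrdsSketch {
        accounts_hashes: HashMap<String, Vec<(Slot, Hash)>>,
    }

    struct ClusterInfoSketch {
        gossip: RwLock<CrdsSketch>,
    }

    impl ClusterInfoSketch {
        // The closure is applied under the read lock, so no Vec is cloned
        // out and no reference can escape the guard's lifetime.
        fn get_accounts_hash_for_node<F, Y>(&self, pubkey: &str, map: F) -> Option<Y>
        where
            F: FnOnce(&Vec<(Slot, Hash)>) -> Y,
        {
            self.gossip
                .read()
                .unwrap()
                .accounts_hashes
                .get(pubkey)
                .map(map)
        }
    }

    fn main() {
        let mut accounts_hashes = HashMap::new();
        accounts_hashes.insert("node-a".to_string(), vec![(900, 0), (1000, 0)]);
        let cluster_info = ClusterInfoSketch {
            gossip: RwLock::new(CrdsSketch { accounts_hashes }),
        };
        // Same call shape as health_check above: map to the max slot, then flatten.
        let latest = cluster_info
            .get_accounts_hash_for_node("node-a", |hashes| {
                hashes.iter().max_by(|a, b| a.0.cmp(&b.0)).map(|sh| sh.0)
            })
            .flatten()
            .unwrap_or(0);
        assert_eq!(latest, 1000);
    }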

@@ -72,7 +72,7 @@ pub struct ServeRepair {
     /// set the keypair that will be used to sign repair responses
     keypair: Arc<Keypair>,
     my_info: ContactInfo,
-    cluster_info: Arc<RwLock<ClusterInfo>>,
+    cluster_info: Arc<ClusterInfo>,
 }

 type RepairCache = HashMap<Slot, (Vec<ContactInfo>, Vec<(u64, usize)>)>;
@@ -80,16 +80,13 @@ type RepairCache = HashMap<Slot, (Vec<ContactInfo>, Vec<(u64, usize)>)>;
 impl ServeRepair {
     /// Without a valid keypair gossip will not function. Only useful for tests.
     pub fn new_with_invalid_keypair(contact_info: ContactInfo) -> Self {
-        Self::new(Arc::new(RwLock::new(
-            ClusterInfo::new_with_invalid_keypair(contact_info),
-        )))
+        Self::new(Arc::new(ClusterInfo::new_with_invalid_keypair(
+            contact_info,
+        )))
     }

-    pub fn new(cluster_info: Arc<RwLock<ClusterInfo>>) -> Self {
-        let (keypair, my_info) = {
-            let r_cluster_info = cluster_info.read().unwrap();
-            (r_cluster_info.keypair.clone(), r_cluster_info.my_data())
-        };
+    pub fn new(cluster_info: Arc<ClusterInfo>) -> Self {
+        let (keypair, my_info) = { (cluster_info.keypair.clone(), cluster_info.my_contact_info()) };
         Self {
             keypair,
             my_info,
@@ -362,11 +359,7 @@ impl ServeRepair {
         // find a peer that appears to be accepting replication and has the desired slot, as indicated
         // by a valid tvu port location
         if cache.get(&repair_request.slot()).is_none() {
-            let repair_peers: Vec<_> = self
-                .cluster_info
-                .read()
-                .unwrap()
-                .repair_peers(repair_request.slot());
+            let repair_peers: Vec<_> = self.cluster_info.repair_peers(repair_request.slot());
             if repair_peers.is_empty() {
                 return Err(ClusterInfoError::NoPeers.into());
             }
@@ -654,7 +647,7 @@ mod tests {
     fn window_index_request() {
         let cluster_slots = ClusterSlots::default();
         let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me)));
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(me));
         let serve_repair = ServeRepair::new(cluster_info.clone());
         let rv = serve_repair.repair_request(
             &cluster_slots,
@@ -680,7 +673,7 @@ mod tests {
             wallclock: 0,
             shred_version: 0,
         };
-        cluster_info.write().unwrap().insert_info(nxt.clone());
+        cluster_info.insert_info(nxt.clone());
         let rv = serve_repair
             .repair_request(
                 &cluster_slots,
@@ -708,7 +701,7 @@ mod tests {
             wallclock: 0,
             shred_version: 0,
         };
-        cluster_info.write().unwrap().insert_info(nxt);
+        cluster_info.insert_info(nxt);
         let mut one = false;
         let mut two = false;
         while !one || !two {
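ServeRepair::new collapses to a one-liner because keypair is an immutable field on ClusterInfo, readable with no lock at all, while my_contact_info() locks internally and returns an owned copy. A small sketch of that split, assuming hypothetical stand-in types:

    use std::sync::{Arc, RwLock};

    struct Keypair; // stand-in

    #[derive(Clone)]
    struct ContactInfo {
        wallclock: u64,
    }

    struct ClusterInfoSketch {
        // Immutable after construction: no lock needed to read it.
        keypair: Arc<Keypair>,
        // Mutable gossip state: guarded by its own internal lock.
        my_contact_info: RwLock<ContactInfo>,
    }

    impl ClusterInfoSketch {
        fn my_contact_info(&self) -> ContactInfo {
            // The guard derefs to ContactInfo, so this clones the data,
            // not the guard, and the lock drops at the end of the call.
            self.my_contact_info.read().unwrap().clone()
        }
    }

    struct ServeRepairSketch {
        keypair: Arc<Keypair>,
        my_info: ContactInfo,
        cluster_info: Arc<ClusterInfoSketch>,
    }

    impl ServeRepairSketch {
        // Mirrors the rewritten ServeRepair::new: snapshot what we need once.
        fn new(cluster_info: Arc<ClusterInfoSketch>) -> Self {
            let (keypair, my_info) =
                (cluster_info.keypair.clone(), cluster_info.my_contact_info());
            Self {
                keypair,
                my_info,
                cluster_info,
            }
        }
    }

    fn main() {
        let cluster_info = Arc::new(ClusterInfoSketch {
            keypair: Arc::new(Keypair),
            my_contact_info: RwLock::new(ContactInfo { wallclock: 0 }),
        });
        let _serve_repair = ServeRepairSketch::new(cluster_info);
    }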

@@ -6,7 +6,7 @@ use std::{
     sync::{
         atomic::{AtomicBool, Ordering},
         mpsc::RecvTimeoutError,
-        Arc, RwLock,
+        Arc,
     },
     thread::{self, Builder, JoinHandle},
     time::Duration,
@@ -21,7 +21,7 @@ impl SnapshotPackagerService {
         snapshot_package_receiver: AccountsPackageReceiver,
         starting_snapshot_hash: Option<(Slot, Hash)>,
         exit: &Arc<AtomicBool>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
         blockstore: Option<Arc<Blockstore>>,
     ) -> Self {
         let exit = exit.clone();
@@ -34,10 +34,7 @@ impl SnapshotPackagerService {
                 if let Some(starting_snapshot_hash) = starting_snapshot_hash {
                     hashes.push(starting_snapshot_hash);
                 }
-                cluster_info
-                    .write()
-                    .unwrap()
-                    .push_snapshot_hashes(hashes.clone());
+                cluster_info.push_snapshot_hashes(hashes.clone());
                 loop {
                     if exit.load(Ordering::Relaxed) {
                         break;
@@ -60,10 +57,7 @@ impl SnapshotPackagerService {
                         while hashes.len() > MAX_SNAPSHOT_HASHES {
                             hashes.remove(0);
                         }
-                        cluster_info
-                            .write()
-                            .unwrap()
-                            .push_snapshot_hashes(hashes.clone());
+                        cluster_info.push_snapshot_hashes(hashes.clone());
                     }
                     if let Some(ref blockstore) = blockstore {
                         let _ = blockstore.tar_shreds(snapshot_package.root);
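Since push_snapshot_hashes now takes &self, the packager thread needs nothing more than a cloned Arc; there is no caller-held write lock to hold across the loop. A minimal sketch of that thread pattern, again with toy types:

    use std::sync::{Arc, RwLock};
    use std::thread;

    type Slot = u64;
    type Hash = u64; // stand-in

    struct ClusterInfoSketch {
        snapshot_hashes: RwLock<Vec<(Slot, Hash)>>,
    }

    impl ClusterInfoSketch {
        // The write lock is scoped to this call, mirroring push_snapshot_hashes.
        fn push_snapshot_hashes(&self, hashes: Vec<(Slot, Hash)>) {
            *self.snapshot_hashes.write().unwrap() = hashes;
        }
    }

    fn main() {
        let cluster_info = Arc::new(ClusterInfoSketch {
            snapshot_hashes: RwLock::new(vec![]),
        });
        let for_thread = cluster_info.clone();
        thread::spawn(move || {
            // The service thread publishes without any external locking.
            for_thread.push_snapshot_hashes(vec![(42, 0)]);
        })
        .join()
        .unwrap();
        assert_eq!(cluster_info.snapshot_hashes.read().unwrap().len(), 1);
    }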

@@ -184,7 +184,7 @@ impl StorageStage {
         storage_keypair: &Arc<Keypair>,
         exit: &Arc<AtomicBool>,
         bank_forks: &Arc<RwLock<BankForks>>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
     ) -> Self {
         let (instruction_sender, instruction_receiver) = channel();
@@ -286,7 +286,7 @@ impl StorageStage {
     fn send_transaction(
         bank_forks: &Arc<RwLock<BankForks>>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &ClusterInfo,
         instruction: Instruction,
         keypair: &Arc<Keypair>,
         storage_keypair: &Arc<Keypair>,
@@ -323,7 +323,7 @@ impl StorageStage {
         for _ in 0..5 {
             transactions_socket.send_to(
                 &bincode::serialize(&transaction).unwrap(),
-                cluster_info.read().unwrap().my_data().tpu,
+                cluster_info.my_contact_info().tpu,
             )?;
             sleep(Duration::from_millis(100));
             if Self::poll_for_signature_confirmation(
@@ -652,10 +652,10 @@ impl StorageStage {
     }
 }

-pub fn test_cluster_info(id: &Pubkey) -> Arc<RwLock<ClusterInfo>> {
+pub fn test_cluster_info(id: &Pubkey) -> Arc<ClusterInfo> {
     let contact_info = ContactInfo::new_localhost(id, 0);
     let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info);
-    Arc::new(RwLock::new(cluster_info))
+    Arc::new(cluster_info)
 }

 #[cfg(test)]

@@ -36,7 +36,7 @@ pub struct Tpu {
 impl Tpu {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
         poh_recorder: &Arc<Mutex<PohRecorder>>,
         entry_receiver: Receiver<WorkingBankEntry>,
         retransmit_slots_receiver: RetransmitSlotsReceiver,

@@ -84,7 +84,7 @@ impl Tvu {
         authorized_voter_keypairs: Vec<Arc<Keypair>>,
         storage_keypair: &Arc<Keypair>,
         bank_forks: &Arc<RwLock<BankForks>>,
-        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        cluster_info: &Arc<ClusterInfo>,
         sockets: Sockets,
         blockstore: Arc<Blockstore>,
         storage_state: &StorageState,
@@ -103,11 +103,7 @@ impl Tvu {
         retransmit_slots_sender: RetransmitSlotsSender,
         tvu_config: TvuConfig,
     ) -> Self {
-        let keypair: Arc<Keypair> = cluster_info
-            .read()
-            .expect("Unable to read from cluster_info during Tvu creation")
-            .keypair
-            .clone();
+        let keypair: Arc<Keypair> = cluster_info.keypair.clone();

         let Sockets {
             repair: repair_socket,
@@ -178,7 +174,7 @@ impl Tvu {
             accounts_hash_receiver,
             snapshot_package_sender,
             exit,
-            cluster_info,
+            &cluster_info,
             tvu_config.trusted_validators.clone(),
             tvu_config.halt_on_trusted_validators_accounts_hash_mismatch,
             tvu_config.accounts_hash_fault_injection_slots,
@@ -286,9 +282,9 @@ pub mod tests {
         let bank_forks = BankForks::new(0, Bank::new(&genesis_config));

         //start cluster_info1
-        let mut cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone());
+        let cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone());
         cluster_info1.insert_info(leader.info.clone());
-        let cref1 = Arc::new(RwLock::new(cluster_info1));
+        let cref1 = Arc::new(cluster_info1);

         let (blockstore_path, _) = create_new_tmp_ledger!(&genesis_config);
         let (blockstore, l_receiver, completed_slots_receiver) =
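A side effect visible in the test above: insert_info now works on an immutable binding, because mutation happens through ClusterInfo's interior lock rather than through &mut self. A toy illustration:

    use std::sync::RwLock;

    #[derive(Clone)]
    struct ContactInfo {
        id: u64,
    }

    struct ClusterInfoSketch {
        peers: RwLock<Vec<ContactInfo>>,
    }

    impl ClusterInfoSketch {
        // &self, not &mut self: interior mutability via the internal lock.
        fn insert_info(&self, contact_info: ContactInfo) {
            self.peers.write().unwrap().push(contact_info);
        }
    }

    fn main() {
        // No `mut` needed on the binding, matching the rewritten test.
        let cluster_info = ClusterInfoSketch {
            peers: RwLock::new(vec![]),
        };
        cluster_info.insert_info(ContactInfo { id: 7 });
        assert_eq!(cluster_info.peers.read().unwrap().len(), 1);
    }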

@@ -235,10 +235,7 @@ impl Validator {
             }
         }

-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(
-            node.info.clone(),
-            keypair.clone(),
-        )));
+        let cluster_info = Arc::new(ClusterInfo::new(node.info.clone(), keypair.clone()));

         let storage_state = StorageState::new(
             &bank.last_blockhash(),
@@ -370,10 +367,7 @@ impl Validator {
         // Insert the entrypoint info, should only be None if this node
         // is the bootstrap validator
         if let Some(entrypoint_info) = entrypoint_info_option {
-            cluster_info
-                .write()
-                .unwrap()
-                .set_entrypoint(entrypoint_info.clone());
+            cluster_info.set_entrypoint(entrypoint_info.clone());
         }

         let (snapshot_packager_service, snapshot_package_sender) =
@@ -647,11 +641,7 @@ fn new_banks_from_blockstore(
     )
 }

-fn wait_for_supermajority(
-    config: &ValidatorConfig,
-    bank: &Arc<Bank>,
-    cluster_info: &Arc<RwLock<ClusterInfo>>,
-) {
+fn wait_for_supermajority(config: &ValidatorConfig, bank: &Bank, cluster_info: &ClusterInfo) {
     if config.wait_for_supermajority != Some(bank.slot()) {
         return;
     }
@@ -796,11 +786,7 @@ fn report_target_features() {
 }

 // Get the activated stake percentage (based on the provided bank) that is visible in gossip
-fn get_stake_percent_in_gossip(
-    bank: &Arc<Bank>,
-    cluster_info: &Arc<RwLock<ClusterInfo>>,
-    log: bool,
-) -> u64 {
+fn get_stake_percent_in_gossip(bank: &Bank, cluster_info: &ClusterInfo, log: bool) -> u64 {
     let mut online_stake = 0;
     let mut wrong_shred_stake = 0;
     let mut wrong_shred_nodes = vec![];
@@ -808,9 +794,9 @@ fn get_stake_percent_in_gossip(
     let mut offline_nodes = vec![];

     let mut total_activated_stake = 0;
-    let all_tvu_peers = cluster_info.read().unwrap().all_tvu_peers();
-    let my_shred_version = cluster_info.read().unwrap().my_data().shred_version;
-    let my_id = cluster_info.read().unwrap().id();
+    let all_tvu_peers = cluster_info.all_tvu_peers();
+    let my_shred_version = cluster_info.my_shred_version();
+    let my_id = cluster_info.id();

     for (activated_stake, vote_account) in bank.vote_accounts().values() {
         let vote_state =
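wait_for_supermajority and get_stake_percent_in_gossip also trade &Arc<Bank> for plain &Bank, and &Arc<RwLock<ClusterInfo>> for &ClusterInfo. Callers that own an Arc lose nothing, since &Arc<T> coerces to &T through Deref, and the callee stops caring how the value is shared. A minimal sketch of the signature change, assuming a toy Bank:

    use std::sync::Arc;

    struct Bank {
        slot: u64,
    }

    // Borrow the Bank itself, not the Arc that happens to own it.
    fn slot_of(bank: &Bank) -> u64 {
        bank.slot
    }

    fn main() {
        let bank = Arc::new(Bank { slot: 9 });
        // &bank is &Arc<Bank>; Deref coercion turns it into &Bank here.
        assert_eq!(slot_of(&bank), 9);
    }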

@@ -249,7 +249,7 @@ impl WindowService {
     #[allow(clippy::too_many_arguments)]
     pub fn new<F>(
         blockstore: Arc<Blockstore>,
-        cluster_info: Arc<RwLock<ClusterInfo>>,
+        cluster_info: Arc<ClusterInfo>,
         verified_receiver: CrossbeamReceiver<Vec<Packets>>,
         retransmit: PacketSender,
         repair_socket: Arc<UdpSocket>,
@@ -294,7 +294,7 @@ impl WindowService {
         );

         let t_window = Self::start_recv_window_thread(
-            cluster_info.read().unwrap().id(),
+            cluster_info.id(),
             exit,
             &blockstore,
             insert_sender,
@@ -514,7 +514,7 @@ mod test {
         net::UdpSocket,
         sync::atomic::{AtomicBool, Ordering},
         sync::mpsc::channel,
-        sync::{Arc, RwLock},
+        sync::Arc,
         thread::sleep,
         time::Duration,
     };
@@ -630,9 +630,9 @@ mod test {
         let blockstore = Arc::new(blockstore);
         let (retransmit_sender, _retransmit_receiver) = channel();
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
             ContactInfo::new_localhost(&Pubkey::default(), 0),
-        )));
+        ));
         let cluster_slots = Arc::new(ClusterSlots::default());
         let repair_sock = Arc::new(UdpSocket::bind(socketaddr_any!()).unwrap());
         let window = WindowService::new(

@@ -25,7 +25,6 @@ mod tests {
         signature::{Keypair, Signer},
         system_transaction,
     };
-    use std::sync::RwLock;
     use std::{fs, path::PathBuf, sync::atomic::AtomicBool, sync::mpsc::channel, sync::Arc};
     use tempfile::TempDir;
@@ -319,9 +318,7 @@ mod tests {
         // channel hold hard links to these deleted snapshots. We verify this is the case below.
         let exit = Arc::new(AtomicBool::new(false));
-        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
-            ContactInfo::default(),
-        )));
+        let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(ContactInfo::default()));

         let snapshot_packager_service =
             SnapshotPackagerService::new(receiver, None, &exit, &cluster_info, None);

@@ -72,7 +72,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
     // describe the leader
     let leader_info = ContactInfo::new_localhost(&Pubkey::new_rand(), 0);
-    let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.clone());
+    let cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.clone());

     // setup staked nodes
     let mut staked_nodes = HashMap::new();
@@ -105,7 +105,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
             senders.lock().unwrap().insert(node.id, s);
         })
     });
-    let c_info = cluster_info.clone();
+    let c_info = cluster_info.clone_with_id(&cluster_info.id());

     let staked_nodes = Arc::new(staked_nodes);
     let shreds_len = 100;
@@ -140,7 +140,6 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
     // start turbine simulation
     let now = Instant::now();
     batches.par_iter_mut().for_each(|batch| {
-        let mut cluster = c_info.clone();
         let mut remaining = batch.len();
         let senders: HashMap<_, _> = senders.lock().unwrap().clone();
         while remaining > 0 {
@@ -150,7 +149,7 @@ fn run_simulation(stakes: &[u64], fanout: usize) {
                 "Timed out with {:?} remaining nodes",
                 remaining
             );
-            cluster.gossip.set_self(&*id);
+            let cluster = c_info.clone_with_id(id);
             if !*layer1_done {
                 recv.iter().for_each(|i| {
                     retransmit(
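The simulation can no longer rewrite a shared ClusterInfo's identity with gossip.set_self, so each simulated hop now gets its own copy via clone_with_id. A hedged sketch of what such a clone might look like; the toy fields below are assumptions, the real method copies the gossip state and swaps in the new id:

    use std::sync::RwLock;

    type Pubkey = u64; // stand-in

    struct ClusterInfoSketch {
        id: Pubkey,
        peers: RwLock<Vec<Pubkey>>,
    }

    impl ClusterInfoSketch {
        fn id(&self) -> Pubkey {
            self.id
        }

        // Clone the shared state but assume a new identity; nothing about
        // the original is mutated, so it can keep being shared via an Arc.
        fn clone_with_id(&self, new_id: &Pubkey) -> Self {
            Self {
                id: *new_id,
                peers: RwLock::new(self.peers.read().unwrap().clone()),
            }
        }
    }

    fn main() {
        let leader = ClusterInfoSketch {
            id: 1,
            peers: RwLock::new(vec![2, 3]),
        };
        let node = leader.clone_with_id(&42);
        assert_eq!(node.id(), 42);
        assert_eq!(leader.id(), 1); // original untouched
    }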

@@ -10,19 +10,16 @@ use solana_sdk::signature::{Keypair, Signer};
 use solana_sdk::timing::timestamp;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::thread::sleep;
 use std::time::Duration;

-fn test_node(exit: &Arc<AtomicBool>) -> (Arc<RwLock<ClusterInfo>>, GossipService, UdpSocket) {
+fn test_node(exit: &Arc<AtomicBool>) -> (Arc<ClusterInfo>, GossipService, UdpSocket) {
     let keypair = Arc::new(Keypair::new());
     let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey());
-    let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(
-        test_node.info.clone(),
-        keypair,
-    )));
+    let cluster_info = Arc::new(ClusterInfo::new(test_node.info.clone(), keypair));
     let gossip_service = GossipService::new(&cluster_info, None, test_node.sockets.gossip, exit);
-    let _ = cluster_info.read().unwrap().my_data();
+    let _ = cluster_info.my_contact_info();
     (
         cluster_info,
         gossip_service,
@@ -36,7 +33,7 @@ fn test_node(exit: &Arc<AtomicBool>) -> (Arc<ClusterInfo>, GossipService
 /// tests that actually use this function are below
 fn run_gossip_topo<F>(num: usize, topo: F)
 where
-    F: Fn(&Vec<(Arc<RwLock<ClusterInfo>>, GossipService, UdpSocket)>) -> (),
+    F: Fn(&Vec<(Arc<ClusterInfo>, GossipService, UdpSocket)>) -> (),
 {
     let exit = Arc::new(AtomicBool::new(false));
     let listen: Vec<_> = (0..num).map(|_| test_node(&exit)).collect();
@@ -44,10 +41,7 @@ where
     let mut done = true;
     for i in 0..(num * 32) {
         done = true;
-        let total: usize = listen
-            .iter()
-            .map(|v| v.0.read().unwrap().gossip_peers().len())
-            .sum();
+        let total: usize = listen.iter().map(|v| v.0.gossip_peers().len()).sum();
         if (total + num) * 10 > num * num * 9 {
             done = true;
             break;
@@ -71,11 +65,10 @@ fn gossip_ring() {
         for n in 0..num {
             let y = n % listen.len();
             let x = (n + 1) % listen.len();
-            let mut xv = listen[x].0.write().unwrap();
-            let yv = listen[y].0.read().unwrap();
-            let mut d = yv.lookup(&yv.id()).unwrap().clone();
+            let yv = &listen[y].0;
+            let mut d = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap();
             d.wallclock = timestamp();
-            xv.insert_info(d);
+            listen[x].0.insert_info(d);
         }
     });
 }
@@ -90,11 +83,10 @@ fn gossip_ring_large() {
         for n in 0..num {
             let y = n % listen.len();
             let x = (n + 1) % listen.len();
-            let mut xv = listen[x].0.write().unwrap();
-            let yv = listen[y].0.read().unwrap();
-            let mut d = yv.lookup(&yv.id()).unwrap().clone();
+            let yv = &listen[y].0;
+            let mut d = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap();
             d.wallclock = timestamp();
-            xv.insert_info(d);
+            listen[x].0.insert_info(d);
         }
     });
 }
@@ -107,10 +99,10 @@ fn gossip_star() {
         for n in 0..(num - 1) {
             let x = 0;
             let y = (n + 1) % listen.len();
-            let mut xv = listen[x].0.write().unwrap();
-            let yv = listen[y].0.read().unwrap();
-            let mut yd = yv.lookup(&yv.id()).unwrap().clone();
+            let yv = &listen[y].0;
+            let mut yd = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap();
             yd.wallclock = timestamp();
+            let xv = &listen[x].0;
             xv.insert_info(yd);
             trace!("star leader {}", &xv.id());
         }
@@ -124,13 +116,13 @@ fn gossip_rstar() {
     run_gossip_topo(10, |listen| {
         let num = listen.len();
         let xd = {
-            let xv = listen[0].0.read().unwrap();
-            xv.lookup(&xv.id()).unwrap().clone()
+            let xv = &listen[0].0;
+            xv.lookup_contact_info(&xv.id(), |ci| ci.clone()).unwrap()
         };
         trace!("rstar leader {}", xd.id);
         for n in 0..(num - 1) {
             let y = (n + 1) % listen.len();
-            let mut yv = listen[y].0.write().unwrap();
+            let yv = &listen[y].0;
             yv.insert_info(xd.clone());
             trace!("rstar insert {} into {}", xd.id, yv.id());
         }
@@ -147,10 +139,10 @@ pub fn cluster_info_retransmit() {
     let (c2, dr2, tn2) = test_node(&exit);
     trace!("c3:");
     let (c3, dr3, tn3) = test_node(&exit);
-    let c1_data = c1.read().unwrap().my_data().clone();
-    c2.write().unwrap().insert_info(c1_data.clone());
-    c3.write().unwrap().insert_info(c1_data.clone());
+    let c1_contact_info = c1.my_contact_info();
+    c2.insert_info(c1_contact_info.clone());
+    c3.insert_info(c1_contact_info);

     let num = 3;
@@ -158,9 +150,9 @@ pub fn cluster_info_retransmit() {
     trace!("waiting to converge:");
     let mut done = false;
     for _ in 0..30 {
-        done = c1.read().unwrap().gossip_peers().len() == num - 1
-            && c2.read().unwrap().gossip_peers().len() == num - 1
-            && c3.read().unwrap().gossip_peers().len() == num - 1;
+        done = c1.gossip_peers().len() == num - 1
+            && c2.gossip_peers().len() == num - 1
+            && c3.gossip_peers().len() == num - 1;
         if done {
             break;
         }
@@ -169,7 +161,7 @@ pub fn cluster_info_retransmit() {
     assert!(done);
     let mut p = Packet::default();
     p.meta.size = 10;
-    let peers = c1.read().unwrap().retransmit_peers();
+    let peers = c1.retransmit_peers();
     let retransmit_peers: Vec<_> = peers.iter().collect();
     ClusterInfo::retransmit_to(&retransmit_peers, &mut p, None, &tn1, false).unwrap();
     let res: Vec<_> = [tn1, tn2, tn3]
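The old lookup(&id) returned data that was only safe to use while the caller held the outer lock; lookup_contact_info replaces it with a closure applied under the internal lock, and these tests simply clone the entry out. A toy version of that accessor:

    use std::collections::HashMap;
    use std::sync::RwLock;

    type Pubkey = u64; // stand-in

    #[derive(Clone)]
    struct ContactInfo {
        id: Pubkey,
        wallclock: u64,
    }

    struct ClusterInfoSketch {
        gossip: RwLock<HashMap<Pubkey, ContactInfo>>,
    }

    impl ClusterInfoSketch {
        // The closure runs while the map is locked; returning an owned value
        // (here, a clone) keeps any borrow from escaping the guard.
        fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
        where
            F: FnOnce(&ContactInfo) -> Y,
        {
            self.gossip.read().unwrap().get(id).map(map)
        }
    }

    fn main() {
        let mut map = HashMap::new();
        map.insert(3, ContactInfo { id: 3, wallclock: 0 });
        let node = ClusterInfoSketch {
            gossip: RwLock::new(map),
        };
        // Same shape as the tests above: clone the entry, then mutate the copy.
        let mut d = node.lookup_contact_info(&3, |ci| ci.clone()).unwrap();
        d.wallclock = 42;
        assert_eq!(d.wallclock, 42);
        // The stored entry is unchanged; only the clone was touched.
        assert_eq!(node.lookup_contact_info(&3, |ci| ci.wallclock), Some(0));
    }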

@@ -17,10 +17,7 @@ use solana_sdk::{
     genesis_config::create_genesis_config,
     signature::{Keypair, Signer},
 };
-use std::{
-    fs::remove_dir_all,
-    sync::{Arc, RwLock},
-};
+use std::{fs::remove_dir_all, sync::Arc};

 /// Start the cluster with the given configuration and wait till the archivers are discovered
 /// Then download shreds from one of them.
@@ -59,9 +56,9 @@ fn run_archiver_startup_basic(num_nodes: usize, num_archivers: usize) {
     }
     assert_eq!(archiver_count, num_archivers);

-    let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
+    let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
         cluster_nodes[0].clone(),
-    )));
+    ));
     let serve_repair = ServeRepair::new(cluster_info);
     let path = get_tmp_ledger_path!();
     let blockstore = Arc::new(Blockstore::open(&path).unwrap());

@@ -43,7 +43,7 @@ use std::{
     str::FromStr,
     sync::{
         atomic::{AtomicBool, Ordering},
-        Arc, RwLock,
+        Arc,
     },
     thread::sleep,
     time::{Duration, Instant},
@@ -78,10 +78,10 @@ fn hash_validator(hash: String) -> Result<(), String> {
 }

 fn get_shred_rpc_peers(
-    cluster_info: &Arc<RwLock<ClusterInfo>>,
+    cluster_info: &ClusterInfo,
     expected_shred_version: Option<u16>,
 ) -> Vec<ContactInfo> {
-    let rpc_peers = cluster_info.read().unwrap().all_rpc_peers();
+    let rpc_peers = cluster_info.all_rpc_peers();
     match expected_shred_version {
         Some(expected_shred_version) => {
             // Filter out rpc peers that don't match the expected shred version
@@ -114,21 +114,17 @@ fn is_trusted_validator(id: &Pubkey, trusted_validators: &Option<HashSet<Pubkey>
 }

 fn get_trusted_snapshot_hashes(
-    cluster_info: &Arc<RwLock<ClusterInfo>>,
+    cluster_info: &ClusterInfo,
     trusted_validators: &Option<HashSet<Pubkey>>,
 ) -> Option<HashSet<(Slot, Hash)>> {
     if let Some(trusted_validators) = trusted_validators {
         let mut trusted_snapshot_hashes = HashSet::new();
         for trusted_validator in trusted_validators {
-            if let Some(snapshot_hashes) = cluster_info
-                .read()
-                .unwrap()
-                .get_snapshot_hash_for_node(trusted_validator)
-            {
+            cluster_info.get_snapshot_hash_for_node(trusted_validator, |snapshot_hashes| {
                 for snapshot_hash in snapshot_hashes {
                     trusted_snapshot_hashes.insert(*snapshot_hash);
                 }
-            }
+            });
         }
         Some(trusted_snapshot_hashes)
     } else {
@@ -141,13 +137,13 @@ fn start_gossip_node(
     entrypoint_gossip: &SocketAddr,
     gossip_addr: &SocketAddr,
     gossip_socket: UdpSocket,
-) -> (Arc<RwLock<ClusterInfo>>, Arc<AtomicBool>, GossipService) {
-    let mut cluster_info = ClusterInfo::new(
+) -> (Arc<ClusterInfo>, Arc<AtomicBool>, GossipService) {
+    let cluster_info = ClusterInfo::new(
         ClusterInfo::gossip_contact_info(&identity_keypair.pubkey(), *gossip_addr),
         identity_keypair.clone(),
     );
     cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint_gossip));
-    let cluster_info = Arc::new(RwLock::new(cluster_info));
+    let cluster_info = Arc::new(cluster_info);

     let gossip_exit_flag = Arc::new(AtomicBool::new(false));
     let gossip_service = GossipService::new(
@@ -160,7 +156,7 @@ fn start_gossip_node(
 }

 fn get_rpc_node(
-    cluster_info: &Arc<RwLock<ClusterInfo>>,
+    cluster_info: &ClusterInfo,
     validator_config: &ValidatorConfig,
     blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
     snapshot_not_required: bool,
@@ -173,7 +169,7 @@ fn get_rpc_node(
             validator_config.expected_shred_version
         );
         sleep(Duration::from_secs(1));
-        info!("\n{}", cluster_info.read().unwrap().contact_info_trace());
+        info!("\n{}", cluster_info.contact_info_trace());

         let rpc_peers = get_shred_rpc_peers(&cluster_info, validator_config.expected_shred_version);
         let rpc_peers_total = rpc_peers.len();
@@ -222,11 +218,7 @@ fn get_rpc_node(
                 {
                     continue;
                 }
-                if let Some(snapshot_hashes) = cluster_info
-                    .read()
-                    .unwrap()
-                    .get_snapshot_hash_for_node(&rpc_peer.id)
-                {
+                cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |snapshot_hashes| {
                     for snapshot_hash in snapshot_hashes {
                         if let Some(ref trusted_snapshot_hashes) = trusted_snapshot_hashes {
                             if !trusted_snapshot_hashes.contains(snapshot_hash) {
@@ -247,7 +239,7 @@ fn get_rpc_node(
                             eligible_rpc_peers.push(rpc_peer.clone());
                         }
                     }
-                }
+                });
             }

         match highest_snapshot_hash {
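get_trusted_snapshot_hashes above shows the other way the closure accessors get used: rather than mapping to a return value, the closure mutably captures an accumulator and fills it while the lock is held. One plausible toy shape (the real signature may differ):

    use std::collections::{HashMap, HashSet};
    use std::sync::RwLock;

    type Slot = u64;
    type Hash = u64; // stand-in
    type Pubkey = u64; // stand-in

    struct ClusterInfoSketch {
        snapshot_hashes: RwLock<HashMap<Pubkey, Vec<(Slot, Hash)>>>,
    }

    impl ClusterInfoSketch {
        // Assumed shape: run the caller's closure on the node's entry, if
        // any, while the internal read lock is held.
        fn get_snapshot_hash_for_node<F>(&self, pubkey: &Pubkey, mut action: F)
        where
            F: FnMut(&Vec<(Slot, Hash)>),
        {
            if let Some(hashes) = self.snapshot_hashes.read().unwrap().get(pubkey) {
                action(hashes);
            }
        }
    }

    fn main() {
        let mut table = HashMap::new();
        table.insert(7, vec![(10, 0), (20, 0)]);
        let cluster_info = ClusterInfoSketch {
            snapshot_hashes: RwLock::new(table),
        };
        // Accumulate into a set the closure captures mutably, as
        // get_trusted_snapshot_hashes does above.
        let mut trusted_snapshot_hashes = HashSet::new();
        cluster_info.get_snapshot_hash_for_node(&7, |snapshot_hashes| {
            for snapshot_hash in snapshot_hashes {
                trusted_snapshot_hashes.insert(*snapshot_hash);
            }
        });
        assert_eq!(trusted_snapshot_hashes.len(), 2);
    }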