Remove write lock (#9311)

* Remove write lock

Co-authored-by: Carl <carl@solana.com>
This commit is contained in:
carllin 2020-04-05 15:18:45 -07:00 committed by GitHub
parent b584174d67
commit 7b68628e6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 36 additions and 25 deletions

View File

@ -7,9 +7,7 @@ use solana_core::cluster_info::{ClusterInfo, Node};
use solana_core::contact_info::ContactInfo;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::collections::HashMap;
use std::net::UdpSocket;
use std::sync::Arc;
use std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Instant};
use test::Bencher;
#[bench]
@ -36,7 +34,13 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
bencher.iter(move || {
let shreds = shreds.clone();
cluster_info
.broadcast_shreds(&socket, shreds, &seeds, Some(stakes.clone()))
.broadcast_shreds(
&socket,
shreds,
&seeds,
Some(stakes.clone()),
&mut Instant::now(),
)
.unwrap();
});
}

View File

@ -104,7 +104,7 @@ trait BroadcastRun {
blockstore_sender: &Sender<Arc<Vec<Shred>>>,
) -> Result<()>;
fn transmit(
&self,
&mut self,
receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sock: &UdpSocket,
@ -226,7 +226,7 @@ impl BroadcastStage {
let socket_receiver = Arc::new(Mutex::new(socket_receiver));
for sock in socks.into_iter() {
let socket_receiver = socket_receiver.clone();
let bs_transmit = broadcast_stage_run.clone();
let mut bs_transmit = broadcast_stage_run.clone();
let cluster_info = cluster_info.clone();
let t = Builder::new()
.name("solana-broadcaster-transmit".to_string())

View File

@ -96,7 +96,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
Ok(())
}
fn transmit(
&self,
&mut self,
receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sock: &UdpSocket,

View File

@ -72,7 +72,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
Ok(())
}
fn transmit(
&self,
&mut self,
receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sock: &UdpSocket,
@ -81,10 +81,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
let all_seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
// Broadcast data
let all_shred_bufs: Vec<Vec<u8>> = shreds.to_vec().into_iter().map(|s| s.payload).collect();
cluster_info
.write()
.unwrap()
.broadcast_shreds(sock, all_shred_bufs, &all_seeds, stakes)?;
cluster_info.read().unwrap().broadcast_shreds(
sock,
all_shred_bufs,
&all_seeds,
stakes,
&mut Instant::now(),
)?;
Ok(())
}
fn record(

View File

@ -37,6 +37,7 @@ pub(super) struct StandardBroadcastRun {
slot_broadcast_start: Option<Instant>,
keypair: Arc<Keypair>,
shred_version: u16,
last_datapoint_submit: Instant,
}
impl StandardBroadcastRun {
@ -48,6 +49,7 @@ impl StandardBroadcastRun {
slot_broadcast_start: None,
keypair,
shred_version,
last_datapoint_submit: Instant::now(),
}
}
@ -249,7 +251,7 @@ impl StandardBroadcastRun {
}
fn broadcast(
&self,
&mut self,
sock: &UdpSocket,
cluster_info: &Arc<RwLock<ClusterInfo>>,
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
@ -264,10 +266,13 @@ impl StandardBroadcastRun {
let shred_bufs: Vec<Vec<u8>> = shreds.to_vec().into_iter().map(|s| s.payload).collect();
trace!("Broadcasting {:?} shreds", shred_bufs.len());
cluster_info
.write()
.unwrap()
.broadcast_shreds(sock, shred_bufs, &seeds, stakes)?;
cluster_info.read().unwrap().broadcast_shreds(
sock,
shred_bufs,
&seeds,
stakes,
&mut self.last_datapoint_submit,
)?;
let broadcast_elapsed = broadcast_start.elapsed();
@ -332,7 +337,7 @@ impl BroadcastRun for StandardBroadcastRun {
)
}
fn transmit(
&self,
&mut self,
receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sock: &UdpSocket,

View File

@ -101,7 +101,6 @@ pub struct ClusterInfo {
pub(crate) keypair: Arc<Keypair>,
/// The network entrypoint
entrypoint: Option<ContactInfo>,
last_datapoint_submit: Instant,
}
#[derive(Default, Clone)]
@ -209,7 +208,6 @@ impl ClusterInfo {
gossip: CrdsGossip::default(),
keypair,
entrypoint: None,
last_datapoint_submit: Instant::now(),
};
let id = contact_info.id;
me.gossip.set_self(&id);
@ -949,22 +947,23 @@ impl ClusterInfo {
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
pub fn broadcast_shreds(
&mut self,
&self,
s: &UdpSocket,
shreds: Vec<Vec<u8>>,
seeds: &[[u8; 32]],
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
last_datapoint_submit: &mut Instant,
) -> Result<()> {
let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes);
let broadcast_len = peers_and_stakes.len();
if broadcast_len == 0 {
if duration_as_s(&Instant::now().duration_since(self.last_datapoint_submit)) >= 1.0 {
if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
datapoint_info!(
"cluster_info-num_nodes",
("live_count", 1, i64),
("broadcast_count", 1, i64)
);
self.last_datapoint_submit = Instant::now();
*last_datapoint_submit = Instant::now();
}
return Ok(());
}
@ -995,13 +994,13 @@ impl ClusterInfo {
num_live_peers += 1;
}
});
if duration_as_s(&Instant::now().duration_since(self.last_datapoint_submit)) >= 1.0 {
if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
datapoint_info!(
"cluster_info-num_nodes",
("live_count", num_live_peers, i64),
("broadcast_count", broadcast_len + 1, i64)
);
self.last_datapoint_submit = Instant::now();
*last_datapoint_submit = Instant::now();
}
Ok(())
}