removes erroneous uses of &Arc<...> from broadcast-stage (#25962)

This commit is contained in:
behzad nouri 2022-06-15 13:44:24 +00:00 committed by GitHub
parent 6ec98e07ec
commit fe3c1d3d49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 66 deletions

View File

@ -80,9 +80,9 @@ impl BroadcastStageType {
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
receiver: Receiver<WorkingBankEntry>, receiver: Receiver<WorkingBankEntry>,
retransmit_slots_receiver: RetransmitSlotsReceiver, retransmit_slots_receiver: RetransmitSlotsReceiver,
exit_sender: &Arc<AtomicBool>, exit_sender: Arc<AtomicBool>,
blockstore: &Arc<Blockstore>, blockstore: Arc<Blockstore>,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
shred_version: u16, shred_version: u16,
) -> BroadcastStage { ) -> BroadcastStage {
match self { match self {
@ -137,23 +137,19 @@ trait BroadcastRun {
fn run( fn run(
&mut self, &mut self,
keypair: &Keypair, keypair: &Keypair,
blockstore: &Arc<Blockstore>, blockstore: &Blockstore,
receiver: &Receiver<WorkingBankEntry>, receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
) -> Result<()>; ) -> Result<()>;
fn transmit( fn transmit(
&mut self, &mut self,
receiver: &Arc<Mutex<TransmitReceiver>>, receiver: &Mutex<TransmitReceiver>,
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
sock: &UdpSocket, sock: &UdpSocket,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &RwLock<BankForks>,
) -> Result<()>;
fn record(
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()>; ) -> Result<()>;
fn record(&mut self, receiver: &Mutex<RecordReceiver>, blockstore: &Blockstore) -> Result<()>;
} }
// Implement a destructor for the BroadcastStage thread to signal it exited // Implement a destructor for the BroadcastStage thread to signal it exited
@ -182,7 +178,7 @@ impl BroadcastStage {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn run( fn run(
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
blockstore: &Arc<Blockstore>, blockstore: &Blockstore,
receiver: &Receiver<WorkingBankEntry>, receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
@ -243,19 +239,18 @@ impl BroadcastStage {
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
receiver: Receiver<WorkingBankEntry>, receiver: Receiver<WorkingBankEntry>,
retransmit_slots_receiver: RetransmitSlotsReceiver, retransmit_slots_receiver: RetransmitSlotsReceiver,
exit_sender: &Arc<AtomicBool>, exit: Arc<AtomicBool>,
blockstore: &Arc<Blockstore>, blockstore: Arc<Blockstore>,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone, broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone,
) -> Self { ) -> Self {
let btree = blockstore.clone();
let exit = exit_sender.clone();
let (socket_sender, socket_receiver) = unbounded(); let (socket_sender, socket_receiver) = unbounded();
let (blockstore_sender, blockstore_receiver) = unbounded(); let (blockstore_sender, blockstore_receiver) = unbounded();
let bs_run = broadcast_stage_run.clone(); let bs_run = broadcast_stage_run.clone();
let socket_sender_ = socket_sender.clone(); let socket_sender_ = socket_sender.clone();
let thread_hdl = { let thread_hdl = {
let blockstore = blockstore.clone();
let cluster_info = cluster_info.clone(); let cluster_info = cluster_info.clone();
Builder::new() Builder::new()
.name("solana-broadcaster".to_string()) .name("solana-broadcaster".to_string())
@ -263,7 +258,7 @@ impl BroadcastStage {
let _finalizer = Finalizer::new(exit); let _finalizer = Finalizer::new(exit);
Self::run( Self::run(
cluster_info, cluster_info,
&btree, &blockstore,
&receiver, &receiver,
&socket_sender_, &socket_sender_,
&blockstore_sender, &blockstore_sender,
@ -310,7 +305,6 @@ impl BroadcastStage {
thread_hdls.push(t); thread_hdls.push(t);
} }
let blockstore = blockstore.clone();
let retransmit_thread = Builder::new() let retransmit_thread = Builder::new()
.name("solana-broadcaster-retransmit".to_string()) .name("solana-broadcaster-retransmit".to_string())
.spawn(move || loop { .spawn(move || loop {
@ -382,7 +376,7 @@ impl BroadcastStage {
fn update_peer_stats( fn update_peer_stats(
cluster_nodes: &ClusterNodes<BroadcastStage>, cluster_nodes: &ClusterNodes<BroadcastStage>,
last_datapoint_submit: &Arc<AtomicInterval>, last_datapoint_submit: &AtomicInterval,
) { ) {
if last_datapoint_submit.should_update(1000) { if last_datapoint_submit.should_update(1000) {
let now = timestamp(); let now = timestamp();
@ -402,10 +396,10 @@ pub fn broadcast_shreds(
s: &UdpSocket, s: &UdpSocket,
shreds: &[Shred], shreds: &[Shred],
cluster_nodes_cache: &ClusterNodesCache<BroadcastStage>, cluster_nodes_cache: &ClusterNodesCache<BroadcastStage>,
last_datapoint_submit: &Arc<AtomicInterval>, last_datapoint_submit: &AtomicInterval,
transmit_stats: &mut TransmitShredsStats, transmit_stats: &mut TransmitShredsStats,
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &RwLock<BankForks>,
socket_addr_space: &SocketAddrSpace, socket_addr_space: &SocketAddrSpace,
) -> Result<()> { ) -> Result<()> {
let mut result = Ok(()); let mut result = Ok(());
@ -623,9 +617,9 @@ pub mod test {
cluster_info, cluster_info,
entry_receiver, entry_receiver,
retransmit_slots_receiver, retransmit_slots_receiver,
&exit_sender, exit_sender,
&blockstore, blockstore.clone(),
&bank_forks, bank_forks,
StandardBroadcastRun::new(0), StandardBroadcastRun::new(0),
); );

View File

@ -64,7 +64,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
fn run( fn run(
&mut self, &mut self,
keypair: &Keypair, keypair: &Keypair,
_blockstore: &Arc<Blockstore>, _blockstore: &Blockstore,
receiver: &Receiver<WorkingBankEntry>, receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
@ -251,10 +251,10 @@ impl BroadcastRun for BroadcastDuplicatesRun {
fn transmit( fn transmit(
&mut self, &mut self,
receiver: &Arc<Mutex<TransmitReceiver>>, receiver: &Mutex<TransmitReceiver>,
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
sock: &UdpSocket, sock: &UdpSocket,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &RwLock<BankForks>,
) -> Result<()> { ) -> Result<()> {
let (shreds, _) = receiver.lock().unwrap().recv()?; let (shreds, _) = receiver.lock().unwrap().recv()?;
if shreds.is_empty() { if shreds.is_empty() {
@ -355,11 +355,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
Ok(()) Ok(())
} }
fn record( fn record(&mut self, receiver: &Mutex<RecordReceiver>, blockstore: &Blockstore) -> Result<()> {
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()> {
let (all_shreds, _) = receiver.lock().unwrap().recv()?; let (all_shreds, _) = receiver.lock().unwrap().recv()?;
blockstore blockstore
.insert_shreds(all_shreds.to_vec(), None, true) .insert_shreds(all_shreds.to_vec(), None, true)

View File

@ -28,7 +28,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
fn run( fn run(
&mut self, &mut self,
keypair: &Keypair, keypair: &Keypair,
blockstore: &Arc<Blockstore>, blockstore: &Blockstore,
receiver: &Receiver<WorkingBankEntry>, receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
@ -120,10 +120,10 @@ impl BroadcastRun for BroadcastFakeShredsRun {
} }
fn transmit( fn transmit(
&mut self, &mut self,
receiver: &Arc<Mutex<TransmitReceiver>>, receiver: &Mutex<TransmitReceiver>,
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
sock: &UdpSocket, sock: &UdpSocket,
_bank_forks: &Arc<RwLock<BankForks>>, _bank_forks: &RwLock<BankForks>,
) -> Result<()> { ) -> Result<()> {
for (data_shreds, batch_info) in receiver.lock().unwrap().iter() { for (data_shreds, batch_info) in receiver.lock().unwrap().iter() {
let fake = batch_info.is_some(); let fake = batch_info.is_some();
@ -139,11 +139,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
} }
Ok(()) Ok(())
} }
fn record( fn record(&mut self, receiver: &Mutex<RecordReceiver>, blockstore: &Blockstore) -> Result<()> {
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()> {
for (data_shreds, _) in receiver.lock().unwrap().iter() { for (data_shreds, _) in receiver.lock().unwrap().iter() {
blockstore.insert_shreds(data_shreds.to_vec(), None, true)?; blockstore.insert_shreds(data_shreds.to_vec(), None, true)?;
} }

View File

@ -40,7 +40,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
fn run( fn run(
&mut self, &mut self,
keypair: &Keypair, keypair: &Keypair,
blockstore: &Arc<Blockstore>, blockstore: &Blockstore,
receiver: &Receiver<WorkingBankEntry>, receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
@ -148,28 +148,24 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
} }
fn transmit( fn transmit(
&mut self, &mut self,
receiver: &Arc<Mutex<TransmitReceiver>>, receiver: &Mutex<TransmitReceiver>,
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
sock: &UdpSocket, sock: &UdpSocket,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &RwLock<BankForks>,
) -> Result<()> { ) -> Result<()> {
let (shreds, _) = receiver.lock().unwrap().recv()?; let (shreds, _) = receiver.lock().unwrap().recv()?;
broadcast_shreds( broadcast_shreds(
sock, sock,
&shreds, &shreds,
&self.cluster_nodes_cache, &self.cluster_nodes_cache,
&Arc::new(AtomicInterval::default()), &AtomicInterval::default(),
&mut TransmitShredsStats::default(), &mut TransmitShredsStats::default(),
cluster_info, cluster_info,
bank_forks, bank_forks,
cluster_info.socket_addr_space(), cluster_info.socket_addr_space(),
) )
} }
fn record( fn record(&mut self, receiver: &Mutex<RecordReceiver>, blockstore: &Blockstore) -> Result<()> {
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()> {
let (all_shreds, _) = receiver.lock().unwrap().recv()?; let (all_shreds, _) = receiver.lock().unwrap().recv()?;
blockstore blockstore
.insert_shreds(all_shreds.to_vec(), None, true) .insert_shreds(all_shreds.to_vec(), None, true)

View File

@ -171,9 +171,9 @@ impl StandardBroadcastRun {
keypair: &Keypair, keypair: &Keypair,
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
sock: &UdpSocket, sock: &UdpSocket,
blockstore: &Arc<Blockstore>, blockstore: &Blockstore,
receive_results: ReceiveResults, receive_results: ReceiveResults,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &RwLock<BankForks>,
) -> Result<()> { ) -> Result<()> {
let (bsend, brecv) = unbounded(); let (bsend, brecv) = unbounded();
let (ssend, srecv) = unbounded(); let (ssend, srecv) = unbounded();
@ -193,7 +193,7 @@ impl StandardBroadcastRun {
fn process_receive_results( fn process_receive_results(
&mut self, &mut self,
keypair: &Keypair, keypair: &Keypair,
blockstore: &Arc<Blockstore>, blockstore: &Blockstore,
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
receive_results: ReceiveResults, receive_results: ReceiveResults,
@ -325,7 +325,7 @@ impl StandardBroadcastRun {
fn insert( fn insert(
&mut self, &mut self,
blockstore: &Arc<Blockstore>, blockstore: &Blockstore,
shreds: Arc<Vec<Shred>>, shreds: Arc<Vec<Shred>>,
broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>, broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
) { ) {
@ -363,7 +363,7 @@ impl StandardBroadcastRun {
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
shreds: Arc<Vec<Shred>>, shreds: Arc<Vec<Shred>>,
broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>, broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &RwLock<BankForks>,
) -> Result<()> { ) -> Result<()> {
trace!("Broadcasting {:?} shreds", shreds.len()); trace!("Broadcasting {:?} shreds", shreds.len());
let mut transmit_stats = TransmitShredsStats::default(); let mut transmit_stats = TransmitShredsStats::default();
@ -467,7 +467,7 @@ impl BroadcastRun for StandardBroadcastRun {
fn run( fn run(
&mut self, &mut self,
keypair: &Keypair, keypair: &Keypair,
blockstore: &Arc<Blockstore>, blockstore: &Blockstore,
receiver: &Receiver<WorkingBankEntry>, receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>, blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
@ -485,19 +485,15 @@ impl BroadcastRun for StandardBroadcastRun {
} }
fn transmit( fn transmit(
&mut self, &mut self,
receiver: &Arc<Mutex<TransmitReceiver>>, receiver: &Mutex<TransmitReceiver>,
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
sock: &UdpSocket, sock: &UdpSocket,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &RwLock<BankForks>,
) -> Result<()> { ) -> Result<()> {
let (shreds, batch_info) = receiver.lock().unwrap().recv()?; let (shreds, batch_info) = receiver.lock().unwrap().recv()?;
self.broadcast(sock, cluster_info, shreds, batch_info, bank_forks) self.broadcast(sock, cluster_info, shreds, batch_info, bank_forks)
} }
fn record( fn record(&mut self, receiver: &Mutex<RecordReceiver>, blockstore: &Blockstore) -> Result<()> {
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()> {
let (shreds, slot_start_ts) = receiver.lock().unwrap().recv()?; let (shreds, slot_start_ts) = receiver.lock().unwrap().recv()?;
self.insert(blockstore, shreds, slot_start_ts); self.insert(blockstore, shreds, slot_start_ts);
Ok(()) Ok(())

View File

@ -236,9 +236,9 @@ impl Tpu {
cluster_info.clone(), cluster_info.clone(),
entry_receiver, entry_receiver,
retransmit_slots_receiver, retransmit_slots_receiver,
exit, exit.clone(),
blockstore, blockstore.clone(),
&bank_forks, bank_forks,
shred_version, shred_version,
); );