diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 4b3c87110..e48745caf 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -80,9 +80,9 @@ impl BroadcastStageType { cluster_info: Arc, receiver: Receiver, retransmit_slots_receiver: RetransmitSlotsReceiver, - exit_sender: &Arc, - blockstore: &Arc, - bank_forks: &Arc>, + exit_sender: Arc, + blockstore: Arc, + bank_forks: Arc>, shred_version: u16, ) -> BroadcastStage { match self { @@ -137,23 +137,19 @@ trait BroadcastRun { fn run( &mut self, keypair: &Keypair, - blockstore: &Arc, + blockstore: &Blockstore, receiver: &Receiver, socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()>; fn transmit( &mut self, - receiver: &Arc>, + receiver: &Mutex, cluster_info: &ClusterInfo, sock: &UdpSocket, - bank_forks: &Arc>, - ) -> Result<()>; - fn record( - &mut self, - receiver: &Arc>, - blockstore: &Arc, + bank_forks: &RwLock, ) -> Result<()>; + fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()>; } // Implement a destructor for the BroadcastStage thread to signal it exited @@ -182,7 +178,7 @@ impl BroadcastStage { #[allow(clippy::too_many_arguments)] fn run( cluster_info: Arc, - blockstore: &Arc, + blockstore: &Blockstore, receiver: &Receiver, socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, @@ -243,19 +239,18 @@ impl BroadcastStage { cluster_info: Arc, receiver: Receiver, retransmit_slots_receiver: RetransmitSlotsReceiver, - exit_sender: &Arc, - blockstore: &Arc, - bank_forks: &Arc>, + exit: Arc, + blockstore: Arc, + bank_forks: Arc>, broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone, ) -> Self { - let btree = blockstore.clone(); - let exit = exit_sender.clone(); let (socket_sender, socket_receiver) = unbounded(); let (blockstore_sender, blockstore_receiver) = unbounded(); let bs_run = broadcast_stage_run.clone(); let socket_sender_ = socket_sender.clone(); let thread_hdl = { + let blockstore = blockstore.clone(); let cluster_info = cluster_info.clone(); Builder::new() .name("solana-broadcaster".to_string()) @@ -263,7 +258,7 @@ impl BroadcastStage { let _finalizer = Finalizer::new(exit); Self::run( cluster_info, - &btree, + &blockstore, &receiver, &socket_sender_, &blockstore_sender, @@ -310,7 +305,6 @@ impl BroadcastStage { thread_hdls.push(t); } - let blockstore = blockstore.clone(); let retransmit_thread = Builder::new() .name("solana-broadcaster-retransmit".to_string()) .spawn(move || loop { @@ -382,7 +376,7 @@ impl BroadcastStage { fn update_peer_stats( cluster_nodes: &ClusterNodes, - last_datapoint_submit: &Arc, + last_datapoint_submit: &AtomicInterval, ) { if last_datapoint_submit.should_update(1000) { let now = timestamp(); @@ -402,10 +396,10 @@ pub fn broadcast_shreds( s: &UdpSocket, shreds: &[Shred], cluster_nodes_cache: &ClusterNodesCache, - last_datapoint_submit: &Arc, + last_datapoint_submit: &AtomicInterval, transmit_stats: &mut TransmitShredsStats, cluster_info: &ClusterInfo, - bank_forks: &Arc>, + bank_forks: &RwLock, socket_addr_space: &SocketAddrSpace, ) -> Result<()> { let mut result = Ok(()); @@ -623,9 +617,9 @@ pub mod test { cluster_info, entry_receiver, retransmit_slots_receiver, - &exit_sender, - &blockstore, - &bank_forks, + exit_sender, + blockstore.clone(), + bank_forks, StandardBroadcastRun::new(0), ); diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index 32db20199..43b538b77 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -64,7 +64,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { fn run( &mut self, keypair: &Keypair, - _blockstore: &Arc, + _blockstore: &Blockstore, receiver: &Receiver, socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, @@ -251,10 +251,10 @@ impl BroadcastRun for BroadcastDuplicatesRun { fn transmit( &mut self, - receiver: &Arc>, + receiver: &Mutex, cluster_info: &ClusterInfo, sock: &UdpSocket, - bank_forks: &Arc>, + bank_forks: &RwLock, ) -> Result<()> { let (shreds, _) = receiver.lock().unwrap().recv()?; if shreds.is_empty() { @@ -355,11 +355,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { Ok(()) } - fn record( - &mut self, - receiver: &Arc>, - blockstore: &Arc, - ) -> Result<()> { + fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()> { let (all_shreds, _) = receiver.lock().unwrap().recv()?; blockstore .insert_shreds(all_shreds.to_vec(), None, true) diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 90f404933..572b1c3a9 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -28,7 +28,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { fn run( &mut self, keypair: &Keypair, - blockstore: &Arc, + blockstore: &Blockstore, receiver: &Receiver, socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, @@ -120,10 +120,10 @@ impl BroadcastRun for BroadcastFakeShredsRun { } fn transmit( &mut self, - receiver: &Arc>, + receiver: &Mutex, cluster_info: &ClusterInfo, sock: &UdpSocket, - _bank_forks: &Arc>, + _bank_forks: &RwLock, ) -> Result<()> { for (data_shreds, batch_info) in receiver.lock().unwrap().iter() { let fake = batch_info.is_some(); @@ -139,11 +139,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { } Ok(()) } - fn record( - &mut self, - receiver: &Arc>, - blockstore: &Arc, - ) -> Result<()> { + fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()> { for (data_shreds, _) in receiver.lock().unwrap().iter() { blockstore.insert_shreds(data_shreds.to_vec(), None, true)?; } diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index fbb0b5b5e..bcbf1fc2e 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -40,7 +40,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { fn run( &mut self, keypair: &Keypair, - blockstore: &Arc, + blockstore: &Blockstore, receiver: &Receiver, socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, @@ -148,28 +148,24 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { } fn transmit( &mut self, - receiver: &Arc>, + receiver: &Mutex, cluster_info: &ClusterInfo, sock: &UdpSocket, - bank_forks: &Arc>, + bank_forks: &RwLock, ) -> Result<()> { let (shreds, _) = receiver.lock().unwrap().recv()?; broadcast_shreds( sock, &shreds, &self.cluster_nodes_cache, - &Arc::new(AtomicInterval::default()), + &AtomicInterval::default(), &mut TransmitShredsStats::default(), cluster_info, bank_forks, cluster_info.socket_addr_space(), ) } - fn record( - &mut self, - receiver: &Arc>, - blockstore: &Arc, - ) -> Result<()> { + fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()> { let (all_shreds, _) = receiver.lock().unwrap().recv()?; blockstore .insert_shreds(all_shreds.to_vec(), None, true) diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 008a3aaec..b43362cf9 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -171,9 +171,9 @@ impl StandardBroadcastRun { keypair: &Keypair, cluster_info: &ClusterInfo, sock: &UdpSocket, - blockstore: &Arc, + blockstore: &Blockstore, receive_results: ReceiveResults, - bank_forks: &Arc>, + bank_forks: &RwLock, ) -> Result<()> { let (bsend, brecv) = unbounded(); let (ssend, srecv) = unbounded(); @@ -193,7 +193,7 @@ impl StandardBroadcastRun { fn process_receive_results( &mut self, keypair: &Keypair, - blockstore: &Arc, + blockstore: &Blockstore, socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, receive_results: ReceiveResults, @@ -325,7 +325,7 @@ impl StandardBroadcastRun { fn insert( &mut self, - blockstore: &Arc, + blockstore: &Blockstore, shreds: Arc>, broadcast_shred_batch_info: Option, ) { @@ -363,7 +363,7 @@ impl StandardBroadcastRun { cluster_info: &ClusterInfo, shreds: Arc>, broadcast_shred_batch_info: Option, - bank_forks: &Arc>, + bank_forks: &RwLock, ) -> Result<()> { trace!("Broadcasting {:?} shreds", shreds.len()); let mut transmit_stats = TransmitShredsStats::default(); @@ -467,7 +467,7 @@ impl BroadcastRun for StandardBroadcastRun { fn run( &mut self, keypair: &Keypair, - blockstore: &Arc, + blockstore: &Blockstore, receiver: &Receiver, socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, @@ -485,19 +485,15 @@ impl BroadcastRun for StandardBroadcastRun { } fn transmit( &mut self, - receiver: &Arc>, + receiver: &Mutex, cluster_info: &ClusterInfo, sock: &UdpSocket, - bank_forks: &Arc>, + bank_forks: &RwLock, ) -> Result<()> { let (shreds, batch_info) = receiver.lock().unwrap().recv()?; self.broadcast(sock, cluster_info, shreds, batch_info, bank_forks) } - fn record( - &mut self, - receiver: &Arc>, - blockstore: &Arc, - ) -> Result<()> { + fn record(&mut self, receiver: &Mutex, blockstore: &Blockstore) -> Result<()> { let (shreds, slot_start_ts) = receiver.lock().unwrap().recv()?; self.insert(blockstore, shreds, slot_start_ts); Ok(()) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index a3744c77a..11be7900b 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -236,9 +236,9 @@ impl Tpu { cluster_info.clone(), entry_receiver, retransmit_slots_receiver, - exit, - blockstore, - &bank_forks, + exit.clone(), + blockstore.clone(), + bank_forks, shred_version, );