diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index f07960d9f7..dbbb599fea 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -900,10 +900,8 @@ mod test { Arc::new(Keypair::new()), SocketAddrSpace::Unspecified, ); - let responder_serve_repair = Arc::new(RwLock::new(ServeRepair::new( - Arc::new(cluster_info), - vote_simulator.bank_forks, - ))); + let responder_serve_repair = + ServeRepair::new(Arc::new(cluster_info), vote_simulator.bank_forks); // Set up thread to give us responses let ledger_path = get_tmp_ledger_path!(); @@ -941,12 +939,11 @@ mod test { false, None, ); - let t_listen = ServeRepair::listen( - responder_serve_repair, + let t_listen = responder_serve_repair.listen( Some(blockstore), requests_receiver, response_sender, - &exit, + exit.clone(), ); Self { diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 6adb669fba..4a458c1b4f 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -477,7 +477,7 @@ impl ServeRepair { /// Process messages from the network fn run_listen( - obj: &Arc>, + &self, ping_cache: &mut PingCache, recycler: &PacketBatchRecycler, blockstore: Option<&Arc>, @@ -505,10 +505,9 @@ impl ServeRepair { stats.dropped_requests += dropped_requests; stats.total_requests += total_requests; - let root_bank = obj.read().unwrap().bank_forks.read().unwrap().root_bank(); + let root_bank = self.bank_forks.read().unwrap().root_bank(); for reqs in reqs_v { - Self::handle_packets( - obj, + self.handle_packets( ping_cache, recycler, blockstore, @@ -522,9 +521,9 @@ impl ServeRepair { Ok(()) } - fn report_reset_stats(me: &Arc>, stats: &mut ServeRepairStats) { + fn report_reset_stats(&self, stats: &mut ServeRepairStats) { if stats.self_repair > 0 { - let my_id = me.read().unwrap().cluster_info.id(); + let my_id = self.cluster_info.id(); warn!( "{}: Ignored received repair requests from ME: {}", my_id, stats.self_repair, @@ -569,11 +568,11 @@ impl ServeRepair { } pub fn listen( - me: Arc>, + self, blockstore: Option>, requests_receiver: PacketBatchReceiver, response_sender: PacketBatchSender, - exit: &Arc, + exit: Arc, ) -> JoinHandle<()> { const INTERVAL_MS: u64 = 1000; const MAX_BYTES_PER_SECOND: usize = 12_000_000; @@ -581,7 +580,6 @@ impl ServeRepair { let mut ping_cache = PingCache::new(REPAIR_PING_CACHE_TTL, REPAIR_PING_CACHE_CAPACITY); - let exit = exit.clone(); let recycler = PacketBatchRecycler::default(); Builder::new() .name("solana-repair-listen".to_string()) @@ -590,8 +588,7 @@ impl ServeRepair { let mut stats = ServeRepairStats::default(); let data_budget = DataBudget::default(); loop { - let result = Self::run_listen( - &me, + let result = self.run_listen( &mut ping_cache, &recycler, blockstore.as_ref(), @@ -608,7 +605,7 @@ impl ServeRepair { return; } if last_print.elapsed().as_secs() > 2 { - Self::report_reset_stats(&me, &mut stats); + self.report_reset_stats(&mut stats); last_print = Instant::now(); } data_budget.update(INTERVAL_MS, |_bytes| MAX_BYTES_PER_INTERVAL); @@ -731,7 +728,7 @@ impl ServeRepair { } fn handle_packets( - me: &Arc>, + &self, ping_cache: &mut PingCache, recycler: &PacketBatchRecycler, blockstore: Option<&Arc>, @@ -742,12 +739,8 @@ impl ServeRepair { data_budget: &DataBudget, ) { let sign_repairs_epoch = Self::sign_repair_requests_activated_epoch(root_bank); - let (identity_keypair, socket_addr_space) = { - let me_r = me.read().unwrap(); - let keypair = me_r.cluster_info.keypair().clone(); - let socket_addr_space = *me_r.cluster_info.socket_addr_space(); - (keypair, socket_addr_space) - }; + let identity_keypair = self.cluster_info.keypair().clone(); + let socket_addr_space = *self.cluster_info.socket_addr_space(); let my_id = identity_keypair.pubkey(); let mut pending_pings = Vec::default(); diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs index ec038e4e55..e25662fe31 100644 --- a/core/src/serve_repair_service.rs +++ b/core/src/serve_repair_service.rs @@ -9,7 +9,7 @@ use { }, std::{ net::UdpSocket, - sync::{atomic::AtomicBool, Arc, RwLock}, + sync::{atomic::AtomicBool, Arc}, thread::{self, JoinHandle}, }, }; @@ -20,18 +20,18 @@ pub struct ServeRepairService { impl ServeRepairService { pub fn new( - serve_repair: &Arc>, + serve_repair: ServeRepair, blockstore: Option>, serve_repair_socket: UdpSocket, socket_addr_space: SocketAddrSpace, stats_reporter_sender: Sender>, - exit: &Arc, + exit: Arc, ) -> Self { let (request_sender, request_receiver) = unbounded(); let serve_repair_socket = Arc::new(serve_repair_socket); trace!( "ServeRepairService: id: {}, listening on: {:?}", - &serve_repair.read().unwrap().my_id(), + &serve_repair.my_id(), serve_repair_socket.local_addr().unwrap() ); let t_receiver = streamer::receiver( @@ -52,13 +52,7 @@ impl ServeRepairService { socket_addr_space, Some(stats_reporter_sender), ); - let t_listen = ServeRepair::listen( - serve_repair.clone(), - blockstore, - request_receiver, - response_sender, - exit, - ); + let t_listen = serve_repair.listen(blockstore, request_receiver, response_sender, exit); let thread_hdls = vec![t_receiver, t_responder, t_listen]; Self { thread_hdls } diff --git a/core/src/validator.rs b/core/src/validator.rs index 867d1e698f..8700d22472 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -883,17 +883,14 @@ impl Validator { Some(stats_reporter_sender.clone()), &exit, ); - let serve_repair = Arc::new(RwLock::new(ServeRepair::new( - cluster_info.clone(), - bank_forks.clone(), - ))); + let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks.clone()); let serve_repair_service = ServeRepairService::new( - &serve_repair, + serve_repair, Some(blockstore.clone()), node.sockets.serve_repair, socket_addr_space, stats_reporter_sender, - &exit, + exit.clone(), ); let waited_for_supermajority = if let Ok(waited) = wait_for_supermajority(