removes the redundant Arc<RwLock<...>> wrapper from ServeRepair

This commit is contained in:
behzad nouri 2022-08-01 14:22:54 -04:00
parent e5c5055869
commit 6423da0218
4 changed files with 24 additions and 43 deletions

View File

@ -900,10 +900,8 @@ mod test {
Arc::new(Keypair::new()), Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified, SocketAddrSpace::Unspecified,
); );
let responder_serve_repair = Arc::new(RwLock::new(ServeRepair::new( let responder_serve_repair =
Arc::new(cluster_info), ServeRepair::new(Arc::new(cluster_info), vote_simulator.bank_forks);
vote_simulator.bank_forks,
)));
// Set up thread to give us responses // Set up thread to give us responses
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
@ -941,12 +939,11 @@ mod test {
false, false,
None, None,
); );
let t_listen = ServeRepair::listen( let t_listen = responder_serve_repair.listen(
responder_serve_repair,
Some(blockstore), Some(blockstore),
requests_receiver, requests_receiver,
response_sender, response_sender,
&exit, exit.clone(),
); );
Self { Self {

View File

@ -477,7 +477,7 @@ impl ServeRepair {
/// Process messages from the network /// Process messages from the network
fn run_listen( fn run_listen(
obj: &Arc<RwLock<Self>>, &self,
ping_cache: &mut PingCache, ping_cache: &mut PingCache,
recycler: &PacketBatchRecycler, recycler: &PacketBatchRecycler,
blockstore: Option<&Arc<Blockstore>>, blockstore: Option<&Arc<Blockstore>>,
@ -505,10 +505,9 @@ impl ServeRepair {
stats.dropped_requests += dropped_requests; stats.dropped_requests += dropped_requests;
stats.total_requests += total_requests; stats.total_requests += total_requests;
let root_bank = obj.read().unwrap().bank_forks.read().unwrap().root_bank(); let root_bank = self.bank_forks.read().unwrap().root_bank();
for reqs in reqs_v { for reqs in reqs_v {
Self::handle_packets( self.handle_packets(
obj,
ping_cache, ping_cache,
recycler, recycler,
blockstore, blockstore,
@ -522,9 +521,9 @@ impl ServeRepair {
Ok(()) Ok(())
} }
fn report_reset_stats(me: &Arc<RwLock<Self>>, stats: &mut ServeRepairStats) { fn report_reset_stats(&self, stats: &mut ServeRepairStats) {
if stats.self_repair > 0 { if stats.self_repair > 0 {
let my_id = me.read().unwrap().cluster_info.id(); let my_id = self.cluster_info.id();
warn!( warn!(
"{}: Ignored received repair requests from ME: {}", "{}: Ignored received repair requests from ME: {}",
my_id, stats.self_repair, my_id, stats.self_repair,
@ -569,11 +568,11 @@ impl ServeRepair {
} }
pub fn listen( pub fn listen(
me: Arc<RwLock<Self>>, self,
blockstore: Option<Arc<Blockstore>>, blockstore: Option<Arc<Blockstore>>,
requests_receiver: PacketBatchReceiver, requests_receiver: PacketBatchReceiver,
response_sender: PacketBatchSender, response_sender: PacketBatchSender,
exit: &Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
const INTERVAL_MS: u64 = 1000; const INTERVAL_MS: u64 = 1000;
const MAX_BYTES_PER_SECOND: usize = 12_000_000; const MAX_BYTES_PER_SECOND: usize = 12_000_000;
@ -581,7 +580,6 @@ impl ServeRepair {
let mut ping_cache = PingCache::new(REPAIR_PING_CACHE_TTL, REPAIR_PING_CACHE_CAPACITY); let mut ping_cache = PingCache::new(REPAIR_PING_CACHE_TTL, REPAIR_PING_CACHE_CAPACITY);
let exit = exit.clone();
let recycler = PacketBatchRecycler::default(); let recycler = PacketBatchRecycler::default();
Builder::new() Builder::new()
.name("solana-repair-listen".to_string()) .name("solana-repair-listen".to_string())
@ -590,8 +588,7 @@ impl ServeRepair {
let mut stats = ServeRepairStats::default(); let mut stats = ServeRepairStats::default();
let data_budget = DataBudget::default(); let data_budget = DataBudget::default();
loop { loop {
let result = Self::run_listen( let result = self.run_listen(
&me,
&mut ping_cache, &mut ping_cache,
&recycler, &recycler,
blockstore.as_ref(), blockstore.as_ref(),
@ -608,7 +605,7 @@ impl ServeRepair {
return; return;
} }
if last_print.elapsed().as_secs() > 2 { if last_print.elapsed().as_secs() > 2 {
Self::report_reset_stats(&me, &mut stats); self.report_reset_stats(&mut stats);
last_print = Instant::now(); last_print = Instant::now();
} }
data_budget.update(INTERVAL_MS, |_bytes| MAX_BYTES_PER_INTERVAL); data_budget.update(INTERVAL_MS, |_bytes| MAX_BYTES_PER_INTERVAL);
@ -731,7 +728,7 @@ impl ServeRepair {
} }
fn handle_packets( fn handle_packets(
me: &Arc<RwLock<Self>>, &self,
ping_cache: &mut PingCache, ping_cache: &mut PingCache,
recycler: &PacketBatchRecycler, recycler: &PacketBatchRecycler,
blockstore: Option<&Arc<Blockstore>>, blockstore: Option<&Arc<Blockstore>>,
@ -742,12 +739,8 @@ impl ServeRepair {
data_budget: &DataBudget, data_budget: &DataBudget,
) { ) {
let sign_repairs_epoch = Self::sign_repair_requests_activated_epoch(root_bank); let sign_repairs_epoch = Self::sign_repair_requests_activated_epoch(root_bank);
let (identity_keypair, socket_addr_space) = { let identity_keypair = self.cluster_info.keypair().clone();
let me_r = me.read().unwrap(); let socket_addr_space = *self.cluster_info.socket_addr_space();
let keypair = me_r.cluster_info.keypair().clone();
let socket_addr_space = *me_r.cluster_info.socket_addr_space();
(keypair, socket_addr_space)
};
let my_id = identity_keypair.pubkey(); let my_id = identity_keypair.pubkey();
let mut pending_pings = Vec::default(); let mut pending_pings = Vec::default();

View File

@ -9,7 +9,7 @@ use {
}, },
std::{ std::{
net::UdpSocket, net::UdpSocket,
sync::{atomic::AtomicBool, Arc, RwLock}, sync::{atomic::AtomicBool, Arc},
thread::{self, JoinHandle}, thread::{self, JoinHandle},
}, },
}; };
@ -20,18 +20,18 @@ pub struct ServeRepairService {
impl ServeRepairService { impl ServeRepairService {
pub fn new( pub fn new(
serve_repair: &Arc<RwLock<ServeRepair>>, serve_repair: ServeRepair,
blockstore: Option<Arc<Blockstore>>, blockstore: Option<Arc<Blockstore>>,
serve_repair_socket: UdpSocket, serve_repair_socket: UdpSocket,
socket_addr_space: SocketAddrSpace, socket_addr_space: SocketAddrSpace,
stats_reporter_sender: Sender<Box<dyn FnOnce() + Send>>, stats_reporter_sender: Sender<Box<dyn FnOnce() + Send>>,
exit: &Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
let (request_sender, request_receiver) = unbounded(); let (request_sender, request_receiver) = unbounded();
let serve_repair_socket = Arc::new(serve_repair_socket); let serve_repair_socket = Arc::new(serve_repair_socket);
trace!( trace!(
"ServeRepairService: id: {}, listening on: {:?}", "ServeRepairService: id: {}, listening on: {:?}",
&serve_repair.read().unwrap().my_id(), &serve_repair.my_id(),
serve_repair_socket.local_addr().unwrap() serve_repair_socket.local_addr().unwrap()
); );
let t_receiver = streamer::receiver( let t_receiver = streamer::receiver(
@ -52,13 +52,7 @@ impl ServeRepairService {
socket_addr_space, socket_addr_space,
Some(stats_reporter_sender), Some(stats_reporter_sender),
); );
let t_listen = ServeRepair::listen( let t_listen = serve_repair.listen(blockstore, request_receiver, response_sender, exit);
serve_repair.clone(),
blockstore,
request_receiver,
response_sender,
exit,
);
let thread_hdls = vec![t_receiver, t_responder, t_listen]; let thread_hdls = vec![t_receiver, t_responder, t_listen];
Self { thread_hdls } Self { thread_hdls }

View File

@ -883,17 +883,14 @@ impl Validator {
Some(stats_reporter_sender.clone()), Some(stats_reporter_sender.clone()),
&exit, &exit,
); );
let serve_repair = Arc::new(RwLock::new(ServeRepair::new( let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks.clone());
cluster_info.clone(),
bank_forks.clone(),
)));
let serve_repair_service = ServeRepairService::new( let serve_repair_service = ServeRepairService::new(
&serve_repair, serve_repair,
Some(blockstore.clone()), Some(blockstore.clone()),
node.sockets.serve_repair, node.sockets.serve_repair,
socket_addr_space, socket_addr_space,
stats_reporter_sender, stats_reporter_sender,
&exit, exit.clone(),
); );
let waited_for_supermajority = if let Ok(waited) = wait_for_supermajority( let waited_for_supermajority = if let Ok(waited) = wait_for_supermajority(