From ec36f0c5dffb36d639bc447a7552edbb8a241aa1 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 1 Aug 2022 14:46:45 -0400 Subject: [PATCH] removes redundant Option<&Arc<...>> wrapper for Blockstore in serve-repair --- core/src/ancestor_hashes_service.rs | 2 +- core/src/serve_repair.rs | 147 ++++++++++++---------------- core/src/serve_repair_service.rs | 2 +- core/src/validator.rs | 2 +- 4 files changed, 65 insertions(+), 88 deletions(-) diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index dbbb599fe..cc142168c 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -940,7 +940,7 @@ mod test { None, ); let t_listen = responder_serve_repair.listen( - Some(blockstore), + blockstore, requests_receiver, response_sender, exit.clone(), diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 4a458c1b4..a9cc6cd0c 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -356,7 +356,7 @@ impl ServeRepair { fn handle_repair( recycler: &PacketBatchRecycler, from_addr: &SocketAddr, - blockstore: Option<&Arc>, + blockstore: &Blockstore, request: RepairProtocol, stats: &mut ServeRepairStats, ping_cache: &mut PingCache, @@ -480,7 +480,7 @@ impl ServeRepair { &self, ping_cache: &mut PingCache, recycler: &PacketBatchRecycler, - blockstore: Option<&Arc>, + blockstore: &Blockstore, requests_receiver: &PacketBatchReceiver, response_sender: &PacketBatchSender, stats: &mut ServeRepairStats, @@ -569,7 +569,7 @@ impl ServeRepair { pub fn listen( self, - blockstore: Option>, + blockstore: Arc, requests_receiver: PacketBatchReceiver, response_sender: PacketBatchSender, exit: Arc, @@ -591,7 +591,7 @@ impl ServeRepair { let result = self.run_listen( &mut ping_cache, &recycler, - blockstore.as_ref(), + &blockstore, &requests_receiver, &response_sender, &mut stats, @@ -731,7 +731,7 @@ impl ServeRepair { &self, ping_cache: &mut PingCache, recycler: &PacketBatchRecycler, - blockstore: Option<&Arc>, + blockstore: &Blockstore, packet_batch: PacketBatch, response_sender: &PacketBatchSender, root_bank: &Bank, @@ -1093,44 +1093,36 @@ impl ServeRepair { fn run_window_request( recycler: &PacketBatchRecycler, from_addr: &SocketAddr, - blockstore: Option<&Arc>, + blockstore: &Blockstore, slot: Slot, shred_index: u64, nonce: Nonce, ) -> Option { - if let Some(blockstore) = blockstore { - // Try to find the requested index in one of the slots - let packet = repair_response::repair_response_packet( - blockstore, - slot, - shred_index, - from_addr, - nonce, - ); + // Try to find the requested index in one of the slots + let packet = repair_response::repair_response_packet( + blockstore, + slot, + shred_index, + from_addr, + nonce, + )?; - if let Some(packet) = packet { - inc_new_counter_debug!("serve_repair-window-request-ledger", 1); - return Some(PacketBatch::new_unpinned_with_recycler_data( - recycler, - "run_window_request", - vec![packet], - )); - } - } - - inc_new_counter_debug!("serve_repair-window-request-fail", 1); - None + inc_new_counter_debug!("serve_repair-window-request-ledger", 1); + Some(PacketBatch::new_unpinned_with_recycler_data( + recycler, + "run_window_request", + vec![packet], + )) } fn run_highest_window_request( recycler: &PacketBatchRecycler, from_addr: &SocketAddr, - blockstore: Option<&Arc>, + blockstore: &Blockstore, slot: Slot, highest_index: u64, nonce: Nonce, ) -> Option { - let blockstore = blockstore?; // Try to find the requested index in one of the slots let meta = blockstore.meta(slot).ok()??; if meta.received > highest_index { @@ -1154,37 +1146,35 @@ impl ServeRepair { fn run_orphan( recycler: &PacketBatchRecycler, from_addr: &SocketAddr, - blockstore: Option<&Arc>, + blockstore: &Blockstore, mut slot: Slot, max_responses: usize, nonce: Nonce, ) -> Option { let mut res = PacketBatch::new_unpinned_with_recycler(recycler.clone(), max_responses, "run_orphan"); - if let Some(blockstore) = blockstore { - // Try to find the next "n" parent slots of the input slot - while let Ok(Some(meta)) = blockstore.meta(slot) { - if meta.received == 0 { - break; - } - let packet = repair_response::repair_response_packet( - blockstore, - slot, - meta.received - 1, - from_addr, - nonce, - ); - if let Some(packet) = packet { - res.push(packet); - } else { - break; - } + // Try to find the next "n" parent slots of the input slot + while let Ok(Some(meta)) = blockstore.meta(slot) { + if meta.received == 0 { + break; + } + let packet = repair_response::repair_response_packet( + blockstore, + slot, + meta.received - 1, + from_addr, + nonce, + ); + if let Some(packet) = packet { + res.push(packet); + } else { + break; + } - if meta.parent_slot.is_some() && res.len() < max_responses { - slot = meta.parent_slot.unwrap(); - } else { - break; - } + if meta.parent_slot.is_some() && res.len() < max_responses { + slot = meta.parent_slot.unwrap(); + } else { + break; } } if res.is_empty() { @@ -1196,11 +1186,10 @@ impl ServeRepair { fn run_ancestor_hashes( recycler: &PacketBatchRecycler, from_addr: &SocketAddr, - blockstore: Option<&Arc>, + blockstore: &Blockstore, slot: Slot, nonce: Nonce, ) -> Option { - let blockstore = blockstore?; let ancestor_slot_hashes = if blockstore.is_duplicate_confirmed(slot) { let ancestor_iterator = AncestorIteratorWithHash::from(AncestorIterator::new_inclusive(slot, blockstore)); @@ -1635,7 +1624,7 @@ mod tests { let rv = ServeRepair::run_highest_window_request( &recycler, &socketaddr_any!(), - Some(&blockstore), + &blockstore, 0, 0, nonce, @@ -1654,7 +1643,7 @@ mod tests { let rv = ServeRepair::run_highest_window_request( &recycler, &socketaddr_any!(), - Some(&blockstore), + &blockstore, slot, index, nonce, @@ -1678,7 +1667,7 @@ mod tests { let rv = ServeRepair::run_highest_window_request( &recycler, &socketaddr_any!(), - Some(&blockstore), + &blockstore, slot, index + 1, nonce, @@ -1704,7 +1693,7 @@ mod tests { let rv = ServeRepair::run_window_request( &recycler, &socketaddr_any!(), - Some(&blockstore), + &blockstore, slot, 0, nonce, @@ -1720,7 +1709,7 @@ mod tests { let rv = ServeRepair::run_window_request( &recycler, &socketaddr_any!(), - Some(&blockstore), + &blockstore, slot, index, nonce, @@ -1855,14 +1844,8 @@ mod tests { let ledger_path = get_tmp_ledger_path!(); { let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); - let rv = ServeRepair::run_orphan( - &recycler, - &socketaddr_any!(), - Some(&blockstore), - slot, - 0, - nonce, - ); + let rv = + ServeRepair::run_orphan(&recycler, &socketaddr_any!(), &blockstore, slot, 0, nonce); assert!(rv.is_none()); // Create slots [slot, slot + num_slots) with 5 shreds apiece @@ -1876,7 +1859,7 @@ mod tests { let rv = ServeRepair::run_orphan( &recycler, &socketaddr_any!(), - Some(&blockstore), + &blockstore, slot + num_slots, 5, nonce, @@ -1888,7 +1871,7 @@ mod tests { let rv: Vec<_> = ServeRepair::run_orphan( &recycler, &socketaddr_any!(), - Some(&blockstore), + &blockstore, slot + num_slots - 1, 5, nonce, @@ -1954,18 +1937,12 @@ mod tests { // Orphan request for slot 2 should only return slot 1 since // calling `repair_response_packet` on slot 1's shred will // be corrupted - let rv: Vec<_> = ServeRepair::run_orphan( - &recycler, - &socketaddr_any!(), - Some(&blockstore), - 2, - 5, - nonce, - ) - .expect("run_orphan packets") - .iter() - .cloned() - .collect(); + let rv: Vec<_> = + ServeRepair::run_orphan(&recycler, &socketaddr_any!(), &blockstore, 2, 5, nonce) + .expect("run_orphan packets") + .iter() + .cloned() + .collect(); // Verify responses let expected = vec![repair_response::repair_response_packet( @@ -2011,7 +1988,7 @@ mod tests { let rv = ServeRepair::run_ancestor_hashes( &recycler, &socketaddr_any!(), - Some(&blockstore), + &blockstore, slot + num_slots, nonce, ) @@ -2026,7 +2003,7 @@ mod tests { let rv = ServeRepair::run_ancestor_hashes( &recycler, &socketaddr_any!(), - Some(&blockstore), + &blockstore, slot + num_slots - 1, nonce, ) @@ -2048,7 +2025,7 @@ mod tests { let rv = ServeRepair::run_ancestor_hashes( &recycler, &socketaddr_any!(), - Some(&blockstore), + &blockstore, slot + num_slots - 1, nonce, ) diff --git a/core/src/serve_repair_service.rs b/core/src/serve_repair_service.rs index e25662fe3..72dc7a49e 100644 --- a/core/src/serve_repair_service.rs +++ b/core/src/serve_repair_service.rs @@ -21,7 +21,7 @@ pub struct ServeRepairService { impl ServeRepairService { pub fn new( serve_repair: ServeRepair, - blockstore: Option>, + blockstore: Arc, serve_repair_socket: UdpSocket, socket_addr_space: SocketAddrSpace, stats_reporter_sender: Sender>, diff --git a/core/src/validator.rs b/core/src/validator.rs index 8700d2247..240d0ade2 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -886,7 +886,7 @@ impl Validator { let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks.clone()); let serve_repair_service = ServeRepairService::new( serve_repair, - Some(blockstore.clone()), + blockstore.clone(), node.sockets.serve_repair, socket_addr_space, stats_reporter_sender,