removes redundant Option<&Arc<...>> wrapper for Blockstore in serve-repair

This commit is contained in:
behzad nouri 2022-08-01 14:46:45 -04:00
parent 6423da0218
commit ec36f0c5df
4 changed files with 65 additions and 88 deletions

View File

@ -940,7 +940,7 @@ mod test {
None, None,
); );
let t_listen = responder_serve_repair.listen( let t_listen = responder_serve_repair.listen(
Some(blockstore), blockstore,
requests_receiver, requests_receiver,
response_sender, response_sender,
exit.clone(), exit.clone(),

View File

@ -356,7 +356,7 @@ impl ServeRepair {
fn handle_repair( fn handle_repair(
recycler: &PacketBatchRecycler, recycler: &PacketBatchRecycler,
from_addr: &SocketAddr, from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>, blockstore: &Blockstore,
request: RepairProtocol, request: RepairProtocol,
stats: &mut ServeRepairStats, stats: &mut ServeRepairStats,
ping_cache: &mut PingCache, ping_cache: &mut PingCache,
@ -480,7 +480,7 @@ impl ServeRepair {
&self, &self,
ping_cache: &mut PingCache, ping_cache: &mut PingCache,
recycler: &PacketBatchRecycler, recycler: &PacketBatchRecycler,
blockstore: Option<&Arc<Blockstore>>, blockstore: &Blockstore,
requests_receiver: &PacketBatchReceiver, requests_receiver: &PacketBatchReceiver,
response_sender: &PacketBatchSender, response_sender: &PacketBatchSender,
stats: &mut ServeRepairStats, stats: &mut ServeRepairStats,
@ -569,7 +569,7 @@ impl ServeRepair {
pub fn listen( pub fn listen(
self, self,
blockstore: Option<Arc<Blockstore>>, blockstore: Arc<Blockstore>,
requests_receiver: PacketBatchReceiver, requests_receiver: PacketBatchReceiver,
response_sender: PacketBatchSender, response_sender: PacketBatchSender,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
@ -591,7 +591,7 @@ impl ServeRepair {
let result = self.run_listen( let result = self.run_listen(
&mut ping_cache, &mut ping_cache,
&recycler, &recycler,
blockstore.as_ref(), &blockstore,
&requests_receiver, &requests_receiver,
&response_sender, &response_sender,
&mut stats, &mut stats,
@ -731,7 +731,7 @@ impl ServeRepair {
&self, &self,
ping_cache: &mut PingCache, ping_cache: &mut PingCache,
recycler: &PacketBatchRecycler, recycler: &PacketBatchRecycler,
blockstore: Option<&Arc<Blockstore>>, blockstore: &Blockstore,
packet_batch: PacketBatch, packet_batch: PacketBatch,
response_sender: &PacketBatchSender, response_sender: &PacketBatchSender,
root_bank: &Bank, root_bank: &Bank,
@ -1093,44 +1093,36 @@ impl ServeRepair {
fn run_window_request( fn run_window_request(
recycler: &PacketBatchRecycler, recycler: &PacketBatchRecycler,
from_addr: &SocketAddr, from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>, blockstore: &Blockstore,
slot: Slot, slot: Slot,
shred_index: u64, shred_index: u64,
nonce: Nonce, nonce: Nonce,
) -> Option<PacketBatch> { ) -> Option<PacketBatch> {
if let Some(blockstore) = blockstore { // Try to find the requested index in one of the slots
// Try to find the requested index in one of the slots let packet = repair_response::repair_response_packet(
let packet = repair_response::repair_response_packet( blockstore,
blockstore, slot,
slot, shred_index,
shred_index, from_addr,
from_addr, nonce,
nonce, )?;
);
if let Some(packet) = packet { inc_new_counter_debug!("serve_repair-window-request-ledger", 1);
inc_new_counter_debug!("serve_repair-window-request-ledger", 1); Some(PacketBatch::new_unpinned_with_recycler_data(
return Some(PacketBatch::new_unpinned_with_recycler_data( recycler,
recycler, "run_window_request",
"run_window_request", vec![packet],
vec![packet], ))
));
}
}
inc_new_counter_debug!("serve_repair-window-request-fail", 1);
None
} }
fn run_highest_window_request( fn run_highest_window_request(
recycler: &PacketBatchRecycler, recycler: &PacketBatchRecycler,
from_addr: &SocketAddr, from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>, blockstore: &Blockstore,
slot: Slot, slot: Slot,
highest_index: u64, highest_index: u64,
nonce: Nonce, nonce: Nonce,
) -> Option<PacketBatch> { ) -> Option<PacketBatch> {
let blockstore = blockstore?;
// Try to find the requested index in one of the slots // Try to find the requested index in one of the slots
let meta = blockstore.meta(slot).ok()??; let meta = blockstore.meta(slot).ok()??;
if meta.received > highest_index { if meta.received > highest_index {
@ -1154,37 +1146,35 @@ impl ServeRepair {
fn run_orphan( fn run_orphan(
recycler: &PacketBatchRecycler, recycler: &PacketBatchRecycler,
from_addr: &SocketAddr, from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>, blockstore: &Blockstore,
mut slot: Slot, mut slot: Slot,
max_responses: usize, max_responses: usize,
nonce: Nonce, nonce: Nonce,
) -> Option<PacketBatch> { ) -> Option<PacketBatch> {
let mut res = let mut res =
PacketBatch::new_unpinned_with_recycler(recycler.clone(), max_responses, "run_orphan"); PacketBatch::new_unpinned_with_recycler(recycler.clone(), max_responses, "run_orphan");
if let Some(blockstore) = blockstore { // Try to find the next "n" parent slots of the input slot
// Try to find the next "n" parent slots of the input slot while let Ok(Some(meta)) = blockstore.meta(slot) {
while let Ok(Some(meta)) = blockstore.meta(slot) { if meta.received == 0 {
if meta.received == 0 { break;
break; }
} let packet = repair_response::repair_response_packet(
let packet = repair_response::repair_response_packet( blockstore,
blockstore, slot,
slot, meta.received - 1,
meta.received - 1, from_addr,
from_addr, nonce,
nonce, );
); if let Some(packet) = packet {
if let Some(packet) = packet { res.push(packet);
res.push(packet); } else {
} else { break;
break; }
}
if meta.parent_slot.is_some() && res.len() < max_responses { if meta.parent_slot.is_some() && res.len() < max_responses {
slot = meta.parent_slot.unwrap(); slot = meta.parent_slot.unwrap();
} else { } else {
break; break;
}
} }
} }
if res.is_empty() { if res.is_empty() {
@ -1196,11 +1186,10 @@ impl ServeRepair {
fn run_ancestor_hashes( fn run_ancestor_hashes(
recycler: &PacketBatchRecycler, recycler: &PacketBatchRecycler,
from_addr: &SocketAddr, from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>, blockstore: &Blockstore,
slot: Slot, slot: Slot,
nonce: Nonce, nonce: Nonce,
) -> Option<PacketBatch> { ) -> Option<PacketBatch> {
let blockstore = blockstore?;
let ancestor_slot_hashes = if blockstore.is_duplicate_confirmed(slot) { let ancestor_slot_hashes = if blockstore.is_duplicate_confirmed(slot) {
let ancestor_iterator = let ancestor_iterator =
AncestorIteratorWithHash::from(AncestorIterator::new_inclusive(slot, blockstore)); AncestorIteratorWithHash::from(AncestorIterator::new_inclusive(slot, blockstore));
@ -1635,7 +1624,7 @@ mod tests {
let rv = ServeRepair::run_highest_window_request( let rv = ServeRepair::run_highest_window_request(
&recycler, &recycler,
&socketaddr_any!(), &socketaddr_any!(),
Some(&blockstore), &blockstore,
0, 0,
0, 0,
nonce, nonce,
@ -1654,7 +1643,7 @@ mod tests {
let rv = ServeRepair::run_highest_window_request( let rv = ServeRepair::run_highest_window_request(
&recycler, &recycler,
&socketaddr_any!(), &socketaddr_any!(),
Some(&blockstore), &blockstore,
slot, slot,
index, index,
nonce, nonce,
@ -1678,7 +1667,7 @@ mod tests {
let rv = ServeRepair::run_highest_window_request( let rv = ServeRepair::run_highest_window_request(
&recycler, &recycler,
&socketaddr_any!(), &socketaddr_any!(),
Some(&blockstore), &blockstore,
slot, slot,
index + 1, index + 1,
nonce, nonce,
@ -1704,7 +1693,7 @@ mod tests {
let rv = ServeRepair::run_window_request( let rv = ServeRepair::run_window_request(
&recycler, &recycler,
&socketaddr_any!(), &socketaddr_any!(),
Some(&blockstore), &blockstore,
slot, slot,
0, 0,
nonce, nonce,
@ -1720,7 +1709,7 @@ mod tests {
let rv = ServeRepair::run_window_request( let rv = ServeRepair::run_window_request(
&recycler, &recycler,
&socketaddr_any!(), &socketaddr_any!(),
Some(&blockstore), &blockstore,
slot, slot,
index, index,
nonce, nonce,
@ -1855,14 +1844,8 @@ mod tests {
let ledger_path = get_tmp_ledger_path!(); let ledger_path = get_tmp_ledger_path!();
{ {
let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap()); let blockstore = Arc::new(Blockstore::open(&ledger_path).unwrap());
let rv = ServeRepair::run_orphan( let rv =
&recycler, ServeRepair::run_orphan(&recycler, &socketaddr_any!(), &blockstore, slot, 0, nonce);
&socketaddr_any!(),
Some(&blockstore),
slot,
0,
nonce,
);
assert!(rv.is_none()); assert!(rv.is_none());
// Create slots [slot, slot + num_slots) with 5 shreds apiece // Create slots [slot, slot + num_slots) with 5 shreds apiece
@ -1876,7 +1859,7 @@ mod tests {
let rv = ServeRepair::run_orphan( let rv = ServeRepair::run_orphan(
&recycler, &recycler,
&socketaddr_any!(), &socketaddr_any!(),
Some(&blockstore), &blockstore,
slot + num_slots, slot + num_slots,
5, 5,
nonce, nonce,
@ -1888,7 +1871,7 @@ mod tests {
let rv: Vec<_> = ServeRepair::run_orphan( let rv: Vec<_> = ServeRepair::run_orphan(
&recycler, &recycler,
&socketaddr_any!(), &socketaddr_any!(),
Some(&blockstore), &blockstore,
slot + num_slots - 1, slot + num_slots - 1,
5, 5,
nonce, nonce,
@ -1954,18 +1937,12 @@ mod tests {
// Orphan request for slot 2 should only return slot 1 since // Orphan request for slot 2 should only return slot 1 since
// calling `repair_response_packet` on slot 1's shred will // calling `repair_response_packet` on slot 1's shred will
// be corrupted // be corrupted
let rv: Vec<_> = ServeRepair::run_orphan( let rv: Vec<_> =
&recycler, ServeRepair::run_orphan(&recycler, &socketaddr_any!(), &blockstore, 2, 5, nonce)
&socketaddr_any!(), .expect("run_orphan packets")
Some(&blockstore), .iter()
2, .cloned()
5, .collect();
nonce,
)
.expect("run_orphan packets")
.iter()
.cloned()
.collect();
// Verify responses // Verify responses
let expected = vec![repair_response::repair_response_packet( let expected = vec![repair_response::repair_response_packet(
@ -2011,7 +1988,7 @@ mod tests {
let rv = ServeRepair::run_ancestor_hashes( let rv = ServeRepair::run_ancestor_hashes(
&recycler, &recycler,
&socketaddr_any!(), &socketaddr_any!(),
Some(&blockstore), &blockstore,
slot + num_slots, slot + num_slots,
nonce, nonce,
) )
@ -2026,7 +2003,7 @@ mod tests {
let rv = ServeRepair::run_ancestor_hashes( let rv = ServeRepair::run_ancestor_hashes(
&recycler, &recycler,
&socketaddr_any!(), &socketaddr_any!(),
Some(&blockstore), &blockstore,
slot + num_slots - 1, slot + num_slots - 1,
nonce, nonce,
) )
@ -2048,7 +2025,7 @@ mod tests {
let rv = ServeRepair::run_ancestor_hashes( let rv = ServeRepair::run_ancestor_hashes(
&recycler, &recycler,
&socketaddr_any!(), &socketaddr_any!(),
Some(&blockstore), &blockstore,
slot + num_slots - 1, slot + num_slots - 1,
nonce, nonce,
) )

View File

@ -21,7 +21,7 @@ pub struct ServeRepairService {
impl ServeRepairService { impl ServeRepairService {
pub fn new( pub fn new(
serve_repair: ServeRepair, serve_repair: ServeRepair,
blockstore: Option<Arc<Blockstore>>, blockstore: Arc<Blockstore>,
serve_repair_socket: UdpSocket, serve_repair_socket: UdpSocket,
socket_addr_space: SocketAddrSpace, socket_addr_space: SocketAddrSpace,
stats_reporter_sender: Sender<Box<dyn FnOnce() + Send>>, stats_reporter_sender: Sender<Box<dyn FnOnce() + Send>>,

View File

@ -886,7 +886,7 @@ impl Validator {
let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks.clone()); let serve_repair = ServeRepair::new(cluster_info.clone(), bank_forks.clone());
let serve_repair_service = ServeRepairService::new( let serve_repair_service = ServeRepairService::new(
serve_repair, serve_repair,
Some(blockstore.clone()), blockstore.clone(),
node.sockets.serve_repair, node.sockets.serve_repair,
socket_addr_space, socket_addr_space,
stats_reporter_sender, stats_reporter_sender,