From 0dc0594aaaeea5573c8969ae3085d306311bdb14 Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Wed, 21 Aug 2019 15:27:42 -0700
Subject: [PATCH] Fixes to repair and orphan logic for data shreds (#5587)

---
 core/src/blocktree.rs                         | 71 ++++++-------
 .../broadcast_stage/standard_broadcast_run.rs |  8 ++-
 core/src/repair_service.rs                    | 34 +++++----
 core/src/shred.rs                             | 27 ++++++-
 core/src/window_service.rs                    |  2 +-
 local_cluster/tests/local_cluster.rs          |  6 +-
 6 files changed, 76 insertions(+), 72 deletions(-)

diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index a981a0da8..263df76d7 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -469,12 +469,7 @@ impl Blocktree {
     ) -> Result<()> {
         let slot = shred.slot();
         let index = u64::from(shred.index());
-        let parent = if let Shred::FirstInSlot(s) = shred {
-            debug!("got first in slot");
-            s.header.parent
-        } else {
-            std::u64::MAX
-        };
+        let parent = shred.parent();
 
         let last_in_slot = if let Shred::LastInSlot(_) = shred {
             debug!("got last in slot");
@@ -1169,7 +1164,7 @@ impl Blocktree {
         end_index: u64,
         max_missing: usize,
     ) -> Vec<u64> {
-        if let Ok(mut db_iterator) = self.db.cursor::<cf::Data>() {
+        if let Ok(mut db_iterator) = self.db.cursor::<cf::ShredData>() {
             Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing)
         } else {
             vec![]
@@ -1187,11 +1182,6 @@ impl Blocktree {
             .map(|x| x.0)
     }
 
-    pub fn read_ledger_blobs(&self) -> impl Iterator<Item = Blob> + '_ {
-        let iter = self.db.iter::<cf::Data>(None).unwrap();
-        iter.map(|(_, blob_data)| Blob::new(&blob_data))
-    }
-
     pub fn get_slot_entries_with_blob_count(
         &self,
         slot: u64,
@@ -2439,28 +2429,6 @@ pub fn create_new_tmp_ledger(name: &str, genesis_block: &GenesisBlock) -> (PathB
     (ledger_path, blockhash)
 }
 
-#[macro_export]
-macro_rules! tmp_copy_blocktree {
-    ($from:expr) => {
-        tmp_copy_blocktree($from, tmp_ledger_name!())
-    };
-}
-
-pub fn tmp_copy_blocktree(from: &Path, name: &str) -> PathBuf {
-    let path = get_tmp_ledger_path(name);
-
-    let blocktree = Blocktree::open(from).unwrap();
-    let blobs = blocktree.read_ledger_blobs();
-    let genesis_block = GenesisBlock::load(from).unwrap();
-
-    Blocktree::destroy(&path).expect("Expected successful database destruction");
-    let blocktree = Blocktree::open(&path).unwrap();
-    blocktree.write_blobs(blobs).unwrap();
-    genesis_block.write(&path).unwrap();
-
-    path
-}
-
 #[cfg(test)]
 pub mod tests {
     use super::*;
@@ -3659,15 +3627,17 @@
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
 
         // Write entries
-        let gap = 10;
+        let gap: u64 = 10;
         assert!(gap > 3);
         let num_entries = 10;
-        let mut blobs = make_tiny_test_entries(num_entries).to_single_entry_blobs();
-        for (i, b) in blobs.iter_mut().enumerate() {
-            b.set_index(i as u64 * gap);
+        let entries = make_tiny_test_entries(num_entries);
+        let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
+        let num_shreds = shreds.len();
+        for (i, b) in shreds.iter_mut().enumerate() {
+            b.set_index(i as u32 * gap as u32);
             b.set_slot(slot);
         }
-        blocktree.write_blobs(&blobs).unwrap();
+        blocktree.insert_shreds(&shreds).unwrap();
 
         // Index of the first blob is 0
         // Index of the second blob is "gap"
@@ -3711,7 +3681,7 @@
             &expected[..expected.len() - 1],
         );
 
-        for i in 0..num_entries as u64 {
+        for i in 0..num_shreds as u64 {
             for j in 0..i {
                 let expected: Vec<u64> = (j..i)
                     .flat_map(|k| {
@@ -3750,16 +3720,17 @@
         assert_eq!(blocktree.find_missing_data_indexes(slot, 4, 3, 1), empty);
         assert_eq!(blocktree.find_missing_data_indexes(slot, 1, 2, 0), empty);
 
-        let mut blobs = make_tiny_test_entries(2).to_single_entry_blobs();
+        let entries = make_tiny_test_entries(20);
+        let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
 
         const ONE: u64 = 1;
         const OTHER: u64 = 4;
 
-        blobs[0].set_index(ONE);
-        blobs[1].set_index(OTHER);
+        shreds[0].set_index(ONE as u32);
+        shreds[1].set_index(OTHER as u32);
 
         // Insert one blob at index = first_index
-        blocktree.write_blobs(&blobs).unwrap();
+        blocktree.insert_shreds(&shreds[0..2]).unwrap();
 
         const STARTS: u64 = OTHER * 2;
         const END: u64 = OTHER * 3;
@@ -3789,16 +3760,14 @@
 
         // Write entries
         let num_entries = 10;
-        let shared_blobs = make_tiny_test_entries(num_entries).to_single_entry_shared_blobs();
+        let entries = make_tiny_test_entries(num_entries);
+        let shreds = entries_to_test_shreds(entries, slot, 0, true);
+        let num_shreds = shreds.len();
 
-        crate::packet::index_blobs(&shared_blobs, &Pubkey::new_rand(), 0, slot, 0);
-
-        let blob_locks: Vec<_> = shared_blobs.iter().map(|b| b.read().unwrap()).collect();
-        let blobs: Vec<&Blob> = blob_locks.iter().map(|b| &**b).collect();
-        blocktree.write_blobs(blobs).unwrap();
+        blocktree.insert_shreds(&shreds).unwrap();
 
         let empty: Vec<u64> = vec![];
-        for i in 0..num_entries as u64 {
+        for i in 0..num_shreds as u64 {
             for j in 0..i {
                 assert_eq!(
                     blocktree.find_missing_data_indexes(slot, j, i, (i - j) as usize),
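The blocktree changes above do two things: every data shred now reports its parent slot through `shred.parent()` (previously only a `FirstInSlot` shred carried a real parent, so a slot whose first-received shred was not index 0 could never be chained or flagged as an orphan), and the missing-index scan walks the shred data column instead of the old blob column. As a reading aid, here is a minimal sketch of the gap query the tests above exercise; the inclusive start / exclusive end bounds are inferred from those assertions, not stated in the patch:

    // Sketch: report up to `max_missing` missing data-shred indexes for `slot`
    // in the window [start_index, end_index). `blocktree` and `slot` are
    // assumed from the surrounding tests.
    let missing: Vec<u64> = blocktree.find_missing_data_indexes(slot, 0, 10, 10);
    assert!(missing.len() <= 10);
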
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index d64b75c85..01f3b6e6f 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -88,7 +88,13 @@ impl BroadcastRun for StandardBroadcastRun {
             )
         } else {
             trace!("Renew shredder with same parent slot {:?}", parent_slot);
-            Shredder::new(bank.slot(), None, 0.0, keypair, latest_blob_index as u32)
+            Shredder::new(
+                bank.slot(),
+                Some(parent_slot),
+                0.0,
+                keypair,
+                latest_blob_index as u32,
+            )
         }
     } else {
         trace!("New shredder with parent slot {:?}", parent_slot);
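The cause of the broadcast bug sits in this one hunk: when the shredder was renewed for a slot that was still being produced, the parent was passed as `None`, so every shred after the first batch went out with no parent for receivers to chain from. The fix forwards `Some(parent_slot)` unconditionally. A sketch of the call shape, with the argument order taken from the hunk above and everything else assumed from the surrounding broadcast code:

    // Sketch: (slot, parent, fec_rate, keypair, next shred index).
    let shredder = Shredder::new(
        bank.slot(),
        Some(parent_slot),        // now always supplied, even on renewal
        0.0,                      // fec_rate
        keypair,
        latest_blob_index as u32, // continue numbering where the slot left off
    );
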
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 74d26b4b4..595b527bf 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -406,7 +406,8 @@ impl Service for RepairService {
 mod test {
     use super::*;
     use crate::blocktree::tests::{
-        make_chaining_slot_entries, make_many_slot_entries, make_slot_entries,
+        make_chaining_slot_entries, make_chaining_slot_entries_using_shreds,
+        make_many_slot_entries_using_shreds, make_slot_entries,
     };
     use crate::blocktree::{get_tmp_ledger_path, Blocktree};
     use crate::cluster_info::Node;
@@ -468,21 +469,26 @@
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
 
         let nth = 3;
-        let num_entries_per_slot = 5 * nth;
         let num_slots = 2;
 
         // Create some blobs
-        let (blobs, _) =
-            make_many_slot_entries(0, num_slots as u64, num_entries_per_slot as u64);
+        let (mut shreds, _) =
+            make_many_slot_entries_using_shreds(0, num_slots as u64, 50 as u64);
+        let num_shreds = shreds.len() as u64;
+        let num_shreds_per_slot = num_shreds / num_slots;
 
         // write every nth blob
-        let blobs_to_write: Vec<_> = blobs.iter().step_by(nth as usize).collect();
-
-        blocktree.write_blobs(blobs_to_write).unwrap();
-
-        let missing_indexes_per_slot: Vec<u64> = (0..num_entries_per_slot / nth - 1)
-            .flat_map(|x| ((nth * x + 1) as u64..(nth * x + nth) as u64))
-            .collect();
+        let mut shreds_to_write = vec![];
+        let mut missing_indexes_per_slot = vec![];
+        for i in (0..num_shreds).rev() {
+            let index = i % num_shreds_per_slot;
+            if index % nth == 0 {
+                shreds_to_write.insert(0, shreds.remove(i as usize));
+            } else if i < num_shreds_per_slot {
+                missing_indexes_per_slot.insert(0, index);
+            }
+        }
+        blocktree.insert_shreds(&shreds_to_write).unwrap();
 
         let expected: Vec<RepairType> = (0..num_slots)
             .flat_map(|slot| {
@@ -541,9 +547,9 @@
         let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
         let num_entries_per_slot = 10;
 
-        let blobs = make_chaining_slot_entries(&slots, num_entries_per_slot);
-        for (slot_blobs, _) in blobs.iter() {
-            blocktree.write_blobs(&slot_blobs[1..]).unwrap();
+        let shreds = make_chaining_slot_entries_using_shreds(&slots, num_entries_per_slot);
+        for (slot_shreds, _) in shreds.iter() {
+            blocktree.insert_shreds(&slot_shreds[1..]).unwrap();
         }
 
         // Iterate through all possible combinations of start..end (inclusive on both
diff --git a/core/src/shred.rs b/core/src/shred.rs
index 089f9c4fd..b4ca97762 100644
--- a/core/src/shred.rs
+++ b/core/src/shred.rs
@@ -34,6 +34,17 @@ impl Shred {
         }
     }
 
+    pub fn parent(&self) -> u64 {
+        match self {
+            Shred::FirstInSlot(s) => s.header.data_header.parent,
+            Shred::FirstInFECSet(s)
+            | Shred::Data(s)
+            | Shred::LastInFECSet(s)
+            | Shred::LastInSlot(s) => s.header.parent,
+            Shred::Coding(_) => std::u64::MAX,
+        }
+    }
+
     pub fn set_slot(&mut self, slot: u64) {
         match self {
             Shred::FirstInSlot(s) => s.header.data_header.common_header.slot = slot,
@@ -127,6 +138,7 @@ pub struct ShredCommonHeader {
 pub struct DataShredHeader {
     _reserved: CodingShredHeader,
     pub common_header: ShredCommonHeader,
+    pub parent: u64,
     pub last_in_slot: u8,
 }
 
@@ -284,6 +296,7 @@ pub struct Shredder {
     slot: u64,
     index: u32,
     pub parent: Option<u64>,
+    parent_slot: u64,
     fec_rate: f32,
     signer: Arc<Keypair>,
     pub shreds: Vec<Vec<u8>>,
@@ -301,8 +314,15 @@ impl Write for Shredder {
         self.parent
             .take()
             .map(|parent| {
-                // If parent slot is provided, assume it's first shred in slot
-                Shred::FirstInSlot(self.new_first_shred(parent))
+                self.parent_slot = parent;
+                // If parent slot is available
+                if self.index == 0 {
+                    // If index is 0, it's the first shred in slot
+                    Shred::FirstInSlot(self.new_first_shred(parent))
+                } else {
+                    // Or, it is the first shred in FEC set
+                    Shred::FirstInFECSet(self.new_data_shred())
+                }
             })
             .unwrap_or_else(||
                 // If parent slot is not provided, and since there's no existing shred,
@@ -371,6 +391,7 @@
             slot,
             index,
             parent,
+            parent_slot: 0,
             fec_rate,
             signer: signer.clone(),
             ..Shredder::default()
@@ -403,6 +424,7 @@
         let mut data_shred = DataShred::default();
         data_shred.header.common_header.slot = self.slot;
         data_shred.header.common_header.index = self.index;
+        data_shred.header.parent = self.parent_slot;
         data_shred
     }
 
@@ -410,6 +432,7 @@
     fn new_first_shred(&self, parent: u64) -> FirstDataShred {
         let mut first_shred = FirstDataShred::default();
         first_shred.header.parent = parent;
+        first_shred.header.data_header.parent = parent;
         first_shred.header.data_header.common_header.slot = self.slot;
         first_shred.header.data_header.common_header.index = self.index;
         first_shred
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index a080882bf..242ee6817 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -114,7 +114,7 @@ where
 
         blocktree.insert_shreds(&shreds)?;
 
-        info!(
+        trace!(
            "Elapsed processing time in recv_window(): {}",
            duration_as_ms(&now.elapsed())
         );
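On the shred side, the parent slot now lives in `DataShredHeader` (and is duplicated into the embedded data header of a `FirstDataShred`), which is what lets `Shred::parent()` answer uniformly for every data variant; coding shreds return `std::u64::MAX` as a "no parent" sentinel. A consumer that wants an honest absent value can fold the sentinel away; the helper below is illustrative only, not part of the patch:

    // Sketch: normalize the sentinel before using the parent for chaining.
    fn data_parent(shred: &Shred) -> Option<u64> {
        match shred.parent() {
            std::u64::MAX => None, // coding shreds carry no parent slot
            parent => Some(parent),
        }
    }
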
diff --git a/local_cluster/tests/local_cluster.rs b/local_cluster/tests/local_cluster.rs
index 068bb7fbf..1fefd92c7 100644
--- a/local_cluster/tests/local_cluster.rs
+++ b/local_cluster/tests/local_cluster.rs
@@ -15,7 +15,7 @@ use solana_sdk::{client::SyncClient, poh_config::PohConfig, timing};
 use std::{collections::HashSet, thread::sleep, time::Duration};
 
 #[test]
-#[ignore]
+#[serial]
 fn test_ledger_cleanup_service() {
     solana_logger::setup();
     error!("test_ledger_cleanup_service");
@@ -69,7 +69,7 @@ fn test_spend_and_verify_all_nodes_1() {
 }
 
 #[test]
-#[ignore]
+#[serial]
 fn test_spend_and_verify_all_nodes_2() {
     solana_logger::setup();
     error!("test_spend_and_verify_all_nodes_2");
@@ -84,7 +84,7 @@ fn test_spend_and_verify_all_nodes_2() {
 }
 
 #[test]
-#[ignore]
+#[serial]
 fn test_spend_and_verify_all_nodes_3() {
     solana_logger::setup();
     error!("test_spend_and_verify_all_nodes_3");
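Taken together: shreds always carry a parent for chaining, blocktree can report per-slot gaps again, and the previously `#[ignore]`d local-cluster tests are re-enabled under `#[serial]` so they run one at a time instead of being skipped. A rough sketch of the repair round trip this enables, reusing `missing` from the blocktree sketch above (the `RepairType` variant name is an assumption, not taken from this diff):

    // Sketch: one repair request per reported gap in the slot.
    let repairs: Vec<RepairType> = missing
        .into_iter()
        .map(|index| RepairType::Shred(slot, index))
        .collect();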