Make Blockstore::scan_and_fix_roots() take optional start/stop slots (#32289)

The optional args allow reuse by ledger-tool repair roots command Also,
hold cleanup lock for duration of Blockstore::scan_and_fix_roots().

This prevents a scenario where scan_and_fix_roots() could identify a
slot as needing to be marked root, that slot getting cleaned by
LedgerCleanupService, and then scan_and_fix_roots() marking the slot as
root on the now purged slot.
This commit is contained in:
steviez 2023-06-28 22:32:03 -05:00 committed by GitHub
parent d6a0406d54
commit d5ad29d837
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 109 additions and 67 deletions

View File

@ -389,7 +389,7 @@ impl Default for ValidatorStartProgress {
} }
struct BlockstoreRootScan { struct BlockstoreRootScan {
thread: Option<JoinHandle<Result<(), BlockstoreError>>>, thread: Option<JoinHandle<Result<usize, BlockstoreError>>>,
} }
impl BlockstoreRootScan { impl BlockstoreRootScan {
@ -402,7 +402,7 @@ impl BlockstoreRootScan {
Some( Some(
Builder::new() Builder::new()
.name("solBStoreRtScan".to_string()) .name("solBStoreRtScan".to_string())
.spawn(move || blockstore.scan_and_fix_roots(&exit)) .spawn(move || blockstore.scan_and_fix_roots(None, None, &exit))
.unwrap(), .unwrap(),
) )
} else { } else {

View File

@ -3965,7 +3965,6 @@ fn main() {
start_root.saturating_sub(max_slots) start_root.saturating_sub(max_slots)
}; };
assert!(start_root > end_root); assert!(start_root > end_root);
assert!(blockstore.is_root(start_root));
let num_slots = start_root - end_root - 1; // Adjust by one since start_root need not be checked let num_slots = start_root - end_root - 1; // Adjust by one since start_root need not be checked
if arg_matches.is_present("end_root") && num_slots > max_slots { if arg_matches.is_present("end_root") && num_slots > max_slots {
eprintln!( eprintln!(
@ -3975,25 +3974,14 @@ fn main() {
); );
exit(1); exit(1);
} }
let ancestor_iterator = AncestorIterator::new(start_root, &blockstore)
.take_while(|&slot| slot >= end_root); let num_repaired_roots = blockstore
let roots_to_fix: Vec<_> = ancestor_iterator .scan_and_fix_roots(Some(start_root), Some(end_root), &AtomicBool::new(false))
.filter(|slot| !blockstore.is_root(*slot)) .unwrap_or_else(|err| {
.collect(); eprintln!("Unable to repair roots: {err}");
if !roots_to_fix.is_empty() { exit(1);
eprintln!("{} slots to be rooted", roots_to_fix.len()); });
for chunk in roots_to_fix.chunks(100) { println!("Successfully repaired {num_repaired_roots} roots");
eprintln!("{chunk:?}");
blockstore
.set_roots(roots_to_fix.iter())
.unwrap_or_else(|err| {
eprintln!("Unable to set roots {roots_to_fix:?}: {err}");
exit(1);
});
}
} else {
println!("No missing roots found in range {end_root} to {start_root}");
}
} }
("bounds", Some(arg_matches)) => { ("bounds", Some(arg_matches)) => {
let blockstore = open_blockstore( let blockstore = open_blockstore(

View File

@ -3323,18 +3323,48 @@ impl Blockstore {
self.db.is_primary_access() self.db.is_primary_access()
} }
/// Search for any ancestors of the latest root that are not marked as /// Scan for any ancestors of the supplied `start_root` that are not
/// roots themselves. Then, mark these found slots as roots since the /// marked as roots themselves. Mark any found slots as roots since
/// ancestor of a root is also inherently a root. /// the ancestor of a root is also inherently a root. Returns the
pub fn scan_and_fix_roots(&self, exit: &AtomicBool) -> Result<()> { /// number of slots that were actually updated.
let ancestor_iterator = AncestorIterator::new(self.last_root(), self) ///
.take_while(|&slot| slot >= self.lowest_cleanup_slot()); /// Arguments:
/// - `start_root`: The root to start scan from, or the highest root in
/// the blockstore if this value is `None`. This slot must be a root.
/// - `end_slot``: The slot to stop the scan at; the scan will continue to
/// the earliest slot in the Blockstore if this value is `None`.
/// - `exit`: Exit early if this flag is set to `true`.
pub fn scan_and_fix_roots(
&self,
start_root: Option<Slot>,
end_slot: Option<Slot>,
exit: &AtomicBool,
) -> Result<usize> {
// Hold the lowest_cleanup_slot read lock to prevent any cleaning of
// the blockstore from another thread. Doing so will prevent a
// possible inconsistency across column families where a slot is:
// - Identified as needing root repair by this thread
// - Cleaned from the blockstore by another thread (LedgerCleanupSerivce)
// - Marked as root via Self::set_root() by this this thread
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
let start_root = if let Some(slot) = start_root {
if !self.is_root(slot) {
return Err(BlockstoreError::SlotNotRooted);
}
slot
} else {
self.last_root()
};
let end_slot = end_slot.unwrap_or(*lowest_cleanup_slot);
let ancestor_iterator =
AncestorIterator::new(start_root, self).take_while(|&slot| slot >= end_slot);
let mut find_missing_roots = Measure::start("find_missing_roots"); let mut find_missing_roots = Measure::start("find_missing_roots");
let mut roots_to_fix = vec![]; let mut roots_to_fix = vec![];
for slot in ancestor_iterator.filter(|slot| !self.is_root(*slot)) { for slot in ancestor_iterator.filter(|slot| !self.is_root(*slot)) {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return Ok(()); return Ok(0);
} }
roots_to_fix.push(slot); roots_to_fix.push(slot);
} }
@ -3342,19 +3372,16 @@ impl Blockstore {
let mut fix_roots = Measure::start("fix_roots"); let mut fix_roots = Measure::start("fix_roots");
if !roots_to_fix.is_empty() { if !roots_to_fix.is_empty() {
info!("{} slots to be rooted", roots_to_fix.len()); info!("{} slots to be rooted", roots_to_fix.len());
for chunk in roots_to_fix.chunks(100) { let chunk_size = 100;
for (i, chunk) in roots_to_fix.chunks(chunk_size).enumerate() {
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return Ok(()); return Ok(i * chunk_size);
} }
trace!("{:?}", chunk); trace!("{:?}", chunk);
self.set_roots(chunk.iter())?; self.set_roots(chunk.iter())?;
} }
} else { } else {
debug!( debug!("No missing roots found in range {start_root} to {end_slot}");
"No missing roots found in range {} to {}",
self.lowest_cleanup_slot(),
self.last_root()
);
} }
fix_roots.stop(); fix_roots.stop();
datapoint_info!( datapoint_info!(
@ -3367,7 +3394,7 @@ impl Blockstore {
("num_roots_to_fix", roots_to_fix.len() as i64, i64), ("num_roots_to_fix", roots_to_fix.len() as i64, i64),
("fix_roots_us", fix_roots.as_us() as i64, i64), ("fix_roots_us", fix_roots.as_us() as i64, i64),
); );
Ok(()) Ok(roots_to_fix.len())
} }
/// Mark a root `slot` as connected, traverse `slot`'s children and update /// Mark a root `slot` as connected, traverse `slot`'s children and update
@ -5610,18 +5637,25 @@ pub mod tests {
#[test] #[test]
fn test_scan_and_fix_roots() { fn test_scan_and_fix_roots() {
fn blockstore_roots(blockstore: &Blockstore) -> Vec<Slot> {
blockstore
.rooted_slot_iterator(0)
.unwrap()
.collect::<Vec<_>>()
}
solana_logger::setup(); solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let entries_per_slot = max_ticks_per_n_shreds(5, None); let entries_per_slot = max_ticks_per_n_shreds(5, None);
let start_slot: Slot = 0; let start_slot: Slot = 0;
let num_slots = 10; let num_slots = 18;
// Produce the following chains and insert shreds into Blockstore // Produce the following chains and insert shreds into Blockstore
// 0 --> 2 --> 4 --> 6 --> 8 --> 10 // 0 -> 2 -> 4 -> 6 -> 8 -> 10 -> 12 -> 14 -> 16 -> 18
// \ // \
// --> 1 --> 3 --> 5 --> 7 --> 9 // -> 1 -> 3 -> 5 -> 7 -> 9 -> 11 -> 13 -> 15 -> 17
let shreds: Vec<_> = (start_slot..=num_slots) let shreds: Vec<_> = (start_slot..=num_slots)
.flat_map(|slot| { .flat_map(|slot| {
let parent_slot = if slot % 2 == 0 { let parent_slot = if slot % 2 == 0 {
@ -5640,41 +5674,61 @@ pub mod tests {
.collect(); .collect();
blockstore.insert_shreds(shreds, None, false).unwrap(); blockstore.insert_shreds(shreds, None, false).unwrap();
// Mark several roots // Start slot must be a root
let roots = vec![6, 10]; let (start, end) = (Some(16), None);
blockstore.set_roots(roots.iter()).unwrap(); assert_matches!(
assert_eq!( blockstore.scan_and_fix_roots(start, end, &AtomicBool::new(false)),
&roots, Err(BlockstoreError::SlotNotRooted)
&blockstore
.rooted_slot_iterator(0)
.unwrap()
.collect::<Vec<_>>(),
); );
// Fix roots and ensure all even slots are now root // Mark several roots
let roots = vec![0, 2, 4, 6, 8, 10]; let new_roots = vec![6, 12];
blockstore.set_roots(new_roots.iter()).unwrap();
assert_eq!(&new_roots, &blockstore_roots(&blockstore));
// Specify both a start root and end slot
let (start, end) = (Some(12), Some(8));
let roots = vec![6, 8, 10, 12];
blockstore blockstore
.scan_and_fix_roots(&AtomicBool::new(false)) .scan_and_fix_roots(start, end, &AtomicBool::new(false))
.unwrap(); .unwrap();
assert_eq!( assert_eq!(&roots, &blockstore_roots(&blockstore));
&roots,
&blockstore // Specify only an end slot
.rooted_slot_iterator(0) let (start, end) = (None, Some(4));
.unwrap() let roots = vec![4, 6, 8, 10, 12];
.collect::<Vec<_>>(), blockstore
); .scan_and_fix_roots(start, end, &AtomicBool::new(false))
.unwrap();
assert_eq!(&roots, &blockstore_roots(&blockstore));
// Specify only a start slot
let (start, end) = (Some(12), None);
let roots = vec![0, 2, 4, 6, 8, 10, 12];
blockstore
.scan_and_fix_roots(start, end, &AtomicBool::new(false))
.unwrap();
assert_eq!(&roots, &blockstore_roots(&blockstore));
// Mark additional root
let new_roots = vec![16];
let roots = vec![0, 2, 4, 6, 8, 10, 12, 16];
blockstore.set_roots(new_roots.iter()).unwrap();
assert_eq!(&roots, &blockstore_roots(&blockstore));
// Leave both start and end unspecified
let (start, end) = (None, None);
let roots = vec![0, 2, 4, 6, 8, 10, 12, 14, 16];
blockstore
.scan_and_fix_roots(start, end, &AtomicBool::new(false))
.unwrap();
assert_eq!(&roots, &blockstore_roots(&blockstore));
// Subsequent calls should have no effect and return without error // Subsequent calls should have no effect and return without error
blockstore blockstore
.scan_and_fix_roots(&AtomicBool::new(false)) .scan_and_fix_roots(start, end, &AtomicBool::new(false))
.unwrap(); .unwrap();
assert_eq!( assert_eq!(&roots, &blockstore_roots(&blockstore));
&roots,
&blockstore
.rooted_slot_iterator(0)
.unwrap()
.collect::<Vec<_>>(),
);
} }
#[test] #[test]