Fix SlotMeta connected tracking (#28069)

Fix SlotMeta is_connected tracking

The tracking of connected status was previously based upon an assumption
that would be practically false for all validators. The connected status
of slots played into whether Blockstore would signal ReplayStage that it
had new shreds ready to be replayed. Prior to the change, we would never
signal and ReplayStage would always wait the entire duration of a 100ms
timeout before restarting its' main processing loop.

This commit introduces a change where we mark snapshot slots as
connected. A validator may not have a path all the way back to genesis
itself; however, snapshots are taken at known roots so we extend the
connected status to these slots. Once a node has been bootstrapped once
to have is connected, the logic persists in Blockstore such that all
children on the main fork also get their connected status updated
properly.
This commit is contained in:
steviez 2023-03-21 20:17:58 +08:00 committed by GitHub
parent 66da71fa7a
commit bc933c63ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 228 additions and 71 deletions

View File

@ -3364,6 +3364,53 @@ impl Blockstore {
);
Ok(())
}
/// Mark a root `slot` as connected, traverse `slot`'s children and update
/// the children's connected status if appropriate.
///
/// A ledger with a full path of blocks from genesis to the latest root will
/// have all of the rooted blocks marked as connected such that new blocks
/// could also be connected. However, starting from some root (such as from
/// a snapshot) is a valid way to join a cluster. For this case, mark this
/// root as connected such that the node that joined midway through can
/// have their slots considered connected.
pub fn set_and_chain_connected_on_root_and_next_slots(&self, root: Slot) -> Result<()> {
let mut root_meta = self
.meta(root)?
.unwrap_or_else(|| SlotMeta::new(root, None));
// If the slot was already connected, there is nothing to do as this slot's
// children are also assumed to be appropriately connected
if root_meta.is_connected() {
return Ok(());
}
info!(
"Marking slot {} and any full children slots as connected",
root
);
let mut write_batch = self.db.batch()?;
// Mark both connected bits on the root slot so that the flags for this
// slot match the flags of slots that become connected the typical way.
root_meta.set_parent_connected();
root_meta.set_connected();
write_batch.put::<cf::SlotMeta>(root_meta.slot, &root_meta)?;
let mut next_slots = VecDeque::from(root_meta.next_slots);
while !next_slots.is_empty() {
let slot = next_slots.pop_front().unwrap();
let mut meta = self.meta(slot)?.unwrap_or_else(|| {
panic!("Slot {slot} is a child but has no SlotMeta in blockstore")
});
if meta.set_parent_connected() {
next_slots.extend(meta.next_slots.iter());
}
write_batch.put::<cf::SlotMeta>(meta.slot, &meta)?;
}
self.db.write(write_batch)?;
Ok(())
}
}
// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
@ -3741,7 +3788,6 @@ fn handle_chaining_for_slot(
let meta = &slot_meta_entry.new_slot_meta;
let meta_backup = &slot_meta_entry.old_slot_meta;
{
let mut meta_mut = meta.borrow_mut();
let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
@ -3778,44 +3824,32 @@ fn handle_chaining_for_slot(
}
}
// If this is a newly inserted slot, then we know the children of this slot were not previously
// connected to the trunk of the ledger. Thus if slot.is_connected is now true, we need to
// update all child slots with `is_connected` = true because these children are also now newly
// connected to trunk of the ledger
// If this is a newly completed slot and the parent is connected, then the
// slot is now connected. Mark the slot as connected, and then traverse the
// children to update their parent_connected and connected status.
let should_propagate_is_connected =
is_newly_completed_slot(&RefCell::borrow(meta), meta_backup)
&& RefCell::borrow(meta).is_connected();
&& RefCell::borrow(meta).is_parent_connected();
if should_propagate_is_connected {
// slot_function returns a boolean indicating whether to explore the children
// of the input slot
let slot_function = |slot: &mut SlotMeta| {
slot.set_connected();
// We don't want to set the is_connected flag on the children of non-full
// slots
slot.is_full()
};
meta.borrow_mut().set_connected();
traverse_children_mut(
db,
slot,
meta,
working_set,
new_chained_slots,
slot_function,
SlotMeta::set_parent_connected,
)?;
}
Ok(())
}
/// Traverse all the direct and indirect children slots and apply the specified
/// `slot_function`.
/// Traverse all the children (direct and indirect) of `slot_meta`, and apply
/// `slot_function` to each of the children (but not `slot_meta`).
///
/// Arguments:
/// `db`: the blockstore db that stores shreds and their metadata.
/// `slot`: starting slot to traverse.
/// `slot_meta`: the SlotMeta of the above `slot`.
/// `working_set`: a slot-id to SlotMetaWorkingSetEntry map which is used
/// to traverse the graph.
@ -3826,7 +3860,6 @@ fn handle_chaining_for_slot(
/// a given slot.
fn traverse_children_mut<F>(
db: &Database,
slot: Slot,
slot_meta: &Rc<RefCell<SlotMeta>>,
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
passed_visisted_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
@ -3835,25 +3868,18 @@ fn traverse_children_mut<F>(
where
F: Fn(&mut SlotMeta) -> bool,
{
let mut next_slots: VecDeque<(u64, Rc<RefCell<SlotMeta>>)> =
vec![(slot, slot_meta.clone())].into();
let slot_meta = slot_meta.borrow();
let mut next_slots: VecDeque<u64> = slot_meta.next_slots.to_vec().into();
while !next_slots.is_empty() {
let (_, current_slot) = next_slots.pop_front().unwrap();
// Check whether we should explore the children of this slot
if slot_function(&mut current_slot.borrow_mut()) {
let current_slot = &RefCell::borrow(&*current_slot);
for next_slot_index in current_slot.next_slots.iter() {
let next_slot = find_slot_meta_else_create(
db,
working_set,
passed_visisted_slots,
*next_slot_index,
)?;
next_slots.push_back((*next_slot_index, next_slot));
}
let slot = next_slots.pop_front().unwrap();
let meta_ref = find_slot_meta_else_create(db, working_set, passed_visisted_slots, slot)?;
let mut meta = meta_ref.borrow_mut();
if slot_function(&mut meta) {
meta.next_slots
.iter()
.for_each(|slot| next_slots.push_back(*slot));
}
}
Ok(())
}
@ -3864,15 +3890,14 @@ fn is_orphan(meta: &SlotMeta) -> bool {
}
// 1) Chain current_slot to the previous slot defined by prev_slot_meta
// 2) Determine whether to set the is_connected flag
fn chain_new_slot_to_prev_slot(
prev_slot_meta: &mut SlotMeta,
current_slot: Slot,
current_slot_meta: &mut SlotMeta,
) {
prev_slot_meta.next_slots.push(current_slot);
if prev_slot_meta.is_connected() && prev_slot_meta.is_full() {
current_slot_meta.set_connected();
if prev_slot_meta.is_connected() {
current_slot_meta.set_parent_connected();
}
}
@ -3882,18 +3907,19 @@ fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option<SlotM
|| slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed)
}
/// Returns a boolean indicating whether a slot has received additional shreds
/// that can be replayed since the previous update to the slot's SlotMeta.
fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option<SlotMeta>) -> bool {
// We should signal that there are updates if we extended the chain of consecutive blocks starting
// from block 0, which is true iff:
// 1) The block with index prev_block_index is itself part of the trunk of consecutive blocks
// starting from block 0,
slot_meta.is_connected() &&
// AND either:
// 1) The slot didn't exist in the database before, and now we have a consecutive
// block for that slot
// First, this slot's parent must be connected in order to even consider
// starting replay; otherwise, the replayed results may not be valid.
slot_meta.is_parent_connected() &&
// Then,
// If the slot didn't exist in the db before, any consecutive shreds
// at the start of the slot are ready to be replayed.
((slot_meta_backup.is_none() && slot_meta.consumed != 0) ||
// OR
// 2) The slot did exist, but now we have a new consecutive block for that slot
// Or,
// If the slot has more consecutive shreds than it last did from the
// last update, those shreds are new and also ready to be replayed.
(slot_meta_backup.is_some() && slot_meta_backup.as_ref().unwrap().consumed != slot_meta.consumed))
}
@ -4454,7 +4480,7 @@ pub mod tests {
solana_transaction_status::{
InnerInstruction, InnerInstructions, Reward, Rewards, TransactionTokenBalance,
},
std::{thread::Builder, time::Duration},
std::{cmp::Ordering, thread::Builder, time::Duration},
};
// used for tests only
@ -5388,7 +5414,7 @@ pub mod tests {
blockstore.insert_shreds(shreds1, None, false).unwrap();
let meta1 = blockstore.meta(1).unwrap().unwrap();
assert!(meta1.next_slots.is_empty());
// Slot 1 is not trunk because slot 0 hasn't been inserted yet
// Slot 1 is not connected because slot 0 hasn't been inserted yet
assert!(!meta1.is_connected());
assert_eq!(meta1.parent_slot, Some(0));
assert_eq!(meta1.last_index, Some(shreds_per_slot as u64 - 1));
@ -5400,13 +5426,13 @@ pub mod tests {
blockstore.insert_shreds(shreds2, None, false).unwrap();
let meta2 = blockstore.meta(2).unwrap().unwrap();
assert!(meta2.next_slots.is_empty());
// Slot 2 is not trunk because slot 0 hasn't been inserted yet
// Slot 2 is not connected because slot 0 hasn't been inserted yet
assert!(!meta2.is_connected());
assert_eq!(meta2.parent_slot, Some(1));
assert_eq!(meta2.last_index, Some(shreds_per_slot as u64 - 1));
// Check the first slot again, it should chain to the second slot,
// but still isn't part of the trunk
// but still isn't connected.
let meta1 = blockstore.meta(1).unwrap().unwrap();
assert_eq!(meta1.next_slots, vec![2]);
assert!(!meta1.is_connected());
@ -5440,7 +5466,7 @@ pub mod tests {
let num_slots = 30;
let entries_per_slot = 5;
// Make a bunch of shreds and split by whether slot is even or odd
// Make some shreds and split based on whether the slot is odd or even.
let (shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
let shreds_per_slot = shreds.len() as u64 / num_slots;
let (even_slots, odd_slots): (Vec<_>, Vec<_>) =
@ -5466,8 +5492,10 @@ pub mod tests {
assert_eq!(meta.parent_slot, Some(slot - 1));
}
// Slot 0 is the only connected slot
assert!(!meta.is_connected() || meta.slot == 0);
// None of the slot should be connected, but since slot 0 is
// the special case, it will have parent_connected as true.
assert!(!meta.is_connected());
assert!(!meta.is_parent_connected() || slot == 0);
}
// Write the even slot shreds that we did not earlier
@ -5497,6 +5525,7 @@ pub mod tests {
#[test]
#[allow(clippy::cognitive_complexity)]
pub fn test_forward_chaining_is_connected() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
@ -5533,14 +5562,14 @@ pub mod tests {
}
// Ensure that each slot has their parent correct
// Additionally, slot 0 should be the only connected slot
if slot == 0 {
assert_eq!(meta.parent_slot, Some(0));
assert!(meta.is_connected());
} else {
assert_eq!(meta.parent_slot, Some(slot - 1));
assert!(!meta.is_connected());
}
// No slots should be connected yet, not even slot 0
// as slot 0 is still not full yet
assert!(!meta.is_connected());
assert_eq!(meta.last_index, Some(shreds_per_slot as u64 - 1));
}
@ -5554,28 +5583,123 @@ pub mod tests {
for slot in 0..num_slots {
let meta = blockstore.meta(slot).unwrap().unwrap();
if slot != num_slots - 1 {
assert_eq!(meta.next_slots, vec![slot + 1]);
} else {
assert!(meta.next_slots.is_empty());
}
if slot <= slot_index + 3 {
if slot < slot_index + 3 {
assert!(meta.is_full());
assert!(meta.is_connected());
} else {
assert!(!meta.is_connected());
}
if slot == 0 {
assert_eq!(meta.parent_slot, Some(0));
} else {
assert_eq!(meta.parent_slot, Some(slot - 1));
}
assert_eq!(meta.last_index, Some(shreds_per_slot as u64 - 1));
}
}
}
}
#[test]
fn test_set_and_chain_connected_on_root_and_next_slots() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
// Create enough entries to ensure 5 shreds result
let entries_per_slot = max_ticks_per_n_shreds(5, None);
let mut start_slot = 5;
// Start a chain from a slot not in blockstore, this is the case when
// node starts with no blockstore and downloads a snapshot. In this
// scenario, the slot will be marked connected despite its' parent not
// being connected (or existing) and not being full.
blockstore
.set_and_chain_connected_on_root_and_next_slots(start_slot)
.unwrap();
let slot_meta5 = blockstore.meta(start_slot).unwrap().unwrap();
assert!(!slot_meta5.is_full());
assert!(slot_meta5.is_parent_connected());
assert!(slot_meta5.is_connected());
let num_slots = 5;
// Insert some new slots and ensure they connect to the root correctly
start_slot += 1;
let (shreds, _) = make_many_slot_entries(start_slot, num_slots, entries_per_slot);
blockstore.insert_shreds(shreds, None, false).unwrap();
for slot in start_slot..start_slot + num_slots {
info!("Evaluating slot {}", slot);
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(meta.is_parent_connected());
assert!(meta.is_connected());
}
// Chain connected on slots that are already connected, should just noop
blockstore
.set_and_chain_connected_on_root_and_next_slots(start_slot)
.unwrap();
for slot in start_slot..start_slot + num_slots {
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(meta.is_parent_connected());
assert!(meta.is_connected());
}
// Start another chain that is disconnected from previous chain. But, insert
// a non-full slot and ensure this slot (and its' children) are not marked
// as connected.
start_slot += 2 * num_slots;
let (shreds, _) = make_many_slot_entries(start_slot, num_slots, entries_per_slot);
// Insert all shreds except for the shreds with index > 0 from non_full_slot
let non_full_slot = start_slot + num_slots / 2;
let (shreds, missing_shreds) = shreds
.into_iter()
.partition(|shred| shred.slot() != non_full_slot || shred.index() == 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
// Chain method hasn't been called yet, so none of these connected yet
for slot in start_slot..start_slot + num_slots {
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(!meta.is_parent_connected());
assert!(!meta.is_connected());
}
// Now chain from the new starting point
blockstore
.set_and_chain_connected_on_root_and_next_slots(start_slot)
.unwrap();
for slot in start_slot..start_slot + num_slots {
let meta = blockstore.meta(slot).unwrap().unwrap();
match slot.cmp(&non_full_slot) {
Ordering::Less => {
// These are fully connected as expected
assert!(meta.is_parent_connected());
assert!(meta.is_connected());
}
Ordering::Equal => {
// Parent will be connected, but this slot not connected itself
assert!(meta.is_parent_connected());
assert!(!meta.is_connected());
}
Ordering::Greater => {
// All children are not connected either
assert!(!meta.is_parent_connected());
assert!(!meta.is_connected());
}
}
}
// Insert the missing shreds and ensure all slots connected now
blockstore
.insert_shreds(missing_shreds, None, false)
.unwrap();
for slot in start_slot..start_slot + num_slots {
let meta = blockstore.meta(slot).unwrap().unwrap();
assert!(meta.is_parent_connected());
assert!(meta.is_connected());
}
}
/*
#[test]
pub fn test_chaining_tree() {

View File

@ -253,14 +253,42 @@ impl SlotMeta {
Some(self.consumed) == self.last_index.map(|ix| ix + 1)
}
/// Returns a boolean indicating whether the meta is connected.
pub fn is_connected(&self) -> bool {
self.connected_flags.contains(ConnectedFlags::CONNECTED)
}
/// Mark the meta as connected.
pub fn set_connected(&mut self) {
assert!(self.is_parent_connected());
self.connected_flags.set(ConnectedFlags::CONNECTED, true);
}
/// Returns a boolean indicating whether the meta's parent is connected.
pub fn is_parent_connected(&self) -> bool {
self.connected_flags
.contains(ConnectedFlags::PARENT_CONNECTED)
}
/// Mark the meta's parent as connected.
/// If the meta is also full, the meta is now connected as well. Return a
/// boolean indicating whether the meta becamed connected from this call.
pub fn set_parent_connected(&mut self) -> bool {
// Already connected so nothing to do, bail early
if self.is_connected() {
return false;
}
self.connected_flags
.set(ConnectedFlags::PARENT_CONNECTED, true);
if self.is_full() {
self.set_connected();
}
self.is_connected()
}
/// Dangerous. Currently only needed for a local-cluster test
pub fn unset_parent(&mut self) {
self.parent_slot = None;
@ -275,7 +303,7 @@ impl SlotMeta {
let connected_flags = if slot == 0 {
// Slot 0 is the start, mark it as having its' parent connected
// such that slot 0 becoming full will be updated as connected
ConnectedFlags::CONNECTED
ConnectedFlags::PARENT_CONNECTED
} else {
ConnectedFlags::default()
};
@ -461,7 +489,8 @@ mod test {
#[test]
fn test_slot_meta_slot_zero_connected() {
let meta = SlotMeta::new(0 /* slot */, None /* parent */);
assert!(meta.is_connected());
assert!(meta.is_parent_connected());
assert!(!meta.is_connected());
}
#[test]

View File

@ -738,7 +738,8 @@ pub fn process_blockstore_from_root(
info!("Processing ledger from slot {}...", start_slot);
let now = Instant::now();
// ensure start_slot is rooted for correct replay
// Ensure start_slot is rooted for correct replay; also ensure start_slot and
// qualifying children are marked as connected
if blockstore.is_primary_access() {
blockstore
.mark_slots_as_if_rooted_normally_at_startup(
@ -746,6 +747,9 @@ pub fn process_blockstore_from_root(
true,
)
.expect("Couldn't mark start_slot as root on startup");
blockstore
.set_and_chain_connected_on_root_and_next_slots(bank.slot())
.expect("Couldn't mark start_slot as connected during startup")
} else {
info!(
"Starting slot {} isn't root and won't be updated due to being secondary blockstore access",