Purge slots greater than new last index (#16071)

carllin 2021-05-26 16:12:57 -07:00 committed by GitHub
parent cbce440af4
commit 52dccc656a
3 changed files with 351 additions and 51 deletions


@ -270,13 +270,6 @@ where
}
};
if shred_filter(&shred, last_root) {
// Mark slot as dead if the current shred is on the boundary
// of max shreds per slot. However, let the current shred
// get retransmitted. It'll allow peer nodes to see this shred
// and trigger them to mark the slot as dead.
if shred.index() >= (MAX_DATA_SHREDS_PER_SLOT - 1) as u32 {
let _ = blockstore.set_dead_slot(shred.slot());
}
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed();
Some((shred, repair_info))


@ -1183,7 +1183,23 @@ impl Blockstore {
if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) {
handle_duplicate(shred);
return Err(InsertDataShredError::Exists);
} else if !self.should_insert_data_shred(
}
if shred.last_in_slot() && shred_index < slot_meta.received && !slot_meta.is_full() {
// We got a last shred < slot_meta.received, which signals there's an alternative,
// shorter version of the slot. Because `!slot_meta.is_full()` also holds, we
// might never receive all the shreds < the current last index for this version
// of the slot, never replay it, and make no progress (for instance if a leader
// sends an additional detached "last index" shred with a very high index, but
// none of the intermediate shreds). Ideally, we would just purge all shreds >
// the new last index, but because replay may have already replayed entries past
// the newly detected "last" shred, mark the slot as dead instead and wait for
// replay to dump and repair the correct version.
warn!("Received *last* shred index {} less than previous shred index {}, and slot {} is not full, marking slot dead", shred_index, slot_meta.received, slot);
write_batch.put::<cf::DeadSlots>(slot, &true).unwrap();
}
if !self.should_insert_data_shred(
&shred,
slot_meta,
just_inserted_data_shreds,
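
In isolation, the new check above reduces to three conditions on the incoming shred and the slot's metadata. Below is a minimal, self-contained sketch, with plain integers and booleans standing in for the crate's `Shred` and `SlotMeta` types (all names here are illustrative, not from the commit):

/// Returns true when a "last" shred signals a shorter alternative version of
/// the slot, mirroring the condition added above (illustrative stand-in for
/// the crate's `Shred`/`SlotMeta` types).
fn signals_shorter_slot_version(
    is_last_in_slot: bool,
    shred_index: u64,
    received: u64, // highest data shred index observed so far, plus one
    slot_is_full: bool,
) -> bool {
    is_last_in_slot && shred_index < received && !slot_is_full
}

fn main() {
    // A "last" shred at index 5 arrives after index 9 was already seen:
    // the slot should be marked dead.
    assert!(signals_shorter_slot_version(true, 5, 10, false));
    // A full slot is left alone even if a shorter "last" shred shows up.
    assert!(!signals_shorter_slot_version(true, 5, 10, true));
    // An ordinary (non-last) shred never triggers the check.
    assert!(!signals_shorter_slot_version(false, 5, 10, false));
}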
@ -2553,14 +2569,18 @@ impl Blockstore {
start_index: u64,
allow_dead_slots: bool,
) -> Result<(Vec<Entry>, u64, bool)> {
let (completed_ranges, slot_meta) = self.get_completed_ranges(slot, start_index)?;
// Check if the slot is dead *after* fetching completed ranges to avoid a race
// where a slot is marked dead by another thread before the completed range query finishes.
// This is sufficient because full slots will never be marked dead from another thread;
// that can only happen during entry processing in the replay stage.
if self.is_dead(slot) && !allow_dead_slots {
return Err(BlockstoreError::DeadSlot);
}
let (completed_ranges, slot_meta) = self.get_completed_ranges(slot, start_index)?;
if completed_ranges.is_empty() {
} else if completed_ranges.is_empty() {
return Ok((vec![], 0, false));
}
let slot_meta = slot_meta.unwrap();
let num_shreds = completed_ranges
.last()
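
The essential change in this hunk is ordering: the completed ranges are fetched first and the dead check runs second, closing the window in between. A minimal sketch of that check-after-read pattern, with an `AtomicBool` standing in for the blockstore's dead-slots column (types and names here are illustrative):

use std::sync::atomic::{AtomicBool, Ordering};

struct Store {
    dead: AtomicBool,
}

impl Store {
    fn read_entries(&self) -> Result<Vec<u64>, &'static str> {
        // Stand-in for get_completed_ranges(): the potentially racy read.
        let entries = vec![1, 2, 3];
        // Checking the dead flag *after* the read catches a writer that
        // marked the slot dead while the read was in flight; checking it
        // before the read would leave a window where full-slot entries
        // could be returned for a slot that is already dead.
        if self.dead.load(Ordering::SeqCst) {
            return Err("dead slot");
        }
        Ok(entries)
    }
}

fn main() {
    let store = Store { dead: AtomicBool::new(false) };
    assert!(store.read_entries().is_ok());
    store.dead.store(true, Ordering::SeqCst);
    assert!(store.read_entries().is_err());
}

The test `test_get_slot_entries_dead_slot_race` added below exercises exactly this interleaving.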
@ -3778,7 +3798,7 @@ pub mod tests {
};
use solana_storage_proto::convert::generated;
use solana_transaction_status::{InnerInstructions, Reward, Rewards, TransactionTokenBalance};
use std::time::Duration;
use std::{sync::mpsc::channel, thread::Builder, time::Duration};
// used for tests only
pub(crate) fn make_slot_entries_with_transactions(num_entries: u64) -> Vec<Entry> {
@ -4361,44 +4381,6 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_data_shreds_duplicate() {
// Create RocksDb ledger
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
// Make duplicate entries and shreds
let num_unique_entries = 10;
let (mut original_shreds, original_entries) =
make_slot_entries(0, 0, num_unique_entries);
// Discard first shred
original_shreds.remove(0);
blockstore
.insert_shreds(original_shreds, None, false)
.unwrap();
assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), vec![]);
let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true, 0);
let num_shreds = duplicate_shreds.len() as u64;
blockstore
.insert_shreds(duplicate_shreds, None, false)
.unwrap();
assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
let meta = blockstore.meta(0).unwrap().unwrap();
assert_eq!(meta.consumed, num_shreds);
assert_eq!(meta.received, num_shreds);
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_shreds - 1);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_data_set_completed_on_insert() {
let ledger_path = get_tmp_ledger_path!();
@ -8189,6 +8171,58 @@ pub mod tests {
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_data_shreds_same_slot_last_index() {
// Create RocksDb ledger
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
// Create enough entries to ensure there are at least two shreds created
let num_unique_entries = max_ticks_per_n_shreds(1, None) + 1;
let (mut original_shreds, original_entries) =
make_slot_entries(0, 0, num_unique_entries);
// Discard first shred, so that the slot is not full
assert!(original_shreds.len() > 1);
let last_index = original_shreds.last().unwrap().index() as u64;
original_shreds.remove(0);
// Insert the same shreds, including the last shred specifically, multiple
// times
for _ in 0..10 {
blockstore
.insert_shreds(original_shreds.clone(), None, false)
.unwrap();
let meta = blockstore.meta(0).unwrap().unwrap();
assert!(!blockstore.is_dead(0));
assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), vec![]);
assert_eq!(meta.consumed, 0);
assert_eq!(meta.received, last_index + 1);
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, last_index);
assert!(!blockstore.is_full(0));
}
let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true, 0);
let num_shreds = duplicate_shreds.len() as u64;
blockstore
.insert_shreds(duplicate_shreds, None, false)
.unwrap();
assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
let meta = blockstore.meta(0).unwrap().unwrap();
assert_eq!(meta.consumed, num_shreds);
assert_eq!(meta.received, num_shreds);
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_shreds - 1);
assert!(blockstore.is_full(0));
assert!(!blockstore.is_dead(0));
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_duplicate_last_index() {
let num_shreds = 2;
@ -8208,4 +8242,273 @@ pub mod tests {
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_duplicate_last_index_mark_dead() {
let num_shreds = 10;
let smaller_last_shred_index = 5;
let larger_last_shred_index = 8;
let setup_test_shreds = |slot: Slot| -> Vec<Shred> {
let num_entries = max_ticks_per_n_shreds(num_shreds, None);
let (mut shreds, _) = make_slot_entries(slot, 0, num_entries);
shreds[smaller_last_shred_index].set_last_in_slot();
shreds[larger_last_shred_index].set_last_in_slot();
shreds
};
let get_expected_slot_meta_and_index_meta =
|blockstore: &Blockstore, shreds: Vec<Shred>| -> (SlotMeta, Index) {
let slot = shreds[0].slot();
blockstore
.insert_shreds(shreds.clone(), None, false)
.unwrap();
let meta = blockstore.meta(slot).unwrap().unwrap();
assert_eq!(meta.consumed, shreds.len() as u64);
let shreds_index = blockstore.get_index(slot).unwrap().unwrap();
for i in 0..shreds.len() as u64 {
assert!(shreds_index.data().is_present(i));
}
// Cleanup the slot
blockstore
.run_purge(slot, slot, PurgeType::PrimaryIndex)
.expect("Purge database operations failed");
assert!(blockstore.meta(slot).unwrap().is_none());
(meta, shreds_index)
};
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let mut slot = 0;
let shreds = setup_test_shreds(slot);
// Case 1: Insert in the same batch. Since we're inserting the shreds in order,
// any shreds > smaller_last_shred_index will not be inserted. The slot is not
// marked dead because no shreds > the first "last" index shred are inserted
// before the "last" index shred itself is inserted.
let (expected_slot_meta, expected_index) = get_expected_slot_meta_and_index_meta(
&blockstore,
shreds[..=smaller_last_shred_index].to_vec(),
);
blockstore
.insert_shreds(shreds.clone(), None, false)
.unwrap();
assert!(blockstore.get_duplicate_slot(slot).is_some());
assert!(!blockstore.is_dead(slot));
for i in 0..num_shreds {
if i <= smaller_last_shred_index as u64 {
assert_eq!(
blockstore.get_data_shred(slot, i).unwrap().unwrap(),
shreds[i as usize].payload
);
} else {
assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
}
}
let mut meta = blockstore.meta(slot).unwrap().unwrap();
meta.first_shred_timestamp = expected_slot_meta.first_shred_timestamp;
assert_eq!(meta, expected_slot_meta);
assert_eq!(blockstore.get_index(slot).unwrap().unwrap(), expected_index);
// Case 2: Inserting a duplicate with an even smaller last shred index should not
// mark the slot as dead since the SlotMeta is full.
let mut even_smaller_last_shred_duplicate =
shreds[smaller_last_shred_index - 1].clone();
even_smaller_last_shred_duplicate.set_last_in_slot();
// Flip a byte to create a duplicate shred
even_smaller_last_shred_duplicate.payload[0] =
std::u8::MAX - even_smaller_last_shred_duplicate.payload[0];
assert!(blockstore
.is_shred_duplicate(
slot,
even_smaller_last_shred_duplicate.index(),
&even_smaller_last_shred_duplicate.payload,
true
)
.is_some());
blockstore
.insert_shreds(vec![even_smaller_last_shred_duplicate], None, false)
.unwrap();
assert!(!blockstore.is_dead(slot));
for i in 0..num_shreds {
if i <= smaller_last_shred_index as u64 {
assert_eq!(
blockstore.get_data_shred(slot, i).unwrap().unwrap(),
shreds[i as usize].payload
);
} else {
assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
}
}
let mut meta = blockstore.meta(slot).unwrap().unwrap();
meta.first_shred_timestamp = expected_slot_meta.first_shred_timestamp;
assert_eq!(meta, expected_slot_meta);
assert_eq!(blockstore.get_index(slot).unwrap().unwrap(), expected_index);
// Case 3: Insert shreds in reverse so that consumed will not be updated. Now on
// insert, the slot should be marked as dead.
slot += 1;
let mut shreds = setup_test_shreds(slot);
shreds.reverse();
blockstore
.insert_shreds(shreds.clone(), None, false)
.unwrap();
assert!(blockstore.is_dead(slot));
// All the shreds other than the two "last" index shreds are inserted; those two
// are rejected because they are marked as last but have indexes less than the
// first received index == 10. The others are inserted even after the slot is
// marked dead on the attempted insert of the first last_index shred, since dead
// slots can still be inserted into.
for i in 0..num_shreds {
let shred_to_check = &shreds[i as usize];
let shred_index = shred_to_check.index() as u64;
if shred_index != smaller_last_shred_index as u64
&& shred_index != larger_last_shred_index as u64
{
assert_eq!(
blockstore
.get_data_shred(slot, shred_index)
.unwrap()
.unwrap(),
shred_to_check.payload
);
} else {
assert!(blockstore
.get_data_shred(slot, shred_index)
.unwrap()
.is_none());
}
}
// Case 4: Same as Case 3, but this time insert the shreds one at a time to test that the clearing
// of data shreds works even after they've been committed
slot += 1;
let mut shreds = setup_test_shreds(slot);
shreds.reverse();
for shred in shreds.clone() {
blockstore.insert_shreds(vec![shred], None, false).unwrap();
}
assert!(blockstore.is_dead(slot));
// As in Case 3, all the shreds other than the two "last" shreds are inserted,
// since dead slots can still be inserted into.
for i in 0..num_shreds {
let shred_to_check = &shreds[i as usize];
let shred_index = shred_to_check.index() as u64;
if shred_index != smaller_last_shred_index as u64
&& shred_index != larger_last_shred_index as u64
{
assert_eq!(
blockstore
.get_data_shred(slot, shred_index)
.unwrap()
.unwrap(),
shred_to_check.payload
);
} else {
assert!(blockstore
.get_data_shred(slot, shred_index)
.unwrap()
.is_none());
}
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_get_slot_entries_dead_slot_race() {
let setup_test_shreds = move |slot: Slot| -> Vec<Shred> {
let num_shreds = 10;
let middle_shred_index = 5;
let num_entries = max_ticks_per_n_shreds(num_shreds, None);
let (shreds, _) = make_slot_entries(slot, 0, num_entries);
// Reverse shreds so that last shred gets inserted first and sets meta.received
let mut shreds: Vec<Shred> = shreds.into_iter().rev().collect();
// Push the real middle shred to the end of the shreds list
shreds.push(shreds[middle_shred_index].clone());
// Set the middle shred as a last shred to cause the slot to be marked dead
shreds[middle_shred_index].set_last_in_slot();
shreds
};
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let (slot_sender, slot_receiver) = channel();
let (shred_sender, shred_receiver) = channel::<Vec<Shred>>();
let (signal_sender, signal_receiver) = channel();
let t_entry_getter = {
let blockstore = blockstore.clone();
let signal_sender = signal_sender.clone();
Builder::new()
.spawn(move || {
while let Ok(slot) = slot_receiver.recv() {
match blockstore.get_slot_entries_with_shred_info(slot, 0, false) {
Ok((_entries, _num_shreds, is_full)) => {
if is_full {
signal_sender
.send(Err(IoError::new(
ErrorKind::Other,
"got full slot entries for dead slot",
)))
.unwrap();
}
}
Err(err) => {
assert_matches!(err, BlockstoreError::DeadSlot);
}
}
signal_sender.send(Ok(())).unwrap();
}
})
.unwrap()
};
let t_shred_inserter = Builder::new()
.spawn(move || {
while let Ok(shreds) = shred_receiver.recv() {
let slot = shreds[0].slot();
// Grab this lock to block `get_slot_entries` before it fetches completed datasets,
// then mark the slot as dead, but full, by inserting carefully crafted shreds.
let _lowest_cleanup_slot = blockstore.lowest_cleanup_slot.write().unwrap();
blockstore.insert_shreds(shreds, None, false).unwrap();
assert!(blockstore.get_duplicate_slot(slot).is_some());
assert!(blockstore.is_dead(slot));
assert!(blockstore.meta(slot).unwrap().unwrap().is_full());
signal_sender.send(Ok(())).unwrap();
}
})
.unwrap();
for slot in 0..100 {
let shreds = setup_test_shreds(slot);
// Start a task on each thread to trigger a race condition
slot_sender.send(slot).unwrap();
shred_sender.send(shreds).unwrap();
// Check that each thread processed their task before continuing
for _ in 1..=2 {
let res = signal_receiver.recv().unwrap();
assert!(res.is_ok(), "race condition: {:?}", res);
}
}
drop(slot_sender);
drop(shred_sender);
let handles = vec![t_entry_getter, t_shred_inserter];
for handle in handles {
assert!(handle.join().is_ok());
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
}
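
For readers unfamiliar with the harness shape used in `test_get_slot_entries_dead_slot_race`: tasks fan out over one mpsc channel, per-task results come back over another, and dropping the senders shuts the workers down. A stripped-down, single-worker sketch of the same pattern (no blockstore involved; all names illustrative):

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (task_tx, task_rx) = channel::<u64>();
    let (done_tx, done_rx) = channel::<Result<(), String>>();

    // Worker loops until the sending side of the task channel is dropped.
    let worker = thread::spawn(move || {
        while let Ok(task) = task_rx.recv() {
            // ... racy work with `task` would go here ...
            let _ = task;
            done_tx.send(Ok(())).unwrap();
        }
    });

    for task in 0..100 {
        task_tx.send(task).unwrap();
        // Wait for the worker to finish before issuing the next task.
        let res = done_rx.recv().unwrap();
        assert!(res.is_ok(), "race condition: {:?}", res);
    }

    // Dropping the sender closes the channel and ends the worker loop.
    drop(task_tx);
    worker.join().unwrap();
}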


@ -125,6 +125,10 @@ impl ShredIndex {
self.set_present(idx, present);
}
}
pub fn largest(&self) -> Option<u64> {
self.index.iter().rev().next().copied()
}
}
impl SlotMeta {
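
`largest()` relies on the ordering of the underlying set: assuming the `index` field is a `BTreeSet<u64>` of present shred indexes, a reversed iterator yields the maximum first. The same idiom over a plain `BTreeSet` (illustrative sketch, not the crate's code):

use std::collections::BTreeSet;

// BTreeSet iterates in ascending order, so the first element of a
// reversed iterator is the maximum; an empty set yields None.
fn largest(index: &BTreeSet<u64>) -> Option<u64> {
    index.iter().rev().next().copied()
}

fn main() {
    let index: BTreeSet<u64> = [3u64, 7, 5].iter().copied().collect();
    assert_eq!(largest(&index), Some(7));
    assert_eq!(largest(&BTreeSet::new()), None);
}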