Refactor Blocktree for clarity and correctness (#5700)

* Refactor shreds to prevent insertion of any metadata on bad shreds

* Refactor fetching Index in blocktree

* Refactor get_slot_meta_entry

* Re-enable local cluster test

* cleanup

* Add tests for success/fail insertion of coding/data shreds

* Remove assert

* Fix and add tests for should_insert coding and data blobs
This commit is contained in:
carllin 2019-09-04 17:14:42 -07:00 committed by GitHub
parent b6da5a3f47
commit 7062fe4b47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 555 additions and 248 deletions

View File

@ -67,6 +67,7 @@ db_imports! {kvs, Kvs, "kvstore"}
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
pub type SlotMetaWorkingSetEntry = (Rc<RefCell<SlotMeta>>, Option<SlotMeta>);
pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
#[derive(Debug)]
@ -416,51 +417,29 @@ impl Blocktree {
let mut batch_processor = self.batch_processor.write().unwrap();
let mut write_batch = batch_processor.batch()?;
let mut just_inserted_data_shreds = HashMap::new();
let mut just_inserted_coding_shreds = HashMap::new();
let mut just_inserted_data_shreds = HashMap::new();
let mut erasure_metas = HashMap::new();
let mut slot_meta_working_set = HashMap::new();
let mut index_working_set = HashMap::new();
shreds.into_iter().for_each(|shred| {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
let index_meta = index_working_set.entry(slot).or_insert_with(|| {
self.index_cf
.get(slot)
.unwrap()
.unwrap_or_else(|| Index::new(slot))
});
if let Shred::Coding(coding_shred) = &shred {
// This gives the index of first coding shred in this FEC block
// So, all coding shreds in a given FEC block will have the same set index
let pos = u64::from(coding_shred.header.position);
if shred_index >= pos {
let set_index = shred_index - pos;
self.insert_coding_shred(
set_index,
coding_shred.header.num_data_shreds as usize,
coding_shred.header.num_coding_shreds as usize,
&mut just_inserted_coding_shreds,
&mut erasure_metas,
index_meta,
shred,
&mut write_batch,
)
}
} else if self
.insert_data_shred(
&mut slot_meta_working_set,
if let Shred::Coding(_) = &shred {
self.check_insert_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&shred,
&mut write_batch,
)
.unwrap_or(false)
{
just_inserted_data_shreds.insert((slot, shred_index), shred);
&mut just_inserted_coding_shreds,
);
} else {
self.check_insert_data_shred(
shred,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
);
}
});
@ -473,10 +452,10 @@ impl Blocktree {
);
recovered_data.into_iter().for_each(|shred| {
let _ = self.insert_data_shred(
&mut slot_meta_working_set,
&mut index_working_set,
self.insert_recovered_data_shred(
&shred,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
);
});
@ -484,7 +463,7 @@ impl Blocktree {
// Handle chaining for the working set
handle_chaining(&self.db, &mut write_batch, &slot_meta_working_set)?;
let (should_signal, newly_completed_slots) = prepare_signals(
let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
&slot_meta_working_set,
&self.completed_slots_senders,
&mut write_batch,
@ -516,20 +495,165 @@ impl Blocktree {
Ok(())
}
fn insert_coding_shred(
fn insert_recovered_data_shred(
&self,
set_index: u64,
num_data: usize,
num_coding: usize,
prev_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_meta: &mut Index,
shred: Shred,
shred: &Shred,
index_working_set: &mut HashMap<u64, Index>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
) {
let slot = shred.slot();
let (index_meta, mut new_index_meta) =
get_index_meta_entry(&self.db, slot, index_working_set);
let (slot_meta_entry, mut new_slot_meta_entry) =
get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent());
let insert_ok = {
let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap());
let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap());
let mut slot_meta = entry.0.borrow_mut();
self.insert_data_shred(&mut slot_meta, index_meta.data_mut(), &shred, write_batch)
.is_ok()
};
if insert_ok {
new_index_meta.map(|n| index_working_set.insert(slot, n));
new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n));
}
}
fn check_insert_coding_shred(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, Index>,
write_batch: &mut WriteBatch,
just_inserted_coding_shreds: &mut HashMap<(u64, u64), Shred>,
) {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
let (index_meta, mut new_index_meta) =
get_index_meta_entry(&self.db, slot, index_working_set);
let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap());
// This gives the index of first coding shred in this FEC block
// So, all coding shreds in a given FEC block will have the same set index
if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root)
&& self
.insert_coding_shred(erasure_metas, index_meta, &shred, write_batch)
.is_ok()
{
just_inserted_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred);
new_index_meta.map(|n| index_working_set.insert(slot, n));
}
}
fn check_insert_data_shred(
&self,
shred: Shred,
index_working_set: &mut HashMap<u64, Index>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
) {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
let (index_meta, mut new_index_meta) =
get_index_meta_entry(&self.db, slot, index_working_set);
let (slot_meta_entry, mut new_slot_meta_entry) =
get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent());
let insert_success = {
let index_meta = index_meta.unwrap_or_else(|| new_index_meta.as_mut().unwrap());
let entry = slot_meta_entry.unwrap_or_else(|| new_slot_meta_entry.as_mut().unwrap());
let mut slot_meta = entry.0.borrow_mut();
if Blocktree::should_insert_data_shred(
&shred,
&slot_meta,
index_meta.data(),
&self.last_root,
) {
self.insert_data_shred(&mut slot_meta, index_meta.data_mut(), &shred, write_batch)
.is_ok()
} else {
false
}
};
if insert_success {
just_inserted_data_shreds.insert((slot, shred_index), shred);
new_index_meta.map(|n| index_working_set.insert(slot, n));
new_slot_meta_entry.map(|n| slot_meta_working_set.insert(slot, n));
}
}
fn should_insert_coding_shred(
shred: &Shred,
coding_index: &CodingIndex,
last_root: &RwLock<u64>,
) -> bool {
let slot = shred.slot();
let shred_index = shred.index();
let (pos, num_coding) = {
if let Shred::Coding(coding_shred) = &shred {
(
u32::from(coding_shred.header.position),
coding_shred.header.num_coding_shreds,
)
} else {
panic!("should_insert_coding_shred called with non-coding shred")
}
};
if shred_index < pos {
return false;
}
let set_index = shred_index - pos;
!(num_coding == 0
|| pos >= u32::from(num_coding)
|| std::u32::MAX - set_index < u32::from(num_coding) - 1
|| coding_index.is_present(u64::from(shred_index))
|| slot <= *last_root.read().unwrap())
}
fn insert_coding_shred(
&self,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_meta: &mut Index,
shred: &Shred,
write_batch: &mut WriteBatch,
) -> Result<()> {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
let (num_data, num_coding, pos) = {
if let Shred::Coding(coding_shred) = &shred {
(
coding_shred.header.num_data_shreds as usize,
coding_shred.header.num_coding_shreds as usize,
u64::from(coding_shred.header.position),
)
} else {
panic!("insert_coding_shred called with non-coding shred")
}
};
// Assert guaranteed by integrity checks on the shred that happen before
// `insert_coding_shred` is called
if shred_index < pos {
error!("Due to earlier validation, shred index must be >= pos");
return Err(Error::BlocktreeError(BlocktreeError::InvalidShredData(
Box::new(bincode::ErrorKind::Custom("shred index < pos".to_string())),
)));
}
let set_index = shred_index - pos;
let erasure_config = ErasureConfig::new(num_data, num_coding);
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
@ -548,26 +672,109 @@ impl Blocktree {
);
}
let serialized_shred = bincode::serialize(&shred).unwrap();
let inserted =
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &serialized_shred);
if inserted.is_ok() {
index_meta.coding_mut().set_present(shred_index, true);
let serialized_shred = bincode::serialize(shred).unwrap();
// `or_insert_with` used to prevent stack overflow
prev_inserted_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred);
// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &serialized_shred)?;
index_meta.coding_mut().set_present(shred_index, true);
Ok(())
}
fn should_insert_data_shred(
shred: &Shred,
slot_meta: &SlotMeta,
data_index: &DataIndex,
last_root: &RwLock<u64>,
) -> bool {
let shred_index = u64::from(shred.index());
let slot = shred.slot();
let last_in_slot = if let Shred::LastInSlot(_) = shred {
debug!("got last in slot");
true
} else {
false
};
// Check that the data shred doesn't already exist in blocktree
if shred_index < slot_meta.consumed || data_index.is_present(shred_index) {
return false;
}
// Check that we do not receive shred_index >= than the last_index
// for the slot
let last_index = slot_meta.last_index;
if shred_index >= last_index {
datapoint_error!(
"blocktree_error",
(
"error",
format!(
"Received index {} >= slot.last_index {}",
shred_index, last_index
),
String
)
);
return false;
}
// Check that we do not receive a blob with "last_index" true, but shred_index
// less than our current received
if last_in_slot && shred_index < slot_meta.received {
datapoint_error!(
"blocktree_error",
(
"error",
format!(
"Received shred_index {} < slot.received {}",
shred_index, slot_meta.received
),
String
)
);
return false;
}
let last_root = *last_root.read().unwrap();
if !is_valid_write_to_slot_0(slot, slot_meta.parent_slot, last_root) {
// Check that the parent_slot < slot
if slot_meta.parent_slot >= slot {
datapoint_error!(
"blocktree_error",
(
"error",
format!(
"Received blob with parent_slot {} >= slot {}",
slot_meta.parent_slot, slot
),
String
)
);
return false;
}
// Check that the blob is for a slot that is past the root
if slot <= last_root {
return false;
}
// Ignore blobs that chain to slots before the last root
if slot_meta.parent_slot < last_root {
return false;
}
}
true
}
fn insert_data_shred(
&self,
mut slot_meta_working_set: &mut HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
index_working_set: &mut HashMap<u64, Index>,
slot_meta: &mut SlotMeta,
data_index: &mut DataIndex,
shred: &Shred,
write_batch: &mut WriteBatch,
) -> Result<bool> {
) -> Result<()> {
let slot = shred.slot();
let index = u64::from(shred.index());
let parent = shred.parent();
@ -579,9 +786,6 @@ impl Blocktree {
false
};
let entry = get_slot_meta_entry(&self.db, &mut slot_meta_working_set, slot, parent);
let slot_meta = &mut entry.0.borrow_mut();
if is_orphan(slot_meta) {
slot_meta.parent_slot = parent;
}
@ -595,45 +799,26 @@ impl Blocktree {
.unwrap_or(false)
};
let index_meta = index_working_set
.get_mut(&slot)
.expect("Index must be present for all data shreds")
.data_mut();
let new_consumed = if slot_meta.consumed == index {
let mut current_index = index + 1;
if !index_meta.is_present(index)
&& should_insert(
slot_meta,
index,
slot,
last_in_slot,
check_data_cf,
*self.last_root.read().unwrap(),
)
{
let new_consumed = if slot_meta.consumed == index {
let mut current_index = index + 1;
while index_meta.is_present(current_index) || check_data_cf(slot, current_index) {
current_index += 1;
}
current_index
} else {
slot_meta.consumed
};
let serialized_shred = bincode::serialize(shred).unwrap();
// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredData>((slot, index), &serialized_shred)?;
update_slot_meta(last_in_slot, slot_meta, index, new_consumed);
index_meta.set_present(index, true);
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
Ok(true)
while data_index.is_present(current_index) || check_data_cf(slot, current_index) {
current_index += 1;
}
current_index
} else {
debug!("didn't insert shred");
Ok(false)
}
slot_meta.consumed
};
let serialized_shred = bincode::serialize(shred).unwrap();
// Commit step: commit all changes to the mutable structures at once, or none at all.
// We don't want only a subset of these changes going through.
write_batch.put_bytes::<cf::ShredData>((slot, index), &serialized_shred)?;
update_slot_meta(last_in_slot, slot_meta, index, new_consumed);
data_index.set_present(index, true);
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
Ok(())
}
pub fn get_data_shred(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
@ -1086,112 +1271,65 @@ fn update_slot_meta(
};
}
fn get_slot_meta_entry<'a>(
fn get_index_meta_entry<'a>(
db: &Database,
slot_meta_working_set: &'a mut HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
slot: u64,
parent_slot: u64,
) -> &'a mut (Rc<RefCell<SlotMeta>>, Option<SlotMeta>) {
let meta_cf = db.column::<cf::SlotMeta>();
index_working_set: &'a mut HashMap<u64, Index>,
) -> (Option<&'a mut Index>, Option<Index>) {
let index_cf = db.column::<cf::Index>();
index_working_set
.get_mut(&slot)
.map(|i| (Some(i), None))
.unwrap_or_else(|| {
let newly_inserted_meta = Some(
index_cf
.get(slot)
.unwrap()
.unwrap_or_else(|| Index::new(slot)),
);
// Check if we've already inserted the slot metadata for this shred's slot
slot_meta_working_set.entry(slot).or_insert_with(|| {
// Store a 2-tuple of the metadata (working copy, backup copy)
if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") {
let backup = Some(meta.clone());
// If parent_slot == std::u64::MAX, then this is one of the orphans inserted
// during the chaining process, see the function find_slot_meta_in_cached_state()
// for details. Slots that are orphans are missing a parent_slot, so we should
// fill in the parent now that we know it.
if is_orphan(&meta) {
meta.parent_slot = parent_slot;
}
(Rc::new(RefCell::new(meta)), backup)
} else {
(
Rc::new(RefCell::new(SlotMeta::new(slot, parent_slot))),
None,
)
}
})
(None, newly_inserted_meta)
})
}
fn should_insert<F>(
slot_meta: &SlotMeta,
index: u64,
fn get_slot_meta_entry<'a>(
db: &Database,
slot_meta_working_set: &'a mut HashMap<u64, SlotMetaWorkingSetEntry>,
slot: u64,
last_in_slot: bool,
db_check: F,
last_root: u64,
) -> bool
where
F: Fn(u64, u64) -> bool,
{
// Check that the index doesn't already exist
if index < slot_meta.consumed || db_check(slot, index) {
return false;
}
// Check that we do not receive index >= than the last_index
// for the slot
let last_index = slot_meta.last_index;
if index >= last_index {
datapoint_error!(
"blocktree_error",
(
"error",
format!("Received index {} >= slot.last_index {}", index, last_index),
String
)
);
return false;
}
// Check that we do not receive a shred with "last_index" true, but index
// less than our current received
if last_in_slot && index < slot_meta.received {
datapoint_error!(
"blocktree_error",
(
"error",
format!(
"Received index {} < slot.received {}",
index, slot_meta.received
),
String
)
);
return false;
}
parent_slot: u64,
) -> (
Option<&'a mut SlotMetaWorkingSetEntry>,
Option<SlotMetaWorkingSetEntry>,
) {
let meta_cf = db.column::<cf::SlotMeta>();
if !is_valid_write_to_slot_0(slot, slot_meta.parent_slot, last_root) {
// Check that the parent_slot < slot
if slot_meta.parent_slot >= slot {
datapoint_error!(
"blocktree_error",
// Check if we've already inserted the slot metadata for this blob's slot
slot_meta_working_set
.get_mut(&slot)
.map(|s| (Some(s), None))
.unwrap_or_else(|| {
// Store a 2-tuple of the metadata (working copy, backup copy)
if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") {
let backup = Some(meta.clone());
// If parent_slot == std::u64::MAX, then this is one of the orphans inserted
// during the chaining process, see the function find_slot_meta_in_cached_state()
// for details. Slots that are orphans are missing a parent_slot, so we should
// fill in the parent now that we know it.
if is_orphan(&meta) {
meta.parent_slot = parent_slot;
}
(None, Some((Rc::new(RefCell::new(meta)), backup)))
} else {
(
"error",
format!(
"Received shred with parent_slot {} >= slot {}",
slot_meta.parent_slot, slot
),
String
None,
Some((
Rc::new(RefCell::new(SlotMeta::new(slot, parent_slot))),
None,
)),
)
);
return false;
}
// Check that the shred is for a slot that is past the root
if slot <= last_root {
return false;
}
// Ignore shreds that chain to slots before the last root
if slot_meta.parent_slot < last_root {
return false;
}
}
true
}
})
}
fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: u64, last_root: u64) -> bool {
@ -1239,8 +1377,8 @@ fn send_signals(
Ok(())
}
fn prepare_signals(
slot_meta_working_set: &HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
fn commit_slot_meta_working_set(
slot_meta_working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
completed_slots_senders: &[SyncSender<Vec<u64>>],
write_batch: &mut WriteBatch,
) -> Result<(bool, Vec<u64>)> {
@ -1270,7 +1408,7 @@ fn prepare_signals(
// 3) Create a dummy orphan slot in the database
fn find_slot_meta_else_create<'a>(
db: &Database,
working_set: &'a HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
working_set: &'a HashMap<u64, SlotMetaWorkingSetEntry>,
chained_slots: &'a mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot_index: u64,
) -> Result<Rc<RefCell<SlotMeta>>> {
@ -1306,7 +1444,7 @@ fn find_slot_meta_in_db_else_create<'a>(
// Find the slot metadata in the cache of dirty slot metadata we've previously touched
fn find_slot_meta_in_cached_state<'a>(
working_set: &'a HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
working_set: &'a HashMap<u64, SlotMetaWorkingSetEntry>,
chained_slots: &'a HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot: u64,
) -> Result<Option<Rc<RefCell<SlotMeta>>>> {
@ -1339,7 +1477,7 @@ fn get_slot_consecutive_shreds<'a>(
fn handle_chaining(
db: &Database,
write_batch: &mut WriteBatch,
working_set: &HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
) -> Result<()> {
let mut new_chained_slots = HashMap::new();
let working_set_slots: Vec<_> = working_set.iter().map(|s| *s.0).collect();
@ -1358,7 +1496,7 @@ fn handle_chaining(
fn handle_chaining_for_slot(
db: &Database,
write_batch: &mut WriteBatch,
working_set: &HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot: u64,
) -> Result<()> {
@ -1438,7 +1576,7 @@ fn traverse_children_mut<F>(
db: &Database,
slot: u64,
slot_meta: &Rc<RefCell<(SlotMeta)>>,
working_set: &HashMap<u64, (Rc<RefCell<SlotMeta>>, Option<SlotMeta>)>,
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot_function: F,
) -> Result<()>
@ -1620,6 +1758,7 @@ pub fn entries_to_test_shreds(
pub mod tests {
use super::*;
use crate::entry::{create_ticks, make_tiny_test_entries, Entry};
use crate::shred::CodingShred;
use itertools::Itertools;
use rand::seq::SliceRandom;
use rand::thread_rng;
@ -2965,62 +3104,230 @@ pub mod tests {
}
#[test]
pub fn test_should_insert_shred() {
let (mut shreds, _) = make_slot_entries(0, 0, 100);
pub fn test_should_insert_data_shred() {
let (shreds, _) = make_slot_entries(0, 0, 100);
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let index_cf = blocktree.db.column::<cf::Index>();
let last_root = RwLock::new(0);
// Insert the first 5 shreds, we don't have a "is_last" shred yet
let shreds1 = shreds.drain(0..5).collect_vec();
blocktree.insert_shreds(shreds1).unwrap();
blocktree.insert_shreds(shreds[0..5].to_vec()).unwrap();
let data_cf = blocktree.db.column::<cf::ShredData>();
let check_data_cf = |slot, index| {
data_cf
.get_bytes((slot, index))
.map(|opt| opt.is_some())
.unwrap_or(false)
};
// Trying to insert a shred less than consumed should fail
// Trying to insert a shred less than `slot_meta.consumed` should fail
let slot_meta = blocktree.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 5);
assert!(!should_insert(&slot_meta, 3, 0, false, check_data_cf, 0));
assert!(!Blocktree::should_insert_data_shred(
&shreds[1],
&slot_meta,
index.data(),
&last_root
));
// Trying to insert the same shred again should fail
// skip over shred 5
let shreds1 = shreds.drain(1..2).collect_vec();
blocktree.insert_shreds(shreds1).unwrap();
// skip over shred 5 so the `slot_meta.consumed` doesn't increment
blocktree.insert_shreds(shreds[6..7].to_vec()).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert!(!should_insert(&slot_meta, 6, 0, false, check_data_cf, 0));
let index = index_cf.get(0).unwrap().unwrap();
assert!(!Blocktree::should_insert_data_shred(
&shreds[6],
&slot_meta,
index.data(),
&last_root
));
// Trying to insert another "is_last" shred with index < the received index
// should fail
// skip over shred 5 and 7
let shreds1 = shreds.drain(2..3).collect_vec();
blocktree.insert_shreds(shreds1).unwrap();
// Trying to insert another "is_last" shred with index < the received index should fail
// skip over shred 7
blocktree.insert_shreds(shreds[8..9].to_vec()).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.received, 9);
assert!(!should_insert(&slot_meta, 7, 0, true, check_data_cf, 0));
let shred7 = {
if let Shred::Data(ref s) = shreds[7] {
Shred::LastInSlot(s.clone())
} else {
panic!("Shred in unexpected format")
}
};
assert!(!Blocktree::should_insert_data_shred(
&shred7,
&slot_meta,
index.data(),
&last_root
));
// Insert all pending shreds
let mut shred8 = shreds[8].clone();
blocktree.insert_shreds(shreds).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
// Trying to insert a shred with index > the "is_last" shred should fail
assert!(!should_insert(
if let Shred::Data(ref mut s) = shred8 {
s.header.common_header.slot = slot_meta.last_index + 1;
} else {
panic!("Shred in unexpected format")
}
assert!(!Blocktree::should_insert_data_shred(
&shred7,
&slot_meta,
slot_meta.last_index + 1,
0,
true,
check_data_cf,
0
index.data(),
&last_root
));
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_should_insert_coding_shred() {
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let index_cf = blocktree.db.column::<cf::Index>();
let last_root = RwLock::new(0);
let mut shred = CodingShred::default();
let slot = 1;
shred.header.position = 10;
shred.header.common_header.index = 11;
shred.header.common_header.slot = 1;
shred.header.num_coding_shreds = shred.header.position + 1;
let coding_shred = Shred::Coding(shred.clone());
// Insert a good coding shred
assert!(Blocktree::should_insert_coding_shred(
&coding_shred,
Index::new(slot).coding(),
&last_root
));
// Insertion should succeed
blocktree.insert_shreds(vec![coding_shred.clone()]).unwrap();
// Trying to insert the same shred again should fail
{
let index = index_cf
.get(shred.header.common_header.slot)
.unwrap()
.unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
}
shred.header.common_header.index += 1;
// Establish a baseline that works
{
let index = index_cf
.get(shred.header.common_header.slot)
.unwrap()
.unwrap();
assert!(Blocktree::should_insert_coding_shred(
&Shred::Coding(shred.clone()),
index.coding(),
&last_root
));
}
// Trying to insert a shred with index < position should fail
{
let mut shred_ = shred.clone();
shred_.header.common_header.index = (shred_.header.position - 1).into();
let index = index_cf
.get(shred_.header.common_header.slot)
.unwrap()
.unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_),
index.coding(),
&last_root
));
}
// Trying to insert shred with num_coding == 0 should fail
{
let mut shred_ = shred.clone();
shred_.header.num_coding_shreds = 0;
let index = index_cf
.get(shred_.header.common_header.slot)
.unwrap()
.unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_),
index.coding(),
&last_root
));
}
// Trying to insert shred with pos >= num_coding should fail
{
let mut shred_ = shred.clone();
shred_.header.num_coding_shreds = shred_.header.position;
let index = index_cf
.get(shred_.header.common_header.slot)
.unwrap()
.unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_),
index.coding(),
&last_root
));
}
// Trying to insert with set_index with num_coding that would imply the last blob
// has index > u32::MAX should fail
{
let mut shred_ = shred.clone();
shred_.header.num_coding_shreds = 3;
shred_.header.common_header.index = std::u32::MAX - 1;
shred_.header.position = 0;
let index = index_cf
.get(shred_.header.common_header.slot)
.unwrap()
.unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_.clone()),
index.coding(),
&last_root
));
// Decreasing the number of num_coding_shreds will put it within the allowed limit
shred_.header.num_coding_shreds = 2;
let coding_shred = Shred::Coding(shred_);
assert!(Blocktree::should_insert_coding_shred(
&coding_shred,
index.coding(),
&last_root
));
// Insertion should succeed
blocktree.insert_shreds(vec![coding_shred]).unwrap();
}
// Trying to insert value into slot <= than last root should fail
{
let mut shred_ = shred.clone();
let index = index_cf
.get(shred_.header.common_header.slot)
.unwrap()
.unwrap();
shred_.header.common_header.slot = *last_root.read().unwrap();
assert!(!Blocktree::should_insert_coding_shred(
&Shred::Coding(shred_),
index.coding(),
&last_root
));
}
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_multiple_is_last() {
let (shreds, _) = make_slot_entries(0, 0, 20);

View File

@ -12,7 +12,7 @@ use std::io::{Error as IOError, ErrorKind, Write};
use std::sync::Arc;
use std::{cmp, io};
#[derive(Serialize, Deserialize, PartialEq, Debug)]
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub enum Shred {
FirstInSlot(FirstDataShred),
FirstInFECSet(DataShred),
@ -130,7 +130,7 @@ impl Shred {
}
/// A common header that is present at start of every shred
#[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
pub struct ShredCommonHeader {
pub signature: Signature,
pub slot: u64,
@ -138,7 +138,7 @@ pub struct ShredCommonHeader {
}
/// A common header that is present at start of every data shred
#[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
pub struct DataShredHeader {
_reserved: CodingShredHeader,
pub common_header: ShredCommonHeader,
@ -147,14 +147,14 @@ pub struct DataShredHeader {
}
/// The first data shred also has parent slot value in it
#[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
pub struct FirstDataShredHeader {
pub data_header: DataShredHeader,
pub parent: u64,
}
/// The coding shred header has FEC information
#[derive(Serialize, Deserialize, Default, PartialEq, Debug)]
#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
pub struct CodingShredHeader {
pub common_header: ShredCommonHeader,
pub num_data_shreds: u16,
@ -163,19 +163,19 @@ pub struct CodingShredHeader {
pub payload: Vec<u8>,
}
#[derive(Serialize, Deserialize, PartialEq, Debug)]
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct FirstDataShred {
pub header: FirstDataShredHeader,
pub payload: Vec<u8>,
}
#[derive(Serialize, Deserialize, PartialEq, Debug)]
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct DataShred {
pub header: DataShredHeader,
pub payload: Vec<u8>,
}
#[derive(Serialize, Deserialize, PartialEq, Debug)]
#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
pub struct CodingShred {
pub header: CodingShredHeader,
}
@ -454,7 +454,7 @@ impl Shredder {
first_shred
}
fn new_coding_shred(
pub fn new_coding_shred(
slot: u64,
index: u32,
num_data: usize,

View File

@ -303,7 +303,6 @@ fn test_listener_startup() {
#[allow(unused_attributes)]
#[test]
#[serial]
#[ignore]
fn test_snapshots_blocktree_floor() {
// First set up the cluster with 1 snapshotting leader
let snapshot_interval_slots = 10;
@ -347,7 +346,7 @@ fn test_snapshots_blocktree_floor() {
fs::hard_link(tar, &validator_tar_path).unwrap();
let slot_floor = snapshot_utils::bank_slot_from_archive(&validator_tar_path).unwrap();
// Start up a new node from a snapshot, wait for it to catchup with the leader
// Start up a new node from a snapshot
let validator_stake = 5;
cluster.add_validator(
&validator_snapshot_test_config.validator_config,
@ -361,7 +360,7 @@ fn test_snapshots_blocktree_floor() {
let validator_client = cluster.get_validator_client(&validator_id).unwrap();
let mut current_slot = 0;
// Make sure this validator can get repaired past the first few warmup epochs
// Let this validator run a while with repair
let target_slot = slot_floor + 40;
while current_slot <= target_slot {
trace!("current_slot: {}", current_slot);
@ -380,6 +379,7 @@ fn test_snapshots_blocktree_floor() {
// Skip the zeroth slot in blocktree that the ledger is initialized with
let (first_slot, _) = blocktree.slot_meta_iterator(1).unwrap().next().unwrap();
assert_eq!(first_slot, slot_floor);
}