Plumb ability to handle duplicate shreds into shred insertion functions (#7784)

This commit is contained in:
carllin 2020-01-14 15:37:53 -08:00 committed by GitHub
parent 5f4e0c7e3e
commit 721c4378c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 94 additions and 73 deletions

View File

@ -579,12 +579,16 @@ impl Blockstore {
recovered_data_shreds recovered_data_shreds
} }
pub fn insert_shreds( pub fn insert_shreds_handle_duplicate<F>(
&self, &self,
shreds: Vec<Shred>, shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>, leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_trusted: bool, is_trusted: bool,
) -> Result<BlockstoreInsertionMetrics> { handle_duplicate: &F,
) -> Result<BlockstoreInsertionMetrics>
where
F: Fn(Shred) -> (),
{
let mut total_start = Measure::start("Total elapsed"); let mut total_start = Measure::start("Total elapsed");
let mut start = Measure::start("Blockstore lock"); let mut start = Measure::start("Blockstore lock");
let _lock = self.insert_shreds_lock.lock().unwrap(); let _lock = self.insert_shreds_lock.lock().unwrap();
@ -615,6 +619,7 @@ impl Blockstore {
&mut just_inserted_data_shreds, &mut just_inserted_data_shreds,
&mut index_meta_time, &mut index_meta_time,
is_trusted, is_trusted,
handle_duplicate,
) { ) {
num_inserted += 1; num_inserted += 1;
} }
@ -658,6 +663,7 @@ impl Blockstore {
&mut just_inserted_data_shreds, &mut just_inserted_data_shreds,
&mut index_meta_time, &mut index_meta_time,
is_trusted, is_trusted,
&handle_duplicate,
); );
} }
} }
@ -733,6 +739,15 @@ impl Blockstore {
}) })
} }
pub fn insert_shreds(
&self,
shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_trusted: bool,
) -> Result<BlockstoreInsertionMetrics> {
self.insert_shreds_handle_duplicate(shreds, leader_schedule, is_trusted, &|_| {})
}
fn check_insert_coding_shred( fn check_insert_coding_shred(
&self, &self,
shred: Shred, shred: Shred,
@ -819,7 +834,8 @@ impl Blockstore {
} }
} }
fn check_insert_data_shred( #[allow(clippy::too_many_arguments)]
fn check_insert_data_shred<F>(
&self, &self,
shred: Shred, shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
@ -829,7 +845,11 @@ impl Blockstore {
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>, just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64, index_meta_time: &mut u64,
is_trusted: bool, is_trusted: bool,
) -> bool { handle_duplicate: &F,
) -> bool
where
F: Fn(Shred) -> (),
{
let slot = shred.slot(); let slot = shred.slot();
let shred_index = u64::from(shred.index()); let shred_index = u64::from(shred.index());
@ -842,14 +862,15 @@ impl Blockstore {
let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut(); let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut();
if is_trusted if !is_trusted {
|| Blockstore::should_insert_data_shred( if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) {
&shred, handle_duplicate(shred);
slot_meta, return false;
index_meta.data(), } else if !Blockstore::should_insert_data_shred(&shred, slot_meta, &self.last_root) {
&self.last_root, return false;
) }
{ }
let set_index = u64::from(shred.common_header.fec_set_index); let set_index = u64::from(shred.common_header.fec_set_index);
if let Ok(()) = if let Ok(()) =
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch) self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
@ -870,9 +891,6 @@ impl Blockstore {
} else { } else {
false false
} }
} else {
false
}
} }
fn should_insert_coding_shred( fn should_insert_coding_shred(
@ -916,10 +934,15 @@ impl Blockstore {
Ok(()) Ok(())
} }
fn is_data_shred_present(shred: &Shred, slot_meta: &SlotMeta, data_index: &ShredIndex) -> bool {
let shred_index = u64::from(shred.index());
// Check that the shred doesn't already exist in blockstore
shred_index < slot_meta.consumed || data_index.is_present(shred_index)
}
fn should_insert_data_shred( fn should_insert_data_shred(
shred: &Shred, shred: &Shred,
slot_meta: &SlotMeta, slot_meta: &SlotMeta,
data_index: &ShredIndex,
last_root: &RwLock<u64>, last_root: &RwLock<u64>,
) -> bool { ) -> bool {
let shred_index = u64::from(shred.index()); let shred_index = u64::from(shred.index());
@ -931,11 +954,6 @@ impl Blockstore {
false false
}; };
// Check that the data shred doesn't already exist in blockstore
if shred_index < slot_meta.consumed || data_index.is_present(shred_index) {
return false;
}
// Check that we do not receive shred_index >= than the last_index // Check that we do not receive shred_index >= than the last_index
// for the slot // for the slot
let last_index = slot_meta.last_index; let last_index = slot_meta.last_index;
@ -3976,7 +3994,6 @@ pub mod tests {
let blockstore_path = get_tmp_ledger_path!(); let blockstore_path = get_tmp_ledger_path!();
{ {
let blockstore = Blockstore::open(&blockstore_path).unwrap(); let blockstore = Blockstore::open(&blockstore_path).unwrap();
let index_cf = blockstore.db.column::<cf::Index>();
let last_root = RwLock::new(0); let last_root = RwLock::new(0);
// Insert the first 5 shreds, we don't have a "is_last" shred yet // Insert the first 5 shreds, we don't have a "is_last" shred yet
@ -3984,44 +4001,12 @@ pub mod tests {
.insert_shreds(shreds[0..5].to_vec(), None, false) .insert_shreds(shreds[0..5].to_vec(), None, false)
.unwrap(); .unwrap();
// Trying to insert a shred less than `slot_meta.consumed` should fail
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 5);
assert_eq!(
Blockstore::should_insert_data_shred(
&shreds[1],
&slot_meta,
index.data(),
&last_root
),
false
);
// Trying to insert the same shred again should fail
// skip over shred 5 so the `slot_meta.consumed` doesn't increment
blockstore
.insert_shreds(shreds[6..7].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(
Blockstore::should_insert_data_shred(
&shreds[6],
&slot_meta,
index.data(),
&last_root
),
false
);
// Trying to insert another "is_last" shred with index < the received index should fail // Trying to insert another "is_last" shred with index < the received index should fail
// skip over shred 7 // skip over shred 7
blockstore blockstore
.insert_shreds(shreds[8..9].to_vec(), None, false) .insert_shreds(shreds[8..9].to_vec(), None, false)
.unwrap(); .unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap(); let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.received, 9); assert_eq!(slot_meta.received, 9);
let shred7 = { let shred7 = {
if shreds[7].is_data() { if shreds[7].is_data() {
@ -4032,7 +4017,7 @@ pub mod tests {
} }
}; };
assert_eq!( assert_eq!(
Blockstore::should_insert_data_shred(&shred7, &slot_meta, index.data(), &last_root), Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root),
false false
); );
@ -4040,7 +4025,6 @@ pub mod tests {
let mut shred8 = shreds[8].clone(); let mut shred8 = shreds[8].clone();
blockstore.insert_shreds(shreds, None, false).unwrap(); blockstore.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap(); let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
// Trying to insert a shred with index > the "is_last" shred should fail // Trying to insert a shred with index > the "is_last" shred should fail
if shred8.is_data() { if shred8.is_data() {
@ -4049,13 +4033,50 @@ pub mod tests {
panic!("Shred in unexpected format") panic!("Shred in unexpected format")
} }
assert_eq!( assert_eq!(
Blockstore::should_insert_data_shred(&shred7, &slot_meta, index.data(), &last_root), Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root),
false false
); );
} }
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
} }
#[test]
pub fn test_is_data_shred_present() {
let (shreds, _) = make_slot_entries(0, 0, 200);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let index_cf = blockstore.db.column::<cf::Index>();
blockstore
.insert_shreds(shreds[0..5].to_vec(), None, false)
.unwrap();
// Insert a shred less than `slot_meta.consumed`, check that
// it already exists
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 5);
assert!(Blockstore::is_data_shred_present(
&shreds[1],
&slot_meta,
index.data(),
));
// Insert a shred, check that it already exists
blockstore
.insert_shreds(shreds[6..7].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert!(Blockstore::is_data_shred_present(
&shreds[6],
&slot_meta,
index.data()
),);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test] #[test]
pub fn test_should_insert_coding_shred() { pub fn test_should_insert_coding_shred() {
let blockstore_path = get_tmp_ledger_path!(); let blockstore_path = get_tmp_ledger_path!();