From 721c4378c17b920f61756764318405c6f17188e9 Mon Sep 17 00:00:00 2001 From: carllin Date: Tue, 14 Jan 2020 15:37:53 -0800 Subject: [PATCH] Plumb ability to handle duplicate shreds into shred insertion functions (#7784) --- ledger/src/blockstore.rs | 167 ++++++++++++++++++++++----------------- 1 file changed, 94 insertions(+), 73 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 00bc0d6e54..ada0908a36 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -579,12 +579,16 @@ impl Blockstore { recovered_data_shreds } - pub fn insert_shreds( + pub fn insert_shreds_handle_duplicate( &self, shreds: Vec, leader_schedule: Option<&Arc>, is_trusted: bool, - ) -> Result { + handle_duplicate: &F, + ) -> Result + where + F: Fn(Shred) -> (), + { let mut total_start = Measure::start("Total elapsed"); let mut start = Measure::start("Blockstore lock"); let _lock = self.insert_shreds_lock.lock().unwrap(); @@ -615,6 +619,7 @@ impl Blockstore { &mut just_inserted_data_shreds, &mut index_meta_time, is_trusted, + handle_duplicate, ) { num_inserted += 1; } @@ -658,6 +663,7 @@ impl Blockstore { &mut just_inserted_data_shreds, &mut index_meta_time, is_trusted, + &handle_duplicate, ); } } @@ -733,6 +739,15 @@ impl Blockstore { }) } + pub fn insert_shreds( + &self, + shreds: Vec, + leader_schedule: Option<&Arc>, + is_trusted: bool, + ) -> Result { + self.insert_shreds_handle_duplicate(shreds, leader_schedule, is_trusted, &|_| {}) + } + fn check_insert_coding_shred( &self, shred: Shred, @@ -819,7 +834,8 @@ impl Blockstore { } } - fn check_insert_data_shred( + #[allow(clippy::too_many_arguments)] + fn check_insert_data_shred( &self, shred: Shred, erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>, @@ -829,7 +845,11 @@ impl Blockstore { just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>, index_meta_time: &mut u64, is_trusted: bool, - ) -> bool { + handle_duplicate: &F, + ) -> bool + where + F: Fn(Shred) -> (), + { let slot = shred.slot(); let shred_index = u64::from(shred.index()); @@ -842,34 +862,32 @@ impl Blockstore { let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut(); - if is_trusted - || Blockstore::should_insert_data_shred( - &shred, - slot_meta, - index_meta.data(), - &self.last_root, - ) - { - let set_index = u64::from(shred.common_header.fec_set_index); - if let Ok(()) = - self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch) - { - just_inserted_data_shreds.insert((slot, shred_index), shred); - index_meta_working_set_entry.did_insert_occur = true; - slot_meta_entry.did_insert_occur = true; - if !erasure_metas.contains_key(&(slot, set_index)) { - if let Some(meta) = self - .erasure_meta_cf - .get((slot, set_index)) - .expect("Expect database get to succeed") - { - erasure_metas.insert((slot, set_index), meta); - } - } - true - } else { - false + if !is_trusted { + if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) { + handle_duplicate(shred); + return false; + } else if !Blockstore::should_insert_data_shred(&shred, slot_meta, &self.last_root) { + return false; } + } + + let set_index = u64::from(shred.common_header.fec_set_index); + if let Ok(()) = + self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch) + { + just_inserted_data_shreds.insert((slot, shred_index), shred); + index_meta_working_set_entry.did_insert_occur = true; + slot_meta_entry.did_insert_occur = true; + if !erasure_metas.contains_key(&(slot, set_index)) { + if let Some(meta) = self + .erasure_meta_cf + .get((slot, set_index)) + .expect("Expect database get to succeed") + { + erasure_metas.insert((slot, set_index), meta); + } + } + true } else { false } @@ -916,10 +934,15 @@ impl Blockstore { Ok(()) } + fn is_data_shred_present(shred: &Shred, slot_meta: &SlotMeta, data_index: &ShredIndex) -> bool { + let shred_index = u64::from(shred.index()); + // Check that the shred doesn't already exist in blockstore + shred_index < slot_meta.consumed || data_index.is_present(shred_index) + } + fn should_insert_data_shred( shred: &Shred, slot_meta: &SlotMeta, - data_index: &ShredIndex, last_root: &RwLock, ) -> bool { let shred_index = u64::from(shred.index()); @@ -931,11 +954,6 @@ impl Blockstore { false }; - // Check that the data shred doesn't already exist in blockstore - if shred_index < slot_meta.consumed || data_index.is_present(shred_index) { - return false; - } - // Check that we do not receive shred_index >= than the last_index // for the slot let last_index = slot_meta.last_index; @@ -3976,7 +3994,6 @@ pub mod tests { let blockstore_path = get_tmp_ledger_path!(); { let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let index_cf = blockstore.db.column::(); let last_root = RwLock::new(0); // Insert the first 5 shreds, we don't have a "is_last" shred yet @@ -3984,44 +4001,12 @@ pub mod tests { .insert_shreds(shreds[0..5].to_vec(), None, false) .unwrap(); - // Trying to insert a shred less than `slot_meta.consumed` should fail - let slot_meta = blockstore.meta(0).unwrap().unwrap(); - let index = index_cf.get(0).unwrap().unwrap(); - assert_eq!(slot_meta.consumed, 5); - assert_eq!( - Blockstore::should_insert_data_shred( - &shreds[1], - &slot_meta, - index.data(), - &last_root - ), - false - ); - - // Trying to insert the same shred again should fail - // skip over shred 5 so the `slot_meta.consumed` doesn't increment - blockstore - .insert_shreds(shreds[6..7].to_vec(), None, false) - .unwrap(); - let slot_meta = blockstore.meta(0).unwrap().unwrap(); - let index = index_cf.get(0).unwrap().unwrap(); - assert_eq!( - Blockstore::should_insert_data_shred( - &shreds[6], - &slot_meta, - index.data(), - &last_root - ), - false - ); - // Trying to insert another "is_last" shred with index < the received index should fail // skip over shred 7 blockstore .insert_shreds(shreds[8..9].to_vec(), None, false) .unwrap(); let slot_meta = blockstore.meta(0).unwrap().unwrap(); - let index = index_cf.get(0).unwrap().unwrap(); assert_eq!(slot_meta.received, 9); let shred7 = { if shreds[7].is_data() { @@ -4032,7 +4017,7 @@ pub mod tests { } }; assert_eq!( - Blockstore::should_insert_data_shred(&shred7, &slot_meta, index.data(), &last_root), + Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root), false ); @@ -4040,7 +4025,6 @@ pub mod tests { let mut shred8 = shreds[8].clone(); blockstore.insert_shreds(shreds, None, false).unwrap(); let slot_meta = blockstore.meta(0).unwrap().unwrap(); - let index = index_cf.get(0).unwrap().unwrap(); // Trying to insert a shred with index > the "is_last" shred should fail if shred8.is_data() { @@ -4049,13 +4033,50 @@ pub mod tests { panic!("Shred in unexpected format") } assert_eq!( - Blockstore::should_insert_data_shred(&shred7, &slot_meta, index.data(), &last_root), + Blockstore::should_insert_data_shred(&shred7, &slot_meta, &last_root), false ); } Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } + #[test] + pub fn test_is_data_shred_present() { + let (shreds, _) = make_slot_entries(0, 0, 200); + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let index_cf = blockstore.db.column::(); + + blockstore + .insert_shreds(shreds[0..5].to_vec(), None, false) + .unwrap(); + // Insert a shred less than `slot_meta.consumed`, check that + // it already exists + let slot_meta = blockstore.meta(0).unwrap().unwrap(); + let index = index_cf.get(0).unwrap().unwrap(); + assert_eq!(slot_meta.consumed, 5); + assert!(Blockstore::is_data_shred_present( + &shreds[1], + &slot_meta, + index.data(), + )); + + // Insert a shred, check that it already exists + blockstore + .insert_shreds(shreds[6..7].to_vec(), None, false) + .unwrap(); + let slot_meta = blockstore.meta(0).unwrap().unwrap(); + let index = index_cf.get(0).unwrap().unwrap(); + assert!(Blockstore::is_data_shred_present( + &shreds[6], + &slot_meta, + index.data() + ),); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + #[test] pub fn test_should_insert_coding_shred() { let blockstore_path = get_tmp_ledger_path!();