diff --git a/core/benches/blocktree.rs b/core/benches/blocktree.rs
index 1ee226951..222824162 100644
--- a/core/benches/blocktree.rs
+++ b/core/benches/blocktree.rs
@@ -22,7 +22,7 @@ fn bench_write_shreds(bench: &mut Bencher, entries: Vec<Entry>, ledger_path: &Pa
         Blocktree::open(ledger_path).expect("Expected to be able to open database ledger");
     bench.iter(move || {
         let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
     });
 
     Blocktree::destroy(ledger_path).expect("Expected successful database destruction");
@@ -45,7 +45,7 @@ fn setup_read_bench(
     // Convert the entries to shreds, write the shreds to the ledger
     let shreds = entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true);
     blocktree
-        .insert_shreds(shreds, None)
+        .insert_shreds(shreds, None, false)
        .expect("Expectd successful insertion of shreds into ledger");
 }
 
@@ -137,7 +137,7 @@ fn bench_insert_data_shred_small(bench: &mut Bencher) {
     let entries = create_ticks(num_entries, 0, Hash::default());
     bench.iter(move || {
         let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
     });
     Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
 }
@@ -152,7 +152,7 @@ fn bench_insert_data_shred_big(bench: &mut Bencher) {
     let entries = create_ticks(num_entries, 0, Hash::default());
     bench.iter(move || {
         let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
     });
     Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
 }
diff --git a/core/src/archiver.rs b/core/src/archiver.rs
index fe2028cdd..8e2096781 100644
--- a/core/src/archiver.rs
+++ b/core/src/archiver.rs
@@ -927,7 +927,7 @@ impl Archiver {
                 .into_iter()
                 .filter_map(|p| Shred::new_from_serialized_shred(p.data.to_vec()).ok())
                 .collect();
-            blocktree.insert_shreds(shreds, None)?;
+            blocktree.insert_shreds(shreds, None, false)?;
         }
         // check if all the slots in the segment are complete
         if Self::segment_complete(start_slot, slots_per_segment, blocktree) {
diff --git a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs
index b20ea3976..7ce057cca 100644
--- a/core/src/broadcast_stage/broadcast_fake_blobs_run.rs
+++ b/core/src/broadcast_stage/broadcast_fake_blobs_run.rs
@@ -76,7 +76,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
             self.last_blockhash = Hash::default();
         }
 
-        blocktree.insert_shreds(data_shreds.clone(), None)?;
+        blocktree.insert_shreds(data_shreds.clone(), None, true)?;
 
         // 3) Start broadcast step
         let peers = cluster_info.read().unwrap().tvu_peers();
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index fabcd8e4c..b9931cda9 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -59,7 +59,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             .collect::<Vec<_>>();
         let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
         blocktree
-            .insert_shreds(all_shreds, None)
+            .insert_shreds(all_shreds, None, true)
             .expect("Failed to insert shreds in blocktree");
 
         // 3) Start broadcast step
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 86001a156..eb0c558de 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -218,7 +218,7 @@ impl StandardBroadcastRun {
         let insert_shreds_start = Instant::now();
         if insert {
             blocktree
-                .insert_shreds(shreds.clone(), None)
+                .insert_shreds(shreds.clone(), None, true)
                 .expect("Failed to insert shreds in blocktree");
         }
         let insert_shreds_elapsed = insert_shreds_start.elapsed();
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 85fc2ffcf..3eacc2a68 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -1992,7 +1992,7 @@ mod tests {
         );
 
         blocktree
-            .insert_shreds(vec![shred_info], None)
+            .insert_shreds(vec![shred_info], None, false)
             .expect("Expect successful ledger write");
 
         let rv = ClusterInfo::run_window_request(
@@ -2074,7 +2074,7 @@ mod tests {
 
         let (shreds, _) = make_many_slot_entries(1, 3, 5);
         blocktree
-            .insert_shreds(shreds, None)
+            .insert_shreds(shreds, None, false)
             .expect("Expect successful ledger write");
 
         // We don't have slot 4, so we don't know how to service this requeset
diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs
index e34431bf6..403e5d60b 100644
--- a/core/src/cluster_info_repair_listener.rs
+++ b/core/src/cluster_info_repair_listener.rs
@@ -670,7 +670,7 @@ mod tests {
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
         let num_slots = 2;
         let (shreds, _) = make_many_slot_entries(0, num_slots, 1);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         // Write roots so that these slots will qualify to be sent by the repairman
         let last_root = num_slots - 1;
@@ -741,7 +741,7 @@ mod tests {
         let num_shreds_per_slot = shreds.len() as u64 / num_slots;
 
         // Write slots in the range [0, num_slots] to blocktree
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         // Write roots so that these slots will qualify to be sent by the repairman
         let roots: Vec<_> = (0..=num_slots - 1).collect();
@@ -819,7 +819,7 @@ mod tests {
         // Create blobs for first two epochs and write them to blocktree
         let total_slots = slots_per_epoch * 2;
         let (shreds, _) = make_many_slot_entries(0, total_slots, 1);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         // Write roots so that these slots will qualify to be sent by the repairman
         let roots: Vec<_> = (0..=slots_per_epoch * 2 - 1).collect();
diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs
index 8b88ebde5..2ddafa4cd 100644
--- a/core/src/ledger_cleanup_service.rs
+++ b/core/src/ledger_cleanup_service.rs
@@ -79,7 +79,7 @@ mod tests {
         let blocktree_path = get_tmp_ledger_path!();
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
         let (shreds, _) = make_many_slot_entries(0, 50, 5);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         let blocktree = Arc::new(blocktree);
         let (sender, receiver) = channel();
 
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index cc7f34784..9d8fa06dd 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -411,7 +411,7 @@ mod test {
         let (mut shreds, _) = make_slot_entries(1, 0, 1);
         let (shreds2, _) = make_slot_entries(5, 2, 1);
         shreds.extend(shreds2);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         assert_eq!(
             RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
             vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(2)]
@@ -431,7 +431,7 @@ mod test {
 
         // Write this blob to slot 2, should chain to slot 0, which we haven't received
         // any blobs for
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         // Check that repair tries to patch the empty slot
         assert_eq!(
@@ -467,7 +467,9 @@ mod test {
                 missing_indexes_per_slot.insert(0, index);
             }
         }
-        blocktree.insert_shreds(shreds_to_write, None).unwrap();
+        blocktree
+            .insert_shreds(shreds_to_write, None, false)
+            .unwrap();
         // sleep so that the holes are ready for repair
         sleep(Duration::from_secs(1));
         let expected: Vec<RepairType> = (0..num_slots)
@@ -506,7 +508,7 @@ mod test {
 
         // Remove last shred (which is also last in slot) so that slot is not complete
         shreds.pop();
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         // We didn't get the last blob for this slot, so ask for the highest blob for that slot
         let expected: Vec<RepairType> =
@@ -532,7 +534,7 @@ mod test {
         let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
         for (mut slot_shreds, _) in shreds.into_iter() {
             slot_shreds.remove(0);
-            blocktree.insert_shreds(slot_shreds, None).unwrap();
+            blocktree.insert_shreds(slot_shreds, None, false).unwrap();
         }
         // sleep to make slot eligible for repair
         sleep(Duration::from_secs(1));
@@ -585,7 +587,7 @@ mod test {
             let parent = if i > 0 { i - 1 } else { 0 };
             let (shreds, _) = make_slot_entries(i, parent, num_entries_per_slot as u64);
 
-            blocktree.insert_shreds(shreds, None).unwrap();
+            blocktree.insert_shreds(shreds, None, false).unwrap();
         }
 
         let end = 4;
@@ -638,9 +640,9 @@ mod test {
             .collect();
 
         let mut full_slots = BTreeSet::new();
-        blocktree.insert_shreds(fork1_shreds, None).unwrap();
+        blocktree.insert_shreds(fork1_shreds, None, false).unwrap();
         blocktree
-            .insert_shreds(fork2_incomplete_shreds, None)
+            .insert_shreds(fork2_incomplete_shreds, None, false)
             .unwrap();
 
         // Test that only slots > root from fork1 were included
@@ -664,7 +666,7 @@ mod test {
             .into_iter()
             .flat_map(|(shreds, _)| shreds)
             .collect();
-        blocktree.insert_shreds(fork3_shreds, None).unwrap();
+        blocktree.insert_shreds(fork3_shreds, None, false).unwrap();
         RepairService::get_completed_slots_past_root(
             &blocktree,
             &mut full_slots,
@@ -711,7 +713,9 @@ mod test {
                 let step = rng.gen_range(1, max_step + 1) as usize;
                 let step = std::cmp::min(step, num_shreds - i);
                 let shreds_to_insert = shreds.drain(..step).collect_vec();
-                blocktree_.insert_shreds(shreds_to_insert, None).unwrap();
+                blocktree_
+                    .insert_shreds(shreds_to_insert, None, false)
+                    .unwrap();
                 sleep(Duration::from_millis(repair_interval_ms));
                 i += step;
             }
@@ -741,7 +745,7 @@ mod test {
         // Update with new root, should filter out the slots <= root
         root = num_slots / 2;
         let (shreds, _) = make_slot_entries(num_slots + 2, num_slots + 1, entries_per_slot);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         RepairService::update_epoch_slots(
             Pubkey::default(),
             root,
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 06b325d8a..f0d7dbfa9 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -973,7 +973,7 @@ mod test {
 
         // Insert blob for slot 1, generate new forks, check result
         let (shreds, _) = make_slot_entries(1, 0, 8);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         assert!(bank_forks.get(1).is_none());
         ReplayStage::generate_new_bank_forks(
             &blocktree,
@@ -984,7 +984,7 @@ mod test {
 
         // Insert blob for slot 3, generate new forks, check result
         let (shreds, _) = make_slot_entries(2, 0, 8);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         assert!(bank_forks.get(2).is_none());
         ReplayStage::generate_new_bank_forks(
             &blocktree,
@@ -1230,7 +1230,7 @@ mod test {
         let last_blockhash = bank0.last_blockhash();
         progress.insert(bank0.slot(), ForkProgress::new(0, last_blockhash));
         let shreds = shred_to_insert(&mint_keypair, bank0.clone());
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         let (res, _tx_count) =
             ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress);
 
diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 2a85968fb..af6f0e4c1 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -132,7 +132,8 @@ where
         }
     }
 
-    let blocktree_insert_metrics = blocktree.insert_shreds(shreds, Some(leader_schedule_cache))?;
+    let blocktree_insert_metrics =
+        blocktree.insert_shreds(shreds, Some(leader_schedule_cache), false)?;
     blocktree_insert_metrics.report_metrics("recv-window-insert-shreds");
 
     trace!(
@@ -322,7 +323,7 @@ mod test {
         let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Arc::new(Keypair::new()));
         shreds.reverse();
         blocktree
-            .insert_shreds(shreds, None)
+            .insert_shreds(shreds, None, false)
             .expect("Expect successful processing of shred");
 
         assert_eq!(
diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs
index d83383830..bb7108b8b 100644
--- a/ledger/src/blocktree.rs
+++ b/ledger/src/blocktree.rs
@@ -444,6 +444,7 @@ impl Blocktree {
         &self,
         shreds: Vec<Shred>,
         leader_schedule: Option<&Arc<LeaderScheduleCache>>,
+        is_trusted: bool,
     ) -> Result<BlocktreeInsertionMetrics> {
         let mut total_start = Measure::start("Total elapsed");
         let mut start = Measure::start("Blocktree lock");
@@ -465,30 +466,29 @@ impl Blocktree {
         let mut num_inserted = 0;
         let mut index_meta_time = 0;
         shreds.into_iter().for_each(|shred| {
-            let insert_success = {
-                if shred.is_data() {
-                    self.check_insert_data_shred(
-                        shred,
-                        &mut index_working_set,
-                        &mut slot_meta_working_set,
-                        &mut write_batch,
-                        &mut just_inserted_data_shreds,
-                        &mut index_meta_time,
-                    )
-                } else if shred.is_code() {
-                    self.check_cache_coding_shred(
-                        shred,
-                        &mut erasure_metas,
-                        &mut index_working_set,
-                        &mut just_inserted_coding_shreds,
-                        &mut index_meta_time,
-                    )
-                } else {
-                    panic!("There should be no other case");
+            if shred.is_data() {
+                if self.check_insert_data_shred(
+                    shred,
+                    &mut index_working_set,
+                    &mut slot_meta_working_set,
+                    &mut write_batch,
+                    &mut just_inserted_data_shreds,
+                    &mut index_meta_time,
+                    is_trusted,
+                ) {
+                    num_inserted += 1;
                 }
-            };
-            if insert_success {
-                num_inserted += 1;
+            } else if shred.is_code() {
+                self.check_cache_coding_shred(
+                    shred,
+                    &mut erasure_metas,
+                    &mut index_working_set,
+                    &mut just_inserted_coding_shreds,
+                    &mut index_meta_time,
+                    is_trusted,
+                );
+            } else {
+                panic!("There should be no other case");
             }
         });
         start.stop();
@@ -516,6 +516,7 @@ impl Blocktree {
                     &mut write_batch,
                     &mut just_inserted_data_shreds,
                     &mut index_meta_time,
+                    is_trusted,
                 );
             }
         }
@@ -533,6 +534,7 @@ impl Blocktree {
                 &mut write_batch,
                 &mut index_meta_time,
             );
+            num_inserted += 1;
         });
 
         let mut start = Measure::start("Shred recovery");
@@ -619,6 +621,7 @@ impl Blocktree {
         index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
         just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>,
         index_meta_time: &mut u64,
+        is_trusted: bool,
     ) -> bool {
         let slot = shred.slot();
         let shred_index = u64::from(shred.index());
@@ -629,7 +632,9 @@ impl Blocktree {
         let index_meta = &mut index_meta_working_set_entry.index;
         // This gives the index of first coding shred in this FEC block
         // So, all coding shreds in a given FEC block will have the same set index
-        if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
+        if is_trusted
+            || Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root)
+        {
             let set_index = shred_index - u64::from(shred.coding_header.position);
             let erasure_config = ErasureConfig::new(
                 shred.coding_header.num_data_shreds as usize,
@@ -670,6 +675,7 @@ impl Blocktree {
         write_batch: &mut WriteBatch,
         just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
         index_meta_time: &mut u64,
+        is_trusted: bool,
     ) -> bool {
         let slot = shred.slot();
         let shred_index = u64::from(shred.index());
@@ -683,12 +689,14 @@ impl Blocktree {
 
         let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut();
 
-        if Blocktree::should_insert_data_shred(
-            &shred,
-            slot_meta,
-            index_meta.data(),
-            &self.last_root,
-        ) {
+        if is_trusted
+            || Blocktree::should_insert_data_shred(
+                &shred,
+                slot_meta,
+                index_meta.data(),
+                &self.last_root,
+            )
+        {
             if let Ok(()) =
                 self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
             {
@@ -977,7 +985,7 @@ impl Blocktree {
         }
 
         let num_shreds = all_shreds.len();
-        self.insert_shreds(all_shreds, None)?;
+        self.insert_shreds(all_shreds, None, false)?;
         Ok(num_shreds)
     }
 
@@ -1762,7 +1770,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_config: &GenesisConfig) ->
     let shreds = shredder.entries_to_shreds(&entries, true, 0).0;
     assert!(shreds.last().unwrap().last_in_slot());
 
-    blocktree.insert_shreds(shreds, None)?;
+    blocktree.insert_shreds(shreds, None, false)?;
     blocktree.set_roots(&[0])?;
 
     Ok(last_hash)
@@ -1985,7 +1993,7 @@ pub mod tests {
         let last_shred = shreds.pop().unwrap();
         assert!(last_shred.index() > 0);
         ledger
-            .insert_shreds(vec![last_shred.clone()], None)
+            .insert_shreds(vec![last_shred.clone()], None, false)
             .unwrap();
 
         let serialized_shred = ledger
@@ -2158,7 +2166,7 @@ pub mod tests {
 
         let ledger_path = get_tmp_ledger_path!();
         let ledger = Blocktree::open(&ledger_path).unwrap();
-        ledger.insert_shreds(shreds, None).unwrap();
+        ledger.insert_shreds(shreds, None, false).unwrap();
 
         let mut buf = [0; 4096];
         let (_, bytes) = ledger.get_data_shreds(slot, 0, 1, &mut buf).unwrap();
@@ -2225,7 +2233,7 @@ pub mod tests {
         // shreds starting from slot 0, index 0 should exist.
         assert!(shreds.len() > 1);
         let last_shred = shreds.pop().unwrap();
-        ledger.insert_shreds(vec![last_shred], None).unwrap();
+        ledger.insert_shreds(vec![last_shred], None, false).unwrap();
         assert!(ledger.get_slot_entries(0, 0, None).unwrap().is_empty());
 
         let meta = ledger
@@ -2235,7 +2243,7 @@ pub mod tests {
         assert!(meta.consumed == 0 && meta.received == num_shreds);
 
         // Insert the other shreds, check for consecutive returned entries
-        ledger.insert_shreds(shreds, None).unwrap();
+        ledger.insert_shreds(shreds, None, false).unwrap();
         let result = ledger.get_slot_entries(0, 0, None).unwrap();
 
         assert_eq!(result, entries);
@@ -2269,7 +2277,7 @@ pub mod tests {
         // Insert shreds in reverse, check for consecutive returned shreds
         for i in (0..num_shreds).rev() {
             let shred = shreds.pop().unwrap();
-            ledger.insert_shreds(vec![shred], None).unwrap();
+            ledger.insert_shreds(vec![shred], None, false).unwrap();
             let result = ledger.get_slot_entries(0, 0, None).unwrap();
 
             let meta = ledger
@@ -2347,7 +2355,7 @@ pub mod tests {
         let entries = create_ticks(8, 0, Hash::default());
         let shreds = entries_to_test_shreds(entries[0..4].to_vec(), 1, 0, false);
         blocktree
-            .insert_shreds(shreds, None)
+            .insert_shreds(shreds, None, false)
             .expect("Expected successful write of shreds");
 
         let mut shreds1 = entries_to_test_shreds(entries[4..].to_vec(), 1, 0, false);
@@ -2355,7 +2363,7 @@ pub mod tests {
             b.set_index(8 + i as u32);
         }
         blocktree
-            .insert_shreds(shreds1, None)
+            .insert_shreds(shreds1, None, false)
             .expect("Expected successful write of shreds");
 
         assert_eq!(
@@ -2389,7 +2397,7 @@ pub mod tests {
             index += 1;
         }
         blocktree
-            .insert_shreds(shreds, None)
+            .insert_shreds(shreds, None, false)
             .expect("Expected successful write of shreds");
         assert_eq!(
             blocktree
@@ -2422,7 +2430,7 @@ pub mod tests {
                 entries_to_test_shreds(entries.clone(), slot, slot.saturating_sub(1), false);
             assert!(shreds.len() as u64 >= shreds_per_slot);
             blocktree
-                .insert_shreds(shreds, None)
+                .insert_shreds(shreds, None, false)
                 .expect("Expected successful write of shreds");
             assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), entries);
         }
@@ -2457,7 +2465,7 @@ pub mod tests {
             }
         }
 
-        blocktree.insert_shreds(odd_shreds, None).unwrap();
+        blocktree.insert_shreds(odd_shreds, None, false).unwrap();
 
         assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]);
 
@@ -2475,7 +2483,7 @@ pub mod tests {
             assert_eq!(meta.last_index, std::u64::MAX);
         }
 
-        blocktree.insert_shreds(even_shreds, None).unwrap();
+        blocktree.insert_shreds(even_shreds, None, false).unwrap();
 
         assert_eq!(
             blocktree.get_slot_entries(slot, 0, None).unwrap(),
@@ -2508,13 +2516,17 @@ pub mod tests {
 
         // Discard first shred
         original_shreds.remove(0);
-        blocktree.insert_shreds(original_shreds, None).unwrap();
+        blocktree
+            .insert_shreds(original_shreds, None, false)
+            .unwrap();
 
         assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]);
 
         let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true);
         let num_shreds = duplicate_shreds.len() as u64;
-        blocktree.insert_shreds(duplicate_shreds, None).unwrap();
+        blocktree
+            .insert_shreds(duplicate_shreds, None, false)
+            .unwrap();
 
         assert_eq!(
             blocktree.get_slot_entries(0, 0, None).unwrap(),
@@ -2544,16 +2556,20 @@ pub mod tests {
 
         // Insert second shred, but we're missing the first shred, so no consecutive
        // shreds starting from slot 0, index 0 should exist.
-        ledger.insert_shreds(vec![shreds.remove(1)], None).unwrap();
+        ledger
+            .insert_shreds(vec![shreds.remove(1)], None, false)
+            .unwrap();
         let timer = Duration::new(1, 0);
         assert!(recvr.recv_timeout(timer).is_err());
         // Insert first shred, now we've made a consecutive block
-        ledger.insert_shreds(vec![shreds.remove(0)], None).unwrap();
+        ledger
+            .insert_shreds(vec![shreds.remove(0)], None, false)
+            .unwrap();
         // Wait to get notified of update, should only be one update
         assert!(recvr.recv_timeout(timer).is_ok());
         assert!(recvr.try_recv().is_err());
         // Insert the rest of the ticks
-        ledger.insert_shreds(shreds, None).unwrap();
+        ledger.insert_shreds(shreds, None, false).unwrap();
         // Wait to get notified of update, should only be one update
         assert!(recvr.recv_timeout(timer).is_ok());
         assert!(recvr.try_recv().is_err());
@@ -2572,7 +2588,7 @@ pub mod tests {
         }
 
         // Should be no updates, since no new chains from block 0 were formed
-        ledger.insert_shreds(shreds, None).unwrap();
+        ledger.insert_shreds(shreds, None, false).unwrap();
         assert!(recvr.recv_timeout(timer).is_err());
 
         // Insert a shred for each slot that doesn't make a consecutive block, we
@@ -2585,7 +2601,7 @@ pub mod tests {
             })
             .collect();
 
-        ledger.insert_shreds(shreds, None).unwrap();
+        ledger.insert_shreds(shreds, None, false).unwrap();
         assert!(recvr.recv_timeout(timer).is_err());
 
         // For slots 1..num_slots/2, fill in the holes in one batch insertion,
@@ -2593,13 +2609,13 @@ pub mod tests {
         let missing_shreds2 = missing_shreds
             .drain((num_slots / 2) as usize..)
             .collect_vec();
-        ledger.insert_shreds(missing_shreds, None).unwrap();
+        ledger.insert_shreds(missing_shreds, None, false).unwrap();
         assert!(recvr.recv_timeout(timer).is_ok());
         assert!(recvr.try_recv().is_err());
 
         // Fill in the holes for each of the remaining slots, we should get a single update
         // for each
-        ledger.insert_shreds(missing_shreds2, None).unwrap();
+        ledger.insert_shreds(missing_shreds2, None, false).unwrap();
 
         // Destroying database without closing it first is undefined behavior
         drop(ledger);
@@ -2620,11 +2636,11 @@ pub mod tests {
 
         let shred0 = shreds.remove(0);
         // Insert all but the first shred in the slot, should not be considered complete
-        ledger.insert_shreds(shreds, None).unwrap();
+        ledger.insert_shreds(shreds, None, false).unwrap();
         assert!(recvr.try_recv().is_err());
 
         // Insert first shred, slot should now be considered complete
-        ledger.insert_shreds(vec![shred0], None).unwrap();
+        ledger.insert_shreds(vec![shred0], None, false).unwrap();
         assert_eq!(recvr.try_recv().unwrap(), vec![0]);
     }
 
@@ -2647,20 +2663,24 @@ pub mod tests {
 
         // Insert all but the first shred in the slot, should not be considered complete
         let orphan_child0 = orphan_child.remove(0);
-        ledger.insert_shreds(orphan_child, None).unwrap();
+        ledger.insert_shreds(orphan_child, None, false).unwrap();
         assert!(recvr.try_recv().is_err());
 
         // Insert first shred, slot should now be considered complete
-        ledger.insert_shreds(vec![orphan_child0], None).unwrap();
+        ledger
+            .insert_shreds(vec![orphan_child0], None, false)
+            .unwrap();
         assert_eq!(recvr.try_recv().unwrap(), vec![slots[2]]);
 
         // Insert the shreds for the orphan_slot
         let orphan_shred0 = orphan_shreds.remove(0);
-        ledger.insert_shreds(orphan_shreds, None).unwrap();
+        ledger.insert_shreds(orphan_shreds, None, false).unwrap();
         assert!(recvr.try_recv().is_err());
 
         // Insert first shred, slot should now be considered complete
-        ledger.insert_shreds(vec![orphan_shred0], None).unwrap();
+        ledger
+            .insert_shreds(vec![orphan_shred0], None, false)
+            .unwrap();
         assert_eq!(recvr.try_recv().unwrap(), vec![slots[1]]);
     }
 
@@ -2687,7 +2707,7 @@ pub mod tests {
             .collect();
         all_shreds.shuffle(&mut thread_rng());
 
-        ledger.insert_shreds(all_shreds, None).unwrap();
+        ledger.insert_shreds(all_shreds, None, false).unwrap();
         let mut result = recvr.try_recv().unwrap();
         result.sort();
         slots.push(disconnected_slot);
@@ -2711,7 +2731,7 @@ pub mod tests {
         let shreds1 = shreds
             .drain(shreds_per_slot..2 * shreds_per_slot)
             .collect_vec();
-        blocktree.insert_shreds(shreds1, None).unwrap();
+        blocktree.insert_shreds(shreds1, None, false).unwrap();
         let s1 = blocktree.meta(1).unwrap().unwrap();
         assert!(s1.next_slots.is_empty());
         // Slot 1 is not trunk because slot 0 hasn't been inserted yet
@@ -2723,7 +2743,7 @@ pub mod tests {
         let shreds2 = shreds
             .drain(shreds_per_slot..2 * shreds_per_slot)
             .collect_vec();
-        blocktree.insert_shreds(shreds2, None).unwrap();
+        blocktree.insert_shreds(shreds2, None, false).unwrap();
         let s2 = blocktree.meta(2).unwrap().unwrap();
         assert!(s2.next_slots.is_empty());
         // Slot 2 is not trunk because slot 0 hasn't been inserted yet
@@ -2741,7 +2761,7 @@ pub mod tests {
 
         // 3) Write to the zeroth slot, check that every slot
         // is now part of the trunk
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         for i in 0..3 {
             let s = blocktree.meta(i).unwrap().unwrap();
             // The last slot will not chain to any other slots
@@ -2791,7 +2811,7 @@ pub mod tests {
             }
 
             // Write the shreds for every other slot
-            blocktree.insert_shreds(slots, None).unwrap();
+            blocktree.insert_shreds(slots, None, false).unwrap();
 
             // Check metadata
             for i in 0..num_slots {
@@ -2817,7 +2837,7 @@ pub mod tests {
             }
 
             // Write the shreds for the other half of the slots that we didn't insert earlier
-            blocktree.insert_shreds(missing_slots, None).unwrap();
+            blocktree.insert_shreds(missing_slots, None, false).unwrap();
 
             for i in 0..num_slots {
                 // Check that all the slots chain correctly once the missing slots
@@ -2863,9 +2883,13 @@ pub mod tests {
                 if slot % 3 == 0 {
                     let shred0 = shreds_for_slot.remove(0);
                     missing_shreds.push(shred0);
-                    blocktree.insert_shreds(shreds_for_slot, None).unwrap();
+                    blocktree
+                        .insert_shreds(shreds_for_slot, None, false)
+                        .unwrap();
                 } else {
-                    blocktree.insert_shreds(shreds_for_slot, None).unwrap();
+                    blocktree
+                        .insert_shreds(shreds_for_slot, None, false)
+                        .unwrap();
                 }
             }
 
@@ -2900,7 +2924,7 @@ pub mod tests {
         for slot_index in 0..num_slots {
             if slot_index % 3 == 0 {
                 let shred = missing_shreds.remove(0);
-                blocktree.insert_shreds(vec![shred], None).unwrap();
+                blocktree.insert_shreds(vec![shred], None, false).unwrap();
 
                 for i in 0..num_slots {
                     let s = blocktree.meta(i as u64).unwrap().unwrap();
@@ -3081,7 +3105,9 @@ pub mod tests {
         // Write slot 2, which chains to slot 1. We're missing slot 0,
         // so slot 1 is the orphan
         let shreds_for_slot = shreds.drain((shreds_per_slot * 2)..).collect_vec();
-        blocktree.insert_shreds(shreds_for_slot, None).unwrap();
+        blocktree
+            .insert_shreds(shreds_for_slot, None, false)
+            .unwrap();
         let meta = blocktree
             .meta(1)
             .expect("Expect database get to succeed")
@@ -3092,7 +3118,9 @@ pub mod tests {
         // Write slot 1 which chains to slot 0, so now slot 0 is the
         // orphan, and slot 1 is no longer the orphan.
         let shreds_for_slot = shreds.drain(shreds_per_slot..).collect_vec();
-        blocktree.insert_shreds(shreds_for_slot, None).unwrap();
+        blocktree
+            .insert_shreds(shreds_for_slot, None, false)
+            .unwrap();
         let meta = blocktree
             .meta(1)
             .expect("Expect database get to succeed")
@@ -3109,12 +3137,12 @@ pub mod tests {
         // nothing should change
         let (shred4, _) = make_slot_entries(4, 0, 1);
         let (shred5, _) = make_slot_entries(5, 1, 1);
-        blocktree.insert_shreds(shred4, None).unwrap();
-        blocktree.insert_shreds(shred5, None).unwrap();
+        blocktree.insert_shreds(shred4, None, false).unwrap();
+        blocktree.insert_shreds(shred5, None, false).unwrap();
         assert_eq!(blocktree.get_orphans(None), vec![0]);
 
         // Write zeroth slot, no more orphans
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         for i in 0..3 {
             let meta = blocktree
                 .meta(i)
@@ -3160,11 +3188,11 @@ pub mod tests {
             let num_shreds = shreds.len();
             // Write shreds to the database
             if should_bulk_write {
-                blocktree.insert_shreds(shreds, None).unwrap();
+                blocktree.insert_shreds(shreds, None, false).unwrap();
            } else {
                 for _ in 0..num_shreds {
                     let shred = shreds.remove(0);
-                    blocktree.insert_shreds(vec![shred], None).unwrap();
+                    blocktree.insert_shreds(vec![shred], None, false).unwrap();
                 }
             }
 
@@ -3208,7 +3236,7 @@ pub mod tests {
             s.set_index(i as u32 * gap as u32);
             s.set_slot(slot);
         }
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         // Index of the first shred is 0
         // Index of the second shred is "gap"
@@ -3290,7 +3318,7 @@ pub mod tests {
         let shreds: Vec<_> = (0..64)
             .map(|i| Shred::new_from_data(slot, (i * gap) as u32, 0, None, false, false, i as u8))
             .collect();
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         let empty: Vec<u64> = vec![];
         assert_eq!(
@@ -3333,7 +3361,7 @@ pub mod tests {
         shreds[1].set_index(OTHER as u32);
 
         // Insert one shred at index = first_index
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         const STARTS: u64 = OTHER * 2;
         const END: u64 = OTHER * 3;
@@ -3367,7 +3395,7 @@ pub mod tests {
         let shreds = entries_to_test_shreds(entries, slot, 0, true);
         let num_shreds = shreds.len();
 
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         let empty: Vec<u64> = vec![];
         for i in 0..num_shreds as u64 {
@@ -3394,7 +3422,7 @@ pub mod tests {
 
         // Insert the first 5 shreds, we don't have a "is_last" shred yet
         blocktree
-            .insert_shreds(shreds[0..5].to_vec(), None)
+            .insert_shreds(shreds[0..5].to_vec(), None, false)
             .unwrap();
 
         // Trying to insert a shred less than `slot_meta.consumed` should fail
@@ -3414,7 +3442,7 @@ pub mod tests {
         // Trying to insert the same shred again should fail
         // skip over shred 5 so the `slot_meta.consumed` doesn't increment
         blocktree
-            .insert_shreds(shreds[6..7].to_vec(), None)
+            .insert_shreds(shreds[6..7].to_vec(), None, false)
             .unwrap();
         let slot_meta = blocktree.meta(0).unwrap().unwrap();
         let index = index_cf.get(0).unwrap().unwrap();
@@ -3431,7 +3459,7 @@ pub mod tests {
         // Trying to insert another "is_last" shred with index < the received index should fail
         // skip over shred 7
         blocktree
-            .insert_shreds(shreds[8..9].to_vec(), None)
+            .insert_shreds(shreds[8..9].to_vec(), None, false)
             .unwrap();
         let slot_meta = blocktree.meta(0).unwrap().unwrap();
         let index = index_cf.get(0).unwrap().unwrap();
@@ -3451,7 +3479,7 @@ pub mod tests {
 
         // Insert all pending shreds
         let mut shred8 = shreds[8].clone();
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         let slot_meta = blocktree.meta(0).unwrap().unwrap();
         let index = index_cf.get(0).unwrap().unwrap();
 
@@ -3494,7 +3522,7 @@ pub mod tests {
 
         // Insertion should succeed
         blocktree
-            .insert_shreds(vec![coding_shred.clone()], None)
+            .insert_shreds(vec![coding_shred.clone()], None, false)
             .unwrap();
 
         // Trying to insert the same shred again should fail
@@ -3601,7 +3629,9 @@ pub mod tests {
             ));
 
             // Insertion should succeed
-            blocktree.insert_shreds(vec![coding_shred], None).unwrap();
+            blocktree
+                .insert_shreds(vec![coding_shred], None, false)
+                .unwrap();
         }
 
        // Trying to insert value into slot <= than last root should fail
@@ -3631,7 +3661,7 @@ pub mod tests {
         let blocktree_path = get_tmp_ledger_path!();
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
 
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         let slot_meta = blocktree.meta(0).unwrap().unwrap();
 
         assert_eq!(slot_meta.consumed, num_shreds);
@@ -3640,7 +3670,7 @@ pub mod tests {
         assert!(slot_meta.is_full());
 
         let (shreds, _) = make_slot_entries(0, 0, 22);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         let slot_meta = blocktree.meta(0).unwrap().unwrap();
 
         assert_eq!(slot_meta.consumed, num_shreds);
@@ -3662,7 +3692,7 @@ pub mod tests {
         let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot);
         let slot_8_shreds = all_shreds[2].0.clone();
         for (slot_shreds, _) in all_shreds {
-            blocktree.insert_shreds(slot_shreds, None).unwrap();
+            blocktree.insert_shreds(slot_shreds, None, false).unwrap();
         }
 
         // Slot doesnt exist, iterator should be empty
@@ -3707,7 +3737,7 @@ pub mod tests {
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
         let (shreds, _) = make_many_slot_entries(0, 50, 6);
         let shreds_per_slot = shreds.len() as u64 / 50;
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         blocktree
             .slot_meta_iterator(0)
             .unwrap()
@@ -3742,7 +3772,7 @@ pub mod tests {
         let blocktree_path = get_tmp_ledger_path!();
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
         let (shreds, _) = make_many_slot_entries(0, 50, 5);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         blocktree.purge_slots(0, Some(5));
 
@@ -3768,7 +3798,7 @@ pub mod tests {
         let blocktree_path = get_tmp_ledger_path!();
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
         let (shreds, _) = make_many_slot_entries(0, 5000, 10);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
 
         blocktree.purge_slots(0, Some(4999));
 
@@ -3882,7 +3912,7 @@ pub mod tests {
         let shreds = entries_to_test_shreds(entries, slot, 0, false);
         let next_shred_index = shreds.len();
         blocktree
-            .insert_shreds(shreds, None)
+            .insert_shreds(shreds, None, false)
             .expect("Expected successful write of shreds");
         assert_eq!(
             blocktree.get_slot_entries(slot, 0, None).unwrap().len() as u64,
@@ -3903,7 +3933,7 @@ pub mod tests {
         // With the corruption, nothing should be returned, even though an
         // earlier data block was valid
         blocktree
-            .insert_shreds(shreds, None)
+            .insert_shreds(shreds, None, false)
             .expect("Expected successful write of shreds");
         assert!(blocktree.get_slot_entries(slot, 0, None).is_err());
     }
@@ -3921,7 +3951,7 @@ pub mod tests {
 
        // Insert the first 5 shreds, we don't have a "is_last" shred yet
         blocktree
-            .insert_shreds(shreds0[0..5].to_vec(), None)
+            .insert_shreds(shreds0[0..5].to_vec(), None, false)
             .unwrap();
 
         // Insert a repetitive shred for slot 's', should get ignored, but also
@@ -3931,13 +3961,37 @@ pub mod tests {
         let (mut shreds3, _) = make_slot_entries(3, 0, 200);
         shreds2.push(shreds0[1].clone());
         shreds3.insert(0, shreds0[1].clone());
-        blocktree.insert_shreds(shreds2, None).unwrap();
+        blocktree.insert_shreds(shreds2, None, false).unwrap();
         let slot_meta = blocktree.meta(0).unwrap().unwrap();
         assert_eq!(slot_meta.next_slots, vec![2]);
-        blocktree.insert_shreds(shreds3, None).unwrap();
+        blocktree.insert_shreds(shreds3, None, false).unwrap();
         let slot_meta = blocktree.meta(0).unwrap().unwrap();
         assert_eq!(slot_meta.next_slots, vec![2, 3]);
     }
     Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
 }
 
+#[test]
+fn test_trusted_insert_shreds() {
+    // Make shred for slot 1
+    let (shreds1, _) = make_slot_entries(1, 0, 1);
+    let blocktree_path = get_tmp_ledger_path!();
+    let last_root = 100;
+    {
+        let blocktree = Blocktree::open(&blocktree_path).unwrap();
+        blocktree.set_roots(&[last_root]).unwrap();
+
+        // Insert will fail, slot < root
+        blocktree
+            .insert_shreds(shreds1.clone()[..].to_vec(), None, false)
+            .unwrap();
+        assert!(blocktree.get_data_shred(1, 0).unwrap().is_none());
+
+        // Insert through trusted path will succeed
+        blocktree
+            .insert_shreds(shreds1[..].to_vec(), None, true)
+            .unwrap();
+        assert!(blocktree.get_data_shred(1, 0).unwrap().is_some());
+    }
+}
diff --git a/ledger/src/leader_schedule_cache.rs b/ledger/src/leader_schedule_cache.rs
index 3bd5f85a1..56e03a412 100644
--- a/ledger/src/leader_schedule_cache.rs
+++ b/ledger/src/leader_schedule_cache.rs
@@ -438,7 +438,7 @@ mod tests {
         // Write a blob into slot 2 that chains to slot 1,
         // but slot 1 is empty so should not be skipped
         let (shreds, _) = make_slot_entries(2, 1, 1);
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         assert_eq!(
             cache
                 .next_leader_slot(&pubkey, 0, &bank, Some(&blocktree))
@@ -451,7 +451,7 @@ mod tests {
         let (shreds, _) = make_slot_entries(1, 0, 1);
 
         // Check that slot 1 and 2 are skipped
-        blocktree.insert_shreds(shreds, None).unwrap();
+        blocktree.insert_shreds(shreds, None, false).unwrap();
         assert_eq!(
             cache
                 .next_leader_slot(&pubkey, 0, &bank, Some(&blocktree))
diff --git a/ledger/tests/blocktree.rs b/ledger/tests/blocktree.rs
index 724dbd4c6..d3aea8733 100644
--- a/ledger/tests/blocktree.rs
+++ b/ledger/tests/blocktree.rs
@@ -25,7 +25,7 @@ fn test_multiple_threads_insert_shred() {
             Builder::new()
                 .name("blocktree-writer".to_string())
                 .spawn(move || {
-                    blocktree_.insert_shreds(shreds, None).unwrap();
+                    blocktree_.insert_shreds(shreds, None, false).unwrap();
                 })
                 .unwrap()
         })
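
Every call site above follows one convention for the new third argument: shred batches that arrived over the network (window service, repair, archiver, and the tests and benches that model them) pass is_trusted = false, so should_insert_data_shred / should_insert_coding_shred still vet each shred, while the broadcast paths, which insert shreds the node itself just produced, pass true and skip those checks. Below is a minimal sketch of that convention, modeled on the test_trusted_insert_shreds test added above; the imports mirror ledger/tests/blocktree.rs, and the exact paths are an assumption for illustration, not part of this patch.

// Hypothetical illustration, not part of the patch; assumes the ledger
// crate's public make_slot_entries helper and get_tmp_ledger_path! macro.
use solana_ledger::blocktree::{make_slot_entries, Blocktree};
use solana_ledger::get_tmp_ledger_path;

fn main() {
    let ledger_path = get_tmp_ledger_path!();
    let blocktree = Blocktree::open(&ledger_path).unwrap();

    // Root slot 100, then build shreds for slot 1, now behind the root.
    blocktree.set_roots(&[100]).unwrap();
    let (shreds, _) = make_slot_entries(1, 0, 1);

    // Untrusted (network) path: the sanity checks run and silently drop
    // shreds whose slot is at or below the last root.
    blocktree.insert_shreds(shreds.clone(), None, false).unwrap();
    assert!(blocktree.get_data_shred(1, 0).unwrap().is_none());

    // Trusted (self-produced, broadcast) path: the same checks are bypassed,
    // so the write lands even though the slot is behind the root.
    blocktree.insert_shreds(shreds, None, true).unwrap();
    assert!(blocktree.get_data_shred(1, 0).unwrap().is_some());

    drop(blocktree);
    Blocktree::destroy(&ledger_path).unwrap();
}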