Fix roots overrunning broadcast (#6884)

* Add trusted pathway for insert_shreds to avoid checks
This commit is contained in:
carllin 2019-11-14 00:32:07 -08:00 committed by GitHub
parent 7b05b3dbb3
commit 43e2301e2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 187 additions and 128 deletions

View File

@ -22,7 +22,7 @@ fn bench_write_shreds(bench: &mut Bencher, entries: Vec<Entry>, ledger_path: &Pa
Blocktree::open(ledger_path).expect("Expected to be able to open database ledger");
bench.iter(move || {
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
});
Blocktree::destroy(ledger_path).expect("Expected successful database destruction");
@ -45,7 +45,7 @@ fn setup_read_bench(
// Convert the entries to shreds, write the shreds to the ledger
let shreds = entries_to_test_shreds(entries, slot, slot.saturating_sub(1), true);
blocktree
.insert_shreds(shreds, None)
.insert_shreds(shreds, None, false)
.expect("Expectd successful insertion of shreds into ledger");
}
@ -137,7 +137,7 @@ fn bench_insert_data_shred_small(bench: &mut Bencher) {
let entries = create_ticks(num_entries, 0, Hash::default());
bench.iter(move || {
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}
@ -152,7 +152,7 @@ fn bench_insert_data_shred_big(bench: &mut Bencher) {
let entries = create_ticks(num_entries, 0, Hash::default());
bench.iter(move || {
let shreds = entries_to_test_shreds(entries.clone(), 0, 0, true);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
});
Blocktree::destroy(&ledger_path).expect("Expected successful database destruction");
}

View File

@ -927,7 +927,7 @@ impl Archiver {
.into_iter()
.filter_map(|p| Shred::new_from_serialized_shred(p.data.to_vec()).ok())
.collect();
blocktree.insert_shreds(shreds, None)?;
blocktree.insert_shreds(shreds, None, false)?;
}
// check if all the slots in the segment are complete
if Self::segment_complete(start_slot, slots_per_segment, blocktree) {

View File

@ -76,7 +76,7 @@ impl BroadcastRun for BroadcastFakeBlobsRun {
self.last_blockhash = Hash::default();
}
blocktree.insert_shreds(data_shreds.clone(), None)?;
blocktree.insert_shreds(data_shreds.clone(), None, true)?;
// 3) Start broadcast step
let peers = cluster_info.read().unwrap().tvu_peers();

View File

@ -59,7 +59,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
.collect::<Vec<_>>();
let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
blocktree
.insert_shreds(all_shreds, None)
.insert_shreds(all_shreds, None, true)
.expect("Failed to insert shreds in blocktree");
// 3) Start broadcast step

View File

@ -218,7 +218,7 @@ impl StandardBroadcastRun {
let insert_shreds_start = Instant::now();
if insert {
blocktree
.insert_shreds(shreds.clone(), None)
.insert_shreds(shreds.clone(), None, true)
.expect("Failed to insert shreds in blocktree");
}
let insert_shreds_elapsed = insert_shreds_start.elapsed();

View File

@ -1992,7 +1992,7 @@ mod tests {
);
blocktree
.insert_shreds(vec![shred_info], None)
.insert_shreds(vec![shred_info], None, false)
.expect("Expect successful ledger write");
let rv = ClusterInfo::run_window_request(
@ -2074,7 +2074,7 @@ mod tests {
let (shreds, _) = make_many_slot_entries(1, 3, 5);
blocktree
.insert_shreds(shreds, None)
.insert_shreds(shreds, None, false)
.expect("Expect successful ledger write");
// We don't have slot 4, so we don't know how to service this requeset

View File

@ -670,7 +670,7 @@ mod tests {
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_slots = 2;
let (shreds, _) = make_many_slot_entries(0, num_slots, 1);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
// Write roots so that these slots will qualify to be sent by the repairman
let last_root = num_slots - 1;
@ -741,7 +741,7 @@ mod tests {
let num_shreds_per_slot = shreds.len() as u64 / num_slots;
// Write slots in the range [0, num_slots] to blocktree
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
// Write roots so that these slots will qualify to be sent by the repairman
let roots: Vec<_> = (0..=num_slots - 1).collect();
@ -819,7 +819,7 @@ mod tests {
// Create blobs for first two epochs and write them to blocktree
let total_slots = slots_per_epoch * 2;
let (shreds, _) = make_many_slot_entries(0, total_slots, 1);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
// Write roots so that these slots will qualify to be sent by the repairman
let roots: Vec<_> = (0..=slots_per_epoch * 2 - 1).collect();

View File

@ -79,7 +79,7 @@ mod tests {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (shreds, _) = make_many_slot_entries(0, 50, 5);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
let blocktree = Arc::new(blocktree);
let (sender, receiver) = channel();

View File

@ -411,7 +411,7 @@ mod test {
let (mut shreds, _) = make_slot_entries(1, 0, 1);
let (shreds2, _) = make_slot_entries(5, 2, 1);
shreds.extend(shreds2);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
assert_eq!(
RepairService::generate_repairs(&blocktree, 0, 2).unwrap(),
vec![RepairType::HighestBlob(0, 0), RepairType::Orphan(2)]
@ -431,7 +431,7 @@ mod test {
// Write this blob to slot 2, should chain to slot 0, which we haven't received
// any blobs for
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
// Check that repair tries to patch the empty slot
assert_eq!(
@ -467,7 +467,9 @@ mod test {
missing_indexes_per_slot.insert(0, index);
}
}
blocktree.insert_shreds(shreds_to_write, None).unwrap();
blocktree
.insert_shreds(shreds_to_write, None, false)
.unwrap();
// sleep so that the holes are ready for repair
sleep(Duration::from_secs(1));
let expected: Vec<RepairType> = (0..num_slots)
@ -506,7 +508,7 @@ mod test {
// Remove last shred (which is also last in slot) so that slot is not complete
shreds.pop();
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
// We didn't get the last blob for this slot, so ask for the highest blob for that slot
let expected: Vec<RepairType> =
@ -532,7 +534,7 @@ mod test {
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
for (mut slot_shreds, _) in shreds.into_iter() {
slot_shreds.remove(0);
blocktree.insert_shreds(slot_shreds, None).unwrap();
blocktree.insert_shreds(slot_shreds, None, false).unwrap();
}
// sleep to make slot eligible for repair
sleep(Duration::from_secs(1));
@ -585,7 +587,7 @@ mod test {
let parent = if i > 0 { i - 1 } else { 0 };
let (shreds, _) = make_slot_entries(i, parent, num_entries_per_slot as u64);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
}
let end = 4;
@ -638,9 +640,9 @@ mod test {
.collect();
let mut full_slots = BTreeSet::new();
blocktree.insert_shreds(fork1_shreds, None).unwrap();
blocktree.insert_shreds(fork1_shreds, None, false).unwrap();
blocktree
.insert_shreds(fork2_incomplete_shreds, None)
.insert_shreds(fork2_incomplete_shreds, None, false)
.unwrap();
// Test that only slots > root from fork1 were included
@ -664,7 +666,7 @@ mod test {
.into_iter()
.flat_map(|(shreds, _)| shreds)
.collect();
blocktree.insert_shreds(fork3_shreds, None).unwrap();
blocktree.insert_shreds(fork3_shreds, None, false).unwrap();
RepairService::get_completed_slots_past_root(
&blocktree,
&mut full_slots,
@ -711,7 +713,9 @@ mod test {
let step = rng.gen_range(1, max_step + 1) as usize;
let step = std::cmp::min(step, num_shreds - i);
let shreds_to_insert = shreds.drain(..step).collect_vec();
blocktree_.insert_shreds(shreds_to_insert, None).unwrap();
blocktree_
.insert_shreds(shreds_to_insert, None, false)
.unwrap();
sleep(Duration::from_millis(repair_interval_ms));
i += step;
}
@ -741,7 +745,7 @@ mod test {
// Update with new root, should filter out the slots <= root
root = num_slots / 2;
let (shreds, _) = make_slot_entries(num_slots + 2, num_slots + 1, entries_per_slot);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
RepairService::update_epoch_slots(
Pubkey::default(),
root,

View File

@ -973,7 +973,7 @@ mod test {
// Insert blob for slot 1, generate new forks, check result
let (shreds, _) = make_slot_entries(1, 0, 8);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
assert!(bank_forks.get(1).is_none());
ReplayStage::generate_new_bank_forks(
&blocktree,
@ -984,7 +984,7 @@ mod test {
// Insert blob for slot 3, generate new forks, check result
let (shreds, _) = make_slot_entries(2, 0, 8);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
assert!(bank_forks.get(2).is_none());
ReplayStage::generate_new_bank_forks(
&blocktree,
@ -1230,7 +1230,7 @@ mod test {
let last_blockhash = bank0.last_blockhash();
progress.insert(bank0.slot(), ForkProgress::new(0, last_blockhash));
let shreds = shred_to_insert(&mint_keypair, bank0.clone());
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
let (res, _tx_count) =
ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut progress);

View File

@ -132,7 +132,8 @@ where
}
}
let blocktree_insert_metrics = blocktree.insert_shreds(shreds, Some(leader_schedule_cache))?;
let blocktree_insert_metrics =
blocktree.insert_shreds(shreds, Some(leader_schedule_cache), false)?;
blocktree_insert_metrics.report_metrics("recv-window-insert-shreds");
trace!(
@ -322,7 +323,7 @@ mod test {
let mut shreds = local_entries_to_shred(&original_entries, 0, 0, &Arc::new(Keypair::new()));
shreds.reverse();
blocktree
.insert_shreds(shreds, None)
.insert_shreds(shreds, None, false)
.expect("Expect successful processing of shred");
assert_eq!(

View File

@ -444,6 +444,7 @@ impl Blocktree {
&self,
shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_trusted: bool,
) -> Result<BlocktreeInsertionMetrics> {
let mut total_start = Measure::start("Total elapsed");
let mut start = Measure::start("Blocktree lock");
@ -465,30 +466,29 @@ impl Blocktree {
let mut num_inserted = 0;
let mut index_meta_time = 0;
shreds.into_iter().for_each(|shred| {
let insert_success = {
if shred.is_data() {
self.check_insert_data_shred(
shred,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
)
} else if shred.is_code() {
self.check_cache_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut just_inserted_coding_shreds,
&mut index_meta_time,
)
} else {
panic!("There should be no other case");
if shred.is_data() {
if self.check_insert_data_shred(
shred,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
) {
num_inserted += 1;
}
};
if insert_success {
num_inserted += 1;
} else if shred.is_code() {
self.check_cache_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut just_inserted_coding_shreds,
&mut index_meta_time,
is_trusted,
);
} else {
panic!("There should be no other case");
}
});
start.stop();
@ -516,6 +516,7 @@ impl Blocktree {
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
);
}
}
@ -533,6 +534,7 @@ impl Blocktree {
&mut write_batch,
&mut index_meta_time,
);
num_inserted += 1;
});
let mut start = Measure::start("Shred recovery");
@ -619,6 +621,7 @@ impl Blocktree {
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
is_trusted: bool,
) -> bool {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
@ -629,7 +632,9 @@ impl Blocktree {
let index_meta = &mut index_meta_working_set_entry.index;
// This gives the index of first coding shred in this FEC block
// So, all coding shreds in a given FEC block will have the same set index
if Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root) {
if is_trusted
|| Blocktree::should_insert_coding_shred(&shred, index_meta.coding(), &self.last_root)
{
let set_index = shred_index - u64::from(shred.coding_header.position);
let erasure_config = ErasureConfig::new(
shred.coding_header.num_data_shreds as usize,
@ -670,6 +675,7 @@ impl Blocktree {
write_batch: &mut WriteBatch,
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
is_trusted: bool,
) -> bool {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
@ -683,12 +689,14 @@ impl Blocktree {
let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut();
if Blocktree::should_insert_data_shred(
&shred,
slot_meta,
index_meta.data(),
&self.last_root,
) {
if is_trusted
|| Blocktree::should_insert_data_shred(
&shred,
slot_meta,
index_meta.data(),
&self.last_root,
)
{
if let Ok(()) =
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)
{
@ -977,7 +985,7 @@ impl Blocktree {
}
let num_shreds = all_shreds.len();
self.insert_shreds(all_shreds, None)?;
self.insert_shreds(all_shreds, None, false)?;
Ok(num_shreds)
}
@ -1762,7 +1770,7 @@ pub fn create_new_ledger(ledger_path: &Path, genesis_config: &GenesisConfig) ->
let shreds = shredder.entries_to_shreds(&entries, true, 0).0;
assert!(shreds.last().unwrap().last_in_slot());
blocktree.insert_shreds(shreds, None)?;
blocktree.insert_shreds(shreds, None, false)?;
blocktree.set_roots(&[0])?;
Ok(last_hash)
@ -1985,7 +1993,7 @@ pub mod tests {
let last_shred = shreds.pop().unwrap();
assert!(last_shred.index() > 0);
ledger
.insert_shreds(vec![last_shred.clone()], None)
.insert_shreds(vec![last_shred.clone()], None, false)
.unwrap();
let serialized_shred = ledger
@ -2158,7 +2166,7 @@ pub mod tests {
let ledger_path = get_tmp_ledger_path!();
let ledger = Blocktree::open(&ledger_path).unwrap();
ledger.insert_shreds(shreds, None).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
let mut buf = [0; 4096];
let (_, bytes) = ledger.get_data_shreds(slot, 0, 1, &mut buf).unwrap();
@ -2225,7 +2233,7 @@ pub mod tests {
// shreds starting from slot 0, index 0 should exist.
assert!(shreds.len() > 1);
let last_shred = shreds.pop().unwrap();
ledger.insert_shreds(vec![last_shred], None).unwrap();
ledger.insert_shreds(vec![last_shred], None, false).unwrap();
assert!(ledger.get_slot_entries(0, 0, None).unwrap().is_empty());
let meta = ledger
@ -2235,7 +2243,7 @@ pub mod tests {
assert!(meta.consumed == 0 && meta.received == num_shreds);
// Insert the other shreds, check for consecutive returned entries
ledger.insert_shreds(shreds, None).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
let result = ledger.get_slot_entries(0, 0, None).unwrap();
assert_eq!(result, entries);
@ -2269,7 +2277,7 @@ pub mod tests {
// Insert shreds in reverse, check for consecutive returned shreds
for i in (0..num_shreds).rev() {
let shred = shreds.pop().unwrap();
ledger.insert_shreds(vec![shred], None).unwrap();
ledger.insert_shreds(vec![shred], None, false).unwrap();
let result = ledger.get_slot_entries(0, 0, None).unwrap();
let meta = ledger
@ -2347,7 +2355,7 @@ pub mod tests {
let entries = create_ticks(8, 0, Hash::default());
let shreds = entries_to_test_shreds(entries[0..4].to_vec(), 1, 0, false);
blocktree
.insert_shreds(shreds, None)
.insert_shreds(shreds, None, false)
.expect("Expected successful write of shreds");
let mut shreds1 = entries_to_test_shreds(entries[4..].to_vec(), 1, 0, false);
@ -2355,7 +2363,7 @@ pub mod tests {
b.set_index(8 + i as u32);
}
blocktree
.insert_shreds(shreds1, None)
.insert_shreds(shreds1, None, false)
.expect("Expected successful write of shreds");
assert_eq!(
@ -2389,7 +2397,7 @@ pub mod tests {
index += 1;
}
blocktree
.insert_shreds(shreds, None)
.insert_shreds(shreds, None, false)
.expect("Expected successful write of shreds");
assert_eq!(
blocktree
@ -2422,7 +2430,7 @@ pub mod tests {
entries_to_test_shreds(entries.clone(), slot, slot.saturating_sub(1), false);
assert!(shreds.len() as u64 >= shreds_per_slot);
blocktree
.insert_shreds(shreds, None)
.insert_shreds(shreds, None, false)
.expect("Expected successful write of shreds");
assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), entries);
}
@ -2457,7 +2465,7 @@ pub mod tests {
}
}
blocktree.insert_shreds(odd_shreds, None).unwrap();
blocktree.insert_shreds(odd_shreds, None, false).unwrap();
assert_eq!(blocktree.get_slot_entries(slot, 0, None).unwrap(), vec![]);
@ -2475,7 +2483,7 @@ pub mod tests {
assert_eq!(meta.last_index, std::u64::MAX);
}
blocktree.insert_shreds(even_shreds, None).unwrap();
blocktree.insert_shreds(even_shreds, None, false).unwrap();
assert_eq!(
blocktree.get_slot_entries(slot, 0, None).unwrap(),
@ -2508,13 +2516,17 @@ pub mod tests {
// Discard first shred
original_shreds.remove(0);
blocktree.insert_shreds(original_shreds, None).unwrap();
blocktree
.insert_shreds(original_shreds, None, false)
.unwrap();
assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), vec![]);
let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true);
let num_shreds = duplicate_shreds.len() as u64;
blocktree.insert_shreds(duplicate_shreds, None).unwrap();
blocktree
.insert_shreds(duplicate_shreds, None, false)
.unwrap();
assert_eq!(
blocktree.get_slot_entries(0, 0, None).unwrap(),
@ -2544,16 +2556,20 @@ pub mod tests {
// Insert second shred, but we're missing the first shred, so no consecutive
// shreds starting from slot 0, index 0 should exist.
ledger.insert_shreds(vec![shreds.remove(1)], None).unwrap();
ledger
.insert_shreds(vec![shreds.remove(1)], None, false)
.unwrap();
let timer = Duration::new(1, 0);
assert!(recvr.recv_timeout(timer).is_err());
// Insert first shred, now we've made a consecutive block
ledger.insert_shreds(vec![shreds.remove(0)], None).unwrap();
ledger
.insert_shreds(vec![shreds.remove(0)], None, false)
.unwrap();
// Wait to get notified of update, should only be one update
assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err());
// Insert the rest of the ticks
ledger.insert_shreds(shreds, None).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
// Wait to get notified of update, should only be one update
assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err());
@ -2572,7 +2588,7 @@ pub mod tests {
}
// Should be no updates, since no new chains from block 0 were formed
ledger.insert_shreds(shreds, None).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
assert!(recvr.recv_timeout(timer).is_err());
// Insert a shred for each slot that doesn't make a consecutive block, we
@ -2585,7 +2601,7 @@ pub mod tests {
})
.collect();
ledger.insert_shreds(shreds, None).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
assert!(recvr.recv_timeout(timer).is_err());
// For slots 1..num_slots/2, fill in the holes in one batch insertion,
@ -2593,13 +2609,13 @@ pub mod tests {
let missing_shreds2 = missing_shreds
.drain((num_slots / 2) as usize..)
.collect_vec();
ledger.insert_shreds(missing_shreds, None).unwrap();
ledger.insert_shreds(missing_shreds, None, false).unwrap();
assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err());
// Fill in the holes for each of the remaining slots, we should get a single update
// for each
ledger.insert_shreds(missing_shreds2, None).unwrap();
ledger.insert_shreds(missing_shreds2, None, false).unwrap();
// Destroying database without closing it first is undefined behavior
drop(ledger);
@ -2620,11 +2636,11 @@ pub mod tests {
let shred0 = shreds.remove(0);
// Insert all but the first shred in the slot, should not be considered complete
ledger.insert_shreds(shreds, None).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
assert!(recvr.try_recv().is_err());
// Insert first shred, slot should now be considered complete
ledger.insert_shreds(vec![shred0], None).unwrap();
ledger.insert_shreds(vec![shred0], None, false).unwrap();
assert_eq!(recvr.try_recv().unwrap(), vec![0]);
}
@ -2647,20 +2663,24 @@ pub mod tests {
// Insert all but the first shred in the slot, should not be considered complete
let orphan_child0 = orphan_child.remove(0);
ledger.insert_shreds(orphan_child, None).unwrap();
ledger.insert_shreds(orphan_child, None, false).unwrap();
assert!(recvr.try_recv().is_err());
// Insert first shred, slot should now be considered complete
ledger.insert_shreds(vec![orphan_child0], None).unwrap();
ledger
.insert_shreds(vec![orphan_child0], None, false)
.unwrap();
assert_eq!(recvr.try_recv().unwrap(), vec![slots[2]]);
// Insert the shreds for the orphan_slot
let orphan_shred0 = orphan_shreds.remove(0);
ledger.insert_shreds(orphan_shreds, None).unwrap();
ledger.insert_shreds(orphan_shreds, None, false).unwrap();
assert!(recvr.try_recv().is_err());
// Insert first shred, slot should now be considered complete
ledger.insert_shreds(vec![orphan_shred0], None).unwrap();
ledger
.insert_shreds(vec![orphan_shred0], None, false)
.unwrap();
assert_eq!(recvr.try_recv().unwrap(), vec![slots[1]]);
}
@ -2687,7 +2707,7 @@ pub mod tests {
.collect();
all_shreds.shuffle(&mut thread_rng());
ledger.insert_shreds(all_shreds, None).unwrap();
ledger.insert_shreds(all_shreds, None, false).unwrap();
let mut result = recvr.try_recv().unwrap();
result.sort();
slots.push(disconnected_slot);
@ -2711,7 +2731,7 @@ pub mod tests {
let shreds1 = shreds
.drain(shreds_per_slot..2 * shreds_per_slot)
.collect_vec();
blocktree.insert_shreds(shreds1, None).unwrap();
blocktree.insert_shreds(shreds1, None, false).unwrap();
let s1 = blocktree.meta(1).unwrap().unwrap();
assert!(s1.next_slots.is_empty());
// Slot 1 is not trunk because slot 0 hasn't been inserted yet
@ -2723,7 +2743,7 @@ pub mod tests {
let shreds2 = shreds
.drain(shreds_per_slot..2 * shreds_per_slot)
.collect_vec();
blocktree.insert_shreds(shreds2, None).unwrap();
blocktree.insert_shreds(shreds2, None, false).unwrap();
let s2 = blocktree.meta(2).unwrap().unwrap();
assert!(s2.next_slots.is_empty());
// Slot 2 is not trunk because slot 0 hasn't been inserted yet
@ -2741,7 +2761,7 @@ pub mod tests {
// 3) Write to the zeroth slot, check that every slot
// is now part of the trunk
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
for i in 0..3 {
let s = blocktree.meta(i).unwrap().unwrap();
// The last slot will not chain to any other slots
@ -2791,7 +2811,7 @@ pub mod tests {
}
// Write the shreds for every other slot
blocktree.insert_shreds(slots, None).unwrap();
blocktree.insert_shreds(slots, None, false).unwrap();
// Check metadata
for i in 0..num_slots {
@ -2817,7 +2837,7 @@ pub mod tests {
}
// Write the shreds for the other half of the slots that we didn't insert earlier
blocktree.insert_shreds(missing_slots, None).unwrap();
blocktree.insert_shreds(missing_slots, None, false).unwrap();
for i in 0..num_slots {
// Check that all the slots chain correctly once the missing slots
@ -2863,9 +2883,13 @@ pub mod tests {
if slot % 3 == 0 {
let shred0 = shreds_for_slot.remove(0);
missing_shreds.push(shred0);
blocktree.insert_shreds(shreds_for_slot, None).unwrap();
blocktree
.insert_shreds(shreds_for_slot, None, false)
.unwrap();
} else {
blocktree.insert_shreds(shreds_for_slot, None).unwrap();
blocktree
.insert_shreds(shreds_for_slot, None, false)
.unwrap();
}
}
@ -2900,7 +2924,7 @@ pub mod tests {
for slot_index in 0..num_slots {
if slot_index % 3 == 0 {
let shred = missing_shreds.remove(0);
blocktree.insert_shreds(vec![shred], None).unwrap();
blocktree.insert_shreds(vec![shred], None, false).unwrap();
for i in 0..num_slots {
let s = blocktree.meta(i as u64).unwrap().unwrap();
@ -3081,7 +3105,9 @@ pub mod tests {
// Write slot 2, which chains to slot 1. We're missing slot 0,
// so slot 1 is the orphan
let shreds_for_slot = shreds.drain((shreds_per_slot * 2)..).collect_vec();
blocktree.insert_shreds(shreds_for_slot, None).unwrap();
blocktree
.insert_shreds(shreds_for_slot, None, false)
.unwrap();
let meta = blocktree
.meta(1)
.expect("Expect database get to succeed")
@ -3092,7 +3118,9 @@ pub mod tests {
// Write slot 1 which chains to slot 0, so now slot 0 is the
// orphan, and slot 1 is no longer the orphan.
let shreds_for_slot = shreds.drain(shreds_per_slot..).collect_vec();
blocktree.insert_shreds(shreds_for_slot, None).unwrap();
blocktree
.insert_shreds(shreds_for_slot, None, false)
.unwrap();
let meta = blocktree
.meta(1)
.expect("Expect database get to succeed")
@ -3109,12 +3137,12 @@ pub mod tests {
// nothing should change
let (shred4, _) = make_slot_entries(4, 0, 1);
let (shred5, _) = make_slot_entries(5, 1, 1);
blocktree.insert_shreds(shred4, None).unwrap();
blocktree.insert_shreds(shred5, None).unwrap();
blocktree.insert_shreds(shred4, None, false).unwrap();
blocktree.insert_shreds(shred5, None, false).unwrap();
assert_eq!(blocktree.get_orphans(None), vec![0]);
// Write zeroth slot, no more orphans
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
for i in 0..3 {
let meta = blocktree
.meta(i)
@ -3160,11 +3188,11 @@ pub mod tests {
let num_shreds = shreds.len();
// Write shreds to the database
if should_bulk_write {
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
} else {
for _ in 0..num_shreds {
let shred = shreds.remove(0);
blocktree.insert_shreds(vec![shred], None).unwrap();
blocktree.insert_shreds(vec![shred], None, false).unwrap();
}
}
@ -3208,7 +3236,7 @@ pub mod tests {
s.set_index(i as u32 * gap as u32);
s.set_slot(slot);
}
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
// Index of the first shred is 0
// Index of the second shred is "gap"
@ -3290,7 +3318,7 @@ pub mod tests {
let shreds: Vec<_> = (0..64)
.map(|i| Shred::new_from_data(slot, (i * gap) as u32, 0, None, false, false, i as u8))
.collect();
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
let empty: Vec<u64> = vec![];
assert_eq!(
@ -3333,7 +3361,7 @@ pub mod tests {
shreds[1].set_index(OTHER as u32);
// Insert one shred at index = first_index
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
const STARTS: u64 = OTHER * 2;
const END: u64 = OTHER * 3;
@ -3367,7 +3395,7 @@ pub mod tests {
let shreds = entries_to_test_shreds(entries, slot, 0, true);
let num_shreds = shreds.len();
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
let empty: Vec<u64> = vec![];
for i in 0..num_shreds as u64 {
@ -3394,7 +3422,7 @@ pub mod tests {
// Insert the first 5 shreds, we don't have a "is_last" shred yet
blocktree
.insert_shreds(shreds[0..5].to_vec(), None)
.insert_shreds(shreds[0..5].to_vec(), None, false)
.unwrap();
// Trying to insert a shred less than `slot_meta.consumed` should fail
@ -3414,7 +3442,7 @@ pub mod tests {
// Trying to insert the same shred again should fail
// skip over shred 5 so the `slot_meta.consumed` doesn't increment
blocktree
.insert_shreds(shreds[6..7].to_vec(), None)
.insert_shreds(shreds[6..7].to_vec(), None, false)
.unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
@ -3431,7 +3459,7 @@ pub mod tests {
// Trying to insert another "is_last" shred with index < the received index should fail
// skip over shred 7
blocktree
.insert_shreds(shreds[8..9].to_vec(), None)
.insert_shreds(shreds[8..9].to_vec(), None, false)
.unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
@ -3451,7 +3479,7 @@ pub mod tests {
// Insert all pending shreds
let mut shred8 = shreds[8].clone();
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
@ -3494,7 +3522,7 @@ pub mod tests {
// Insertion should succeed
blocktree
.insert_shreds(vec![coding_shred.clone()], None)
.insert_shreds(vec![coding_shred.clone()], None, false)
.unwrap();
// Trying to insert the same shred again should fail
@ -3601,7 +3629,9 @@ pub mod tests {
));
// Insertion should succeed
blocktree.insert_shreds(vec![coding_shred], None).unwrap();
blocktree
.insert_shreds(vec![coding_shred], None, false)
.unwrap();
}
// Trying to insert value into slot <= than last root should fail
@ -3631,7 +3661,7 @@ pub mod tests {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, num_shreds);
@ -3640,7 +3670,7 @@ pub mod tests {
assert!(slot_meta.is_full());
let (shreds, _) = make_slot_entries(0, 0, 22);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, num_shreds);
@ -3662,7 +3692,7 @@ pub mod tests {
let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot);
let slot_8_shreds = all_shreds[2].0.clone();
for (slot_shreds, _) in all_shreds {
blocktree.insert_shreds(slot_shreds, None).unwrap();
blocktree.insert_shreds(slot_shreds, None, false).unwrap();
}
// Slot doesnt exist, iterator should be empty
@ -3707,7 +3737,7 @@ pub mod tests {
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (shreds, _) = make_many_slot_entries(0, 50, 6);
let shreds_per_slot = shreds.len() as u64 / 50;
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
blocktree
.slot_meta_iterator(0)
.unwrap()
@ -3742,7 +3772,7 @@ pub mod tests {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (shreds, _) = make_many_slot_entries(0, 50, 5);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
blocktree.purge_slots(0, Some(5));
@ -3768,7 +3798,7 @@ pub mod tests {
let blocktree_path = get_tmp_ledger_path!();
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let (shreds, _) = make_many_slot_entries(0, 5000, 10);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
blocktree.purge_slots(0, Some(4999));
@ -3882,7 +3912,7 @@ pub mod tests {
let shreds = entries_to_test_shreds(entries, slot, 0, false);
let next_shred_index = shreds.len();
blocktree
.insert_shreds(shreds, None)
.insert_shreds(shreds, None, false)
.expect("Expected successful write of shreds");
assert_eq!(
blocktree.get_slot_entries(slot, 0, None).unwrap().len() as u64,
@ -3903,7 +3933,7 @@ pub mod tests {
// With the corruption, nothing should be returned, even though an
// earlier data block was valid
blocktree
.insert_shreds(shreds, None)
.insert_shreds(shreds, None, false)
.expect("Expected successful write of shreds");
assert!(blocktree.get_slot_entries(slot, 0, None).is_err());
}
@ -3921,7 +3951,7 @@ pub mod tests {
// Insert the first 5 shreds, we don't have a "is_last" shred yet
blocktree
.insert_shreds(shreds0[0..5].to_vec(), None)
.insert_shreds(shreds0[0..5].to_vec(), None, false)
.unwrap();
// Insert a repetitive shred for slot 's', should get ignored, but also
@ -3931,13 +3961,37 @@ pub mod tests {
let (mut shreds3, _) = make_slot_entries(3, 0, 200);
shreds2.push(shreds0[1].clone());
shreds3.insert(0, shreds0[1].clone());
blocktree.insert_shreds(shreds2, None).unwrap();
blocktree.insert_shreds(shreds2, None, false).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.next_slots, vec![2]);
blocktree.insert_shreds(shreds3, None).unwrap();
blocktree.insert_shreds(shreds3, None, false).unwrap();
let slot_meta = blocktree.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.next_slots, vec![2, 3]);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
fn test_trusted_insert_shreds() {
// Make shred for slot 1
let (shreds1, _) = make_slot_entries(1, 0, 1);
let blocktree_path = get_tmp_ledger_path!();
let last_root = 100;
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
blocktree.set_roots(&[last_root]).unwrap();
// Insert will fail, slot < root
blocktree
.insert_shreds(shreds1.clone()[..].to_vec(), None, false)
.unwrap();
assert!(blocktree.get_data_shred(1, 0).unwrap().is_none());
// Insert through trusted path will succeed
blocktree
.insert_shreds(shreds1[..].to_vec(), None, true)
.unwrap();
assert!(blocktree.get_data_shred(1, 0).unwrap().is_some());
}
}
}

View File

@ -438,7 +438,7 @@ mod tests {
// Write a blob into slot 2 that chains to slot 1,
// but slot 1 is empty so should not be skipped
let (shreds, _) = make_slot_entries(2, 1, 1);
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
assert_eq!(
cache
.next_leader_slot(&pubkey, 0, &bank, Some(&blocktree))
@ -451,7 +451,7 @@ mod tests {
let (shreds, _) = make_slot_entries(1, 0, 1);
// Check that slot 1 and 2 are skipped
blocktree.insert_shreds(shreds, None).unwrap();
blocktree.insert_shreds(shreds, None, false).unwrap();
assert_eq!(
cache
.next_leader_slot(&pubkey, 0, &bank, Some(&blocktree))

View File

@ -25,7 +25,7 @@ fn test_multiple_threads_insert_shred() {
Builder::new()
.name("blocktree-writer".to_string())
.spawn(move || {
blocktree_.insert_shreds(shreds, None).unwrap();
blocktree_.insert_shreds(shreds, None, false).unwrap();
})
.unwrap()
})