diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index ae516ee25..6d7993c48 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -751,33 +751,7 @@ impl Blocktree { } let last_root = *last_root.read().unwrap(); - if !is_valid_write_to_slot_0(slot, slot_meta.parent_slot, last_root) { - // Check that the parent_slot < slot - if slot_meta.parent_slot >= slot { - datapoint_error!( - "blocktree_error", - ( - "error", - format!( - "Received blob with parent_slot {} >= slot {}", - slot_meta.parent_slot, slot - ), - String - ) - ); - return false; - } - - // Check that the blob is for a slot that is past the root - if slot <= last_root { - return false; - } - - // Ignore blobs that chain to slots before the last root - if slot_meta.parent_slot < last_root { - return false; - } - } + verify_shred_slots(slot, slot_meta.parent_slot, last_root); true } @@ -1699,6 +1673,24 @@ macro_rules! create_new_tmp_ledger { }; } +pub fn verify_shred_slots(slot: u64, parent_slot: u64, last_root: u64) -> bool { + if !is_valid_write_to_slot_0(slot, parent_slot, last_root) { + // Check that the parent_slot < slot + if parent_slot >= slot { + return false; + } + + // Ignore blobs that chain to slots before the last root + if parent_slot < last_root { + return false; + } + + // Above two checks guarantee that by this point, slot > last_root + } + + true +} + // Same as `create_new_ledger()` but use a temporary ledger name based on the provided `name` // // Note: like `create_new_ledger` the returned ledger will have slot 0 full of ticks (and only diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 137b14f66..6eac3d7c1 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -477,7 +477,7 @@ impl Replicator { &exit, RepairStrategy::RepairRange(repair_slot_range), &Arc::new(LeaderScheduleCache::default()), - |_, _, _, _| true, + |_, _, _, _, _| true, ); info!("waiting for ledger download"); Self::wait_for_segment_download( diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 5dee03e18..9c491cdfe 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -151,13 +151,14 @@ impl RetransmitStage { exit, repair_strategy, &leader_schedule_cache.clone(), - move |id, shred, shred_buf, working_bank| { + move |id, shred, shred_buf, working_bank, last_root| { should_retransmit_and_persist( shred, shred_buf, working_bank, &leader_schedule_cache, id, + last_root, ) }, ); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index b6e2a8b21..b56445905 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -1,7 +1,7 @@ //! `window_service` handles the data plane incoming blobs, storing them in //! blocktree and retransmitting where required //! -use crate::blocktree::Blocktree; +use crate::blocktree::{self, Blocktree}; use crate::cluster_info::ClusterInfo; use crate::leader_schedule_cache::LeaderScheduleCache; use crate::repair_service::{RepairService, RepairStrategy}; @@ -33,16 +33,19 @@ pub fn should_retransmit_and_persist( bank: Option>, leader_schedule_cache: &Arc, my_pubkey: &Pubkey, + root: u64, ) -> bool { let slot_leader_pubkey = match bank { None => leader_schedule_cache.slot_leader_at(shred.slot(), None), Some(bank) => leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank)), }; - if let Some(leader_id) = slot_leader_pubkey { if leader_id == *my_pubkey { inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1); false + } else if !blocktree::verify_shred_slots(shred.slot(), shred.parent(), root) { + inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1); + false } else if !shred.fast_verify(&shred_buf, &leader_id) { inc_new_counter_debug!("streamer-recv_window-invalid_signature", 1); false @@ -65,7 +68,7 @@ fn recv_window( leader_schedule_cache: &Arc, ) -> Result<()> where - F: Fn(&Shred, &[u8]) -> bool, + F: Fn(&Shred, &[u8], u64) -> bool, F: Sync, { let timer = Duration::from_millis(200); @@ -77,6 +80,7 @@ where let now = Instant::now(); inc_new_counter_debug!("streamer-recv_window-recv", packets.packets.len()); + let last_root = blocktree.last_root(); let (shreds, packets_ix): (Vec<_>, Vec<_>) = thread_pool.install(|| { packets .packets @@ -85,7 +89,7 @@ where .filter_map(|(i, packet)| { if let Ok(s) = bincode::deserialize(&packet.data) { let shred: Shred = s; - if shred_filter(&shred, &packet.data) { + if shred_filter(&shred, &packet.data, last_root) { packet.meta.slot = shred.slot(); packet.meta.seed = shred.seed(); Some((shred, i)) @@ -175,7 +179,7 @@ impl WindowService { ) -> WindowService where F: 'static - + Fn(&Pubkey, &Shred, &[u8], Option>) -> bool + + Fn(&Pubkey, &Shred, &[u8], Option>, u64) -> bool + std::marker::Send + std::marker::Sync, { @@ -219,7 +223,7 @@ impl WindowService { &id, &r, &retransmit, - |shred, shred_buf| { + |shred, shred_buf, last_root| { shred_filter( &id, shred, @@ -227,6 +231,7 @@ impl WindowService { bank_forks .as_ref() .map(|bank_forks| bank_forks.read().unwrap().working_bank()), + last_root, ) }, &thread_pool, @@ -297,9 +302,14 @@ mod test { use std::thread::sleep; use std::time::Duration; - fn local_entries_to_shred(entries: Vec, keypair: &Arc) -> Vec { + fn local_entries_to_shred( + entries: Vec, + slot: u64, + parent: u64, + keypair: &Arc, + ) -> Vec { let mut shredder = - Shredder::new(0, 0, 0.0, keypair, 0).expect("Failed to create entry shredder"); + Shredder::new(slot, parent, 0.0, keypair, 0).expect("Failed to create entry shredder"); bincode::serialize_into(&mut shredder, &entries) .expect("Expect to write all entries to shreds"); shredder.finalize_slot(); @@ -313,7 +323,7 @@ mod test { let num_entries = 10; let original_entries = make_tiny_test_entries(num_entries); let mut shreds = - local_entries_to_shred(original_entries.clone(), &Arc::new(Keypair::new())); + local_entries_to_shred(original_entries.clone(), 0, 0, &Arc::new(Keypair::new())); shreds.reverse(); blocktree .insert_shreds(shreds, None) @@ -331,15 +341,14 @@ mod test { #[test] fn test_should_retransmit_and_persist() { let me_id = Pubkey::new_rand(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey(); let bank = Arc::new(Bank::new( &create_genesis_block_with_leader(100, &leader_pubkey, 10).genesis_block, )); let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); - let entry = Entry::default(); - let mut shreds = local_entries_to_shred(vec![entry], &Arc::new(leader_keypair)); + let mut shreds = local_entries_to_shred(vec![Entry::default()], 0, 0, &leader_keypair); let shred_bufs: Vec<_> = shreds .iter() .map(|s| bincode::serialize(s).unwrap()) @@ -352,7 +361,8 @@ mod test { &shred_bufs[0], Some(bank.clone()), &cache, - &me_id + &me_id, + 0, ), true ); @@ -368,7 +378,46 @@ mod test { // with a Bank and no idea who leader is, blob gets thrown out shreds[0].set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3); assert_eq!( - should_retransmit_and_persist(&shreds[0], &shred_bufs[0], Some(bank), &cache, &me_id), + should_retransmit_and_persist( + &shreds[0], + &shred_bufs[0], + Some(bank.clone()), + &cache, + &me_id, + 0 + ), + false + ); + + // with a shred where shred.slot() == root, blob gets thrown out + let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3; + let shreds = + local_entries_to_shred(vec![Entry::default()], slot, slot - 1, &leader_keypair); + assert_eq!( + should_retransmit_and_persist( + &shreds[0], + &shred_bufs[0], + Some(bank.clone()), + &cache, + &me_id, + slot + ), + false + ); + + // with a shred where shred.parent() < root, blob gets thrown out + let slot = MINIMUM_SLOTS_PER_EPOCH as u64 * 3; + let shreds = + local_entries_to_shred(vec![Entry::default()], slot + 1, slot - 1, &leader_keypair); + assert_eq!( + should_retransmit_and_persist( + &shreds[0], + &shred_bufs[0], + Some(bank.clone()), + &cache, + &me_id, + slot + ), false ); @@ -429,7 +478,7 @@ mod test { &exit, repair_strategy, &Arc::new(LeaderScheduleCache::default()), - |_, _, _, _| true, + |_, _, _, _, _| true, ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -519,7 +568,7 @@ mod test { &exit, repair_strategy, &Arc::new(LeaderScheduleCache::default()), - |_, _, _, _| true, + |_, _, _, _, _| true, ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -580,7 +629,7 @@ mod test { &exit, RepairStrategy::RepairRange(RepairSlotRange { start: 0, end: 0 }), &Arc::new(LeaderScheduleCache::default()), - |_, _, _, _| true, + |_, _, _, _, _| true, ); window }