diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 119da4591..0e800004c 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -50,7 +50,7 @@ impl StandardBroadcastRun { fn check_for_interrupted_slot(&mut self) -> Option { let (slot, _) = self.current_slot_and_parent.unwrap(); - let last_unfinished_slot_shred = self + let mut last_unfinished_slot_shred = self .unfinished_slot .map(|last_unfinished_slot| { if last_unfinished_slot.slot != slot { @@ -70,7 +70,8 @@ impl StandardBroadcastRun { .unwrap_or(None); // This shred should only be Some if the previous slot was interrupted - if last_unfinished_slot_shred.is_some() { + if let Some(ref mut shred) = last_unfinished_slot_shred { + Shredder::sign_shred(&self.keypair, shred); self.unfinished_slot = None; } @@ -331,6 +332,38 @@ mod test { ) } + #[test] + fn test_interrupted_slot_last_shred() { + let keypair = Arc::new(Keypair::new()); + let mut run = StandardBroadcastRun::new(keypair.clone()); + + // Set up the slot to be interrupted + let next_shred_index = 10; + let slot = 1; + let parent = 0; + run.unfinished_slot = Some(UnfinishedSlotInfo { + next_shred_index, + slot, + parent, + }); + run.slot_broadcast_start = Some(Instant::now()); + + // Set up a slot to interrupt the old slot + run.current_slot_and_parent = Some((4, 2)); + + // Slot 2 interrupted slot 1 + let shred = run + .check_for_interrupted_slot() + .expect("Expected a shred that signals an interrupt"); + + // Validate the shred + assert_eq!(shred.parent(), parent); + assert_eq!(shred.slot(), slot); + assert_eq!(shred.index(), next_shred_index); + assert!(shred.is_data()); + assert!(shred.verify(&keypair.pubkey())); + } + #[test] fn test_slot_interrupt() { // Setup @@ -339,12 +372,12 @@ mod test { setup(num_shreds_per_slot); // Insert 1 less than the number of ticks needed to finish the slot - let ticks = create_ticks(genesis_block.ticks_per_slot - 1, 0, genesis_block.hash()); + let ticks0 = create_ticks(genesis_block.ticks_per_slot - 1, 0, genesis_block.hash()); let receive_results = ReceiveResults { - entries: ticks.clone(), + entries: ticks0.clone(), time_elapsed: Duration::new(3, 0), bank: bank0.clone(), - last_tick_height: (ticks.len() - 1) as u64, + last_tick_height: (ticks0.len() - 1) as u64, }; // Step 1: Make an incomplete transmission for slot 0 @@ -362,7 +395,7 @@ mod test { standard_broadcast_run.stats.receive_elapsed = 10; // Try to fetch ticks from blocktree, nothing should break - assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), ticks); + assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), ticks0); assert_eq!( blocktree .get_slot_entries(0, num_shreds_per_slot, None) @@ -377,12 +410,12 @@ mod test { // Interrupting the slot should cause the unfinished_slot and stats to reset let num_shreds = 1; assert!(num_shreds < num_shreds_per_slot); - let ticks = create_ticks(max_ticks_per_n_shreds(num_shreds), 0, genesis_block.hash()); + let ticks1 = create_ticks(max_ticks_per_n_shreds(num_shreds), 0, genesis_block.hash()); let receive_results = ReceiveResults { - entries: ticks.clone(), + entries: ticks1.clone(), time_elapsed: Duration::new(2, 0), bank: bank2.clone(), - last_tick_height: (ticks.len() - 1) as u64, + last_tick_height: (ticks1.len() - 1) as u64, }; standard_broadcast_run .process_receive_results(&cluster_info, &socket, &blocktree, receive_results) @@ -394,8 +427,18 @@ mod test { assert_eq!(unfinished_slot.next_shred_index as u64, num_shreds); assert_eq!(unfinished_slot.slot, 2); assert_eq!(unfinished_slot.parent, 0); + // Check that the stats were reset as well assert_eq!(standard_broadcast_run.stats.receive_elapsed, 0); + + // Try to fetch the incomplete ticks from blocktree, should succeed + assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), ticks0); + assert_eq!( + blocktree + .get_slot_entries(0, num_shreds_per_slot, None) + .unwrap(), + vec![], + ); } #[test] diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index 8af9a70e8..ab74bce48 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -3804,7 +3804,7 @@ pub mod tests { slot, next_shred_index as u32, 1, - None, + Some(&[1, 1, 1]), true, true, )]; diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 939976812..360ff32fc 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -160,22 +160,23 @@ impl Shred { data_header.flags |= LAST_SHRED_IN_SLOT } + let mut start = 0; + Self::serialize_obj_into( + &mut start, + SIZE_OF_COMMON_SHRED_HEADER, + &mut payload, + &common_header, + ) + .expect("Failed to write header into shred buffer"); + Self::serialize_obj_into( + &mut start, + SIZE_OF_DATA_SHRED_HEADER, + &mut payload, + &data_header, + ) + .expect("Failed to write data header into shred buffer"); + if let Some(data) = data { - let mut start = 0; - Self::serialize_obj_into( - &mut start, - SIZE_OF_COMMON_SHRED_HEADER, - &mut payload, - &common_header, - ) - .expect("Failed to write header into shred buffer"); - Self::serialize_obj_into( - &mut start, - SIZE_OF_DATA_SHRED_HEADER, - &mut payload, - &data_header, - ) - .expect("Failed to write data header into shred buffer"); payload[start..start + data.len()].clone_from_slice(data); }