diff --git a/core/benches/shredder.rs b/core/benches/shredder.rs
index 48813115c6..1ee7deac7c 100644
--- a/core/benches/shredder.rs
+++ b/core/benches/shredder.rs
@@ -4,7 +4,7 @@ extern crate test;
 
 use solana_core::entry::create_ticks;
 use solana_core::shred::{
-    max_ticks_per_shred, Shredder, RECOMMENDED_FEC_RATE, SIZE_OF_DATA_SHRED_HEADER,
+    max_ticks_per_n_shreds, Shredder, RECOMMENDED_FEC_RATE, SIZE_OF_DATA_SHRED_HEADER,
 };
 use solana_sdk::hash::Hash;
 use solana_sdk::packet::PACKET_DATA_SIZE;
@@ -18,7 +18,7 @@ fn bench_shredder(bencher: &mut Bencher) {
     let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
     let num_shreds = ((1000 * 1000) + (shred_size - 1)) / shred_size; // ~1Mb
-    let num_ticks = max_ticks_per_shred() * num_shreds as u64;
+    let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
     let entries = create_ticks(num_ticks, Hash::default());
     bencher.iter(|| {
         let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp.clone()).unwrap();
@@ -32,7 +32,7 @@ fn bench_deshredder(bencher: &mut Bencher) {
     let shred_size = PACKET_DATA_SIZE - *SIZE_OF_DATA_SHRED_HEADER;
     // ~10Mb
     let num_shreds = ((10000 * 1000) + (shred_size - 1)) / shred_size;
-    let num_ticks = max_ticks_per_shred() * num_shreds as u64;
+    let num_ticks = max_ticks_per_n_shreds(1) * num_shreds as u64;
     let entries = create_ticks(num_ticks, Hash::default());
     let shredder = Shredder::new(1, 0, RECOMMENDED_FEC_RATE, kp).unwrap();
     let data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
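
Note: every call site in this diff changes the same way: max_ticks_per_shred() becomes max_ticks_per_n_shreds(1). The generalized helper (see the core/src/shred.rs hunk at the end of this diff) answers "how many ticks fit in n data shreds", which the new broadcast tests use to size ticks_per_slot so a slot maps to an exact shred count. A usage sketch, assuming the helper as defined below:

    // Ticks are uniform, transaction-free entries, so the capacity of n shreds
    // follows from serializing one sample tick (see max_entries_per_n_shred).
    let per_shred = max_ticks_per_n_shreds(1); // replaces the old max_ticks_per_shred()
    genesis_block.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot) + 1;
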
diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index db59cf15fc..7a5b87c62c 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -1653,7 +1653,7 @@ pub mod tests {
     use super::*;
     use crate::entry::{create_ticks, Entry};
     use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
-    use crate::shred::max_ticks_per_shred;
+    use crate::shred::max_ticks_per_n_shreds;
     use itertools::Itertools;
     use rand::seq::SliceRandom;
     use rand::thread_rng;
@@ -1682,7 +1682,7 @@
     #[test]
     fn test_insert_get_bytes() {
         // Create enough entries to ensure there are at least two shreds created
-        let num_entries = max_ticks_per_shred() + 1;
+        let num_entries = max_ticks_per_n_shreds(1) + 1;
         assert!(num_entries > 1);
 
         let (mut shreds, _) = make_slot_entries(0, 0, num_entries);
@@ -1921,7 +1921,7 @@
     #[test]
     fn test_insert_data_shreds_basic() {
         // Create enough entries to ensure there are at least two shreds created
-        let num_entries = max_ticks_per_shred() + 1;
+        let num_entries = max_ticks_per_n_shreds(1) + 1;
         assert!(num_entries > 1);
 
         let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
@@ -2144,7 +2144,7 @@
         {
             let blocktree = Blocktree::open(&blocktree_path).unwrap();
             // Create enough entries to ensure there are at least two shreds created
-            let min_entries = max_ticks_per_shred() + 1;
+            let min_entries = max_ticks_per_n_shreds(1) + 1;
             for i in 0..4 {
                 let slot = i;
                 let parent_slot = if i == 0 { 0 } else { i - 1 };
@@ -2557,7 +2557,7 @@
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
         let num_slots = 15;
         // Create enough entries to ensure there are at least two shreds created
-        let entries_per_slot = max_ticks_per_shred() + 1;
+        let entries_per_slot = max_ticks_per_n_shreds(1) + 1;
         assert!(entries_per_slot > 1);
 
         let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
@@ -2907,7 +2907,7 @@
         let gap: u64 = 10;
         assert!(gap > 3);
         // Create enough entries to ensure there are at least two shreds created
-        let num_entries = max_ticks_per_shred() + 1;
+        let num_entries = max_ticks_per_n_shreds(1) + 1;
         let entries = create_ticks(num_entries, Hash::default());
         let mut shreds = entries_to_test_shreds(entries, slot, 0, true);
         let num_shreds = shreds.len();
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 63146402c3..a7fa48f6ed 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -8,7 +8,7 @@ use crate::poh_recorder::WorkingBankEntry;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::staking_utils;
-use solana_metrics::{datapoint_debug, inc_new_counter_error, inc_new_counter_info};
+use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError};
diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs
index 30cacb6813..26d0f038b4 100644
--- a/core/src/broadcast_stage/broadcast_utils.rs
+++ b/core/src/broadcast_stage/broadcast_utils.rs
@@ -15,7 +15,7 @@ pub(super) struct ReceiveResults {
 
 #[derive(Copy, Clone)]
 pub struct UnfinishedSlotInfo {
-    pub next_index: u64,
+    pub next_shred_index: u32,
     pub slot: u64,
     pub parent: u64,
 }
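
UnfinishedSlotInfo is the leader's in-memory cursor for a slot it has started shredding but not completed. next_index: u64 becomes next_shred_index: u32 to match the width of shred indices themselves (Shred::new_from_data takes index: u32 in the core/src/shred.rs hunk below). A sketch of how the broadcast run consumes it; consumed_index_from_blocktree here is a hypothetical stand-in for the blocktree.meta(slot) lookup shown in entries_to_shreds below:

    // Resume from the in-memory cursor while still shredding the same slot;
    // otherwise fall back to what blocktree has already recorded as consumed.
    let next_shred_index = unfinished_slot
        .map(|s| s.next_shred_index)
        .unwrap_or_else(|| consumed_index_from_blocktree(slot));
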
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 35c7fc5988..4bc516d539 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -1,16 +1,14 @@
-use super::broadcast_utils;
+use super::broadcast_utils::{self, ReceiveResults};
 use super::*;
-use crate::shred::{Shredder, RECOMMENDED_FEC_RATE};
-use solana_sdk::timing::duration_as_ms;
+use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
+use crate::entry::Entry;
+use crate::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE};
+use solana_sdk::signature::Keypair;
+use solana_sdk::timing::duration_as_us;
 use std::time::Duration;
 
 #[derive(Default)]
 struct BroadcastStats {
-    num_entries: Vec<usize>,
-    run_elapsed: Vec<u64>,
-    to_blobs_elapsed: Vec<u64>,
-    slots: Vec<u64>,
-
     // Per-slot elapsed time
     shredding_elapsed: u64,
     insert_shreds_elapsed: u64,
@@ -19,9 +17,20 @@ struct BroadcastStats {
     clone_and_seed_elapsed: u64,
 }
 
+impl BroadcastStats {
+    fn reset(&mut self) {
+        self.insert_shreds_elapsed = 0;
+        self.shredding_elapsed = 0;
+        self.broadcast_elapsed = 0;
+        self.receive_elapsed = 0;
+        self.clone_and_seed_elapsed = 0;
+    }
+}
+
 pub(super) struct StandardBroadcastRun {
     stats: BroadcastStats,
-    current_slot: Option<u64>,
+    unfinished_slot: Option<UnfinishedSlotInfo>,
+    current_slot_and_parent: Option<(u64, u64)>,
     slot_broadcast_start: Option<Instant>,
 }
 
@@ -29,177 +38,162 @@ impl StandardBroadcastRun {
     pub(super) fn new() -> Self {
         Self {
             stats: BroadcastStats::default(),
-            current_slot: None,
+            unfinished_slot: None,
+            current_slot_and_parent: None,
             slot_broadcast_start: None,
         }
     }
 
-    #[allow(clippy::too_many_arguments)]
-    fn update_broadcast_stats(
-        &mut self,
-        receive_entries_elapsed: u64,
-        shredding_elapsed: u64,
-        insert_shreds_elapsed: u64,
-        broadcast_elapsed: u64,
-        run_elapsed: u64,
-        clone_and_seed_elapsed: u64,
-        num_entries: usize,
-        num_shreds: usize,
-        shred_index: u32,
-        slot: u64,
-        slot_ended: bool,
-        latest_shred_index: u32,
-    ) {
-        self.stats.insert_shreds_elapsed += insert_shreds_elapsed;
-        self.stats.shredding_elapsed += shredding_elapsed;
-        self.stats.broadcast_elapsed += broadcast_elapsed;
-        self.stats.receive_elapsed += receive_entries_elapsed;
-        self.stats.clone_and_seed_elapsed += clone_and_seed_elapsed;
+    fn check_for_interrupted_slot(&mut self) -> Option<Shred> {
+        let (slot, _) = self.current_slot_and_parent.unwrap();
+        let last_unfinished_slot_shred = self
+            .unfinished_slot
+            .map(|last_unfinished_slot| {
+                if last_unfinished_slot.slot != slot {
+                    self.report_and_reset_stats();
+                    Some(Shred::new_from_data(
+                        last_unfinished_slot.slot,
+                        last_unfinished_slot.next_shred_index,
+                        (last_unfinished_slot.slot - last_unfinished_slot.parent) as u16,
+                        None,
+                        true,
+                        true,
+                    ))
+                } else {
+                    None
+                }
+            })
+            .unwrap_or(None);
 
-        if slot_ended {
-            datapoint_info!(
-                "broadcast-bank-stats",
-                ("slot", slot as i64, i64),
-                ("shredding_time", self.stats.shredding_elapsed as i64, i64),
-                (
-                    "insertion_time",
-                    self.stats.insert_shreds_elapsed as i64,
-                    i64
-                ),
-                ("broadcast_time", self.stats.broadcast_elapsed as i64, i64),
-                ("receive_time", self.stats.receive_elapsed as i64, i64),
-                (
-                    "clone_and_seed",
-                    self.stats.clone_and_seed_elapsed as i64,
-                    i64
-                ),
-                ("num_shreds", i64::from(latest_shred_index), i64),
-                (
-                    "slot_broadcast_time",
-                    self.slot_broadcast_start.unwrap().elapsed().as_millis() as i64,
-                    i64
-                ),
-            );
-            self.stats.insert_shreds_elapsed = 0;
-            self.stats.shredding_elapsed = 0;
-            self.stats.broadcast_elapsed = 0;
-            self.stats.receive_elapsed = 0;
-            self.stats.clone_and_seed_elapsed = 0;
+        // This shred should only be Some if the previous slot was interrupted
+        if last_unfinished_slot_shred.is_some() {
+            self.unfinished_slot = None;
         }
-        inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);
-
-        self.stats.num_entries.push(num_entries);
-        self.stats.to_blobs_elapsed.push(shredding_elapsed);
-        self.stats.run_elapsed.push(run_elapsed);
-        self.stats.slots.push(slot);
-        if self.stats.num_entries.len() >= 16 {
-            info!(
-                "broadcast: entries: {:?} blob times ms: {:?} broadcast times ms: {:?} slots: {:?}",
-                self.stats.num_entries,
-                self.stats.to_blobs_elapsed,
-                self.stats.run_elapsed,
-                self.stats.slots,
-            );
-            self.stats.num_entries.clear();
-            self.stats.to_blobs_elapsed.clear();
-            self.stats.run_elapsed.clear();
-            self.stats.slots.clear();
-        }
-
-        datapoint_debug!(
-            "broadcast-service",
-            ("num_entries", num_entries as i64, i64),
-            ("num_shreds", num_shreds as i64, i64),
-            ("receive_time", receive_entries_elapsed as i64, i64),
-            ("shredding_time", shredding_elapsed as i64, i64),
-            ("insert_shred_time", insert_shreds_elapsed as i64, i64),
-            ("broadcast_time", broadcast_elapsed as i64, i64),
-            ("transmit-index", i64::from(shred_index), i64),
-        );
+        last_unfinished_slot_shred
     }
-}
 
-impl BroadcastRun for StandardBroadcastRun {
-    fn run(
+    fn coalesce_shreds(
+        data_shreds: Vec<Shred>,
+        coding_shreds: Vec<Shred>,
+        last_unfinished_slot_shred: Option<Shred>,
+    ) -> Vec<Shred> {
+        if let Some(shred) = last_unfinished_slot_shred {
+            data_shreds
+                .iter()
+                .chain(coding_shreds.iter())
+                .cloned()
+                .chain(std::iter::once(shred))
+                .collect::<Vec<_>>()
+        } else {
+            data_shreds
+                .iter()
+                .chain(coding_shreds.iter())
+                .cloned()
+                .collect::<Vec<_>>()
+        }
+    }
+
+    fn entries_to_shreds(
+        &mut self,
+        blocktree: &Blocktree,
+        entries: &[Entry],
+        keypair: Arc<Keypair>,
+        is_slot_end: bool,
+    ) -> (Vec<Shred>, Vec<Shred>) {
+        let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
+        let shredder = Shredder::new(slot, parent_slot, RECOMMENDED_FEC_RATE, keypair)
+            .expect("Expected to create a new shredder");
+
+        let next_shred_index = self
+            .unfinished_slot
+            .map(|s| s.next_shred_index)
+            .unwrap_or_else(|| {
+                blocktree
+                    .meta(slot)
+                    .expect("Database error")
+                    .map(|meta| meta.consumed)
+                    .unwrap_or(0) as u32
+            });
+
+        let (data_shreds, coding_shreds, new_next_shred_index) =
+            shredder.entries_to_shreds(entries, is_slot_end, next_shred_index);
+
+        self.unfinished_slot = Some(UnfinishedSlotInfo {
+            next_shred_index: new_next_shred_index,
+            slot,
+            parent: parent_slot,
+        });
+
+        (data_shreds, coding_shreds)
+    }
+
+    fn process_receive_results(
         &mut self,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        receiver: &Receiver<WorkingBankEntry>,
         sock: &UdpSocket,
         blocktree: &Arc<Blocktree>,
+        receive_results: ReceiveResults,
     ) -> Result<()> {
-        // 1) Pull entries from banking stage
-        let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
         let mut receive_elapsed = receive_results.time_elapsed;
         let num_entries = receive_results.entries.len();
         let bank = receive_results.bank.clone();
         let last_tick = receive_results.last_tick;
         inc_new_counter_info!("broadcast_service-entries_received", num_entries);
 
-        if Some(bank.slot()) != self.current_slot {
+        if self.current_slot_and_parent.is_none()
+            || bank.slot() != self.current_slot_and_parent.unwrap().0
+        {
             self.slot_broadcast_start = Some(Instant::now());
-            self.current_slot = Some(bank.slot());
+            let slot = bank.slot();
+            let parent_slot = {
+                if let Some(parent_bank) = bank.parent() {
+                    parent_bank.slot()
+                } else {
+                    0
+                }
+            };
+
+            self.current_slot_and_parent = Some((slot, parent_slot));
             receive_elapsed = Duration::new(0, 0);
         }
 
-        // 2) Convert entries to blobs + generate coding blobs
-        let keypair = &cluster_info.read().unwrap().keypair.clone();
-        let next_shred_index = blocktree
-            .meta(bank.slot())
-            .expect("Database error")
-            .map(|meta| meta.consumed)
-            .unwrap_or(0) as u32;
+        let keypair = cluster_info.read().unwrap().keypair.clone();
 
-        let parent_slot = if let Some(parent_bank) = bank.parent() {
-            parent_bank.slot()
-        } else {
-            0
-        };
-
-        // Create shreds from entries
         let to_shreds_start = Instant::now();
-        let shredder = Shredder::new(
-            bank.slot(),
-            parent_slot,
-            RECOMMENDED_FEC_RATE,
-            keypair.clone(),
-        )
-        .expect("Expected to create a new shredder");
-        let (data_shreds, coding_shreds, latest_shred_index) = shredder.entries_to_shreds(
+
+        // 1) Check if slot was interrupted
+        let last_unfinished_slot_shred = self.check_for_interrupted_slot();
+
+        // 2) Convert entries to shreds and coding shreds
+        let (data_shreds, coding_shreds) = self.entries_to_shreds(
+            blocktree,
             &receive_results.entries,
+            keypair,
             last_tick == bank.max_tick_height(),
-            next_shred_index,
         );
         let to_shreds_elapsed = to_shreds_start.elapsed();
 
         let clone_and_seed_start = Instant::now();
-        let all_shreds = data_shreds
-            .iter()
-            .cloned()
-            .chain(coding_shreds.iter().cloned())
-            .collect::<Vec<_>>();
+        let all_shreds =
+            Self::coalesce_shreds(data_shreds, coding_shreds, last_unfinished_slot_shred);
+        let all_shreds_ = all_shreds.clone();
         let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect();
-        let num_shreds = all_shreds.len();
         let clone_and_seed_elapsed = clone_and_seed_start.elapsed();
 
-        // Insert shreds into blocktree
+        // 3) Insert shreds into blocktree
         let insert_shreds_start = Instant::now();
         blocktree
-            .insert_shreds(all_shreds, None)
+            .insert_shreds(all_shreds_, None)
             .expect("Failed to insert shreds in blocktree");
         let insert_shreds_elapsed = insert_shreds_start.elapsed();
 
-        // 3) Start broadcast step
+        // 4) Broadcast the shreds
         let broadcast_start = Instant::now();
         let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
         let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
 
-        let all_shred_bufs: Vec<Vec<u8>> = data_shreds
-            .into_iter()
-            .chain(coding_shreds.into_iter())
-            .map(|s| s.payload)
-            .collect();
+        let all_shred_bufs: Vec<Vec<u8>> = all_shreds.into_iter().map(|s| s.payload).collect();
         trace!("Broadcasting {:?} shreds", all_shred_bufs.len());
 
         cluster_info.read().unwrap().broadcast_shreds(
@@ -212,22 +206,223 @@ impl BroadcastRun for StandardBroadcastRun {
         let broadcast_elapsed = broadcast_start.elapsed();
 
         self.update_broadcast_stats(
-            duration_as_ms(&receive_elapsed),
-            duration_as_ms(&to_shreds_elapsed),
-            duration_as_ms(&insert_shreds_elapsed),
-            duration_as_ms(&broadcast_elapsed),
-            duration_as_ms(&clone_and_seed_elapsed),
-            duration_as_ms(
-                &(receive_elapsed + to_shreds_elapsed + insert_shreds_elapsed + broadcast_elapsed),
-            ),
-            num_entries,
-            num_shreds,
-            next_shred_index,
-            bank.slot(),
+            duration_as_us(&receive_elapsed),
+            duration_as_us(&to_shreds_elapsed),
+            duration_as_us(&insert_shreds_elapsed),
+            duration_as_us(&broadcast_elapsed),
+            duration_as_us(&clone_and_seed_elapsed),
             last_tick == bank.max_tick_height(),
-            latest_shred_index,
         );
 
+        if last_tick == bank.max_tick_height() {
+            self.unfinished_slot = None;
+        }
+
         Ok(())
     }
+
+    #[allow(clippy::too_many_arguments)]
+    fn update_broadcast_stats(
+        &mut self,
+        receive_entries_elapsed: u64,
+        shredding_elapsed: u64,
+        insert_shreds_elapsed: u64,
+        broadcast_elapsed: u64,
+        clone_and_seed_elapsed: u64,
+        slot_ended: bool,
+    ) {
+        self.stats.receive_elapsed += receive_entries_elapsed;
+        self.stats.shredding_elapsed += shredding_elapsed;
+        self.stats.insert_shreds_elapsed += insert_shreds_elapsed;
+        self.stats.broadcast_elapsed += broadcast_elapsed;
+        self.stats.clone_and_seed_elapsed += clone_and_seed_elapsed;
+
+        if slot_ended {
+            self.report_and_reset_stats()
+        }
+    }
+
+    fn report_and_reset_stats(&mut self) {
+        assert!(self.unfinished_slot.is_some());
+        datapoint_info!(
+            "broadcast-bank-stats",
+            ("slot", self.unfinished_slot.unwrap().slot as i64, i64),
+            ("shredding_time", self.stats.shredding_elapsed as i64, i64),
+            (
+                "insertion_time",
+                self.stats.insert_shreds_elapsed as i64,
+                i64
+            ),
+            ("broadcast_time", self.stats.broadcast_elapsed as i64, i64),
+            ("receive_time", self.stats.receive_elapsed as i64, i64),
+            (
+                "clone_and_seed",
+                self.stats.clone_and_seed_elapsed as i64,
+                i64
+            ),
+            (
+                "num_shreds",
+                i64::from(self.unfinished_slot.unwrap().next_shred_index),
+                i64
+            ),
+            (
+                "slot_broadcast_time",
+                self.slot_broadcast_start.unwrap().elapsed().as_millis() as i64,
+                i64
+            ),
+        );
+        self.stats.reset();
+    }
+}
+
+impl BroadcastRun for StandardBroadcastRun {
+    fn run(
+        &mut self,
+        cluster_info: &Arc<RwLock<ClusterInfo>>,
+        receiver: &Receiver<WorkingBankEntry>,
+        sock: &UdpSocket,
+        blocktree: &Arc<Blocktree>,
+    ) -> Result<()> {
+        let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
+        self.process_receive_results(cluster_info, sock, blocktree, receive_results)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::blocktree::{get_tmp_ledger_path, Blocktree};
+    use crate::cluster_info::{ClusterInfo, Node};
+    use crate::entry::create_ticks;
+    use crate::genesis_utils::create_genesis_block;
+    use crate::shred::max_ticks_per_n_shreds;
+    use solana_runtime::bank::Bank;
+    use solana_sdk::genesis_block::GenesisBlock;
+    use solana_sdk::signature::{Keypair, KeypairUtil};
+    use std::sync::{Arc, RwLock};
+    use std::time::Duration;
+
+    fn setup(
+        num_shreds_per_slot: u64,
+    ) -> (
+        Arc<Blocktree>,
+        GenesisBlock,
+        Arc<RwLock<ClusterInfo>>,
+        Arc<Bank>,
+        Keypair,
+        UdpSocket,
+    ) {
+        // Setup
+        let ledger_path = get_tmp_ledger_path!();
+        let blocktree = Arc::new(
+            Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
+        );
+        let leader_keypair = Keypair::new();
+        let leader_pubkey = leader_keypair.pubkey();
+        let leader_info = Node::new_localhost_with_pubkey(&leader_pubkey);
+        let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
+            leader_info.info.clone(),
+        )));
+        let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
+        let mut genesis_block = create_genesis_block(10_000).genesis_block;
+        genesis_block.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot) + 1;
+        let bank0 = Arc::new(Bank::new(&genesis_block));
+        (
+            blocktree,
+            genesis_block,
+            cluster_info,
+            bank0,
+            leader_keypair,
+            socket,
+        )
+    }
+
+    #[test]
+    fn test_slot_interrupt() {
+        // Setup
+        let num_shreds_per_slot = 2;
+        let (blocktree, genesis_block, cluster_info, bank0, leader_keypair, socket) =
+            setup(num_shreds_per_slot);
+
+        // Insert 1 less than the number of ticks needed to finish the slot
+        let ticks = create_ticks(genesis_block.ticks_per_slot - 1, genesis_block.hash());
+        let receive_results = ReceiveResults {
+            entries: ticks.clone(),
+            time_elapsed: Duration::new(3, 0),
+            bank: bank0.clone(),
+            last_tick: (ticks.len() - 1) as u64,
+        };
+
+        // Step 1: Make an incomplete transmission for slot 0
+        let mut standard_broadcast_run = StandardBroadcastRun::new();
+        standard_broadcast_run
+            .process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
+            .unwrap();
+        let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
+        assert_eq!(unfinished_slot.next_shred_index as u64, num_shreds_per_slot);
+        assert_eq!(unfinished_slot.slot, 0);
+        assert_eq!(unfinished_slot.parent, 0);
+        // Make sure the slot is not complete
+        assert!(!blocktree.is_full(0));
+        // Modify the stats, should reset later
+        standard_broadcast_run.stats.receive_elapsed = 10;
+
+        // Try to fetch ticks from blocktree, nothing should break
+        assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), ticks);
+        assert_eq!(
+            blocktree
+                .get_slot_entries(0, num_shreds_per_slot, None)
+                .unwrap(),
+            vec![],
+        );
+
+        // Step 2: Make a transmission for another bank that interrupts the transmission for
+        // slot 0
+        let bank2 = Arc::new(Bank::new_from_parent(&bank0, &leader_keypair.pubkey(), 2));
+
+        // Interrupting the slot should cause the unfinished_slot and stats to reset
+        let num_shreds = 1;
+        assert!(num_shreds < num_shreds_per_slot);
+        let ticks = create_ticks(max_ticks_per_n_shreds(num_shreds), genesis_block.hash());
+        let receive_results = ReceiveResults {
+            entries: ticks.clone(),
+            time_elapsed: Duration::new(2, 0),
+            bank: bank2.clone(),
+            last_tick: (ticks.len() - 1) as u64,
+        };
+        standard_broadcast_run
+            .process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
+            .unwrap();
+        let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
+
+        // The shred index should have reset to 0, which makes it possible for the
+        // index < the previous shred index for slot 0
+        assert_eq!(unfinished_slot.next_shred_index as u64, num_shreds);
+        assert_eq!(unfinished_slot.slot, 2);
+        assert_eq!(unfinished_slot.parent, 0);
+        // Check that the stats were reset as well
+        assert_eq!(standard_broadcast_run.stats.receive_elapsed, 0);
+    }
+
+    #[test]
+    fn test_slot_finish() {
+        // Setup
+        let num_shreds_per_slot = 2;
+        let (blocktree, genesis_block, cluster_info, bank0, _, socket) =
+            setup(num_shreds_per_slot);
+
+        // Insert complete slot of ticks needed to finish the slot
+        let ticks = create_ticks(genesis_block.ticks_per_slot, genesis_block.hash());
+        let receive_results = ReceiveResults {
+            entries: ticks.clone(),
+            time_elapsed: Duration::new(3, 0),
+            bank: bank0.clone(),
+            last_tick: (ticks.len() - 1) as u64,
+        };
+
+        let mut standard_broadcast_run = StandardBroadcastRun::new();
+        standard_broadcast_run
+            .process_receive_results(&cluster_info, &socket, &blocktree, receive_results)
+            .unwrap();
+        assert!(standard_broadcast_run.unfinished_slot.is_none())
+    }
+}
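
The core new behavior above is check_for_interrupted_slot: when the leader receives a bank for a new slot while unfinished_slot still points at an older, incomplete one, it builds one final empty data shred for the abandoned slot, signaling that no more shreds will be produced for it. A sketch of that marker shred, mirroring the call inside check_for_interrupted_slot (old here stands for the stale UnfinishedSlotInfo):

    let marker = Shred::new_from_data(
        old.slot,                       // the interrupted slot
        old.next_shred_index,           // first index that was never produced
        (old.slot - old.parent) as u16, // parent_offset
        None,                           // no payload
        true,                           // is_last_data    -> DATA_COMPLETE_SHRED
        true,                           // is_last_in_slot -> LAST_SHRED_IN_SLOT
    );

coalesce_shreds then appends this shred to the current batch, so it is stored in blocktree and broadcast together with the new slot's shreds.
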
diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index d302e35726..30ce606f72 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -1771,7 +1771,7 @@
     use crate::crds_value::CrdsValueLabel;
     use crate::repair_service::RepairType;
     use crate::result::Error;
-    use crate::shred::max_ticks_per_shred;
+    use crate::shred::max_ticks_per_n_shreds;
     use crate::shred::{DataShredHeader, Shred};
     use crate::test_tx::test_tx;
     use solana_sdk::hash::Hash;
@@ -1976,7 +1976,7 @@
 
         let _ = fill_blocktree_slot_with_ticks(
             &blocktree,
-            max_ticks_per_shred() + 1,
+            max_ticks_per_n_shreds(1) + 1,
             2,
             1,
             Hash::default(),
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 1846ce72f2..41cabb5f90 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -7,6 +7,8 @@
 pub mod bank_forks;
 pub mod banking_stage;
+#[macro_use]
+pub mod blocktree;
 pub mod broadcast_stage;
 pub mod chacha;
 pub mod chacha_cuda;
@@ -17,20 +19,18 @@ pub mod recycler;
 pub mod shred_fetch_stage;
 #[macro_use]
 pub mod contact_info;
-pub mod crds;
-pub mod crds_gossip;
-pub mod crds_gossip_error;
-pub mod crds_gossip_pull;
-pub mod crds_gossip_push;
-pub mod crds_value;
-#[macro_use]
-pub mod blocktree;
 pub mod blockstream;
 pub mod blockstream_service;
 pub mod blocktree_processor;
 pub mod cluster_info;
 pub mod cluster_info_repair_listener;
 pub mod consensus;
+pub mod crds;
+pub mod crds_gossip;
+pub mod crds_gossip_error;
+pub mod crds_gossip_pull;
+pub mod crds_gossip_push;
+pub mod crds_value;
 pub mod cuda_runtime;
 pub mod entry;
 pub mod erasure;
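
The module reordering above is not just alphabetization. With #[macro_use], macros exported by a module are only visible to modules declared after it in textual order, so blocktree, which defines the get_tmp_ledger_path! macro used by the new broadcast tests, must now be declared before broadcast_stage. A minimal illustration of the rule:

    #[macro_use]
    pub mod blocktree;       // get_tmp_ledger_path! is defined here...
    pub mod broadcast_stage; // ...so its test module can expand it; the reverse
                             // declaration order would fail to resolve the macro
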
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index d88f232db0..c2804996ca 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -406,7 +406,7 @@ mod test {
     };
     use crate::blocktree::{get_tmp_ledger_path, Blocktree};
     use crate::cluster_info::Node;
-    use crate::shred::max_ticks_per_shred;
+    use crate::shred::max_ticks_per_n_shreds;
     use itertools::Itertools;
     use rand::seq::SliceRandom;
     use rand::{thread_rng, Rng};
@@ -538,7 +538,7 @@
         let blocktree = Blocktree::open(&blocktree_path).unwrap();
 
         let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
-        let num_entries_per_slot = max_ticks_per_shred() + 1;
+        let num_entries_per_slot = max_ticks_per_n_shreds(1) + 1;
 
         let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot);
         for (mut slot_shreds, _) in shreds.into_iter() {
diff --git a/core/src/shred.rs b/core/src/shred.rs
index 44f8d610f0..e160a00c99 100644
--- a/core/src/shred.rs
+++ b/core/src/shred.rs
@@ -122,14 +122,23 @@ impl Shred {
         index: u32,
         parent_offset: u16,
         data: Option<&[u8]>,
-        flags: u8,
+        is_last_data: bool,
+        is_last_in_slot: bool,
     ) -> Self {
         let mut shred_buf = vec![0; PACKET_DATA_SIZE];
         let mut header = DataShredHeader::default();
         header.data_header.slot = slot;
         header.data_header.index = index;
         header.parent_offset = parent_offset;
-        header.flags = flags;
+        header.flags = 0;
+
+        if is_last_data {
+            header.flags |= DATA_COMPLETE_SHRED
+        }
+
+        if is_last_in_slot {
+            header.flags |= LAST_SHRED_IN_SLOT
+        }
 
         if let Some(data) = data {
             bincode::serialize_into(&mut shred_buf[..*SIZE_OF_DATA_SHRED_HEADER], &header)
@@ -345,20 +354,21 @@
             .map(|(i, shred_data)| {
                 let shred_index = next_shred_index + i as u32;
 
-                let mut header: u8 = 0;
-                if shred_index == last_shred_index {
-                    header |= DATA_COMPLETE_SHRED;
-                    if is_last_in_slot {
-                        header |= LAST_SHRED_IN_SLOT;
+                let (is_last_data, is_last_in_slot) = {
+                    if shred_index == last_shred_index {
+                        (true, is_last_in_slot)
+                    } else {
+                        (false, false)
                     }
-                }
+                };
 
                 let mut shred = Shred::new_from_data(
                     self.slot,
                     shred_index,
                     (self.slot - self.parent_slot) as u16,
                     Some(shred_data),
-                    header,
+                    is_last_data,
+                    is_last_in_slot,
                 );
 
                 Shredder::sign_shred(
@@ -663,9 +673,9 @@ impl Shredder {
     }
 }
 
-pub fn max_ticks_per_shred() -> u64 {
+pub fn max_ticks_per_n_shreds(num_shreds: u64) -> u64 {
     let ticks = create_ticks(1, Hash::default());
-    max_entries_per_n_shred(&ticks[0], 1)
+    max_entries_per_n_shred(&ticks[0], num_shreds)
 }
 
 pub fn max_entries_per_n_shred(entry: &Entry, num_shreds: u64) -> u64 {
@@ -848,7 +858,7 @@
             .expect("Failed in creating shredder");
 
         // Create enough entries to make > 1 shred
-        let num_entries = max_ticks_per_shred() + 1;
+        let num_entries = max_ticks_per_n_shreds(1) + 1;
         let entries: Vec<_> = (0..num_entries)
             .map(|_| {
                 let keypair0 = Keypair::new();
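
The Shred::new_from_data change replaces the raw flags: u8 parameter with two booleans, keeping the DATA_COMPLETE_SHRED and LAST_SHRED_IN_SLOT bit manipulation inside the constructor instead of at every call site. A before/after sketch of a call site (argument values illustrative):

    // Before: the caller assembled the flag byte itself.
    let shred = Shred::new_from_data(slot, index, parent_offset, Some(data),
                                     DATA_COMPLETE_SHRED | LAST_SHRED_IN_SLOT);

    // After: intent is named at the call site; the bit layout stays private.
    let shred = Shred::new_from_data(slot, index, parent_offset, Some(data),
                                     true /* is_last_data */, true /* is_last_in_slot */);

This is also what lets check_for_interrupted_slot express its marker shred clearly: Some(data) vs None for the payload plus two named booleans, with no flag constants leaking into broadcast code.
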