From 58eebd7f6c5eb1b2337ab8c5142df610352b41b8 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Mon, 25 Feb 2019 13:50:31 -0800 Subject: [PATCH] Remove tick counting from broadast service --- src/banking_stage.rs | 26 +++++++++++++------- src/broadcast_service.rs | 52 ++++++++++++++-------------------------- src/poh_recorder.rs | 26 ++++++++++---------- src/poh_service.rs | 1 + 4 files changed, 51 insertions(+), 54 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index f9423ec570..46f1be89ca 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -50,7 +50,7 @@ impl BankingStage { last_entry_id: &Hash, max_tick_height: u64, leader_id: Pubkey, - ) -> (Self, Receiver>) { + ) -> (Self, Receiver>) { let (entry_sender, entry_receiver) = channel(); let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver)); let working_bank = WorkingBank { @@ -394,7 +394,10 @@ mod tests { sleep(Duration::from_millis(500)); drop(verified_sender); - let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect(); + let entries: Vec<_> = entry_receiver + .iter() + .flat_map(|x| x.into_iter().map(|e| e.0)) + .collect(); assert!(entries.len() != 0); assert!(entries.verify(&start_hash)); assert_eq!(entries[entries.len() - 1].id, bank.last_id()); @@ -440,7 +443,11 @@ mod tests { drop(verified_sender); //receive entries + ticks - let entries: Vec<_> = entry_receiver.iter().map(|x| x).collect(); + let entries: Vec> = entry_receiver + .iter() + .map(|x| x.into_iter().map(|e| e.0).collect()) + .collect(); + assert!(entries.len() >= 1); let mut last_id = start_hash; @@ -500,7 +507,10 @@ mod tests { banking_stage.join().unwrap(); // Collect the ledger and feed it to a new bank. - let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect(); + let entries: Vec<_> = entry_receiver + .iter() + .flat_map(|x| x.into_iter().map(|e| e.0)) + .collect(); // same assertion as running through the bank, really... assert!(entries.len() >= 2); @@ -619,7 +629,7 @@ mod tests { poh_recorder.lock().unwrap().set_working_bank(working_bank); BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); let entries = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].transactions.len(), transactions.len()); + assert_eq!(entries[0].0.transactions.len(), transactions.len()); // ProgramErrors should still be recorded results[0] = Err(BankError::ProgramError( @@ -628,13 +638,13 @@ mod tests { )); BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); let entries = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].transactions.len(), transactions.len()); + assert_eq!(entries[0].0.transactions.len(), transactions.len()); // Other BankErrors should not be recorded results[0] = Err(BankError::AccountNotFound); BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); let entries = entry_receiver.recv().unwrap(); - assert_eq!(entries[0].transactions.len(), transactions.len() - 1); + assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1); } #[test] @@ -671,7 +681,7 @@ mod tests { // read entries until I find mine, might be ticks... while need_tick { let entries = entry_receiver.recv().unwrap(); - for entry in entries { + for (entry, _) in entries { if !entry.is_tick() { assert_eq!(entry.transactions.len(), transactions.len()); assert_eq!(bank.get_balance(&pubkey), 1); diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index 1fd2a8f82d..e5b0fdab85 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -33,7 +33,6 @@ pub enum BroadcastServiceReturnType { struct Broadcast { id: Pubkey, - tick_height: u64, max_tick_height: u64, blob_index: u64, @@ -42,19 +41,10 @@ struct Broadcast { } impl Broadcast { - fn count_ticks(entries: &[Entry]) -> u64 { - entries.iter().fold(0, |mut sum, e| { - if e.is_tick() { - sum += 1 - } - sum - }) - } - fn run( &mut self, broadcast_table: &[NodeInfo], - receiver: &Receiver>, + receiver: &Receiver>, sock: &UdpSocket, leader_scheduler: &Arc>, blocktree: &Arc, @@ -64,12 +54,12 @@ impl Broadcast { let now = Instant::now(); let mut num_entries = entries.len(); let mut ventries = Vec::new(); - let mut ticks = Self::count_ticks(&entries); + let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0); ventries.push(entries); while let Ok(entries) = receiver.try_recv() { num_entries += entries.len(); - ticks += Self::count_ticks(&entries); + last_tick = entries.last().map(|v| v.1).unwrap_or(0); ventries.push(entries); } @@ -78,12 +68,15 @@ impl Broadcast { let to_blobs_start = Instant::now(); // Generate the slot heights for all the entries inside ventries - // this may span slots if this leader broadcasts for consecutive slots... - let slots = generate_slots(&ventries, self.tick_height + 1, leader_scheduler); + // this may span slots if this leader broadcasts for consecutive slots... + let slots = generate_slots(&ventries, leader_scheduler); let blobs: Vec<_> = ventries .into_par_iter() - .flat_map(|p| p.to_shared_blobs()) + .flat_map(|p| { + let entries: Vec<_> = p.into_iter().map(|e| e.0).collect(); + entries.to_shared_blobs() + }) .collect(); // TODO: blob_index should be slot-relative... @@ -105,9 +98,8 @@ impl Broadcast { inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); - assert!(self.tick_height + ticks <= self.max_tick_height); - self.tick_height += ticks; - let contains_last_tick = self.tick_height == self.max_tick_height; + assert!(last_tick <= self.max_tick_height); + let contains_last_tick = last_tick == self.max_tick_height; if contains_last_tick { blobs.last().unwrap().write().unwrap().set_is_last_in_slot(); @@ -152,8 +144,7 @@ impl Broadcast { } fn generate_slots( - ventries: &[Vec], - mut tick_height: u64, + ventries: &[Vec<(Entry, u64)>], leader_scheduler: &Arc>, ) -> Vec { // Generate the slot heights for all the entries inside ventries @@ -163,13 +154,7 @@ fn generate_slots( .flat_map(|p| { let slot_heights: Vec = p .iter() - .map(|e| { - let slot = r_leader_scheduler.tick_height_to_slot(tick_height); - if e.is_tick() { - tick_height += 1; - } - slot - }) + .map(|(_, tick_height)| r_leader_scheduler.tick_height_to_slot(*tick_height)) .collect(); slot_heights @@ -207,7 +192,7 @@ impl BroadcastService { cluster_info: &Arc>, blob_index: u64, leader_scheduler: &Arc>, - receiver: &Receiver>, + receiver: &Receiver>, max_tick_height: u64, exit_signal: &Arc, blocktree: &Arc, @@ -216,7 +201,6 @@ impl BroadcastService { let mut broadcast = Broadcast { id: me.id, - tick_height: bank.tick_height(), max_tick_height, blob_index, #[cfg(feature = "erasure")] @@ -278,7 +262,7 @@ impl BroadcastService { cluster_info: Arc>, blob_index: u64, leader_scheduler: Arc>, - receiver: Receiver>, + receiver: Receiver>, max_tick_height: u64, exit_sender: Arc, blocktree: &Arc, @@ -341,7 +325,7 @@ mod test { leader_pubkey: Pubkey, ledger_path: &str, leader_scheduler: Arc>, - entry_receiver: Receiver>, + entry_receiver: Receiver>, blob_index: u64, max_tick_height: u64, ) -> MockBroadcastService { @@ -409,9 +393,9 @@ mod test { ); let ticks = create_ticks(max_tick_height - start_tick_height, Hash::default()); - for (_, tick) in ticks.into_iter().enumerate() { + for (i, tick) in ticks.into_iter().enumerate() { entry_sender - .send(vec![tick]) + .send(vec![(tick, i as u64 + 1)]) .expect("Expect successful send to broadcast service"); } diff --git a/src/poh_recorder.rs b/src/poh_recorder.rs index eef0edeea8..6bda70287b 100644 --- a/src/poh_recorder.rs +++ b/src/poh_recorder.rs @@ -29,7 +29,7 @@ pub enum PohRecorderError { #[derive(Clone)] pub struct WorkingBank { pub bank: Arc, - pub sender: Sender>, + pub sender: Sender>, pub min_tick_height: u64, pub max_tick_height: u64, } @@ -100,11 +100,11 @@ impl PohRecorder { working_bank.max_tick_height, cnt, ); - let cache: Vec = self.tick_cache[..cnt].iter().map(|x| x.0.clone()).collect(); - for t in &cache { - working_bank.bank.register_tick(&t.id); + let cache = &self.tick_cache[..cnt]; + for t in cache { + working_bank.bank.register_tick(&t.0.id); } - working_bank.sender.send(cache) + working_bank.sender.send(cache.to_vec()) } else { Ok(()) }; @@ -160,13 +160,15 @@ impl PohRecorder { .ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?; let entry = self.poh.record(mixin); assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function"); - let entry = Entry { + let recorded_entry = Entry { num_hashes: entry.num_hashes, id: entry.id, transactions: txs, }; - trace!("sending entry {}", entry.is_tick()); - working_bank.sender.send(vec![entry])?; + trace!("sending entry {}", recorded_entry.is_tick()); + working_bank + .sender + .send(vec![(recorded_entry, entry.tick_height)])?; Ok(()) } @@ -352,9 +354,9 @@ mod tests { //tick in the cache + entry let e = entry_receiver.recv().expect("recv 1"); assert_eq!(e.len(), 1); - assert!(e[0].is_tick()); + assert!(e[0].0.is_tick()); let e = entry_receiver.recv().expect("recv 2"); - assert!(!e[0].is_tick()); + assert!(!e[0].0.is_tick()); } #[test] @@ -381,8 +383,8 @@ mod tests { let e = entry_receiver.recv().expect("recv 1"); assert_eq!(e.len(), 2); - assert!(e[0].is_tick()); - assert!(e[1].is_tick()); + assert!(e[0].0.is_tick()); + assert!(e[1].0.is_tick()); } #[test] diff --git a/src/poh_service.rs b/src/poh_service.rs index f3000cea53..2d2d151426 100644 --- a/src/poh_service.rs +++ b/src/poh_service.rs @@ -162,6 +162,7 @@ mod tests { while need_tick || need_entry || need_partial { for entry in entry_receiver.recv().unwrap() { + let entry = &entry.0; if entry.is_tick() { assert!(entry.num_hashes <= HASHES_PER_TICK);