From 52f6c33ff938501950c6f971aae2811dcac8ba0d Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Fri, 22 Mar 2019 14:17:39 -0700 Subject: [PATCH] Make sure banking stage is recording with the same bank that it read (#3447) * make sure banking stage is recording with the same bank that it read with --- core/src/banking_stage.rs | 16 ++++++--- core/src/poh_recorder.rs | 69 ++++++++++++++++++++++++++++++++------- core/src/poh_service.rs | 6 +++- 3 files changed, 74 insertions(+), 17 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 106cb1af7..4fb9ab3ce 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -217,6 +217,7 @@ impl BankingStage { } fn record_transactions( + bank_slot: u64, txs: &[Transaction], results: &[bank::Result<()>], poh: &Arc>, @@ -241,7 +242,9 @@ impl BankingStage { if !processed_transactions.is_empty() { let hash = Transaction::hash(&processed_transactions); // record and unlock will unlock all the successfull transactions - poh.lock().unwrap().record(hash, processed_transactions)?; + poh.lock() + .unwrap() + .record(bank_slot, hash, processed_transactions)?; } Ok(()) } @@ -268,7 +271,7 @@ impl BankingStage { let record_time = { let now = Instant::now(); - Self::record_transactions(txs, &results, poh)?; + Self::record_transactions(bank.slot(), txs, &results, poh)?; now.elapsed() }; @@ -714,7 +717,8 @@ mod tests { ]; let mut results = vec![Ok(()), Ok(())]; - BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); + BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) + .unwrap(); let (_, entries) = entry_receiver.recv().unwrap(); assert_eq!(entries[0].0.transactions.len(), transactions.len()); @@ -723,13 +727,15 @@ mod tests { 1, InstructionError::new_result_with_negative_lamports(), )); - BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); + BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) + .unwrap(); let (_, entries) = entry_receiver.recv().unwrap(); assert_eq!(entries[0].0.transactions.len(), transactions.len()); // Other TransactionErrors should not be recorded results[0] = Err(TransactionError::AccountNotFound); - BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap(); + BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder) + .unwrap(); let (_, entries) = entry_receiver.recv().unwrap(); assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1); } diff --git a/core/src/poh_recorder.rs b/core/src/poh_recorder.rs index 6a4c47d77..84f52d7f4 100644 --- a/core/src/poh_recorder.rs +++ b/core/src/poh_recorder.rs @@ -257,9 +257,9 @@ impl PohRecorder { let _ = self.flush_cache(true); } - pub fn record(&mut self, mixin: Hash, txs: Vec) -> Result<()> { + pub fn record(&mut self, bank_slot: u64, mixin: Hash, txs: Vec) -> Result<()> { self.flush_cache(false)?; - self.record_and_send_txs(mixin, txs) + self.record_and_send_txs(bank_slot, mixin, txs) } /// A recorder to synchronize PoH with the following data structures @@ -299,11 +299,19 @@ impl PohRecorder { ) } - fn record_and_send_txs(&mut self, mixin: Hash, txs: Vec) -> Result<()> { + fn record_and_send_txs( + &mut self, + bank_slot: u64, + mixin: Hash, + txs: Vec, + ) -> Result<()> { let working_bank = self .working_bank .as_ref() .ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?; + if bank_slot != working_bank.bank.slot() { + return Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)); + } let poh_entry = self.poh.record(mixin); assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function"); let recorded_entry = Entry { @@ -506,7 +514,7 @@ mod tests { ); let working_bank = WorkingBank { - bank, + bank: bank.clone(), min_tick_height: 2, max_tick_height: 3, }; @@ -514,10 +522,43 @@ mod tests { poh_recorder.tick(); let tx = test_tx(); let h1 = hash(b"hello world!"); - assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err()); + assert!(poh_recorder + .record(bank.slot(), h1, vec![tx.clone()]) + .is_err()); assert!(entry_receiver.try_recv().is_err()); } + #[test] + fn test_poh_recorder_record_bad_slot() { + let (genesis_block, _mint_keypair) = GenesisBlock::new(2); + let bank = Arc::new(Bank::new(&genesis_block)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, _entry_receiver) = PohRecorder::new( + 0, + prev_hash, + 0, + Some(4), + bank.ticks_per_slot(), + &Pubkey::default(), + ); + + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: 1, + max_tick_height: 2, + }; + poh_recorder.set_working_bank(working_bank); + poh_recorder.tick(); + assert_eq!(poh_recorder.tick_cache.len(), 1); + assert_eq!(poh_recorder.poh.tick_height, 1); + let tx = test_tx(); + let h1 = hash(b"hello world!"); + assert_matches!( + poh_recorder.record(bank.slot() + 1, h1, vec![tx.clone()]), + Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) + ); + } + #[test] fn test_poh_recorder_record_at_min_passes() { let (genesis_block, _mint_keypair) = GenesisBlock::new(2); @@ -533,7 +574,7 @@ mod tests { ); let working_bank = WorkingBank { - bank, + bank: bank.clone(), min_tick_height: 1, max_tick_height: 2, }; @@ -543,7 +584,9 @@ mod tests { assert_eq!(poh_recorder.poh.tick_height, 1); let tx = test_tx(); let h1 = hash(b"hello world!"); - assert!(poh_recorder.record(h1, vec![tx.clone()]).is_ok()); + assert!(poh_recorder + .record(bank.slot(), h1, vec![tx.clone()]) + .is_ok()); assert_eq!(poh_recorder.tick_cache.len(), 0); //tick in the cache + entry @@ -569,7 +612,7 @@ mod tests { ); let working_bank = WorkingBank { - bank, + bank: bank.clone(), min_tick_height: 1, max_tick_height: 2, }; @@ -579,7 +622,9 @@ mod tests { assert_eq!(poh_recorder.poh.tick_height, 2); let tx = test_tx(); let h1 = hash(b"hello world!"); - assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err()); + assert!(poh_recorder + .record(bank.slot(), h1, vec![tx.clone()]) + .is_err()); let (_bank, e) = entry_receiver.recv().expect("recv 1"); assert_eq!(e.len(), 2); @@ -745,7 +790,7 @@ mod tests { let end_slot = 3; let max_tick_height = (end_slot + 1) * ticks_per_slot - 1; let working_bank = WorkingBank { - bank, + bank: bank.clone(), min_tick_height: 1, max_tick_height, }; @@ -757,7 +802,9 @@ mod tests { let tx = test_tx(); let h1 = hash(b"hello world!"); - assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err()); + assert!(poh_recorder + .record(bank.slot(), h1, vec![tx.clone()]) + .is_err()); assert!(poh_recorder.working_bank.is_none()); // Make sure the starting slot is updated assert_eq!(poh_recorder.start_slot(), end_slot); diff --git a/core/src/poh_service.rs b/core/src/poh_service.rs index 4c4f229a1..8744ea5e6 100644 --- a/core/src/poh_service.rs +++ b/core/src/poh_service.rs @@ -138,7 +138,11 @@ mod tests { // send some data let h1 = hash(b"hello world!"); let tx = test_tx(); - poh_recorder.lock().unwrap().record(h1, vec![tx]).unwrap(); + poh_recorder + .lock() + .unwrap() + .record(bank.slot(), h1, vec![tx]) + .unwrap(); if exit.load(Ordering::Relaxed) { break Ok(());