Make sure banking stage is recording with the same bank that it read (#3447)
* make sure banking stage is recording with the same bank that it read with
This commit is contained in:
parent
60dfb35924
commit
52f6c33ff9
|
@ -217,6 +217,7 @@ impl BankingStage {
|
|||
}
|
||||
|
||||
fn record_transactions(
|
||||
bank_slot: u64,
|
||||
txs: &[Transaction],
|
||||
results: &[bank::Result<()>],
|
||||
poh: &Arc<Mutex<PohRecorder>>,
|
||||
|
@ -241,7 +242,9 @@ impl BankingStage {
|
|||
if !processed_transactions.is_empty() {
|
||||
let hash = Transaction::hash(&processed_transactions);
|
||||
// record and unlock will unlock all the successfull transactions
|
||||
poh.lock().unwrap().record(hash, processed_transactions)?;
|
||||
poh.lock()
|
||||
.unwrap()
|
||||
.record(bank_slot, hash, processed_transactions)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -268,7 +271,7 @@ impl BankingStage {
|
|||
|
||||
let record_time = {
|
||||
let now = Instant::now();
|
||||
Self::record_transactions(txs, &results, poh)?;
|
||||
Self::record_transactions(bank.slot(), txs, &results, poh)?;
|
||||
now.elapsed()
|
||||
};
|
||||
|
||||
|
@ -714,7 +717,8 @@ mod tests {
|
|||
];
|
||||
|
||||
let mut results = vec![Ok(()), Ok(())];
|
||||
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
|
||||
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder)
|
||||
.unwrap();
|
||||
let (_, entries) = entry_receiver.recv().unwrap();
|
||||
assert_eq!(entries[0].0.transactions.len(), transactions.len());
|
||||
|
||||
|
@ -723,13 +727,15 @@ mod tests {
|
|||
1,
|
||||
InstructionError::new_result_with_negative_lamports(),
|
||||
));
|
||||
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
|
||||
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder)
|
||||
.unwrap();
|
||||
let (_, entries) = entry_receiver.recv().unwrap();
|
||||
assert_eq!(entries[0].0.transactions.len(), transactions.len());
|
||||
|
||||
// Other TransactionErrors should not be recorded
|
||||
results[0] = Err(TransactionError::AccountNotFound);
|
||||
BankingStage::record_transactions(&transactions, &results, &poh_recorder).unwrap();
|
||||
BankingStage::record_transactions(bank.slot(), &transactions, &results, &poh_recorder)
|
||||
.unwrap();
|
||||
let (_, entries) = entry_receiver.recv().unwrap();
|
||||
assert_eq!(entries[0].0.transactions.len(), transactions.len() - 1);
|
||||
}
|
||||
|
|
|
@ -257,9 +257,9 @@ impl PohRecorder {
|
|||
let _ = self.flush_cache(true);
|
||||
}
|
||||
|
||||
pub fn record(&mut self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
|
||||
pub fn record(&mut self, bank_slot: u64, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
|
||||
self.flush_cache(false)?;
|
||||
self.record_and_send_txs(mixin, txs)
|
||||
self.record_and_send_txs(bank_slot, mixin, txs)
|
||||
}
|
||||
|
||||
/// A recorder to synchronize PoH with the following data structures
|
||||
|
@ -299,11 +299,19 @@ impl PohRecorder {
|
|||
)
|
||||
}
|
||||
|
||||
fn record_and_send_txs(&mut self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
|
||||
fn record_and_send_txs(
|
||||
&mut self,
|
||||
bank_slot: u64,
|
||||
mixin: Hash,
|
||||
txs: Vec<Transaction>,
|
||||
) -> Result<()> {
|
||||
let working_bank = self
|
||||
.working_bank
|
||||
.as_ref()
|
||||
.ok_or(Error::PohRecorderError(PohRecorderError::MaxHeightReached))?;
|
||||
if bank_slot != working_bank.bank.slot() {
|
||||
return Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached));
|
||||
}
|
||||
let poh_entry = self.poh.record(mixin);
|
||||
assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function");
|
||||
let recorded_entry = Entry {
|
||||
|
@ -506,7 +514,7 @@ mod tests {
|
|||
);
|
||||
|
||||
let working_bank = WorkingBank {
|
||||
bank,
|
||||
bank: bank.clone(),
|
||||
min_tick_height: 2,
|
||||
max_tick_height: 3,
|
||||
};
|
||||
|
@ -514,10 +522,43 @@ mod tests {
|
|||
poh_recorder.tick();
|
||||
let tx = test_tx();
|
||||
let h1 = hash(b"hello world!");
|
||||
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err());
|
||||
assert!(poh_recorder
|
||||
.record(bank.slot(), h1, vec![tx.clone()])
|
||||
.is_err());
|
||||
assert!(entry_receiver.try_recv().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_poh_recorder_record_bad_slot() {
|
||||
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let prev_hash = bank.last_blockhash();
|
||||
let (mut poh_recorder, _entry_receiver) = PohRecorder::new(
|
||||
0,
|
||||
prev_hash,
|
||||
0,
|
||||
Some(4),
|
||||
bank.ticks_per_slot(),
|
||||
&Pubkey::default(),
|
||||
);
|
||||
|
||||
let working_bank = WorkingBank {
|
||||
bank: bank.clone(),
|
||||
min_tick_height: 1,
|
||||
max_tick_height: 2,
|
||||
};
|
||||
poh_recorder.set_working_bank(working_bank);
|
||||
poh_recorder.tick();
|
||||
assert_eq!(poh_recorder.tick_cache.len(), 1);
|
||||
assert_eq!(poh_recorder.poh.tick_height, 1);
|
||||
let tx = test_tx();
|
||||
let h1 = hash(b"hello world!");
|
||||
assert_matches!(
|
||||
poh_recorder.record(bank.slot() + 1, h1, vec![tx.clone()]),
|
||||
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_poh_recorder_record_at_min_passes() {
|
||||
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
|
||||
|
@ -533,7 +574,7 @@ mod tests {
|
|||
);
|
||||
|
||||
let working_bank = WorkingBank {
|
||||
bank,
|
||||
bank: bank.clone(),
|
||||
min_tick_height: 1,
|
||||
max_tick_height: 2,
|
||||
};
|
||||
|
@ -543,7 +584,9 @@ mod tests {
|
|||
assert_eq!(poh_recorder.poh.tick_height, 1);
|
||||
let tx = test_tx();
|
||||
let h1 = hash(b"hello world!");
|
||||
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_ok());
|
||||
assert!(poh_recorder
|
||||
.record(bank.slot(), h1, vec![tx.clone()])
|
||||
.is_ok());
|
||||
assert_eq!(poh_recorder.tick_cache.len(), 0);
|
||||
|
||||
//tick in the cache + entry
|
||||
|
@ -569,7 +612,7 @@ mod tests {
|
|||
);
|
||||
|
||||
let working_bank = WorkingBank {
|
||||
bank,
|
||||
bank: bank.clone(),
|
||||
min_tick_height: 1,
|
||||
max_tick_height: 2,
|
||||
};
|
||||
|
@ -579,7 +622,9 @@ mod tests {
|
|||
assert_eq!(poh_recorder.poh.tick_height, 2);
|
||||
let tx = test_tx();
|
||||
let h1 = hash(b"hello world!");
|
||||
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err());
|
||||
assert!(poh_recorder
|
||||
.record(bank.slot(), h1, vec![tx.clone()])
|
||||
.is_err());
|
||||
|
||||
let (_bank, e) = entry_receiver.recv().expect("recv 1");
|
||||
assert_eq!(e.len(), 2);
|
||||
|
@ -745,7 +790,7 @@ mod tests {
|
|||
let end_slot = 3;
|
||||
let max_tick_height = (end_slot + 1) * ticks_per_slot - 1;
|
||||
let working_bank = WorkingBank {
|
||||
bank,
|
||||
bank: bank.clone(),
|
||||
min_tick_height: 1,
|
||||
max_tick_height,
|
||||
};
|
||||
|
@ -757,7 +802,9 @@ mod tests {
|
|||
|
||||
let tx = test_tx();
|
||||
let h1 = hash(b"hello world!");
|
||||
assert!(poh_recorder.record(h1, vec![tx.clone()]).is_err());
|
||||
assert!(poh_recorder
|
||||
.record(bank.slot(), h1, vec![tx.clone()])
|
||||
.is_err());
|
||||
assert!(poh_recorder.working_bank.is_none());
|
||||
// Make sure the starting slot is updated
|
||||
assert_eq!(poh_recorder.start_slot(), end_slot);
|
||||
|
|
|
@ -138,7 +138,11 @@ mod tests {
|
|||
// send some data
|
||||
let h1 = hash(b"hello world!");
|
||||
let tx = test_tx();
|
||||
poh_recorder.lock().unwrap().record(h1, vec![tx]).unwrap();
|
||||
poh_recorder
|
||||
.lock()
|
||||
.unwrap()
|
||||
.record(bank.slot(), h1, vec![tx])
|
||||
.unwrap();
|
||||
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
break Ok(());
|
||||
|
|
Loading…
Reference in New Issue