TransactionRecorder uses unique channel so we can use Recv instead of RecvTimeout (#16195)
* time * new channel each call * new channel every time
This commit is contained in:
parent
9ba9d2a8ae
commit
5eff23db0c
|
@ -77,9 +77,6 @@ impl Record {
|
|||
pub struct TransactionRecorder {
|
||||
// shared by all users of PohRecorder
|
||||
pub record_sender: Sender<Record>,
|
||||
// unique to this caller
|
||||
pub result_sender: Sender<Result<()>>,
|
||||
pub result_receiver: Receiver<Result<()>>,
|
||||
}
|
||||
|
||||
impl Clone for TransactionRecorder {
|
||||
|
@ -90,13 +87,9 @@ impl Clone for TransactionRecorder {
|
|||
|
||||
impl TransactionRecorder {
|
||||
pub fn new(record_sender: Sender<Record>) -> Self {
|
||||
let (result_sender, result_receiver) = channel();
|
||||
Self {
|
||||
// shared
|
||||
record_sender,
|
||||
// unique to this caller
|
||||
result_sender,
|
||||
result_receiver,
|
||||
}
|
||||
}
|
||||
pub fn record(
|
||||
|
@ -105,21 +98,18 @@ impl TransactionRecorder {
|
|||
mixin: Hash,
|
||||
transactions: Vec<Transaction>,
|
||||
) -> Result<()> {
|
||||
let res = self.record_sender.send(Record::new(
|
||||
mixin,
|
||||
transactions,
|
||||
bank_slot,
|
||||
self.result_sender.clone(),
|
||||
));
|
||||
// create a new channel so that there is only 1 sender and when it goes out of scope, the receiver fails
|
||||
let (result_sender, result_receiver) = channel();
|
||||
let res =
|
||||
self.record_sender
|
||||
.send(Record::new(mixin, transactions, bank_slot, result_sender));
|
||||
if res.is_err() {
|
||||
// If the channel is dropped, then the validator is shutting down so return that we are hitting
|
||||
// the max tick height to stop transaction processing and flush any transactions in the pipeline.
|
||||
return Err(PohRecorderError::MaxHeightReached);
|
||||
}
|
||||
// Besides validator exit, this timeout should primarily be seen to affect test execution environments where the various pieces can be shutdown abruptly
|
||||
let res = self
|
||||
.result_receiver
|
||||
.recv_timeout(std::time::Duration::from_millis(5000));
|
||||
let res = result_receiver.recv();
|
||||
match res {
|
||||
Err(_err) => Err(PohRecorderError::MaxHeightReached),
|
||||
Ok(result) => result,
|
||||
|
|
Loading…
Reference in New Issue