removing confirmed fromt transaction map
This commit is contained in:
parent
e765215beb
commit
4d56e63e6f
|
@ -166,16 +166,15 @@ pub fn confirmation_by_lite_rpc_notification_stream(
|
|||
let confirming_task = {
|
||||
let transaction_map = transaction_map.clone();
|
||||
let tx_confirm_records = tx_confirm_records.clone();
|
||||
let exit_signal = exit_signal.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut started_getting_mm_transactions = false;
|
||||
let mut tx_record_rx = tx_record_rx;
|
||||
let mut notification_stream = notification_stream;
|
||||
|
||||
while !transaction_map.is_empty() || !started_getting_mm_transactions {
|
||||
let mut remove_tx = None;
|
||||
while !transaction_map.is_empty() || !exit_signal.load(Ordering::Relaxed) {
|
||||
tokio::select! {
|
||||
transaction_record = tx_record_rx.recv() => {
|
||||
if let Some(transaction_record) = transaction_record{
|
||||
started_getting_mm_transactions = true;
|
||||
transaction_map
|
||||
.insert(transaction_record.signature.to_string(), (transaction_record, Instant::now()));
|
||||
}
|
||||
|
@ -183,6 +182,8 @@ pub fn confirmation_by_lite_rpc_notification_stream(
|
|||
},
|
||||
notification = notification_stream.recv() => {
|
||||
if let Some(notification) = notification {
|
||||
remove_tx = None;
|
||||
|
||||
match notification {
|
||||
NotificationMsg::BlockNotificationMsg(block_notification) => {
|
||||
let _ = tx_block_data.send(BlockData {
|
||||
|
@ -206,8 +207,7 @@ pub fn confirmation_by_lite_rpc_notification_stream(
|
|||
},
|
||||
_ => None
|
||||
};
|
||||
|
||||
transaction_map.remove(&tx_notification.signature);
|
||||
remove_tx = Some(tx_notification.signature.clone());
|
||||
let _ = tx_confirm_records.send(TransactionConfirmRecord {
|
||||
signature: tx_notification.signature,
|
||||
confirmed_slot: Some(tx_notification.slot),
|
||||
|
@ -232,6 +232,10 @@ pub fn confirmation_by_lite_rpc_notification_stream(
|
|||
// others do nothing
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(signature) = &remove_tx {
|
||||
transaction_map.remove(signature);
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = tokio::time::sleep(Duration::from_secs(1)) => {
|
||||
|
@ -240,6 +244,7 @@ pub fn confirmation_by_lite_rpc_notification_stream(
|
|||
}
|
||||
}
|
||||
}
|
||||
log::info!("stopped processing the transactions");
|
||||
})
|
||||
};
|
||||
|
||||
|
@ -287,7 +292,7 @@ pub fn confirmation_by_lite_rpc_notification_stream(
|
|||
}
|
||||
|
||||
// if exit and all the transactions are processed
|
||||
if exit_signal.load(Ordering::Relaxed) && transaction_map.len() == 0 {
|
||||
if exit_signal.load(Ordering::Relaxed) && transaction_map.is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue