diff --git a/src/confirmation_strategies.rs b/src/confirmation_strategies.rs index 8d91e31..d45b99f 100644 --- a/src/confirmation_strategies.rs +++ b/src/confirmation_strategies.rs @@ -166,16 +166,15 @@ pub fn confirmation_by_lite_rpc_notification_stream( let confirming_task = { let transaction_map = transaction_map.clone(); let tx_confirm_records = tx_confirm_records.clone(); + let exit_signal = exit_signal.clone(); tokio::spawn(async move { - let mut started_getting_mm_transactions = false; let mut tx_record_rx = tx_record_rx; let mut notification_stream = notification_stream; - - while !transaction_map.is_empty() || !started_getting_mm_transactions { + let mut remove_tx = None; + while !transaction_map.is_empty() || !exit_signal.load(Ordering::Relaxed) { tokio::select! { transaction_record = tx_record_rx.recv() => { if let Some(transaction_record) = transaction_record{ - started_getting_mm_transactions = true; transaction_map .insert(transaction_record.signature.to_string(), (transaction_record, Instant::now())); } @@ -183,6 +182,8 @@ pub fn confirmation_by_lite_rpc_notification_stream( }, notification = notification_stream.recv() => { if let Some(notification) = notification { + remove_tx = None; + match notification { NotificationMsg::BlockNotificationMsg(block_notification) => { let _ = tx_block_data.send(BlockData { @@ -206,8 +207,7 @@ pub fn confirmation_by_lite_rpc_notification_stream( }, _ => None }; - - transaction_map.remove(&tx_notification.signature); + remove_tx = Some(tx_notification.signature.clone()); let _ = tx_confirm_records.send(TransactionConfirmRecord { signature: tx_notification.signature, confirmed_slot: Some(tx_notification.slot), @@ -232,6 +232,10 @@ pub fn confirmation_by_lite_rpc_notification_stream( // others do nothing } } + + if let Some(signature) = &remove_tx { + transaction_map.remove(signature); + } } }, _ = tokio::time::sleep(Duration::from_secs(1)) => { @@ -240,6 +244,7 @@ pub fn confirmation_by_lite_rpc_notification_stream( } } } + log::info!("stopped processing the transactions"); }) }; @@ -287,7 +292,7 @@ pub fn confirmation_by_lite_rpc_notification_stream( } // if exit and all the transactions are processed - if exit_signal.load(Ordering::Relaxed) && transaction_map.len() == 0 { + if exit_signal.load(Ordering::Relaxed) && transaction_map.is_empty() { break; } }