Compare commits

..

2 Commits

Author SHA1 Message Date
Godmode Galactus 3c2ce7647d
solving deadlock and pointing cargo to github repo 2023-06-15 18:43:57 +02:00
Godmode Galactus 4d56e63e6f
removing confirmed fromt transaction map 2023-06-15 12:00:37 +02:00
3 changed files with 18 additions and 9 deletions

2
Cargo.lock generated
View File

@ -5777,6 +5777,7 @@ dependencies = [
[[package]] [[package]]
name = "solana-lite-rpc-core" name = "solana-lite-rpc-core"
version = "0.2.0" version = "0.2.0"
source = "git+https://github.com/blockworks-foundation/lite-rpc.git?branch=mango_simulation_test#ddd9f95a4d23972a4a49ea4db0086030bb7d6a72"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@ -5808,6 +5809,7 @@ dependencies = [
[[package]] [[package]]
name = "solana-lite-rpc-services" name = "solana-lite-rpc-services"
version = "0.2.0" version = "0.2.0"
source = "git+https://github.com/blockworks-foundation/lite-rpc.git?branch=mango_simulation_test#ddd9f95a4d23972a4a49ea4db0086030bb7d6a72"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-channel", "async-channel",

View File

@ -49,8 +49,8 @@ solana-logger = "1.15.2"
solana-transaction-status = "1.15.2" solana-transaction-status = "1.15.2"
solana-account-decoder = "1.15.2" solana-account-decoder = "1.15.2"
solana-lite-rpc-core = { path = "/home/galactus/mangolana/lite-rpc/core" } solana-lite-rpc-core = { git = "https://github.com/blockworks-foundation/lite-rpc.git", branch = "mango_simulation_test" }
solana-lite-rpc-services = { path = "/home/galactus/mangolana/lite-rpc/services" } solana-lite-rpc-services = { git = "https://github.com/blockworks-foundation/lite-rpc.git", branch = "mango_simulation_test" }
# pin program to mango-v3 version of solana sdk # pin program to mango-v3 version of solana sdk

View File

@ -166,16 +166,17 @@ pub fn confirmation_by_lite_rpc_notification_stream(
let confirming_task = { let confirming_task = {
let transaction_map = transaction_map.clone(); let transaction_map = transaction_map.clone();
let tx_confirm_records = tx_confirm_records.clone(); let tx_confirm_records = tx_confirm_records.clone();
let exit_signal = exit_signal.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut started_getting_mm_transactions = false;
let mut tx_record_rx = tx_record_rx; let mut tx_record_rx = tx_record_rx;
let mut notification_stream = notification_stream; let mut notification_stream = notification_stream;
while !transaction_map.is_empty() || !started_getting_mm_transactions { #[allow(unused_assignments)]
let mut remove_tx = None;
while !transaction_map.is_empty() || !exit_signal.load(Ordering::Relaxed) {
tokio::select! { tokio::select! {
transaction_record = tx_record_rx.recv() => { transaction_record = tx_record_rx.recv() => {
if let Some(transaction_record) = transaction_record{ if let Some(transaction_record) = transaction_record{
started_getting_mm_transactions = true;
transaction_map transaction_map
.insert(transaction_record.signature.to_string(), (transaction_record, Instant::now())); .insert(transaction_record.signature.to_string(), (transaction_record, Instant::now()));
} }
@ -183,6 +184,8 @@ pub fn confirmation_by_lite_rpc_notification_stream(
}, },
notification = notification_stream.recv() => { notification = notification_stream.recv() => {
if let Some(notification) = notification { if let Some(notification) = notification {
remove_tx = None;
match notification { match notification {
NotificationMsg::BlockNotificationMsg(block_notification) => { NotificationMsg::BlockNotificationMsg(block_notification) => {
let _ = tx_block_data.send(BlockData { let _ = tx_block_data.send(BlockData {
@ -206,8 +209,7 @@ pub fn confirmation_by_lite_rpc_notification_stream(
}, },
_ => None _ => None
}; };
remove_tx = Some(tx_notification.signature.clone());
transaction_map.remove(&tx_notification.signature);
let _ = tx_confirm_records.send(TransactionConfirmRecord { let _ = tx_confirm_records.send(TransactionConfirmRecord {
signature: tx_notification.signature, signature: tx_notification.signature,
confirmed_slot: Some(tx_notification.slot), confirmed_slot: Some(tx_notification.slot),
@ -232,6 +234,10 @@ pub fn confirmation_by_lite_rpc_notification_stream(
// others do nothing // others do nothing
} }
} }
if let Some(signature) = &remove_tx {
transaction_map.remove(signature);
}
} }
}, },
_ = tokio::time::sleep(Duration::from_secs(1)) => { _ = tokio::time::sleep(Duration::from_secs(1)) => {
@ -240,6 +246,7 @@ pub fn confirmation_by_lite_rpc_notification_stream(
} }
} }
} }
log::info!("stopped processing the transactions");
}) })
}; };
@ -287,7 +294,7 @@ pub fn confirmation_by_lite_rpc_notification_stream(
} }
// if exit and all the transactions are processed // if exit and all the transactions are processed
if exit_signal.load(Ordering::Relaxed) && transaction_map.len() == 0 { if exit_signal.load(Ordering::Relaxed) && transaction_map.is_empty() {
break; break;
} }
} }