From 12f3fd75e89529fdc6d37804cd9b07bb55e04217 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 8 Mar 2019 10:36:56 -0800 Subject: [PATCH] StorageStage now sends transactions at the local TPU --- core/src/storage_stage.rs | 77 ++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index 5284a77a6..4579303dc 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -195,8 +195,10 @@ impl StorageStage { .spawn(move || loop { match tx_receiver.recv_timeout(Duration::from_secs(1)) { Ok(mut tx) => { - if Self::send_tx(&cluster_info0, &mut tx, &exit1, &keypair1, None).is_ok() { - debug!("sent tx: {:?}", tx); + if Self::send_transaction(&cluster_info0, &mut tx, &exit1, &keypair1, None) + .is_ok() + { + debug!("sent transaction: {:?}", tx); } } Err(e) => match e { @@ -218,58 +220,57 @@ impl StorageStage { } } - fn send_tx( + fn send_transaction( cluster_info: &Arc>, - tx: &mut Transaction, + transaction: &mut Transaction, exit: &Arc, keypair: &Arc, account_to_create: Option, ) -> io::Result<()> { - if let Some(leader_info) = cluster_info.read().unwrap().leader_data() { - let mut client = mk_client_with_timeout(leader_info, Duration::from_secs(5)); + let node_info = cluster_info.read().unwrap().my_data(); + let mut client = mk_client_with_timeout(&node_info, Duration::from_secs(5)); - if let Some(account) = account_to_create { - if client.get_account_userdata(&account).is_ok() { - return Ok(()); - } + if let Some(account) = account_to_create { + if client.get_account_userdata(&account).is_ok() { + return Ok(()); + } + } + + let mut blockhash = None; + for _ in 0..10 { + if let Some(new_blockhash) = client.try_get_recent_blockhash(1) { + blockhash = Some(new_blockhash); + break; } - let mut blockhash = None; - for _ in 0..10 { - if let Some(new_blockhash) = client.try_get_recent_blockhash(1) { - blockhash = Some(new_blockhash); - break; - } + if exit.load(Ordering::Relaxed) { + Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; + } + } - if exit.load(Ordering::Relaxed) { - Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; - } + if let Some(blockhash) = blockhash { + transaction.sign(&[keypair.as_ref()], blockhash); + + if exit.load(Ordering::Relaxed) { + Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; } - if let Some(blockhash) = blockhash { - tx.sign(&[keypair.as_ref()], blockhash); - - if exit.load(Ordering::Relaxed) { - Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; - } - - if let Ok(signature) = client.transfer_signed(&tx) { - for _ in 0..10 { - if client.check_signature(&signature) { - return Ok(()); - } - - if exit.load(Ordering::Relaxed) { - Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; - } - - sleep(Duration::from_millis(200)); + if let Ok(signature) = client.transfer_signed(&transaction) { + for _ in 0..10 { + if client.check_signature(&signature) { + return Ok(()); } + + if exit.load(Ordering::Relaxed) { + Err(io::Error::new(io::ErrorKind::Other, "exit signaled"))?; + } + + sleep(Duration::from_millis(200)); } } } - Err(io::Error::new(io::ErrorKind::Other, "leader not found")) + Err(io::Error::new(io::ErrorKind::Other, "other failure")) } pub fn process_entry_crossing(