diff --git a/cli/src/program.rs b/cli/src/program.rs index 3a224d830..642b8aac0 100644 --- a/cli/src/program.rs +++ b/cli/src/program.rs @@ -12,9 +12,9 @@ use solana_account_decoder::{UiAccountEncoding, UiDataSliceConfig}; use solana_bpf_loader_program::{syscalls::register_syscalls, BpfError, ThisInstructionMeter}; use solana_clap_utils::{self, input_parsers::*, input_validators::*, keypair::*}; use solana_cli_output::{ - display::new_spinner_progress_bar, CliProgram, CliProgramAccountType, CliProgramAuthority, - CliProgramBuffer, CliProgramId, CliUpgradeableBuffer, CliUpgradeableBuffers, - CliUpgradeableProgram, CliUpgradeableProgramClosed, CliUpgradeablePrograms, + CliProgram, CliProgramAccountType, CliProgramAuthority, CliProgramBuffer, CliProgramId, + CliUpgradeableBuffer, CliUpgradeableBuffers, CliUpgradeableProgram, + CliUpgradeableProgramClosed, CliUpgradeablePrograms, }; use solana_client::{ client_error::ClientErrorKind, @@ -22,7 +22,6 @@ use solana_client::{ rpc_config::RpcSendTransactionConfig, rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType}, - rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, tpu_client::{TpuClient, TpuClientConfig}, }; use solana_rbpf::{ @@ -44,24 +43,18 @@ use solana_sdk::{ process_instruction::MockInvokeContext, pubkey::Pubkey, signature::{keypair_from_seed, read_keypair_file, Keypair, Signature, Signer}, - signers::Signers, system_instruction::{self, SystemError}, system_program, transaction::Transaction, transaction::TransactionError, }; -use solana_transaction_status::TransactionConfirmationStatus; use std::{ - collections::HashMap, - error, fs::File, io::{Read, Write}, mem::size_of, path::PathBuf, str::FromStr, sync::Arc, - thread::sleep, - time::Duration, }; #[derive(Debug, PartialEq)] @@ -2121,16 +2114,20 @@ fn send_deploy_messages( if let Some(write_messages) = write_messages { if let Some(write_signer) = write_signer { trace!("Writing program data"); - let transaction_errors = send_and_confirm_messages_with_spinner( + let tpu_client = TpuClient::new( rpc_client.clone(), &config.websocket_url, - write_messages, - &[payer_signer, write_signer], - ) - .map_err(|err| format!("Data writes to account failed: {}", err))? - .into_iter() - .flatten() - .collect::>(); + TpuClientConfig::default(), + )?; + let transaction_errors = tpu_client + .send_and_confirm_messages_with_spinner( + write_messages, + &[payer_signer, write_signer], + ) + .map_err(|err| format!("Data writes to account failed: {}", err))? + .into_iter() + .flatten() + .collect::>(); if !transaction_errors.is_empty() { for transaction_error in &transaction_errors { @@ -2200,146 +2197,6 @@ fn report_ephemeral_mnemonic(words: usize, mnemonic: bip39::Mnemonic) { ); } -fn send_and_confirm_messages_with_spinner( - rpc_client: Arc, - websocket_url: &str, - messages: &[Message], - signers: &T, -) -> Result>, Box> { - let commitment = rpc_client.commitment(); - - let progress_bar = new_spinner_progress_bar(); - let send_transaction_interval = Duration::from_millis(10); /* ~100 TPS */ - let mut send_retries = 5; - - let (blockhash, mut last_valid_block_height) = - rpc_client.get_latest_blockhash_with_commitment(commitment)?; - - let mut transactions = vec![]; - let mut transaction_errors = vec![None; messages.len()]; - for (i, message) in messages.iter().enumerate() { - let mut transaction = Transaction::new_unsigned(message.clone()); - transaction.try_sign(signers, blockhash)?; - transactions.push((i, transaction)); - } - - progress_bar.set_message("Finding leader nodes..."); - let tpu_client = TpuClient::new( - rpc_client.clone(), - websocket_url, - TpuClientConfig::default(), - )?; - loop { - // Send all transactions - let mut pending_transactions = HashMap::new(); - let num_transactions = transactions.len(); - for (i, transaction) in transactions { - if !tpu_client.send_transaction(&transaction) { - let _result = rpc_client - .send_transaction_with_config( - &transaction, - RpcSendTransactionConfig { - preflight_commitment: Some(commitment.commitment), - ..RpcSendTransactionConfig::default() - }, - ) - .ok(); - } - pending_transactions.insert(transaction.signatures[0], (i, transaction)); - progress_bar.set_message(format!( - "[{}/{}] Transactions sent", - pending_transactions.len(), - num_transactions - )); - - sleep(send_transaction_interval); - } - - // Collect statuses for all the transactions, drop those that are confirmed - loop { - let mut block_height = 0; - let pending_signatures = pending_transactions.keys().cloned().collect::>(); - for pending_signatures_chunk in - pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS) - { - if let Ok(result) = rpc_client.get_signature_statuses(pending_signatures_chunk) { - let statuses = result.value; - for (signature, status) in - pending_signatures_chunk.iter().zip(statuses.into_iter()) - { - if let Some(status) = status { - if let Some(confirmation_status) = &status.confirmation_status { - if *confirmation_status != TransactionConfirmationStatus::Processed - { - if let Some((i, _)) = pending_transactions.remove(signature) { - transaction_errors[i] = status.err; - } - } - } else if status.confirmations.is_none() - || status.confirmations.unwrap() > 1 - { - if let Some((i, _)) = pending_transactions.remove(signature) { - transaction_errors[i] = status.err; - } - } - } - } - } - - block_height = rpc_client.get_block_height()?; - progress_bar.set_message(format!( - "[{}/{}] Transactions confirmed. Retrying in {} blocks", - num_transactions - pending_transactions.len(), - num_transactions, - last_valid_block_height.saturating_sub(block_height) - )); - } - - if pending_transactions.is_empty() { - return Ok(transaction_errors); - } - - if block_height > last_valid_block_height { - break; - } - - for (_i, transaction) in pending_transactions.values() { - if !tpu_client.send_transaction(transaction) { - let _result = rpc_client - .send_transaction_with_config( - transaction, - RpcSendTransactionConfig { - preflight_commitment: Some(commitment.commitment), - ..RpcSendTransactionConfig::default() - }, - ) - .ok(); - } - } - - if cfg!(not(test)) { - // Retry twice a second - sleep(Duration::from_millis(500)); - } - } - - if send_retries == 0 { - return Err("Transactions failed".into()); - } - send_retries -= 1; - - // Re-sign any failed transactions with a new blockhash and retry - let (blockhash, new_last_valid_block_height) = - rpc_client.get_latest_blockhash_with_commitment(commitment)?; - last_valid_block_height = new_last_valid_block_height; - transactions = vec![]; - for (_, (i, mut transaction)) in pending_transactions.into_iter() { - transaction.try_sign(signers, blockhash)?; - transactions.push((i, transaction)); - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/client/src/lib.rs b/client/src/lib.rs index 33d96e5de..0794d0f4c 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -18,6 +18,7 @@ pub mod rpc_filter; pub mod rpc_request; pub mod rpc_response; pub mod rpc_sender; +pub mod spinner; pub mod thin_client; pub mod tpu_client; pub mod transaction_executor; diff --git a/client/src/mock_sender.rs b/client/src/mock_sender.rs index e05984f01..a7d8344fe 100644 --- a/client/src/mock_sender.rs +++ b/client/src/mock_sender.rs @@ -347,7 +347,7 @@ impl RpcSender for MockSender { context: RpcResponseContext { slot: 1 }, value: RpcBlockhash { blockhash: PUBKEY.to_string(), - last_valid_block_height: 0, + last_valid_block_height: 1234, }, })?, "getFeeForMessage" => serde_json::to_value(Response { diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index c5ffa63a7..e3d929a82 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -21,9 +21,9 @@ use { rpc_request::{RpcError, RpcRequest, RpcResponseErrorData, TokenAccountsFilter}, rpc_response::*, rpc_sender::*, + spinner, }, bincode::serialize, - indicatif::{ProgressBar, ProgressStyle}, log::*, serde_json::{json, Value}, solana_account_decoder::{ @@ -1062,7 +1062,7 @@ impl RpcClient { }; let mut confirmations = 0; - let progress_bar = new_spinner_progress_bar(); + let progress_bar = spinner::new_progress_bar(); progress_bar.set_message(format!( "[{}/{}] Finalizing transaction {}", @@ -4876,14 +4876,6 @@ pub struct GetConfirmedSignaturesForAddress2Config { pub commitment: Option, } -fn new_spinner_progress_bar() -> ProgressBar { - let progress_bar = ProgressBar::new(42); - progress_bar - .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); - progress_bar.enable_steady_tick(100); - progress_bar -} - fn get_rpc_request_str(rpc_addr: SocketAddr, tls: bool) -> String { if tls { format!("https://{}", rpc_addr) diff --git a/client/src/spinner.rs b/client/src/spinner.rs new file mode 100644 index 000000000..97ae0bee2 --- /dev/null +++ b/client/src/spinner.rs @@ -0,0 +1,11 @@ +//! Spinner creator + +use indicatif::{ProgressBar, ProgressStyle}; + +pub(crate) fn new_progress_bar() -> ProgressBar { + let progress_bar = ProgressBar::new(42); + progress_bar + .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); + progress_bar.enable_steady_tick(100); + progress_bar +} diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 2497514b4..6823f0d98 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -1,12 +1,21 @@ use crate::{ + client_error::ClientError, pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription}, rpc_client::RpcClient, + rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, rpc_response::SlotUpdate, + spinner, }; use bincode::serialize; use log::*; use solana_sdk::{ - clock::Slot, commitment_config::CommitmentConfig, pubkey::Pubkey, transaction::Transaction, + clock::Slot, + commitment_config::CommitmentConfig, + message::Message, + pubkey::Pubkey, + signature::SignerError, + signers::Signers, + transaction::{Transaction, TransactionError}, }; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -16,7 +25,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, RwLock, }, - thread::JoinHandle, + thread::{sleep, JoinHandle}, time::{Duration, Instant}, }; use thiserror::Error; @@ -26,9 +35,13 @@ pub enum TpuSenderError { #[error("Pubsub error: {0:?}")] PubsubError(#[from] PubsubClientError), #[error("RPC error: {0:?}")] - RpcError(#[from] crate::client_error::ClientError), + RpcError(#[from] ClientError), #[error("IO error: {0:?}")] IoError(#[from] std::io::Error), + #[error("Signer error: {0:?}")] + SignerError(#[from] SignerError), + #[error("Custom error: {0}")] + Custom(String), } type Result = std::result::Result; @@ -62,6 +75,7 @@ pub struct TpuClient { fanout_slots: u64, leader_tpu_service: LeaderTpuService, exit: Arc, + rpc_client: Arc, } impl TpuClient { @@ -97,15 +111,159 @@ impl TpuClient { config: TpuClientConfig, ) -> Result { let exit = Arc::new(AtomicBool::new(false)); - let leader_tpu_service = LeaderTpuService::new(rpc_client, websocket_url, exit.clone())?; + let leader_tpu_service = + LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?; Ok(Self { send_socket: UdpSocket::bind("0.0.0.0:0").unwrap(), fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1), leader_tpu_service, exit, + rpc_client, }) } + + pub fn send_and_confirm_messages_with_spinner( + &self, + messages: &[Message], + signers: &T, + ) -> Result>> { + let mut expired_blockhash_retries = 5; + /* Send at ~100 TPS */ + const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10); + /* Retry batch send after 4 seconds */ + const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4); + + let progress_bar = spinner::new_progress_bar(); + progress_bar.set_message("Setting up..."); + + let mut transactions = messages + .iter() + .enumerate() + .map(|(i, message)| (i, Transaction::new_unsigned(message.clone()))) + .collect::>(); + let num_transactions = transactions.len() as f64; + let mut transaction_errors = vec![None; transactions.len()]; + let set_message = |confirmed_transactions, + block_height: Option, + last_valid_block_height: u64, + status: &str| { + progress_bar.set_message(format!( + "{:>5.1}% | {:<40}{}", + confirmed_transactions as f64 * 100. / num_transactions, + status, + match block_height { + Some(block_height) => format!( + " [block height {}; re-sign in {} blocks]", + block_height, + last_valid_block_height.saturating_sub(block_height), + ), + None => String::new(), + }, + )); + }; + + let mut confirmed_transactions = 0; + let mut block_height = self.rpc_client.get_block_height()?; + while expired_blockhash_retries > 0 { + let (blockhash, last_valid_block_height) = self + .rpc_client + .get_latest_blockhash_with_commitment(self.rpc_client.commitment())?; + + let mut pending_transactions = HashMap::new(); + for (i, mut transaction) in transactions { + transaction.try_sign(signers, blockhash)?; + pending_transactions.insert(transaction.signatures[0], (i, transaction)); + } + + let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL; + while block_height <= last_valid_block_height { + let num_transactions = pending_transactions.len(); + + // Periodically re-send all pending transactions + if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL { + for (index, (_i, transaction)) in pending_transactions.values().enumerate() { + if !self.send_transaction(transaction) { + let _result = self.rpc_client.send_transaction(transaction).ok(); + } + set_message( + confirmed_transactions, + None, //block_height, + last_valid_block_height, + &format!("Sending {}/{} transactions", index + 1, num_transactions,), + ); + sleep(SEND_TRANSACTION_INTERVAL); + } + last_resend = Instant::now(); + } + + // Wait for the next block before checking for transaction statuses + let mut block_height_refreshes = 10; + set_message( + confirmed_transactions, + Some(block_height), + last_valid_block_height, + &format!("Waiting for next block, {} pending...", num_transactions), + ); + let mut new_block_height = block_height; + while block_height == new_block_height && block_height_refreshes > 0 { + sleep(Duration::from_millis(500)); + new_block_height = self.rpc_client.get_block_height()?; + block_height_refreshes -= 1; + } + block_height = new_block_height; + + // Collect statuses for the transactions, drop those that are confirmed + let pending_signatures = pending_transactions.keys().cloned().collect::>(); + for pending_signatures_chunk in + pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS) + { + if let Ok(result) = self + .rpc_client + .get_signature_statuses(pending_signatures_chunk) + { + let statuses = result.value; + for (signature, status) in + pending_signatures_chunk.iter().zip(statuses.into_iter()) + { + if let Some(status) = status { + if status.satisfies_commitment(self.rpc_client.commitment()) { + if let Some((i, _)) = pending_transactions.remove(signature) { + confirmed_transactions += 1; + if status.err.is_some() { + progress_bar.println(format!( + "Failed transaction: {:?}", + status + )); + } + transaction_errors[i] = status.err; + } + } + } + } + } + set_message( + confirmed_transactions, + Some(block_height), + last_valid_block_height, + "Checking transaction status...", + ); + } + + if pending_transactions.is_empty() { + return Ok(transaction_errors); + } + } + + transactions = pending_transactions.into_iter().map(|(_k, v)| v).collect(); + progress_bar.println(format!( + "Blockhash expired. {} retries remaining", + expired_blockhash_retries + )); + expired_blockhash_retries -= 1; + } + Err(TpuSenderError::Custom("Max retries exceeded".into())) + } } impl Drop for TpuClient {