From 3f10fb993b9090e5c2815aa7be87e31cf66e596c Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 29 Dec 2020 09:48:43 -0700 Subject: [PATCH] Retry durable-nonce transactions (#14308) * Retry durable-nonce transactions * Add metric to track durable-nonce txs in queue * Populate send-tx-service initial addresses with tpu_address if empty (primarily for testing) * Reinstate last_valid_slot check for durable-nonce txs; use arbitrary future slot --- core/src/rpc.rs | 35 ++- core/src/send_transaction_service.rs | 391 +++++++++++++++++++++++---- 2 files changed, 375 insertions(+), 51 deletions(-) diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 9e24f5b4e0..f67ece5d7e 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -46,7 +46,7 @@ use solana_runtime::{ use solana_sdk::{ account::Account, account_utils::StateMut, - clock::{Slot, UnixTimestamp}, + clock::{Slot, UnixTimestamp, MAX_RECENT_BLOCKHASHES}, commitment_config::{CommitmentConfig, CommitmentLevel}, epoch_info::EpochInfo, epoch_schedule::EpochSchedule, @@ -1875,12 +1875,18 @@ fn _send_transaction( transaction: Transaction, wire_transaction: Vec, last_valid_slot: Slot, + durable_nonce_info: Option<(Pubkey, Hash)>, ) -> Result { if transaction.signatures.is_empty() { return Err(RpcCustomError::TransactionSignatureVerificationFailure.into()); } let signature = transaction.signatures[0]; - let transaction_info = TransactionInfo::new(signature, wire_transaction, last_valid_slot); + let transaction_info = TransactionInfo::new( + signature, + wire_transaction, + last_valid_slot, + durable_nonce_info, + ); meta.transaction_sender .lock() .unwrap() @@ -2309,7 +2315,7 @@ impl RpcSol for RpcSolImpl { Error::internal_error() })?; - _send_transaction(meta, transaction, wire_transaction, last_valid_slot) + _send_transaction(meta, transaction, wire_transaction, last_valid_slot, None) } fn send_transaction( @@ -2328,10 +2334,23 @@ impl RpcSol for RpcSolImpl { .map(|commitment| CommitmentConfig { commitment }); let preflight_bank = &*meta.bank(preflight_commitment); - let last_valid_slot = preflight_bank + let mut last_valid_slot = preflight_bank .get_blockhash_last_valid_slot(&transaction.message.recent_blockhash) .unwrap_or(0); + let durable_nonce_info = solana_sdk::transaction::uses_durable_nonce(&transaction) + .and_then(|nonce_ix| { + solana_sdk::transaction::get_nonce_pubkey_from_instruction(&nonce_ix, &transaction) + }) + .map(|&pubkey| (pubkey, transaction.message.recent_blockhash)); + if durable_nonce_info.is_some() { + // While it uses a defined constant, this last_valid_slot value is chosen arbitrarily. + // It provides a fallback timeout for durable-nonce transaction retries in case of + // malicious packing of the retry queue. Durable-nonce transactions are otherwise + // retried until the nonce is advanced. + last_valid_slot = preflight_bank.slot() + MAX_RECENT_BLOCKHASHES as u64; + } + if !config.skip_preflight { if let Err(e) = verify_transaction(&transaction) { return Err(e); @@ -2352,7 +2371,13 @@ impl RpcSol for RpcSolImpl { } } - _send_transaction(meta, transaction, wire_transaction, last_valid_slot) + _send_transaction( + meta, + transaction, + wire_transaction, + last_valid_slot, + durable_nonce_info, + ) } fn simulate_transaction( diff --git a/core/src/send_transaction_service.rs b/core/src/send_transaction_service.rs index fafe5b1022..85f0e5194a 100644 --- a/core/src/send_transaction_service.rs +++ b/core/src/send_transaction_service.rs @@ -6,6 +6,8 @@ use solana_metrics::{datapoint_warn, inc_new_counter_info}; use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_sdk::{ clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, + hash::Hash, + nonce_account, pubkey::Pubkey, signature::Signature, }; @@ -32,14 +34,21 @@ pub struct TransactionInfo { pub signature: Signature, pub wire_transaction: Vec, pub last_valid_slot: Slot, + pub durable_nonce_info: Option<(Pubkey, Hash)>, } impl TransactionInfo { - pub fn new(signature: Signature, wire_transaction: Vec, last_valid_slot: Slot) -> Self { + pub fn new( + signature: Signature, + wire_transaction: Vec, + last_valid_slot: Slot, + durable_nonce_info: Option<(Pubkey, Hash)>, + ) -> Self { Self { signature, wire_transaction, last_valid_slot, + durable_nonce_info, } } } @@ -142,7 +151,15 @@ impl SendTransactionService { let addresses = leader_info .as_ref() .map(|leader_info| leader_info.get_leader_tpus(leader_forward_count)); - let addresses = addresses.unwrap_or_else(|| vec![&tpu_address]); + let addresses = addresses + .map(|address_list| { + if address_list.is_empty() { + vec![&tpu_address] + } else { + address_list + } + }) + .unwrap_or_else(|| vec![&tpu_address]); for address in addresses { Self::send_transaction( &send_socket, @@ -204,53 +221,68 @@ impl SendTransactionService { let mut result = ProcessTransactionsResult::default(); transactions.retain(|signature, transaction_info| { + if transaction_info.durable_nonce_info.is_some() { + inc_new_counter_info!("send_transaction_service-nonced", 1); + } if root_bank.has_signature(signature) { info!("Transaction is rooted: {}", signature); result.rooted += 1; inc_new_counter_info!("send_transaction_service-rooted", 1); - false - } else if transaction_info.last_valid_slot < root_bank.slot() { + return false; + } + if let Some((nonce_pubkey, durable_nonce)) = transaction_info.durable_nonce_info { + let nonce_account = working_bank.get_account(&nonce_pubkey).unwrap_or_default(); + if !nonce_account::verify_nonce_account(&nonce_account, &durable_nonce) + && working_bank.get_signature_status_slot(signature).is_none() + { + info!("Dropping expired durable-nonce transaction: {}", signature); + result.expired += 1; + inc_new_counter_info!("send_transaction_service-expired", 1); + return false; + } + } + if transaction_info.last_valid_slot < root_bank.slot() { info!("Dropping expired transaction: {}", signature); result.expired += 1; inc_new_counter_info!("send_transaction_service-expired", 1); - false - } else { - match working_bank.get_signature_status_slot(signature) { - None => { - // Transaction is unknown to the working bank, it might have been - // dropped or landed in another fork. Re-send it - info!("Retrying transaction: {}", signature); - result.retried += 1; - inc_new_counter_info!("send_transaction_service-retry", 1); - let leaders = leader_info - .as_ref() - .map(|leader_info| leader_info.get_leader_tpus(1)); - let leader = if let Some(leaders) = leaders { - if leaders.is_empty() { - &tpu_address - } else { - leaders[0] - } - } else { + return false; + } + + match working_bank.get_signature_status_slot(signature) { + None => { + // Transaction is unknown to the working bank, it might have been + // dropped or landed in another fork. Re-send it + info!("Retrying transaction: {}", signature); + result.retried += 1; + inc_new_counter_info!("send_transaction_service-retry", 1); + let leaders = leader_info + .as_ref() + .map(|leader_info| leader_info.get_leader_tpus(1)); + let leader = if let Some(leaders) = leaders { + if leaders.is_empty() { &tpu_address - }; - Self::send_transaction( - &send_socket, - leader, - &transaction_info.wire_transaction, - ); - true - } - Some((_slot, status)) => { - if status.is_err() { - info!("Dropping failed transaction: {}", signature); - result.failed += 1; - inc_new_counter_info!("send_transaction_service-failed", 1); - false } else { - result.retained += 1; - true + leaders[0] } + } else { + &tpu_address + }; + Self::send_transaction( + &send_socket, + leader, + &transaction_info.wire_transaction, + ); + true + } + Some((_slot, status)) => { + if status.is_err() { + info!("Dropping failed transaction: {}", signature); + result.failed += 1; + inc_new_counter_info!("send_transaction_service-failed", 1); + false + } else { + result.retained += 1; + true } } } @@ -285,11 +317,14 @@ mod test { create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, }; use solana_sdk::{ + account::Account, + fee_calculator::FeeCalculator, genesis_config::create_genesis_config, + nonce, poh_config::PohConfig, pubkey::Pubkey, signature::{Keypair, Signer}, - system_transaction, + system_program, system_transaction, timing::timestamp, }; use std::sync::mpsc::channel; @@ -344,10 +379,10 @@ mod test { let mut transactions = HashMap::new(); - info!("Expired transactions are dropped.."); + info!("Expired transactions are dropped..."); transactions.insert( Signature::default(), - TransactionInfo::new(Signature::default(), vec![], root_bank.slot() - 1), + TransactionInfo::new(Signature::default(), vec![], root_bank.slot() - 1, None), ); let result = SendTransactionService::process_transactions( &working_bank, @@ -369,7 +404,7 @@ mod test { info!("Rooted transactions are dropped..."); transactions.insert( rooted_signature, - TransactionInfo::new(rooted_signature, vec![], working_bank.slot()), + TransactionInfo::new(rooted_signature, vec![], working_bank.slot(), None), ); let result = SendTransactionService::process_transactions( &working_bank, @@ -391,7 +426,7 @@ mod test { info!("Failed transactions are dropped..."); transactions.insert( failed_signature, - TransactionInfo::new(failed_signature, vec![], working_bank.slot()), + TransactionInfo::new(failed_signature, vec![], working_bank.slot(), None), ); let result = SendTransactionService::process_transactions( &working_bank, @@ -413,7 +448,7 @@ mod test { info!("Non-rooted transactions are kept..."); transactions.insert( non_rooted_signature, - TransactionInfo::new(non_rooted_signature, vec![], working_bank.slot()), + TransactionInfo::new(non_rooted_signature, vec![], working_bank.slot(), None), ); let result = SendTransactionService::process_transactions( &working_bank, @@ -436,7 +471,7 @@ mod test { info!("Unknown transactions are retried..."); transactions.insert( Signature::default(), - TransactionInfo::new(Signature::default(), vec![], working_bank.slot()), + TransactionInfo::new(Signature::default(), vec![], working_bank.slot(), None), ); let result = SendTransactionService::process_transactions( &working_bank, @@ -456,6 +491,270 @@ mod test { ); } + #[test] + fn test_retry_durable_nonce_transactions() { + solana_logger::setup(); + + let (genesis_config, mint_keypair) = create_genesis_config(4); + let bank = Bank::new(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let tpu_address = "127.0.0.1:0".parse().unwrap(); + + let root_bank = Arc::new(Bank::new_from_parent( + &bank_forks.read().unwrap().working_bank(), + &Pubkey::default(), + 1, + )); + let rooted_signature = root_bank + .transfer(1, &mint_keypair, &mint_keypair.pubkey()) + .unwrap(); + + let nonce_address = Pubkey::new_unique(); + let durable_nonce = Hash::new_unique(); + let nonce_state = + nonce::state::Versions::new_current(nonce::State::Initialized(nonce::state::Data { + authority: Pubkey::default(), + blockhash: durable_nonce, + fee_calculator: FeeCalculator::new(42), + })); + let nonce_account = Account::new_data(43, &nonce_state, &system_program::id()).unwrap(); + root_bank.store_account(&nonce_address, &nonce_account); + + let working_bank = Arc::new(Bank::new_from_parent(&root_bank, &Pubkey::default(), 2)); + let non_rooted_signature = working_bank + .transfer(2, &mint_keypair, &mint_keypair.pubkey()) + .unwrap(); + + let last_valid_slot = working_bank.slot() + 300; + + let failed_signature = { + let blockhash = working_bank.last_blockhash(); + let transaction = + system_transaction::transfer(&mint_keypair, &Pubkey::default(), 1, blockhash); + let signature = transaction.signatures[0]; + working_bank.process_transaction(&transaction).unwrap_err(); + signature + }; + + let mut transactions = HashMap::new(); + + info!("Rooted durable-nonce transactions are dropped..."); + transactions.insert( + rooted_signature, + TransactionInfo::new( + rooted_signature, + vec![], + last_valid_slot, + Some((nonce_address, durable_nonce)), + ), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + rooted: 1, + ..ProcessTransactionsResult::default() + } + ); + // Nonce expired case + transactions.insert( + rooted_signature, + TransactionInfo::new( + rooted_signature, + vec![], + last_valid_slot, + Some((nonce_address, Hash::new_unique())), + ), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + rooted: 1, + ..ProcessTransactionsResult::default() + } + ); + + // Expired durable-nonce transactions are dropped; nonce has advanced... + info!("Expired durable-nonce transactions are dropped..."); + transactions.insert( + Signature::default(), + TransactionInfo::new( + Signature::default(), + vec![], + last_valid_slot, + Some((nonce_address, Hash::new_unique())), + ), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + expired: 1, + ..ProcessTransactionsResult::default() + } + ); + // ... or last_valid_slot timeout has passed + transactions.insert( + Signature::default(), + TransactionInfo::new( + Signature::default(), + vec![], + root_bank.slot() - 1, + Some((nonce_address, durable_nonce)), + ), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + expired: 1, + ..ProcessTransactionsResult::default() + } + ); + + info!("Failed durable-nonce transactions are dropped..."); + transactions.insert( + failed_signature, + TransactionInfo::new( + failed_signature, + vec![], + last_valid_slot, + Some((nonce_address, Hash::new_unique())), // runtime should advance nonce on failed transactions + ), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert!(transactions.is_empty()); + assert_eq!( + result, + ProcessTransactionsResult { + failed: 1, + ..ProcessTransactionsResult::default() + } + ); + + info!("Non-rooted durable-nonce transactions are kept..."); + transactions.insert( + non_rooted_signature, + TransactionInfo::new( + non_rooted_signature, + vec![], + last_valid_slot, + Some((nonce_address, Hash::new_unique())), // runtime advances nonce when transaction lands + ), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert_eq!(transactions.len(), 1); + assert_eq!( + result, + ProcessTransactionsResult { + retained: 1, + ..ProcessTransactionsResult::default() + } + ); + transactions.clear(); + + info!("Unknown durable-nonce transactions are retried until nonce advances..."); + transactions.insert( + Signature::default(), + TransactionInfo::new( + Signature::default(), + vec![], + last_valid_slot, + Some((nonce_address, durable_nonce)), + ), + ); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert_eq!(transactions.len(), 1); + assert_eq!( + result, + ProcessTransactionsResult { + retried: 1, + ..ProcessTransactionsResult::default() + } + ); + // Advance nonce + let new_durable_nonce = Hash::new_unique(); + let new_nonce_state = + nonce::state::Versions::new_current(nonce::State::Initialized(nonce::state::Data { + authority: Pubkey::default(), + blockhash: new_durable_nonce, + fee_calculator: FeeCalculator::new(42), + })); + let nonce_account = Account::new_data(43, &new_nonce_state, &system_program::id()).unwrap(); + working_bank.store_account(&nonce_address, &nonce_account); + let result = SendTransactionService::process_transactions( + &working_bank, + &root_bank, + &send_socket, + &tpu_address, + &mut transactions, + &None, + ); + assert_eq!(transactions.len(), 0); + assert_eq!( + result, + ProcessTransactionsResult { + expired: 1, + ..ProcessTransactionsResult::default() + } + ); + } + #[test] fn test_get_leader_tpus() { let ledger_path = get_tmp_ledger_path!();