Retry durable-nonce transactions (#14308)

* Retry durable-nonce transactions

* Add metric to track durable-nonce txs in queue

* Populate send-tx-service initial addresses with tpu_address if empty (primarily for testing)

* Reinstate last_valid_slot check for durable-nonce txs; use arbitrary future slot
This commit is contained in:
Tyera Eulberg 2020-12-29 09:48:43 -07:00 committed by GitHub
parent 3a1e01cced
commit 3f10fb993b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 375 additions and 51 deletions

View File

@ -46,7 +46,7 @@ use solana_runtime::{
use solana_sdk::{
account::Account,
account_utils::StateMut,
clock::{Slot, UnixTimestamp},
clock::{Slot, UnixTimestamp, MAX_RECENT_BLOCKHASHES},
commitment_config::{CommitmentConfig, CommitmentLevel},
epoch_info::EpochInfo,
epoch_schedule::EpochSchedule,
@ -1875,12 +1875,18 @@ fn _send_transaction(
transaction: Transaction,
wire_transaction: Vec<u8>,
last_valid_slot: Slot,
durable_nonce_info: Option<(Pubkey, Hash)>,
) -> Result<String> {
if transaction.signatures.is_empty() {
return Err(RpcCustomError::TransactionSignatureVerificationFailure.into());
}
let signature = transaction.signatures[0];
let transaction_info = TransactionInfo::new(signature, wire_transaction, last_valid_slot);
let transaction_info = TransactionInfo::new(
signature,
wire_transaction,
last_valid_slot,
durable_nonce_info,
);
meta.transaction_sender
.lock()
.unwrap()
@ -2309,7 +2315,7 @@ impl RpcSol for RpcSolImpl {
Error::internal_error()
})?;
_send_transaction(meta, transaction, wire_transaction, last_valid_slot)
_send_transaction(meta, transaction, wire_transaction, last_valid_slot, None)
}
fn send_transaction(
@ -2328,10 +2334,23 @@ impl RpcSol for RpcSolImpl {
.map(|commitment| CommitmentConfig { commitment });
let preflight_bank = &*meta.bank(preflight_commitment);
let last_valid_slot = preflight_bank
let mut last_valid_slot = preflight_bank
.get_blockhash_last_valid_slot(&transaction.message.recent_blockhash)
.unwrap_or(0);
let durable_nonce_info = solana_sdk::transaction::uses_durable_nonce(&transaction)
.and_then(|nonce_ix| {
solana_sdk::transaction::get_nonce_pubkey_from_instruction(&nonce_ix, &transaction)
})
.map(|&pubkey| (pubkey, transaction.message.recent_blockhash));
if durable_nonce_info.is_some() {
// While it uses a defined constant, this last_valid_slot value is chosen arbitrarily.
// It provides a fallback timeout for durable-nonce transaction retries in case of
// malicious packing of the retry queue. Durable-nonce transactions are otherwise
// retried until the nonce is advanced.
last_valid_slot = preflight_bank.slot() + MAX_RECENT_BLOCKHASHES as u64;
}
if !config.skip_preflight {
if let Err(e) = verify_transaction(&transaction) {
return Err(e);
@ -2352,7 +2371,13 @@ impl RpcSol for RpcSolImpl {
}
}
_send_transaction(meta, transaction, wire_transaction, last_valid_slot)
_send_transaction(
meta,
transaction,
wire_transaction,
last_valid_slot,
durable_nonce_info,
)
}
fn simulate_transaction(

View File

@ -6,6 +6,8 @@ use solana_metrics::{datapoint_warn, inc_new_counter_info};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
hash::Hash,
nonce_account,
pubkey::Pubkey,
signature::Signature,
};
@ -32,14 +34,21 @@ pub struct TransactionInfo {
pub signature: Signature,
pub wire_transaction: Vec<u8>,
pub last_valid_slot: Slot,
pub durable_nonce_info: Option<(Pubkey, Hash)>,
}
impl TransactionInfo {
pub fn new(signature: Signature, wire_transaction: Vec<u8>, last_valid_slot: Slot) -> Self {
pub fn new(
signature: Signature,
wire_transaction: Vec<u8>,
last_valid_slot: Slot,
durable_nonce_info: Option<(Pubkey, Hash)>,
) -> Self {
Self {
signature,
wire_transaction,
last_valid_slot,
durable_nonce_info,
}
}
}
@ -142,7 +151,15 @@ impl SendTransactionService {
let addresses = leader_info
.as_ref()
.map(|leader_info| leader_info.get_leader_tpus(leader_forward_count));
let addresses = addresses.unwrap_or_else(|| vec![&tpu_address]);
let addresses = addresses
.map(|address_list| {
if address_list.is_empty() {
vec![&tpu_address]
} else {
address_list
}
})
.unwrap_or_else(|| vec![&tpu_address]);
for address in addresses {
Self::send_transaction(
&send_socket,
@ -204,53 +221,68 @@ impl SendTransactionService {
let mut result = ProcessTransactionsResult::default();
transactions.retain(|signature, transaction_info| {
if transaction_info.durable_nonce_info.is_some() {
inc_new_counter_info!("send_transaction_service-nonced", 1);
}
if root_bank.has_signature(signature) {
info!("Transaction is rooted: {}", signature);
result.rooted += 1;
inc_new_counter_info!("send_transaction_service-rooted", 1);
false
} else if transaction_info.last_valid_slot < root_bank.slot() {
return false;
}
if let Some((nonce_pubkey, durable_nonce)) = transaction_info.durable_nonce_info {
let nonce_account = working_bank.get_account(&nonce_pubkey).unwrap_or_default();
if !nonce_account::verify_nonce_account(&nonce_account, &durable_nonce)
&& working_bank.get_signature_status_slot(signature).is_none()
{
info!("Dropping expired durable-nonce transaction: {}", signature);
result.expired += 1;
inc_new_counter_info!("send_transaction_service-expired", 1);
return false;
}
}
if transaction_info.last_valid_slot < root_bank.slot() {
info!("Dropping expired transaction: {}", signature);
result.expired += 1;
inc_new_counter_info!("send_transaction_service-expired", 1);
false
} else {
match working_bank.get_signature_status_slot(signature) {
None => {
// Transaction is unknown to the working bank, it might have been
// dropped or landed in another fork. Re-send it
info!("Retrying transaction: {}", signature);
result.retried += 1;
inc_new_counter_info!("send_transaction_service-retry", 1);
let leaders = leader_info
.as_ref()
.map(|leader_info| leader_info.get_leader_tpus(1));
let leader = if let Some(leaders) = leaders {
if leaders.is_empty() {
&tpu_address
} else {
leaders[0]
}
} else {
return false;
}
match working_bank.get_signature_status_slot(signature) {
None => {
// Transaction is unknown to the working bank, it might have been
// dropped or landed in another fork. Re-send it
info!("Retrying transaction: {}", signature);
result.retried += 1;
inc_new_counter_info!("send_transaction_service-retry", 1);
let leaders = leader_info
.as_ref()
.map(|leader_info| leader_info.get_leader_tpus(1));
let leader = if let Some(leaders) = leaders {
if leaders.is_empty() {
&tpu_address
};
Self::send_transaction(
&send_socket,
leader,
&transaction_info.wire_transaction,
);
true
}
Some((_slot, status)) => {
if status.is_err() {
info!("Dropping failed transaction: {}", signature);
result.failed += 1;
inc_new_counter_info!("send_transaction_service-failed", 1);
false
} else {
result.retained += 1;
true
leaders[0]
}
} else {
&tpu_address
};
Self::send_transaction(
&send_socket,
leader,
&transaction_info.wire_transaction,
);
true
}
Some((_slot, status)) => {
if status.is_err() {
info!("Dropping failed transaction: {}", signature);
result.failed += 1;
inc_new_counter_info!("send_transaction_service-failed", 1);
false
} else {
result.retained += 1;
true
}
}
}
@ -285,11 +317,14 @@ mod test {
create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs,
};
use solana_sdk::{
account::Account,
fee_calculator::FeeCalculator,
genesis_config::create_genesis_config,
nonce,
poh_config::PohConfig,
pubkey::Pubkey,
signature::{Keypair, Signer},
system_transaction,
system_program, system_transaction,
timing::timestamp,
};
use std::sync::mpsc::channel;
@ -344,10 +379,10 @@ mod test {
let mut transactions = HashMap::new();
info!("Expired transactions are dropped..");
info!("Expired transactions are dropped...");
transactions.insert(
Signature::default(),
TransactionInfo::new(Signature::default(), vec![], root_bank.slot() - 1),
TransactionInfo::new(Signature::default(), vec![], root_bank.slot() - 1, None),
);
let result = SendTransactionService::process_transactions(
&working_bank,
@ -369,7 +404,7 @@ mod test {
info!("Rooted transactions are dropped...");
transactions.insert(
rooted_signature,
TransactionInfo::new(rooted_signature, vec![], working_bank.slot()),
TransactionInfo::new(rooted_signature, vec![], working_bank.slot(), None),
);
let result = SendTransactionService::process_transactions(
&working_bank,
@ -391,7 +426,7 @@ mod test {
info!("Failed transactions are dropped...");
transactions.insert(
failed_signature,
TransactionInfo::new(failed_signature, vec![], working_bank.slot()),
TransactionInfo::new(failed_signature, vec![], working_bank.slot(), None),
);
let result = SendTransactionService::process_transactions(
&working_bank,
@ -413,7 +448,7 @@ mod test {
info!("Non-rooted transactions are kept...");
transactions.insert(
non_rooted_signature,
TransactionInfo::new(non_rooted_signature, vec![], working_bank.slot()),
TransactionInfo::new(non_rooted_signature, vec![], working_bank.slot(), None),
);
let result = SendTransactionService::process_transactions(
&working_bank,
@ -436,7 +471,7 @@ mod test {
info!("Unknown transactions are retried...");
transactions.insert(
Signature::default(),
TransactionInfo::new(Signature::default(), vec![], working_bank.slot()),
TransactionInfo::new(Signature::default(), vec![], working_bank.slot(), None),
);
let result = SendTransactionService::process_transactions(
&working_bank,
@ -456,6 +491,270 @@ mod test {
);
}
#[test]
fn test_retry_durable_nonce_transactions() {
solana_logger::setup();
let (genesis_config, mint_keypair) = create_genesis_config(4);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let tpu_address = "127.0.0.1:0".parse().unwrap();
let root_bank = Arc::new(Bank::new_from_parent(
&bank_forks.read().unwrap().working_bank(),
&Pubkey::default(),
1,
));
let rooted_signature = root_bank
.transfer(1, &mint_keypair, &mint_keypair.pubkey())
.unwrap();
let nonce_address = Pubkey::new_unique();
let durable_nonce = Hash::new_unique();
let nonce_state =
nonce::state::Versions::new_current(nonce::State::Initialized(nonce::state::Data {
authority: Pubkey::default(),
blockhash: durable_nonce,
fee_calculator: FeeCalculator::new(42),
}));
let nonce_account = Account::new_data(43, &nonce_state, &system_program::id()).unwrap();
root_bank.store_account(&nonce_address, &nonce_account);
let working_bank = Arc::new(Bank::new_from_parent(&root_bank, &Pubkey::default(), 2));
let non_rooted_signature = working_bank
.transfer(2, &mint_keypair, &mint_keypair.pubkey())
.unwrap();
let last_valid_slot = working_bank.slot() + 300;
let failed_signature = {
let blockhash = working_bank.last_blockhash();
let transaction =
system_transaction::transfer(&mint_keypair, &Pubkey::default(), 1, blockhash);
let signature = transaction.signatures[0];
working_bank.process_transaction(&transaction).unwrap_err();
signature
};
let mut transactions = HashMap::new();
info!("Rooted durable-nonce transactions are dropped...");
transactions.insert(
rooted_signature,
TransactionInfo::new(
rooted_signature,
vec![],
last_valid_slot,
Some((nonce_address, durable_nonce)),
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
rooted: 1,
..ProcessTransactionsResult::default()
}
);
// Nonce expired case
transactions.insert(
rooted_signature,
TransactionInfo::new(
rooted_signature,
vec![],
last_valid_slot,
Some((nonce_address, Hash::new_unique())),
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
rooted: 1,
..ProcessTransactionsResult::default()
}
);
// Expired durable-nonce transactions are dropped; nonce has advanced...
info!("Expired durable-nonce transactions are dropped...");
transactions.insert(
Signature::default(),
TransactionInfo::new(
Signature::default(),
vec![],
last_valid_slot,
Some((nonce_address, Hash::new_unique())),
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
expired: 1,
..ProcessTransactionsResult::default()
}
);
// ... or last_valid_slot timeout has passed
transactions.insert(
Signature::default(),
TransactionInfo::new(
Signature::default(),
vec![],
root_bank.slot() - 1,
Some((nonce_address, durable_nonce)),
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
expired: 1,
..ProcessTransactionsResult::default()
}
);
info!("Failed durable-nonce transactions are dropped...");
transactions.insert(
failed_signature,
TransactionInfo::new(
failed_signature,
vec![],
last_valid_slot,
Some((nonce_address, Hash::new_unique())), // runtime should advance nonce on failed transactions
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert!(transactions.is_empty());
assert_eq!(
result,
ProcessTransactionsResult {
failed: 1,
..ProcessTransactionsResult::default()
}
);
info!("Non-rooted durable-nonce transactions are kept...");
transactions.insert(
non_rooted_signature,
TransactionInfo::new(
non_rooted_signature,
vec![],
last_valid_slot,
Some((nonce_address, Hash::new_unique())), // runtime advances nonce when transaction lands
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
result,
ProcessTransactionsResult {
retained: 1,
..ProcessTransactionsResult::default()
}
);
transactions.clear();
info!("Unknown durable-nonce transactions are retried until nonce advances...");
transactions.insert(
Signature::default(),
TransactionInfo::new(
Signature::default(),
vec![],
last_valid_slot,
Some((nonce_address, durable_nonce)),
),
);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert_eq!(transactions.len(), 1);
assert_eq!(
result,
ProcessTransactionsResult {
retried: 1,
..ProcessTransactionsResult::default()
}
);
// Advance nonce
let new_durable_nonce = Hash::new_unique();
let new_nonce_state =
nonce::state::Versions::new_current(nonce::State::Initialized(nonce::state::Data {
authority: Pubkey::default(),
blockhash: new_durable_nonce,
fee_calculator: FeeCalculator::new(42),
}));
let nonce_account = Account::new_data(43, &new_nonce_state, &system_program::id()).unwrap();
working_bank.store_account(&nonce_address, &nonce_account);
let result = SendTransactionService::process_transactions(
&working_bank,
&root_bank,
&send_socket,
&tpu_address,
&mut transactions,
&None,
);
assert_eq!(transactions.len(), 0);
assert_eq!(
result,
ProcessTransactionsResult {
expired: 1,
..ProcessTransactionsResult::default()
}
);
}
#[test]
fn test_get_leader_tpus() {
let ledger_path = get_tmp_ledger_path!();