Improved Transaction Forwarding (#13944)

* Forwarding

* Dedupe leaders

* Use consistent commitment for last_valid_slot in rpc send_transaction

* Plumb rpc send_transaction options into solana-validator

* Extend num slots banking-stage holds forwarded txs

Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
sakridge 2020-12-17 14:37:22 -08:00 committed by GitHub
parent 3322b83183
commit da7d1e2302
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 251 additions and 43 deletions

View File

@ -522,6 +522,7 @@ impl CliConfig<'_> {
commitment: CommitmentConfig::recent(),
send_transaction_config: RpcSendTransactionConfig {
skip_preflight: true,
preflight_commitment: Some(CommitmentConfig::recent().commitment),
..RpcSendTransactionConfig::default()
},
..Self::default()
@ -1406,6 +1407,7 @@ fn send_deploy_messages(
config.commitment,
RpcSendTransactionConfig {
skip_preflight: true,
preflight_commitment: Some(config.commitment.commitment),
..RpcSendTransactionConfig::default()
},
)
@ -1815,6 +1817,7 @@ fn process_set_program_upgrade_authority(
config.commitment,
RpcSendTransactionConfig {
skip_preflight: true,
preflight_commitment: Some(config.commitment.commitment),
..RpcSendTransactionConfig::default()
},
)

View File

@ -58,7 +58,7 @@ type PacketsAndOffsets = (Packets, Vec<usize>);
pub type UnprocessedPackets = Vec<PacketsAndOffsets>;
/// Transaction forwarding
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 1;
pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2;
// Fixed thread size seems to be fastest on GCP setup
pub const NUM_THREADS: u32 = 4;

View File

@ -237,7 +237,7 @@ impl JsonRpcRequestProcessor {
let cluster_info = Arc::new(ClusterInfo::default());
let tpu_address = cluster_info.my_contact_info().tpu;
let (sender, receiver) = channel();
SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
Self {
config: JsonRpcConfig::default(),
@ -2322,8 +2322,13 @@ impl RpcSol for RpcSolImpl {
let config = config.unwrap_or_default();
let encoding = config.encoding.unwrap_or(UiTransactionEncoding::Base58);
let (wire_transaction, transaction) = deserialize_transaction(data, encoding)?;
let bank = &*meta.bank(None);
let last_valid_slot = bank
let preflight_commitment = config
.preflight_commitment
.map(|commitment| CommitmentConfig { commitment });
let preflight_bank = &*meta.bank(preflight_commitment);
let last_valid_slot = preflight_bank
.get_blockhash_last_valid_slot(&transaction.message.recent_blockhash)
.unwrap_or(0);
@ -2335,11 +2340,6 @@ impl RpcSol for RpcSolImpl {
if meta.health.check() != RpcHealthStatus::Ok {
return Err(RpcCustomError::RpcNodeUnhealthy.into());
}
let preflight_commitment = config
.preflight_commitment
.map(|commitment| CommitmentConfig { commitment });
let preflight_bank = &*meta.bank(preflight_commitment);
if let (Err(err), logs) = preflight_bank.simulate_transaction(transaction.clone()) {
return Err(RpcCustomError::SendTransactionPreflightFailure {
message: format!("Transaction simulation failed: {}", err),
@ -2906,7 +2906,7 @@ pub mod tests {
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
cluster_info.insert_info(ContactInfo::new_with_pubkey_socketaddr(
&leader_pubkey,
@ -4306,7 +4306,7 @@ pub mod tests {
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
let mut bad_transaction = system_transaction::transfer(
&mint_keypair,
@ -4502,7 +4502,7 @@ pub mod tests {
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
assert_eq!(request_processor.validator_exit(), false);
assert_eq!(exit.load(Ordering::Relaxed), false);
}
@ -4534,7 +4534,7 @@ pub mod tests {
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
assert_eq!(request_processor.validator_exit(), true);
assert_eq!(exit.load(Ordering::Relaxed), true);
}
@ -4625,7 +4625,7 @@ pub mod tests {
None,
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
assert_eq!(
request_processor.get_block_commitment(0),
RpcBlockCommitment {

View File

@ -251,6 +251,8 @@ impl JsonRpcService {
trusted_validators: Option<HashSet<Pubkey>>,
override_health_check: Arc<AtomicBool>,
optimistically_confirmed_bank: Arc<RwLock<OptimisticallyConfirmedBank>>,
send_transaction_retry_ms: u64,
send_transaction_leader_forward_count: u64,
) -> Self {
info!("rpc bound to {:?}", rpc_addr);
info!("rpc configuration: {:?}", config);
@ -323,6 +325,8 @@ impl JsonRpcService {
&bank_forks,
leader_info,
receiver,
send_transaction_retry_ms,
send_transaction_leader_forward_count,
));
#[cfg(test)]
@ -456,6 +460,8 @@ mod tests {
None,
Arc::new(AtomicBool::new(false)),
optimistically_confirmed_bank,
1000,
1,
);
let thread = rpc_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-jsonrpc");

View File

@ -4,7 +4,11 @@ use crate::poh_recorder::PohRecorder;
use log::*;
use solana_metrics::{datapoint_warn, inc_new_counter_info};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
use solana_sdk::{
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
pubkey::Pubkey,
signature::Signature,
};
use std::sync::Mutex;
use std::{
collections::HashMap,
@ -64,12 +68,21 @@ impl LeaderInfo {
.collect();
}
pub fn get_leader_tpu(&self) -> Option<&SocketAddr> {
self.poh_recorder
.lock()
.unwrap()
.leader_after_n_slots(0)
.and_then(|leader| self.recent_peers.get(&leader))
pub fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.lock().unwrap();
let leaders: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
.collect();
drop(recorder);
let mut unique_leaders = vec![];
for leader in leaders.iter() {
if let Some(addr) = self.recent_peers.get(leader) {
if !unique_leaders.contains(&addr) {
unique_leaders.push(addr);
}
}
}
unique_leaders
}
}
@ -88,8 +101,17 @@ impl SendTransactionService {
bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<LeaderInfo>,
receiver: Receiver<TransactionInfo>,
retry_rate_ms: u64,
leader_forward_count: u64,
) -> Self {
let thread = Self::retry_thread(tpu_address, receiver, bank_forks.clone(), leader_info);
let thread = Self::retry_thread(
tpu_address,
receiver,
bank_forks.clone(),
leader_info,
retry_rate_ms,
leader_forward_count,
);
Self { thread }
}
@ -98,8 +120,11 @@ impl SendTransactionService {
receiver: Receiver<TransactionInfo>,
bank_forks: Arc<RwLock<BankForks>>,
mut leader_info: Option<LeaderInfo>,
retry_rate_ms: u64,
leader_forward_count: u64,
) -> JoinHandle<()> {
let mut last_status_check = Instant::now();
let mut last_leader_refresh = Instant::now();
let mut transactions = HashMap::new();
let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
@ -110,19 +135,21 @@ impl SendTransactionService {
Builder::new()
.name("send-tx-sv2".to_string())
.spawn(move || loop {
match receiver.recv_timeout(Duration::from_secs(1)) {
match receiver.recv_timeout(Duration::from_millis(1000.min(retry_rate_ms))) {
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => {}
Ok(transaction_info) => {
let address = leader_info
let addresses = leader_info
.as_ref()
.and_then(|leader_info| leader_info.get_leader_tpu())
.unwrap_or(&tpu_address);
Self::send_transaction(
&send_socket,
address,
&transaction_info.wire_transaction,
);
.map(|leader_info| leader_info.get_leader_tpus(leader_forward_count));
let addresses = addresses.unwrap_or_else(|| vec![&tpu_address]);
for address in addresses {
Self::send_transaction(
&send_socket,
address,
&transaction_info.wire_transaction,
);
}
if transactions.len() < MAX_TRANSACTION_QUEUE_SIZE {
transactions.insert(transaction_info.signature, transaction_info);
} else {
@ -131,15 +158,19 @@ impl SendTransactionService {
}
}
if Instant::now().duration_since(last_status_check).as_secs() >= 5 {
if last_status_check.elapsed().as_millis() as u64 >= retry_rate_ms {
if !transactions.is_empty() {
datapoint_info!(
"send_transaction_service-queue-size",
("len", transactions.len(), i64)
);
let bank_forks = bank_forks.read().unwrap();
let root_bank = bank_forks.root_bank();
let working_bank = bank_forks.working_bank();
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(
bank_forks.root_bank().clone(),
bank_forks.working_bank().clone(),
)
};
let _result = Self::process_transactions(
&working_bank,
@ -151,8 +182,11 @@ impl SendTransactionService {
);
}
last_status_check = Instant::now();
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
if last_leader_refresh.elapsed().as_millis() > 1000 {
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
last_leader_refresh = Instant::now();
}
}
})
@ -188,12 +222,21 @@ impl SendTransactionService {
info!("Retrying transaction: {}", signature);
result.retried += 1;
inc_new_counter_info!("send_transaction_service-retry", 1);
let leaders = leader_info
.as_ref()
.map(|leader_info| leader_info.get_leader_tpus(1));
let leader = if let Some(leaders) = leaders {
if leaders.is_empty() {
&tpu_address
} else {
leaders[0]
}
} else {
&tpu_address
};
Self::send_transaction(
&send_socket,
leader_info
.as_ref()
.and_then(|leader_info| leader_info.get_leader_tpu())
.unwrap_or(&tpu_address),
leader,
&transaction_info.wire_transaction,
);
true
@ -234,9 +277,20 @@ impl SendTransactionService {
#[cfg(test)]
mod test {
use super::*;
use crate::contact_info::ContactInfo;
use solana_ledger::{
blockstore::Blockstore, get_tmp_ledger_path, leader_schedule_cache::LeaderScheduleCache,
};
use solana_runtime::genesis_utils::{
create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs,
};
use solana_sdk::{
genesis_config::create_genesis_config, pubkey::Pubkey, signature::Signer,
genesis_config::create_genesis_config,
poh_config::PohConfig,
pubkey::Pubkey,
signature::{Keypair, Signer},
system_transaction,
timing::timestamp,
};
use std::sync::mpsc::channel;
@ -248,7 +302,7 @@ mod test {
let (sender, receiver) = channel();
let send_tranaction_service =
SendTransactionService::new(tpu_address, &bank_forks, None, receiver);
SendTransactionService::new(tpu_address, &bank_forks, None, receiver, 1000, 1);
drop(sender);
send_tranaction_service.join().unwrap();
@ -401,4 +455,113 @@ mod test {
}
);
}
#[test]
fn test_get_leader_tpus() {
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&ledger_path).unwrap();
let validator_vote_keypairs0 = ValidatorVoteKeypairs::new_rand();
let validator_vote_keypairs1 = ValidatorVoteKeypairs::new_rand();
let validator_vote_keypairs2 = ValidatorVoteKeypairs::new_rand();
let validator_keypairs = vec![
&validator_vote_keypairs0,
&validator_vote_keypairs1,
&validator_vote_keypairs2,
];
let GenesisConfigInfo {
genesis_config,
mint_keypair: _,
voting_keypair: _,
} = create_genesis_config_with_vote_accounts(
1_000_000_000,
&validator_keypairs,
vec![10_000; 3],
);
let bank = Arc::new(Bank::new(&genesis_config));
let (poh_recorder, _entry_receiver) = PohRecorder::new(
0,
bank.last_blockhash(),
0,
Some((2, 2)),
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blockstore),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
);
let node_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
node_keypair,
));
let validator0_socket = SocketAddr::from(([127, 0, 0, 1], 1111));
let validator1_socket = SocketAddr::from(([127, 0, 0, 1], 2222));
let validator2_socket = SocketAddr::from(([127, 0, 0, 1], 3333));
let recent_peers: HashMap<_, _> = vec![
(
validator_vote_keypairs0.node_keypair.pubkey(),
validator0_socket,
),
(
validator_vote_keypairs1.node_keypair.pubkey(),
validator1_socket,
),
(
validator_vote_keypairs2.node_keypair.pubkey(),
validator2_socket,
),
]
.iter()
.cloned()
.collect();
let leader_info = LeaderInfo {
cluster_info,
poh_recorder: Arc::new(Mutex::new(poh_recorder)),
recent_peers: recent_peers.clone(),
};
let slot = bank.slot();
let first_leader =
solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
assert_eq!(
leader_info.get_leader_tpus(1),
vec![recent_peers.get(&first_leader).unwrap()]
);
let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
slot + NUM_CONSECUTIVE_LEADER_SLOTS,
&bank,
)
.unwrap();
let mut expected_leader_sockets = vec![
recent_peers.get(&first_leader).unwrap(),
recent_peers.get(&second_leader).unwrap(),
];
expected_leader_sockets.dedup();
assert_eq!(leader_info.get_leader_tpus(2), expected_leader_sockets);
let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
slot + (2 * NUM_CONSECUTIVE_LEADER_SLOTS),
&bank,
)
.unwrap();
let mut expected_leader_sockets = vec![
recent_peers.get(&first_leader).unwrap(),
recent_peers.get(&second_leader).unwrap(),
recent_peers.get(&third_leader).unwrap(),
];
expected_leader_sockets.dedup();
assert_eq!(leader_info.get_leader_tpus(3), expected_leader_sockets);
for x in 4..8 {
assert!(leader_info.get_leader_tpus(x).len() <= recent_peers.len());
}
}
Blockstore::destroy(&ledger_path).unwrap();
}
}

View File

@ -106,6 +106,8 @@ pub struct ValidatorConfig {
pub debug_keys: Option<Arc<HashSet<Pubkey>>>,
pub contact_debug_interval: u64,
pub bpf_jit: bool,
pub send_transaction_retry_ms: u64,
pub send_transaction_leader_forward_count: u64,
}
impl Default for ValidatorConfig {
@ -144,6 +146,8 @@ impl Default for ValidatorConfig {
debug_keys: None,
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL,
bpf_jit: false,
send_transaction_retry_ms: 2000,
send_transaction_leader_forward_count: 2,
}
}
}
@ -433,6 +437,8 @@ impl Validator {
config.trusted_validators.clone(),
rpc_override_health_check.clone(),
optimistically_confirmed_bank.clone(),
config.send_transaction_retry_ms,
config.send_transaction_leader_forward_count,
),
pubsub_service: PubSubService::new(
config.pubsub_config.clone(),

View File

@ -798,6 +798,12 @@ pub fn main() {
PubSubConfig::default().max_in_buffer_capacity.to_string();
let default_rpc_pubsub_max_out_buffer_capacity =
PubSubConfig::default().max_out_buffer_capacity.to_string();
let default_rpc_send_transaction_retry_ms = ValidatorConfig::default()
.send_transaction_retry_ms
.to_string();
let default_rpc_send_transaction_leader_forward_count = ValidatorConfig::default()
.send_transaction_leader_forward_count
.to_string();
let matches = App::new(crate_name!()).about(crate_description!())
.version(solana_version::version!())
@ -1278,6 +1284,24 @@ pub fn main() {
.default_value(&default_rpc_pubsub_max_out_buffer_capacity)
.help("The maximum size in bytes to which the outgoing websocket buffer can grow."),
)
.arg(
Arg::with_name("rpc_send_transaction_retry_ms")
.long("rpc-send-retry-ms")
.value_name("MILLISECS")
.takes_value(true)
.validator(is_parsable::<u64>)
.default_value(&default_rpc_send_transaction_retry_ms)
.help("The rate at which transactions sent via rpc service are retried."),
)
.arg(
Arg::with_name("rpc_send_transaction_leader_forward_count")
.long("rpc-send-leader-count")
.value_name("NUMBER")
.takes_value(true)
.validator(is_parsable::<u64>)
.default_value(&default_rpc_send_transaction_leader_forward_count)
.help("The number of upcoming leaders to which to forward transactions sent via rpc service."),
)
.arg(
Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch")
.long("halt-on-trusted-validators-accounts-hash-mismatch")
@ -1483,6 +1507,12 @@ pub fn main() {
debug_keys,
contact_debug_interval,
bpf_jit: matches.is_present("bpf_jit"),
send_transaction_retry_ms: value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64),
send_transaction_leader_forward_count: value_t_or_exit!(
matches,
"rpc_send_transaction_leader_forward_count",
u64
),
..ValidatorConfig::default()
};