Batch tpu calls in send-transaction-service (#24083)

Introduced flag --tpu-do-batch2.
Introduced flag to control the batch size-- by default 100
The default batch timeout is 200ms -- configurable. If either it time out or the batch size is filled, a new batch is sent
The batch honor the retry rate on the transaction already sent before.
Introduced two threads in STS: one for receiving new transactions and doing batch send and one for retrying old transactions and doing batch.6.
Fixes #
This commit is contained in:
Lijun Wang 2022-04-21 12:43:08 -07:00 committed by GitHub
parent 25e9199397
commit 7c61e438fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 412 additions and 85 deletions

View File

@ -179,6 +179,7 @@ impl Banks for BanksServer {
last_valid_block_height,
None,
None,
None,
);
self.transaction_sender.send(info).unwrap();
}
@ -310,6 +311,7 @@ impl Banks for BanksServer {
last_valid_block_height,
None,
None,
None,
);
self.transaction_sender.send(info).unwrap();
self.poll_signature_status(&signature, blockhash, last_valid_block_height, commitment)

View File

@ -61,6 +61,15 @@ while [[ -n $1 ]]; do
elif [[ $1 = --enable-rpc-bigtable-ledger-storage ]]; then
args+=("$1")
shift
elif [[ $1 = --tpu-use-quic ]]; then
args+=("$1")
shift
elif [[ $1 = --rpc-send-batch-ms ]]; then
args+=("$1" "$2")
shift 2
elif [[ $1 = --rpc-send-batch-size ]]; then
args+=("$1" "$2")
shift 2
elif [[ $1 = --skip-poh-verify ]]; then
args+=("$1")
shift

View File

@ -147,6 +147,15 @@ while [[ -n $1 ]]; do
elif [[ $1 = --skip-poh-verify ]]; then
args+=("$1")
shift
elif [[ $1 = --tpu-use-quic ]]; then
args+=("$1")
shift
elif [[ $1 = --rpc-send-batch-ms ]]; then
args+=("$1" "$2")
shift 2
elif [[ $1 = --rpc-send-batch-size ]]; then
args+=("$1" "$2")
shift 2
elif [[ $1 = --log ]]; then
args+=("$1" "$2")
shift 2

View File

@ -10,6 +10,7 @@ use {
},
};
#[derive(Clone)]
pub struct ClusterTpuInfo {
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<Mutex<PohRecorder>>,

View File

@ -2377,6 +2377,7 @@ fn _send_transaction(
last_valid_block_height,
durable_nonce_info,
max_retries,
None,
);
meta.transaction_sender
.lock()

View File

@ -8,25 +8,52 @@ use {
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{hash::Hash, nonce_account, pubkey::Pubkey, signature::Signature},
std::{
collections::hash_map::{Entry, HashMap},
collections::{
hash_map::{Entry, HashMap},
HashSet,
},
net::SocketAddr,
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
time::{Duration, Instant},
},
};
/// Maximum size of the transaction queue
const MAX_TRANSACTION_QUEUE_SIZE: usize = 10_000; // This seems like a lot but maybe it needs to be bigger one day
/// Default retry interval
const DEFAULT_RETRY_RATE_MS: u64 = 2_000;
/// Default number of leaders to forward transactions to
const DEFAULT_LEADER_FORWARD_COUNT: u64 = 2;
/// Default max number of time the service will retry broadcast
const DEFAULT_SERVICE_MAX_RETRIES: usize = usize::MAX;
/// Default batch size for sending transaction in batch
/// When this size is reached, send out the transactions.
const DEFAULT_TRANSACTION_BATCH_SIZE: usize = 1;
// The maximum transaction batch size
pub const MAX_TRANSACTION_BATCH_SIZE: usize = 10_000;
/// Maximum transaction sends per second
pub const MAX_TRANSACTION_SENDS_PER_SECOND: u64 = 1_000;
/// Default maximum batch waiting time in ms. If this time is reached,
/// whatever transactions are cached will be sent.
const DEFAULT_BATCH_SEND_RATE_MS: u64 = 1;
// The maximum transaction batch send rate in MS
pub const MAX_BATCH_SEND_RATE_MS: usize = 100_000;
pub struct SendTransactionService {
thread: JoinHandle<()>,
receive_txn_thread: JoinHandle<()>,
retry_thread: JoinHandle<()>,
exit: Arc<AtomicBool>,
}
pub struct TransactionInfo {
@ -36,6 +63,8 @@ pub struct TransactionInfo {
pub durable_nonce_info: Option<(Pubkey, Hash)>,
pub max_retries: Option<usize>,
retries: usize,
/// Last time the transaction was sent
last_sent_time: Option<Instant>,
}
impl TransactionInfo {
@ -45,6 +74,7 @@ impl TransactionInfo {
last_valid_block_height: u64,
durable_nonce_info: Option<(Pubkey, Hash)>,
max_retries: Option<usize>,
last_sent_time: Option<Instant>,
) -> Self {
Self {
signature,
@ -53,6 +83,7 @@ impl TransactionInfo {
durable_nonce_info,
max_retries,
retries: 0,
last_sent_time,
}
}
}
@ -75,7 +106,12 @@ pub struct Config {
pub leader_forward_count: u64,
pub default_max_retries: Option<usize>,
pub service_max_retries: usize,
/// Whether to use Quic protocol to send transactions
pub use_quic: bool,
/// The batch size for sending transactions in batches
pub batch_size: usize,
/// How frequently batches are sent
pub batch_send_rate_ms: u64,
}
impl Default for Config {
@ -86,12 +122,14 @@ impl Default for Config {
default_max_retries: None,
service_max_retries: DEFAULT_SERVICE_MAX_RETRIES,
use_quic: DEFAULT_TPU_USE_QUIC,
batch_size: DEFAULT_TRANSACTION_BATCH_SIZE,
batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS,
}
}
}
impl SendTransactionService {
pub fn new<T: TpuInfo + std::marker::Send + 'static>(
pub fn new<T: TpuInfo + std::marker::Send + Clone + 'static>(
tpu_address: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<T>,
@ -109,100 +147,125 @@ impl SendTransactionService {
Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config)
}
pub fn new_with_config<T: TpuInfo + std::marker::Send + 'static>(
pub fn new_with_config<T: TpuInfo + std::marker::Send + Clone + 'static>(
tpu_address: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<T>,
receiver: Receiver<TransactionInfo>,
config: Config,
) -> Self {
let thread = Self::retry_thread(
let retry_transactions = Arc::new(Mutex::new(HashMap::new()));
let exit = Arc::new(AtomicBool::new(false));
let receive_txn_thread = Self::receive_txn_thread(
tpu_address,
receiver,
leader_info.clone(),
config.clone(),
retry_transactions.clone(),
exit.clone(),
);
let retry_thread = Self::retry_thread(
tpu_address,
bank_forks.clone(),
leader_info,
config,
retry_transactions,
exit.clone(),
);
Self { thread }
Self {
receive_txn_thread,
retry_thread,
exit,
}
}
fn retry_thread<T: TpuInfo + std::marker::Send + 'static>(
/// Thread responsible for receiving transactions from RPC clients.
fn receive_txn_thread<T: TpuInfo + std::marker::Send + 'static>(
tpu_address: SocketAddr,
receiver: Receiver<TransactionInfo>,
bank_forks: Arc<RwLock<BankForks>>,
mut leader_info: Option<T>,
config: Config,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let mut last_status_check = Instant::now();
let mut last_batch_sent = Instant::now();
let mut last_leader_refresh = Instant::now();
let mut transactions = HashMap::new();
info!(
"Starting send-transaction-service::receive_txn_thread with config {:?}",
config
);
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
connection_cache::set_use_quic(config.use_quic);
Builder::new()
.name("send-tx-sv2".to_string())
.name("send-tx-receive".to_string())
.spawn(move || loop {
match receiver.recv_timeout(Duration::from_millis(1000.min(config.retry_rate_ms))) {
Err(RecvTimeoutError::Disconnected) => break,
let recv_timeout_ms = config.batch_send_rate_ms;
match receiver.recv_timeout(Duration::from_millis(1000.min(recv_timeout_ms))) {
Err(RecvTimeoutError::Disconnected) => {
info!("Terminating send-transaction-service.");
exit.store(true, Ordering::Relaxed);
break;
}
Err(RecvTimeoutError::Timeout) => {}
Ok(transaction_info) => {
inc_new_counter_info!("send_transaction_service-recv-tx", 1);
let transactions_len = transactions.len();
let entry = transactions.entry(transaction_info.signature);
let mut new_transaction = false;
if let Entry::Vacant(_) = entry {
let addresses = leader_info.as_ref().map(|leader_info| {
leader_info.get_leader_tpus(config.leader_forward_count)
});
let addresses = addresses
.map(|address_list| {
if address_list.is_empty() {
vec![&tpu_address]
} else {
address_list
}
})
.unwrap_or_else(|| vec![&tpu_address]);
for address in addresses {
Self::send_transaction(address, &transaction_info.wire_transaction);
}
if transactions_len < MAX_TRANSACTION_QUEUE_SIZE {
inc_new_counter_info!("send_transaction_service-insert-tx", 1);
if !retry_transactions
.lock()
.unwrap()
.contains_key(&transaction_info.signature)
{
entry.or_insert(transaction_info);
} else {
datapoint_warn!("send_transaction_service-queue-overflow");
new_transaction = true;
}
} else {
}
if !new_transaction {
inc_new_counter_info!("send_transaction_service-recv-duplicate", 1);
}
}
}
if last_status_check.elapsed().as_millis() as u64 >= config.retry_rate_ms {
if !transactions.is_empty() {
datapoint_info!(
"send_transaction_service-queue-size",
("len", transactions.len(), i64)
);
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(
bank_forks.root_bank().clone(),
bank_forks.working_bank().clone(),
)
};
let _result = Self::process_transactions(
&working_bank,
&root_bank,
&tpu_address,
&mut transactions,
&leader_info,
&config,
);
if (!transactions.is_empty()
&& last_batch_sent.elapsed().as_millis() as u64 >= config.batch_send_rate_ms)
|| transactions.len() >= config.batch_size
{
inc_new_counter_info!(
"send_transaction_service-batch-size",
transactions.len()
);
let _result = Self::send_transactions_in_batch(
&tpu_address,
&mut transactions,
&leader_info,
&config,
);
let last_sent_time = Instant::now();
{
// take a lock of retry_transactions and move the batch to the retry set.
let mut retry_transactions = retry_transactions.lock().unwrap();
for (signature, mut transaction_info) in transactions.drain() {
let retry_len = retry_transactions.len();
let entry = retry_transactions.entry(signature);
if let Entry::Vacant(_) = entry {
if retry_len >= MAX_TRANSACTION_QUEUE_SIZE {
datapoint_warn!("send_transaction_service-queue-overflow");
break;
} else {
transaction_info.last_sent_time = Some(last_sent_time);
entry.or_insert(transaction_info);
}
}
}
}
last_status_check = Instant::now();
last_batch_sent = Instant::now();
if last_leader_refresh.elapsed().as_millis() > 1000 {
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
@ -214,6 +277,96 @@ impl SendTransactionService {
.unwrap()
}
/// Thread responsible for retrying transactions
fn retry_thread<T: TpuInfo + std::marker::Send + 'static>(
tpu_address: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
mut leader_info: Option<T>,
config: Config,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
let mut last_leader_refresh = Instant::now();
info!(
"Starting send-transaction-service::retry_thread with config {:?}",
config
);
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
connection_cache::set_use_quic(config.use_quic);
Builder::new()
.name("send-tx-retry".to_string())
.spawn(move || loop {
let retry_interval_ms = config.retry_rate_ms;
sleep(Duration::from_millis(1000.min(retry_interval_ms)));
if exit.load(Ordering::Relaxed) {
break;
}
let mut transactions = retry_transactions.lock().unwrap();
if !transactions.is_empty() {
datapoint_info!(
"send_transaction_service-queue-size",
("len", transactions.len(), i64)
);
let (root_bank, working_bank) = {
let bank_forks = bank_forks.read().unwrap();
(
bank_forks.root_bank().clone(),
bank_forks.working_bank().clone(),
)
};
let _result = Self::process_transactions(
&working_bank,
&root_bank,
&tpu_address,
&mut transactions,
&leader_info,
&config,
);
}
if last_leader_refresh.elapsed().as_millis() > 1000 {
if let Some(leader_info) = leader_info.as_mut() {
leader_info.refresh_recent_peers();
}
last_leader_refresh = Instant::now();
}
})
.unwrap()
}
/// Process transactions in batch.
fn send_transactions_in_batch<T: TpuInfo>(
tpu_address: &SocketAddr,
transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info: &Option<T>,
config: &Config,
) {
let mut measure = Measure::start("send_transactions_in_batch-us");
// Processing the transactions in batch
let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config);
let wire_transactions = transactions
.iter()
.map(|(_, transaction_info)| transaction_info.wire_transaction.as_ref())
.collect::<Vec<&[u8]>>();
for address in &addresses {
Self::send_transactions(address, &wire_transactions);
}
measure.stop();
inc_new_counter_info!(
"send_transactions_in_batch-us",
measure.as_us() as usize,
1000,
1000
);
}
/// Retry transactions sent before.
fn process_transactions<T: TpuInfo>(
working_bank: &Arc<Bank>,
root_bank: &Arc<Bank>,
@ -224,6 +377,9 @@ impl SendTransactionService {
) -> ProcessTransactionsResult {
let mut result = ProcessTransactionsResult::default();
let mut batched_transactions = HashSet::new();
let retry_rate = Duration::from_millis(config.retry_rate_ms);
transactions.retain(|signature, mut transaction_info| {
if transaction_info.durable_nonce_info.is_some() {
inc_new_counter_info!("send_transaction_service-nonced", 1);
@ -234,10 +390,17 @@ impl SendTransactionService {
inc_new_counter_info!("send_transaction_service-rooted", 1);
return false;
}
let signature_status = working_bank.get_signature_status_slot(signature);
if let Some((nonce_pubkey, durable_nonce)) = transaction_info.durable_nonce_info {
let nonce_account = working_bank.get_account(&nonce_pubkey).unwrap_or_default();
let now = Instant::now();
let expired = transaction_info
.last_sent_time
.map(|last| now.duration_since(last) >= retry_rate)
.unwrap_or(false);
if !nonce_account::verify_nonce_account(&nonce_account, &durable_nonce)
&& working_bank.get_signature_status_slot(signature).is_none()
&& signature_status.is_none()
&& expired
{
info!("Dropping expired durable-nonce transaction: {}", signature);
result.expired += 1;
@ -266,28 +429,27 @@ impl SendTransactionService {
}
}
match working_bank.get_signature_status_slot(signature) {
match signature_status {
None => {
// Transaction is unknown to the working bank, it might have been
// dropped or landed in another fork. Re-send it
info!("Retrying transaction: {}", signature);
result.retried += 1;
transaction_info.retries += 1;
inc_new_counter_info!("send_transaction_service-retry", 1);
let addresses = leader_info.as_ref().map(|leader_info| {
leader_info.get_leader_tpus(config.leader_forward_count)
});
let addresses = addresses
.map(|address_list| {
if address_list.is_empty() {
vec![tpu_address]
} else {
address_list
}
})
.unwrap_or_else(|| vec![tpu_address]);
for address in addresses {
Self::send_transaction(address, &transaction_info.wire_transaction);
let now = Instant::now();
let need_send = transaction_info
.last_sent_time
.map(|last| now.duration_since(last) >= retry_rate)
.unwrap_or(true);
if need_send {
if transaction_info.last_sent_time.is_some() {
// Transaction sent before is unknown to the working bank, it might have been
// dropped or landed in another fork. Re-send it
info!("Retrying transaction: {}", signature);
result.retried += 1;
transaction_info.retries += 1;
inc_new_counter_info!("send_transaction_service-retry", 1);
}
batched_transactions.insert(*signature);
transaction_info.last_sent_time = Some(now);
}
true
}
@ -305,12 +467,28 @@ impl SendTransactionService {
}
});
if !batched_transactions.is_empty() {
// Processing the transactions in batch
let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config);
let wire_transactions = transactions
.iter()
.filter(|(signature, _)| batched_transactions.contains(signature))
.map(|(_, transaction_info)| transaction_info.wire_transaction.as_ref())
.collect::<Vec<&[u8]>>();
for address in &addresses {
let iter = wire_transactions.chunks(config.batch_size);
for chunk in iter {
Self::send_transactions(address, chunk);
}
}
}
result
}
fn send_transaction(tpu_address: &SocketAddr, wire_transaction: &[u8]) {
let mut measure = Measure::start("send_transaction_service-us");
if let Err(err) =
connection_cache::send_wire_transaction_async(wire_transaction.to_vec(), tpu_address)
{
@ -325,8 +503,56 @@ impl SendTransactionService {
);
}
fn send_transactions_with_metrics(tpu_address: &SocketAddr, wire_transactions: &[&[u8]]) {
let mut measure = Measure::start("send_transaction_service-batch-us");
let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect();
let send_result =
connection_cache::send_wire_transaction_batch_async(wire_transactions, tpu_address);
if let Err(err) = send_result {
warn!(
"Failed to send transaction batch to {}: {:?}",
tpu_address, err
);
}
measure.stop();
inc_new_counter_info!(
"send_transaction_service-batch-us",
measure.as_us() as usize
);
}
fn send_transactions(tpu_address: &SocketAddr, wire_transactions: &[&[u8]]) {
if wire_transactions.len() == 1 {
Self::send_transaction(tpu_address, wire_transactions[0])
} else {
Self::send_transactions_with_metrics(tpu_address, wire_transactions)
}
}
fn get_tpu_addresses<'a, T: TpuInfo>(
tpu_address: &'a SocketAddr,
leader_info: &'a Option<T>,
config: &'a Config,
) -> Vec<&'a SocketAddr> {
let addresses = leader_info
.as_ref()
.map(|leader_info| leader_info.get_leader_tpus(config.leader_forward_count));
addresses
.map(|address_list| {
if address_list.is_empty() {
vec![tpu_address]
} else {
address_list
}
})
.unwrap_or_else(|| vec![tpu_address])
}
pub fn join(self) -> thread::Result<()> {
self.thread.join()
self.receive_txn_thread.join()?;
self.exit.store(true, Ordering::Relaxed);
self.retry_thread.join()
}
}
@ -340,6 +566,7 @@ mod test {
account::AccountSharedData, genesis_config::create_genesis_config, nonce,
pubkey::Pubkey, signature::Signer, system_program, system_transaction,
},
std::ops::Sub,
};
#[test]
@ -411,6 +638,7 @@ mod test {
root_bank.block_height() - 1,
None,
None,
Some(Instant::now()),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -439,6 +667,7 @@ mod test {
working_bank.block_height(),
None,
None,
Some(Instant::now()),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -467,6 +696,7 @@ mod test {
working_bank.block_height(),
None,
None,
Some(Instant::now()),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -495,6 +725,7 @@ mod test {
working_bank.block_height(),
None,
None,
Some(Instant::now()),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -524,8 +755,10 @@ mod test {
working_bank.block_height(),
None,
None,
Some(Instant::now().sub(Duration::from_millis(4000))),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank,
&root_bank,
@ -553,6 +786,7 @@ mod test {
working_bank.block_height(),
None,
Some(0),
Some(Instant::now()),
),
);
transactions.insert(
@ -563,6 +797,7 @@ mod test {
working_bank.block_height(),
None,
Some(1),
Some(Instant::now().sub(Duration::from_millis(4000))),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -658,6 +893,7 @@ mod test {
last_valid_block_height,
Some((nonce_address, durable_nonce)),
None,
Some(Instant::now()),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -685,6 +921,7 @@ mod test {
last_valid_block_height,
Some((nonce_address, Hash::new_unique())),
None,
Some(Instant::now()),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -714,6 +951,7 @@ mod test {
last_valid_block_height,
Some((nonce_address, Hash::new_unique())),
None,
Some(Instant::now().sub(Duration::from_millis(4000))),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -741,6 +979,7 @@ mod test {
root_bank.block_height() - 1,
Some((nonce_address, durable_nonce)),
None,
Some(Instant::now()),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -769,6 +1008,7 @@ mod test {
last_valid_block_height,
Some((nonce_address, Hash::new_unique())), // runtime should advance nonce on failed transactions
None,
Some(Instant::now()),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -797,6 +1037,7 @@ mod test {
last_valid_block_height,
Some((nonce_address, Hash::new_unique())), // runtime advances nonce when transaction lands
None,
Some(Instant::now()),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -818,6 +1059,7 @@ mod test {
transactions.clear();
info!("Unknown durable-nonce transactions are retried until nonce advances...");
// simulate there was a nonce transaction sent 4 seconds ago (> the retry rate which is 2 seconds)
transactions.insert(
Signature::default(),
TransactionInfo::new(
@ -826,6 +1068,7 @@ mod test {
last_valid_block_height,
Some((nonce_address, durable_nonce)),
None,
Some(Instant::now().sub(Duration::from_millis(4000))),
),
);
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
@ -844,7 +1087,11 @@ mod test {
..ProcessTransactionsResult::default()
}
);
// Advance nonce
// Advance nonce, simulate the transaction was again last sent 4 seconds ago.
// This time the transaction should have been dropped.
for mut transaction in transactions.values_mut() {
transaction.last_sent_time = Some(Instant::now().sub(Duration::from_millis(4000)));
}
let new_durable_nonce = Hash::new_unique();
let new_nonce_state = nonce::state::Versions::new_current(nonce::State::Initialized(
nonce::state::Data::new(Pubkey::default(), new_durable_nonce, 42),

View File

@ -5,6 +5,7 @@ pub trait TpuInfo {
fn get_leader_tpus(&self, max_count: u64) -> Vec<&SocketAddr>;
}
#[derive(Clone)]
pub struct NullTpuInfo;
impl TpuInfo for NullTpuInfo {

View File

@ -14,6 +14,7 @@ use {
input_validators::{
is_keypair, is_keypair_or_ask_keyword, is_niceness_adjustment_valid, is_parsable,
is_pow2, is_pubkey, is_pubkey_or_keypair, is_slot, is_valid_percentage,
is_within_range,
},
keypair::SKIP_SEED_PHRASE_VALIDATION_ARG,
},
@ -66,7 +67,9 @@ use {
pubkey::Pubkey,
signature::{Keypair, Signer},
},
solana_send_transaction_service::send_transaction_service,
solana_send_transaction_service::send_transaction_service::{
self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE,
},
solana_streamer::socket::SocketAddrSpace,
solana_validator::{
admin_rpc_service, bootstrap, dashboard::Dashboard, ledger_lockfile, lock_ledger,
@ -101,6 +104,7 @@ const INCLUDE_KEY: &str = "account-index-include-key";
const DEFAULT_MIN_SNAPSHOT_DOWNLOAD_SPEED: u64 = 10485760;
// The maximum times of snapshot download abort and retry
const MAX_SNAPSHOT_DOWNLOAD_ABORT: u32 = 5;
const MILLIS_PER_SECOND: u64 = 1000;
fn monitor_validator(ledger_path: &Path) {
let dashboard = Dashboard::new(ledger_path, None, None).unwrap_or_else(|err| {
@ -434,12 +438,18 @@ pub fn main() {
let default_rpc_send_transaction_retry_ms = default_send_transaction_service_config
.retry_rate_ms
.to_string();
let default_rpc_send_transaction_batch_ms = default_send_transaction_service_config
.batch_send_rate_ms
.to_string();
let default_rpc_send_transaction_leader_forward_count = default_send_transaction_service_config
.leader_forward_count
.to_string();
let default_rpc_send_transaction_service_max_retries = default_send_transaction_service_config
.service_max_retries
.to_string();
let default_rpc_send_transaction_batch_size = default_send_transaction_service_config
.batch_size
.to_string();
let default_rpc_threads = num_cpus::get().to_string();
let default_accountsdb_repl_threads = num_cpus::get().to_string();
let default_maximum_full_snapshot_archives_to_retain =
@ -1344,6 +1354,16 @@ pub fn main() {
.default_value(&default_rpc_send_transaction_retry_ms)
.help("The rate at which transactions sent via rpc service are retried."),
)
.arg(
Arg::with_name("rpc_send_transaction_batch_ms")
.long("rpc-send-batch-ms")
.value_name("MILLISECS")
.hidden(true)
.takes_value(true)
.validator(|s| is_within_range(s, 1, MAX_BATCH_SEND_RATE_MS))
.default_value(&default_rpc_send_transaction_batch_ms)
.help("The rate at which transactions sent via rpc service are sent in batch."),
)
.arg(
Arg::with_name("rpc_send_transaction_leader_forward_count")
.long("rpc-send-leader-count")
@ -1370,6 +1390,16 @@ pub fn main() {
.default_value(&default_rpc_send_transaction_service_max_retries)
.help("The maximum number of transaction broadcast retries, regardless of requested value."),
)
.arg(
Arg::with_name("rpc_send_transaction_batch_size")
.long("rpc-send-batch-size")
.value_name("NUMBER")
.hidden(true)
.takes_value(true)
.validator(|s| is_within_range(s, 1, MAX_TRANSACTION_BATCH_SIZE))
.default_value(&default_rpc_send_transaction_batch_size)
.help("The size of transactions to be sent in batch."),
)
.arg(
Arg::with_name("rpc_scan_and_fix_roots")
.long("rpc-scan-and-fix-roots")
@ -2335,6 +2365,31 @@ pub fn main() {
if matches.is_present("no_accounts_db_index_hashing") {
info!("The accounts hash is only calculated without using the index. --no-accounts-db-index-hashing is deprecated and can be removed from the command line");
}
let rpc_send_retry_rate_ms = value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64);
let rpc_send_batch_size = value_t_or_exit!(matches, "rpc_send_transaction_batch_size", usize);
let rpc_send_batch_send_rate_ms =
value_t_or_exit!(matches, "rpc_send_transaction_batch_ms", u64);
if rpc_send_batch_send_rate_ms > rpc_send_retry_rate_ms {
eprintln!(
"The specified rpc-send-batch-ms ({}) is invalid, it must be <= rpc-send-retry-ms ({})",
rpc_send_batch_send_rate_ms, rpc_send_retry_rate_ms
);
exit(1);
}
let tps = rpc_send_batch_size as u64 * MILLIS_PER_SECOND / rpc_send_batch_send_rate_ms;
if tps > send_transaction_service::MAX_TRANSACTION_SENDS_PER_SECOND {
eprintln!(
"Either the specified rpc-send-batch-size ({}) or rpc-send-batch-ms ({}) is invalid, \
'rpc-send-batch-size * 1000 / rpc-send-batch-ms' must be smaller than ({}) .",
rpc_send_batch_size,
rpc_send_batch_send_rate_ms,
send_transaction_service::MAX_TRANSACTION_SENDS_PER_SECOND
);
exit(1);
}
let mut validator_config = ValidatorConfig {
require_tower: matches.is_present("require_tower"),
tower_storage,
@ -2416,7 +2471,7 @@ pub fn main() {
debug_keys,
contact_debug_interval,
send_transaction_service_config: send_transaction_service::Config {
retry_rate_ms: value_t_or_exit!(matches, "rpc_send_transaction_retry_ms", u64),
retry_rate_ms: rpc_send_retry_rate_ms,
leader_forward_count: value_t_or_exit!(
matches,
"rpc_send_transaction_leader_forward_count",
@ -2434,6 +2489,8 @@ pub fn main() {
usize
),
use_quic: tpu_use_quic,
batch_send_rate_ms: rpc_send_batch_send_rate_ms,
batch_size: rpc_send_batch_size,
},
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),