Added option to turn on UDP for TPU transaction and make UDP based TPU off by default (#27462)

--tpu-enable-udp is introduced. And when this is on, the transaction receive and transaction forward is enabled using udp.

Except for a few tests which was hard-coded sending transactions using udp, most tests are being done with udp based tpu disabled.
This commit is contained in:
Lijun Wang 2022-09-07 13:19:14 -07:00 committed by GitHub
parent d3ca364e9e
commit 7f223dc582
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 131 additions and 36 deletions

View File

@ -30,6 +30,7 @@ use {
stake::{instruction::LockupArgs, state::Lockup},
transaction::{TransactionError, VersionedTransaction},
},
solana_tpu_client::connection_cache::DEFAULT_TPU_ENABLE_UDP,
solana_vote_program::vote_state::VoteAuthorize,
std::{collections::HashMap, error, io::stdout, str::FromStr, sync::Arc, time::Duration},
thiserror::Error,
@ -550,7 +551,7 @@ impl Default for CliConfig<'_> {
u64::from_str(DEFAULT_CONFIRM_TX_TIMEOUT_SECONDS).unwrap(),
),
address_labels: HashMap::new(),
use_quic: false,
use_quic: !DEFAULT_TPU_ENABLE_UDP,
}
}
}

View File

@ -16,6 +16,7 @@ use {
solana_streamer::streamer::{
self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats,
},
solana_tpu_client::connection_cache::DEFAULT_TPU_ENABLE_UDP,
std::{
net::UdpSocket,
sync::{
@ -57,6 +58,7 @@ impl FetchStage {
poh_recorder,
coalesce_ms,
None,
DEFAULT_TPU_ENABLE_UDP,
),
receiver,
vote_receiver,
@ -76,6 +78,7 @@ impl FetchStage {
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
tpu_enable_udp: bool,
) -> Self {
let tx_sockets = sockets.into_iter().map(Arc::new).collect();
let tpu_forwards_sockets = tpu_forwards_sockets.into_iter().map(Arc::new).collect();
@ -92,6 +95,7 @@ impl FetchStage {
poh_recorder,
coalesce_ms,
in_vote_only_mode,
tpu_enable_udp,
)
}
@ -150,42 +154,52 @@ impl FetchStage {
poh_recorder: &Arc<RwLock<PohRecorder>>,
coalesce_ms: u64,
in_vote_only_mode: Option<Arc<AtomicBool>>,
tpu_enable_udp: bool,
) -> Self {
let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);
let tpu_stats = Arc::new(StreamerReceiveStats::new("tpu_receiver"));
let tpu_threads: Vec<_> = tpu_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
sender.clone(),
recycler.clone(),
tpu_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect();
let tpu_threads: Vec<_> = if tpu_enable_udp {
tpu_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
sender.clone(),
recycler.clone(),
tpu_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect()
} else {
Vec::default()
};
let tpu_forward_stats = Arc::new(StreamerReceiveStats::new("tpu_forwards_receiver"));
let tpu_forwards_threads: Vec<_> = tpu_forwards_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
forward_sender.clone(),
recycler.clone(),
tpu_forward_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect();
let tpu_forwards_threads: Vec<_> = if tpu_enable_udp {
tpu_forwards_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
forward_sender.clone(),
recycler.clone(),
tpu_forward_stats.clone(),
coalesce_ms,
true,
in_vote_only_mode.clone(),
)
})
.collect()
} else {
Vec::default()
};
let tpu_vote_stats = Arc::new(StreamerReceiveStats::new("tpu_vote_receiver"));
let tpu_vote_threads: Vec<_> = tpu_vote_sockets

View File

@ -99,6 +99,7 @@ impl Tpu {
log_messages_bytes_limit: Option<usize>,
staked_nodes: &Arc<RwLock<StakedNodes>>,
shared_staked_nodes_overrides: Arc<RwLock<HashMap<Pubkey, u64>>>,
tpu_enable_udp: bool,
) -> Self {
let TpuSockets {
transactions: transactions_sockets,
@ -124,6 +125,7 @@ impl Tpu {
poh_recorder,
tpu_coalesce_ms,
Some(bank_forks.read().unwrap().get_vote_only_mode_signal()),
tpu_enable_udp,
);
let staked_nodes_updater_service = StakedNodesUpdaterService::new(

View File

@ -381,6 +381,7 @@ impl Validator {
socket_addr_space: SocketAddrSpace,
use_quic: bool,
tpu_connection_pool_size: usize,
tpu_enable_udp: bool,
) -> Result<Self, String> {
let id = identity_keypair.pubkey();
assert_eq!(id, node.info.id);
@ -1020,6 +1021,7 @@ impl Validator {
config.runtime_config.log_messages_bytes_limit,
&staked_nodes,
config.staked_nodes_overrides.clone(),
tpu_enable_udp,
);
datapoint_info!(
@ -2171,7 +2173,7 @@ mod tests {
solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
solana_tpu_client::connection_cache::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
},
std::{fs::remove_dir_all, thread, time::Duration},
};
@ -2208,6 +2210,7 @@ mod tests {
SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
)
.expect("assume successful validator start");
assert_eq!(
@ -2292,6 +2295,7 @@ mod tests {
SocketAddrSpace::Unspecified,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
)
.expect("assume successful validator start")
})

View File

@ -43,7 +43,8 @@ use {
solana_streamer::socket::SocketAddrSpace,
solana_thin_client::thin_client::ThinClient,
solana_tpu_client::connection_cache::{
ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC,
ConnectionCache, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP,
DEFAULT_TPU_USE_QUIC,
},
solana_vote_program::{
vote_instruction,
@ -278,6 +279,7 @@ impl LocalCluster {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
)
.expect("assume successful validator start");
@ -477,6 +479,7 @@ impl LocalCluster {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
)
.expect("assume successful validator start");
@ -837,6 +840,7 @@ impl Cluster for LocalCluster {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
)
.expect("assume successful validator start");
cluster_validator_info.validator = Some(restarted_node);

View File

@ -64,6 +64,9 @@ while [[ -n $1 ]]; do
elif [[ $1 = --tpu-disable-quic ]]; then
args+=("$1")
shift
elif [[ $1 = --tpu-enable-udp ]]; then
args+=("$1")
shift
elif [[ $1 = --rpc-send-batch-ms ]]; then
args+=("$1" "$2")
shift 2

View File

@ -150,6 +150,9 @@ while [[ -n $1 ]]; do
elif [[ $1 = --tpu-disable-quic ]]; then
args+=("$1")
shift
elif [[ $1 = --tpu-enable-udp ]]; then
args+=("$1")
shift
elif [[ $1 = --rpc-send-batch-ms ]]; then
args+=("$1" "$2")
shift 2

View File

@ -111,6 +111,9 @@ Operate a configured testnet
--tpu-disable-quic
- Disable quic for tpu packet forwarding
--tpu-enable-udp
- Enable UDP for tpu transactions
sanity/start-specific options:
-F - Discard validator nodes that didn't bootup successfully
-o noInstallCheck - Skip solana-install sanity
@ -325,6 +328,7 @@ startBootstrapLeader() {
\"$extraPrimordialStakes\" \
\"$TMPFS_ACCOUNTS\" \
\"$disableQuic\" \
\"$enableUdp\" \
"
) >> "$logFile" 2>&1 || {
@ -398,6 +402,7 @@ startNode() {
\"$extraPrimordialStakes\" \
\"$TMPFS_ACCOUNTS\" \
\"$disableQuic\" \
\"$enableUdp\" \
"
) >> "$logFile" 2>&1 &
declare pid=$!
@ -807,6 +812,7 @@ maybeFullRpc=false
waitForNodeInit=true
extraPrimordialStakes=0
disableQuic=false
enableUdp=false
command=$1
[[ -n $command ]] || usage
@ -922,6 +928,9 @@ while [[ -n $1 ]]; do
elif [[ $1 == --tpu-disable-quic ]]; then
disableQuic=true
shift 1
elif [[ $1 == --tpu-enable-udp ]]; then
enableUdp=true
shift 1
elif [[ $1 == --async-node-init ]]; then
waitForNodeInit=false
shift 1

View File

@ -29,6 +29,7 @@ waitForNodeInit="${20}"
extraPrimordialStakes="${21:=0}"
tmpfsAccounts="${22:false}"
disableQuic="${23}"
enableUdp="${24}"
set +x
@ -290,6 +291,10 @@ EOF
args+=(--tpu-disable-quic)
fi
if $enableUdp; then
args+=(--tpu-enable-udp)
fi
if [[ $airdropsEnabled = true ]]; then
cat >> ~/solana/on-reboot <<EOF
./multinode-demo/faucet.sh > faucet.log 2>&1 &
@ -422,6 +427,10 @@ EOF
args+=(--tpu-disable-quic)
fi
if $enableUdp; then
args+=(--tpu-enable-udp)
fi
cat >> ~/solana/on-reboot <<EOF
$maybeSkipAccountsCreation
nohup multinode-demo/validator.sh ${args[@]} > validator.log.\$now 2>&1 &

View File

@ -245,7 +245,7 @@ fn test_rpc_subscriptions() {
let alice = Keypair::new();
let test_validator =
TestValidator::with_no_fees(alice.pubkey(), None, SocketAddrSpace::Unspecified);
TestValidator::with_no_fees_udp(alice.pubkey(), None, SocketAddrSpace::Unspecified);
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
transactions_socket.connect(test_validator.tpu()).unwrap();

View File

@ -44,7 +44,9 @@ use {
signature::{read_keypair_file, write_keypair_file, Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::connection_cache::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
solana_tpu_client::connection_cache::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
},
std::{
collections::{HashMap, HashSet},
ffi::OsStr,
@ -123,6 +125,7 @@ pub struct TestValidatorGenesis {
compute_unit_limit: Option<u64>,
pub log_messages_bytes_limit: Option<usize>,
pub transaction_account_lock_limit: Option<usize>,
pub tpu_enable_udp: bool,
}
impl Default for TestValidatorGenesis {
@ -154,6 +157,7 @@ impl Default for TestValidatorGenesis {
compute_unit_limit: Option::<u64>::default(),
log_messages_bytes_limit: Option::<usize>::default(),
transaction_account_lock_limit: Option::<usize>::default(),
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
}
}
}
@ -181,6 +185,11 @@ impl TestValidatorGenesis {
ledger_path.join("vote-account-keypair.json").exists()
}
pub fn tpu_enable_udp(&mut self, tpu_enable_udp: bool) -> &mut Self {
self.tpu_enable_udp = tpu_enable_udp;
self
}
pub fn fee_rate_governor(&mut self, fee_rate_governor: FeeRateGovernor) -> &mut Self {
self.fee_rate_governor = fee_rate_governor;
self
@ -549,6 +558,25 @@ impl TestValidator {
.expect("validator start failed")
}
/// Create a test validator using udp for TPU.
pub fn with_no_fees_udp(
mint_address: Pubkey,
faucet_addr: Option<SocketAddr>,
socket_addr_space: SocketAddrSpace,
) -> Self {
TestValidatorGenesis::default()
.tpu_enable_udp(true)
.fee_rate_governor(FeeRateGovernor::new(0, 0))
.rent(Rent {
lamports_per_byte_year: 1,
exemption_threshold: 1.0,
..Rent::default()
})
.faucet_addr(faucet_addr)
.start_with_mint_address(mint_address, socket_addr_space)
.expect("validator start failed")
}
/// Create and start a `TestValidator` with custom transaction fees and minimal rent.
/// Faucet optional.
///
@ -815,6 +843,7 @@ impl TestValidator {
socket_addr_space,
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
config.tpu_enable_udp,
)?);
// Needed to avoid panics in `solana-responder-gossip` in tests that create a number of

View File

@ -37,6 +37,8 @@ pub const DEFAULT_TPU_USE_QUIC: bool = true;
/// Default TPU connection pool size per remote address
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;
pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
#[derive(Default)]
pub struct ConnectionCacheStats {
cache_hits: AtomicU64,

View File

@ -70,7 +70,9 @@ use {
self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE,
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::connection_cache::DEFAULT_TPU_CONNECTION_POOL_SIZE,
solana_tpu_client::connection_cache::{
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP,
},
solana_validator::{
admin_rpc_service,
admin_rpc_service::{load_staked_nodes_overrides, StakedNodesOverrides},
@ -1245,6 +1247,12 @@ pub fn main() {
.takes_value(false)
.help("Do not use QUIC to send transactions."),
)
.arg(
Arg::with_name("tpu_enable_udp")
.long("tpu-enable-udp")
.takes_value(false)
.help("Enable UDP for receiving/sending transactions."),
)
.arg(
Arg::with_name("disable_quic_servers")
.long("disable-quic-servers")
@ -2431,6 +2439,12 @@ pub fn main() {
let accounts_shrink_optimize_total_space =
value_t_or_exit!(matches, "accounts_shrink_optimize_total_space", bool);
let tpu_use_quic = !matches.is_present("tpu_disable_quic");
let tpu_enable_udp = if matches.is_present("tpu_enable_udp") {
true
} else {
DEFAULT_TPU_ENABLE_UDP
};
let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);
let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
@ -3221,6 +3235,7 @@ pub fn main() {
socket_addr_space,
tpu_use_quic,
tpu_connection_pool_size,
tpu_enable_udp,
)
.unwrap_or_else(|e| {
error!("Failed to start validator: {:?}", e);