client: Remove static connection cache, plumb it instead (#25667)

* client: Remove static connection cache, plumb it instead

* Add TpuClient::new_with_connection_cache to not break downstream

* Refactor get_connection and RwLock into ConnectionCache

* Fix merge conflicts from new async TpuClient

* Remove `ConnectionCache::set_use_quic`

* Move DEFAULT_TPU_USE_QUIC to client, use ConnectionCache::default()
This commit is contained in:
Jon Cinque 2022-06-08 13:57:12 +02:00 committed by GitHub
parent 3be11061ed
commit 79a8ecd0ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 592 additions and 307 deletions

2
Cargo.lock generated
View File

@ -4341,6 +4341,7 @@ dependencies = [
"log",
"rand 0.7.3",
"rayon",
"solana-client",
"solana-core",
"solana-gossip",
"solana-ledger",
@ -4388,6 +4389,7 @@ dependencies = [
"crossbeam-channel",
"futures 0.3.21",
"solana-banks-interface",
"solana-client",
"solana-runtime",
"solana-sdk 1.11.0",
"solana-send-transaction-service",

View File

@ -14,6 +14,7 @@ crossbeam-channel = "0.5"
log = "0.4.17"
rand = "0.7.0"
rayon = "1.5.3"
solana-client = { path = "../client", version = "=1.11.0" }
solana-core = { path = "../core", version = "=1.11.0" }
solana-gossip = { path = "../gossip", version = "=1.11.0" }
solana-ledger = { path = "../ledger", version = "=1.11.0" }

View File

@ -5,6 +5,7 @@ use {
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_client::connection_cache::ConnectionCache,
solana_core::banking_stage::BankingStage,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{
@ -212,6 +213,12 @@ fn main() {
.takes_value(true)
.help("Number of threads to use in the banking stage"),
)
.arg(
Arg::new("tpu_use_quic")
.long("tpu-use-quic")
.takes_value(false)
.help("Forward messages to TPU using QUIC"),
)
.get_matches();
let num_banking_threads = matches
@ -334,6 +341,7 @@ fn main() {
SocketAddrSpace::Unspecified,
);
let cluster_info = Arc::new(cluster_info);
let tpu_use_quic = matches.is_present("tpu_use_quic");
let banking_stage = BankingStage::new_num_threads(
&cluster_info,
&poh_recorder,
@ -344,6 +352,7 @@ fn main() {
None,
replay_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::new(tpu_use_quic)),
);
poh_recorder.lock().unwrap().set_bank(&bank);

View File

@ -14,6 +14,7 @@ bincode = "1.3.3"
crossbeam-channel = "0.5"
futures = "0.3"
solana-banks-interface = { path = "../banks-interface", version = "=1.11.0" }
solana-client = { path = "../client", version = "=1.11.0" }
solana-runtime = { path = "../runtime", version = "=1.11.0" }
solana-sdk = { path = "../sdk", version = "=1.11.0" }
solana-send-transaction-service = { path = "../send-transaction-service", version = "=1.11.0" }

View File

@ -6,6 +6,7 @@ use {
Banks, BanksRequest, BanksResponse, BanksTransactionResultWithSimulation,
TransactionConfirmationStatus, TransactionSimulationDetails, TransactionStatus,
},
solana_client::connection_cache::ConnectionCache,
solana_runtime::{
bank::{Bank, TransactionSimulationResult},
bank_forks::BankForks,
@ -24,7 +25,7 @@ use {
transaction::{self, SanitizedTransaction, Transaction},
},
solana_send_transaction_service::{
send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC},
send_transaction_service::{SendTransactionService, TransactionInfo},
tpu_info::NullTpuInfo,
},
std::{
@ -393,6 +394,7 @@ pub async fn start_tcp_server(
tpu_addr: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
connection_cache: Arc<ConnectionCache>,
) -> io::Result<()> {
// Note: These settings are copied straight from the tarpc example.
let server = tcp::listen(listen_addr, Bincode::default)
@ -417,9 +419,9 @@ pub async fn start_tcp_server(
&bank_forks,
None,
receiver,
&connection_cache,
5_000,
0,
DEFAULT_TPU_USE_QUIC,
);
let server = BanksServer::new(

View File

@ -3,6 +3,7 @@
use {
crate::banks_server::start_tcp_server,
futures::{future::FutureExt, pin_mut, prelude::stream::StreamExt, select},
solana_client::connection_cache::ConnectionCache,
solana_runtime::{bank_forks::BankForks, commitment::BlockCommitmentCache},
std::{
net::SocketAddr,
@ -29,6 +30,7 @@ async fn start_abortable_tcp_server(
tpu_addr: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
connection_cache: Arc<ConnectionCache>,
exit: Arc<AtomicBool>,
) {
let server = start_tcp_server(
@ -36,6 +38,7 @@ async fn start_abortable_tcp_server(
tpu_addr,
bank_forks.clone(),
block_commitment_cache.clone(),
connection_cache,
)
.fuse();
let interval = IntervalStream::new(time::interval(Duration::from_millis(100))).fuse();
@ -58,6 +61,7 @@ impl RpcBanksService {
tpu_addr: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
connection_cache: Arc<ConnectionCache>,
exit: Arc<AtomicBool>,
) {
let server = start_abortable_tcp_server(
@ -65,6 +69,7 @@ impl RpcBanksService {
tpu_addr,
bank_forks,
block_commitment_cache,
connection_cache,
exit,
);
Runtime::new().unwrap().block_on(server);
@ -75,10 +80,12 @@ impl RpcBanksService {
tpu_addr: SocketAddr,
bank_forks: &Arc<RwLock<BankForks>>,
block_commitment_cache: &Arc<RwLock<BlockCommitmentCache>>,
connection_cache: &Arc<ConnectionCache>,
exit: &Arc<AtomicBool>,
) -> Self {
let bank_forks = bank_forks.clone();
let block_commitment_cache = block_commitment_cache.clone();
let connection_cache = connection_cache.clone();
let exit = exit.clone();
let thread_hdl = Builder::new()
.name("solana-rpc-banks".to_string())
@ -88,6 +95,7 @@ impl RpcBanksService {
tpu_addr,
bank_forks,
block_commitment_cache,
connection_cache,
exit,
)
})
@ -109,9 +117,17 @@ mod tests {
fn test_rpc_banks_server_exit() {
let bank_forks = Arc::new(RwLock::new(BankForks::new(Bank::default_for_tests())));
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let connection_cache = Arc::new(ConnectionCache::default());
let exit = Arc::new(AtomicBool::new(false));
let addr = "127.0.0.1:0".parse().unwrap();
let service = RpcBanksService::new(addr, addr, &bank_forks, &block_commitment_cache, &exit);
let service = RpcBanksService::new(
addr,
addr,
&bank_forks,
&block_commitment_cache,
&connection_cache,
&exit,
);
exit.store(true, Ordering::Relaxed);
service.join().unwrap();
}

View File

@ -2,6 +2,7 @@ use {
clap::{crate_description, crate_name, App, Arg, ArgMatches},
solana_clap_utils::input_validators::{is_url, is_url_or_moniker},
solana_cli_config::{ConfigInput, CONFIG_FILE},
solana_client::connection_cache::DEFAULT_TPU_USE_QUIC,
solana_sdk::{
fee_calculator::FeeRateGovernor,
pubkey::Pubkey,
@ -77,7 +78,7 @@ impl Default for Config {
target_slots_per_epoch: 0,
target_node: None,
external_client_type: ExternalClientType::default(),
use_quic: false,
use_quic: DEFAULT_TPU_USE_QUIC,
}
}
}

View File

@ -8,8 +8,9 @@ use {
keypairs::get_keypairs,
},
solana_client::{
connection_cache,
connection_cache::ConnectionCache,
rpc_client::RpcClient,
thin_client::ThinClient,
tpu_client::{TpuClient, TpuClientConfig},
},
solana_genesis::Base64Account,
@ -101,9 +102,7 @@ fn main() {
do_bench_tps(client, cli_config, keypairs);
}
ExternalClientType::ThinClient => {
if *use_quic {
connection_cache::set_use_quic(true);
}
let connection_cache = Arc::new(ConnectionCache::new(*use_quic));
let client = if let Ok(rpc_addr) = value_t!(matches, "rpc_addr", String) {
let rpc = rpc_addr.parse().unwrap_or_else(|e| {
eprintln!("RPC address should parse as socketaddr {:?}", e);
@ -117,7 +116,7 @@ fn main() {
exit(1);
});
solana_client::thin_client::create_client(rpc, tpu)
ThinClient::new(rpc, tpu, connection_cache)
} else {
let nodes =
discover_cluster(entrypoint_addr, *num_nodes, SocketAddrSpace::Unspecified)
@ -127,7 +126,7 @@ fn main() {
});
if *multi_client {
let (client, num_clients) =
get_multi_client(&nodes, &SocketAddrSpace::Unspecified);
get_multi_client(&nodes, &SocketAddrSpace::Unspecified, connection_cache);
if nodes.len() < num_clients {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
@ -141,8 +140,11 @@ fn main() {
let mut target_client = None;
for node in nodes {
if node.id == *target_node {
target_client =
Some(get_client(&[node], &SocketAddrSpace::Unspecified));
target_client = Some(get_client(
&[node],
&SocketAddrSpace::Unspecified,
connection_cache,
));
break;
}
}
@ -151,7 +153,7 @@ fn main() {
exit(1);
})
} else {
get_client(&nodes, &SocketAddrSpace::Unspecified)
get_client(&nodes, &SocketAddrSpace::Unspecified, connection_cache)
}
};
let client = Arc::new(client);
@ -170,15 +172,18 @@ fn main() {
json_rpc_url.to_string(),
CommitmentConfig::confirmed(),
));
if *use_quic {
connection_cache::set_use_quic(true);
}
let connection_cache = Arc::new(ConnectionCache::new(*use_quic));
let client = Arc::new(
TpuClient::new(rpc_client, websocket_url, TpuClientConfig::default())
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {:?}", err);
exit(1);
}),
TpuClient::new_with_connection_cache(
rpc_client,
websocket_url,
TpuClientConfig::default(),
connection_cache,
)
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {:?}", err);
exit(1);
}),
);
let keypairs = get_keypairs(
client.clone(),

View File

@ -6,8 +6,9 @@ use {
cli::Config,
},
solana_client::{
connection_cache::ConnectionCache,
rpc_client::RpcClient,
thin_client::create_client,
thin_client::ThinClient,
tpu_client::{TpuClient, TpuClientConfig},
},
solana_core::validator::ValidatorConfig,
@ -58,9 +59,10 @@ fn test_bench_tps_local_cluster(config: Config) {
cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000);
let client = Arc::new(create_client(
let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc,
cluster.entry_point_info.tpu,
cluster.connection_cache.clone(),
));
let lamports_per_account = 100;
@ -96,9 +98,17 @@ fn test_bench_tps_test_validator(config: Config) {
CommitmentConfig::processed(),
));
let websocket_url = test_validator.rpc_pubsub_url();
let connection_cache = Arc::new(ConnectionCache::default());
let client =
Arc::new(TpuClient::new(rpc_client, &websocket_url, TpuClientConfig::default()).unwrap());
let client = Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
&websocket_url,
TpuClientConfig::default(),
connection_cache,
)
.unwrap(),
);
let lamports_per_account = 100;

View File

@ -19,6 +19,7 @@ use {
},
solana_client::{
client_error::ClientErrorKind,
connection_cache::ConnectionCache,
rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig, RpcSendTransactionConfig},
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
@ -2223,10 +2224,12 @@ fn send_deploy_messages(
if let Some(write_messages) = write_messages {
if let Some(write_signer) = write_signer {
trace!("Writing program data");
let tpu_client = TpuClient::new(
let connection_cache = Arc::new(ConnectionCache::default());
let tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(),
&config.websocket_url,
TpuClientConfig::default(),
connection_cache,
)?;
let transaction_errors = tpu_client
.send_and_confirm_messages_with_spinner(

View File

@ -5,14 +5,13 @@ use {
udp_client::UdpTpuConnection,
},
indexmap::map::IndexMap,
lazy_static::lazy_static,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_sdk::timing::AtomicInterval,
std::{
net::SocketAddr,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
},
},
@ -21,6 +20,10 @@ use {
// Should be non-zero
static MAX_CONNECTIONS: usize = 1024;
/// Used to decide whether the TPU and underlying connection cache should use
/// QUIC connections.
pub const DEFAULT_TPU_USE_QUIC: bool = false;
#[derive(Default)]
pub struct ConnectionCacheStats {
cache_hits: AtomicU64,
@ -210,35 +213,161 @@ impl ConnectionCacheStats {
}
}
struct ConnectionMap {
map: IndexMap<SocketAddr, Arc<Connection>>,
pub struct ConnectionCache {
map: RwLock<IndexMap<SocketAddr, Arc<Connection>>>,
stats: Arc<ConnectionCacheStats>,
last_stats: AtomicInterval,
use_quic: bool,
use_quic: AtomicBool,
}
impl ConnectionMap {
pub fn new() -> Self {
impl ConnectionCache {
pub fn new(use_quic: bool) -> Self {
Self {
map: IndexMap::with_capacity(MAX_CONNECTIONS),
stats: Arc::new(ConnectionCacheStats::default()),
last_stats: AtomicInterval::default(),
use_quic: false,
use_quic: AtomicBool::new(use_quic),
..Self::default()
}
}
pub fn set_use_quic(&mut self, use_quic: bool) {
self.use_quic = use_quic;
pub fn get_use_quic(&self) -> bool {
self.use_quic.load(Ordering::Relaxed)
}
fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult {
let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
let map = self.map.read().unwrap();
get_connection_map_lock_measure.stop();
let mut lock_timing_ms = get_connection_map_lock_measure.as_ms();
let report_stats = self
.last_stats
.should_update(CONNECTION_STAT_SUBMISSION_INTERVAL);
let mut get_connection_map_measure = Measure::start("get_connection_hit_measure");
let (connection, cache_hit, connection_cache_stats, num_evictions, eviction_timing_ms) =
match map.get(addr) {
Some(connection) => (connection.clone(), true, self.stats.clone(), 0, 0),
None => {
// Upgrade to write access by dropping read lock and acquire write lock
drop(map);
let mut get_connection_map_lock_measure =
Measure::start("get_connection_map_lock_measure");
let mut map = self.map.write().unwrap();
get_connection_map_lock_measure.stop();
lock_timing_ms =
lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
// Read again, as it is possible that between read lock dropped and the write lock acquired
// another thread could have setup the connection.
match map.get(addr) {
Some(connection) => (connection.clone(), true, self.stats.clone(), 0, 0),
None => {
let connection: Connection = if self.use_quic.load(Ordering::Relaxed) {
QuicTpuConnection::new(*addr, self.stats.clone()).into()
} else {
UdpTpuConnection::new(*addr, self.stats.clone()).into()
};
let connection = Arc::new(connection);
// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
while map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0, MAX_CONNECTIONS);
map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();
map.insert(*addr, connection.clone());
(
connection,
false,
self.stats.clone(),
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
}
}
}
};
get_connection_map_measure.stop();
GetConnectionResult {
connection,
cache_hit,
report_stats,
map_timing_ms: get_connection_map_measure.as_ms(),
lock_timing_ms,
connection_cache_stats,
num_evictions,
eviction_timing_ms,
}
}
pub fn get_connection(&self, addr: &SocketAddr) -> Arc<Connection> {
let mut get_connection_measure = Measure::start("get_connection_measure");
let GetConnectionResult {
connection,
cache_hit,
report_stats,
map_timing_ms,
lock_timing_ms,
connection_cache_stats,
num_evictions,
eviction_timing_ms,
} = self.get_or_add_connection(addr);
if report_stats {
connection_cache_stats.report();
}
if cache_hit {
connection_cache_stats
.cache_hits
.fetch_add(1, Ordering::Relaxed);
connection_cache_stats
.get_connection_hit_ms
.fetch_add(map_timing_ms, Ordering::Relaxed);
} else {
connection_cache_stats
.cache_misses
.fetch_add(1, Ordering::Relaxed);
connection_cache_stats
.get_connection_miss_ms
.fetch_add(map_timing_ms, Ordering::Relaxed);
connection_cache_stats
.cache_evictions
.fetch_add(num_evictions, Ordering::Relaxed);
connection_cache_stats
.eviction_time_ms
.fetch_add(eviction_timing_ms, Ordering::Relaxed);
}
get_connection_measure.stop();
connection_cache_stats
.get_connection_lock_ms
.fetch_add(lock_timing_ms, Ordering::Relaxed);
connection_cache_stats
.get_connection_ms
.fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);
connection
}
}
lazy_static! {
static ref CONNECTION_MAP: RwLock<ConnectionMap> = RwLock::new(ConnectionMap::new());
}
pub fn set_use_quic(use_quic: bool) {
let mut map = (*CONNECTION_MAP).write().unwrap();
map.set_use_quic(use_quic);
impl Default for ConnectionCache {
fn default() -> Self {
Self {
map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
stats: Arc::new(ConnectionCacheStats::default()),
last_stats: AtomicInterval::default(),
use_quic: AtomicBool::new(DEFAULT_TPU_USE_QUIC),
}
}
}
struct GetConnectionResult {
@ -252,140 +381,11 @@ struct GetConnectionResult {
eviction_timing_ms: u64,
}
fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
let map = (*CONNECTION_MAP).read().unwrap();
get_connection_map_lock_measure.stop();
let mut lock_timing_ms = get_connection_map_lock_measure.as_ms();
let report_stats = map
.last_stats
.should_update(CONNECTION_STAT_SUBMISSION_INTERVAL);
let mut get_connection_map_measure = Measure::start("get_connection_hit_measure");
let (connection, cache_hit, connection_cache_stats, num_evictions, eviction_timing_ms) =
match map.map.get(addr) {
Some(connection) => (connection.clone(), true, map.stats.clone(), 0, 0),
None => {
// Upgrade to write access by dropping read lock and acquire write lock
drop(map);
let mut get_connection_map_lock_measure =
Measure::start("get_connection_map_lock_measure");
let mut map = (*CONNECTION_MAP).write().unwrap();
get_connection_map_lock_measure.stop();
lock_timing_ms =
lock_timing_ms.saturating_add(get_connection_map_lock_measure.as_ms());
// Read again, as it is possible that between read lock dropped and the write lock acquired
// another thread could have setup the connection.
match map.map.get(addr) {
Some(connection) => (connection.clone(), true, map.stats.clone(), 0, 0),
None => {
let connection: Connection = if map.use_quic {
QuicTpuConnection::new(*addr, map.stats.clone()).into()
} else {
UdpTpuConnection::new(*addr, map.stats.clone()).into()
};
let connection = Arc::new(connection);
// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
while map.map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0, MAX_CONNECTIONS);
map.map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();
map.map.insert(*addr, connection.clone());
(
connection,
false,
map.stats.clone(),
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
}
}
}
};
get_connection_map_measure.stop();
GetConnectionResult {
connection,
cache_hit,
report_stats,
map_timing_ms: get_connection_map_measure.as_ms(),
lock_timing_ms,
connection_cache_stats,
num_evictions,
eviction_timing_ms,
}
}
// TODO: see https://github.com/solana-labs/solana/issues/23661
// remove lazy_static and optimize and refactor this
pub fn get_connection(addr: &SocketAddr) -> Arc<Connection> {
let mut get_connection_measure = Measure::start("get_connection_measure");
let GetConnectionResult {
connection,
cache_hit,
report_stats,
map_timing_ms,
lock_timing_ms,
connection_cache_stats,
num_evictions,
eviction_timing_ms,
} = get_or_add_connection(addr);
if report_stats {
connection_cache_stats.report();
}
if cache_hit {
connection_cache_stats
.cache_hits
.fetch_add(1, Ordering::Relaxed);
connection_cache_stats
.get_connection_hit_ms
.fetch_add(map_timing_ms, Ordering::Relaxed);
} else {
connection_cache_stats
.cache_misses
.fetch_add(1, Ordering::Relaxed);
connection_cache_stats
.get_connection_miss_ms
.fetch_add(map_timing_ms, Ordering::Relaxed);
connection_cache_stats
.cache_evictions
.fetch_add(num_evictions, Ordering::Relaxed);
connection_cache_stats
.eviction_time_ms
.fetch_add(eviction_timing_ms, Ordering::Relaxed);
}
get_connection_measure.stop();
connection_cache_stats
.get_connection_lock_ms
.fetch_add(lock_timing_ms, Ordering::Relaxed);
connection_cache_stats
.get_connection_ms
.fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);
connection
}
#[cfg(test)]
mod tests {
use {
crate::{
connection_cache::{get_connection, CONNECTION_MAP, MAX_CONNECTIONS},
connection_cache::{ConnectionCache, MAX_CONNECTIONS},
tpu_connection::TpuConnection,
},
rand::{Rng, SeedableRng},
@ -419,28 +419,29 @@ mod tests {
// we can actually connect to those addresses - TPUConnection implementations should either
// be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator)
let connection_cache = ConnectionCache::default();
let addrs = (0..MAX_CONNECTIONS)
.into_iter()
.map(|_| {
let addr = get_addr(&mut rng);
get_connection(&addr);
connection_cache.get_connection(&addr);
addr
})
.collect::<Vec<_>>();
{
let map = (*CONNECTION_MAP).read().unwrap();
assert!(map.map.len() == MAX_CONNECTIONS);
let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS);
addrs.iter().for_each(|a| {
let conn = map.map.get(a).expect("Address not found");
let conn = map.get(a).expect("Address not found");
assert!(a.ip() == conn.tpu_addr().ip());
});
}
let addr = get_addr(&mut rng);
get_connection(&addr);
connection_cache.get_connection(&addr);
let map = (*CONNECTION_MAP).read().unwrap();
assert!(map.map.len() == MAX_CONNECTIONS);
let _conn = map.map.get(&addr).expect("Address not found");
let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS);
let _conn = map.get(&addr).expect("Address not found");
}
}

View File

@ -1,7 +1,7 @@
use {
crate::{
client_error::ClientError,
connection_cache::get_connection,
connection_cache::ConnectionCache,
nonblocking::{
pubsub_client::{PubsubClient, PubsubClientError},
rpc_client::RpcClient,
@ -65,6 +65,7 @@ pub struct TpuClient {
leader_tpu_service: LeaderTpuService,
exit: Arc<AtomicBool>,
rpc_client: Arc<RpcClient>,
connection_cache: Arc<ConnectionCache>,
}
impl TpuClient {
@ -99,7 +100,7 @@ impl TpuClient {
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots)
{
let conn = get_connection(&tpu_address);
let conn = self.connection_cache.get_connection(&tpu_address);
// Fake async
let result = conn.send_wire_transaction_async(wire_transaction.clone());
if let Err(err) = result {
@ -124,6 +125,17 @@ impl TpuClient {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
) -> Result<Self> {
let connection_cache = Arc::new(ConnectionCache::default());
Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await
}
/// Create a new client that disconnects when dropped
pub async fn new_with_connection_cache(
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_cache: Arc<ConnectionCache>,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
let leader_tpu_service =
@ -134,6 +146,7 @@ impl TpuClient {
leader_tpu_service,
exit,
rpc_client,
connection_cache,
})
}

View File

@ -5,7 +5,7 @@
use {
crate::{
connection_cache::get_connection, rpc_client::RpcClient,
connection_cache::ConnectionCache, rpc_client::RpcClient,
rpc_config::RpcProgramAccountsConfig, rpc_response::Response,
tpu_connection::TpuConnection,
},
@ -33,7 +33,7 @@ use {
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
RwLock,
Arc, RwLock,
},
time::{Duration, Instant},
},
@ -123,34 +123,49 @@ pub struct ThinClient {
rpc_clients: Vec<RpcClient>,
tpu_addrs: Vec<SocketAddr>,
optimizer: ClientOptimizer,
connection_cache: Arc<ConnectionCache>,
}
impl ThinClient {
/// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
/// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
/// (currently hardcoded to UDP)
pub fn new(rpc_addr: SocketAddr, tpu_addr: SocketAddr) -> Self {
Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr)
pub fn new(
rpc_addr: SocketAddr,
tpu_addr: SocketAddr,
connection_cache: Arc<ConnectionCache>,
) -> Self {
Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
}
pub fn new_socket_with_timeout(
rpc_addr: SocketAddr,
tpu_addr: SocketAddr,
timeout: Duration,
connection_cache: Arc<ConnectionCache>,
) -> Self {
let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
Self::new_from_client(rpc_client, tpu_addr)
Self::new_from_client(rpc_client, tpu_addr, connection_cache)
}
fn new_from_client(rpc_client: RpcClient, tpu_addr: SocketAddr) -> Self {
fn new_from_client(
rpc_client: RpcClient,
tpu_addr: SocketAddr,
connection_cache: Arc<ConnectionCache>,
) -> Self {
Self {
rpc_clients: vec![rpc_client],
tpu_addrs: vec![tpu_addr],
optimizer: ClientOptimizer::new(0),
connection_cache,
}
}
pub fn new_from_addrs(rpc_addrs: Vec<SocketAddr>, tpu_addrs: Vec<SocketAddr>) -> Self {
pub fn new_from_addrs(
rpc_addrs: Vec<SocketAddr>,
tpu_addrs: Vec<SocketAddr>,
connection_cache: Arc<ConnectionCache>,
) -> Self {
assert!(!rpc_addrs.is_empty());
assert_eq!(rpc_addrs.len(), tpu_addrs.len());
@ -160,6 +175,7 @@ impl ThinClient {
rpc_clients,
tpu_addrs,
optimizer,
connection_cache,
}
}
@ -208,7 +224,7 @@ impl ThinClient {
bincode::serialize(&transaction).expect("transaction serialization failed");
while now.elapsed().as_secs() < wait_time as u64 {
if num_confirmed == 0 {
let conn = get_connection(self.tpu_addr());
let conn = self.connection_cache.get_connection(self.tpu_addr());
// Send the transaction if there has been no confirmation (e.g. the first time)
conn.send_wire_transaction(&wire_transaction)?;
}
@ -596,7 +612,7 @@ impl AsyncClient for ThinClient {
&self,
transaction: VersionedTransaction,
) -> TransportResult<Signature> {
let conn = get_connection(self.tpu_addr());
let conn = self.connection_cache.get_connection(self.tpu_addr());
conn.serialize_and_send_transaction(&transaction)?;
Ok(transaction.signatures[0])
}
@ -605,24 +621,12 @@ impl AsyncClient for ThinClient {
&self,
batch: Vec<VersionedTransaction>,
) -> TransportResult<()> {
let conn = get_connection(self.tpu_addr());
let conn = self.connection_cache.get_connection(self.tpu_addr());
conn.par_serialize_and_send_transaction_batch(&batch[..])?;
Ok(())
}
}
pub fn create_client(rpc: SocketAddr, tpu: SocketAddr) -> ThinClient {
ThinClient::new(rpc, tpu)
}
pub fn create_client_with_timeout(
rpc: SocketAddr,
tpu: SocketAddr,
timeout: Duration,
) -> ThinClient {
ThinClient::new_socket_with_timeout(rpc, tpu, timeout)
}
#[cfg(test)]
mod tests {
use {super::*, rayon::prelude::*};

View File

@ -1,7 +1,7 @@
use {
crate::{
client_error::{ClientError, Result as ClientResult},
connection_cache::get_connection,
connection_cache::ConnectionCache,
pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
rpc_client::RpcClient,
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
@ -87,6 +87,7 @@ pub struct TpuClient {
leader_tpu_service: LeaderTpuService,
exit: Arc<AtomicBool>,
rpc_client: Arc<RpcClient>,
connection_cache: Arc<ConnectionCache>,
}
impl TpuClient {
@ -120,7 +121,7 @@ impl TpuClient {
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots)
{
let conn = get_connection(&tpu_address);
let conn = self.connection_cache.get_connection(&tpu_address);
let result = conn.send_wire_transaction_async(wire_transaction.clone());
if let Err(err) = result {
last_error = Some(err);
@ -144,6 +145,17 @@ impl TpuClient {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
) -> Result<Self> {
let connection_cache = Arc::new(ConnectionCache::default());
Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache)
}
/// Create a new client that disconnects when dropped
pub fn new_with_connection_cache(
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_cache: Arc<ConnectionCache>,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
let leader_tpu_service =
@ -155,6 +167,7 @@ impl TpuClient {
leader_tpu_service,
exit,
rpc_client,
connection_cache,
})
}

View File

@ -8,6 +8,7 @@ use {
log::*,
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_client::connection_cache::ConnectionCache,
solana_core::{
banking_stage::{BankingStage, BankingStageStats},
leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
@ -230,6 +231,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
None,
s,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
);
poh_recorder.lock().unwrap().set_bank(&bank);

View File

@ -18,7 +18,7 @@ use {
itertools::Itertools,
min_max_heap::MinMaxHeap,
solana_client::{
connection_cache::get_connection, tpu_connection::TpuConnection,
connection_cache::ConnectionCache, tpu_connection::TpuConnection,
udp_client::UdpTpuConnection,
},
solana_entry::entry::hash_transactions,
@ -410,6 +410,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
connection_cache: Arc<ConnectionCache>,
) -> Self {
Self::new_num_threads(
cluster_info,
@ -421,6 +422,7 @@ impl BankingStage {
transaction_status_sender,
gossip_vote_sender,
cost_model,
connection_cache,
)
}
@ -435,6 +437,7 @@ impl BankingStage {
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
connection_cache: Arc<ConnectionCache>,
) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks.
@ -466,6 +469,7 @@ impl BankingStage {
let gossip_vote_sender = gossip_vote_sender.clone();
let data_budget = data_budget.clone();
let cost_model = cost_model.clone();
let connection_cache = connection_cache.clone();
Builder::new()
.name(format!("solana-banking-stage-tx-{}", i))
.spawn(move || {
@ -481,6 +485,7 @@ impl BankingStage {
gossip_vote_sender,
&data_budget,
cost_model,
connection_cache,
);
})
.unwrap()
@ -506,6 +511,7 @@ impl BankingStage {
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
/// the number of successfully forwarded packets in second part of tuple
fn forward_buffered_packets(
connection_cache: &ConnectionCache,
forward_option: &ForwardOption,
cluster_info: &ClusterInfo,
poh_recorder: &Arc<Mutex<PohRecorder>>,
@ -570,7 +576,7 @@ impl BankingStage {
banking_stage_stats
.forwarded_transaction_count
.fetch_add(packet_vec_len, Ordering::Relaxed);
get_connection(&addr)
connection_cache.get_connection(&addr)
};
let res = conn.send_wire_transaction_batch_async(packet_vec);
@ -872,6 +878,7 @@ impl BankingStage {
data_budget: &DataBudget,
qos_service: &QosService,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
connection_cache: &ConnectionCache,
) {
let (decision, make_decision_time) = measure!(
{
@ -940,6 +947,7 @@ impl BankingStage {
data_budget,
slot_metrics_tracker,
banking_stage_stats,
connection_cache,
),
"forward",
);
@ -956,6 +964,7 @@ impl BankingStage {
data_budget,
slot_metrics_tracker,
banking_stage_stats,
connection_cache,
),
"forward_and_hold",
);
@ -974,6 +983,7 @@ impl BankingStage {
data_budget: &DataBudget,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
banking_stage_stats: &BankingStageStats,
connection_cache: &ConnectionCache,
) {
if let ForwardOption::NotForward = forward_option {
if !hold {
@ -986,6 +996,7 @@ impl BankingStage {
Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter());
let forwardable_packets_len = forwardable_packets.len();
let (_forward_result, sucessful_forwarded_packets_count) = Self::forward_buffered_packets(
connection_cache,
forward_option,
cluster_info,
poh_recorder,
@ -1032,6 +1043,7 @@ impl BankingStage {
gossip_vote_sender: ReplayVoteSender,
data_budget: &DataBudget,
cost_model: Arc<RwLock<CostModel>>,
connection_cache: Arc<ConnectionCache>,
) {
let recorder = poh_recorder.lock().unwrap().recorder();
let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit);
@ -1058,6 +1070,7 @@ impl BankingStage {
data_budget,
&qos_service,
&mut slot_metrics_tracker,
&connection_cache,
),
"process_buffered_packets",
);
@ -2257,6 +2270,7 @@ mod tests {
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
);
drop(verified_sender);
drop(gossip_verified_vote_sender);
@ -2306,6 +2320,7 @@ mod tests {
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
);
trace!("sending bank");
drop(verified_sender);
@ -2380,6 +2395,7 @@ mod tests {
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
);
// fund another account so we can send 2 good transactions in a single batch.
@ -2531,6 +2547,7 @@ mod tests {
None,
gossip_vote_sender,
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
);
// wait for banking_stage to eat the packets
@ -4058,6 +4075,7 @@ mod tests {
("budget-available", DataBudget::default(), 1),
];
let connection_cache = ConnectionCache::default();
for (name, data_budget, expected_num_forwarded) in test_cases {
let mut unprocessed_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(
@ -4074,6 +4092,7 @@ mod tests {
&data_budget,
&mut LeaderSlotMetricsTracker::new(0),
&stats,
&connection_cache,
);
recv_socket
@ -4146,6 +4165,7 @@ mod tests {
let local_node = Node::new_localhost_with_pubkey(validator_pubkey);
let cluster_info = new_test_cluster_info(local_node.info);
let recv_socket = &local_node.sockets.tpu_forwards[0];
let connection_cache = ConnectionCache::default();
let test_cases = vec![
("not-forward", ForwardOption::NotForward, true, vec![], 2),
@ -4183,6 +4203,7 @@ mod tests {
&DataBudget::default(),
&mut LeaderSlotMetricsTracker::new(0),
&stats,
&connection_cache,
);
recv_socket

View File

@ -16,6 +16,7 @@ use {
staked_nodes_updater_service::StakedNodesUpdaterService,
},
crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError},
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender},
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
@ -96,6 +97,7 @@ impl Tpu {
tpu_coalesce_ms: u64,
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
cost_model: &Arc<RwLock<CostModel>>,
connection_cache: &Arc<ConnectionCache>,
keypair: &Keypair,
) -> Self {
let TpuSockets {
@ -226,6 +228,7 @@ impl Tpu {
transaction_status_sender,
replay_vote_sender,
cost_model.clone(),
connection_cache.clone(),
);
let broadcast_stage = broadcast_type.new_broadcast_stage(

View File

@ -26,6 +26,7 @@ use {
warm_quic_cache_service::WarmQuicCacheService,
},
crossbeam_channel::{bounded, unbounded, Receiver, RecvTimeoutError},
solana_client::connection_cache::ConnectionCache,
solana_geyser_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
@ -132,7 +133,7 @@ impl Tvu {
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
wait_to_vote_slot: Option<Slot>,
accounts_background_request_sender: AbsRequestSender,
use_quic: bool,
connection_cache: &Arc<ConnectionCache>,
) -> Self {
let TvuSockets {
repair: repair_socket,
@ -229,8 +230,9 @@ impl Tvu {
bank_forks.clone(),
);
let warm_quic_cache_service = if use_quic {
let warm_quic_cache_service = if connection_cache.get_use_quic() {
Some(WarmQuicCacheService::new(
connection_cache.clone(),
cluster_info.clone(),
poh_recorder.clone(),
exit.clone(),
@ -451,7 +453,7 @@ pub mod tests {
None,
None,
AbsRequestSender::default(),
false, // use_quic
&Arc::new(ConnectionCache::default()),
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();

View File

@ -25,6 +25,7 @@ use {
},
crossbeam_channel::{bounded, unbounded, Receiver},
rand::{thread_rng, Rng},
solana_client::connection_cache::ConnectionCache,
solana_entry::poh::compute_hash_time_ns,
solana_geyser_plugin_manager::geyser_plugin_service::GeyserPluginService,
solana_gossip::{
@ -747,6 +748,8 @@ impl Validator {
};
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let connection_cache = Arc::new(ConnectionCache::new(use_quic));
let rpc_override_health_check = Arc::new(AtomicBool::new(false));
let (
json_rpc_service,
@ -791,6 +794,7 @@ impl Validator {
config.send_transaction_service_config.clone(),
max_slots.clone(),
leader_schedule_cache.clone(),
connection_cache.clone(),
max_complete_transaction_status_slot,
)
.unwrap_or_else(|s| {
@ -972,7 +976,7 @@ impl Validator {
block_metadata_notifier,
config.wait_to_vote_slot,
accounts_background_request_sender,
use_quic,
&connection_cache,
);
let tpu = Tpu::new(
@ -1004,6 +1008,7 @@ impl Validator {
config.tpu_coalesce_ms,
cluster_confirmed_slot_sender,
&cost_model,
&connection_cache,
&identity_keypair,
);
@ -2049,6 +2054,7 @@ mod tests {
use {
super::*,
crossbeam_channel::{bounded, RecvTimeoutError},
solana_client::connection_cache::DEFAULT_TPU_USE_QUIC,
solana_ledger::{create_new_tmp_ledger, genesis_utils::create_genesis_config_with_leader},
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
std::{fs::remove_dir_all, thread, time::Duration},
@ -2084,7 +2090,7 @@ mod tests {
true, // should_check_duplicate_instance
start_progress.clone(),
SocketAddrSpace::Unspecified,
false, // use_quic
DEFAULT_TPU_USE_QUIC,
);
assert_eq!(
*start_progress.read().unwrap(),
@ -2179,7 +2185,7 @@ mod tests {
true, // should_check_duplicate_instance
Arc::new(RwLock::new(ValidatorStartProgress::default())),
SocketAddrSpace::Unspecified,
false, // use_quic
DEFAULT_TPU_USE_QUIC,
)
})
.collect();

View File

@ -3,7 +3,7 @@
use {
rand::{thread_rng, Rng},
solana_client::{connection_cache::get_connection, tpu_connection::TpuConnection},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_gossip::cluster_info::ClusterInfo,
solana_poh::poh_recorder::PohRecorder,
std::{
@ -26,6 +26,7 @@ const CACHE_JITTER_SLOT: i64 = 20;
impl WarmQuicCacheService {
pub fn new(
connection_cache: Arc<ConnectionCache>,
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<Mutex<PohRecorder>>,
exit: Arc<AtomicBool>,
@ -48,7 +49,7 @@ impl WarmQuicCacheService {
if let Some(addr) = cluster_info
.lookup_contact_info(&leader_pubkey, |leader| leader.tpu)
{
let conn = get_connection(&addr);
let conn = connection_cache.get_connection(&addr);
if let Err(err) = conn.send_wire_transaction(&[0u8]) {
warn!(
"Failed to warmup QUIC connection to the leader {:?}, Error {:?}",

View File

@ -50,6 +50,13 @@ pub struct DosClientParameters {
#[clap(flatten)]
pub transaction_params: TransactionParams,
#[clap(
long,
conflicts_with("skip-gossip"),
help = "Submit transactions via QUIC"
)]
pub tpu_use_quic: bool,
}
#[derive(Args, Serialize, Deserialize, Debug, Default, PartialEq, Eq)]
@ -210,7 +217,8 @@ mod tests {
data_input: Some(pubkey),
skip_gossip: false,
allow_private_addr: false,
transaction_params: TransactionParams::default()
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
},
);
}
@ -228,6 +236,7 @@ mod tests {
"--valid-signatures",
"--num-signatures",
"8",
"--tpu-use-quic",
])
.unwrap();
assert_eq!(
@ -248,6 +257,7 @@ mod tests {
transaction_type: None,
num_instructions: None,
},
tpu_use_quic: true,
},
);
}
@ -287,6 +297,7 @@ mod tests {
transaction_type: Some(TransactionType::Transfer),
num_instructions: Some(1),
},
tpu_use_quic: false,
},
);
@ -341,6 +352,7 @@ mod tests {
transaction_type: Some(TransactionType::Transfer),
num_instructions: Some(8),
},
tpu_use_quic: false,
},
);
}
@ -378,6 +390,7 @@ mod tests {
transaction_type: Some(TransactionType::AccountCreation),
num_instructions: None,
},
tpu_use_quic: false,
},
);
}

View File

@ -44,7 +44,7 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient},
solana_client::rpc_client::RpcClient,
solana_client::{connection_cache::ConnectionCache, rpc_client::RpcClient},
solana_core::serve_repair::RepairProtocol,
solana_dos::cli::*,
solana_gossip::{
@ -598,7 +598,9 @@ fn main() {
exit(1);
});
let (client, num_clients) = get_multi_client(&validators, &SocketAddrSpace::Unspecified);
let connection_cache = Arc::new(ConnectionCache::new(cmd_params.tpu_use_quic));
let (client, num_clients) =
get_multi_client(&validators, &SocketAddrSpace::Unspecified, connection_cache);
if validators.len() < num_clients {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
@ -620,7 +622,7 @@ fn main() {
pub mod test {
use {
super::*,
solana_client::thin_client::{create_client, ThinClient},
solana_client::thin_client::ThinClient,
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_local_cluster::{
@ -658,6 +660,7 @@ pub mod test {
skip_gossip: false,
allow_private_addr: false,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
},
);
@ -673,6 +676,7 @@ pub mod test {
skip_gossip: false,
allow_private_addr: false,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
},
);
@ -688,6 +692,7 @@ pub mod test {
skip_gossip: false,
allow_private_addr: false,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
},
);
@ -703,6 +708,7 @@ pub mod test {
skip_gossip: false,
allow_private_addr: false,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
},
);
}
@ -733,6 +739,7 @@ pub mod test {
skip_gossip: false,
allow_private_addr: false,
transaction_params: TransactionParams::default(),
tpu_use_quic: false,
},
);
}
@ -749,9 +756,10 @@ pub mod test {
let node = cluster.get_contact_info(&nodes[0]).unwrap().clone();
let nodes_slice = [node];
let client = Arc::new(create_client(
let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc,
cluster.entry_point_info.tpu,
cluster.connection_cache.clone(),
));
// creates one transaction with 8 valid signatures and sends it 10 times
@ -775,6 +783,7 @@ pub mod test {
transaction_type: None,
num_instructions: None,
},
tpu_use_quic: false,
},
);
@ -799,6 +808,7 @@ pub mod test {
transaction_type: None,
num_instructions: None,
},
tpu_use_quic: false,
},
);
@ -823,12 +833,12 @@ pub mod test {
transaction_type: None,
num_instructions: None,
},
tpu_use_quic: false,
},
);
}
#[test]
fn test_dos_with_blockhash_and_payer() {
fn run_dos_with_blockhash_and_payer(tpu_use_quic: bool) {
solana_logger::setup();
// 1. Create faucet thread
@ -872,9 +882,10 @@ pub mod test {
let node = cluster.get_contact_info(&nodes[0]).unwrap().clone();
let nodes_slice = [node];
let client = Arc::new(create_client(
let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc,
cluster.entry_point_info.tpu,
cluster.connection_cache.clone(),
));
// creates one transaction and sends it 10 times
@ -899,6 +910,7 @@ pub mod test {
transaction_type: Some(TransactionType::Transfer),
num_instructions: Some(1),
},
tpu_use_quic,
},
);
@ -925,6 +937,7 @@ pub mod test {
transaction_type: Some(TransactionType::Transfer),
num_instructions: Some(1),
},
tpu_use_quic,
},
);
// creates and sends unique transactions of type Transfer
@ -950,6 +963,7 @@ pub mod test {
transaction_type: Some(TransactionType::Transfer),
num_instructions: Some(8),
},
tpu_use_quic,
},
);
// creates and sends unique transactions of type CreateAccount
@ -975,7 +989,18 @@ pub mod test {
transaction_type: Some(TransactionType::AccountCreation),
num_instructions: None,
},
tpu_use_quic,
},
);
}
#[test]
fn test_dos_with_blockhash_and_payer() {
run_dos_with_blockhash_and_payer(/*tpu_use_quic*/ false)
}
#[test]
fn test_dos_with_blockhash_and_payer_and_quic() {
run_dos_with_blockhash_and_payer(/*tpu_use_quic*/ true)
}
}

View File

@ -4,7 +4,7 @@ use {
crate::{cluster_info::ClusterInfo, contact_info::ContactInfo},
crossbeam_channel::{unbounded, Sender},
rand::{thread_rng, Rng},
solana_client::thin_client::{create_client, ThinClient},
solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
solana_perf::recycler::Recycler,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
@ -194,29 +194,25 @@ pub fn discover(
))
}
/// Creates a ThinClient per valid node
pub fn get_clients(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> Vec<ThinClient> {
nodes
.iter()
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space))
.map(|(rpc, tpu)| create_client(rpc, tpu))
.collect()
}
/// Creates a ThinClient by selecting a valid node at random
pub fn get_client(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> ThinClient {
pub fn get_client(
nodes: &[ContactInfo],
socket_addr_space: &SocketAddrSpace,
connection_cache: Arc<ConnectionCache>,
) -> ThinClient {
let nodes: Vec<_> = nodes
.iter()
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space))
.collect();
let select = thread_rng().gen_range(0, nodes.len());
let (rpc, tpu) = nodes[select];
create_client(rpc, tpu)
ThinClient::new(rpc, tpu, connection_cache)
}
pub fn get_multi_client(
nodes: &[ContactInfo],
socket_addr_space: &SocketAddrSpace,
connection_cache: Arc<ConnectionCache>,
) -> (ThinClient, usize) {
let addrs: Vec<_> = nodes
.iter()
@ -226,7 +222,10 @@ pub fn get_multi_client(
let tpu_addrs: Vec<_> = addrs.iter().map(|addr| addr.1).collect();
let num_nodes = tpu_addrs.len();
(ThinClient::new_from_addrs(rpc_addrs, tpu_addrs), num_nodes)
(
ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, connection_cache),
num_nodes,
)
}
fn spy(

View File

@ -6,7 +6,7 @@ use log::*;
use {
rand::{thread_rng, Rng},
rayon::prelude::*,
solana_client::thin_client::create_client,
solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
solana_core::consensus::VOTE_THRESHOLD_DEPTH,
solana_entry::entry::{Entry, EntrySlice},
solana_gossip::{
@ -50,6 +50,7 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
nodes: usize,
ignore_nodes: HashSet<Pubkey, S>,
socket_addr_space: SocketAddrSpace,
connection_cache: &Arc<ConnectionCache>,
) {
let cluster_nodes =
discover_cluster(&entry_point_info.gossip, nodes, socket_addr_space).unwrap();
@ -61,7 +62,7 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
}
let random_keypair = Keypair::new();
let (rpc, tpu) = ingress_node.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let bal = client
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
@ -83,7 +84,7 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
continue;
}
let (rpc, tpu) = validator.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
client.poll_for_signature_confirmation(&sig, confs).unwrap();
}
});
@ -92,9 +93,10 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
pub fn verify_balances<S: ::std::hash::BuildHasher>(
expected_balances: HashMap<Pubkey, u64, S>,
node: &ContactInfo,
connection_cache: Arc<ConnectionCache>,
) {
let (rpc, tpu) = node.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache);
for (pk, b) in expected_balances {
let bal = client
.poll_get_balance_with_commitment(&pk, CommitmentConfig::processed())
@ -106,11 +108,12 @@ pub fn verify_balances<S: ::std::hash::BuildHasher>(
pub fn send_many_transactions(
node: &ContactInfo,
funding_keypair: &Keypair,
connection_cache: &Arc<ConnectionCache>,
max_tokens_per_transfer: u64,
num_txs: u64,
) -> HashMap<Pubkey, u64> {
let (rpc, tpu) = node.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let mut expected_balances = HashMap::new();
for _ in 0..num_txs {
let random_keypair = Keypair::new();
@ -193,6 +196,7 @@ pub fn kill_entry_and_spend_and_verify_rest(
entry_point_info: &ContactInfo,
entry_point_validator_exit: &Arc<RwLock<Exit>>,
funding_keypair: &Keypair,
connection_cache: &Arc<ConnectionCache>,
nodes: usize,
slot_millis: u64,
socket_addr_space: SocketAddrSpace,
@ -202,7 +206,7 @@ pub fn kill_entry_and_spend_and_verify_rest(
discover_cluster(&entry_point_info.gossip, nodes, socket_addr_space).unwrap();
assert!(cluster_nodes.len() >= nodes);
let (rpc, tpu) = entry_point_info.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
// sleep long enough to make sure we are in epoch 3
let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1);
@ -232,7 +236,7 @@ pub fn kill_entry_and_spend_and_verify_rest(
}
let (rpc, tpu) = ingress_node.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let balance = client
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
@ -278,7 +282,13 @@ pub fn kill_entry_and_spend_and_verify_rest(
}
};
info!("poll_all_nodes_for_signature()");
match poll_all_nodes_for_signature(entry_point_info, &cluster_nodes, &sig, confs) {
match poll_all_nodes_for_signature(
entry_point_info,
&cluster_nodes,
connection_cache,
&sig,
confs,
) {
Err(e) => {
info!("poll_all_nodes_for_signature() failed {:?}", e);
result = Err(e);
@ -292,7 +302,12 @@ pub fn kill_entry_and_spend_and_verify_rest(
}
}
pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo], test_name: &str) {
pub fn check_for_new_roots(
num_new_roots: usize,
contact_infos: &[ContactInfo],
connection_cache: &Arc<ConnectionCache>,
test_name: &str,
) {
let mut roots = vec![HashSet::new(); contact_infos.len()];
let mut done = false;
let mut last_print = Instant::now();
@ -304,7 +319,7 @@ pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo],
for (i, ingress_node) in contact_infos.iter().enumerate() {
let (rpc, tpu) = ingress_node.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let root_slot = client
.get_slot_with_commitment(CommitmentConfig::finalized())
.unwrap_or(0);
@ -327,6 +342,7 @@ pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo],
pub fn check_no_new_roots(
num_slots_to_wait: usize,
contact_infos: &[ContactInfo],
connection_cache: &Arc<ConnectionCache>,
test_name: &str,
) {
assert!(!contact_infos.is_empty());
@ -336,7 +352,7 @@ pub fn check_no_new_roots(
.enumerate()
.map(|(i, ingress_node)| {
let (rpc, tpu) = ingress_node.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let initial_root = client
.get_slot()
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.id));
@ -355,7 +371,7 @@ pub fn check_no_new_roots(
loop {
for contact_info in contact_infos {
let (rpc, tpu) = contact_info.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
current_slot = client
.get_slot_with_commitment(CommitmentConfig::processed())
.unwrap_or_else(|_| panic!("get_slot for {} failed", contact_infos[0].id));
@ -378,7 +394,7 @@ pub fn check_no_new_roots(
for (i, ingress_node) in contact_infos.iter().enumerate() {
let (rpc, tpu) = ingress_node.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
assert_eq!(
client
.get_slot()
@ -391,6 +407,7 @@ pub fn check_no_new_roots(
fn poll_all_nodes_for_signature(
entry_point_info: &ContactInfo,
cluster_nodes: &[ContactInfo],
connection_cache: &Arc<ConnectionCache>,
sig: &Signature,
confs: usize,
) -> Result<(), TransportError> {
@ -399,7 +416,7 @@ fn poll_all_nodes_for_signature(
continue;
}
let (rpc, tpu) = validator.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
client.poll_for_signature_confirmation(sig, confs)?;
}

View File

@ -6,7 +6,10 @@ use {
},
itertools::izip,
log::*,
solana_client::thin_client::{create_client, ThinClient},
solana_client::{
connection_cache::{ConnectionCache, DEFAULT_TPU_USE_QUIC},
thin_client::ThinClient,
},
solana_core::{
tower_storage::FileTowerStorage,
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
@ -76,6 +79,7 @@ pub struct ClusterConfig {
pub cluster_type: ClusterType,
pub poh_config: PohConfig,
pub additional_accounts: Vec<(Pubkey, AccountSharedData)>,
pub tpu_use_quic: bool,
}
impl Default for ClusterConfig {
@ -95,6 +99,7 @@ impl Default for ClusterConfig {
poh_config: PohConfig::default(),
skip_warmup_slots: false,
additional_accounts: vec![],
tpu_use_quic: DEFAULT_TPU_USE_QUIC,
}
}
}
@ -106,6 +111,7 @@ pub struct LocalCluster {
pub entry_point_info: ContactInfo,
pub validators: HashMap<Pubkey, ClusterValidatorInfo>,
pub genesis_config: GenesisConfig,
pub connection_cache: Arc<ConnectionCache>,
}
impl LocalCluster {
@ -248,7 +254,7 @@ impl LocalCluster {
true, // should_check_duplicate_instance
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
false, // use_quic
DEFAULT_TPU_USE_QUIC,
);
let mut validators = HashMap::new();
@ -271,6 +277,7 @@ impl LocalCluster {
entry_point_info: leader_contact_info,
validators,
genesis_config,
connection_cache: Arc::new(ConnectionCache::new(config.tpu_use_quic)),
};
let node_pubkey_to_vote_key: HashMap<Pubkey, Arc<Keypair>> = keys_in_genesis
@ -390,7 +397,7 @@ impl LocalCluster {
socket_addr_space: SocketAddrSpace,
) -> Pubkey {
let (rpc, tpu) = self.entry_point_info.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, self.connection_cache.clone());
// Must have enough tokens to fund vote account and set delegate
let should_create_vote_pubkey = voting_keypair.is_none();
@ -442,7 +449,7 @@ impl LocalCluster {
true, // should_check_duplicate_instance
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
false, // use_quic
DEFAULT_TPU_USE_QUIC,
);
let validator_pubkey = validator_keypair.pubkey();
@ -476,7 +483,7 @@ impl LocalCluster {
pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 {
let (rpc, tpu) = self.entry_point_info.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, self.connection_cache.clone());
Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports)
}
@ -501,7 +508,12 @@ impl LocalCluster {
.unwrap();
info!("{} discovered {} nodes", test_name, cluster_nodes.len());
info!("{} looking for new roots on all nodes", test_name);
cluster_tests::check_for_new_roots(num_new_roots, &alive_node_contact_infos, test_name);
cluster_tests::check_for_new_roots(
num_new_roots,
&alive_node_contact_infos,
&self.connection_cache,
test_name,
);
info!("{} done waiting for roots", test_name);
}
@ -526,7 +538,12 @@ impl LocalCluster {
.unwrap();
info!("{} discovered {} nodes", test_name, cluster_nodes.len());
info!("{} making sure no new roots on any nodes", test_name);
cluster_tests::check_no_new_roots(num_slots_to_wait, &alive_node_contact_infos, test_name);
cluster_tests::check_no_new_roots(
num_slots_to_wait,
&alive_node_contact_infos,
&self.connection_cache,
test_name,
);
info!("{} done waiting for roots", test_name);
}
@ -700,7 +717,7 @@ impl Cluster for LocalCluster {
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> {
self.validators.get(pubkey).map(|f| {
let (rpc, tpu) = f.info.contact_info.client_facing_addr();
create_client(rpc, tpu)
ThinClient::new(rpc, tpu, self.connection_cache.clone())
})
}
@ -779,7 +796,7 @@ impl Cluster for LocalCluster {
true, // should_check_duplicate_instance
Arc::new(RwLock::new(ValidatorStartProgress::default())),
socket_addr_space,
false, // use_quic
DEFAULT_TPU_USE_QUIC,
);
cluster_validator_info.validator = Some(restarted_node);
cluster_validator_info

View File

@ -336,6 +336,7 @@ pub fn run_cluster_partition<C>(
num_nodes,
HashSet::new(),
SocketAddrSpace::Unspecified,
&cluster.connection_cache,
);
let cluster_nodes = discover_cluster(

View File

@ -11,7 +11,7 @@ use {
rpc_client::RpcClient,
rpc_config::{RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
rpc_response::RpcSignatureResult,
thin_client::{create_client, ThinClient},
thin_client::ThinClient,
},
solana_core::{
broadcast_stage::BroadcastStageType,
@ -117,6 +117,7 @@ fn test_spend_and_verify_all_nodes_1() {
num_nodes,
HashSet::new(),
SocketAddrSpace::Unspecified,
&local.connection_cache,
);
}
@ -138,6 +139,7 @@ fn test_spend_and_verify_all_nodes_2() {
num_nodes,
HashSet::new(),
SocketAddrSpace::Unspecified,
&local.connection_cache,
);
}
@ -159,6 +161,7 @@ fn test_spend_and_verify_all_nodes_3() {
num_nodes,
HashSet::new(),
SocketAddrSpace::Unspecified,
&local.connection_cache,
);
}
@ -183,7 +186,7 @@ fn test_local_cluster_signature_subscribe() {
let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap();
let (rpc, tpu) = non_bootstrap_info.client_facing_addr();
let tx_client = create_client(rpc, tpu);
let tx_client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone());
let (blockhash, _) = tx_client
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
@ -260,6 +263,7 @@ fn test_spend_and_verify_all_nodes_env_num_nodes() {
num_nodes,
HashSet::new(),
SocketAddrSpace::Unspecified,
&local.connection_cache,
);
}
@ -332,7 +336,13 @@ fn test_forwarding() {
.unwrap();
// Confirm that transactions were forwarded to and processed by the leader.
cluster_tests::send_many_transactions(validator_info, &cluster.funding_keypair, 10, 20);
cluster_tests::send_many_transactions(
validator_info,
&cluster.funding_keypair,
&cluster.connection_cache,
10,
20,
);
}
#[test]
@ -372,6 +382,7 @@ fn test_restart_node() {
cluster_tests::send_many_transactions(
&cluster.entry_point_info,
&cluster.funding_keypair,
&cluster.connection_cache,
10,
1,
);
@ -402,7 +413,7 @@ fn test_mainnet_beta_cluster_type() {
assert_eq!(cluster_nodes.len(), 1);
let (rpc, tpu) = cluster.entry_point_info.client_facing_addr();
let client = create_client(rpc, tpu);
let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone());
// Programs that are available at epoch 0
for program_id in [
@ -1279,6 +1290,7 @@ fn test_snapshot_restart_tower() {
1,
HashSet::new(),
SocketAddrSpace::Unspecified,
&cluster.connection_cache,
);
}
@ -1434,6 +1446,7 @@ fn test_snapshots_restart_validity() {
let new_balances = cluster_tests::send_many_transactions(
&cluster.entry_point_info,
&cluster.funding_keypair,
&cluster.connection_cache,
10,
10,
);
@ -1460,7 +1473,11 @@ fn test_snapshots_restart_validity() {
// Verify account balances on validator
trace!("Verifying balances");
cluster_tests::verify_balances(expected_balances.clone(), &cluster.entry_point_info);
cluster_tests::verify_balances(
expected_balances.clone(),
&cluster.entry_point_info,
cluster.connection_cache.clone(),
);
// Check that we can still push transactions
trace!("Spending and verifying");
@ -1470,6 +1487,7 @@ fn test_snapshots_restart_validity() {
1,
HashSet::new(),
SocketAddrSpace::Unspecified,
&cluster.connection_cache,
);
}
}
@ -1696,6 +1714,7 @@ fn test_optimistic_confirmation_violation_detection() {
cluster_tests::check_for_new_roots(
16,
&[cluster.get_contact_info(&entry_point_id).unwrap().clone()],
&cluster.connection_cache,
"test_optimistic_confirmation_violation",
);
}

View File

@ -192,6 +192,7 @@ fn test_leader_failure_4() {
.config
.validator_exit,
&local.funding_keypair,
&local.connection_cache,
num_nodes,
config.ticks_per_slot * config.poh_config.target_tick_duration.as_millis() as u64,
SocketAddrSpace::Unspecified,
@ -225,6 +226,7 @@ fn test_ledger_cleanup_service() {
num_nodes,
HashSet::new(),
SocketAddrSpace::Unspecified,
&cluster.connection_cache,
);
cluster.close_preserve_ledgers();
//check everyone's ledgers and make sure only ~100 slots are stored
@ -435,6 +437,7 @@ fn test_slot_hash_expiry() {
cluster_tests::check_for_new_roots(
16,
&[cluster.get_contact_info(&a_pubkey).unwrap().clone()],
&cluster.connection_cache,
"test_slot_hashes_expiry",
);
}

View File

@ -3820,6 +3820,7 @@ dependencies = [
"crossbeam-channel",
"futures 0.3.21",
"solana-banks-interface",
"solana-client",
"solana-runtime",
"solana-sdk 1.11.0",
"solana-send-transaction-service",

View File

@ -8,6 +8,7 @@ use {
solana_account_decoder::UiAccount,
solana_client::{
client_error::{ClientErrorKind, Result as ClientResult},
connection_cache::ConnectionCache,
nonblocking::pubsub_client::PubsubClient,
rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcSignatureSubscribeConfig},
@ -410,8 +411,7 @@ fn test_rpc_subscriptions() {
}
}
#[test]
fn test_tpu_send_transaction() {
fn run_tpu_send_transaction(tpu_use_quic: bool) {
let mint_keypair = Keypair::new();
let mint_pubkey = mint_keypair.pubkey();
let test_validator =
@ -420,11 +420,12 @@ fn test_tpu_send_transaction() {
test_validator.rpc_url(),
CommitmentConfig::processed(),
));
let tpu_client = TpuClient::new(
let connection_cache = Arc::new(ConnectionCache::new(tpu_use_quic));
let tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(),
&test_validator.rpc_pubsub_url(),
TpuClientConfig::default(),
connection_cache,
)
.unwrap();
@ -445,6 +446,16 @@ fn test_tpu_send_transaction() {
}
}
#[test]
fn test_tpu_send_transaction() {
run_tpu_send_transaction(/*tpu_use_quic*/ false)
}
#[test]
fn test_tpu_send_transaction_with_quic() {
run_tpu_send_transaction(/*tpu_use_quic*/ true)
}
#[test]
fn deserialize_rpc_error() -> ClientResult<()> {
solana_logger::setup();

View File

@ -15,6 +15,7 @@ use {
UiAccount, UiAccountEncoding, UiDataSliceConfig, MAX_BASE58_BYTES,
},
solana_client::{
connection_cache::ConnectionCache,
rpc_cache::LargestAccountsCache,
rpc_config::*,
rpc_custom_error::RpcCustomError,
@ -76,7 +77,7 @@ use {
},
},
solana_send_transaction_service::{
send_transaction_service::{SendTransactionService, TransactionInfo, DEFAULT_TPU_USE_QUIC},
send_transaction_service::{SendTransactionService, TransactionInfo},
tpu_info::NullTpuInfo,
},
solana_storage_bigtable::Error as StorageError,
@ -337,7 +338,11 @@ impl JsonRpcRequestProcessor {
}
// Useful for unit testing
pub fn new_from_bank(bank: &Arc<Bank>, socket_addr_space: SocketAddrSpace) -> Self {
pub fn new_from_bank(
bank: &Arc<Bank>,
socket_addr_space: SocketAddrSpace,
connection_cache: Arc<ConnectionCache>,
) -> Self {
let genesis_hash = bank.hash();
let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks(
&[bank.clone()],
@ -357,9 +362,9 @@ impl JsonRpcRequestProcessor {
&bank_forks,
None,
receiver,
&connection_cache,
1000,
1,
DEFAULT_TPU_USE_QUIC,
);
Self {
@ -4927,8 +4932,12 @@ pub mod tests {
let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
bank.transfer(20, &genesis.mint_keypair, &bob_pubkey)
.unwrap();
let request_processor =
JsonRpcRequestProcessor::new_from_bank(&bank, SocketAddrSpace::Unspecified);
let connection_cache = Arc::new(ConnectionCache::default());
let request_processor = JsonRpcRequestProcessor::new_from_bank(
&bank,
SocketAddrSpace::Unspecified,
connection_cache,
);
assert_eq!(
request_processor
.get_transaction_count(RpcContextConfig::default())
@ -4942,7 +4951,12 @@ pub mod tests {
let genesis = create_genesis_config(20);
let mint_pubkey = genesis.mint_keypair.pubkey();
let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
let meta = JsonRpcRequestProcessor::new_from_bank(&bank, SocketAddrSpace::Unspecified);
let connection_cache = Arc::new(ConnectionCache::default());
let meta = JsonRpcRequestProcessor::new_from_bank(
&bank,
SocketAddrSpace::Unspecified,
connection_cache,
);
let mut io = MetaIoHandler::default();
io.extend_with(rpc_minimal::MinimalImpl.to_delegate());
@ -4970,7 +4984,12 @@ pub mod tests {
let genesis = create_genesis_config(20);
let mint_pubkey = genesis.mint_keypair.pubkey();
let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
let meta = JsonRpcRequestProcessor::new_from_bank(&bank, SocketAddrSpace::Unspecified);
let connection_cache = Arc::new(ConnectionCache::default());
let meta = JsonRpcRequestProcessor::new_from_bank(
&bank,
SocketAddrSpace::Unspecified,
connection_cache,
);
let mut io = MetaIoHandler::default();
io.extend_with(rpc_minimal::MinimalImpl.to_delegate());
@ -5077,7 +5096,12 @@ pub mod tests {
bank.transfer(4, &genesis.mint_keypair, &bob_pubkey)
.unwrap();
let meta = JsonRpcRequestProcessor::new_from_bank(&bank, SocketAddrSpace::Unspecified);
let connection_cache = Arc::new(ConnectionCache::default());
let meta = JsonRpcRequestProcessor::new_from_bank(
&bank,
SocketAddrSpace::Unspecified,
connection_cache,
);
let mut io = MetaIoHandler::default();
io.extend_with(rpc_minimal::MinimalImpl.to_delegate());
@ -6200,7 +6224,12 @@ pub mod tests {
fn test_rpc_send_bad_tx() {
let genesis = create_genesis_config(100);
let bank = Arc::new(Bank::new_for_tests(&genesis.genesis_config));
let meta = JsonRpcRequestProcessor::new_from_bank(&bank, SocketAddrSpace::Unspecified);
let connection_cache = Arc::new(ConnectionCache::default());
let meta = JsonRpcRequestProcessor::new_from_bank(
&bank,
SocketAddrSpace::Unspecified,
connection_cache,
);
let mut io = MetaIoHandler::default();
io.extend_with(rpc_full::FullImpl.to_delegate());
@ -6250,14 +6279,15 @@ pub mod tests {
Arc::new(LeaderScheduleCache::default()),
Arc::new(AtomicU64::default()),
);
let connection_cache = Arc::new(ConnectionCache::default());
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,
None,
receiver,
&connection_cache,
1000,
1,
DEFAULT_TPU_USE_QUIC,
);
let mut bad_transaction = system_transaction::transfer(
@ -6516,14 +6546,15 @@ pub mod tests {
Arc::new(LeaderScheduleCache::default()),
Arc::new(AtomicU64::default()),
);
let connection_cache = Arc::new(ConnectionCache::default());
SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,
None,
receiver,
&connection_cache,
1000,
1,
DEFAULT_TPU_USE_QUIC,
);
assert_eq!(
request_processor.get_block_commitment(0),

View File

@ -18,7 +18,7 @@ use {
RequestMiddlewareAction, ServerBuilder,
},
regex::Regex,
solana_client::rpc_cache::LargestAccountsCache,
solana_client::{connection_cache::ConnectionCache, rpc_cache::LargestAccountsCache},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
bigtable_upload::ConfirmedBlockUploadConfig,
@ -352,6 +352,7 @@ impl JsonRpcService {
send_transaction_service_config: send_transaction_service::Config,
max_slots: Arc<MaxSlots>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
connection_cache: Arc<ConnectionCache>,
current_transaction_status_slot: Arc<AtomicU64>,
) -> Result<Self, String> {
info!("rpc bound to {:?}", rpc_addr);
@ -464,6 +465,7 @@ impl JsonRpcService {
&bank_forks,
leader_info,
receiver,
&connection_cache,
send_transaction_service_config,
));
@ -610,6 +612,7 @@ mod tests {
let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default()));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let connection_cache = Arc::new(ConnectionCache::default());
let mut rpc_service = JsonRpcService::new(
rpc_addr,
JsonRpcConfig::default(),
@ -632,6 +635,7 @@ mod tests {
},
Arc::new(MaxSlots::default()),
Arc::new(LeaderScheduleCache::default()),
connection_cache,
Arc::new(AtomicU64::default()),
)
.unwrap();

View File

@ -2,7 +2,7 @@ use {
crate::tpu_info::TpuInfo,
crossbeam_channel::{Receiver, RecvTimeoutError},
log::*,
solana_client::{connection_cache, tpu_connection::TpuConnection},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_measure::measure::Measure,
solana_metrics::datapoint_warn,
solana_runtime::{bank::Bank, bank_forks::BankForks},
@ -101,16 +101,12 @@ struct ProcessTransactionsResult {
retained: u64,
}
pub const DEFAULT_TPU_USE_QUIC: bool = false;
#[derive(Clone, Debug)]
pub struct Config {
pub retry_rate_ms: u64,
pub leader_forward_count: u64,
pub default_max_retries: Option<usize>,
pub service_max_retries: usize,
/// Whether to use Quic protocol to send transactions
pub use_quic: bool,
/// The batch size for sending transactions in batches
pub batch_size: usize,
/// How frequently batches are sent
@ -124,7 +120,6 @@ impl Default for Config {
leader_forward_count: DEFAULT_LEADER_FORWARD_COUNT,
default_max_retries: None,
service_max_retries: DEFAULT_SERVICE_MAX_RETRIES,
use_quic: DEFAULT_TPU_USE_QUIC,
batch_size: DEFAULT_TRANSACTION_BATCH_SIZE,
batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS,
}
@ -334,17 +329,23 @@ impl SendTransactionService {
bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<T>,
receiver: Receiver<TransactionInfo>,
connection_cache: &Arc<ConnectionCache>,
retry_rate_ms: u64,
leader_forward_count: u64,
use_quic: bool,
) -> Self {
let config = Config {
retry_rate_ms,
leader_forward_count,
use_quic,
..Config::default()
};
Self::new_with_config(tpu_address, bank_forks, leader_info, receiver, config)
Self::new_with_config(
tpu_address,
bank_forks,
leader_info,
receiver,
connection_cache,
config,
)
}
pub fn new_with_config<T: TpuInfo + std::marker::Send + 'static>(
@ -352,6 +353,7 @@ impl SendTransactionService {
bank_forks: &Arc<RwLock<BankForks>>,
leader_info: Option<T>,
receiver: Receiver<TransactionInfo>,
connection_cache: &Arc<ConnectionCache>,
config: Config,
) -> Self {
let stats_report = Arc::new(SendTransactionServiceStatsReport::default());
@ -365,6 +367,7 @@ impl SendTransactionService {
tpu_address,
receiver,
leader_info_provider.clone(),
connection_cache.clone(),
config.clone(),
retry_transactions.clone(),
stats_report.clone(),
@ -375,6 +378,7 @@ impl SendTransactionService {
tpu_address,
bank_forks.clone(),
leader_info_provider,
connection_cache.clone(),
config,
retry_transactions,
stats_report,
@ -392,6 +396,7 @@ impl SendTransactionService {
tpu_address: SocketAddr,
receiver: Receiver<TransactionInfo>,
leader_info_provider: Arc<Mutex<CurrentLeaderInfo<T>>>,
connection_cache: Arc<ConnectionCache>,
config: Config,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
stats_report: Arc<SendTransactionServiceStatsReport>,
@ -404,7 +409,6 @@ impl SendTransactionService {
"Starting send-transaction-service::receive_txn_thread with config {:?}",
config
);
connection_cache::set_use_quic(config.use_quic);
Builder::new()
.name("send-tx-receive".to_string())
.spawn(move || loop {
@ -450,6 +454,7 @@ impl SendTransactionService {
&tpu_address,
&mut transactions,
leader_info_provider.lock().unwrap().get_leader_info(),
&connection_cache,
&config,
stats,
);
@ -494,6 +499,7 @@ impl SendTransactionService {
tpu_address: SocketAddr,
bank_forks: Arc<RwLock<BankForks>>,
leader_info_provider: Arc<Mutex<CurrentLeaderInfo<T>>>,
connection_cache: Arc<ConnectionCache>,
config: Config,
retry_transactions: Arc<Mutex<HashMap<Signature, TransactionInfo>>>,
stats_report: Arc<SendTransactionServiceStatsReport>,
@ -503,7 +509,6 @@ impl SendTransactionService {
"Starting send-transaction-service::retry_thread with config {:?}",
config
);
connection_cache::set_use_quic(config.use_quic);
Builder::new()
.name("send-tx-retry".to_string())
.spawn(move || loop {
@ -534,6 +539,7 @@ impl SendTransactionService {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
stats,
);
@ -548,6 +554,7 @@ impl SendTransactionService {
tpu_address: &SocketAddr,
transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info: Option<&T>,
connection_cache: &Arc<ConnectionCache>,
config: &Config,
stats: &SendTransactionServiceStats,
) {
@ -560,7 +567,7 @@ impl SendTransactionService {
.collect::<Vec<&[u8]>>();
for address in &addresses {
Self::send_transactions(address, &wire_transactions, stats);
Self::send_transactions(address, &wire_transactions, connection_cache, stats);
}
}
@ -571,6 +578,7 @@ impl SendTransactionService {
tpu_address: &SocketAddr,
transactions: &mut HashMap<Signature, TransactionInfo>,
leader_info_provider: &Arc<Mutex<CurrentLeaderInfo<T>>>,
connection_cache: &Arc<ConnectionCache>,
config: &Config,
stats: &SendTransactionServiceStats,
) -> ProcessTransactionsResult {
@ -682,7 +690,7 @@ impl SendTransactionService {
let addresses = Self::get_tpu_addresses(tpu_address, leader_info, config);
for address in &addresses {
Self::send_transactions(address, chunk, stats);
Self::send_transactions(address, chunk, connection_cache, stats);
}
}
}
@ -692,30 +700,33 @@ impl SendTransactionService {
fn send_transaction(
tpu_address: &SocketAddr,
wire_transaction: &[u8],
connection_cache: &Arc<ConnectionCache>,
) -> Result<(), TransportError> {
let conn = connection_cache::get_connection(tpu_address);
let conn = connection_cache.get_connection(tpu_address);
conn.send_wire_transaction_async(wire_transaction.to_vec())
}
fn send_transactions_with_metrics(
tpu_address: &SocketAddr,
wire_transactions: &[&[u8]],
connection_cache: &Arc<ConnectionCache>,
) -> Result<(), TransportError> {
let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect();
let conn = connection_cache::get_connection(tpu_address);
let conn = connection_cache.get_connection(tpu_address);
conn.send_wire_transaction_batch_async(wire_transactions)
}
fn send_transactions(
tpu_address: &SocketAddr,
wire_transactions: &[&[u8]],
connection_cache: &Arc<ConnectionCache>,
stats: &SendTransactionServiceStats,
) {
let mut measure = Measure::start("send-us");
let result = if wire_transactions.len() == 1 {
Self::send_transaction(tpu_address, wire_transactions[0])
Self::send_transaction(tpu_address, wire_transactions[0], connection_cache)
} else {
Self::send_transactions_with_metrics(tpu_address, wire_transactions)
Self::send_transactions_with_metrics(tpu_address, wire_transactions, connection_cache)
};
if let Err(err) = result {
@ -781,14 +792,15 @@ mod test {
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let (sender, receiver) = unbounded();
let connection_cache = Arc::new(ConnectionCache::default());
let send_tranaction_service = SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,
None,
receiver,
&connection_cache,
1000,
1,
DEFAULT_TPU_USE_QUIC,
);
drop(sender);
@ -848,12 +860,14 @@ mod test {
Some(Instant::now()),
),
);
let connection_cache = Arc::new(ConnectionCache::default());
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank,
&root_bank,
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -884,6 +898,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -914,6 +929,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -944,6 +960,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -976,6 +993,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -1018,6 +1036,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -1036,6 +1055,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -1113,12 +1133,14 @@ mod test {
);
let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None)));
let stats = SendTransactionServiceStats::default();
let connection_cache = Arc::new(ConnectionCache::default());
let result = SendTransactionService::process_transactions::<NullTpuInfo>(
&working_bank,
&root_bank,
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -1148,6 +1170,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -1179,6 +1202,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -1208,6 +1232,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -1238,6 +1263,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -1268,6 +1294,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -1300,6 +1327,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);
@ -1330,6 +1358,7 @@ mod test {
&tpu_address,
&mut transactions,
&leader_info_provider,
&connection_cache,
&config,
&stats,
);

View File

@ -2,7 +2,7 @@
use {
log::*,
solana_cli_output::CliAccount,
solana_client::{nonblocking, rpc_client::RpcClient},
solana_client::{connection_cache::DEFAULT_TPU_USE_QUIC, nonblocking, rpc_client::RpcClient},
solana_core::{
tower_storage::TowerStorage,
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
@ -748,7 +748,7 @@ impl TestValidator {
true, // should_check_duplicate_instance
config.start_progress.clone(),
socket_addr_space,
false, // use_quic
DEFAULT_TPU_USE_QUIC,
));
// Needed to avoid panics in `solana-responder-gossip` in tests that create a number of

View File

@ -2527,7 +2527,6 @@ pub fn main() {
"rpc_send_transaction_service_max_retries",
usize
),
use_quic: tpu_use_quic,
batch_send_rate_ms: rpc_send_batch_send_rate_ms,
batch_size: rpc_send_batch_size,
},