Async connection creation in connection cache (#33302)

If there is a connection in the cache available, use it and create the additional connection asynchronously.
This commit is contained in:
Lijun Wang 2023-09-25 18:17:47 -07:00 committed by GitHub
parent d25d53e979
commit 344e466e12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 236 additions and 71 deletions

1
Cargo.lock generated
View File

@ -5700,6 +5700,7 @@ version = "1.17.0"
dependencies = [
"async-trait",
"bincode",
"crossbeam-channel",
"futures-util",
"indexmap 2.0.0",
"indicatif",

View File

@ -1,7 +1,9 @@
use {
crate::bench_tps_client::{BenchTpsClient, BenchTpsError, Result},
solana_client::tpu_client::TpuClient,
solana_connection_cache::connection_cache::{ConnectionManager, ConnectionPool},
solana_connection_cache::connection_cache::{
ConnectionManager, ConnectionPool, NewConnectionConfig,
},
solana_sdk::{
account::Account, commitment_config::CommitmentConfig, epoch_info::EpochInfo, hash::Hash,
message::Message, pubkey::Pubkey, signature::Signature, transaction::Transaction,
@ -12,6 +14,7 @@ impl<P, M, C> BenchTpsClient for TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
fn send_transaction(&self, transaction: Transaction) -> Result<Signature> {
let signature = transaction.signatures[0];

View File

@ -5,6 +5,7 @@ use {
client_connection::ClientConnection,
connection_cache::{
BaseClientConnection, ConnectionCache as BackendConnectionCache, ConnectionPool,
NewConnectionConfig,
},
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},

View File

@ -3,6 +3,7 @@ use {
crate::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig},
solana_connection_cache::connection_cache::{
ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
NewConnectionConfig,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::nonblocking::rpc_client::RpcClient,
@ -30,6 +31,7 @@ impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
@ -99,6 +101,7 @@ impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Create a new client that disconnects when dropped
pub async fn new_with_connection_cache(

View File

@ -2,6 +2,7 @@ use {
crate::connection_cache::ConnectionCache,
solana_connection_cache::connection_cache::{
ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
NewConnectionConfig,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::rpc_client::RpcClient,
@ -34,6 +35,7 @@ impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
@ -90,6 +92,7 @@ impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Create a new client that disconnects when dropped
pub fn new_with_connection_cache(

View File

@ -12,6 +12,7 @@ edition = { workspace = true }
[dependencies]
async-trait = { workspace = true }
bincode = { workspace = true }
crossbeam-channel = { workspace = true }
futures-util = { workspace = true }
indexmap = { workspace = true }
indicatif = { workspace = true, optional = true }

View File

@ -4,13 +4,16 @@ use {
connection_cache_stats::{ConnectionCacheStats, CONNECTION_STAT_SUBMISSION_INTERVAL},
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
crossbeam_channel::{Receiver, RecvError, Sender},
indexmap::map::IndexMap,
log::*,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_sdk::timing::AtomicInterval,
std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc, RwLock},
thread::{Builder, JoinHandle},
},
thiserror::Error,
};
@ -27,9 +30,9 @@ pub enum Protocol {
QUIC,
}
pub trait ConnectionManager {
pub trait ConnectionManager: Send + Sync + 'static {
type ConnectionPool: ConnectionPool;
type NewConnectionConfig;
type NewConnectionConfig: NewConnectionConfig;
const PROTOCOL: Protocol;
@ -43,18 +46,20 @@ pub struct ConnectionCache<
T, // NewConnectionConfig
> {
name: &'static str,
map: RwLock<IndexMap<SocketAddr, /*ConnectionPool:*/ R>>,
connection_manager: S,
map: Arc<RwLock<IndexMap<SocketAddr, /*ConnectionPool:*/ R>>>,
connection_manager: Arc<S>,
stats: Arc<ConnectionCacheStats>,
last_stats: AtomicInterval,
connection_pool_size: usize,
connection_config: T,
connection_config: Arc<T>,
sender: Sender<(usize, SocketAddr)>,
}
impl<P, M, C> ConnectionCache<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
pub fn new(
name: &'static str,
@ -76,17 +81,61 @@ where
connection_config: C,
connection_manager: M,
) -> Self {
let (sender, receiver) = crossbeam_channel::unbounded();
let map = Arc::new(RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)));
let config = Arc::new(connection_config);
let connection_manager = Arc::new(connection_manager);
let connection_pool_size = 1.max(connection_pool_size); // The minimum pool size is 1.
let stats = Arc::new(ConnectionCacheStats::default());
let _async_connection_thread =
Self::create_connection_async_thread(map.clone(), receiver, stats.clone());
Self {
name,
map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
stats: Arc::new(ConnectionCacheStats::default()),
map,
stats,
connection_manager,
last_stats: AtomicInterval::default(),
connection_pool_size: 1.max(connection_pool_size), // The minimum pool size is 1.
connection_config,
connection_pool_size,
connection_config: config,
sender,
}
}
/// This actually triggers the connection creation by sending empty data
fn create_connection_async_thread(
map: Arc<RwLock<IndexMap<SocketAddr, /*ConnectionPool:*/ P>>>,
receiver: Receiver<(usize, SocketAddr)>,
stats: Arc<ConnectionCacheStats>,
) -> JoinHandle<()> {
Builder::new()
.name("solQAsynCon".to_string())
.spawn(move || loop {
let recv_result = receiver.recv();
match recv_result {
Err(RecvError) => {
break;
}
Ok((idx, addr)) => {
let map = map.read().unwrap();
let pool = map.get(&addr);
if let Some(pool) = pool {
let conn = pool.get(idx);
if let Ok(conn) = conn {
drop(map);
let conn = conn.new_blocking_connection(addr, stats.clone());
let result = conn.send_data(&[]);
debug!("Create async connection result {result:?} for {addr}");
}
}
}
}
})
.unwrap()
}
/// Create a lazy connection object under the exclusive lock of the cache map if there is not
/// enough used connections in the connection pool for the specified address.
/// Returns CreateConnectionResult.
@ -102,47 +151,37 @@ where
// Read again, as it is possible that between read lock dropped and the write lock acquired
// another thread could have setup the connection.
let should_create_connection = map
let pool_status = map
.get(addr)
.map(|pool| pool.need_new_connection(self.connection_pool_size))
.unwrap_or(true);
.map(|pool| pool.check_pool_status(self.connection_pool_size))
.unwrap_or(PoolStatus::Empty);
let (cache_hit, num_evictions, eviction_timing_ms) = if should_create_connection {
// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
let existing_index = map.get_index_of(addr);
while map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0..MAX_CONNECTIONS);
if let Some(index) = existing_index {
if n == index {
continue;
}
}
map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();
let (cache_hit, num_evictions, eviction_timing_ms) =
if matches!(pool_status, PoolStatus::Empty) {
Self::create_connection_internal(
&self.connection_config,
&self.connection_manager,
&mut map,
addr,
self.connection_pool_size,
None,
)
} else {
(true, 0, 0)
};
map.entry(*addr)
.and_modify(|pool| {
pool.add_connection(&self.connection_config, addr);
})
.or_insert_with(|| {
let mut pool = self.connection_manager.new_connection_pool();
pool.add_connection(&self.connection_config, addr);
pool
});
(
false,
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
} else {
(true, 0, 0)
};
if matches!(pool_status, PoolStatus::PartiallyFull) {
// trigger an async connection create
debug!("Triggering async connection for {addr:?}");
Self::create_connection_internal(
&self.connection_config,
&self.connection_manager,
&mut map,
addr,
self.connection_pool_size,
Some(&self.sender),
);
}
let pool = map.get(addr).unwrap();
let connection = pool.borrow_connection();
@ -156,6 +195,63 @@ where
}
}
fn create_connection_internal(
config: &Arc<C>,
connection_manager: &Arc<M>,
map: &mut std::sync::RwLockWriteGuard<'_, IndexMap<SocketAddr, P>>,
addr: &SocketAddr,
connection_pool_size: usize,
async_connection_sender: Option<&Sender<(usize, SocketAddr)>>,
) -> (bool, u64, u64) {
// evict a connection if the cache is reaching upper bounds
let mut num_evictions = 0;
let mut get_connection_cache_eviction_measure =
Measure::start("get_connection_cache_eviction_measure");
let existing_index = map.get_index_of(addr);
while map.len() >= MAX_CONNECTIONS {
let mut rng = thread_rng();
let n = rng.gen_range(0..MAX_CONNECTIONS);
if let Some(index) = existing_index {
if n == index {
continue;
}
}
map.swap_remove_index(n);
num_evictions += 1;
}
get_connection_cache_eviction_measure.stop();
let mut hit_cache = false;
map.entry(*addr)
.and_modify(|pool| {
if matches!(
pool.check_pool_status(connection_pool_size),
PoolStatus::PartiallyFull
) {
let idx = pool.add_connection(config, addr);
if let Some(sender) = async_connection_sender {
debug!(
"Sending async connection creation {} for {addr}",
pool.num_connections() - 1
);
sender.send((idx, *addr)).unwrap();
};
} else {
hit_cache = true;
}
})
.or_insert_with(|| {
let mut pool = connection_manager.new_connection_pool();
pool.add_connection(config, addr);
pool
});
(
hit_cache,
num_evictions,
get_connection_cache_eviction_measure.as_ms(),
)
}
fn get_or_add_connection(
&self,
addr: &SocketAddr,
@ -179,12 +275,26 @@ where
eviction_timing_ms,
} = match map.get(addr) {
Some(pool) => {
if pool.need_new_connection(self.connection_pool_size) {
let pool_status = pool.check_pool_status(self.connection_pool_size);
if matches!(pool_status, PoolStatus::Empty) {
// create more connection and put it in the pool
drop(map);
self.create_connection(&mut lock_timing_ms, addr)
} else {
let connection = pool.borrow_connection();
if matches!(pool_status, PoolStatus::PartiallyFull) {
debug!("Creating connection async for {addr}");
drop(map);
let mut map = self.map.write().unwrap();
Self::create_connection_internal(
&self.connection_config,
&self.connection_manager,
&mut map,
addr,
self.connection_pool_size,
Some(&self.sender),
);
}
CreateConnectionResult {
connection,
cache_hit: true,
@ -299,12 +409,22 @@ pub enum ClientError {
IoError(#[from] std::io::Error),
}
pub trait ConnectionPool {
type NewConnectionConfig;
pub trait NewConnectionConfig: Sized + Send + Sync + 'static {
fn new() -> Result<Self, ClientError>;
}
pub enum PoolStatus {
Empty,
PartiallyFull,
Full,
}
pub trait ConnectionPool: Send + Sync + 'static {
type NewConnectionConfig: NewConnectionConfig;
type BaseClientConnection: BaseClientConnection;
/// Add a connection to the pool
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr);
/// Add a connection to the pool and return its index
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize;
/// Get the number of current connections in the pool
fn num_connections(&self) -> usize;
@ -319,10 +439,17 @@ pub trait ConnectionPool {
let n = rng.gen_range(0..self.num_connections());
self.get(n).expect("index is within num_connections")
}
/// Check if we need to create a new connection. If the count of the connections
/// is smaller than the pool size.
fn need_new_connection(&self, required_pool_size: usize) -> bool {
self.num_connections() < required_pool_size
/// is smaller than the pool size and if there is no connection at all.
fn check_pool_status(&self, required_pool_size: usize) -> PoolStatus {
if self.num_connections() == 0 {
PoolStatus::Empty
} else if self.num_connections() < required_pool_size {
PoolStatus::PartiallyFull
} else {
PoolStatus::Full
}
}
fn create_pool_entry(
@ -393,9 +520,16 @@ mod tests {
type NewConnectionConfig = MockUdpConfig;
type BaseClientConnection = MockUdp;
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) {
/// Add a connection into the pool and return its index in the pool.
fn add_connection(
&mut self,
config: &Self::NewConnectionConfig,
addr: &SocketAddr,
) -> usize {
let connection = self.create_pool_entry(config, addr);
let idx = self.connections.len();
self.connections.push(connection);
idx
}
fn num_connections(&self) -> usize {
@ -436,7 +570,7 @@ mod tests {
}
}
impl MockUdpConfig {
impl NewConnectionConfig for MockUdpConfig {
fn new() -> Result<Self, ClientError> {
Ok(Self {
udp_socket: Arc::new(

View File

@ -4765,6 +4765,7 @@ version = "1.17.0"
dependencies = [
"async-trait",
"bincode",
"crossbeam-channel",
"futures-util",
"indexmap 2.0.0",
"log",

View File

@ -19,7 +19,7 @@ use {
solana_connection_cache::{
connection_cache::{
BaseClientConnection, ClientError, ConnectionCache, ConnectionManager, ConnectionPool,
ConnectionPoolError, Protocol,
ConnectionPoolError, NewConnectionConfig, Protocol,
},
connection_cache_stats::ConnectionCacheStats,
},
@ -53,9 +53,11 @@ impl ConnectionPool for QuicPool {
type BaseClientConnection = Quic;
type NewConnectionConfig = QuicConfig;
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) {
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize {
let connection = self.create_pool_entry(config, addr);
let idx = self.connections.len();
self.connections.push(connection);
idx
}
fn num_connections(&self) -> usize {
@ -93,8 +95,8 @@ pub struct QuicConfig {
client_endpoint: Option<Endpoint>,
}
impl QuicConfig {
pub fn new() -> Result<Self, ClientError> {
impl NewConnectionConfig for QuicConfig {
fn new() -> Result<Self, ClientError> {
let (cert, priv_key) =
new_self_signed_tls_certificate(&Keypair::new(), IpAddr::V4(Ipv4Addr::UNSPECIFIED))?;
Ok(Self {

View File

@ -340,16 +340,18 @@ impl QuicClient {
Ok(conn) => {
*conn_guard = Some(conn.clone());
info!(
"Made connection to {} id {} try_count {}",
"Made connection to {} id {} try_count {}, from connection cache warming?: {}",
self.addr,
conn.connection.stable_id(),
connection_try_count
connection_try_count,
data.is_empty(),
);
connection_try_count += 1;
conn.connection.clone()
}
Err(err) => {
info!("Cannot make connection to {}, error {:}", self.addr, err);
info!("Cannot make connection to {}, error {:}, from connection cache warming?: {}",
self.addr, err, data.is_empty());
return Err(err);
}
}

View File

@ -8,7 +8,9 @@ use {
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::{
client_connection::ClientConnection,
connection_cache::{ConnectionCache, ConnectionManager, ConnectionPool},
connection_cache::{
ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
},
},
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response},
@ -124,6 +126,7 @@ impl<P, M, C> ThinClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
/// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
@ -324,6 +327,7 @@ impl<P, M, C> Client for ThinClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
fn tpu_addr(&self) -> String {
self.tpu_addr().to_string()
@ -334,6 +338,7 @@ impl<P, M, C> SyncClient for ThinClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
fn send_and_confirm_message<T: Signers + ?Sized>(
&self,
@ -618,6 +623,7 @@ impl<P, M, C> AsyncClient for ThinClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
fn async_send_versioned_transaction(
&self,

View File

@ -9,7 +9,7 @@ use {
log::*,
solana_connection_cache::{
connection_cache::{
ConnectionCache, ConnectionManager, ConnectionPool, Protocol,
ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, Protocol,
DEFAULT_CONNECTION_POOL_SIZE,
},
nonblocking::client_connection::ClientConnection,
@ -268,6 +268,7 @@ fn send_wire_transaction_futures<'a, P, M, C>(
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
let sleep_duration = SEND_TRANSACTION_INTERVAL.saturating_mul(index as u32);
@ -339,6 +340,7 @@ async fn sleep_and_send_wire_transaction_to_addr<P, M, C>(
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
sleep(sleep_duration).await;
send_wire_transaction_to_addr(connection_cache, &addr, wire_transaction).await
@ -352,6 +354,7 @@ async fn send_wire_transaction_to_addr<P, M, C>(
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_data(&wire_transaction).await
@ -365,6 +368,7 @@ async fn send_wire_transaction_batch_to_addr<P, M, C>(
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_data_batch(wire_transactions).await
@ -374,6 +378,7 @@ impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size

View File

@ -3,7 +3,7 @@ use {
crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::connection_cache::{
ConnectionCache, ConnectionManager, ConnectionPool,
ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
},
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult},
@ -71,6 +71,7 @@ impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size

View File

@ -11,7 +11,7 @@ use {
solana_connection_cache::{
connection_cache::{
BaseClientConnection, ClientError, ConnectionManager, ConnectionPool,
ConnectionPoolError, Protocol,
ConnectionPoolError, NewConnectionConfig, Protocol,
},
connection_cache_stats::ConnectionCacheStats,
},
@ -28,9 +28,11 @@ impl ConnectionPool for UdpPool {
type BaseClientConnection = Udp;
type NewConnectionConfig = UdpConfig;
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) {
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) -> usize {
let connection = self.create_pool_entry(config, addr);
let idx = self.connections.len();
self.connections.push(connection);
idx
}
fn num_connections(&self) -> usize {
@ -57,7 +59,7 @@ pub struct UdpConfig {
udp_socket: Arc<UdpSocket>,
}
impl UdpConfig {
impl NewConnectionConfig for UdpConfig {
fn new() -> Result<Self, ClientError> {
let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
.map_err(Into::<ClientError>::into)?;