removes dynamic cast and dynamic dispatch from connection-cache (#30128)

Dynamic dispatch forces heap allocation and adds extra overhead.
Dynamic casting as in the ones below, lacks compile-time type safety:
https://github.com/solana-labs/solana/blob/eeb622c4e/quic-client/src/lib.rs#L172-L175
https://github.com/solana-labs/solana/blob/eeb622c4e/udp-client/src/lib.rs#L52-L55

The commit removes all instances of Any, Box<dyn ...>, and Arc<dyn ...>,
and instead uses generic and associated types.

There are only two protocols QUIC and UDP; and the code which has to
work with both protocols can use a trivial thin enum wrapper.

With respect to connection-cache specifically:
* connection-cache/ConnectionCache is a single protocol cache which
  allows to use either QUIC or UDP without any build dependency on the
  other protocol.
* client/ConnectionCache is an enum wrapper around both protocols and
  can be used in the code which has to work with both QUIC and UDP.

Co-authored-by: Tyera Eulberg <tyera@solana.com>
This commit is contained in:
behzad nouri 2023-02-09 00:50:44 +00:00 committed by GitHub
parent 0481ce3069
commit 1ad69cfc38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 493 additions and 304 deletions

1
Cargo.lock generated
View File

@ -4904,6 +4904,7 @@ dependencies = [
"solana-clap-utils",
"solana-cli-config",
"solana-client",
"solana-connection-cache",
"solana-core",
"solana-faucet",
"solana-genesis",

View File

@ -19,6 +19,7 @@ serde_yaml = "0.9.13"
solana-clap-utils = { path = "../clap-utils", version = "=1.16.0" }
solana-cli-config = { path = "../cli-config", version = "=1.16.0" }
solana-client = { path = "../client", version = "=1.16.0" }
solana-connection-cache = { path = "../connection-cache", version = "=1.16.0" }
solana-core = { path = "../core", version = "=1.16.0" }
solana-faucet = { path = "../faucet", version = "=1.16.0" }
solana-genesis = { path = "../genesis", version = "=1.16.0" }

View File

@ -1,13 +1,21 @@
use {
crate::bench_tps_client::{BenchTpsClient, BenchTpsError, Result},
solana_client::tpu_client::TpuClient,
solana_connection_cache::connection_cache::{
ConnectionManager, ConnectionPool, NewConnectionConfig,
},
solana_sdk::{
account::Account, commitment_config::CommitmentConfig, epoch_info::EpochInfo, hash::Hash,
message::Message, pubkey::Pubkey, signature::Signature, transaction::Transaction,
},
};
impl BenchTpsClient for TpuClient {
impl<P, M, C> BenchTpsClient for TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
fn send_transaction(&self, transaction: Transaction) -> Result<Signature> {
let signature = transaction.signatures[0];
self.try_send_transaction(&transaction)?;

View File

@ -110,19 +110,32 @@ fn create_client(
true => ConnectionCache::new(tpu_connection_pool_size),
false => ConnectionCache::with_udp(tpu_connection_pool_size),
};
Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
websocket_url,
TpuClientConfig::default(),
Arc::new(connection_cache.into()),
)
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {err:?}");
exit(1);
}),
)
match connection_cache {
ConnectionCache::Udp(cache) => Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
websocket_url,
TpuClientConfig::default(),
Arc::new(cache),
)
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {err:?}");
exit(1);
}),
),
ConnectionCache::Quic(cache) => Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
websocket_url,
TpuClientConfig::default(),
Arc::new(cache),
)
.unwrap_or_else(|err| {
eprintln!("Could not create TpuClient {err:?}");
exit(1);
}),
),
}
}
}
}

View File

@ -9,7 +9,6 @@ use {
spl_convert::FromOtherSolana,
},
solana_client::{
connection_cache::ConnectionCache,
thin_client::ThinClient,
tpu_client::{TpuClient, TpuClientConfig},
},
@ -131,17 +130,8 @@ fn test_bench_tps_test_validator(config: Config) {
CommitmentConfig::processed(),
));
let websocket_url = test_validator.rpc_pubsub_url();
let connection_cache = ConnectionCache::default();
let client = Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
&websocket_url,
TpuClientConfig::default(),
Arc::new(connection_cache.into()),
)
.unwrap(),
);
let client =
Arc::new(TpuClient::new(rpc_client, &websocket_url, TpuClientConfig::default()).unwrap());
let lamports_per_account = 1000;

View File

@ -2163,21 +2163,32 @@ fn send_deploy_messages(
} else {
ConnectionCache::with_udp(1)
};
let tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(),
&config.websocket_url,
TpuClientConfig::default(),
Arc::new(connection_cache.into()),
)?;
let transaction_errors = tpu_client
let transaction_errors = match connection_cache {
ConnectionCache::Udp(cache) => TpuClient::new_with_connection_cache(
rpc_client.clone(),
&config.websocket_url,
TpuClientConfig::default(),
Arc::new(cache),
)?
.send_and_confirm_messages_with_spinner(
write_messages,
&[payer_signer, write_signer],
)
.map_err(|err| format!("Data writes to account failed: {err}"))?
.into_iter()
.flatten()
.collect::<Vec<_>>();
),
ConnectionCache::Quic(cache) => TpuClient::new_with_connection_cache(
rpc_client.clone(),
&config.websocket_url,
TpuClientConfig::default(),
Arc::new(cache),
)?
.send_and_confirm_messages_with_spinner(
write_messages,
&[payer_signer, write_signer],
),
}
.map_err(|err| format!("Data writes to account failed: {err}"))?
.into_iter()
.flatten()
.collect::<Vec<_>>();
if !transaction_errors.is_empty() {
for transaction_error in &transaction_errors {

View File

@ -1,16 +1,16 @@
use {
quinn::Endpoint,
solana_connection_cache::{
client_connection::ClientConnection as BlockingClientConnection,
client_connection::ClientConnection,
connection_cache::{
ConnectionCache as BackendConnectionCache, NewConnectionConfig, ProtocolType,
BaseClientConnection, ConnectionCache as BackendConnectionCache, ConnectionPool,
NewConnectionConfig,
},
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
solana_quic_client::{QuicConfig, QuicConnectionManager},
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_sdk::{pubkey::Pubkey, signature::Keypair, transport::Result as TransportResult},
solana_streamer::streamer::StakedNodes,
solana_udp_client::UdpConnectionManager,
solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
std::{
error::Error,
net::{IpAddr, Ipv4Addr, SocketAddr},
@ -18,15 +18,28 @@ use {
},
};
pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4;
pub const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true;
pub const MAX_CONNECTIONS: usize = 1024;
const DEFAULT_CONNECTION_POOL_SIZE: usize = 4;
const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true;
/// A thin wrapper over connection-cache/ConnectionCache to ease
/// construction of the ConnectionCache for code dealing both with udp and quic.
/// For the scenario only using udp or quic, use connection-cache/ConnectionCache directly.
pub struct ConnectionCache {
cache: BackendConnectionCache,
pub enum ConnectionCache {
Quic(BackendConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>),
Udp(BackendConnectionCache<UdpPool, UdpConnectionManager, UdpConfig>),
}
type QuicBaseClientConnection = <QuicPool as ConnectionPool>::BaseClientConnection;
type UdpBaseClientConnection = <UdpPool as ConnectionPool>::BaseClientConnection;
pub enum BlockingClientConnection {
Quic(Arc<<QuicBaseClientConnection as BaseClientConnection>::BlockingClientConnection>),
Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::BlockingClientConnection>),
}
pub enum NonblockingClientConnection {
Quic(Arc<<QuicBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
}
impl ConnectionCache {
@ -56,10 +69,9 @@ impl ConnectionCache {
if let Some(stake_info) = stake_info {
config.set_staked_nodes(stake_info.0, stake_info.1);
}
let connection_manager =
Box::new(QuicConnectionManager::new_with_connection_config(config));
let connection_manager = QuicConnectionManager::new_with_connection_config(config);
let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap();
Self { cache }
Self::Quic(cache)
}
#[deprecated(
@ -88,30 +100,31 @@ impl ConnectionCache {
pub fn with_udp(connection_pool_size: usize) -> Self {
// The minimum pool size is 1.
let connection_pool_size = 1.max(connection_pool_size);
let connection_manager = Box::<UdpConnectionManager>::default();
let connection_manager = UdpConnectionManager::default();
let cache = BackendConnectionCache::new(connection_manager, connection_pool_size).unwrap();
Self { cache }
Self::Udp(cache)
}
pub fn use_quic(&self) -> bool {
matches!(self.cache.get_protocol_type(), ProtocolType::QUIC)
matches!(self, Self::Quic(_))
}
pub fn get_connection(&self, addr: &SocketAddr) -> Arc<dyn BlockingClientConnection> {
self.cache.get_connection(addr)
pub fn get_connection(&self, addr: &SocketAddr) -> BlockingClientConnection {
match self {
Self::Quic(cache) => BlockingClientConnection::Quic(cache.get_connection(addr)),
Self::Udp(cache) => BlockingClientConnection::Udp(cache.get_connection(addr)),
}
}
pub fn get_nonblocking_connection(
&self,
addr: &SocketAddr,
) -> Arc<dyn NonblockingClientConnection> {
self.cache.get_nonblocking_connection(addr)
}
}
impl From<ConnectionCache> for BackendConnectionCache {
fn from(cache: ConnectionCache) -> Self {
cache.cache
pub fn get_nonblocking_connection(&self, addr: &SocketAddr) -> NonblockingClientConnection {
match self {
Self::Quic(cache) => {
NonblockingClientConnection::Quic(cache.get_nonblocking_connection(addr))
}
Self::Udp(cache) => {
NonblockingClientConnection::Udp(cache.get_nonblocking_connection(addr))
}
}
}
}
@ -131,9 +144,51 @@ impl Default for ConnectionCache {
}
}
macro_rules! dispatch {
($vis:vis fn $name:ident(&self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => {
#[inline]
$vis fn $name(&self $(, $arg:$ty)?) $(-> $out)? {
match self {
Self::Quic(this) => this.$name($($arg, )?),
Self::Udp(this) => this.$name($($arg, )?),
}
}
};
}
impl ClientConnection for BlockingClientConnection {
dispatch!(fn server_addr(&self) -> &SocketAddr);
dispatch!(fn send_data(&self, buffer: &[u8]) -> TransportResult<()>);
dispatch!(fn send_data_async(&self, buffer: Vec<u8>) -> TransportResult<()>);
dispatch!(fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()>);
dispatch!(fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()>);
}
#[async_trait::async_trait]
impl solana_connection_cache::nonblocking::client_connection::ClientConnection
for NonblockingClientConnection
{
dispatch!(fn server_addr(&self) -> &SocketAddr);
async fn send_data(&self, buffer: &[u8]) -> TransportResult<()> {
match self {
Self::Quic(cache) => Ok(cache.send_data(buffer).await?),
Self::Udp(cache) => Ok(cache.send_data(buffer).await?),
}
}
async fn send_data_batch(&self, buffers: &[Vec<u8>]) -> TransportResult<()> {
match self {
Self::Quic(cache) => Ok(cache.send_data_batch(buffers).await?),
Self::Udp(cache) => Ok(cache.send_data_batch(buffers).await?),
}
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::connection_cache::ConnectionCache,
crossbeam_channel::unbounded,
solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair},

View File

@ -1,7 +1,11 @@
pub use solana_tpu_client::nonblocking::tpu_client::{LeaderTpuService, TpuSenderError};
use {
crate::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig},
solana_connection_cache::connection_cache::ConnectionCache as BackendConnectionCache,
solana_connection_cache::connection_cache::{
ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
NewConnectionConfig,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::nonblocking::rpc_client::RpcClient,
solana_sdk::{
message::Message,
@ -15,11 +19,20 @@ use {
/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
pub struct TpuClient {
tpu_client: BackendTpuClient,
pub struct TpuClient<
P, // ConnectionPool
M, // ConnectionManager
C, // NewConnectionConfig
> {
tpu_client: BackendTpuClient<P, M, C>,
}
impl TpuClient {
impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
@ -62,23 +75,39 @@ impl TpuClient {
.try_send_wire_transaction_batch(wire_transactions)
.await
}
}
impl TpuClient<QuicPool, QuicConnectionManager, QuicConfig> {
/// Create a new client that disconnects when dropped
pub async fn new(
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
) -> Result<Self> {
let connection_cache = Arc::new(ConnectionCache::default().into());
let connection_cache = match ConnectionCache::default() {
ConnectionCache::Quic(cache) => Arc::new(cache),
ConnectionCache::Udp(_) => {
return Err(TpuSenderError::Custom(String::from(
"Invalid default connection cache",
)))
}
};
Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache).await
}
}
impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Create a new client that disconnects when dropped
pub async fn new_with_connection_cache(
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_cache: Arc<BackendConnectionCache>,
connection_cache: Arc<BackendConnectionCache<P, M, C>>,
) -> Result<Self> {
Ok(Self {
tpu_client: BackendTpuClient::new_with_connection_cache(

View File

@ -7,6 +7,7 @@ use {
crate::connection_cache::ConnectionCache,
log::*,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::client_connection::ClientConnection,
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response},
solana_sdk::{

View File

@ -1,6 +1,10 @@
use {
crate::connection_cache::ConnectionCache,
solana_connection_cache::connection_cache::ConnectionCache as BackendConnectionCache,
solana_connection_cache::connection_cache::{
ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
NewConnectionConfig,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
message::Message,
@ -19,11 +23,20 @@ pub use {
/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
/// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency.
pub struct TpuClient {
tpu_client: BackendTpuClient,
pub struct TpuClient<
P, // ConnectionPool
M, // ConnectionManager
C, // NewConnectionConfig
> {
tpu_client: BackendTpuClient<P, M, C>,
}
impl TpuClient {
impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
pub fn send_transaction(&self, transaction: &Transaction) -> bool {
@ -54,28 +67,39 @@ impl TpuClient {
pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
self.tpu_client.try_send_wire_transaction(wire_transaction)
}
}
impl TpuClient<QuicPool, QuicConnectionManager, QuicConfig> {
/// Create a new client that disconnects when dropped
pub fn new(
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
) -> Result<Self> {
let connection_cache = ConnectionCache::default();
Self::new_with_connection_cache(
rpc_client,
websocket_url,
config,
Arc::new(connection_cache.into()),
)
let connection_cache = match ConnectionCache::default() {
ConnectionCache::Quic(cache) => Arc::new(cache),
ConnectionCache::Udp(_) => {
return Err(TpuSenderError::Custom(String::from(
"Invalid default connection cache",
)))
}
};
Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache)
}
}
impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Create a new client that disconnects when dropped
pub fn new_with_connection_cache(
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_cache: Arc<BackendConnectionCache>,
connection_cache: Arc<BackendConnectionCache<P, M, C>>,
) -> Result<Self> {
Ok(Self {
tpu_client: BackendTpuClient::new_with_connection_cache(

View File

@ -9,7 +9,6 @@ use {
solana_measure::measure::Measure,
solana_sdk::timing::AtomicInterval,
std::{
any::Any,
net::SocketAddr,
sync::{atomic::Ordering, Arc, RwLock},
},
@ -17,38 +16,40 @@ use {
};
// Should be non-zero
pub static MAX_CONNECTIONS: usize = 1024;
const MAX_CONNECTIONS: usize = 1024;
/// Default connection pool size per remote address
pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4;
/// Defines the protocol types of an implementation supports.
pub enum ProtocolType {
UDP,
QUIC,
}
pub trait ConnectionManager {
type ConnectionPool: ConnectionPool;
type NewConnectionConfig: NewConnectionConfig;
pub trait ConnectionManager: Sync + Send {
fn new_connection_pool(&self) -> Box<dyn ConnectionPool>;
fn new_connection_config(&self) -> Box<dyn NewConnectionConfig>;
fn new_connection_pool(&self) -> Self::ConnectionPool;
fn new_connection_config(&self) -> Self::NewConnectionConfig;
fn get_port_offset(&self) -> u16;
fn get_protocol_type(&self) -> ProtocolType;
}
pub struct ConnectionCache {
pub map: RwLock<IndexMap<SocketAddr, Box<dyn ConnectionPool>>>,
pub connection_manager: Box<dyn ConnectionManager>,
pub stats: Arc<ConnectionCacheStats>,
pub last_stats: AtomicInterval,
pub connection_pool_size: usize,
pub connection_config: Box<dyn NewConnectionConfig>,
pub struct ConnectionCache<
R, // ConnectionPool
S, // ConnectionManager
T, // NewConnectionConfig
> {
map: RwLock<IndexMap<SocketAddr, /*ConnectionPool:*/ R>>,
connection_manager: S,
stats: Arc<ConnectionCacheStats>,
last_stats: AtomicInterval,
connection_pool_size: usize,
connection_config: T,
}
impl ConnectionCache {
pub fn new(
connection_manager: Box<dyn ConnectionManager>,
connection_pool_size: usize,
) -> Result<Self, ClientError> {
impl<P, M, C> ConnectionCache<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
pub fn new(connection_manager: M, connection_pool_size: usize) -> Result<Self, ClientError> {
let config = connection_manager.new_connection_config();
Ok(Self::new_with_config(
connection_pool_size,
@ -59,8 +60,8 @@ impl ConnectionCache {
pub fn new_with_config(
connection_pool_size: usize,
connection_config: Box<dyn NewConnectionConfig>,
connection_manager: Box<dyn ConnectionManager>,
connection_config: C,
connection_manager: M,
) -> Self {
Self {
map: RwLock::new(IndexMap::with_capacity(MAX_CONNECTIONS)),
@ -79,7 +80,7 @@ impl ConnectionCache {
&self,
lock_timing_ms: &mut u64,
addr: &SocketAddr,
) -> CreateConnectionResult {
) -> CreateConnectionResult<<P as ConnectionPool>::BaseClientConnection> {
let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
let mut map = self.map.write().unwrap();
get_connection_map_lock_measure.stop();
@ -113,11 +114,11 @@ impl ConnectionCache {
map.entry(*addr)
.and_modify(|pool| {
pool.add_connection(&*self.connection_config, addr);
pool.add_connection(&self.connection_config, addr);
})
.or_insert_with(|| {
let mut pool = self.connection_manager.new_connection_pool();
pool.add_connection(&*self.connection_config, addr);
pool.add_connection(&self.connection_config, addr);
pool
});
(
@ -141,7 +142,10 @@ impl ConnectionCache {
}
}
fn get_or_add_connection(&self, addr: &SocketAddr) -> GetConnectionResult {
fn get_or_add_connection(
&self,
addr: &SocketAddr,
) -> GetConnectionResult<<P as ConnectionPool>::BaseClientConnection> {
let mut get_connection_map_lock_measure = Measure::start("get_connection_map_lock_measure");
let map = self.map.read().unwrap();
get_connection_map_lock_measure.stop();
@ -207,7 +211,10 @@ impl ConnectionCache {
fn get_connection_and_log_stats(
&self,
addr: &SocketAddr,
) -> (Arc<dyn BaseClientConnection>, Arc<ConnectionCacheStats>) {
) -> (
Arc<<P as ConnectionPool>::BaseClientConnection>,
Arc<ConnectionCacheStats>,
) {
let mut get_connection_measure = Measure::start("get_connection_measure");
let GetConnectionResult {
connection,
@ -257,7 +264,7 @@ impl ConnectionCache {
(connection, connection_cache_stats)
}
pub fn get_connection(&self, addr: &SocketAddr) -> Arc<dyn BlockingClientConnection> {
pub fn get_connection(&self, addr: &SocketAddr) -> Arc<<<P as ConnectionPool>::BaseClientConnection as BaseClientConnection>::BlockingClientConnection>{
let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
connection.new_blocking_connection(*addr, connection_cache_stats)
}
@ -265,14 +272,10 @@ impl ConnectionCache {
pub fn get_nonblocking_connection(
&self,
addr: &SocketAddr,
) -> Arc<dyn NonblockingClientConnection> {
) -> Arc<<<P as ConnectionPool>::BaseClientConnection as BaseClientConnection>::NonblockingClientConnection>{
let (connection, connection_cache_stats) = self.get_connection_and_log_stats(addr);
connection.new_nonblocking_connection(*addr, connection_cache_stats)
}
pub fn get_protocol_type(&self) -> ProtocolType {
self.connection_manager.get_protocol_type()
}
}
#[derive(Error, Debug)]
@ -290,28 +293,26 @@ pub enum ClientError {
IoError(#[from] std::io::Error),
}
pub trait NewConnectionConfig: Sync + Send {
fn new() -> Result<Self, ClientError>
where
Self: Sized;
fn as_any(&self) -> &dyn Any;
fn as_mut_any(&mut self) -> &mut dyn Any;
pub trait NewConnectionConfig: Sized {
fn new() -> Result<Self, ClientError>;
}
pub trait ConnectionPool: Sync + Send {
pub trait ConnectionPool {
type NewConnectionConfig: NewConnectionConfig;
type BaseClientConnection: BaseClientConnection;
/// Add a connection to the pool
fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr);
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr);
/// Get the number of current connections in the pool
fn num_connections(&self) -> usize;
/// Get a connection based on its index in the pool, without checking if the
fn get(&self, index: usize) -> Result<Arc<dyn BaseClientConnection>, ConnectionPoolError>;
fn get(&self, index: usize) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError>;
/// Get a connection from the pool. It must have at least one connection in the pool.
/// This randomly picks a connection in the pool.
fn borrow_connection(&self) -> Arc<dyn BaseClientConnection> {
fn borrow_connection(&self) -> Arc<Self::BaseClientConnection> {
let mut rng = thread_rng();
let n = rng.gen_range(0, self.num_connections());
self.get(n).expect("index is within num_connections")
@ -324,27 +325,30 @@ pub trait ConnectionPool: Sync + Send {
fn create_pool_entry(
&self,
config: &dyn NewConnectionConfig,
config: &Self::NewConnectionConfig,
addr: &SocketAddr,
) -> Arc<dyn BaseClientConnection>;
) -> Arc<Self::BaseClientConnection>;
}
pub trait BaseClientConnection: Sync + Send {
pub trait BaseClientConnection {
type BlockingClientConnection: BlockingClientConnection;
type NonblockingClientConnection: NonblockingClientConnection;
fn new_blocking_connection(
&self,
addr: SocketAddr,
stats: Arc<ConnectionCacheStats>,
) -> Arc<dyn BlockingClientConnection>;
) -> Arc<Self::BlockingClientConnection>;
fn new_nonblocking_connection(
&self,
addr: SocketAddr,
stats: Arc<ConnectionCacheStats>,
) -> Arc<dyn NonblockingClientConnection>;
) -> Arc<Self::NonblockingClientConnection>;
}
struct GetConnectionResult {
connection: Arc<dyn BaseClientConnection>,
struct GetConnectionResult<T> {
connection: Arc</*BaseClientConnection:*/ T>,
cache_hit: bool,
report_stats: bool,
map_timing_ms: u64,
@ -354,8 +358,8 @@ struct GetConnectionResult {
eviction_timing_ms: u64,
}
struct CreateConnectionResult {
connection: Arc<dyn BaseClientConnection>,
struct CreateConnectionResult<T> {
connection: Arc</*BaseClientConnection:*/ T>,
cache_hit: bool,
connection_cache_stats: Arc<ConnectionCacheStats>,
num_evictions: u64,
@ -382,11 +386,14 @@ mod tests {
const MOCK_PORT_OFFSET: u16 = 42;
pub struct MockUdpPool {
connections: Vec<Arc<dyn BaseClientConnection>>,
struct MockUdpPool {
connections: Vec<Arc<MockUdp>>,
}
impl ConnectionPool for MockUdpPool {
fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) {
type NewConnectionConfig = MockUdpConfig;
type BaseClientConnection = MockUdp;
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) {
let connection = self.create_pool_entry(config, addr);
self.connections.push(connection);
}
@ -395,7 +402,10 @@ mod tests {
self.connections.len()
}
fn get(&self, index: usize) -> Result<Arc<dyn BaseClientConnection>, ConnectionPoolError> {
fn get(
&self,
index: usize,
) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError> {
self.connections
.get(index)
.cloned()
@ -404,19 +414,14 @@ mod tests {
fn create_pool_entry(
&self,
config: &dyn NewConnectionConfig,
config: &Self::NewConnectionConfig,
_addr: &SocketAddr,
) -> Arc<dyn BaseClientConnection> {
let config: &MockUdpConfig = match config.as_any().downcast_ref::<MockUdpConfig>() {
Some(b) => b,
None => panic!("Expecting a MockUdpConfig!"),
};
) -> Arc<Self::BaseClientConnection> {
Arc::new(MockUdp(config.udp_socket.clone()))
}
}
pub struct MockUdpConfig {
struct MockUdpConfig {
udp_socket: Arc<UdpSocket>,
}
@ -440,23 +445,18 @@ mod tests {
),
})
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_mut_any(&mut self) -> &mut dyn Any {
self
}
}
pub struct MockUdp(Arc<UdpSocket>);
struct MockUdp(Arc<UdpSocket>);
impl BaseClientConnection for MockUdp {
type BlockingClientConnection = MockUdpConnection;
type NonblockingClientConnection = MockUdpConnection;
fn new_blocking_connection(
&self,
addr: SocketAddr,
_stats: Arc<ConnectionCacheStats>,
) -> Arc<dyn BlockingClientConnection> {
) -> Arc<Self::BlockingClientConnection> {
Arc::new(MockUdpConnection {
_socket: self.0.clone(),
addr,
@ -467,7 +467,7 @@ mod tests {
&self,
addr: SocketAddr,
_stats: Arc<ConnectionCacheStats>,
) -> Arc<dyn NonblockingClientConnection> {
) -> Arc<Self::NonblockingClientConnection> {
Arc::new(MockUdpConnection {
_socket: self.0.clone(),
addr,
@ -475,32 +475,31 @@ mod tests {
}
}
pub struct MockUdpConnection {
struct MockUdpConnection {
_socket: Arc<UdpSocket>,
addr: SocketAddr,
}
#[derive(Default)]
pub struct MockConnectionManager {}
struct MockConnectionManager {}
impl ConnectionManager for MockConnectionManager {
fn new_connection_pool(&self) -> Box<dyn ConnectionPool> {
Box::new(MockUdpPool {
type ConnectionPool = MockUdpPool;
type NewConnectionConfig = MockUdpConfig;
fn new_connection_pool(&self) -> Self::ConnectionPool {
MockUdpPool {
connections: Vec::default(),
})
}
}
fn new_connection_config(&self) -> Box<dyn NewConnectionConfig> {
Box::new(MockUdpConfig::new().unwrap())
fn new_connection_config(&self) -> Self::NewConnectionConfig {
MockUdpConfig::new().unwrap()
}
fn get_port_offset(&self) -> u16 {
MOCK_PORT_OFFSET
}
fn get_protocol_type(&self) -> ProtocolType {
ProtocolType::UDP
}
}
impl BlockingClientConnection for MockUdpConnection {
@ -560,7 +559,7 @@ mod tests {
// we can actually connect to those addresses - ClientConnection implementations should either
// be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator)
let connection_manager = Box::<MockConnectionManager>::default();
let connection_manager = MockConnectionManager::default();
let connection_cache =
ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap();
let port_offset = MOCK_PORT_OFFSET;
@ -583,7 +582,14 @@ mod tests {
let conn = &map.get(addr).expect("Address not found").get(0).unwrap();
let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
assert!(addr.ip() == conn.server_addr().ip());
assert_eq!(
BlockingClientConnection::server_addr(&*conn).ip(),
addr.ip(),
);
assert_eq!(
NonblockingClientConnection::server_addr(&*conn).ip(),
addr.ip(),
);
});
}
@ -608,14 +614,18 @@ mod tests {
let port = u16::MAX - MOCK_PORT_OFFSET + 1;
assert!(port.checked_add(MOCK_PORT_OFFSET).is_none());
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let connection_manager = Box::<MockConnectionManager>::default();
let connection_manager = MockConnectionManager::default();
let connection_cache = ConnectionCache::new(connection_manager, 1).unwrap();
let conn = connection_cache.get_connection(&addr);
// We (intentionally) don't have an interface that allows us to distinguish between
// UDP and Quic connections, so check instead that the port is valid (non-zero)
// and is the same as the input port (falling back on UDP)
assert!(conn.server_addr().port() != 0);
assert!(conn.server_addr().port() == port);
assert_ne!(port, 0u16);
assert_eq!(BlockingClientConnection::server_addr(&*conn).port(), port);
assert_eq!(
NonblockingClientConnection::server_addr(&*conn).port(),
port
);
}
}

View File

@ -7,7 +7,7 @@ use {
tracer_packet_stats::TracerPacketStats,
unprocessed_transaction_storage::UnprocessedTransactionStorage,
},
solana_client::connection_cache::ConnectionCache,
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure,
solana_perf::{data_budget::DataBudget, packet::Packet},

View File

@ -3,7 +3,7 @@
use {
rand::{thread_rng, Rng},
solana_client::connection_cache::ConnectionCache,
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_gossip::cluster_info::ClusterInfo,
solana_poh::poh_recorder::PohRecorder,
std::{

View File

@ -45,7 +45,7 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient},
solana_client::connection_cache::ConnectionCache,
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_core::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair},
solana_dos::cli::*,
solana_gossip::{

View File

@ -16,13 +16,11 @@ use {
},
quinn::Endpoint,
solana_connection_cache::{
client_connection::ClientConnection as BlockingClientConnection,
connection_cache::{
BaseClientConnection, ClientError, ConnectionManager, ConnectionPool,
ConnectionPoolError, NewConnectionConfig, ProtocolType,
ConnectionPoolError, NewConnectionConfig,
},
connection_cache_stats::ConnectionCacheStats,
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair},
solana_streamer::{
@ -31,7 +29,6 @@ use {
tls_certificates::new_self_signed_tls_certificate,
},
std::{
any::Any,
error::Error,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{Arc, RwLock},
@ -46,11 +43,14 @@ pub enum QuicClientError {
}
pub struct QuicPool {
connections: Vec<Arc<dyn BaseClientConnection>>,
connections: Vec<Arc<Quic>>,
endpoint: Arc<QuicLazyInitializedEndpoint>,
}
impl ConnectionPool for QuicPool {
fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) {
type BaseClientConnection = Quic;
type NewConnectionConfig = QuicConfig;
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) {
let connection = self.create_pool_entry(config, addr);
self.connections.push(connection);
}
@ -59,7 +59,7 @@ impl ConnectionPool for QuicPool {
self.connections.len()
}
fn get(&self, index: usize) -> Result<Arc<dyn BaseClientConnection>, ConnectionPoolError> {
fn get(&self, index: usize) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError> {
self.connections
.get(index)
.cloned()
@ -68,10 +68,9 @@ impl ConnectionPool for QuicPool {
fn create_pool_entry(
&self,
config: &dyn NewConnectionConfig,
config: &Self::NewConnectionConfig,
addr: &SocketAddr,
) -> Arc<dyn BaseClientConnection> {
let config = QuicConfig::downcast_ref(config);
) -> Arc<Self::BaseClientConnection> {
Arc::new(Quic(Arc::new(QuicClient::new(
self.endpoint.clone(),
*addr,
@ -105,14 +104,6 @@ impl NewConnectionConfig for QuicConfig {
client_endpoint: None,
})
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_mut_any(&mut self) -> &mut dyn Any {
self
}
}
impl QuicConfig {
@ -166,23 +157,18 @@ impl QuicConfig {
pub fn update_client_endpoint(&mut self, client_endpoint: Endpoint) {
self.client_endpoint = Some(client_endpoint);
}
/// Convenient function to downcast a generic NewConnectionConfig reference to QuicConfig
pub fn downcast_ref(config: &dyn NewConnectionConfig) -> &Self {
match config.as_any().downcast_ref::<QuicConfig>() {
Some(config) => config,
None => panic!("Expecting a QuicConfig!"),
}
}
}
pub struct Quic(Arc<QuicClient>);
impl BaseClientConnection for Quic {
type BlockingClientConnection = BlockingQuicClientConnection;
type NonblockingClientConnection = NonblockingQuicClientConnection;
fn new_blocking_connection(
&self,
_addr: SocketAddr,
stats: Arc<ConnectionCacheStats>,
) -> Arc<dyn BlockingClientConnection> {
) -> Arc<Self::BlockingClientConnection> {
Arc::new(BlockingQuicClientConnection::new_with_client(
self.0.clone(),
stats,
@ -193,7 +179,7 @@ impl BaseClientConnection for Quic {
&self,
_addr: SocketAddr,
stats: Arc<ConnectionCacheStats>,
) -> Arc<dyn NonblockingClientConnection> {
) -> Arc<Self::NonblockingClientConnection> {
Arc::new(NonblockingQuicClientConnection::new_with_client(
self.0.clone(),
stats,
@ -203,40 +189,39 @@ impl BaseClientConnection for Quic {
#[derive(Default)]
pub struct QuicConnectionManager {
connection_config: Option<Box<dyn NewConnectionConfig>>,
connection_config: Option<QuicConfig>,
}
impl ConnectionManager for QuicConnectionManager {
fn new_connection_pool(&self) -> Box<dyn ConnectionPool> {
Box::new(QuicPool {
type ConnectionPool = QuicPool;
type NewConnectionConfig = QuicConfig;
fn new_connection_pool(&self) -> Self::ConnectionPool {
QuicPool {
connections: Vec::default(),
endpoint: Arc::new(self.connection_config.as_ref().map_or(
QuicLazyInitializedEndpoint::default(),
|config| {
let config = QuicConfig::downcast_ref(config.as_ref());
config.create_endpoint()
},
)),
})
endpoint: Arc::new(
self.connection_config
.as_ref()
.map_or(QuicLazyInitializedEndpoint::default(), |config| {
config.create_endpoint()
}),
),
}
}
fn new_connection_config(&self) -> Box<dyn NewConnectionConfig> {
Box::new(QuicConfig::new().unwrap())
fn new_connection_config(&self) -> QuicConfig {
QuicConfig::new().unwrap()
}
fn get_port_offset(&self) -> u16 {
QUIC_PORT_OFFSET
}
fn get_protocol_type(&self) -> ProtocolType {
ProtocolType::QUIC
}
}
impl QuicConnectionManager {
pub fn new_with_connection_config(config: QuicConfig) -> Self {
Self {
connection_config: Some(Box::new(config)),
connection_config: Some(config),
}
}
}

View File

@ -464,19 +464,28 @@ fn run_tpu_send_transaction(tpu_use_quic: bool) {
true => ConnectionCache::new(DEFAULT_TPU_CONNECTION_POOL_SIZE),
false => ConnectionCache::with_udp(DEFAULT_TPU_CONNECTION_POOL_SIZE),
};
let tpu_client = TpuClient::new_with_connection_cache(
rpc_client.clone(),
&test_validator.rpc_pubsub_url(),
TpuClientConfig::default(),
Arc::new(connection_cache.into()),
)
.unwrap();
let recent_blockhash = rpc_client.get_latest_blockhash().unwrap();
let tx =
system_transaction::transfer(&mint_keypair, &Pubkey::new_unique(), 42, recent_blockhash);
assert!(tpu_client.send_transaction(&tx));
let success = match connection_cache {
ConnectionCache::Quic(cache) => TpuClient::new_with_connection_cache(
rpc_client.clone(),
&test_validator.rpc_pubsub_url(),
TpuClientConfig::default(),
Arc::new(cache),
)
.unwrap()
.send_transaction(&tx),
ConnectionCache::Udp(cache) => TpuClient::new_with_connection_cache(
rpc_client.clone(),
&test_validator.rpc_pubsub_url(),
TpuClientConfig::default(),
Arc::new(cache),
)
.unwrap()
.send_transaction(&tx),
};
assert!(success);
let timeout = Duration::from_secs(5);
let now = Instant::now();
let signatures = vec![tx.signatures[0]];

View File

@ -2,7 +2,7 @@ use {
crate::tpu_info::TpuInfo,
crossbeam_channel::{Receiver, RecvTimeoutError},
log::*,
solana_client::connection_cache::ConnectionCache,
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_measure::measure::Measure,
solana_metrics::datapoint_warn,
solana_runtime::{bank::Bank, bank_forks::BankForks},

View File

@ -6,7 +6,12 @@
use {
log::*,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::connection_cache::ConnectionCache,
solana_connection_cache::{
client_connection::ClientConnection,
connection_cache::{
ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
},
},
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::{config::RpcProgramAccountsConfig, response::Response},
solana_sdk::{
@ -111,21 +116,30 @@ pub mod temporary_pub {
use temporary_pub::*;
/// An object for querying and sending transactions to the network.
pub struct ThinClient {
pub struct ThinClient<
P, // ConnectionPool
M, // ConnectionManager
C, // NewConnectionConfig
> {
rpc_clients: Vec<RpcClient>,
tpu_addrs: Vec<SocketAddr>,
optimizer: ClientOptimizer,
connection_cache: Arc<ConnectionCache>,
connection_cache: Arc<ConnectionCache<P, M, C>>,
}
impl ThinClient {
impl<P, M, C> ThinClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
/// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
/// (currently hardcoded to UDP)
pub fn new(
rpc_addr: SocketAddr,
tpu_addr: SocketAddr,
connection_cache: Arc<ConnectionCache>,
connection_cache: Arc<ConnectionCache<P, M, C>>,
) -> Self {
Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_addr, connection_cache)
}
@ -134,7 +148,7 @@ impl ThinClient {
rpc_addr: SocketAddr,
tpu_addr: SocketAddr,
timeout: Duration,
connection_cache: Arc<ConnectionCache>,
connection_cache: Arc<ConnectionCache<P, M, C>>,
) -> Self {
let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
Self::new_from_client(rpc_client, tpu_addr, connection_cache)
@ -143,7 +157,7 @@ impl ThinClient {
fn new_from_client(
rpc_client: RpcClient,
tpu_addr: SocketAddr,
connection_cache: Arc<ConnectionCache>,
connection_cache: Arc<ConnectionCache<P, M, C>>,
) -> Self {
Self {
rpc_clients: vec![rpc_client],
@ -156,7 +170,7 @@ impl ThinClient {
pub fn new_from_addrs(
rpc_addrs: Vec<SocketAddr>,
tpu_addrs: Vec<SocketAddr>,
connection_cache: Arc<ConnectionCache>,
connection_cache: Arc<ConnectionCache<P, M, C>>,
) -> Self {
assert!(!rpc_addrs.is_empty());
assert_eq!(rpc_addrs.len(), tpu_addrs.len());
@ -314,13 +328,23 @@ impl ThinClient {
}
}
impl Client for ThinClient {
impl<P, M, C> Client for ThinClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
fn tpu_addr(&self) -> String {
self.tpu_addr().to_string()
}
}
impl SyncClient for ThinClient {
impl<P, M, C> SyncClient for ThinClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
fn send_and_confirm_message<T: Signers>(
&self,
keypairs: &T,
@ -600,7 +624,12 @@ impl SyncClient for ThinClient {
}
}
impl AsyncClient for ThinClient {
impl<P, M, C> AsyncClient for ThinClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
fn async_send_versioned_transaction(
&self,
transaction: VersionedTransaction,

View File

@ -11,8 +11,12 @@ use {
bincode::serialize,
futures_util::{future::join_all, stream::StreamExt},
log::*,
solana_connection_cache::connection_cache::{
ConnectionCache, ConnectionManager, DEFAULT_CONNECTION_POOL_SIZE,
solana_connection_cache::{
connection_cache::{
ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
DEFAULT_CONNECTION_POOL_SIZE,
},
nonblocking::client_connection::ClientConnection,
},
solana_pubsub_client::nonblocking::pubsub_client::{PubsubClient, PubsubClientError},
solana_rpc_client::nonblocking::rpc_client::RpcClient,
@ -250,33 +254,52 @@ impl LeaderTpuCache {
/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
pub struct TpuClient {
pub struct TpuClient<
P, // ConnectionPool
M, // ConnectionManager
C, // NewConnectionConfig
> {
fanout_slots: u64,
leader_tpu_service: LeaderTpuService,
exit: Arc<AtomicBool>,
rpc_client: Arc<RpcClient>,
connection_cache: Arc<ConnectionCache>,
connection_cache: Arc<ConnectionCache<P, M, C>>,
}
async fn send_wire_transaction_to_addr(
connection_cache: &ConnectionCache,
async fn send_wire_transaction_to_addr<P, M, C>(
connection_cache: &ConnectionCache<P, M, C>,
addr: &SocketAddr,
wire_transaction: Vec<u8>,
) -> TransportResult<()> {
) -> TransportResult<()>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_data(&wire_transaction).await
}
async fn send_wire_transaction_batch_to_addr(
connection_cache: &ConnectionCache,
async fn send_wire_transaction_batch_to_addr<P, M, C>(
connection_cache: &ConnectionCache<P, M, C>,
addr: &SocketAddr,
wire_transactions: &[Vec<u8>],
) -> TransportResult<()> {
) -> TransportResult<()>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_data_batch(wire_transactions).await
}
impl TpuClient {
impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
@ -391,7 +414,7 @@ impl TpuClient {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_manager: Box<dyn ConnectionManager>,
connection_manager: M,
) -> Result<Self> {
let connection_cache = Arc::new(
ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(),
@ -404,7 +427,7 @@ impl TpuClient {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_cache: Arc<ConnectionCache>,
connection_cache: Arc<ConnectionCache<P, M, C>>,
) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false));
let leader_tpu_service =
@ -553,7 +576,7 @@ impl TpuClient {
}
}
impl Drop for TpuClient {
impl<P, M, C> Drop for TpuClient<P, M, C> {
fn drop(&mut self) {
self.exit.store(true, Ordering::Relaxed);
}

View File

@ -2,7 +2,9 @@ pub use crate::nonblocking::tpu_client::TpuSenderError;
use {
crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::connection_cache::{ConnectionCache, ConnectionManager},
solana_connection_cache::connection_cache::{
ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
},
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult},
std::{
@ -59,14 +61,23 @@ impl Default for TpuClientConfig {
/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
pub struct TpuClient {
pub struct TpuClient<
P, // ConnectionPool
M, // ConnectionManager
C, // NewConnectionConfig
> {
_deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket
//todo: get rid of this field
rpc_client: Arc<RpcClient>,
tpu_client: Arc<NonblockingTpuClient>,
tpu_client: Arc<NonblockingTpuClient<P, M, C>>,
}
impl TpuClient {
impl<P, M, C> TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
pub fn send_transaction(&self, transaction: &Transaction) -> bool {
@ -110,7 +121,7 @@ impl TpuClient {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_manager: Box<dyn ConnectionManager>,
connection_manager: M,
) -> Result<Self> {
let create_tpu_client = NonblockingTpuClient::new(
rpc_client.get_inner_client().clone(),
@ -133,7 +144,7 @@ impl TpuClient {
rpc_client: Arc<RpcClient>,
websocket_url: &str,
config: TpuClientConfig,
connection_cache: Arc<ConnectionCache>,
connection_cache: Arc<ConnectionCache<P, M, C>>,
) -> Result<Self> {
let create_tpu_client = NonblockingTpuClient::new_with_connection_cache(
rpc_client.get_inner_client().clone(),

View File

@ -9,26 +9,26 @@ use {
udp_client::UdpClientConnection as BlockingUdpConnection,
},
solana_connection_cache::{
client_connection::ClientConnection as BlockingClientConnection,
connection_cache::{
BaseClientConnection, ClientError, ConnectionManager, ConnectionPool,
ConnectionPoolError, NewConnectionConfig, ProtocolType,
ConnectionPoolError, NewConnectionConfig,
},
connection_cache_stats::ConnectionCacheStats,
nonblocking::client_connection::ClientConnection as NonblockingClientConnection,
},
std::{
any::Any,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::Arc,
},
};
pub struct UdpPool {
connections: Vec<Arc<dyn BaseClientConnection>>,
connections: Vec<Arc<Udp>>,
}
impl ConnectionPool for UdpPool {
fn add_connection(&mut self, config: &dyn NewConnectionConfig, addr: &SocketAddr) {
type BaseClientConnection = Udp;
type NewConnectionConfig = UdpConfig;
fn add_connection(&mut self, config: &Self::NewConnectionConfig, addr: &SocketAddr) {
let connection = self.create_pool_entry(config, addr);
self.connections.push(connection);
}
@ -37,7 +37,7 @@ impl ConnectionPool for UdpPool {
self.connections.len()
}
fn get(&self, index: usize) -> Result<Arc<dyn BaseClientConnection>, ConnectionPoolError> {
fn get(&self, index: usize) -> Result<Arc<Self::BaseClientConnection>, ConnectionPoolError> {
self.connections
.get(index)
.cloned()
@ -46,13 +46,9 @@ impl ConnectionPool for UdpPool {
fn create_pool_entry(
&self,
config: &dyn NewConnectionConfig,
config: &Self::NewConnectionConfig,
_addr: &SocketAddr,
) -> Arc<dyn BaseClientConnection> {
let config: &UdpConfig = match config.as_any().downcast_ref::<UdpConfig>() {
Some(b) => b,
None => panic!("Expecting a UdpConfig!"),
};
) -> Arc<Self::BaseClientConnection> {
Arc::new(Udp(config.udp_socket.clone()))
}
}
@ -69,23 +65,18 @@ impl NewConnectionConfig for UdpConfig {
udp_socket: Arc::new(socket),
})
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_mut_any(&mut self) -> &mut dyn Any {
self
}
}
pub struct Udp(Arc<UdpSocket>);
impl BaseClientConnection for Udp {
type BlockingClientConnection = BlockingUdpConnection;
type NonblockingClientConnection = NonblockingUdpConnection;
fn new_blocking_connection(
&self,
addr: SocketAddr,
_stats: Arc<ConnectionCacheStats>,
) -> Arc<dyn BlockingClientConnection> {
) -> Arc<Self::BlockingClientConnection> {
Arc::new(BlockingUdpConnection::new_from_addr(self.0.clone(), addr))
}
@ -93,7 +84,7 @@ impl BaseClientConnection for Udp {
&self,
addr: SocketAddr,
_stats: Arc<ConnectionCacheStats>,
) -> Arc<dyn NonblockingClientConnection> {
) -> Arc<Self::NonblockingClientConnection> {
Arc::new(NonblockingUdpConnection::new_from_addr(
self.0.try_clone().unwrap(),
addr,
@ -105,21 +96,19 @@ impl BaseClientConnection for Udp {
pub struct UdpConnectionManager {}
impl ConnectionManager for UdpConnectionManager {
fn new_connection_pool(&self) -> Box<dyn ConnectionPool> {
Box::new(UdpPool {
type ConnectionPool = UdpPool;
type NewConnectionConfig = UdpConfig;
fn new_connection_pool(&self) -> Self::ConnectionPool {
UdpPool {
connections: Vec::default(),
})
}
}
fn new_connection_config(&self) -> Box<dyn NewConnectionConfig> {
Box::new(UdpConfig::new().unwrap())
fn new_connection_config(&self) -> Self::NewConnectionConfig {
UdpConfig::new().unwrap()
}
fn get_port_offset(&self) -> u16 {
0
}
fn get_protocol_type(&self) -> ProtocolType {
ProtocolType::UDP
}
}