removes hard-coded QUIC_PORT_OFFSET from connection-cache (#31541)

New ContactInfo has api identifying QUIC vs UDP ports; no need to hard-code
port-offset deep in connection-cache.
This commit is contained in:
behzad nouri 2023-05-09 13:46:17 +00:00 committed by GitHub
parent 6e342ded42
commit 6a4a0418a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 195 additions and 132 deletions

View File

@ -9,6 +9,7 @@ use {
spl_convert::FromOtherSolana, spl_convert::FromOtherSolana,
}, },
solana_client::{ solana_client::{
connection_cache::ConnectionCache,
thin_client::ThinClient, thin_client::ThinClient,
tpu_client::{TpuClient, TpuClientConfig}, tpu_client::{TpuClient, TpuClientConfig},
}, },
@ -81,7 +82,10 @@ fn test_bench_tps_local_cluster(config: Config) {
let client = Arc::new(ThinClient::new( let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(), cluster.entry_point_info.rpc().unwrap(),
cluster.entry_point_info.tpu().unwrap(), match *cluster.connection_cache {
ConnectionCache::Quic(_) => cluster.entry_point_info.tpu_quic().unwrap(),
ConnectionCache::Udp(_) => cluster.entry_point_info.tpu().unwrap(),
},
cluster.connection_cache.clone(), cluster.connection_cache.clone(),
)); ));

View File

@ -1,3 +1,4 @@
pub use solana_connection_cache::connection_cache::Protocol;
use { use {
quinn::Endpoint, quinn::Endpoint,
solana_connection_cache::{ solana_connection_cache::{
@ -74,6 +75,14 @@ impl ConnectionCache {
Self::Quic(Arc::new(cache)) Self::Quic(Arc::new(cache))
} }
#[inline]
pub fn protocol(&self) -> Protocol {
match self {
Self::Quic(_) => Protocol::QUIC,
Self::Udp(_) => Protocol::UDP,
}
}
#[deprecated( #[deprecated(
since = "1.15.0", since = "1.15.0",
note = "This method does not do anything. Please use `new_with_client_options` instead to set the client certificate." note = "This method does not do anything. Please use `new_with_client_options` instead to set the client certificate."
@ -204,7 +213,7 @@ mod tests {
super::*, super::*,
crate::connection_cache::ConnectionCache, crate::connection_cache::ConnectionCache,
crossbeam_channel::unbounded, crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, quic::QUIC_PORT_OFFSET, signature::Keypair}, solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{ solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::StreamStats, nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::StreamStats,
streamer::StakedNodes, streamer::StakedNodes,
@ -236,9 +245,6 @@ mod tests {
#[test] #[test]
fn test_connection_with_specified_client_endpoint() { fn test_connection_with_specified_client_endpoint() {
let port = u16::MAX - QUIC_PORT_OFFSET + 1;
assert!(port.checked_add(QUIC_PORT_OFFSET).is_none());
// Start a response receiver: // Start a response receiver:
let ( let (
response_recv_socket, response_recv_socket,
@ -274,13 +280,13 @@ mod tests {
let port1 = 9001; let port1 = 9001;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port1); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port1);
let conn = connection_cache.get_connection(&addr); let conn = connection_cache.get_connection(&addr);
assert_eq!(conn.server_addr().port(), port1 + QUIC_PORT_OFFSET); assert_eq!(conn.server_addr().port(), port1);
// server port 2: // server port 2:
let port2 = 9002; let port2 = 9002;
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port2); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port2);
let conn = connection_cache.get_connection(&addr); let conn = connection_cache.get_connection(&addr);
assert_eq!(conn.server_addr().port(), port2 + QUIC_PORT_OFFSET); assert_eq!(conn.server_addr().port(), port2);
response_recv_exit.store(true, Ordering::Relaxed); response_recv_exit.store(true, Ordering::Relaxed);
response_recv_thread.join().unwrap(); response_recv_thread.join().unwrap();

View File

@ -21,13 +21,20 @@ const MAX_CONNECTIONS: usize = 1024;
/// Default connection pool size per remote address /// Default connection pool size per remote address
pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4; pub const DEFAULT_CONNECTION_POOL_SIZE: usize = 4;
#[derive(Clone, Copy, Eq, Hash, PartialEq)]
pub enum Protocol {
UDP,
QUIC,
}
pub trait ConnectionManager { pub trait ConnectionManager {
type ConnectionPool: ConnectionPool; type ConnectionPool: ConnectionPool;
type NewConnectionConfig: NewConnectionConfig; type NewConnectionConfig: NewConnectionConfig;
const PROTOCOL: Protocol;
fn new_connection_pool(&self) -> Self::ConnectionPool; fn new_connection_pool(&self) -> Self::ConnectionPool;
fn new_connection_config(&self) -> Self::NewConnectionConfig; fn new_connection_config(&self) -> Self::NewConnectionConfig;
fn get_port_offset(&self) -> u16;
} }
pub struct ConnectionCache< pub struct ConnectionCache<
@ -150,14 +157,6 @@ where
let map = self.map.read().unwrap(); let map = self.map.read().unwrap();
get_connection_map_lock_measure.stop(); get_connection_map_lock_measure.stop();
let port_offset = self.connection_manager.get_port_offset();
let port = addr
.port()
.checked_add(port_offset)
.unwrap_or_else(|| addr.port());
let addr = SocketAddr::new(addr.ip(), port);
let mut lock_timing_ms = get_connection_map_lock_measure.as_ms(); let mut lock_timing_ms = get_connection_map_lock_measure.as_ms();
let report_stats = self let report_stats = self
@ -171,12 +170,12 @@ where
connection_cache_stats, connection_cache_stats,
num_evictions, num_evictions,
eviction_timing_ms, eviction_timing_ms,
} = match map.get(&addr) { } = match map.get(addr) {
Some(pool) => { Some(pool) => {
if pool.need_new_connection(self.connection_pool_size) { if pool.need_new_connection(self.connection_pool_size) {
// create more connection and put it in the pool // create more connection and put it in the pool
drop(map); drop(map);
self.create_connection(&mut lock_timing_ms, &addr) self.create_connection(&mut lock_timing_ms, addr)
} else { } else {
let connection = pool.borrow_connection(); let connection = pool.borrow_connection();
CreateConnectionResult { CreateConnectionResult {
@ -191,7 +190,7 @@ where
None => { None => {
// Upgrade to write access by dropping read lock and acquire write lock // Upgrade to write access by dropping read lock and acquire write lock
drop(map); drop(map);
self.create_connection(&mut lock_timing_ms, &addr) self.create_connection(&mut lock_timing_ms, addr)
} }
}; };
get_connection_map_measure.stop(); get_connection_map_measure.stop();
@ -384,8 +383,6 @@ mod tests {
}, },
}; };
const MOCK_PORT_OFFSET: u16 = 42;
struct MockUdpPool { struct MockUdpPool {
connections: Vec<Arc<MockUdp>>, connections: Vec<Arc<MockUdp>>,
} }
@ -487,6 +484,8 @@ mod tests {
type ConnectionPool = MockUdpPool; type ConnectionPool = MockUdpPool;
type NewConnectionConfig = MockUdpConfig; type NewConnectionConfig = MockUdpConfig;
const PROTOCOL: Protocol = Protocol::QUIC;
fn new_connection_pool(&self) -> Self::ConnectionPool { fn new_connection_pool(&self) -> Self::ConnectionPool {
MockUdpPool { MockUdpPool {
connections: Vec::default(), connections: Vec::default(),
@ -496,10 +495,6 @@ mod tests {
fn new_connection_config(&self) -> Self::NewConnectionConfig { fn new_connection_config(&self) -> Self::NewConnectionConfig {
MockUdpConfig::new().unwrap() MockUdpConfig::new().unwrap()
} }
fn get_port_offset(&self) -> u16 {
MOCK_PORT_OFFSET
}
} }
impl BlockingClientConnection for MockUdpConnection { impl BlockingClientConnection for MockUdpConnection {
@ -562,7 +557,6 @@ mod tests {
let connection_manager = MockConnectionManager::default(); let connection_manager = MockConnectionManager::default();
let connection_cache = let connection_cache =
ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap(); ConnectionCache::new(connection_manager, DEFAULT_CONNECTION_POOL_SIZE).unwrap();
let port_offset = MOCK_PORT_OFFSET;
let addrs = (0..MAX_CONNECTIONS) let addrs = (0..MAX_CONNECTIONS)
.map(|_| { .map(|_| {
let addr = get_addr(&mut rng); let addr = get_addr(&mut rng);
@ -573,13 +567,7 @@ mod tests {
{ {
let map = connection_cache.map.read().unwrap(); let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS); assert!(map.len() == MAX_CONNECTIONS);
addrs.iter().for_each(|a| { addrs.iter().for_each(|addr| {
let port = a
.port()
.checked_add(port_offset)
.unwrap_or_else(|| a.port());
let addr = &SocketAddr::new(a.ip(), port);
let conn = &map.get(addr).expect("Address not found").get(0).unwrap(); let conn = &map.get(addr).expect("Address not found").get(0).unwrap();
let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone()); let conn = conn.new_blocking_connection(*addr, connection_cache.stats.clone());
assert_eq!( assert_eq!(
@ -596,10 +584,7 @@ mod tests {
let addr = &get_addr(&mut rng); let addr = &get_addr(&mut rng);
connection_cache.get_connection(addr); connection_cache.get_connection(addr);
let port = addr let port = addr.port();
.port()
.checked_add(port_offset)
.unwrap_or_else(|| addr.port());
let addr_with_quic_port = SocketAddr::new(addr.ip(), port); let addr_with_quic_port = SocketAddr::new(addr.ip(), port);
let map = connection_cache.map.read().unwrap(); let map = connection_cache.map.read().unwrap();
assert!(map.len() == MAX_CONNECTIONS); assert!(map.len() == MAX_CONNECTIONS);
@ -611,8 +596,7 @@ mod tests {
// an invalid port. // an invalid port.
#[test] #[test]
fn test_overflow_address() { fn test_overflow_address() {
let port = u16::MAX - MOCK_PORT_OFFSET + 1; let port = u16::MAX;
assert!(port.checked_add(MOCK_PORT_OFFSET).is_none());
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let connection_manager = MockConnectionManager::default(); let connection_manager = MockConnectionManager::default();
let connection_cache = ConnectionCache::new(connection_manager, 1).unwrap(); let connection_cache = ConnectionCache::new(connection_manager, 1).unwrap();

View File

@ -3,12 +3,12 @@ use {
crate::{ crate::{
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
next_leader::{next_leader_tpu_forwards, next_leader_tpu_vote}, next_leader::{next_leader, next_leader_tpu_vote},
tracer_packet_stats::TracerPacketStats, tracer_packet_stats::TracerPacketStats,
unprocessed_transaction_storage::UnprocessedTransactionStorage, unprocessed_transaction_storage::UnprocessedTransactionStorage,
}, },
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_gossip::cluster_info::ClusterInfo, solana_gossip::{cluster_info::ClusterInfo, contact_info::LegacyContactInfo as ContactInfo},
solana_measure::measure_us, solana_measure::measure_us,
solana_perf::{data_budget::DataBudget, packet::Packet}, solana_perf::{data_budget::DataBudget, packet::Packet},
solana_poh::poh_recorder::PohRecorder, solana_poh::poh_recorder::PohRecorder,
@ -214,9 +214,14 @@ impl Forwarder {
fn get_leader_and_addr(&self, forward_option: &ForwardOption) -> Option<(Pubkey, SocketAddr)> { fn get_leader_and_addr(&self, forward_option: &ForwardOption) -> Option<(Pubkey, SocketAddr)> {
match forward_option { match forward_option {
ForwardOption::NotForward => None, ForwardOption::NotForward => None,
ForwardOption::ForwardTransaction => { ForwardOption::ForwardTransaction => next_leader(
next_leader_tpu_forwards(&self.cluster_info, &self.poh_recorder) &self.cluster_info,
} &self.poh_recorder,
match *self.connection_cache {
ConnectionCache::Quic(_) => ContactInfo::tpu_forwards_quic,
ConnectionCache::Udp(_) => ContactInfo::tpu_forwards,
},
),
ForwardOption::ForwardTpuVote => { ForwardOption::ForwardTpuVote => {
next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder) next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder)
@ -252,8 +257,6 @@ impl Forwarder {
batch_send(&self.socket, &pkts).map_err(|err| err.into()) batch_send(&self.socket, &pkts).map_err(|err| err.into())
} }
ForwardOption::ForwardTransaction => { ForwardOption::ForwardTransaction => {
// All other transactions can be forwarded using QUIC, get_connection() will use
// system wide setting to pick the correct connection object.
let conn = self.connection_cache.get_connection(addr); let conn = self.connection_cache.get_connection(addr);
conn.send_data_batch_async(packet_vec) conn.send_data_batch_async(packet_vec)
} }

View File

@ -7,21 +7,14 @@ use {
std::{net::SocketAddr, sync::RwLock}, std::{net::SocketAddr, sync::RwLock},
}; };
pub(crate) fn next_leader_tpu_forwards(
cluster_info: &ClusterInfo,
poh_recorder: &RwLock<PohRecorder>,
) -> Option<(Pubkey, SocketAddr)> {
next_leader_x(cluster_info, poh_recorder, ContactInfo::tpu_forwards)
}
pub(crate) fn next_leader_tpu_vote( pub(crate) fn next_leader_tpu_vote(
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
poh_recorder: &RwLock<PohRecorder>, poh_recorder: &RwLock<PohRecorder>,
) -> Option<(Pubkey, SocketAddr)> { ) -> Option<(Pubkey, SocketAddr)> {
next_leader_x(cluster_info, poh_recorder, ContactInfo::tpu_vote) next_leader(cluster_info, poh_recorder, ContactInfo::tpu_vote)
} }
fn next_leader_x<F, E>( pub(crate) fn next_leader<F, E>(
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
poh_recorder: &RwLock<PohRecorder>, poh_recorder: &RwLock<PohRecorder>,
port_selector: F, port_selector: F,

View File

@ -31,6 +31,7 @@ impl WarmQuicCacheService {
poh_recorder: Arc<RwLock<PohRecorder>>, poh_recorder: Arc<RwLock<PohRecorder>>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Self { ) -> Self {
assert!(matches!(*connection_cache, ConnectionCache::Quic(_)));
let thread_hdl = Builder::new() let thread_hdl = Builder::new()
.name("solWarmQuicSvc".to_string()) .name("solWarmQuicSvc".to_string())
.spawn(move || { .spawn(move || {
@ -47,7 +48,7 @@ impl WarmQuicCacheService {
{ {
maybe_last_leader = Some(leader_pubkey); maybe_last_leader = Some(leader_pubkey);
if let Some(Ok(addr)) = cluster_info if let Some(Ok(addr)) = cluster_info
.lookup_contact_info(&leader_pubkey, ContactInfo::tpu) .lookup_contact_info(&leader_pubkey, ContactInfo::tpu_quic)
{ {
let conn = connection_cache.get_connection(&addr); let conn = connection_cache.get_connection(&addr);
if let Err(err) = conn.send_data(&[0u8]) { if let Err(err) = conn.send_data(&[0u8]) {

View File

@ -414,6 +414,7 @@ fn get_target(
nodes: &[ContactInfo], nodes: &[ContactInfo],
mode: Mode, mode: Mode,
entrypoint_addr: SocketAddr, entrypoint_addr: SocketAddr,
tpu_use_quic: bool,
) -> Option<(Pubkey, SocketAddr)> { ) -> Option<(Pubkey, SocketAddr)> {
let mut target = None; let mut target = None;
if nodes.is_empty() { if nodes.is_empty() {
@ -433,8 +434,24 @@ fn get_target(
Mode::Gossip => Some((*node.pubkey(), node.gossip().unwrap())), Mode::Gossip => Some((*node.pubkey(), node.gossip().unwrap())),
Mode::Tvu => Some((*node.pubkey(), node.tvu().unwrap())), Mode::Tvu => Some((*node.pubkey(), node.tvu().unwrap())),
Mode::TvuForwards => Some((*node.pubkey(), node.tvu_forwards().unwrap())), Mode::TvuForwards => Some((*node.pubkey(), node.tvu_forwards().unwrap())),
Mode::Tpu => Some((*node.pubkey(), node.tpu().unwrap())), Mode::Tpu => Some((
Mode::TpuForwards => Some((*node.pubkey(), node.tpu_forwards().unwrap())), *node.pubkey(),
if tpu_use_quic {
node.tpu_quic()
} else {
node.tpu()
}
.unwrap(),
)),
Mode::TpuForwards => Some((
*node.pubkey(),
if tpu_use_quic {
node.tpu_forwards_quic()
} else {
node.tpu_forwards()
}
.unwrap(),
)),
Mode::Repair => Some((*node.pubkey(), node.repair().unwrap())), Mode::Repair => Some((*node.pubkey(), node.repair().unwrap())),
Mode::ServeRepair => Some((*node.pubkey(), node.serve_repair().unwrap())), Mode::ServeRepair => Some((*node.pubkey(), node.serve_repair().unwrap())),
Mode::Rpc => None, Mode::Rpc => None,
@ -606,8 +623,12 @@ fn run_dos<T: 'static + BenchTpsClient + Send + Sync>(
client: Option<Arc<T>>, client: Option<Arc<T>>,
params: DosClientParameters, params: DosClientParameters,
) { ) {
let target = get_target(nodes, params.mode, params.entrypoint_addr); let target = get_target(
nodes,
params.mode,
params.entrypoint_addr,
params.tpu_use_quic,
);
if params.mode == Mode::Rpc { if params.mode == Mode::Rpc {
// creating rpc_client because get_account, get_program_accounts are not implemented for BenchTpsClient // creating rpc_client because get_account, get_program_accounts are not implemented for BenchTpsClient
let rpc_client = let rpc_client =
@ -1079,7 +1100,10 @@ pub mod test {
let client = Arc::new(ThinClient::new( let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(), cluster.entry_point_info.rpc().unwrap(),
cluster.entry_point_info.tpu().unwrap(), match *cluster.connection_cache {
ConnectionCache::Quic(_) => cluster.entry_point_info.tpu_quic().unwrap(),
ConnectionCache::Udp(_) => cluster.entry_point_info.tpu().unwrap(),
},
cluster.connection_cache.clone(), cluster.connection_cache.clone(),
)); ));

View File

@ -200,9 +200,10 @@ pub fn get_client(
socket_addr_space: &SocketAddrSpace, socket_addr_space: &SocketAddrSpace,
connection_cache: Arc<ConnectionCache>, connection_cache: Arc<ConnectionCache>,
) -> ThinClient { ) -> ThinClient {
let protocol = connection_cache.protocol();
let nodes: Vec<_> = nodes let nodes: Vec<_> = nodes
.iter() .iter()
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) .filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space))
.collect(); .collect();
let select = thread_rng().gen_range(0, nodes.len()); let select = thread_rng().gen_range(0, nodes.len());
let (rpc, tpu) = nodes[select]; let (rpc, tpu) = nodes[select];
@ -214,13 +215,11 @@ pub fn get_multi_client(
socket_addr_space: &SocketAddrSpace, socket_addr_space: &SocketAddrSpace,
connection_cache: Arc<ConnectionCache>, connection_cache: Arc<ConnectionCache>,
) -> (ThinClient, usize) { ) -> (ThinClient, usize) {
let addrs: Vec<_> = nodes let protocol = connection_cache.protocol();
let (rpc_addrs, tpu_addrs): (Vec<_>, Vec<_>) = nodes
.iter() .iter()
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) .filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space))
.collect(); .unzip();
let rpc_addrs: Vec<_> = addrs.iter().map(|addr| addr.0).collect();
let tpu_addrs: Vec<_> = addrs.iter().map(|addr| addr.1).collect();
let num_nodes = tpu_addrs.len(); let num_nodes = tpu_addrs.len();
( (
ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, connection_cache), ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, connection_cache),

View File

@ -6,6 +6,7 @@ use {
}, },
crds_value::MAX_WALLCLOCK, crds_value::MAX_WALLCLOCK,
}, },
solana_client::connection_cache::Protocol,
solana_sdk::{ solana_sdk::{
pubkey::Pubkey, pubkey::Pubkey,
sanitize::{Sanitize, SanitizeError}, sanitize::{Sanitize, SanitizeError},
@ -202,6 +203,10 @@ impl LegacyContactInfo {
self.tpu().and_then(|addr| get_quic_socket(&addr)) self.tpu().and_then(|addr| get_quic_socket(&addr))
} }
pub fn tpu_forwards_quic(&self) -> Result<SocketAddr, Error> {
self.tpu_forwards().and_then(|addr| get_quic_socket(&addr))
}
fn is_valid_ip(addr: IpAddr) -> bool { fn is_valid_ip(addr: IpAddr) -> bool {
!(addr.is_unspecified() || addr.is_multicast()) !(addr.is_unspecified() || addr.is_multicast())
// || (addr.is_loopback() && !cfg_test)) // || (addr.is_loopback() && !cfg_test))
@ -216,21 +221,22 @@ impl LegacyContactInfo {
addr.port() != 0u16 && Self::is_valid_ip(addr.ip()) && socket_addr_space.check(addr) addr.port() != 0u16 && Self::is_valid_ip(addr.ip()) && socket_addr_space.check(addr)
} }
pub fn client_facing_addr(&self) -> (SocketAddr, SocketAddr) {
(self.rpc, self.tpu)
}
pub(crate) fn valid_client_facing_addr( pub(crate) fn valid_client_facing_addr(
&self, &self,
protocol: Protocol,
socket_addr_space: &SocketAddrSpace, socket_addr_space: &SocketAddrSpace,
) -> Option<(SocketAddr, SocketAddr)> { ) -> Option<(SocketAddr, SocketAddr)> {
if LegacyContactInfo::is_valid_address(&self.rpc, socket_addr_space) let rpc = self
&& LegacyContactInfo::is_valid_address(&self.tpu, socket_addr_space) .rpc()
{ .ok()
Some((self.rpc, self.tpu)) .filter(|addr| socket_addr_space.check(addr))?;
} else { let tpu = match protocol {
None Protocol::QUIC => self.tpu_quic(),
Protocol::UDP => self.tpu(),
} }
.ok()
.filter(|addr| socket_addr_space.check(addr))?;
Some((rpc, tpu))
} }
} }
@ -323,17 +329,17 @@ mod tests {
fn test_valid_client_facing() { fn test_valid_client_facing() {
let mut ci = LegacyContactInfo::default(); let mut ci = LegacyContactInfo::default();
assert_eq!( assert_eq!(
ci.valid_client_facing_addr(&SocketAddrSpace::Unspecified), ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified),
None None
); );
ci.tpu = socketaddr!(Ipv4Addr::LOCALHOST, 123); ci.tpu = socketaddr!(Ipv4Addr::LOCALHOST, 123);
assert_eq!( assert_eq!(
ci.valid_client_facing_addr(&SocketAddrSpace::Unspecified), ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified),
None None
); );
ci.rpc = socketaddr!(Ipv4Addr::LOCALHOST, 234); ci.rpc = socketaddr!(Ipv4Addr::LOCALHOST, 234);
assert!(ci assert!(ci
.valid_client_facing_addr(&SocketAddrSpace::Unspecified) .valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified)
.is_some()); .is_some());
} }

View File

@ -6,7 +6,10 @@ use log::*;
use { use {
rand::{thread_rng, Rng}, rand::{thread_rng, Rng},
rayon::prelude::*, rayon::prelude::*,
solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient}, solana_client::{
connection_cache::{ConnectionCache, Protocol},
thin_client::ThinClient,
},
solana_core::consensus::VOTE_THRESHOLD_DEPTH, solana_core::consensus::VOTE_THRESHOLD_DEPTH,
solana_entry::entry::{Entry, EntrySlice}, solana_entry::entry::{Entry, EntrySlice},
solana_gossip::{ solana_gossip::{
@ -51,9 +54,15 @@ use {
}; };
pub fn get_client_facing_addr<T: Borrow<LegacyContactInfo>>( pub fn get_client_facing_addr<T: Borrow<LegacyContactInfo>>(
protocol: Protocol,
contact_info: T, contact_info: T,
) -> (SocketAddr, SocketAddr) { ) -> (SocketAddr, SocketAddr) {
let (rpc, mut tpu) = contact_info.borrow().client_facing_addr(); let contact_info = contact_info.borrow();
let rpc = contact_info.rpc().unwrap();
let mut tpu = match protocol {
Protocol::QUIC => contact_info.tpu_quic().unwrap(),
Protocol::UDP => contact_info.tpu().unwrap(),
};
// QUIC certificate authentication requires the IP Address to match. ContactInfo might have // QUIC certificate authentication requires the IP Address to match. ContactInfo might have
// 0.0.0.0 as the IP instead of 127.0.0.1. // 0.0.0.0 as the IP instead of 127.0.0.1.
tpu.set_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)); tpu.set_ip(IpAddr::V4(Ipv4Addr::LOCALHOST));
@ -82,7 +91,7 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
return; return;
} }
let random_keypair = Keypair::new(); let random_keypair = Keypair::new();
let (rpc, tpu) = get_client_facing_addr(ingress_node); let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let bal = client let bal = client
.poll_get_balance_with_commitment( .poll_get_balance_with_commitment(
@ -104,7 +113,7 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
if ignore_nodes.contains(validator.pubkey()) { if ignore_nodes.contains(validator.pubkey()) {
continue; continue;
} }
let (rpc, tpu) = get_client_facing_addr(validator); let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator);
let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let client = ThinClient::new(rpc, tpu, connection_cache.clone());
client.poll_for_signature_confirmation(&sig, confs).unwrap(); client.poll_for_signature_confirmation(&sig, confs).unwrap();
} }
@ -117,7 +126,7 @@ pub fn verify_balances<S: ::std::hash::BuildHasher>(
connection_cache: Arc<ConnectionCache>, connection_cache: Arc<ConnectionCache>,
) { ) {
let (rpc, tpu) = LegacyContactInfo::try_from(node) let (rpc, tpu) = LegacyContactInfo::try_from(node)
.map(get_client_facing_addr) .map(|node| get_client_facing_addr(connection_cache.protocol(), node))
.unwrap(); .unwrap();
let client = ThinClient::new(rpc, tpu, connection_cache); let client = ThinClient::new(rpc, tpu, connection_cache);
for (pk, b) in expected_balances { for (pk, b) in expected_balances {
@ -135,7 +144,7 @@ pub fn send_many_transactions(
max_tokens_per_transfer: u64, max_tokens_per_transfer: u64,
num_txs: u64, num_txs: u64,
) -> HashMap<Pubkey, u64> { ) -> HashMap<Pubkey, u64> {
let (rpc, tpu) = get_client_facing_addr(node); let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let mut expected_balances = HashMap::new(); let mut expected_balances = HashMap::new();
for _ in 0..num_txs { for _ in 0..num_txs {
@ -233,7 +242,7 @@ pub fn kill_entry_and_spend_and_verify_rest(
.unwrap(); .unwrap();
assert!(cluster_nodes.len() >= nodes); assert!(cluster_nodes.len() >= nodes);
let (rpc, tpu) = LegacyContactInfo::try_from(entry_point_info) let (rpc, tpu) = LegacyContactInfo::try_from(entry_point_info)
.map(get_client_facing_addr) .map(|node| get_client_facing_addr(connection_cache.protocol(), node))
.unwrap(); .unwrap();
let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let client = ThinClient::new(rpc, tpu, connection_cache.clone());
@ -262,7 +271,7 @@ pub fn kill_entry_and_spend_and_verify_rest(
continue; continue;
} }
let (rpc, tpu) = get_client_facing_addr(ingress_node); let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let balance = client let balance = client
.poll_get_balance_with_commitment( .poll_get_balance_with_commitment(
@ -346,7 +355,7 @@ pub fn check_for_new_roots(
for (i, ingress_node) in contact_infos.iter().enumerate() { for (i, ingress_node) in contact_infos.iter().enumerate() {
let (rpc, tpu) = LegacyContactInfo::try_from(ingress_node) let (rpc, tpu) = LegacyContactInfo::try_from(ingress_node)
.map(get_client_facing_addr) .map(|node| get_client_facing_addr(connection_cache.protocol(), node))
.unwrap(); .unwrap();
let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let root_slot = client let root_slot = client
@ -380,7 +389,7 @@ pub fn check_no_new_roots(
.iter() .iter()
.enumerate() .enumerate()
.map(|(i, ingress_node)| { .map(|(i, ingress_node)| {
let (rpc, tpu) = get_client_facing_addr(ingress_node); let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let initial_root = client let initial_root = client
.get_slot() .get_slot()
@ -399,7 +408,7 @@ pub fn check_no_new_roots(
let mut reached_end_slot = false; let mut reached_end_slot = false;
loop { loop {
for contact_info in contact_infos { for contact_info in contact_infos {
let (rpc, tpu) = get_client_facing_addr(contact_info); let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), contact_info);
let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let client = ThinClient::new(rpc, tpu, connection_cache.clone());
current_slot = client current_slot = client
.get_slot_with_commitment(CommitmentConfig::processed()) .get_slot_with_commitment(CommitmentConfig::processed())
@ -425,7 +434,7 @@ pub fn check_no_new_roots(
} }
for (i, ingress_node) in contact_infos.iter().enumerate() { for (i, ingress_node) in contact_infos.iter().enumerate() {
let (rpc, tpu) = get_client_facing_addr(ingress_node); let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let client = ThinClient::new(rpc, tpu, connection_cache.clone());
assert_eq!( assert_eq!(
client client
@ -447,7 +456,7 @@ fn poll_all_nodes_for_signature(
if validator.pubkey() == entry_point_info.pubkey() { if validator.pubkey() == entry_point_info.pubkey() {
continue; continue;
} }
let (rpc, tpu) = get_client_facing_addr(validator); let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator);
let client = ThinClient::new(rpc, tpu, connection_cache.clone()); let client = ThinClient::new(rpc, tpu, connection_cache.clone());
client.poll_for_signature_confirmation(sig, confs)?; client.poll_for_signature_confirmation(sig, confs)?;
} }

View File

@ -436,7 +436,9 @@ impl LocalCluster {
socket_addr_space: SocketAddrSpace, socket_addr_space: SocketAddrSpace,
) -> Pubkey { ) -> Pubkey {
let (rpc, tpu) = LegacyContactInfo::try_from(&self.entry_point_info) let (rpc, tpu) = LegacyContactInfo::try_from(&self.entry_point_info)
.map(cluster_tests::get_client_facing_addr) .map(|node| {
cluster_tests::get_client_facing_addr(self.connection_cache.protocol(), node)
})
.unwrap(); .unwrap();
let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); let client = ThinClient::new(rpc, tpu, self.connection_cache.clone());
@ -531,7 +533,9 @@ impl LocalCluster {
pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 { pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 {
let (rpc, tpu) = LegacyContactInfo::try_from(&self.entry_point_info) let (rpc, tpu) = LegacyContactInfo::try_from(&self.entry_point_info)
.map(cluster_tests::get_client_facing_addr) .map(|node| {
cluster_tests::get_client_facing_addr(self.connection_cache.protocol(), node)
})
.unwrap(); .unwrap();
let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); let client = ThinClient::new(rpc, tpu, self.connection_cache.clone());
Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports) Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports)
@ -786,7 +790,9 @@ impl Cluster for LocalCluster {
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> { fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> {
self.validators.get(pubkey).map(|f| { self.validators.get(pubkey).map(|f| {
let (rpc, tpu) = LegacyContactInfo::try_from(&f.info.contact_info) let (rpc, tpu) = LegacyContactInfo::try_from(&f.info.contact_info)
.map(cluster_tests::get_client_facing_addr) .map(|node| {
cluster_tests::get_client_facing_addr(self.connection_cache.protocol(), node)
})
.unwrap(); .unwrap();
ThinClient::new(rpc, tpu, self.connection_cache.clone()) ThinClient::new(rpc, tpu, self.connection_cache.clone())
}) })

View File

@ -201,7 +201,9 @@ fn test_local_cluster_signature_subscribe() {
let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap();
let (rpc, tpu) = LegacyContactInfo::try_from(non_bootstrap_info) let (rpc, tpu) = LegacyContactInfo::try_from(non_bootstrap_info)
.map(cluster_tests::get_client_facing_addr) .map(|node| {
cluster_tests::get_client_facing_addr(cluster.connection_cache.protocol(), node)
})
.unwrap(); .unwrap();
let tx_client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); let tx_client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone());
@ -433,7 +435,9 @@ fn test_mainnet_beta_cluster_type() {
assert_eq!(cluster_nodes.len(), 1); assert_eq!(cluster_nodes.len(), 1);
let (rpc, tpu) = LegacyContactInfo::try_from(&cluster.entry_point_info) let (rpc, tpu) = LegacyContactInfo::try_from(&cluster.entry_point_info)
.map(cluster_tests::get_client_facing_addr) .map(|node| {
cluster_tests::get_client_facing_addr(cluster.connection_cache.protocol(), node)
})
.unwrap(); .unwrap();
let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone());
@ -2641,7 +2645,9 @@ fn test_oc_bad_signatures() {
// 3) Start up a spy to listen for and push votes to leader TPU // 3) Start up a spy to listen for and push votes to leader TPU
let (rpc, tpu) = LegacyContactInfo::try_from(&cluster.entry_point_info) let (rpc, tpu) = LegacyContactInfo::try_from(&cluster.entry_point_info)
.map(cluster_tests::get_client_facing_addr) .map(|node| {
cluster_tests::get_client_facing_addr(cluster.connection_cache.protocol(), node)
})
.unwrap(); .unwrap();
let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone());
let cluster_funding_keypair = cluster.funding_keypair.insecure_clone(); let cluster_funding_keypair = cluster.funding_keypair.insecure_clone();

View File

@ -19,11 +19,11 @@ use {
solana_connection_cache::{ solana_connection_cache::{
connection_cache::{ connection_cache::{
BaseClientConnection, ClientError, ConnectionManager, ConnectionPool, BaseClientConnection, ClientError, ConnectionManager, ConnectionPool,
ConnectionPoolError, NewConnectionConfig, ConnectionPoolError, NewConnectionConfig, Protocol,
}, },
connection_cache_stats::ConnectionCacheStats, connection_cache_stats::ConnectionCacheStats,
}, },
solana_sdk::{pubkey::Pubkey, quic::QUIC_PORT_OFFSET, signature::Keypair}, solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_streamer::{ solana_streamer::{
nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType}, nonblocking::quic::{compute_max_allowed_uni_streams, ConnectionPeerType},
streamer::StakedNodes, streamer::StakedNodes,
@ -195,6 +195,8 @@ impl ConnectionManager for QuicConnectionManager {
type ConnectionPool = QuicPool; type ConnectionPool = QuicPool;
type NewConnectionConfig = QuicConfig; type NewConnectionConfig = QuicConfig;
const PROTOCOL: Protocol = Protocol::QUIC;
fn new_connection_pool(&self) -> Self::ConnectionPool { fn new_connection_pool(&self) -> Self::ConnectionPool {
QuicPool { QuicPool {
connections: Vec::default(), connections: Vec::default(),
@ -211,10 +213,6 @@ impl ConnectionManager for QuicConnectionManager {
fn new_connection_config(&self) -> QuicConfig { fn new_connection_config(&self) -> QuicConfig {
QuicConfig::new().unwrap() QuicConfig::new().unwrap()
} }
fn get_port_offset(&self) -> u16 {
QUIC_PORT_OFFSET
}
} }
impl QuicConnectionManager { impl QuicConnectionManager {

View File

@ -1,5 +1,6 @@
use { use {
solana_client::{ solana_client::{
connection_cache::Protocol,
nonblocking::tpu_client::{LeaderTpuService, TpuClient}, nonblocking::tpu_client::{LeaderTpuService, TpuClient},
tpu_client::TpuClientConfig, tpu_client::TpuClientConfig,
}, },
@ -50,10 +51,14 @@ async fn test_tpu_cache_slot_updates() {
let (test_validator, _) = TestValidatorGenesis::default().start_async().await; let (test_validator, _) = TestValidatorGenesis::default().start_async().await;
let rpc_client = Arc::new(test_validator.get_async_rpc_client()); let rpc_client = Arc::new(test_validator.get_async_rpc_client());
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let mut leader_tpu_service = let mut leader_tpu_service = LeaderTpuService::new(
LeaderTpuService::new(rpc_client, &test_validator.rpc_pubsub_url(), exit.clone()) rpc_client,
.await &test_validator.rpc_pubsub_url(),
.unwrap(); Protocol::QUIC,
exit.clone(),
)
.await
.unwrap();
let start_slot = leader_tpu_service.estimated_current_slot(); let start_slot = leader_tpu_service.estimated_current_slot();
let timeout = Duration::from_secs(5); let timeout = Duration::from_secs(5);
let sleep_time = Duration::from_millis(DEFAULT_MS_PER_SLOT); let sleep_time = Duration::from_millis(DEFAULT_MS_PER_SLOT);

View File

@ -355,7 +355,11 @@ impl JsonRpcRequestProcessor {
); );
ClusterInfo::new(contact_info, keypair, socket_addr_space) ClusterInfo::new(contact_info, keypair, socket_addr_space)
}); });
let tpu_address = cluster_info.my_contact_info().tpu().unwrap(); let tpu_address = match *connection_cache {
ConnectionCache::Quic(_) => ContactInfo::tpu_quic,
ConnectionCache::Udp(_) => ContactInfo::tpu,
}(&cluster_info.my_contact_info())
.unwrap();
let (sender, receiver) = unbounded(); let (sender, receiver) = unbounded();
SendTransactionService::new::<NullTpuInfo>( SendTransactionService::new::<NullTpuInfo>(
tpu_address, tpu_address,

View File

@ -20,7 +20,7 @@ use {
}, },
regex::Regex, regex::Regex,
solana_client::connection_cache::ConnectionCache, solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::{ solana_ledger::{
bigtable_upload::ConfirmedBlockUploadConfig, bigtable_upload::ConfirmedBlockUploadConfig,
bigtable_upload_service::BigTableUploadService, blockstore::Blockstore, bigtable_upload_service::BigTableUploadService, blockstore::Blockstore,
@ -378,10 +378,11 @@ impl JsonRpcService {
LARGEST_ACCOUNTS_CACHE_DURATION, LARGEST_ACCOUNTS_CACHE_DURATION,
))); )));
let tpu_address = cluster_info let tpu_address = match *connection_cache {
.my_contact_info() ConnectionCache::Quic(_) => ContactInfo::tpu_quic,
.tpu() ConnectionCache::Udp(_) => ContactInfo::tpu,
.map_err(|err| format!("{err}"))?; }(&cluster_info.my_contact_info())
.map_err(|err| format!("{err}"))?;
// sadly, some parts of our current rpc implemention block the jsonrpc's // sadly, some parts of our current rpc implemention block the jsonrpc's
// _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU, // _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU,

View File

@ -6,7 +6,7 @@ use {
log::*, log::*,
solana_connection_cache::{ solana_connection_cache::{
connection_cache::{ connection_cache::{
ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, Protocol,
DEFAULT_CONNECTION_POOL_SIZE, DEFAULT_CONNECTION_POOL_SIZE,
}, },
nonblocking::client_connection::ClientConnection, nonblocking::client_connection::ClientConnection,
@ -22,6 +22,7 @@ use {
commitment_config::CommitmentConfig, commitment_config::CommitmentConfig,
epoch_info::EpochInfo, epoch_info::EpochInfo,
pubkey::Pubkey, pubkey::Pubkey,
quic::QUIC_PORT_OFFSET,
signature::SignerError, signature::SignerError,
transaction::Transaction, transaction::Transaction,
transport::{Result as TransportResult, TransportError}, transport::{Result as TransportResult, TransportError},
@ -102,6 +103,7 @@ impl LeaderTpuCacheUpdateInfo {
} }
struct LeaderTpuCache { struct LeaderTpuCache {
protocol: Protocol,
first_slot: Slot, first_slot: Slot,
leaders: Vec<Pubkey>, leaders: Vec<Pubkey>,
leader_tpu_map: HashMap<Pubkey, SocketAddr>, leader_tpu_map: HashMap<Pubkey, SocketAddr>,
@ -115,9 +117,11 @@ impl LeaderTpuCache {
slots_in_epoch: Slot, slots_in_epoch: Slot,
leaders: Vec<Pubkey>, leaders: Vec<Pubkey>,
cluster_nodes: Vec<RpcContactInfo>, cluster_nodes: Vec<RpcContactInfo>,
protocol: Protocol,
) -> Self { ) -> Self {
let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes); let leader_tpu_map = Self::extract_cluster_tpu_sockets(protocol, cluster_nodes);
Self { Self {
protocol,
first_slot, first_slot,
leaders, leaders,
leader_tpu_map, leader_tpu_map,
@ -183,16 +187,24 @@ impl LeaderTpuCache {
} }
} }
pub fn extract_cluster_tpu_sockets( fn extract_cluster_tpu_sockets(
protocol: Protocol,
cluster_contact_info: Vec<RpcContactInfo>, cluster_contact_info: Vec<RpcContactInfo>,
) -> HashMap<Pubkey, SocketAddr> { ) -> HashMap<Pubkey, SocketAddr> {
cluster_contact_info cluster_contact_info
.into_iter() .into_iter()
.filter_map(|contact_info| { .filter_map(|contact_info| {
Some(( let pubkey = Pubkey::from_str(&contact_info.pubkey).ok()?;
Pubkey::from_str(&contact_info.pubkey).ok()?, let socket = match protocol {
contact_info.tpu?, Protocol::QUIC => contact_info.tpu_quic.or_else(|| {
)) let mut socket = contact_info.tpu?;
let port = socket.port().checked_add(QUIC_PORT_OFFSET)?;
socket.set_port(port);
Some(socket)
}),
Protocol::UDP => contact_info.tpu,
}?;
Some((pubkey, socket))
}) })
.collect() .collect()
} }
@ -211,8 +223,8 @@ impl LeaderTpuCache {
if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes { if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes {
match cluster_nodes { match cluster_nodes {
Ok(cluster_nodes) => { Ok(cluster_nodes) => {
let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes); self.leader_tpu_map =
self.leader_tpu_map = leader_tpu_map; Self::extract_cluster_tpu_sockets(self.protocol, cluster_nodes);
cluster_refreshed = true; cluster_refreshed = true;
} }
Err(err) => { Err(err) => {
@ -425,7 +437,8 @@ where
) -> Result<Self> { ) -> Result<Self> {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let leader_tpu_service = let leader_tpu_service =
LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone()).await?; LeaderTpuService::new(rpc_client.clone(), websocket_url, M::PROTOCOL, exit.clone())
.await?;
Ok(Self { Ok(Self {
fanout_slots: config.fanout_slots.clamp(1, MAX_FANOUT_SLOTS), fanout_slots: config.fanout_slots.clamp(1, MAX_FANOUT_SLOTS),
@ -586,6 +599,7 @@ impl LeaderTpuService {
pub async fn new( pub async fn new(
rpc_client: Arc<RpcClient>, rpc_client: Arc<RpcClient>,
websocket_url: &str, websocket_url: &str,
protocol: Protocol,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
) -> Result<Self> { ) -> Result<Self> {
let start_slot = rpc_client let start_slot = rpc_client
@ -603,6 +617,7 @@ impl LeaderTpuService {
slots_in_epoch, slots_in_epoch,
leaders, leaders,
cluster_nodes, cluster_nodes,
protocol,
))); )));
let pubsub_client = if !websocket_url.is_empty() { let pubsub_client = if !websocket_url.is_empty() {
@ -640,7 +655,7 @@ impl LeaderTpuService {
self.recent_slots.estimated_current_slot() self.recent_slots.estimated_current_slot()
} }
pub fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> { fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
let current_slot = self.recent_slots.estimated_current_slot(); let current_slot = self.recent_slots.estimated_current_slot();
self.leader_tpu_cache self.leader_tpu_cache
.read() .read()

View File

@ -11,7 +11,7 @@ use {
solana_connection_cache::{ solana_connection_cache::{
connection_cache::{ connection_cache::{
BaseClientConnection, ClientError, ConnectionManager, ConnectionPool, BaseClientConnection, ClientError, ConnectionManager, ConnectionPool,
ConnectionPoolError, NewConnectionConfig, ConnectionPoolError, NewConnectionConfig, Protocol,
}, },
connection_cache_stats::ConnectionCacheStats, connection_cache_stats::ConnectionCacheStats,
}, },
@ -98,6 +98,9 @@ pub struct UdpConnectionManager {}
impl ConnectionManager for UdpConnectionManager { impl ConnectionManager for UdpConnectionManager {
type ConnectionPool = UdpPool; type ConnectionPool = UdpPool;
type NewConnectionConfig = UdpConfig; type NewConnectionConfig = UdpConfig;
const PROTOCOL: Protocol = Protocol::UDP;
fn new_connection_pool(&self) -> Self::ConnectionPool { fn new_connection_pool(&self) -> Self::ConnectionPool {
UdpPool { UdpPool {
connections: Vec::default(), connections: Vec::default(),
@ -107,8 +110,4 @@ impl ConnectionManager for UdpConnectionManager {
fn new_connection_config(&self) -> Self::NewConnectionConfig { fn new_connection_config(&self) -> Self::NewConnectionConfig {
UdpConfig::new().unwrap() UdpConfig::new().unwrap()
} }
fn get_port_offset(&self) -> u16 {
0
}
} }