From 1ca5c3a7bd88ec836f58c3194635cdd100f8da80 Mon Sep 17 00:00:00 2001 From: ryleung-solana <91908731+ryleung-solana@users.noreply.github.com> Date: Thu, 26 May 2022 11:21:16 -0400 Subject: [PATCH] Switch to using enum-dispatch to switch between UDP and Quic (#24713) --- Cargo.lock | 13 ++ client/Cargo.toml | 1 + client/src/connection_cache.rs | 151 +++--------------- client/src/quic_client.rs | 63 +++----- client/src/thin_client.rs | 19 ++- client/src/tpu_client.rs | 6 +- client/src/tpu_connection.rs | 48 ++---- client/src/udp_client.rs | 41 +---- client/tests/quic_client.rs | 11 +- core/src/banking_stage.rs | 5 +- core/src/warm_quic_cache_service.rs | 5 +- programs/bpf/Cargo.lock | 13 ++ .../src/send_transaction_service.rs | 8 +- 13 files changed, 127 insertions(+), 257 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a217326a9..e8748704e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1264,6 +1264,18 @@ dependencies = [ "syn 1.0.93", ] +[[package]] +name = "enum_dispatch" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eb359f1476bf611266ac1f5355bc14aeca37b299d0ebccc038ee7058891c9cb" +dependencies = [ + "once_cell", + "proc-macro2 1.0.38", + "quote 1.0.18", + "syn 1.0.93", +] + [[package]] name = "env_logger" version = "0.9.0" @@ -4638,6 +4650,7 @@ dependencies = [ "bytes", "clap 2.33.3", "crossbeam-channel", + "enum_dispatch", "futures 0.3.21", "futures-util", "indexmap", diff --git a/client/Cargo.toml b/client/Cargo.toml index 189d1d790..1a57a2493 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -18,6 +18,7 @@ bs58 = "0.4.0" bytes = "1.1.0" clap = "2.33.0" crossbeam-channel = "0.5" +enum_dispatch = "0.3.8" futures = "0.3" futures-util = "0.3.21" indexmap = "1.8.1" diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index d26fae696..4480b254e 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -1,16 +1,13 @@ use { crate::{ - quic_client::QuicTpuConnection, - tpu_connection::{ClientStats, TpuConnection}, - udp_client::UdpTpuConnection, + quic_client::QuicTpuConnection, tpu_connection::ClientStats, udp_client::UdpTpuConnection, }, + enum_dispatch::enum_dispatch, indexmap::map::IndexMap, lazy_static::lazy_static, rand::{thread_rng, Rng}, solana_measure::measure::Measure, - solana_sdk::{ - timing::AtomicInterval, transaction::VersionedTransaction, transport::TransportError, - }, + solana_sdk::timing::AtomicInterval, std::{ net::SocketAddr, sync::{ @@ -23,10 +20,10 @@ use { // Should be non-zero static MAX_CONNECTIONS: usize = 1024; -#[derive(Clone)] +#[enum_dispatch(TpuConnection)] pub enum Connection { - Udp(Arc), - Quic(Arc), + UdpTpuConnection, + QuicTpuConnection, } #[derive(Default)] @@ -52,7 +49,12 @@ pub struct ConnectionCacheStats { const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000; impl ConnectionCacheStats { - fn add_client_stats(&self, client_stats: &ClientStats, num_packets: usize, is_success: bool) { + pub fn add_client_stats( + &self, + client_stats: &ClientStats, + num_packets: usize, + is_success: bool, + ) { self.total_client_stats.total_connections.fetch_add( client_stats.total_connections.load(Ordering::Relaxed), Ordering::Relaxed, @@ -214,7 +216,7 @@ impl ConnectionCacheStats { } struct ConnectionMap { - map: IndexMap, + map: IndexMap>, stats: Arc, last_stats: AtomicInterval, use_quic: bool, @@ -245,7 +247,7 @@ pub fn set_use_quic(use_quic: bool) { } struct GetConnectionResult { - connection: Connection, + connection: Arc, cache_hit: bool, report_stats: bool, map_timing_ms: u64, @@ -286,18 +288,14 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { match map.map.get(addr) { Some(connection) => (connection.clone(), true, map.stats.clone(), 0, 0), None => { - let connection = if map.use_quic { - Connection::Quic(Arc::new(QuicTpuConnection::new( - *addr, - map.stats.clone(), - ))) + let connection: Connection = if map.use_quic { + QuicTpuConnection::new(*addr, map.stats.clone()).into() } else { - Connection::Udp(Arc::new(UdpTpuConnection::new( - *addr, - map.stats.clone(), - ))) + UdpTpuConnection::new(*addr, map.stats.clone()).into() }; + let connection = Arc::new(connection); + // evict a connection if the cache is reaching upper bounds let mut num_evictions = 0; let mut get_connection_cache_eviction_measure = @@ -338,7 +336,7 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult { // TODO: see https://github.com/solana-labs/solana/issues/23661 // remove lazy_static and optimize and refactor this -fn get_connection(addr: &SocketAddr) -> (Connection, Arc) { +pub fn get_connection(addr: &SocketAddr) -> Arc { let mut get_connection_measure = Measure::start("get_connection_measure"); let GetConnectionResult { connection, @@ -384,114 +382,20 @@ fn get_connection(addr: &SocketAddr) -> (Connection, Arc) connection_cache_stats .get_connection_ms .fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed); - (connection, connection_cache_stats) -} -// TODO: see https://github.com/solana-labs/solana/issues/23851 -// use enum_dispatch and get rid of this tedious code. -// The main blocker to using enum_dispatch right now is that -// the it doesn't work with static methods like TpuConnection::new -// which is used by thin_client. This will be eliminated soon -// once thin_client is moved to using this connection cache. -// Once that is done, we will migrate to using enum_dispatch -// This will be done in a followup to -// https://github.com/solana-labs/solana/pull/23817 -pub fn send_wire_transaction_batch( - packets: &[&[u8]], - addr: &SocketAddr, -) -> Result<(), TransportError> { - let (conn, stats) = get_connection(addr); - let client_stats = ClientStats::default(); - let r = match conn { - Connection::Udp(conn) => conn.send_wire_transaction_batch(packets, &client_stats), - Connection::Quic(conn) => conn.send_wire_transaction_batch(packets, &client_stats), - }; - stats.add_client_stats(&client_stats, packets.len(), r.is_ok()); - r -} - -pub fn send_wire_transaction_async( - packets: Vec, - addr: &SocketAddr, -) -> Result<(), TransportError> { - let (conn, stats) = get_connection(addr); - let client_stats = Arc::new(ClientStats::default()); - let r = match conn { - Connection::Udp(conn) => conn.send_wire_transaction_async(packets, client_stats.clone()), - Connection::Quic(conn) => conn.send_wire_transaction_async(packets, client_stats.clone()), - }; - stats.add_client_stats(&client_stats, 1, r.is_ok()); - r -} - -pub fn send_wire_transaction_batch_async( - packets: Vec>, - addr: &SocketAddr, -) -> Result<(), TransportError> { - let (conn, stats) = get_connection(addr); - let client_stats = Arc::new(ClientStats::default()); - let len = packets.len(); - let r = match conn { - Connection::Udp(conn) => { - conn.send_wire_transaction_batch_async(packets, client_stats.clone()) - } - Connection::Quic(conn) => { - conn.send_wire_transaction_batch_async(packets, client_stats.clone()) - } - }; - stats.add_client_stats(&client_stats, len, r.is_ok()); - r -} - -pub fn send_wire_transaction( - wire_transaction: &[u8], - addr: &SocketAddr, -) -> Result<(), TransportError> { - send_wire_transaction_batch(&[wire_transaction], addr) -} - -pub fn serialize_and_send_transaction( - transaction: &VersionedTransaction, - addr: &SocketAddr, -) -> Result<(), TransportError> { - let (conn, stats) = get_connection(addr); - let client_stats = ClientStats::default(); - let r = match conn { - Connection::Udp(conn) => conn.serialize_and_send_transaction(transaction, &client_stats), - Connection::Quic(conn) => conn.serialize_and_send_transaction(transaction, &client_stats), - }; - stats.add_client_stats(&client_stats, 1, r.is_ok()); - r -} - -pub fn par_serialize_and_send_transaction_batch( - transactions: &[VersionedTransaction], - addr: &SocketAddr, -) -> Result<(), TransportError> { - let (conn, stats) = get_connection(addr); - let client_stats = ClientStats::default(); - let r = match conn { - Connection::Udp(conn) => { - conn.par_serialize_and_send_transaction_batch(transactions, &client_stats) - } - Connection::Quic(conn) => { - conn.par_serialize_and_send_transaction_batch(transactions, &client_stats) - } - }; - stats.add_client_stats(&client_stats, transactions.len(), r.is_ok()); - r + connection } #[cfg(test)] mod tests { use { crate::{ - connection_cache::{get_connection, Connection, CONNECTION_MAP, MAX_CONNECTIONS}, + connection_cache::{get_connection, CONNECTION_MAP, MAX_CONNECTIONS}, tpu_connection::TpuConnection, }, rand::{Rng, SeedableRng}, rand_chacha::ChaChaRng, - std::net::{IpAddr, SocketAddr}, + std::net::SocketAddr, }; fn get_addr(rng: &mut ChaChaRng) -> SocketAddr { @@ -505,13 +409,6 @@ mod tests { addr_str.parse().expect("Invalid address") } - fn ip(conn: Connection) -> IpAddr { - match conn { - Connection::Udp(conn) => conn.tpu_addr().ip(), - Connection::Quic(conn) => conn.tpu_addr().ip(), - } - } - #[test] fn test_connection_cache() { solana_logger::setup(); @@ -540,7 +437,7 @@ mod tests { assert!(map.map.len() == MAX_CONNECTIONS); addrs.iter().for_each(|a| { let conn = map.map.get(a).expect("Address not found"); - assert!(a.ip() == ip(conn.clone())); + assert!(a.ip() == conn.tpu_addr().ip()); }); } diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index 63a932e0b..7117e800d 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -161,10 +161,8 @@ impl QuicTpuConnection { pub fn base_stats(&self) -> Arc { self.client.stats.clone() } -} -impl TpuConnection for QuicTpuConnection { - fn new(tpu_addr: SocketAddr, connection_stats: Arc) -> Self { + pub fn new(tpu_addr: SocketAddr, connection_stats: Arc) -> Self { let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET); let client = Arc::new(QuicClient::new(tpu_addr)); @@ -173,76 +171,65 @@ impl TpuConnection for QuicTpuConnection { connection_stats, } } +} +impl TpuConnection for QuicTpuConnection { fn tpu_addr(&self) -> &SocketAddr { &self.client.addr } - fn send_wire_transaction( - &self, - wire_transaction: T, - stats: &ClientStats, - ) -> TransportResult<()> - where - T: AsRef<[u8]>, - { - let _guard = RUNTIME.enter(); - let send_buffer = - self.client - .send_buffer(wire_transaction, stats, self.connection_stats.clone()); - RUNTIME.block_on(send_buffer)?; - Ok(()) - } - - fn send_wire_transaction_batch( - &self, - buffers: &[T], - stats: &ClientStats, - ) -> TransportResult<()> + fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> where T: AsRef<[u8]>, { + let stats = ClientStats::default(); + let len = buffers.len(); let _guard = RUNTIME.enter(); let send_batch = self .client - .send_batch(buffers, stats, self.connection_stats.clone()); - RUNTIME.block_on(send_batch)?; + .send_batch(buffers, &stats, self.connection_stats.clone()); + let res = RUNTIME.block_on(send_batch); + self.connection_stats + .add_client_stats(&stats, len, res.is_ok()); + res?; Ok(()) } - fn send_wire_transaction_async( - &self, - wire_transaction: Vec, - stats: Arc, - ) -> TransportResult<()> { + fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { + let stats = Arc::new(ClientStats::default()); let _guard = RUNTIME.enter(); let client = self.client.clone(); let connection_stats = self.connection_stats.clone(); //drop and detach the task let _ = RUNTIME.spawn(async move { - let send_buffer = client.send_buffer(wire_transaction, &stats, connection_stats); + let send_buffer = + client.send_buffer(wire_transaction, &stats, connection_stats.clone()); if let Err(e) = send_buffer.await { warn!("Failed to send transaction async to {:?}", e); datapoint_warn!("send-wire-async", ("failure", 1, i64),); + connection_stats.add_client_stats(&stats, 1, false); + } else { + connection_stats.add_client_stats(&stats, 1, true); } }); Ok(()) } - fn send_wire_transaction_batch_async( - &self, - buffers: Vec>, - stats: Arc, - ) -> TransportResult<()> { + fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { + let stats = Arc::new(ClientStats::default()); let _guard = RUNTIME.enter(); let client = self.client.clone(); let connection_stats = self.connection_stats.clone(); + let len = buffers.len(); //drop and detach the task let _ = RUNTIME.spawn(async move { - let send_batch = client.send_batch(&buffers, &stats, connection_stats); + let send_batch = client.send_batch(&buffers, &stats, connection_stats.clone()); if let Err(e) = send_batch.await { warn!("Failed to send transaction batch async to {:?}", e); datapoint_warn!("send-wire-batch-async", ("failure", 1, i64),); + connection_stats.add_client_stats(&stats, len, false); + } else { + connection_stats.add_client_stats(&stats, len, true); } }); Ok(()) diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index bb87945b1..089821ae9 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -5,13 +5,9 @@ use { crate::{ - connection_cache::{ - par_serialize_and_send_transaction_batch, send_wire_transaction, - serialize_and_send_transaction, - }, - rpc_client::RpcClient, - rpc_config::RpcProgramAccountsConfig, - rpc_response::Response, + connection_cache::get_connection, rpc_client::RpcClient, + rpc_config::RpcProgramAccountsConfig, rpc_response::Response, + tpu_connection::TpuConnection, }, log::*, solana_sdk::{ @@ -212,8 +208,9 @@ impl ThinClient { bincode::serialize(&transaction).expect("transaction serialization failed"); while now.elapsed().as_secs() < wait_time as u64 { if num_confirmed == 0 { + let conn = get_connection(self.tpu_addr()); // Send the transaction if there has been no confirmation (e.g. the first time) - send_wire_transaction(&wire_transaction, self.tpu_addr())?; + conn.send_wire_transaction(&wire_transaction)?; } if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation( @@ -599,7 +596,8 @@ impl AsyncClient for ThinClient { &self, transaction: VersionedTransaction, ) -> TransportResult { - serialize_and_send_transaction(&transaction, self.tpu_addr())?; + let conn = get_connection(self.tpu_addr()); + conn.serialize_and_send_transaction(&transaction)?; Ok(transaction.signatures[0]) } @@ -607,7 +605,8 @@ impl AsyncClient for ThinClient { &self, batch: Vec, ) -> TransportResult<()> { - par_serialize_and_send_transaction_batch(&batch[..], self.tpu_addr())?; + let conn = get_connection(self.tpu_addr()); + conn.par_serialize_and_send_transaction_batch(&batch[..])?; Ok(()) } } diff --git a/client/src/tpu_client.rs b/client/src/tpu_client.rs index 64f03f2b3..85e03cadf 100644 --- a/client/src/tpu_client.rs +++ b/client/src/tpu_client.rs @@ -1,12 +1,13 @@ use { crate::{ client_error::{ClientError, Result as ClientResult}, - connection_cache::send_wire_transaction_async, + connection_cache::get_connection, pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription}, rpc_client::RpcClient, rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS, rpc_response::{RpcContactInfo, SlotUpdate}, spinner, + tpu_connection::TpuConnection, }, bincode::serialize, log::*, @@ -119,7 +120,8 @@ impl TpuClient { .leader_tpu_service .leader_tpu_sockets(self.fanout_slots) { - let result = send_wire_transaction_async(wire_transaction.clone(), &tpu_address); + let conn = get_connection(&tpu_address); + let result = conn.send_wire_transaction_async(wire_transaction.clone()); if let Err(err) = result { last_error = Some(err); } else { diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index 8c8faf726..4942cd93f 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -1,12 +1,12 @@ use { - crate::connection_cache::ConnectionCacheStats, + crate::{ + connection_cache::Connection, quic_client::QuicTpuConnection, udp_client::UdpTpuConnection, + }, + enum_dispatch::enum_dispatch, rayon::iter::{IntoParallelIterator, ParallelIterator}, solana_metrics::MovingStat, solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, - std::{ - net::SocketAddr, - sync::{atomic::AtomicU64, Arc}, - }, + std::{net::SocketAddr, sync::atomic::AtomicU64}, }; #[derive(Default)] @@ -25,59 +25,43 @@ pub struct ClientStats { pub make_connection_ms: AtomicU64, } +#[enum_dispatch] pub trait TpuConnection { - fn new(tpu_addr: SocketAddr, connection_stats: Arc) -> Self; - fn tpu_addr(&self) -> &SocketAddr; fn serialize_and_send_transaction( &self, transaction: &VersionedTransaction, - stats: &ClientStats, ) -> TransportResult<()> { let wire_transaction = bincode::serialize(transaction).expect("serialize Transaction in send_batch"); - self.send_wire_transaction(&wire_transaction, stats) + self.send_wire_transaction(&wire_transaction) } - fn send_wire_transaction( - &self, - wire_transaction: T, - stats: &ClientStats, - ) -> TransportResult<()> + fn send_wire_transaction(&self, wire_transaction: T) -> TransportResult<()> where - T: AsRef<[u8]>; + T: AsRef<[u8]>, + { + self.send_wire_transaction_batch(&[wire_transaction]) + } - fn send_wire_transaction_async( - &self, - wire_transaction: Vec, - stats: Arc, - ) -> TransportResult<()>; + fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()>; fn par_serialize_and_send_transaction_batch( &self, transactions: &[VersionedTransaction], - stats: &ClientStats, ) -> TransportResult<()> { let buffers = transactions .into_par_iter() .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) .collect::>(); - self.send_wire_transaction_batch(&buffers, stats) + self.send_wire_transaction_batch(&buffers) } - fn send_wire_transaction_batch( - &self, - buffers: &[T], - stats: &ClientStats, - ) -> TransportResult<()> + fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> where T: AsRef<[u8]>; - fn send_wire_transaction_batch_async( - &self, - buffers: Vec>, - stats: Arc, - ) -> TransportResult<()>; + fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()>; } diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs index c54ccdcf5..10873d7e2 100644 --- a/client/src/udp_client.rs +++ b/client/src/udp_client.rs @@ -2,10 +2,7 @@ //! an interface for sending transactions use { - crate::{ - connection_cache::ConnectionCacheStats, - tpu_connection::{ClientStats, TpuConnection}, - }, + crate::{connection_cache::ConnectionCacheStats, tpu_connection::TpuConnection}, core::iter::repeat, solana_net_utils::VALIDATOR_PORT_RANGE, solana_sdk::transport::Result as TransportResult, @@ -21,8 +18,8 @@ pub struct UdpTpuConnection { addr: SocketAddr, } -impl TpuConnection for UdpTpuConnection { - fn new(tpu_addr: SocketAddr, _connection_stats: Arc) -> Self { +impl UdpTpuConnection { + pub fn new(tpu_addr: SocketAddr, _connection_stats: Arc) -> Self { let (_, client_socket) = solana_net_utils::bind_in_range( IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), VALIDATOR_PORT_RANGE, @@ -34,37 +31,19 @@ impl TpuConnection for UdpTpuConnection { addr: tpu_addr, } } +} +impl TpuConnection for UdpTpuConnection { fn tpu_addr(&self) -> &SocketAddr { &self.addr } - fn send_wire_transaction( - &self, - wire_transaction: T, - _stats: &ClientStats, - ) -> TransportResult<()> - where - T: AsRef<[u8]>, - { + fn send_wire_transaction_async(&self, wire_transaction: Vec) -> TransportResult<()> { self.socket.send_to(wire_transaction.as_ref(), self.addr)?; Ok(()) } - fn send_wire_transaction_async( - &self, - wire_transaction: Vec, - _stats: Arc, - ) -> TransportResult<()> { - self.socket.send_to(wire_transaction.as_ref(), self.addr)?; - Ok(()) - } - - fn send_wire_transaction_batch( - &self, - buffers: &[T], - _stats: &ClientStats, - ) -> TransportResult<()> + fn send_wire_transaction_batch(&self, buffers: &[T]) -> TransportResult<()> where T: AsRef<[u8]>, { @@ -72,11 +51,7 @@ impl TpuConnection for UdpTpuConnection { batch_send(&self.socket, &pkts)?; Ok(()) } - fn send_wire_transaction_batch_async( - &self, - buffers: Vec>, - _stats: Arc, - ) -> TransportResult<()> { + fn send_wire_transaction_batch_async(&self, buffers: Vec>) -> TransportResult<()> { let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect(); batch_send(&self.socket, &pkts)?; Ok(()) diff --git a/client/tests/quic_client.rs b/client/tests/quic_client.rs index 9c661e17b..abf4ddbb4 100644 --- a/client/tests/quic_client.rs +++ b/client/tests/quic_client.rs @@ -3,9 +3,8 @@ mod tests { use { crossbeam_channel::unbounded, solana_client::{ - connection_cache::ConnectionCacheStats, - quic_client::QuicTpuConnection, - tpu_connection::{ClientStats, TpuConnection}, + connection_cache::ConnectionCacheStats, quic_client::QuicTpuConnection, + tpu_connection::TpuConnection, }, solana_sdk::{packet::PACKET_DATA_SIZE, quic::QUIC_PORT_OFFSET, signature::Keypair}, solana_streamer::quic::spawn_server, @@ -53,11 +52,7 @@ mod tests { let num_expected_packets: usize = 4000; let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets]; - let stats = Arc::new(ClientStats::default()); - - assert!(client - .send_wire_transaction_batch_async(packets, stats) - .is_ok()); + assert!(client.send_wire_transaction_batch_async(packets).is_ok()); let mut all_packets = vec![]; let now = Instant::now(); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0f0096312..ef00ebf29 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -17,7 +17,7 @@ use { histogram::Histogram, itertools::Itertools, min_max_heap::MinMaxHeap, - solana_client::connection_cache::send_wire_transaction_batch_async, + solana_client::{connection_cache::get_connection, tpu_connection::TpuConnection}, solana_entry::entry::hash_transactions, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::blockstore_processor::TransactionStatusSender, @@ -525,7 +525,8 @@ impl BankingStage { let mut measure = Measure::start("banking_stage-forward-us"); - let res = send_wire_transaction_batch_async(packet_vec, tpu_forwards); + let conn = get_connection(tpu_forwards); + let res = conn.send_wire_transaction_batch_async(packet_vec); measure.stop(); inc_new_counter_info!( diff --git a/core/src/warm_quic_cache_service.rs b/core/src/warm_quic_cache_service.rs index 04014eddf..00be5be7e 100644 --- a/core/src/warm_quic_cache_service.rs +++ b/core/src/warm_quic_cache_service.rs @@ -3,7 +3,7 @@ use { rand::{thread_rng, Rng}, - solana_client::connection_cache::send_wire_transaction, + solana_client::{connection_cache::get_connection, tpu_connection::TpuConnection}, solana_gossip::cluster_info::ClusterInfo, solana_poh::poh_recorder::PohRecorder, std::{ @@ -48,7 +48,8 @@ impl WarmQuicCacheService { if let Some(addr) = cluster_info .lookup_contact_info(&leader_pubkey, |leader| leader.tpu) { - if let Err(err) = send_wire_transaction(&[0u8], &addr) { + let conn = get_connection(&addr); + if let Err(err) = conn.send_wire_transaction(&[0u8]) { warn!( "Failed to warmup QUIC connection to the leader {:?}, Error {:?}", leader_pubkey, err diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 05dfc061b..6635da80d 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -1083,6 +1083,18 @@ dependencies = [ "syn 1.0.93", ] +[[package]] +name = "enum_dispatch" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eb359f1476bf611266ac1f5355bc14aeca37b299d0ebccc038ee7058891c9cb" +dependencies = [ + "once_cell", + "proc-macro2 1.0.38", + "quote 1.0.18", + "syn 1.0.93", +] + [[package]] name = "env_logger" version = "0.9.0" @@ -4317,6 +4329,7 @@ dependencies = [ "bytes", "clap 2.33.3", "crossbeam-channel", + "enum_dispatch", "futures 0.3.21", "futures-util", "indexmap", diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 270ca9dc4..8d3257fad 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -2,7 +2,7 @@ use { crate::tpu_info::TpuInfo, crossbeam_channel::{Receiver, RecvTimeoutError}, log::*, - solana_client::connection_cache, + solana_client::{connection_cache, tpu_connection::TpuConnection}, solana_measure::measure::Measure, solana_metrics::datapoint_warn, solana_runtime::{bank::Bank, bank_forks::BankForks}, @@ -693,7 +693,8 @@ impl SendTransactionService { tpu_address: &SocketAddr, wire_transaction: &[u8], ) -> Result<(), TransportError> { - connection_cache::send_wire_transaction_async(wire_transaction.to_vec(), tpu_address) + let conn = connection_cache::get_connection(tpu_address); + conn.send_wire_transaction_async(wire_transaction.to_vec()) } fn send_transactions_with_metrics( @@ -701,7 +702,8 @@ impl SendTransactionService { wire_transactions: &[&[u8]], ) -> Result<(), TransportError> { let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect(); - connection_cache::send_wire_transaction_batch_async(wire_transactions, tpu_address) + let conn = connection_cache::get_connection(tpu_address); + conn.send_wire_transaction_batch_async(wire_transactions) } fn send_transactions(