Switch to using enum-dispatch to switch between UDP and Quic (#24713)
This commit is contained in:
parent
ae9513f00a
commit
1ca5c3a7bd
|
@ -1264,6 +1264,18 @@ dependencies = [
|
|||
"syn 1.0.93",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enum_dispatch"
|
||||
version = "0.3.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0eb359f1476bf611266ac1f5355bc14aeca37b299d0ebccc038ee7058891c9cb"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"proc-macro2 1.0.38",
|
||||
"quote 1.0.18",
|
||||
"syn 1.0.93",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.9.0"
|
||||
|
@ -4638,6 +4650,7 @@ dependencies = [
|
|||
"bytes",
|
||||
"clap 2.33.3",
|
||||
"crossbeam-channel",
|
||||
"enum_dispatch",
|
||||
"futures 0.3.21",
|
||||
"futures-util",
|
||||
"indexmap",
|
||||
|
|
|
@ -18,6 +18,7 @@ bs58 = "0.4.0"
|
|||
bytes = "1.1.0"
|
||||
clap = "2.33.0"
|
||||
crossbeam-channel = "0.5"
|
||||
enum_dispatch = "0.3.8"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3.21"
|
||||
indexmap = "1.8.1"
|
||||
|
|
|
@ -1,16 +1,13 @@
|
|||
use {
|
||||
crate::{
|
||||
quic_client::QuicTpuConnection,
|
||||
tpu_connection::{ClientStats, TpuConnection},
|
||||
udp_client::UdpTpuConnection,
|
||||
quic_client::QuicTpuConnection, tpu_connection::ClientStats, udp_client::UdpTpuConnection,
|
||||
},
|
||||
enum_dispatch::enum_dispatch,
|
||||
indexmap::map::IndexMap,
|
||||
lazy_static::lazy_static,
|
||||
rand::{thread_rng, Rng},
|
||||
solana_measure::measure::Measure,
|
||||
solana_sdk::{
|
||||
timing::AtomicInterval, transaction::VersionedTransaction, transport::TransportError,
|
||||
},
|
||||
solana_sdk::timing::AtomicInterval,
|
||||
std::{
|
||||
net::SocketAddr,
|
||||
sync::{
|
||||
|
@ -23,10 +20,10 @@ use {
|
|||
// Should be non-zero
|
||||
static MAX_CONNECTIONS: usize = 1024;
|
||||
|
||||
#[derive(Clone)]
|
||||
#[enum_dispatch(TpuConnection)]
|
||||
pub enum Connection {
|
||||
Udp(Arc<UdpTpuConnection>),
|
||||
Quic(Arc<QuicTpuConnection>),
|
||||
UdpTpuConnection,
|
||||
QuicTpuConnection,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
@ -52,7 +49,12 @@ pub struct ConnectionCacheStats {
|
|||
const CONNECTION_STAT_SUBMISSION_INTERVAL: u64 = 2000;
|
||||
|
||||
impl ConnectionCacheStats {
|
||||
fn add_client_stats(&self, client_stats: &ClientStats, num_packets: usize, is_success: bool) {
|
||||
pub fn add_client_stats(
|
||||
&self,
|
||||
client_stats: &ClientStats,
|
||||
num_packets: usize,
|
||||
is_success: bool,
|
||||
) {
|
||||
self.total_client_stats.total_connections.fetch_add(
|
||||
client_stats.total_connections.load(Ordering::Relaxed),
|
||||
Ordering::Relaxed,
|
||||
|
@ -214,7 +216,7 @@ impl ConnectionCacheStats {
|
|||
}
|
||||
|
||||
struct ConnectionMap {
|
||||
map: IndexMap<SocketAddr, Connection>,
|
||||
map: IndexMap<SocketAddr, Arc<Connection>>,
|
||||
stats: Arc<ConnectionCacheStats>,
|
||||
last_stats: AtomicInterval,
|
||||
use_quic: bool,
|
||||
|
@ -245,7 +247,7 @@ pub fn set_use_quic(use_quic: bool) {
|
|||
}
|
||||
|
||||
struct GetConnectionResult {
|
||||
connection: Connection,
|
||||
connection: Arc<Connection>,
|
||||
cache_hit: bool,
|
||||
report_stats: bool,
|
||||
map_timing_ms: u64,
|
||||
|
@ -286,18 +288,14 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
|
|||
match map.map.get(addr) {
|
||||
Some(connection) => (connection.clone(), true, map.stats.clone(), 0, 0),
|
||||
None => {
|
||||
let connection = if map.use_quic {
|
||||
Connection::Quic(Arc::new(QuicTpuConnection::new(
|
||||
*addr,
|
||||
map.stats.clone(),
|
||||
)))
|
||||
let connection: Connection = if map.use_quic {
|
||||
QuicTpuConnection::new(*addr, map.stats.clone()).into()
|
||||
} else {
|
||||
Connection::Udp(Arc::new(UdpTpuConnection::new(
|
||||
*addr,
|
||||
map.stats.clone(),
|
||||
)))
|
||||
UdpTpuConnection::new(*addr, map.stats.clone()).into()
|
||||
};
|
||||
|
||||
let connection = Arc::new(connection);
|
||||
|
||||
// evict a connection if the cache is reaching upper bounds
|
||||
let mut num_evictions = 0;
|
||||
let mut get_connection_cache_eviction_measure =
|
||||
|
@ -338,7 +336,7 @@ fn get_or_add_connection(addr: &SocketAddr) -> GetConnectionResult {
|
|||
|
||||
// TODO: see https://github.com/solana-labs/solana/issues/23661
|
||||
// remove lazy_static and optimize and refactor this
|
||||
fn get_connection(addr: &SocketAddr) -> (Connection, Arc<ConnectionCacheStats>) {
|
||||
pub fn get_connection(addr: &SocketAddr) -> Arc<Connection> {
|
||||
let mut get_connection_measure = Measure::start("get_connection_measure");
|
||||
let GetConnectionResult {
|
||||
connection,
|
||||
|
@ -384,114 +382,20 @@ fn get_connection(addr: &SocketAddr) -> (Connection, Arc<ConnectionCacheStats>)
|
|||
connection_cache_stats
|
||||
.get_connection_ms
|
||||
.fetch_add(get_connection_measure.as_ms(), Ordering::Relaxed);
|
||||
(connection, connection_cache_stats)
|
||||
}
|
||||
|
||||
// TODO: see https://github.com/solana-labs/solana/issues/23851
|
||||
// use enum_dispatch and get rid of this tedious code.
|
||||
// The main blocker to using enum_dispatch right now is that
|
||||
// the it doesn't work with static methods like TpuConnection::new
|
||||
// which is used by thin_client. This will be eliminated soon
|
||||
// once thin_client is moved to using this connection cache.
|
||||
// Once that is done, we will migrate to using enum_dispatch
|
||||
// This will be done in a followup to
|
||||
// https://github.com/solana-labs/solana/pull/23817
|
||||
pub fn send_wire_transaction_batch(
|
||||
packets: &[&[u8]],
|
||||
addr: &SocketAddr,
|
||||
) -> Result<(), TransportError> {
|
||||
let (conn, stats) = get_connection(addr);
|
||||
let client_stats = ClientStats::default();
|
||||
let r = match conn {
|
||||
Connection::Udp(conn) => conn.send_wire_transaction_batch(packets, &client_stats),
|
||||
Connection::Quic(conn) => conn.send_wire_transaction_batch(packets, &client_stats),
|
||||
};
|
||||
stats.add_client_stats(&client_stats, packets.len(), r.is_ok());
|
||||
r
|
||||
}
|
||||
|
||||
pub fn send_wire_transaction_async(
|
||||
packets: Vec<u8>,
|
||||
addr: &SocketAddr,
|
||||
) -> Result<(), TransportError> {
|
||||
let (conn, stats) = get_connection(addr);
|
||||
let client_stats = Arc::new(ClientStats::default());
|
||||
let r = match conn {
|
||||
Connection::Udp(conn) => conn.send_wire_transaction_async(packets, client_stats.clone()),
|
||||
Connection::Quic(conn) => conn.send_wire_transaction_async(packets, client_stats.clone()),
|
||||
};
|
||||
stats.add_client_stats(&client_stats, 1, r.is_ok());
|
||||
r
|
||||
}
|
||||
|
||||
pub fn send_wire_transaction_batch_async(
|
||||
packets: Vec<Vec<u8>>,
|
||||
addr: &SocketAddr,
|
||||
) -> Result<(), TransportError> {
|
||||
let (conn, stats) = get_connection(addr);
|
||||
let client_stats = Arc::new(ClientStats::default());
|
||||
let len = packets.len();
|
||||
let r = match conn {
|
||||
Connection::Udp(conn) => {
|
||||
conn.send_wire_transaction_batch_async(packets, client_stats.clone())
|
||||
}
|
||||
Connection::Quic(conn) => {
|
||||
conn.send_wire_transaction_batch_async(packets, client_stats.clone())
|
||||
}
|
||||
};
|
||||
stats.add_client_stats(&client_stats, len, r.is_ok());
|
||||
r
|
||||
}
|
||||
|
||||
pub fn send_wire_transaction(
|
||||
wire_transaction: &[u8],
|
||||
addr: &SocketAddr,
|
||||
) -> Result<(), TransportError> {
|
||||
send_wire_transaction_batch(&[wire_transaction], addr)
|
||||
}
|
||||
|
||||
pub fn serialize_and_send_transaction(
|
||||
transaction: &VersionedTransaction,
|
||||
addr: &SocketAddr,
|
||||
) -> Result<(), TransportError> {
|
||||
let (conn, stats) = get_connection(addr);
|
||||
let client_stats = ClientStats::default();
|
||||
let r = match conn {
|
||||
Connection::Udp(conn) => conn.serialize_and_send_transaction(transaction, &client_stats),
|
||||
Connection::Quic(conn) => conn.serialize_and_send_transaction(transaction, &client_stats),
|
||||
};
|
||||
stats.add_client_stats(&client_stats, 1, r.is_ok());
|
||||
r
|
||||
}
|
||||
|
||||
pub fn par_serialize_and_send_transaction_batch(
|
||||
transactions: &[VersionedTransaction],
|
||||
addr: &SocketAddr,
|
||||
) -> Result<(), TransportError> {
|
||||
let (conn, stats) = get_connection(addr);
|
||||
let client_stats = ClientStats::default();
|
||||
let r = match conn {
|
||||
Connection::Udp(conn) => {
|
||||
conn.par_serialize_and_send_transaction_batch(transactions, &client_stats)
|
||||
}
|
||||
Connection::Quic(conn) => {
|
||||
conn.par_serialize_and_send_transaction_batch(transactions, &client_stats)
|
||||
}
|
||||
};
|
||||
stats.add_client_stats(&client_stats, transactions.len(), r.is_ok());
|
||||
r
|
||||
connection
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use {
|
||||
crate::{
|
||||
connection_cache::{get_connection, Connection, CONNECTION_MAP, MAX_CONNECTIONS},
|
||||
connection_cache::{get_connection, CONNECTION_MAP, MAX_CONNECTIONS},
|
||||
tpu_connection::TpuConnection,
|
||||
},
|
||||
rand::{Rng, SeedableRng},
|
||||
rand_chacha::ChaChaRng,
|
||||
std::net::{IpAddr, SocketAddr},
|
||||
std::net::SocketAddr,
|
||||
};
|
||||
|
||||
fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
|
||||
|
@ -505,13 +409,6 @@ mod tests {
|
|||
addr_str.parse().expect("Invalid address")
|
||||
}
|
||||
|
||||
fn ip(conn: Connection) -> IpAddr {
|
||||
match conn {
|
||||
Connection::Udp(conn) => conn.tpu_addr().ip(),
|
||||
Connection::Quic(conn) => conn.tpu_addr().ip(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_connection_cache() {
|
||||
solana_logger::setup();
|
||||
|
@ -540,7 +437,7 @@ mod tests {
|
|||
assert!(map.map.len() == MAX_CONNECTIONS);
|
||||
addrs.iter().for_each(|a| {
|
||||
let conn = map.map.get(a).expect("Address not found");
|
||||
assert!(a.ip() == ip(conn.clone()));
|
||||
assert!(a.ip() == conn.tpu_addr().ip());
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -161,10 +161,8 @@ impl QuicTpuConnection {
|
|||
pub fn base_stats(&self) -> Arc<ClientStats> {
|
||||
self.client.stats.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl TpuConnection for QuicTpuConnection {
|
||||
fn new(tpu_addr: SocketAddr, connection_stats: Arc<ConnectionCacheStats>) -> Self {
|
||||
pub fn new(tpu_addr: SocketAddr, connection_stats: Arc<ConnectionCacheStats>) -> Self {
|
||||
let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET);
|
||||
let client = Arc::new(QuicClient::new(tpu_addr));
|
||||
|
||||
|
@ -173,76 +171,65 @@ impl TpuConnection for QuicTpuConnection {
|
|||
connection_stats,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TpuConnection for QuicTpuConnection {
|
||||
fn tpu_addr(&self) -> &SocketAddr {
|
||||
&self.client.addr
|
||||
}
|
||||
|
||||
fn send_wire_transaction<T>(
|
||||
&self,
|
||||
wire_transaction: T,
|
||||
stats: &ClientStats,
|
||||
) -> TransportResult<()>
|
||||
where
|
||||
T: AsRef<[u8]>,
|
||||
{
|
||||
let _guard = RUNTIME.enter();
|
||||
let send_buffer =
|
||||
self.client
|
||||
.send_buffer(wire_transaction, stats, self.connection_stats.clone());
|
||||
RUNTIME.block_on(send_buffer)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_wire_transaction_batch<T>(
|
||||
&self,
|
||||
buffers: &[T],
|
||||
stats: &ClientStats,
|
||||
) -> TransportResult<()>
|
||||
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
|
||||
where
|
||||
T: AsRef<[u8]>,
|
||||
{
|
||||
let stats = ClientStats::default();
|
||||
let len = buffers.len();
|
||||
let _guard = RUNTIME.enter();
|
||||
let send_batch = self
|
||||
.client
|
||||
.send_batch(buffers, stats, self.connection_stats.clone());
|
||||
RUNTIME.block_on(send_batch)?;
|
||||
.send_batch(buffers, &stats, self.connection_stats.clone());
|
||||
let res = RUNTIME.block_on(send_batch);
|
||||
self.connection_stats
|
||||
.add_client_stats(&stats, len, res.is_ok());
|
||||
res?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_wire_transaction_async(
|
||||
&self,
|
||||
wire_transaction: Vec<u8>,
|
||||
stats: Arc<ClientStats>,
|
||||
) -> TransportResult<()> {
|
||||
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
|
||||
let stats = Arc::new(ClientStats::default());
|
||||
let _guard = RUNTIME.enter();
|
||||
let client = self.client.clone();
|
||||
let connection_stats = self.connection_stats.clone();
|
||||
//drop and detach the task
|
||||
let _ = RUNTIME.spawn(async move {
|
||||
let send_buffer = client.send_buffer(wire_transaction, &stats, connection_stats);
|
||||
let send_buffer =
|
||||
client.send_buffer(wire_transaction, &stats, connection_stats.clone());
|
||||
if let Err(e) = send_buffer.await {
|
||||
warn!("Failed to send transaction async to {:?}", e);
|
||||
datapoint_warn!("send-wire-async", ("failure", 1, i64),);
|
||||
connection_stats.add_client_stats(&stats, 1, false);
|
||||
} else {
|
||||
connection_stats.add_client_stats(&stats, 1, true);
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_wire_transaction_batch_async(
|
||||
&self,
|
||||
buffers: Vec<Vec<u8>>,
|
||||
stats: Arc<ClientStats>,
|
||||
) -> TransportResult<()> {
|
||||
fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
|
||||
let stats = Arc::new(ClientStats::default());
|
||||
let _guard = RUNTIME.enter();
|
||||
let client = self.client.clone();
|
||||
let connection_stats = self.connection_stats.clone();
|
||||
let len = buffers.len();
|
||||
//drop and detach the task
|
||||
let _ = RUNTIME.spawn(async move {
|
||||
let send_batch = client.send_batch(&buffers, &stats, connection_stats);
|
||||
let send_batch = client.send_batch(&buffers, &stats, connection_stats.clone());
|
||||
if let Err(e) = send_batch.await {
|
||||
warn!("Failed to send transaction batch async to {:?}", e);
|
||||
datapoint_warn!("send-wire-batch-async", ("failure", 1, i64),);
|
||||
connection_stats.add_client_stats(&stats, len, false);
|
||||
} else {
|
||||
connection_stats.add_client_stats(&stats, len, true);
|
||||
}
|
||||
});
|
||||
Ok(())
|
||||
|
|
|
@ -5,13 +5,9 @@
|
|||
|
||||
use {
|
||||
crate::{
|
||||
connection_cache::{
|
||||
par_serialize_and_send_transaction_batch, send_wire_transaction,
|
||||
serialize_and_send_transaction,
|
||||
},
|
||||
rpc_client::RpcClient,
|
||||
rpc_config::RpcProgramAccountsConfig,
|
||||
rpc_response::Response,
|
||||
connection_cache::get_connection, rpc_client::RpcClient,
|
||||
rpc_config::RpcProgramAccountsConfig, rpc_response::Response,
|
||||
tpu_connection::TpuConnection,
|
||||
},
|
||||
log::*,
|
||||
solana_sdk::{
|
||||
|
@ -212,8 +208,9 @@ impl ThinClient {
|
|||
bincode::serialize(&transaction).expect("transaction serialization failed");
|
||||
while now.elapsed().as_secs() < wait_time as u64 {
|
||||
if num_confirmed == 0 {
|
||||
let conn = get_connection(self.tpu_addr());
|
||||
// Send the transaction if there has been no confirmation (e.g. the first time)
|
||||
send_wire_transaction(&wire_transaction, self.tpu_addr())?;
|
||||
conn.send_wire_transaction(&wire_transaction)?;
|
||||
}
|
||||
|
||||
if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
|
||||
|
@ -599,7 +596,8 @@ impl AsyncClient for ThinClient {
|
|||
&self,
|
||||
transaction: VersionedTransaction,
|
||||
) -> TransportResult<Signature> {
|
||||
serialize_and_send_transaction(&transaction, self.tpu_addr())?;
|
||||
let conn = get_connection(self.tpu_addr());
|
||||
conn.serialize_and_send_transaction(&transaction)?;
|
||||
Ok(transaction.signatures[0])
|
||||
}
|
||||
|
||||
|
@ -607,7 +605,8 @@ impl AsyncClient for ThinClient {
|
|||
&self,
|
||||
batch: Vec<VersionedTransaction>,
|
||||
) -> TransportResult<()> {
|
||||
par_serialize_and_send_transaction_batch(&batch[..], self.tpu_addr())?;
|
||||
let conn = get_connection(self.tpu_addr());
|
||||
conn.par_serialize_and_send_transaction_batch(&batch[..])?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
use {
|
||||
crate::{
|
||||
client_error::{ClientError, Result as ClientResult},
|
||||
connection_cache::send_wire_transaction_async,
|
||||
connection_cache::get_connection,
|
||||
pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
|
||||
rpc_client::RpcClient,
|
||||
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
|
||||
rpc_response::{RpcContactInfo, SlotUpdate},
|
||||
spinner,
|
||||
tpu_connection::TpuConnection,
|
||||
},
|
||||
bincode::serialize,
|
||||
log::*,
|
||||
|
@ -119,7 +120,8 @@ impl TpuClient {
|
|||
.leader_tpu_service
|
||||
.leader_tpu_sockets(self.fanout_slots)
|
||||
{
|
||||
let result = send_wire_transaction_async(wire_transaction.clone(), &tpu_address);
|
||||
let conn = get_connection(&tpu_address);
|
||||
let result = conn.send_wire_transaction_async(wire_transaction.clone());
|
||||
if let Err(err) = result {
|
||||
last_error = Some(err);
|
||||
} else {
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
use {
|
||||
crate::connection_cache::ConnectionCacheStats,
|
||||
crate::{
|
||||
connection_cache::Connection, quic_client::QuicTpuConnection, udp_client::UdpTpuConnection,
|
||||
},
|
||||
enum_dispatch::enum_dispatch,
|
||||
rayon::iter::{IntoParallelIterator, ParallelIterator},
|
||||
solana_metrics::MovingStat,
|
||||
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
|
||||
std::{
|
||||
net::SocketAddr,
|
||||
sync::{atomic::AtomicU64, Arc},
|
||||
},
|
||||
std::{net::SocketAddr, sync::atomic::AtomicU64},
|
||||
};
|
||||
|
||||
#[derive(Default)]
|
||||
|
@ -25,59 +25,43 @@ pub struct ClientStats {
|
|||
pub make_connection_ms: AtomicU64,
|
||||
}
|
||||
|
||||
#[enum_dispatch]
|
||||
pub trait TpuConnection {
|
||||
fn new(tpu_addr: SocketAddr, connection_stats: Arc<ConnectionCacheStats>) -> Self;
|
||||
|
||||
fn tpu_addr(&self) -> &SocketAddr;
|
||||
|
||||
fn serialize_and_send_transaction(
|
||||
&self,
|
||||
transaction: &VersionedTransaction,
|
||||
stats: &ClientStats,
|
||||
) -> TransportResult<()> {
|
||||
let wire_transaction =
|
||||
bincode::serialize(transaction).expect("serialize Transaction in send_batch");
|
||||
self.send_wire_transaction(&wire_transaction, stats)
|
||||
self.send_wire_transaction(&wire_transaction)
|
||||
}
|
||||
|
||||
fn send_wire_transaction<T>(
|
||||
&self,
|
||||
wire_transaction: T,
|
||||
stats: &ClientStats,
|
||||
) -> TransportResult<()>
|
||||
fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
|
||||
where
|
||||
T: AsRef<[u8]>;
|
||||
T: AsRef<[u8]>,
|
||||
{
|
||||
self.send_wire_transaction_batch(&[wire_transaction])
|
||||
}
|
||||
|
||||
fn send_wire_transaction_async(
|
||||
&self,
|
||||
wire_transaction: Vec<u8>,
|
||||
stats: Arc<ClientStats>,
|
||||
) -> TransportResult<()>;
|
||||
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()>;
|
||||
|
||||
fn par_serialize_and_send_transaction_batch(
|
||||
&self,
|
||||
transactions: &[VersionedTransaction],
|
||||
stats: &ClientStats,
|
||||
) -> TransportResult<()> {
|
||||
let buffers = transactions
|
||||
.into_par_iter()
|
||||
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.send_wire_transaction_batch(&buffers, stats)
|
||||
self.send_wire_transaction_batch(&buffers)
|
||||
}
|
||||
|
||||
fn send_wire_transaction_batch<T>(
|
||||
&self,
|
||||
buffers: &[T],
|
||||
stats: &ClientStats,
|
||||
) -> TransportResult<()>
|
||||
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
|
||||
where
|
||||
T: AsRef<[u8]>;
|
||||
|
||||
fn send_wire_transaction_batch_async(
|
||||
&self,
|
||||
buffers: Vec<Vec<u8>>,
|
||||
stats: Arc<ClientStats>,
|
||||
) -> TransportResult<()>;
|
||||
fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()>;
|
||||
}
|
||||
|
|
|
@ -2,10 +2,7 @@
|
|||
//! an interface for sending transactions
|
||||
|
||||
use {
|
||||
crate::{
|
||||
connection_cache::ConnectionCacheStats,
|
||||
tpu_connection::{ClientStats, TpuConnection},
|
||||
},
|
||||
crate::{connection_cache::ConnectionCacheStats, tpu_connection::TpuConnection},
|
||||
core::iter::repeat,
|
||||
solana_net_utils::VALIDATOR_PORT_RANGE,
|
||||
solana_sdk::transport::Result as TransportResult,
|
||||
|
@ -21,8 +18,8 @@ pub struct UdpTpuConnection {
|
|||
addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl TpuConnection for UdpTpuConnection {
|
||||
fn new(tpu_addr: SocketAddr, _connection_stats: Arc<ConnectionCacheStats>) -> Self {
|
||||
impl UdpTpuConnection {
|
||||
pub fn new(tpu_addr: SocketAddr, _connection_stats: Arc<ConnectionCacheStats>) -> Self {
|
||||
let (_, client_socket) = solana_net_utils::bind_in_range(
|
||||
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
||||
VALIDATOR_PORT_RANGE,
|
||||
|
@ -34,37 +31,19 @@ impl TpuConnection for UdpTpuConnection {
|
|||
addr: tpu_addr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TpuConnection for UdpTpuConnection {
|
||||
fn tpu_addr(&self) -> &SocketAddr {
|
||||
&self.addr
|
||||
}
|
||||
|
||||
fn send_wire_transaction<T>(
|
||||
&self,
|
||||
wire_transaction: T,
|
||||
_stats: &ClientStats,
|
||||
) -> TransportResult<()>
|
||||
where
|
||||
T: AsRef<[u8]>,
|
||||
{
|
||||
fn send_wire_transaction_async(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
|
||||
self.socket.send_to(wire_transaction.as_ref(), self.addr)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_wire_transaction_async(
|
||||
&self,
|
||||
wire_transaction: Vec<u8>,
|
||||
_stats: Arc<ClientStats>,
|
||||
) -> TransportResult<()> {
|
||||
self.socket.send_to(wire_transaction.as_ref(), self.addr)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_wire_transaction_batch<T>(
|
||||
&self,
|
||||
buffers: &[T],
|
||||
_stats: &ClientStats,
|
||||
) -> TransportResult<()>
|
||||
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
|
||||
where
|
||||
T: AsRef<[u8]>,
|
||||
{
|
||||
|
@ -72,11 +51,7 @@ impl TpuConnection for UdpTpuConnection {
|
|||
batch_send(&self.socket, &pkts)?;
|
||||
Ok(())
|
||||
}
|
||||
fn send_wire_transaction_batch_async(
|
||||
&self,
|
||||
buffers: Vec<Vec<u8>>,
|
||||
_stats: Arc<ClientStats>,
|
||||
) -> TransportResult<()> {
|
||||
fn send_wire_transaction_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
|
||||
let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect();
|
||||
batch_send(&self.socket, &pkts)?;
|
||||
Ok(())
|
||||
|
|
|
@ -3,9 +3,8 @@ mod tests {
|
|||
use {
|
||||
crossbeam_channel::unbounded,
|
||||
solana_client::{
|
||||
connection_cache::ConnectionCacheStats,
|
||||
quic_client::QuicTpuConnection,
|
||||
tpu_connection::{ClientStats, TpuConnection},
|
||||
connection_cache::ConnectionCacheStats, quic_client::QuicTpuConnection,
|
||||
tpu_connection::TpuConnection,
|
||||
},
|
||||
solana_sdk::{packet::PACKET_DATA_SIZE, quic::QUIC_PORT_OFFSET, signature::Keypair},
|
||||
solana_streamer::quic::spawn_server,
|
||||
|
@ -53,11 +52,7 @@ mod tests {
|
|||
let num_expected_packets: usize = 4000;
|
||||
let packets = vec![vec![0u8; PACKET_DATA_SIZE]; num_expected_packets];
|
||||
|
||||
let stats = Arc::new(ClientStats::default());
|
||||
|
||||
assert!(client
|
||||
.send_wire_transaction_batch_async(packets, stats)
|
||||
.is_ok());
|
||||
assert!(client.send_wire_transaction_batch_async(packets).is_ok());
|
||||
|
||||
let mut all_packets = vec![];
|
||||
let now = Instant::now();
|
||||
|
|
|
@ -17,7 +17,7 @@ use {
|
|||
histogram::Histogram,
|
||||
itertools::Itertools,
|
||||
min_max_heap::MinMaxHeap,
|
||||
solana_client::connection_cache::send_wire_transaction_batch_async,
|
||||
solana_client::{connection_cache::get_connection, tpu_connection::TpuConnection},
|
||||
solana_entry::entry::hash_transactions,
|
||||
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
|
||||
solana_ledger::blockstore_processor::TransactionStatusSender,
|
||||
|
@ -525,7 +525,8 @@ impl BankingStage {
|
|||
|
||||
let mut measure = Measure::start("banking_stage-forward-us");
|
||||
|
||||
let res = send_wire_transaction_batch_async(packet_vec, tpu_forwards);
|
||||
let conn = get_connection(tpu_forwards);
|
||||
let res = conn.send_wire_transaction_batch_async(packet_vec);
|
||||
|
||||
measure.stop();
|
||||
inc_new_counter_info!(
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
|
||||
use {
|
||||
rand::{thread_rng, Rng},
|
||||
solana_client::connection_cache::send_wire_transaction,
|
||||
solana_client::{connection_cache::get_connection, tpu_connection::TpuConnection},
|
||||
solana_gossip::cluster_info::ClusterInfo,
|
||||
solana_poh::poh_recorder::PohRecorder,
|
||||
std::{
|
||||
|
@ -48,7 +48,8 @@ impl WarmQuicCacheService {
|
|||
if let Some(addr) = cluster_info
|
||||
.lookup_contact_info(&leader_pubkey, |leader| leader.tpu)
|
||||
{
|
||||
if let Err(err) = send_wire_transaction(&[0u8], &addr) {
|
||||
let conn = get_connection(&addr);
|
||||
if let Err(err) = conn.send_wire_transaction(&[0u8]) {
|
||||
warn!(
|
||||
"Failed to warmup QUIC connection to the leader {:?}, Error {:?}",
|
||||
leader_pubkey, err
|
||||
|
|
|
@ -1083,6 +1083,18 @@ dependencies = [
|
|||
"syn 1.0.93",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enum_dispatch"
|
||||
version = "0.3.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0eb359f1476bf611266ac1f5355bc14aeca37b299d0ebccc038ee7058891c9cb"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"proc-macro2 1.0.38",
|
||||
"quote 1.0.18",
|
||||
"syn 1.0.93",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.9.0"
|
||||
|
@ -4317,6 +4329,7 @@ dependencies = [
|
|||
"bytes",
|
||||
"clap 2.33.3",
|
||||
"crossbeam-channel",
|
||||
"enum_dispatch",
|
||||
"futures 0.3.21",
|
||||
"futures-util",
|
||||
"indexmap",
|
||||
|
|
|
@ -2,7 +2,7 @@ use {
|
|||
crate::tpu_info::TpuInfo,
|
||||
crossbeam_channel::{Receiver, RecvTimeoutError},
|
||||
log::*,
|
||||
solana_client::connection_cache,
|
||||
solana_client::{connection_cache, tpu_connection::TpuConnection},
|
||||
solana_measure::measure::Measure,
|
||||
solana_metrics::datapoint_warn,
|
||||
solana_runtime::{bank::Bank, bank_forks::BankForks},
|
||||
|
@ -693,7 +693,8 @@ impl SendTransactionService {
|
|||
tpu_address: &SocketAddr,
|
||||
wire_transaction: &[u8],
|
||||
) -> Result<(), TransportError> {
|
||||
connection_cache::send_wire_transaction_async(wire_transaction.to_vec(), tpu_address)
|
||||
let conn = connection_cache::get_connection(tpu_address);
|
||||
conn.send_wire_transaction_async(wire_transaction.to_vec())
|
||||
}
|
||||
|
||||
fn send_transactions_with_metrics(
|
||||
|
@ -701,7 +702,8 @@ impl SendTransactionService {
|
|||
wire_transactions: &[&[u8]],
|
||||
) -> Result<(), TransportError> {
|
||||
let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect();
|
||||
connection_cache::send_wire_transaction_batch_async(wire_transactions, tpu_address)
|
||||
let conn = connection_cache::get_connection(tpu_address);
|
||||
conn.send_wire_transaction_batch_async(wire_transactions)
|
||||
}
|
||||
|
||||
fn send_transactions(
|
||||
|
|
Loading…
Reference in New Issue