Optimize TpuConnection and its implementations and refactor connection-cache to not use dyn in order to enable those changes (#23877)

This commit is contained in:
ryleung-solana 2022-03-24 11:40:26 -04:00 committed by GitHub
parent 5b916961b5
commit 82945ba973
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 266 additions and 48 deletions

1
Cargo.lock generated
View File

@ -4638,6 +4638,7 @@ dependencies = [
"solana-measure",
"solana-net-utils",
"solana-sdk",
"solana-streamer",
"solana-transaction-status",
"solana-version",
"solana-vote-program",

View File

@ -41,6 +41,7 @@ solana-faucet = { path = "../faucet", version = "=1.11.0" }
solana-measure = { path = "../measure", version = "=1.11.0" }
solana-net-utils = { path = "../net-utils", version = "=1.11.0" }
solana-sdk = { path = "../sdk", version = "=1.11.0" }
solana-streamer = { path = "../streamer", version = "=1.11.0" }
solana-transaction-status = { path = "../transaction-status", version = "=1.11.0" }
solana-version = { path = "../version", version = "=1.11.0" }
solana-vote-program = { path = "../programs/vote", version = "=1.11.0" }

View File

@ -3,6 +3,7 @@ use {
quic_client::QuicTpuConnection, tpu_connection::TpuConnection, udp_client::UdpTpuConnection,
},
lazy_static::lazy_static,
solana_sdk::{transaction::VersionedTransaction, transport::TransportError},
std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
net::{SocketAddr, UdpSocket},
@ -13,9 +14,15 @@ use {
// Should be non-zero
static MAX_CONNECTIONS: usize = 64;
#[derive(Clone)]
enum Connection {
Udp(Arc<UdpTpuConnection>),
Quic(Arc<QuicTpuConnection>),
}
struct ConnMap {
// Keeps track of the connection associated with an addr and the last time it was used
map: HashMap<SocketAddr, (Arc<dyn TpuConnection + 'static + Sync + Send>, u64)>,
map: HashMap<SocketAddr, (Connection, u64)>,
// Helps to find the least recently used connection. The search and inserts are O(log(n))
// but since we're bounding the size of the collections, this should be constant
// (and hopefully negligible) time. In theory, we can do this in constant time
@ -55,7 +62,7 @@ pub fn set_use_quic(use_quic: bool) {
#[allow(dead_code)]
// TODO: see https://github.com/solana-labs/solana/issues/23661
// remove lazy_static and optimize and refactor this
pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sync + Send> {
fn get_connection(addr: &SocketAddr) -> Connection {
let mut map = (*CONNECTION_MAP).lock().unwrap();
let ticks = map.ticks;
let use_quic = map.use_quic;
@ -71,10 +78,10 @@ pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sy
// TODO: see https://github.com/solana-labs/solana/issues/23659
// make it configurable (e.g. via the command line) whether to use UDP or Quic
let conn: Arc<dyn TpuConnection + 'static + Sync + Send> = if use_quic {
Arc::new(QuicTpuConnection::new(send_socket, *addr))
let conn = if use_quic {
Connection::Quic(Arc::new(QuicTpuConnection::new(send_socket, *addr)))
} else {
Arc::new(UdpTpuConnection::new(send_socket, *addr))
Connection::Udp(Arc::new(UdpTpuConnection::new(send_socket, *addr)))
};
entry.insert((conn.clone(), ticks));
@ -101,13 +108,69 @@ pub fn get_connection(addr: &SocketAddr) -> Arc<dyn TpuConnection + 'static + Sy
conn
}
// TODO: see https://github.com/solana-labs/solana/issues/23851
// use enum_dispatch and get rid of this tedious code.
// The main blocker to using enum_dispatch right now is that
// the it doesn't work with static methods like TpuConnection::new
// which is used by thin_client. This will be eliminated soon
// once thin_client is moved to using this connection cache.
// Once that is done, we will migrate to using enum_dispatch
// This will be done in a followup to
// https://github.com/solana-labs/solana/pull/23817
pub fn send_wire_transaction_batch(
packets: &[&[u8]],
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.send_wire_transaction_batch(packets),
Connection::Quic(conn) => conn.send_wire_transaction_batch(packets),
}
}
pub fn send_wire_transaction(
wire_transaction: &[u8],
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.send_wire_transaction(wire_transaction),
Connection::Quic(conn) => conn.send_wire_transaction(wire_transaction),
}
}
pub fn serialize_and_send_transaction(
transaction: &VersionedTransaction,
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.serialize_and_send_transaction(transaction),
Connection::Quic(conn) => conn.serialize_and_send_transaction(transaction),
}
}
pub fn par_serialize_and_send_transaction_batch(
transactions: &[VersionedTransaction],
addr: &SocketAddr,
) -> Result<(), TransportError> {
let conn = get_connection(addr);
match conn {
Connection::Udp(conn) => conn.par_serialize_and_send_transaction_batch(transactions),
Connection::Quic(conn) => conn.par_serialize_and_send_transaction_batch(transactions),
}
}
#[cfg(test)]
mod tests {
use {
crate::connection_cache::{get_connection, CONNECTION_MAP, MAX_CONNECTIONS},
crate::{
connection_cache::{get_connection, Connection, CONNECTION_MAP, MAX_CONNECTIONS},
tpu_connection::TpuConnection,
},
rand::{Rng, SeedableRng},
rand_chacha::ChaChaRng,
std::net::SocketAddr,
std::net::{IpAddr, SocketAddr},
};
fn get_addr(rng: &mut ChaChaRng) -> SocketAddr {
@ -121,6 +184,13 @@ mod tests {
addr_str.parse().expect("Invalid address")
}
fn ip(conn: Connection) -> IpAddr {
match conn {
Connection::Udp(conn) => conn.tpu_addr().ip(),
Connection::Quic(conn) => conn.tpu_addr().ip(),
}
}
#[test]
fn test_connection_cache() {
// Allow the test to run deterministically
@ -136,7 +206,7 @@ mod tests {
// be lazy and not connect until first use or handle connection errors somehow
// (without crashing, as would be required in a real practical validator)
let first_addr = get_addr(&mut rng);
assert!(get_connection(&first_addr).tpu_addr().ip() == first_addr.ip());
assert!(ip(get_connection(&first_addr)) == first_addr.ip());
let addrs = (0..MAX_CONNECTIONS)
.into_iter()
.map(|_| {
@ -149,7 +219,7 @@ mod tests {
let map = (*CONNECTION_MAP).lock().unwrap();
addrs.iter().for_each(|a| {
let conn = map.map.get(a).expect("Address not found");
assert!(a.ip() == conn.0.tpu_addr().ip());
assert!(a.ip() == ip(conn.0.clone()));
});
assert!(map.map.get(&first_addr).is_none());

View File

@ -63,19 +63,22 @@ impl TpuConnection for QuicTpuConnection {
&self.client.addr
}
fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> {
fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]>,
{
let _guard = self.client.runtime.enter();
let send_buffer = self.client.send_buffer(wire_transaction);
self.client.runtime.block_on(send_buffer)?;
Ok(())
}
fn send_wire_transaction_batch(
&self,
wire_transaction_batch: &[Vec<u8>],
) -> TransportResult<()> {
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]>,
{
let _guard = self.client.runtime.enter();
let send_batch = self.client.send_batch(wire_transaction_batch);
let send_batch = self.client.send_batch(buffers);
self.client.runtime.block_on(send_batch)?;
Ok(())
}
@ -158,12 +161,18 @@ impl QuicClient {
}
}
pub async fn send_buffer(&self, data: &[u8]) -> Result<(), ClientErrorKind> {
self._send_buffer(data).await?;
pub async fn send_buffer<T>(&self, data: T) -> Result<(), ClientErrorKind>
where
T: AsRef<[u8]>,
{
self._send_buffer(data.as_ref()).await?;
Ok(())
}
pub async fn send_batch(&self, buffers: &[Vec<u8>]) -> Result<(), ClientErrorKind> {
pub async fn send_batch<T>(&self, buffers: &[T]) -> Result<(), ClientErrorKind>
where
T: AsRef<[u8]>,
{
// Start off by "testing" the connection by sending the first transaction
// This will also connect to the server if not already connected
// and reconnect and retry if the first send attempt failed
@ -178,7 +187,7 @@ impl QuicClient {
if buffers.is_empty() {
return Ok(());
}
let connection = self._send_buffer(&buffers[0]).await?;
let connection = self._send_buffer(buffers[0].as_ref()).await?;
// Used to avoid dereferencing the Arc multiple times below
// by just getting a reference to the NewConnection once
@ -192,7 +201,7 @@ impl QuicClient {
join_all(
buffs
.into_iter()
.map(|buf| Self::_send_buffer_using_conn(buf, connection_ref)),
.map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)),
)
});

View File

@ -613,7 +613,7 @@ impl<C: 'static + TpuConnection> AsyncClient for ThinClient<C> {
fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> {
let batch: Vec<VersionedTransaction> = transactions.into_iter().map(Into::into).collect();
self.tpu_connection()
.par_serialize_and_send_transaction_batch(&batch)?;
.par_serialize_and_send_transaction_batch(&batch[..])?;
Ok(())
}

View File

@ -1,13 +1,11 @@
use {
rayon::iter::{IntoParallelRefIterator, ParallelIterator},
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult},
std::net::{SocketAddr, UdpSocket},
};
pub trait TpuConnection {
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self
where
Self: Sized;
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self;
fn tpu_addr(&self) -> &SocketAddr;
@ -20,26 +18,23 @@ pub trait TpuConnection {
self.send_wire_transaction(&wire_transaction)
}
fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()>;
fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]>;
fn par_serialize_and_send_transaction_batch(
&self,
transaction_batch: &[VersionedTransaction],
transactions: &[VersionedTransaction],
) -> TransportResult<()> {
let wire_transaction_batch: Vec<_> = transaction_batch
.par_iter()
let buffers = transactions
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect();
self.send_wire_transaction_batch(&wire_transaction_batch)
.collect::<Vec<_>>();
self.send_wire_transaction_batch(&buffers)
}
fn send_wire_transaction_batch(
&self,
wire_transaction_batch: &[Vec<u8>],
) -> TransportResult<()> {
for wire_transaction in wire_transaction_batch {
self.send_wire_transaction(wire_transaction)?;
}
Ok(())
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]>;
}

View File

@ -3,7 +3,9 @@
use {
crate::tpu_connection::TpuConnection,
core::iter::repeat,
solana_sdk::transport::Result as TransportResult,
solana_streamer::sendmmsg::batch_send,
std::net::{SocketAddr, UdpSocket},
};
@ -24,8 +26,20 @@ impl TpuConnection for UdpTpuConnection {
&self.addr
}
fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> {
self.socket.send_to(wire_transaction, self.addr)?;
fn send_wire_transaction<T>(&self, wire_transaction: T) -> TransportResult<()>
where
T: AsRef<[u8]>,
{
self.socket.send_to(wire_transaction.as_ref(), self.addr)?;
Ok(())
}
fn send_wire_transaction_batch<T>(&self, buffers: &[T]) -> TransportResult<()>
where
T: AsRef<[u8]>,
{
let pkts: Vec<_> = buffers.iter().zip(repeat(self.tpu_addr())).collect();
batch_send(&self.socket, &pkts)?;
Ok(())
}
}

View File

@ -1,7 +1,7 @@
use {
crate::tower_storage::{SavedTowerVersions, TowerStorage},
crossbeam_channel::Receiver,
solana_client::connection_cache::get_connection,
solana_client::connection_cache::send_wire_transaction,
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure,
solana_poh::poh_recorder::PohRecorder,
@ -91,7 +91,7 @@ impl VotingService {
let mut measure = Measure::start("vote_tx_send-ms");
let target_address = target_address.unwrap_or_else(|| cluster_info.my_contact_info().tpu);
let wire_vote_tx = bincode::serialize(vote_op.tx()).expect("vote serialization failure");
let _ = get_connection(&target_address).send_wire_transaction(&wire_vote_tx);
let _ = send_wire_transaction(&wire_vote_tx, &target_address);
measure.stop();
inc_new_counter_info!("vote_tx_send-ms", measure.as_ms() as usize);

126
programs/bpf/Cargo.lock generated
View File

@ -174,6 +174,12 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "base64ct"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71acf5509fc522cce1b100ac0121c635129bfd4d91cdf036bcc9b9935f97ccf5"
[[package]]
name = "bincode"
version = "1.3.3"
@ -432,7 +438,7 @@ dependencies = [
"num-integer",
"num-traits",
"serde",
"time",
"time 0.1.43",
"winapi",
]
@ -543,6 +549,12 @@ dependencies = [
"web-sys",
]
[[package]]
name = "const-oid"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3"
[[package]]
name = "constant_time_eq"
version = "0.1.5"
@ -700,6 +712,15 @@ dependencies = [
"rayon",
]
[[package]]
name = "der"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c"
dependencies = [
"const-oid",
]
[[package]]
name = "derivation-path"
version = "0.2.0"
@ -1242,6 +1263,12 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "histogram"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669"
[[package]]
name = "hmac"
version = "0.8.1"
@ -1838,6 +1865,15 @@ dependencies = [
"syn 1.0.67",
]
[[package]]
name = "num_threads"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0"
dependencies = [
"libc",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
@ -1953,6 +1989,15 @@ dependencies = [
"digest 0.10.3",
]
[[package]]
name = "pem"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9a3b09a20e374558580a4914d3b7d89bd61b954a5a5e1dcbea98753addb1947"
dependencies = [
"base64 0.13.0",
]
[[package]]
name = "percent-encoding"
version = "2.1.0"
@ -1991,6 +2036,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs8"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0"
dependencies = [
"der",
"spki",
"zeroize",
]
[[package]]
name = "pkg-config"
version = "0.3.17"
@ -2284,6 +2340,18 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "rcgen"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7fa2d386df8533b02184941c76ae2e0d0c1d053f5d43339169d80f21275fc5e"
dependencies = [
"pem",
"ring",
"time 0.3.7",
"yasna",
]
[[package]]
name = "redox_syscall"
version = "0.1.56"
@ -3316,6 +3384,7 @@ dependencies = [
"solana-measure",
"solana-net-utils",
"solana-sdk",
"solana-streamer",
"solana-transaction-status",
"solana-version",
"solana-vote-program",
@ -3822,6 +3891,30 @@ dependencies = [
"thiserror",
]
[[package]]
name = "solana-streamer"
version = "1.11.0"
dependencies = [
"crossbeam-channel",
"futures-util",
"histogram",
"itertools 0.10.3",
"libc",
"log",
"nix",
"pem",
"pkcs8",
"quinn",
"rand 0.7.3",
"rcgen",
"rustls",
"solana-metrics",
"solana-perf",
"solana-sdk",
"thiserror",
"tokio",
]
[[package]]
name = "solana-transaction-status"
version = "1.11.0"
@ -3936,7 +4029,7 @@ dependencies = [
"rustc-demangle",
"scroll",
"thiserror",
"time",
"time 0.1.43",
]
[[package]]
@ -3945,6 +4038,16 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spki"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27"
dependencies = [
"base64ct",
"der",
]
[[package]]
name = "spl-associated-token-account"
version = "1.0.3"
@ -4181,6 +4284,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "time"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d"
dependencies = [
"libc",
"num_threads",
]
[[package]]
name = "tiny-bip39"
version = "0.8.2"
@ -4756,6 +4869,15 @@ dependencies = [
"linked-hash-map",
]
[[package]]
name = "yasna"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "346d34a236c9d3e5f3b9b74563f238f955bbd05fa0b8b4efa53c130c43982f4c"
dependencies = [
"time 0.3.7",
]
[[package]]
name = "zeroize"
version = "1.3.0"

View File

@ -310,9 +310,8 @@ impl SendTransactionService {
fn send_transaction(tpu_address: &SocketAddr, wire_transaction: &[u8]) {
let mut measure = Measure::start("send_transaction_service-us");
let connection = connection_cache::get_connection(tpu_address);
if let Err(err) = connection.send_wire_transaction(wire_transaction) {
if let Err(err) = connection_cache::send_wire_transaction(wire_transaction, tpu_address) {
warn!("Failed to send transaction to {}: {:?}", tpu_address, err);
}
measure.stop();

View File

@ -8,6 +8,7 @@ use {
std::os::unix::io::AsRawFd,
};
use {
solana_sdk::transport::TransportError,
std::{
borrow::Borrow,
io,
@ -24,6 +25,12 @@ pub enum SendPktsError {
IoError(io::Error, usize),
}
impl From<SendPktsError> for TransportError {
fn from(err: SendPktsError) -> Self {
Self::Custom(format!("{:?}", err))
}
}
#[cfg(not(target_os = "linux"))]
pub fn batch_send<S, T>(sock: &UdpSocket, packets: &[(T, S)]) -> Result<(), SendPktsError>
where