Add quic-client module (#23166)
* Add quic-client module to send transactions via quic, abstracted behind the TpuConnection trait (along with a legacy UDP implementation of TpuConnection) and change thin-client to use TpuConnection
This commit is contained in:
parent
1fe0d6eeeb
commit
17b00ad3a4
|
@ -145,6 +145,15 @@ version = "1.5.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
|
||||
|
||||
[[package]]
|
||||
name = "async-mutex"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
|
||||
dependencies = [
|
||||
"event-listener",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.2"
|
||||
|
@ -1312,6 +1321,12 @@ dependencies = [
|
|||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "2.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
|
||||
|
||||
[[package]]
|
||||
name = "fake-simd"
|
||||
version = "0.1.2"
|
||||
|
@ -4543,19 +4558,25 @@ name = "solana-client"
|
|||
version = "1.10.1"
|
||||
dependencies = [
|
||||
"assert_matches",
|
||||
"async-mutex",
|
||||
"async-trait",
|
||||
"base64 0.13.0",
|
||||
"bincode",
|
||||
"bs58 0.4.0",
|
||||
"bytes",
|
||||
"clap 2.33.3",
|
||||
"crossbeam-channel",
|
||||
"futures 0.3.21",
|
||||
"futures-util",
|
||||
"indicatif",
|
||||
"itertools 0.10.3",
|
||||
"jsonrpc-core",
|
||||
"jsonrpc-http-server",
|
||||
"log",
|
||||
"quinn",
|
||||
"rayon",
|
||||
"reqwest",
|
||||
"rustls 0.20.4",
|
||||
"semver 1.0.6",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
|
|
|
@ -475,6 +475,7 @@ fn do_tx_transfers<T: Client>(
|
|||
let tx_len = txs0.len();
|
||||
let transfer_start = Instant::now();
|
||||
let mut old_transactions = false;
|
||||
let mut transactions = Vec::<_>::new();
|
||||
for tx in txs0 {
|
||||
let now = timestamp();
|
||||
// Transactions that are too old will be rejected by the cluster Don't bother
|
||||
|
@ -483,10 +484,13 @@ fn do_tx_transfers<T: Client>(
|
|||
old_transactions = true;
|
||||
continue;
|
||||
}
|
||||
client
|
||||
.async_send_transaction(tx.0)
|
||||
.expect("async_send_transaction in do_tx_transfers");
|
||||
transactions.push(tx.0);
|
||||
}
|
||||
|
||||
if let Err(error) = client.async_send_batch(transactions) {
|
||||
warn!("send_batch_sync in do_tx_transfers failed: {}", error);
|
||||
}
|
||||
|
||||
if old_transactions {
|
||||
let mut shared_txs_wl = shared_txs.write().expect("write lock in do_tx_transfers");
|
||||
shared_txs_wl.clear();
|
||||
|
|
|
@ -10,18 +10,24 @@ license = "Apache-2.0"
|
|||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
async-mutex = "1.4.0"
|
||||
async-trait = "0.1.52"
|
||||
base64 = "0.13.0"
|
||||
bincode = "1.3.3"
|
||||
bs58 = "0.4.0"
|
||||
bytes = "1.1.0"
|
||||
clap = "2.33.0"
|
||||
crossbeam-channel = "0.5"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3.21"
|
||||
indicatif = "0.16.2"
|
||||
itertools = "0.10.2"
|
||||
jsonrpc-core = "18.0.0"
|
||||
log = "0.4.14"
|
||||
quinn = "0.8.0"
|
||||
rayon = "1.5.1"
|
||||
reqwest = { version = "0.11.9", default-features = false, features = ["blocking", "rustls-tls", "json"] }
|
||||
rustls = { version = "0.20.2", features = ["dangerous_configuration"] }
|
||||
semver = "1.0.6"
|
||||
serde = "1.0.136"
|
||||
serde_derive = "1.0.103"
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
pub use reqwest;
|
||||
use {
|
||||
crate::{rpc_request, rpc_response},
|
||||
quinn::{ConnectError, WriteError},
|
||||
solana_faucet::faucet::FaucetError,
|
||||
solana_sdk::{
|
||||
signature::SignerError, transaction::TransactionError, transport::TransportError,
|
||||
|
@ -72,6 +73,18 @@ impl From<ClientErrorKind> for TransportError {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<WriteError> for ClientErrorKind {
|
||||
fn from(write_error: WriteError) -> Self {
|
||||
Self::Custom(format!("{:?}", write_error))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ConnectError> for ClientErrorKind {
|
||||
fn from(connect_error: ConnectError) -> Self {
|
||||
Self::Custom(format!("{:?}", connect_error))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
#[error("{kind}")]
|
||||
pub struct ClientError {
|
||||
|
|
|
@ -10,6 +10,7 @@ pub mod nonblocking;
|
|||
pub mod nonce_utils;
|
||||
pub mod perf_utils;
|
||||
pub mod pubsub_client;
|
||||
pub mod quic_client;
|
||||
pub mod rpc_cache;
|
||||
pub mod rpc_client;
|
||||
pub mod rpc_config;
|
||||
|
@ -22,7 +23,9 @@ pub mod rpc_sender;
|
|||
pub mod spinner;
|
||||
pub mod thin_client;
|
||||
pub mod tpu_client;
|
||||
pub mod tpu_connection;
|
||||
pub mod transaction_executor;
|
||||
pub mod udp_client;
|
||||
|
||||
pub mod mock_sender_for_cli {
|
||||
/// Magic `SIGNATURE` value used by `solana-cli` unit tests.
|
||||
|
|
|
@ -0,0 +1,208 @@
|
|||
//! Simple client that connects to a given UDP port with the QUIC protocol and provides
|
||||
//! an interface for sending transactions which is restricted by the server's flow control.
|
||||
|
||||
use {
|
||||
crate::{client_error::ClientErrorKind, tpu_connection::TpuConnection},
|
||||
async_mutex::Mutex,
|
||||
futures::future::join_all,
|
||||
itertools::Itertools,
|
||||
quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError},
|
||||
rayon::iter::{IntoParallelIterator, ParallelIterator},
|
||||
solana_sdk::{
|
||||
quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET},
|
||||
transaction::Transaction,
|
||||
transport::Result as TransportResult,
|
||||
},
|
||||
std::{
|
||||
net::{SocketAddr, UdpSocket},
|
||||
sync::Arc,
|
||||
},
|
||||
tokio::runtime::Runtime,
|
||||
};
|
||||
|
||||
struct SkipServerVerification;
|
||||
|
||||
impl SkipServerVerification {
|
||||
pub fn new() -> Arc<Self> {
|
||||
Arc::new(Self)
|
||||
}
|
||||
}
|
||||
|
||||
impl rustls::client::ServerCertVerifier for SkipServerVerification {
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
_end_entity: &rustls::Certificate,
|
||||
_intermediates: &[rustls::Certificate],
|
||||
_server_name: &rustls::ServerName,
|
||||
_scts: &mut dyn Iterator<Item = &[u8]>,
|
||||
_ocsp_response: &[u8],
|
||||
_now: std::time::SystemTime,
|
||||
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
|
||||
Ok(rustls::client::ServerCertVerified::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
struct QuicClient {
|
||||
runtime: Runtime,
|
||||
endpoint: Endpoint,
|
||||
connection: Arc<Mutex<Option<Arc<NewConnection>>>>,
|
||||
addr: SocketAddr,
|
||||
}
|
||||
|
||||
pub struct QuicTpuConnection {
|
||||
client: Arc<QuicClient>,
|
||||
}
|
||||
|
||||
impl TpuConnection for QuicTpuConnection {
|
||||
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self {
|
||||
let tpu_addr = SocketAddr::new(tpu_addr.ip(), tpu_addr.port() + QUIC_PORT_OFFSET);
|
||||
let client = Arc::new(QuicClient::new(client_socket, tpu_addr));
|
||||
|
||||
Self { client }
|
||||
}
|
||||
|
||||
fn tpu_addr(&self) -> &SocketAddr {
|
||||
&self.client.addr
|
||||
}
|
||||
|
||||
fn send_wire_transaction(&self, data: Vec<u8>) -> TransportResult<()> {
|
||||
let _guard = self.client.runtime.enter();
|
||||
let send_buffer = self.client.send_buffer(&data[..]);
|
||||
self.client.runtime.block_on(send_buffer)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> {
|
||||
let buffers = transactions
|
||||
.into_par_iter()
|
||||
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let _guard = self.client.runtime.enter();
|
||||
let send_batch = self.client.send_batch(&buffers[..]);
|
||||
self.client.runtime.block_on(send_batch)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl QuicClient {
|
||||
pub fn new(client_socket: UdpSocket, addr: SocketAddr) -> Self {
|
||||
let runtime = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let _guard = runtime.enter();
|
||||
|
||||
let crypto = rustls::ClientConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_custom_certificate_verifier(SkipServerVerification::new())
|
||||
.with_no_client_auth();
|
||||
|
||||
let create_endpoint = QuicClient::create_endpoint(EndpointConfig::default(), client_socket);
|
||||
|
||||
let mut endpoint = runtime.block_on(create_endpoint);
|
||||
|
||||
endpoint.set_default_client_config(ClientConfig::new(Arc::new(crypto)));
|
||||
|
||||
Self {
|
||||
runtime,
|
||||
endpoint,
|
||||
connection: Arc::new(Mutex::new(None)),
|
||||
addr,
|
||||
}
|
||||
}
|
||||
|
||||
// If this function becomes public, it should be changed to
|
||||
// not expose details of the specific Quic implementation we're using
|
||||
async fn create_endpoint(config: EndpointConfig, client_socket: UdpSocket) -> Endpoint {
|
||||
quinn::Endpoint::new(config, None, client_socket).unwrap().0
|
||||
}
|
||||
|
||||
async fn _send_buffer_using_conn(
|
||||
data: &[u8],
|
||||
connection: &NewConnection,
|
||||
) -> Result<(), WriteError> {
|
||||
let mut send_stream = connection.connection.open_uni().await?;
|
||||
send_stream.write_all(data).await?;
|
||||
send_stream.finish().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Attempts to send data, connecting/reconnecting as necessary
|
||||
// On success, returns the connection used to successfully send the data
|
||||
async fn _send_buffer(&self, data: &[u8]) -> Result<Arc<NewConnection>, WriteError> {
|
||||
let connection = {
|
||||
let mut conn_guard = self.connection.lock().await;
|
||||
|
||||
let maybe_conn = (*conn_guard).clone();
|
||||
match maybe_conn {
|
||||
Some(conn) => conn.clone(),
|
||||
None => {
|
||||
let connecting = self.endpoint.connect(self.addr, "connect").unwrap();
|
||||
let connection = Arc::new(connecting.await?);
|
||||
*conn_guard = Some(connection.clone());
|
||||
connection
|
||||
}
|
||||
}
|
||||
};
|
||||
match Self::_send_buffer_using_conn(data, &connection).await {
|
||||
Ok(()) => Ok(connection),
|
||||
_ => {
|
||||
let connection = {
|
||||
let connecting = self.endpoint.connect(self.addr, "connect").unwrap();
|
||||
let connection = Arc::new(connecting.await?);
|
||||
let mut conn_guard = self.connection.lock().await;
|
||||
*conn_guard = Some(connection.clone());
|
||||
connection
|
||||
};
|
||||
Self::_send_buffer_using_conn(data, &connection).await?;
|
||||
Ok(connection)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_buffer(&self, data: &[u8]) -> Result<(), ClientErrorKind> {
|
||||
self._send_buffer(data).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn send_batch(&self, buffers: &[Vec<u8>]) -> Result<(), ClientErrorKind> {
|
||||
// Start off by "testing" the connection by sending the first transaction
|
||||
// This will also connect to the server if not already connected
|
||||
// and reconnect and retry if the first send attempt failed
|
||||
// (for example due to a timed out connection), returning an error
|
||||
// or the connection that was used to successfully send the transaction.
|
||||
// We will use the returned connection to send the rest of the transactions in the batch
|
||||
// to avoid touching the mutex in self, and not bother reconnecting if we fail along the way
|
||||
// since testing even in the ideal GCE environment has found no cases
|
||||
// where reconnecting and retrying in the middle of a batch send
|
||||
// (i.e. we encounter a connection error in the middle of a batch send, which presumably cannot
|
||||
// be due to a timed out connection) has succeeded
|
||||
if buffers.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let connection = self._send_buffer(&buffers[0][..]).await?;
|
||||
|
||||
// Used to avoid dereferencing the Arc multiple times below
|
||||
// by just getting a reference to the NewConnection once
|
||||
let connection_ref: &NewConnection = &connection;
|
||||
|
||||
let chunks = buffers[1..buffers.len()]
|
||||
.iter()
|
||||
.chunks(QUIC_MAX_CONCURRENT_STREAMS);
|
||||
|
||||
let futures = chunks.into_iter().map(|buffs| {
|
||||
join_all(
|
||||
buffs
|
||||
.into_iter()
|
||||
.map(|buf| Self::_send_buffer_using_conn(&buf[..], connection_ref)),
|
||||
)
|
||||
});
|
||||
|
||||
for f in futures {
|
||||
f.await.into_iter().try_for_each(|res| res)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -4,8 +4,10 @@
|
|||
//! unstable and may change in future releases.
|
||||
|
||||
use {
|
||||
crate::{rpc_client::RpcClient, rpc_config::RpcProgramAccountsConfig, rpc_response::Response},
|
||||
bincode::{serialize_into, serialized_size},
|
||||
crate::{
|
||||
rpc_client::RpcClient, rpc_config::RpcProgramAccountsConfig, rpc_response::Response,
|
||||
tpu_connection::TpuConnection, udp_client::UdpTpuConnection,
|
||||
},
|
||||
log::*,
|
||||
solana_sdk::{
|
||||
account::Account,
|
||||
|
@ -17,7 +19,6 @@ use {
|
|||
hash::Hash,
|
||||
instruction::Instruction,
|
||||
message::Message,
|
||||
packet::PACKET_DATA_SIZE,
|
||||
pubkey::Pubkey,
|
||||
signature::{Keypair, Signature, Signer},
|
||||
signers::Signers,
|
||||
|
@ -117,22 +118,20 @@ impl ClientOptimizer {
|
|||
}
|
||||
|
||||
/// An object for querying and sending transactions to the network.
|
||||
pub struct ThinClient {
|
||||
transactions_socket: UdpSocket,
|
||||
tpu_addrs: Vec<SocketAddr>,
|
||||
pub struct ThinClient<C: 'static + TpuConnection> {
|
||||
rpc_clients: Vec<RpcClient>,
|
||||
tpu_connections: Vec<C>,
|
||||
optimizer: ClientOptimizer,
|
||||
}
|
||||
|
||||
impl ThinClient {
|
||||
impl<C: 'static + TpuConnection> ThinClient<C> {
|
||||
/// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
|
||||
/// and the Tpu at `tpu_addr` over `transactions_socket` using UDP.
|
||||
/// and the Tpu at `tpu_addr` over `transactions_socket` using Quic or UDP
|
||||
/// (currently hardcoded to UDP)
|
||||
pub fn new(rpc_addr: SocketAddr, tpu_addr: SocketAddr, transactions_socket: UdpSocket) -> Self {
|
||||
Self::new_from_client(
|
||||
tpu_addr,
|
||||
transactions_socket,
|
||||
RpcClient::new_socket(rpc_addr),
|
||||
)
|
||||
let tpu_connection = C::new(transactions_socket, tpu_addr);
|
||||
|
||||
Self::new_from_client(RpcClient::new_socket(rpc_addr), tpu_connection)
|
||||
}
|
||||
|
||||
pub fn new_socket_with_timeout(
|
||||
|
@ -142,18 +141,14 @@ impl ThinClient {
|
|||
timeout: Duration,
|
||||
) -> Self {
|
||||
let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
|
||||
Self::new_from_client(tpu_addr, transactions_socket, rpc_client)
|
||||
let tpu_connection = C::new(transactions_socket, tpu_addr);
|
||||
Self::new_from_client(rpc_client, tpu_connection)
|
||||
}
|
||||
|
||||
fn new_from_client(
|
||||
tpu_addr: SocketAddr,
|
||||
transactions_socket: UdpSocket,
|
||||
rpc_client: RpcClient,
|
||||
) -> Self {
|
||||
fn new_from_client(rpc_client: RpcClient, tpu_connection: C) -> Self {
|
||||
Self {
|
||||
transactions_socket,
|
||||
tpu_addrs: vec![tpu_addr],
|
||||
rpc_clients: vec![rpc_client],
|
||||
tpu_connections: vec![tpu_connection],
|
||||
optimizer: ClientOptimizer::new(0),
|
||||
}
|
||||
}
|
||||
|
@ -168,16 +163,19 @@ impl ThinClient {
|
|||
|
||||
let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
|
||||
let optimizer = ClientOptimizer::new(rpc_clients.len());
|
||||
let tpu_connections: Vec<_> = tpu_addrs
|
||||
.into_iter()
|
||||
.map(|tpu_addr| C::new(transactions_socket.try_clone().unwrap(), tpu_addr))
|
||||
.collect();
|
||||
Self {
|
||||
transactions_socket,
|
||||
tpu_addrs,
|
||||
rpc_clients,
|
||||
tpu_connections,
|
||||
optimizer,
|
||||
}
|
||||
}
|
||||
|
||||
fn tpu_addr(&self) -> &SocketAddr {
|
||||
&self.tpu_addrs[self.optimizer.best()]
|
||||
fn tpu_connection(&self) -> &C {
|
||||
&self.tpu_connections[self.optimizer.best()]
|
||||
}
|
||||
|
||||
fn rpc_client(&self) -> &RpcClient {
|
||||
|
@ -205,7 +203,6 @@ impl ThinClient {
|
|||
self.send_and_confirm_transaction(&[keypair], transaction, tries, 0)
|
||||
}
|
||||
|
||||
/// Retry sending a signed Transaction to the server for processing
|
||||
pub fn send_and_confirm_transaction<T: Signers>(
|
||||
&self,
|
||||
keypairs: &T,
|
||||
|
@ -215,18 +212,13 @@ impl ThinClient {
|
|||
) -> TransportResult<Signature> {
|
||||
for x in 0..tries {
|
||||
let now = Instant::now();
|
||||
let mut buf = vec![0; serialized_size(&transaction).unwrap() as usize];
|
||||
let mut wr = std::io::Cursor::new(&mut buf[..]);
|
||||
let mut num_confirmed = 0;
|
||||
let mut wait_time = MAX_PROCESSING_AGE;
|
||||
serialize_into(&mut wr, &transaction)
|
||||
.expect("serialize Transaction in pub fn transfer_signed");
|
||||
// resend the same transaction until the transaction has no chance of succeeding
|
||||
while now.elapsed().as_secs() < wait_time as u64 {
|
||||
if num_confirmed == 0 {
|
||||
// Send the transaction if there has been no confirmation (e.g. the first time)
|
||||
self.transactions_socket
|
||||
.send_to(&buf[..], &self.tpu_addr())?;
|
||||
self.tpu_connection().send_transaction(transaction)?;
|
||||
}
|
||||
|
||||
if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
|
||||
|
@ -321,13 +313,13 @@ impl ThinClient {
|
|||
}
|
||||
}
|
||||
|
||||
impl Client for ThinClient {
|
||||
impl<C: 'static + TpuConnection> Client for ThinClient<C> {
|
||||
fn tpu_addr(&self) -> String {
|
||||
self.tpu_addr().to_string()
|
||||
self.tpu_connection().tpu_addr().to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl SyncClient for ThinClient {
|
||||
impl<C: 'static + TpuConnection> SyncClient for ThinClient<C> {
|
||||
fn send_and_confirm_message<T: Signers>(
|
||||
&self,
|
||||
keypairs: &T,
|
||||
|
@ -607,17 +599,16 @@ impl SyncClient for ThinClient {
|
|||
}
|
||||
}
|
||||
|
||||
impl AsyncClient for ThinClient {
|
||||
impl<C: 'static + TpuConnection> AsyncClient for ThinClient<C> {
|
||||
fn async_send_transaction(&self, transaction: Transaction) -> TransportResult<Signature> {
|
||||
let mut buf = vec![0; serialized_size(&transaction).unwrap() as usize];
|
||||
let mut wr = std::io::Cursor::new(&mut buf[..]);
|
||||
serialize_into(&mut wr, &transaction)
|
||||
.expect("serialize Transaction in pub fn transfer_signed");
|
||||
assert!(buf.len() < PACKET_DATA_SIZE);
|
||||
self.transactions_socket
|
||||
.send_to(&buf[..], &self.tpu_addr())?;
|
||||
self.tpu_connection().send_transaction(&transaction)?;
|
||||
Ok(transaction.signatures[0])
|
||||
}
|
||||
|
||||
fn async_send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> {
|
||||
self.tpu_connection().send_batch(transactions)
|
||||
}
|
||||
|
||||
fn async_send_message<T: Signers>(
|
||||
&self,
|
||||
keypairs: &T,
|
||||
|
@ -649,20 +640,23 @@ impl AsyncClient for ThinClient {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn create_client((rpc, tpu): (SocketAddr, SocketAddr), range: (u16, u16)) -> ThinClient {
|
||||
pub fn create_client(
|
||||
(rpc, tpu): (SocketAddr, SocketAddr),
|
||||
range: (u16, u16),
|
||||
) -> ThinClient<UdpTpuConnection> {
|
||||
let (_, transactions_socket) =
|
||||
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), range).unwrap();
|
||||
ThinClient::new(rpc, tpu, transactions_socket)
|
||||
ThinClient::<UdpTpuConnection>::new(rpc, tpu, transactions_socket)
|
||||
}
|
||||
|
||||
pub fn create_client_with_timeout(
|
||||
(rpc, tpu): (SocketAddr, SocketAddr),
|
||||
range: (u16, u16),
|
||||
timeout: Duration,
|
||||
) -> ThinClient {
|
||||
) -> ThinClient<UdpTpuConnection> {
|
||||
let (_, transactions_socket) =
|
||||
solana_net_utils::bind_in_range(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), range).unwrap();
|
||||
ThinClient::new_socket_with_timeout(rpc, tpu, transactions_socket, timeout)
|
||||
ThinClient::<UdpTpuConnection>::new_socket_with_timeout(rpc, tpu, transactions_socket, timeout)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
use {
|
||||
solana_sdk::{transaction::Transaction, transport::Result as TransportResult},
|
||||
std::net::{SocketAddr, UdpSocket},
|
||||
};
|
||||
|
||||
pub trait TpuConnection {
|
||||
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self;
|
||||
|
||||
fn tpu_addr(&self) -> &SocketAddr;
|
||||
|
||||
fn send_transaction(&self, tx: &Transaction) -> TransportResult<()> {
|
||||
let data = bincode::serialize(tx).expect("serialize Transaction in send_transaction");
|
||||
self.send_wire_transaction(data)
|
||||
}
|
||||
|
||||
fn send_wire_transaction(&self, data: Vec<u8>) -> TransportResult<()>;
|
||||
|
||||
fn send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()>;
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
//! Simple TPU client that communicates with the given UDP port with UDP and provides
|
||||
//! an interface for sending transactions
|
||||
|
||||
use {
|
||||
crate::tpu_connection::TpuConnection,
|
||||
solana_sdk::{transaction::Transaction, transport::Result as TransportResult},
|
||||
std::net::{SocketAddr, UdpSocket},
|
||||
};
|
||||
|
||||
pub struct UdpTpuConnection {
|
||||
socket: UdpSocket,
|
||||
addr: SocketAddr,
|
||||
}
|
||||
|
||||
impl TpuConnection for UdpTpuConnection {
|
||||
fn new(client_socket: UdpSocket, tpu_addr: SocketAddr) -> Self {
|
||||
Self {
|
||||
socket: client_socket,
|
||||
addr: tpu_addr,
|
||||
}
|
||||
}
|
||||
|
||||
fn tpu_addr(&self) -> &SocketAddr {
|
||||
&self.addr
|
||||
}
|
||||
|
||||
fn send_wire_transaction(&self, data: Vec<u8>) -> TransportResult<()> {
|
||||
self.socket.send_to(&data[..], self.addr)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_batch(&self, transactions: Vec<Transaction>) -> TransportResult<()> {
|
||||
transactions
|
||||
.into_iter()
|
||||
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
|
||||
.try_for_each(|buff| -> TransportResult<()> {
|
||||
self.socket.send_to(&buff[..], self.addr)?;
|
||||
Ok(())
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -7,7 +7,10 @@ use {
|
|||
},
|
||||
crossbeam_channel::{unbounded, Sender},
|
||||
rand::{thread_rng, Rng},
|
||||
solana_client::thin_client::{create_client, ThinClient},
|
||||
solana_client::{
|
||||
thin_client::{create_client, ThinClient},
|
||||
udp_client::UdpTpuConnection,
|
||||
},
|
||||
solana_perf::recycler::Recycler,
|
||||
solana_runtime::bank_forks::BankForks,
|
||||
solana_sdk::{
|
||||
|
@ -194,7 +197,10 @@ pub fn discover(
|
|||
}
|
||||
|
||||
/// Creates a ThinClient per valid node
|
||||
pub fn get_clients(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> Vec<ThinClient> {
|
||||
pub fn get_clients(
|
||||
nodes: &[ContactInfo],
|
||||
socket_addr_space: &SocketAddrSpace,
|
||||
) -> Vec<ThinClient<UdpTpuConnection>> {
|
||||
nodes
|
||||
.iter()
|
||||
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space))
|
||||
|
@ -203,7 +209,10 @@ pub fn get_clients(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -
|
|||
}
|
||||
|
||||
/// Creates a ThinClient by selecting a valid node at random
|
||||
pub fn get_client(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -> ThinClient {
|
||||
pub fn get_client(
|
||||
nodes: &[ContactInfo],
|
||||
socket_addr_space: &SocketAddrSpace,
|
||||
) -> ThinClient<UdpTpuConnection> {
|
||||
let nodes: Vec<_> = nodes
|
||||
.iter()
|
||||
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space))
|
||||
|
@ -215,7 +224,7 @@ pub fn get_client(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) ->
|
|||
pub fn get_multi_client(
|
||||
nodes: &[ContactInfo],
|
||||
socket_addr_space: &SocketAddrSpace,
|
||||
) -> (ThinClient, usize) {
|
||||
) -> (ThinClient<UdpTpuConnection>, usize) {
|
||||
let addrs: Vec<_> = nodes
|
||||
.iter()
|
||||
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space))
|
||||
|
@ -229,7 +238,8 @@ pub fn get_multi_client(
|
|||
.unwrap();
|
||||
let num_nodes = tpu_addrs.len();
|
||||
(
|
||||
ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, transactions_socket),
|
||||
//TODO: make it configurable whether to use quic
|
||||
ThinClient::<UdpTpuConnection>::new_from_addrs(rpc_addrs, tpu_addrs, transactions_socket),
|
||||
num_nodes,
|
||||
)
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use {
|
||||
solana_client::thin_client::ThinClient,
|
||||
solana_client::{thin_client::ThinClient, udp_client::UdpTpuConnection},
|
||||
solana_core::validator::{Validator, ValidatorConfig},
|
||||
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
|
||||
solana_sdk::{pubkey::Pubkey, signature::Keypair},
|
||||
|
@ -36,7 +36,7 @@ impl ClusterValidatorInfo {
|
|||
|
||||
pub trait Cluster {
|
||||
fn get_node_pubkeys(&self) -> Vec<Pubkey>;
|
||||
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient>;
|
||||
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient<UdpTpuConnection>>;
|
||||
fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo>;
|
||||
fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo;
|
||||
fn restart_node(
|
||||
|
|
|
@ -6,7 +6,10 @@ use {
|
|||
},
|
||||
itertools::izip,
|
||||
log::*,
|
||||
solana_client::thin_client::{create_client, ThinClient},
|
||||
solana_client::{
|
||||
thin_client::{create_client, ThinClient},
|
||||
udp_client::UdpTpuConnection,
|
||||
},
|
||||
solana_core::{
|
||||
tower_storage::FileTowerStorage,
|
||||
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
|
||||
|
@ -535,7 +538,7 @@ impl LocalCluster {
|
|||
}
|
||||
|
||||
fn transfer_with_client(
|
||||
client: &ThinClient,
|
||||
client: &ThinClient<UdpTpuConnection>,
|
||||
source_keypair: &Keypair,
|
||||
dest_pubkey: &Pubkey,
|
||||
lamports: u64,
|
||||
|
@ -564,7 +567,7 @@ impl LocalCluster {
|
|||
}
|
||||
|
||||
fn setup_vote_and_stake_accounts(
|
||||
client: &ThinClient,
|
||||
client: &ThinClient<UdpTpuConnection>,
|
||||
vote_account: &Keypair,
|
||||
from_account: &Arc<Keypair>,
|
||||
amount: u64,
|
||||
|
@ -701,7 +704,7 @@ impl Cluster for LocalCluster {
|
|||
self.validators.keys().cloned().collect()
|
||||
}
|
||||
|
||||
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> {
|
||||
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient<UdpTpuConnection>> {
|
||||
self.validators.get(pubkey).map(|f| {
|
||||
create_client(
|
||||
f.info.contact_info.client_facing_addr(),
|
||||
|
|
|
@ -17,6 +17,7 @@ use {
|
|||
rpc_config::{RpcProgramAccountsConfig, RpcSignatureSubscribeConfig},
|
||||
rpc_response::RpcSignatureResult,
|
||||
thin_client::{create_client, ThinClient},
|
||||
udp_client::UdpTpuConnection,
|
||||
},
|
||||
solana_core::{
|
||||
broadcast_stage::BroadcastStageType,
|
||||
|
@ -2646,8 +2647,8 @@ fn setup_transfer_scan_threads(
|
|||
num_starting_accounts: usize,
|
||||
exit: Arc<AtomicBool>,
|
||||
scan_commitment: CommitmentConfig,
|
||||
update_client_receiver: Receiver<ThinClient>,
|
||||
scan_client_receiver: Receiver<ThinClient>,
|
||||
update_client_receiver: Receiver<ThinClient<UdpTpuConnection>>,
|
||||
scan_client_receiver: Receiver<ThinClient<UdpTpuConnection>>,
|
||||
) -> (
|
||||
JoinHandle<()>,
|
||||
JoinHandle<()>,
|
||||
|
|
|
@ -125,6 +125,15 @@ version = "1.5.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
|
||||
|
||||
[[package]]
|
||||
name = "async-mutex"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
|
||||
dependencies = [
|
||||
"event-listener",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.52"
|
||||
|
@ -515,6 +524,22 @@ version = "0.1.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation"
|
||||
version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146"
|
||||
dependencies = [
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "core-foundation-sys"
|
||||
version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.2.1"
|
||||
|
@ -918,6 +943,12 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "event-listener"
|
||||
version = "2.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
|
||||
|
||||
[[package]]
|
||||
name = "fastrand"
|
||||
version = "1.6.0"
|
||||
|
@ -1062,6 +1093,15 @@ dependencies = [
|
|||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fxhash"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
|
||||
dependencies = [
|
||||
"byteorder 1.4.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "generic-array"
|
||||
version = "0.12.3"
|
||||
|
@ -1596,6 +1636,19 @@ dependencies = [
|
|||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.7.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"miow",
|
||||
"ntapi",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mio"
|
||||
version = "0.8.0"
|
||||
|
@ -1763,6 +1816,12 @@ version = "0.3.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
|
||||
|
||||
[[package]]
|
||||
name = "openssl-probe"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry"
|
||||
version = "0.16.0"
|
||||
|
@ -1973,6 +2032,60 @@ dependencies = [
|
|||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "584865613896a1f644d757e52c45c573441c8b04cac38ac13990b0235203db66"
|
||||
dependencies = [
|
||||
"bytes 1.1.0",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"fxhash",
|
||||
"quinn-proto",
|
||||
"quinn-udp",
|
||||
"rustls",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn-proto"
|
||||
version = "0.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d2b1562bf4998b0c6d1841a4742b7103bb82cdde61374833de826bab9e8ad498"
|
||||
dependencies = [
|
||||
"bytes 1.1.0",
|
||||
"fxhash",
|
||||
"rand 0.8.2",
|
||||
"ring",
|
||||
"rustls",
|
||||
"rustls-native-certs",
|
||||
"rustls-pemfile",
|
||||
"slab",
|
||||
"thiserror",
|
||||
"tinyvec",
|
||||
"tracing",
|
||||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quinn-udp"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df185e5e5f7611fa6e628ed8f9633df10114b03bbaecab186ec55822c44ac727"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"libc",
|
||||
"mio 0.7.14",
|
||||
"quinn-proto",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "0.6.13"
|
||||
|
@ -2263,6 +2376,18 @@ dependencies = [
|
|||
"webpki",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls-pemfile",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pemfile"
|
||||
version = "0.2.1"
|
||||
|
@ -2293,6 +2418,16 @@ dependencies = [
|
|||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "schannel"
|
||||
version = "0.1.19"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
|
@ -2329,6 +2464,29 @@ dependencies = [
|
|||
"untrusted",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "security-framework"
|
||||
version = "2.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2dc14f172faf8a0194a3aded622712b0de276821addc574fa54fc0a1167e10dc"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"core-foundation",
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
"security-framework-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "security-framework-sys"
|
||||
version = "2.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556"
|
||||
dependencies = [
|
||||
"core-foundation-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "semver"
|
||||
version = "0.9.0"
|
||||
|
@ -3056,18 +3214,24 @@ dependencies = [
|
|||
name = "solana-client"
|
||||
version = "1.10.1"
|
||||
dependencies = [
|
||||
"async-mutex",
|
||||
"async-trait",
|
||||
"base64 0.13.0",
|
||||
"bincode",
|
||||
"bs58 0.4.0",
|
||||
"bytes 1.1.0",
|
||||
"clap",
|
||||
"crossbeam-channel",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"indicatif",
|
||||
"itertools 0.10.3",
|
||||
"jsonrpc-core",
|
||||
"log",
|
||||
"quinn",
|
||||
"rayon",
|
||||
"reqwest",
|
||||
"rustls",
|
||||
"semver 1.0.6",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
|
@ -3969,7 +4133,7 @@ dependencies = [
|
|||
"bytes 1.1.0",
|
||||
"libc",
|
||||
"memchr",
|
||||
"mio",
|
||||
"mio 0.8.0",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"parking_lot",
|
||||
|
|
|
@ -45,6 +45,13 @@ impl AsyncClient for BankClient {
|
|||
Ok(signature)
|
||||
}
|
||||
|
||||
fn async_send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
|
||||
for t in transactions {
|
||||
self.async_send_transaction(t)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn async_send_message<T: Signers>(
|
||||
&self,
|
||||
keypairs: &T,
|
||||
|
|
|
@ -175,6 +175,8 @@ pub trait AsyncClient {
|
|||
/// Send a signed transaction, but don't wait to see if the server accepted it.
|
||||
fn async_send_transaction(&self, transaction: transaction::Transaction) -> Result<Signature>;
|
||||
|
||||
fn async_send_batch(&self, transactions: Vec<transaction::Transaction>) -> Result<()>;
|
||||
|
||||
/// Create a transaction from the given message, and send it to the
|
||||
/// server, but don't wait for to see if the server accepted it.
|
||||
fn async_send_message<T: Signers>(
|
||||
|
|
|
@ -1 +1,5 @@
|
|||
pub const QUIC_PORT_OFFSET: u16 = 6;
|
||||
// Empirically found max number of concurrent streams
|
||||
// that seems to maximize TPS on GCE (higher values don't seem to
|
||||
// give significant improvement or seem to impact stability)
|
||||
pub const QUIC_MAX_CONCURRENT_STREAMS: usize = 2048;
|
||||
|
|
|
@ -8,6 +8,7 @@ use {
|
|||
solana_perf::packet::PacketBatch,
|
||||
solana_sdk::{
|
||||
packet::{Packet, PACKET_DATA_SIZE},
|
||||
quic::QUIC_MAX_CONCURRENT_STREAMS,
|
||||
signature::Keypair,
|
||||
timing,
|
||||
},
|
||||
|
@ -49,7 +50,8 @@ fn configure_server(
|
|||
.map_err(|_e| QuicServerError::ConfigureFailed)?;
|
||||
let config = Arc::get_mut(&mut server_config.transport).unwrap();
|
||||
|
||||
const MAX_CONCURRENT_UNI_STREAMS: u32 = 1;
|
||||
// QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability
|
||||
const MAX_CONCURRENT_UNI_STREAMS: u32 = (QUIC_MAX_CONCURRENT_STREAMS * 2) as u32;
|
||||
config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
|
||||
config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
|
||||
config.receive_window((PACKET_DATA_SIZE as u32 * MAX_CONCURRENT_UNI_STREAMS).into());
|
||||
|
|
Loading…
Reference in New Issue