tpu-client: Add "fake" async version with sync sending (#25499)
* tpu-client: Add "fake" async for sending
* Add Drop implementation
* Rebase
This commit is contained in:
parent dd0bf4ff4b
commit 6349a6dc97
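Usage sketch (not part of the commit): the new nonblocking `TpuClient` added below mirrors the blocking client's interface, but the awaited `send_*` calls resolve immediately because the underlying UDP send is synchronous (the "fake" async noted in `try_send_wire_transaction`). The RPC/websocket URLs and the `payer` keypair here are placeholders for illustration, not values from the commit.

use {
    solana_client::{
        nonblocking::{rpc_client::RpcClient, tpu_client::TpuClient},
        tpu_client::TpuClientConfig,
    },
    solana_sdk::{pubkey::Pubkey, signature::Keypair, system_transaction},
    std::sync::Arc,
};

// Hypothetical driver; `rpc_url`, `ws_url`, and `payer` are placeholders.
async fn send_via_tpu(
    rpc_url: &str,
    ws_url: &str,
    payer: &Keypair,
) -> Result<(), Box<dyn std::error::Error>> {
    let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string()));
    // TpuClient::new spawns a LeaderTpuService task that tracks upcoming leaders
    // over the pubsub socket.
    let tpu_client =
        TpuClient::new(rpc_client.clone(), ws_url, TpuClientConfig::default()).await?;

    let blockhash = rpc_client.get_latest_blockhash().await?;
    let tx = system_transaction::transfer(payer, &Pubkey::new_unique(), 1, blockhash);

    // "Fake" async: awaiting this does not yield on the network send itself;
    // the wire transaction is pushed synchronously to the fanout of leader TPU sockets.
    if !tpu_client.send_transaction(&tx).await {
        eprintln!("all fanout sends failed");
    }
    Ok(())
}

When the client goes out of scope, the Drop implementation added in this commit stores the exit flag, signaling the background LeaderTpuService loop to stop.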
@@ -1,2 +1,3 @@
pub mod pubsub_client;
pub mod rpc_client;
pub mod tpu_client;
@@ -0,0 +1,479 @@
use {
    crate::{
        client_error::ClientError,
        connection_cache::get_connection,
        nonblocking::{
            pubsub_client::{PubsubClient, PubsubClientError},
            rpc_client::RpcClient,
        },
        rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
        rpc_response::SlotUpdate,
        spinner,
        tpu_client::{
            LeaderTpuCache, LeaderTpuCacheUpdateInfo, RecentLeaderSlots, TpuClientConfig,
            MAX_FANOUT_SLOTS, SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL,
        },
        tpu_connection::TpuConnection,
    },
    bincode::serialize,
    futures_util::stream::StreamExt,
    log::*,
    solana_sdk::{
        clock::Slot,
        commitment_config::CommitmentConfig,
        message::Message,
        signature::SignerError,
        signers::Signers,
        transaction::{Transaction, TransactionError},
        transport::{Result as TransportResult, TransportError},
    },
    std::{
        collections::HashMap,
        net::SocketAddr,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, RwLock,
        },
    },
    thiserror::Error,
    tokio::{
        task::JoinHandle,
        time::{sleep, timeout, Duration, Instant},
    },
};

#[derive(Error, Debug)]
pub enum TpuSenderError {
    #[error("Pubsub error: {0:?}")]
    PubsubError(#[from] PubsubClientError),
    #[error("RPC error: {0:?}")]
    RpcError(#[from] ClientError),
    #[error("IO error: {0:?}")]
    IoError(#[from] std::io::Error),
    #[error("Signer error: {0:?}")]
    SignerError(#[from] SignerError),
    #[error("Custom error: {0}")]
    Custom(String),
}

type Result<T> = std::result::Result<T, TpuSenderError>;

/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
pub struct TpuClient {
    fanout_slots: u64,
    leader_tpu_service: LeaderTpuService,
    exit: Arc<AtomicBool>,
    rpc_client: Arc<RpcClient>,
}

impl TpuClient {
    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
    /// size
    pub async fn send_transaction(&self, transaction: &Transaction) -> bool {
        let wire_transaction = serialize(transaction).expect("serialization should succeed");
        self.send_wire_transaction(wire_transaction).await
    }

    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
    pub async fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
        self.try_send_wire_transaction(wire_transaction)
            .await
            .is_ok()
    }

    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
    /// size
    /// Returns the last error if all sends fail
    pub async fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
        let wire_transaction = serialize(transaction).expect("serialization should succeed");
        self.try_send_wire_transaction(wire_transaction).await
    }

    /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
    /// Returns the last error if all sends fail
    async fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
        let mut last_error: Option<TransportError> = None;
        let mut some_success = false;
        for tpu_address in self
            .leader_tpu_service
            .leader_tpu_sockets(self.fanout_slots)
        {
            let conn = get_connection(&tpu_address);
            // Fake async
            let result = conn.send_wire_transaction_async(wire_transaction.clone());
            if let Err(err) = result {
                last_error = Some(err);
            } else {
                some_success = true;
            }
        }
        if !some_success {
            Err(if let Some(err) = last_error {
                err
            } else {
                std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
            })
        } else {
            Ok(())
        }
    }

    /// Create a new client that disconnects when dropped
    pub async fn new(
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        config: TpuClientConfig,
    ) -> Result<Self> {
        let exit = Arc::new(AtomicBool::new(false));
        let leader_tpu_service =
            LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone()).await?;

        Ok(Self {
            fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
            leader_tpu_service,
            exit,
            rpc_client,
        })
    }

    pub async fn send_and_confirm_messages_with_spinner<T: Signers>(
        &self,
        messages: &[Message],
        signers: &T,
    ) -> Result<Vec<Option<TransactionError>>> {
        let mut expired_blockhash_retries = 5;
        let progress_bar = spinner::new_progress_bar();
        progress_bar.set_message("Setting up...");

        let mut transactions = messages
            .iter()
            .enumerate()
            .map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
            .collect::<Vec<_>>();
        let total_transactions = transactions.len();
        let mut transaction_errors = vec![None; transactions.len()];
        let mut confirmed_transactions = 0;
        let mut block_height = self.rpc_client.get_block_height().await?;
        while expired_blockhash_retries > 0 {
            let (blockhash, last_valid_block_height) = self
                .rpc_client
                .get_latest_blockhash_with_commitment(self.rpc_client.commitment())
                .await?;

            let mut pending_transactions = HashMap::new();
            for (i, mut transaction) in transactions {
                transaction.try_sign(signers, blockhash)?;
                pending_transactions.insert(transaction.signatures[0], (i, transaction));
            }

            let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
            while block_height <= last_valid_block_height {
                let num_transactions = pending_transactions.len();

                // Periodically re-send all pending transactions
                if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
                    for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
                        if !self.send_transaction(transaction).await {
                            let _result = self.rpc_client.send_transaction(transaction).await.ok();
                        }
                        spinner::set_message_for_confirmed_transactions(
                            &progress_bar,
                            confirmed_transactions,
                            total_transactions,
                            None, //block_height,
                            last_valid_block_height,
                            &format!("Sending {}/{} transactions", index + 1, num_transactions,),
                        );
                        sleep(SEND_TRANSACTION_INTERVAL).await;
                    }
                    last_resend = Instant::now();
                }

                // Wait for the next block before checking for transaction statuses
                let mut block_height_refreshes = 10;
                spinner::set_message_for_confirmed_transactions(
                    &progress_bar,
                    confirmed_transactions,
                    total_transactions,
                    Some(block_height),
                    last_valid_block_height,
                    &format!(
                        "Waiting for next block, {} transactions pending...",
                        num_transactions
                    ),
                );
                let mut new_block_height = block_height;
                while block_height == new_block_height && block_height_refreshes > 0 {
                    sleep(Duration::from_millis(500)).await;
                    new_block_height = self.rpc_client.get_block_height().await?;
                    block_height_refreshes -= 1;
                }
                block_height = new_block_height;

                // Collect statuses for the transactions, drop those that are confirmed
                let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
                for pending_signatures_chunk in
                    pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
                {
                    if let Ok(result) = self
                        .rpc_client
                        .get_signature_statuses(pending_signatures_chunk)
                        .await
                    {
                        let statuses = result.value;
                        for (signature, status) in
                            pending_signatures_chunk.iter().zip(statuses.into_iter())
                        {
                            if let Some(status) = status {
                                if status.satisfies_commitment(self.rpc_client.commitment()) {
                                    if let Some((i, _)) = pending_transactions.remove(signature) {
                                        confirmed_transactions += 1;
                                        if status.err.is_some() {
                                            progress_bar.println(format!(
                                                "Failed transaction: {:?}",
                                                status
                                            ));
                                        }
                                        transaction_errors[i] = status.err;
                                    }
                                }
                            }
                        }
                    }
                    spinner::set_message_for_confirmed_transactions(
                        &progress_bar,
                        confirmed_transactions,
                        total_transactions,
                        Some(block_height),
                        last_valid_block_height,
                        "Checking transaction status...",
                    );
                }

                if pending_transactions.is_empty() {
                    return Ok(transaction_errors);
                }
            }

            transactions = pending_transactions.into_iter().map(|(_k, v)| v).collect();
            progress_bar.println(format!(
                "Blockhash expired. {} retries remaining",
                expired_blockhash_retries
            ));
            expired_blockhash_retries -= 1;
        }
        Err(TpuSenderError::Custom("Max retries exceeded".into()))
    }

    pub fn rpc_client(&self) -> &RpcClient {
        &self.rpc_client
    }

    pub async fn shutdown(&mut self) {
        self.exit.store(true, Ordering::Relaxed);
        self.leader_tpu_service.join().await;
    }
}
impl Drop for TpuClient {
    fn drop(&mut self) {
        self.exit.store(true, Ordering::Relaxed);
    }
}

/// Service that tracks upcoming leaders and maintains an up-to-date mapping
/// of leader id to TPU socket address.
pub struct LeaderTpuService {
    recent_slots: RecentLeaderSlots,
    leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
    t_leader_tpu_service: Option<JoinHandle<Result<()>>>,
}

impl LeaderTpuService {
    pub async fn new(
        rpc_client: Arc<RpcClient>,
        websocket_url: &str,
        exit: Arc<AtomicBool>,
    ) -> Result<Self> {
        let start_slot = rpc_client
            .get_slot_with_commitment(CommitmentConfig::processed())
            .await?;

        let recent_slots = RecentLeaderSlots::new(start_slot);
        let slots_in_epoch = rpc_client.get_epoch_info().await?.slots_in_epoch;
        let leaders = rpc_client
            .get_slot_leaders(start_slot, LeaderTpuCache::fanout(slots_in_epoch))
            .await?;
        let cluster_nodes = rpc_client.get_cluster_nodes().await?;
        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(
            start_slot,
            slots_in_epoch,
            leaders,
            cluster_nodes,
        )));

        let pubsub_client = if !websocket_url.is_empty() {
            Some(PubsubClient::new(websocket_url).await?)
        } else {
            None
        };

        let t_leader_tpu_service = Some({
            let recent_slots = recent_slots.clone();
            let leader_tpu_cache = leader_tpu_cache.clone();
            tokio::spawn(async move {
                Self::run(
                    rpc_client,
                    recent_slots,
                    leader_tpu_cache,
                    pubsub_client,
                    exit,
                )
                .await
            })
        });

        Ok(LeaderTpuService {
            recent_slots,
            leader_tpu_cache,
            t_leader_tpu_service,
        })
    }

    pub async fn join(&mut self) {
        if let Some(t_handle) = self.t_leader_tpu_service.take() {
            t_handle.await.unwrap().unwrap();
        }
    }

    pub fn estimated_current_slot(&self) -> Slot {
        self.recent_slots.estimated_current_slot()
    }

    fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
        let current_slot = self.recent_slots.estimated_current_slot();
        self.leader_tpu_cache
            .read()
            .unwrap()
            .get_leader_sockets(current_slot, fanout_slots)
    }

    async fn run(
        rpc_client: Arc<RpcClient>,
        recent_slots: RecentLeaderSlots,
        leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
        pubsub_client: Option<PubsubClient>,
        exit: Arc<AtomicBool>,
    ) -> Result<()> {
        let (mut notifications, unsubscribe) = if let Some(pubsub_client) = &pubsub_client {
            let (notifications, unsubscribe) = pubsub_client.slot_updates_subscribe().await?;
            (Some(notifications), Some(unsubscribe))
        } else {
            (None, None)
        };
        let mut last_cluster_refresh = Instant::now();
        let mut sleep_ms = 1000;
        loop {
            if exit.load(Ordering::Relaxed) {
                if let Some(unsubscribe) = unsubscribe {
                    (unsubscribe)().await;
                }
                // `notifications` requires a valid reference to `pubsub_client`
                // so `notifications` must be dropped before moving `pubsub_client`
                drop(notifications);
                if let Some(pubsub_client) = pubsub_client {
                    pubsub_client.shutdown().await.unwrap();
                };
                break;
            }

            // Sleep a slot before checking if leader cache needs to be refreshed again
            sleep(Duration::from_millis(sleep_ms)).await;
            sleep_ms = 1000;

            if let Some(notifications) = &mut notifications {
                while let Ok(Some(update)) =
                    timeout(Duration::from_millis(10), notifications.next()).await
                {
                    let current_slot = match update {
                        // This update indicates that a full slot was received by the connected
                        // node so we can stop sending transactions to the leader for that slot
                        SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
                        // This update indicates that we have just received the first shred from
                        // the leader for this slot and they are probably still accepting transactions.
                        SlotUpdate::FirstShredReceived { slot, .. } => slot,
                        _ => continue,
                    };
                    recent_slots.record_slot(current_slot);
                }
            }

            let cache_update_info = maybe_fetch_cache_info(
                &leader_tpu_cache,
                last_cluster_refresh,
                &rpc_client,
                &recent_slots,
            )
            .await;

            if cache_update_info.has_some() {
                let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
                let (has_error, cluster_refreshed) = leader_tpu_cache
                    .update_all(recent_slots.estimated_current_slot(), cache_update_info);
                if has_error {
                    sleep_ms = 100;
                }
                if cluster_refreshed {
                    last_cluster_refresh = Instant::now();
                }
            }
        }
        Ok(())
    }
}

async fn maybe_fetch_cache_info(
    leader_tpu_cache: &Arc<RwLock<LeaderTpuCache>>,
    last_cluster_refresh: Instant,
    rpc_client: &RpcClient,
    recent_slots: &RecentLeaderSlots,
) -> LeaderTpuCacheUpdateInfo {
    // Refresh cluster TPU ports every 5min in case validators restart with new port configuration
    // or new validators come online
    let maybe_cluster_nodes = if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
        Some(rpc_client.get_cluster_nodes().await)
    } else {
        None
    };

    let estimated_current_slot = recent_slots.estimated_current_slot();
    let (last_slot, last_epoch_info_slot, slots_in_epoch) = {
        let leader_tpu_cache = leader_tpu_cache.read().unwrap();
        leader_tpu_cache.slot_info()
    };
    let maybe_epoch_info =
        if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
            Some(rpc_client.get_epoch_info().await)
        } else {
            None
        };

    let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS)
    {
        Some(
            rpc_client
                .get_slot_leaders(
                    estimated_current_slot,
                    LeaderTpuCache::fanout(slots_in_epoch),
                )
                .await,
        )
    } else {
        None
    };
    LeaderTpuCacheUpdateInfo {
        maybe_cluster_nodes,
        maybe_epoch_info,
        maybe_slot_leaders,
    }
}
@@ -0,0 +1,71 @@
use {
    solana_client::{
        nonblocking::tpu_client::{LeaderTpuService, TpuClient},
        tpu_client::TpuClientConfig,
    },
    solana_sdk::{clock::DEFAULT_MS_PER_SLOT, pubkey::Pubkey, system_transaction},
    solana_test_validator::TestValidatorGenesis,
    std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    tokio::time::{sleep, Duration, Instant},
};

#[tokio::test]
async fn test_tpu_send_transaction() {
    let (test_validator, mint_keypair) = TestValidatorGenesis::default().start_async().await;
    let rpc_client = Arc::new(test_validator.get_async_rpc_client());
    let mut tpu_client = TpuClient::new(
        rpc_client.clone(),
        &test_validator.rpc_pubsub_url(),
        TpuClientConfig::default(),
    )
    .await
    .unwrap();

    let recent_blockhash = rpc_client.get_latest_blockhash().await.unwrap();
    let tx =
        system_transaction::transfer(&mint_keypair, &Pubkey::new_unique(), 42, recent_blockhash);
    assert!(tpu_client.send_transaction(&tx).await);

    let timeout = Duration::from_secs(5);
    let now = Instant::now();
    let signatures = vec![tx.signatures[0]];
    loop {
        assert!(now.elapsed() < timeout);
        let statuses = rpc_client
            .get_signature_statuses(&signatures)
            .await
            .unwrap();
        if statuses.value.get(0).is_some() {
            break;
        }
    }
    tpu_client.shutdown().await;
}

#[tokio::test]
async fn test_tpu_cache_slot_updates() {
    let (test_validator, _) = TestValidatorGenesis::default().start_async().await;
    let rpc_client = Arc::new(test_validator.get_async_rpc_client());
    let exit = Arc::new(AtomicBool::new(false));
    let mut leader_tpu_service =
        LeaderTpuService::new(rpc_client, &test_validator.rpc_pubsub_url(), exit.clone())
            .await
            .unwrap();
    let start_slot = leader_tpu_service.estimated_current_slot();
    let timeout = Duration::from_secs(5);
    let sleep_time = Duration::from_millis(DEFAULT_MS_PER_SLOT);
    let now = Instant::now();
    loop {
        assert!(now.elapsed() < timeout);
        let current_slot = leader_tpu_service.estimated_current_slot();
        if current_slot != start_slot {
            break;
        }
        sleep(sleep_time).await;
    }
    exit.store(true, Ordering::Relaxed);
    leader_tpu_service.join().await;
}