tpu-client: Refactor to prep for async client (#25432)

Author: Jon Cinque, 2022-05-21 20:22:30 +02:00 (committed by GitHub)
parent 3b3046ab3d
commit 09ea899a2d
2 changed files with 203 additions and 105 deletions

client/src/spinner.rs

@@ -9,3 +9,26 @@ pub(crate) fn new_progress_bar() -> ProgressBar {
progress_bar.enable_steady_tick(100);
progress_bar
}
pub(crate) fn set_message_for_confirmed_transactions(
progress_bar: &ProgressBar,
confirmed_transactions: u32,
total_transactions: usize,
block_height: Option<u64>,
last_valid_block_height: u64,
status: &str,
) {
progress_bar.set_message(format!(
"{:>5.1}% | {:<40}{}",
confirmed_transactions as f64 * 100. / total_transactions as f64,
status,
match block_height {
Some(block_height) => format!(
" [block height {}; re-sign in {} blocks]",
block_height,
last_valid_block_height.saturating_sub(block_height),
),
None => String::new(),
},
));
}
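The new helper lifts the message formatting that send_and_confirm_messages_with_spinner previously built with a local closure (removed below) into the spinner module. A minimal usage sketch with made-up values; the block-height suffix only appears when Some(block_height) is passed:

    let progress_bar = new_progress_bar();
    set_message_for_confirmed_transactions(
        &progress_bar,
        42,          // confirmed_transactions
        100,         // total_transactions
        Some(1_234), // block_height
        1_334,       // last_valid_block_height
        "Sending transactions",
    );
    // Renders roughly:
    //  42.0% | Sending transactions                    [block height 1234; re-sign in 100 blocks]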

client/src/tpu_client.rs

@@ -1,11 +1,11 @@
use {
crate::{
client_error::ClientError,
client_error::{ClientError, Result as ClientResult},
connection_cache::send_wire_transaction_async,
pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
rpc_client::RpcClient,
rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
rpc_response::SlotUpdate,
rpc_response::{RpcContactInfo, SlotUpdate},
spinner,
},
bincode::serialize,
@@ -13,6 +13,7 @@ use {
solana_sdk::{
clock::Slot,
commitment_config::CommitmentConfig,
epoch_info::EpochInfo,
message::Message,
pubkey::Pubkey,
signature::SignerError,
@@ -29,9 +30,9 @@ use {
Arc, RwLock,
},
thread::{sleep, JoinHandle},
time::{Duration, Instant},
},
thiserror::Error,
tokio::time::{Duration, Instant},
};
#[derive(Error, Debug)]
@@ -56,6 +57,11 @@ pub const DEFAULT_FANOUT_SLOTS: u64 = 12;
/// Maximum number of slots used to build TPU socket fanout set
pub const MAX_FANOUT_SLOTS: u64 = 100;
/// Send at ~100 TPS
pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
/// Retry batch send after 4 seconds
pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
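These two intervals were previously function-local constants in send_and_confirm_messages_with_spinner (removed in the next hunk); hoisting them to crate scope lets a future async client share the same pacing. A minimal pacing sketch under stated assumptions: pending and send_one are hypothetical stand-ins, and sleep is the std::thread::sleep imported above:

    // Sketch only: one send per SEND_TRANSACTION_INTERVAL (10 ms) is ~100 TPS,
    // and the whole pending batch is re-sent after TRANSACTION_RESEND_INTERVAL (4 s).
    let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
    while !pending.is_empty() {
        if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
            for transaction in &pending {
                send_one(transaction); // hypothetical send helper
                sleep(SEND_TRANSACTION_INTERVAL);
            }
            last_resend = Instant::now();
        }
        // ...poll statuses and prune confirmed transactions from `pending`...
    }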
/// Config params for `TpuClient`
#[derive(Clone, Debug)]
pub struct TpuClientConfig {
@@ -156,10 +162,6 @@ impl TpuClient {
signers: &T,
) -> Result<Vec<Option<TransactionError>>> {
let mut expired_blockhash_retries = 5;
/* Send at ~100 TPS */
const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
/* Retry batch send after 4 seconds */
const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
let progress_bar = spinner::new_progress_bar();
progress_bar.set_message("Setting up...");
@@ -169,29 +171,11 @@ impl TpuClient {
.enumerate()
.map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
.collect::<Vec<_>>();
let num_transactions = transactions.len() as f64;
let total_transactions = transactions.len();
let mut transaction_errors = vec![None; transactions.len()];
let set_message = |confirmed_transactions,
block_height: Option<u64>,
last_valid_block_height: u64,
status: &str| {
progress_bar.set_message(format!(
"{:>5.1}% | {:<40}{}",
confirmed_transactions as f64 * 100. / num_transactions,
status,
match block_height {
Some(block_height) => format!(
" [block height {}; re-sign in {} blocks]",
block_height,
last_valid_block_height.saturating_sub(block_height),
),
None => String::new(),
},
));
};
let mut confirmed_transactions = 0;
let mut block_height = self.rpc_client.get_block_height()?;
while expired_blockhash_retries > 0 {
let (blockhash, last_valid_block_height) = self
.rpc_client
@@ -213,8 +197,10 @@ impl TpuClient {
if !self.send_transaction(transaction) {
let _result = self.rpc_client.send_transaction(transaction).ok();
}
set_message(
spinner::set_message_for_confirmed_transactions(
&progress_bar,
confirmed_transactions,
total_transactions,
None, //block_height,
last_valid_block_height,
&format!("Sending {}/{} transactions", index + 1, num_transactions,),
@@ -226,8 +212,10 @@ impl TpuClient {
// Wait for the next block before checking for transaction statuses
let mut block_height_refreshes = 10;
set_message(
spinner::set_message_for_confirmed_transactions(
&progress_bar,
confirmed_transactions,
total_transactions,
Some(block_height),
last_valid_block_height,
&format!("Waiting for next block, {} pending...", num_transactions),
@@ -269,8 +257,10 @@ impl TpuClient {
}
}
}
set_message(
spinner::set_message_for_confirmed_transactions(
&progress_bar,
confirmed_transactions,
total_transactions,
Some(block_height),
last_valid_block_height,
"Checking transaction status...",
@@ -304,7 +294,20 @@ impl Drop for TpuClient {
}
}
struct LeaderTpuCache {
pub(crate) struct LeaderTpuCacheUpdateInfo {
pub(crate) maybe_cluster_nodes: Option<ClientResult<Vec<RpcContactInfo>>>,
pub(crate) maybe_epoch_info: Option<ClientResult<EpochInfo>>,
pub(crate) maybe_slot_leaders: Option<ClientResult<Vec<Pubkey>>>,
}
impl LeaderTpuCacheUpdateInfo {
pub(crate) fn has_some(&self) -> bool {
self.maybe_cluster_nodes.is_some()
|| self.maybe_epoch_info.is_some()
|| self.maybe_slot_leaders.is_some()
}
}
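LeaderTpuCacheUpdateInfo splits cache refreshing into a fetch phase (RPC calls, no lock held) and an apply phase (a short write-lock critical section), which is the shape an async client needs. A sketch of the intended flow, using the maybe_fetch_cache_info helper introduced further down:

    // Fetch with no lock held...
    let update_info = maybe_fetch_cache_info(
        &leader_tpu_cache,
        last_cluster_refresh,
        &rpc_client,
        &recent_slots,
    );
    // ...and only take the write lock if anything actually came back.
    if update_info.has_some() {
        let mut cache = leader_tpu_cache.write().unwrap();
        let (_has_error, _cluster_refreshed) =
            cache.update_all(recent_slots.estimated_current_slot(), update_info);
    }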
pub(crate) struct LeaderTpuCache {
first_slot: Slot,
leaders: Vec<Pubkey>,
leader_tpu_map: HashMap<Pubkey, SocketAddr>,
@@ -313,26 +316,41 @@ struct LeaderTpuCache {
}
impl LeaderTpuCache {
fn new(rpc_client: &RpcClient, first_slot: Slot) -> Result<Self> {
let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch;
let leaders = Self::fetch_slot_leaders(rpc_client, first_slot, slots_in_epoch)?;
let leader_tpu_map = Self::fetch_cluster_tpu_sockets(rpc_client)?;
Ok(Self {
pub(crate) fn new(
first_slot: Slot,
slots_in_epoch: Slot,
leaders: Vec<Pubkey>,
cluster_nodes: Vec<RpcContactInfo>,
) -> Self {
let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes);
Self {
first_slot,
leaders,
leader_tpu_map,
slots_in_epoch,
last_epoch_info_slot: first_slot,
})
}
}
// Last slot that has a cached leader pubkey
fn last_slot(&self) -> Slot {
pub(crate) fn last_slot(&self) -> Slot {
self.first_slot + self.leaders.len().saturating_sub(1) as u64
}
pub(crate) fn slot_info(&self) -> (Slot, Slot, Slot) {
(
self.last_slot(),
self.last_epoch_info_slot,
self.slots_in_epoch,
)
}
// Get the TPU sockets for the current leader and upcoming leaders according to fanout size
fn get_leader_sockets(&self, current_slot: Slot, fanout_slots: u64) -> Vec<SocketAddr> {
pub(crate) fn get_leader_sockets(
&self,
current_slot: Slot,
fanout_slots: u64,
) -> Vec<SocketAddr> {
let mut leader_set = HashSet::new();
let mut leader_sockets = Vec::new();
for leader_slot in current_slot..current_slot + fanout_slots {
@@ -358,7 +376,7 @@ impl LeaderTpuCache {
leader_sockets
}
fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
pub(crate) fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
if slot >= self.first_slot {
let index = slot - self.first_slot;
self.leaders.get(index as usize)
@@ -367,9 +385,10 @@ impl LeaderTpuCache {
}
}
fn fetch_cluster_tpu_sockets(rpc_client: &RpcClient) -> Result<HashMap<Pubkey, SocketAddr>> {
let cluster_contact_info = rpc_client.get_cluster_nodes()?;
Ok(cluster_contact_info
pub(crate) fn extract_cluster_tpu_sockets(
cluster_contact_info: Vec<RpcContactInfo>,
) -> HashMap<Pubkey, SocketAddr> {
cluster_contact_info
.into_iter()
.filter_map(|contact_info| {
Some((
@@ -377,16 +396,97 @@ impl LeaderTpuCache {
contact_info.tpu?,
))
})
.collect())
.collect()
}
fn fetch_slot_leaders(
rpc_client: &RpcClient,
start_slot: Slot,
slots_in_epoch: Slot,
) -> Result<Vec<Pubkey>> {
let fanout = (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch);
Ok(rpc_client.get_slot_leaders(start_slot, fanout)?)
pub(crate) fn fanout(slots_in_epoch: Slot) -> Slot {
(2 * MAX_FANOUT_SLOTS).min(slots_in_epoch)
}
pub(crate) fn update_all(
&mut self,
estimated_current_slot: Slot,
cache_update_info: LeaderTpuCacheUpdateInfo,
) -> (bool, bool) {
let mut has_error = false;
let mut cluster_refreshed = false;
if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes {
match cluster_nodes {
Ok(cluster_nodes) => {
let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes);
self.leader_tpu_map = leader_tpu_map;
cluster_refreshed = true;
}
Err(err) => {
warn!("Failed to fetch cluster tpu sockets: {}", err);
has_error = true;
}
}
}
if let Some(Ok(epoch_info)) = cache_update_info.maybe_epoch_info {
self.slots_in_epoch = epoch_info.slots_in_epoch;
self.last_epoch_info_slot = estimated_current_slot;
}
if let Some(slot_leaders) = cache_update_info.maybe_slot_leaders {
match slot_leaders {
Ok(slot_leaders) => {
self.first_slot = estimated_current_slot;
self.leaders = slot_leaders;
}
Err(err) => {
warn!(
"Failed to fetch slot leaders (current estimated slot: {}): {}",
estimated_current_slot, err
);
has_error = true;
}
}
}
(has_error, cluster_refreshed)
}
}
fn maybe_fetch_cache_info(
leader_tpu_cache: &Arc<RwLock<LeaderTpuCache>>,
last_cluster_refresh: Instant,
rpc_client: &RpcClient,
recent_slots: &RecentLeaderSlots,
) -> LeaderTpuCacheUpdateInfo {
// Refresh cluster TPU ports every 5min in case validators restart with new port configuration
// or new validators come online
let maybe_cluster_nodes = if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
Some(rpc_client.get_cluster_nodes())
} else {
None
};
let estimated_current_slot = recent_slots.estimated_current_slot();
let (last_slot, last_epoch_info_slot, slots_in_epoch) = {
let leader_tpu_cache = leader_tpu_cache.read().unwrap();
leader_tpu_cache.slot_info()
};
let maybe_epoch_info =
if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
Some(rpc_client.get_epoch_info())
} else {
None
};
let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS)
{
Some(rpc_client.get_slot_leaders(
estimated_current_slot,
LeaderTpuCache::fanout(slots_in_epoch),
))
} else {
None
};
LeaderTpuCacheUpdateInfo {
maybe_cluster_nodes,
maybe_epoch_info,
maybe_slot_leaders,
}
}
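Each of the three fetches is gated by its own staleness check: cluster TPU sockets after five minutes, epoch info once the slot estimate passes the threshold derived from slot_info(), and slot leaders once the estimate comes within MAX_FANOUT_SLOTS of the end of the cached leader range. A worked example of the leader check with hypothetical numbers:

    // Cached leaders cover slots 900..=1099, so last_slot() == 1099.
    let last_slot: Slot = 1099;
    // With MAX_FANOUT_SLOTS == 100, a refetch triggers at estimated slot 999,
    // requesting fanout(slots_in_epoch) leaders starting at the estimate.
    let estimated_current_slot: Slot = 999;
    assert!(estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS));
    assert_eq!(LeaderTpuCache::fanout(432_000), 200); // typical mainnet epoch length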
@@ -394,15 +494,15 @@ impl LeaderTpuCache
const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
#[derive(Clone, Debug)]
struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
impl RecentLeaderSlots {
fn new(current_slot: Slot) -> Self {
pub(crate) fn new(current_slot: Slot) -> Self {
let mut recent_slots = VecDeque::new();
recent_slots.push_back(current_slot);
Self(Arc::new(RwLock::new(recent_slots)))
}
fn record_slot(&self, current_slot: Slot) {
pub(crate) fn record_slot(&self, current_slot: Slot) {
let mut recent_slots = self.0.write().unwrap();
recent_slots.push_back(current_slot);
// 12 recent slots should be large enough to avoid a misbehaving
@@ -413,7 +513,7 @@ impl RecentLeaderSlots {
}
// Estimate the current slot from recent slot notifications.
fn estimated_current_slot(&self) -> Slot {
pub(crate) fn estimated_current_slot(&self) -> Slot {
let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
assert!(!recent_slots.is_empty());
recent_slots.sort_unstable();
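A usage sketch of the estimator with hypothetical slots (the median-based outlier rejection against MAX_SLOT_SKIP_DISTANCE that finishes this method sits outside the hunk):

    let recent_slots = RecentLeaderSlots::new(100);
    recent_slots.record_slot(101);
    recent_slots.record_slot(102);
    // 102 is consistent with the recent progression, so it is the estimate.
    assert_eq!(recent_slots.estimated_current_slot(), 102);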
@@ -458,7 +558,16 @@ impl LeaderTpuService {
let start_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;
let recent_slots = RecentLeaderSlots::new(start_slot);
let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(&rpc_client, start_slot)?));
let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch;
let leaders =
rpc_client.get_slot_leaders(start_slot, LeaderTpuCache::fanout(slots_in_epoch))?;
let cluster_nodes = rpc_client.get_cluster_nodes()?;
let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(
start_slot,
slots_in_epoch,
leaders,
cluster_nodes,
)));
let subscription = if !websocket_url.is_empty() {
let recent_slots = recent_slots.clone();
@@ -530,59 +639,25 @@ impl LeaderTpuService {
}
// Sleep a few slots before checking if leader cache needs to be refreshed again
std::thread::sleep(Duration::from_millis(sleep_ms));
sleep(Duration::from_millis(sleep_ms));
sleep_ms = 1000;
// Refresh cluster TPU ports every 5min in case validators restart with new port configuration
// or new validators come online
if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
match LeaderTpuCache::fetch_cluster_tpu_sockets(&rpc_client) {
Ok(leader_tpu_map) => {
leader_tpu_cache.write().unwrap().leader_tpu_map = leader_tpu_map;
last_cluster_refresh = Instant::now();
}
Err(err) => {
warn!("Failed to fetch cluster tpu sockets: {}", err);
sleep_ms = 100;
}
}
}
let cache_update_info = maybe_fetch_cache_info(
&leader_tpu_cache,
last_cluster_refresh,
&rpc_client,
&recent_slots,
);
let estimated_current_slot = recent_slots.estimated_current_slot();
let (last_slot, last_epoch_info_slot, mut slots_in_epoch) = {
let leader_tpu_cache = leader_tpu_cache.read().unwrap();
(
leader_tpu_cache.last_slot(),
leader_tpu_cache.last_epoch_info_slot,
leader_tpu_cache.slots_in_epoch,
)
};
if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
if let Ok(epoch_info) = rpc_client.get_epoch_info() {
slots_in_epoch = epoch_info.slots_in_epoch;
let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
leader_tpu_cache.slots_in_epoch = slots_in_epoch;
leader_tpu_cache.last_epoch_info_slot = estimated_current_slot;
if cache_update_info.has_some() {
let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
let (has_error, cluster_refreshed) = leader_tpu_cache
.update_all(recent_slots.estimated_current_slot(), cache_update_info);
if has_error {
sleep_ms = 100;
}
}
if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS) {
match LeaderTpuCache::fetch_slot_leaders(
&rpc_client,
estimated_current_slot,
slots_in_epoch,
) {
Ok(slot_leaders) => {
let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
leader_tpu_cache.first_slot = estimated_current_slot;
leader_tpu_cache.leaders = slot_leaders;
}
Err(err) => {
warn!(
"Failed to fetch slot leaders (current estimated slot: {}): {}",
estimated_current_slot, err
);
sleep_ms = 100;
}
if cluster_refreshed {
last_cluster_refresh = Instant::now();
}
}
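Condensed, the refactored loop body above amounts to the following function-shaped sketch (refresh_tick is a hypothetical name; the behavior mirrors the loop, including the 100 ms back-off after a failed RPC fetch):

    fn refresh_tick(
        leader_tpu_cache: &Arc<RwLock<LeaderTpuCache>>,
        rpc_client: &RpcClient,
        recent_slots: &RecentLeaderSlots,
        last_cluster_refresh: &mut Instant,
        sleep_ms: &mut u64,
    ) {
        let cache_update_info = maybe_fetch_cache_info(
            leader_tpu_cache,
            *last_cluster_refresh,
            rpc_client,
            recent_slots,
        );
        if cache_update_info.has_some() {
            let (has_error, cluster_refreshed) = leader_tpu_cache
                .write()
                .unwrap()
                .update_all(recent_slots.estimated_current_slot(), cache_update_info);
            if has_error {
                *sleep_ms = 100; // retry sooner after a failed RPC fetch
            }
            if cluster_refreshed {
                *last_cluster_refresh = Instant::now();
            }
        }
    }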
}