Cleanup tempoary pub modules (#30268)

Clean up temporary_pub_modules in tpu_client and thin_client
This commit is contained in:
Lijun Wang 2023-02-10 20:35:09 -08:00 committed by GitHub
parent 8770b15bb2
commit d49b48100d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 113 deletions

View File

@ -13,7 +13,7 @@ use {
transaction::{Transaction, TransactionError},
transport::Result as TransportResult,
},
solana_tpu_client::nonblocking::tpu_client::{temporary_pub::*, TpuClient as BackendTpuClient},
solana_tpu_client::nonblocking::tpu_client::{Result, TpuClient as BackendTpuClient},
std::sync::Arc,
};

View File

@ -12,7 +12,7 @@ use {
transaction::{Transaction, TransactionError},
transport::Result as TransportResult,
},
solana_tpu_client::tpu_client::{temporary_pub::Result, TpuClient as BackendTpuClient},
solana_tpu_client::tpu_client::{Result, TpuClient as BackendTpuClient},
std::sync::Arc,
};
pub use {

View File

@ -43,77 +43,72 @@ use {
},
};
pub mod temporary_pub {
use super::*;
struct ClientOptimizer {
cur_index: AtomicUsize,
experiment_index: AtomicUsize,
experiment_done: AtomicBool,
times: RwLock<Vec<u64>>,
num_clients: usize,
}
pub struct ClientOptimizer {
cur_index: AtomicUsize,
experiment_index: AtomicUsize,
experiment_done: AtomicBool,
times: RwLock<Vec<u64>>,
num_clients: usize,
impl ClientOptimizer {
fn new(num_clients: usize) -> Self {
Self {
cur_index: AtomicUsize::new(0),
experiment_index: AtomicUsize::new(0),
experiment_done: AtomicBool::new(false),
times: RwLock::new(vec![std::u64::MAX; num_clients]),
num_clients,
}
}
impl ClientOptimizer {
pub fn new(num_clients: usize) -> Self {
Self {
cur_index: AtomicUsize::new(0),
experiment_index: AtomicUsize::new(0),
experiment_done: AtomicBool::new(false),
times: RwLock::new(vec![std::u64::MAX; num_clients]),
num_clients,
}
}
pub fn experiment(&self) -> usize {
if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
if old < self.num_clients {
old
} else {
self.best()
}
fn experiment(&self) -> usize {
if self.experiment_index.load(Ordering::Relaxed) < self.num_clients {
let old = self.experiment_index.fetch_add(1, Ordering::Relaxed);
if old < self.num_clients {
old
} else {
self.best()
}
}
pub fn report(&self, index: usize, time_ms: u64) {
if self.num_clients > 1
&& (!self.experiment_done.load(Ordering::Relaxed) || time_ms == std::u64::MAX)
{
trace!(
"report {} with {} exp: {}",
index,
time_ms,
self.experiment_index.load(Ordering::Relaxed)
);
self.times.write().unwrap()[index] = time_ms;
if index == (self.num_clients - 1) || time_ms == std::u64::MAX {
let times = self.times.read().unwrap();
let (min_time, min_index) = min_index(&times);
trace!(
"done experimenting min: {} time: {} times: {:?}",
min_index,
min_time,
times
);
// Only 1 thread should grab the num_clients-1 index, so this should be ok.
self.cur_index.store(min_index, Ordering::Relaxed);
self.experiment_done.store(true, Ordering::Relaxed);
}
}
}
pub fn best(&self) -> usize {
self.cur_index.load(Ordering::Relaxed)
} else {
self.best()
}
}
fn report(&self, index: usize, time_ms: u64) {
if self.num_clients > 1
&& (!self.experiment_done.load(Ordering::Relaxed) || time_ms == std::u64::MAX)
{
trace!(
"report {} with {} exp: {}",
index,
time_ms,
self.experiment_index.load(Ordering::Relaxed)
);
self.times.write().unwrap()[index] = time_ms;
if index == (self.num_clients - 1) || time_ms == std::u64::MAX {
let times = self.times.read().unwrap();
let (min_time, min_index) = min_index(&times);
trace!(
"done experimenting min: {} time: {} times: {:?}",
min_index,
min_time,
times
);
// Only 1 thread should grab the num_clients-1 index, so this should be ok.
self.cur_index.store(min_index, Ordering::Relaxed);
self.experiment_done.store(true, Ordering::Relaxed);
}
}
}
fn best(&self) -> usize {
self.cur_index.load(Ordering::Relaxed)
}
}
use temporary_pub::*;
/// An object for querying and sending transactions to the network.
pub struct ThinClient<

View File

@ -1,11 +1,4 @@
#[cfg(feature = "spinner")]
use {
crate::tpu_client::temporary_pub::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL},
indicatif::ProgressBar,
solana_rpc_client::spinner,
solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
solana_sdk::{message::Message, signers::Signers, transaction::TransactionError},
};
pub use crate::tpu_client::Result;
use {
crate::tpu_client::{RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS},
bincode::serialize,
@ -48,37 +41,38 @@ use {
time::{sleep, timeout, Duration, Instant},
},
};
#[cfg(feature = "spinner")]
use {
crate::tpu_client::{SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL},
indicatif::ProgressBar,
solana_rpc_client::spinner,
solana_rpc_client_api::request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
solana_sdk::{message::Message, signers::Signers, transaction::TransactionError},
};
pub mod temporary_pub {
use super::*;
pub type Result<T> = std::result::Result<T, TpuSenderError>;
#[cfg(feature = "spinner")]
pub fn set_message_for_confirmed_transactions(
progress_bar: &ProgressBar,
confirmed_transactions: u32,
total_transactions: usize,
block_height: Option<u64>,
last_valid_block_height: u64,
status: &str,
) {
progress_bar.set_message(format!(
"{:>5.1}% | {:<40}{}",
confirmed_transactions as f64 * 100. / total_transactions as f64,
status,
match block_height {
Some(block_height) => format!(
" [block height {}; re-sign in {} blocks]",
block_height,
last_valid_block_height.saturating_sub(block_height),
),
None => String::new(),
},
));
}
#[cfg(feature = "spinner")]
fn set_message_for_confirmed_transactions(
progress_bar: &ProgressBar,
confirmed_transactions: u32,
total_transactions: usize,
block_height: Option<u64>,
last_valid_block_height: u64,
status: &str,
) {
progress_bar.set_message(format!(
"{:>5.1}% | {:<40}{}",
confirmed_transactions as f64 * 100. / total_transactions as f64,
status,
match block_height {
Some(block_height) => format!(
" [block height {}; re-sign in {} blocks]",
block_height,
last_valid_block_height.saturating_sub(block_height),
),
None => String::new(),
},
));
}
use temporary_pub::*;
#[derive(Error, Debug)]
pub enum TpuSenderError {

View File

@ -23,19 +23,14 @@ pub const DEFAULT_TPU_ENABLE_UDP: bool = false;
pub const DEFAULT_TPU_USE_QUIC: bool = true;
pub const DEFAULT_TPU_CONNECTION_POOL_SIZE: usize = 4;
pub mod temporary_pub {
use super::*;
pub type Result<T> = std::result::Result<T, TpuSenderError>;
pub type Result<T> = std::result::Result<T, TpuSenderError>;
/// Send at ~100 TPS
#[cfg(feature = "spinner")]
pub const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
/// Retry batch send after 4 seconds
#[cfg(feature = "spinner")]
pub const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
}
use temporary_pub::*;
/// Send at ~100 TPS
#[cfg(feature = "spinner")]
pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);
/// Retry batch send after 4 seconds
#[cfg(feature = "spinner")]
pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
/// Default number of slots used to build TPU socket fanout set
pub const DEFAULT_FANOUT_SLOTS: u64 = 12;