2022-08-09 05:33:14 -07:00
|
|
|
pub use crate::nonblocking::tpu_client::TpuSenderError;
|
2021-12-03 09:00:31 -08:00
|
|
|
use {
|
|
|
|
crate::{
|
2022-06-08 04:57:12 -07:00
|
|
|
connection_cache::ConnectionCache,
|
2022-08-09 05:33:14 -07:00
|
|
|
nonblocking::tpu_client::TpuClient as NonblockingTpuClient, rpc_client::RpcClient,
|
2021-12-03 09:00:31 -08:00
|
|
|
},
|
|
|
|
solana_sdk::{
|
|
|
|
clock::Slot,
|
|
|
|
message::Message,
|
|
|
|
signers::Signers,
|
|
|
|
transaction::{Transaction, TransactionError},
|
2022-08-09 05:33:14 -07:00
|
|
|
transport::Result as TransportResult,
|
2021-12-03 09:00:31 -08:00
|
|
|
},
|
|
|
|
std::{
|
2022-08-09 05:33:14 -07:00
|
|
|
collections::VecDeque,
|
|
|
|
net::UdpSocket,
|
|
|
|
sync::{Arc, RwLock},
|
2021-04-22 18:35:12 -07:00
|
|
|
},
|
2022-08-09 05:33:14 -07:00
|
|
|
tokio::time::Duration,
|
2021-04-22 18:35:12 -07:00
|
|
|
};
|
|
|
|
|
|
|
|
/// Module-local result alias whose error type is always [`TpuSenderError`].
type Result<T> = std::result::Result<T, TpuSenderError>;
|
|
|
|
|
|
|
|
/// Default number of slots used to build the TPU socket fanout set.
pub const DEFAULT_FANOUT_SLOTS: u64 = 12;

/// Maximum number of slots used to build the TPU socket fanout set.
pub const MAX_FANOUT_SLOTS: u64 = 100;

/// Interval between consecutive transaction sends; 10 ms works out to ~100 TPS.
pub(crate) const SEND_TRANSACTION_INTERVAL: Duration = Duration::from_millis(10);

/// Delay before a batch send is retried (4 seconds).
pub(crate) const TRANSACTION_RESEND_INTERVAL: Duration = Duration::from_secs(4);
|
|
|
|
|
2021-04-22 18:35:12 -07:00
|
|
|
/// Config params for `TpuClient`.
#[derive(Clone, Debug)]
pub struct TpuClientConfig {
    /// The range of upcoming slots to include when determining which
    /// leaders to send transactions to (min: 1, max: `MAX_FANOUT_SLOTS`).
    pub fanout_slots: u64,
}
|
|
|
|
|
|
|
|
impl Default for TpuClientConfig {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self {
|
|
|
|
fanout_slots: DEFAULT_FANOUT_SLOTS,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Client which sends transactions directly to the current leader's TPU port over UDP.
|
|
|
|
/// The client uses RPC to determine the current leader and fetch node contact info
|
|
|
|
pub struct TpuClient {
|
2022-04-13 11:17:10 -07:00
|
|
|
_deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket
|
2022-08-09 05:33:14 -07:00
|
|
|
//todo: get rid of this field
|
2021-10-26 13:54:26 -07:00
|
|
|
rpc_client: Arc<RpcClient>,
|
2022-08-09 05:33:14 -07:00
|
|
|
tpu_client: Arc<NonblockingTpuClient>,
|
2021-04-22 18:35:12 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
impl TpuClient {
|
2021-09-07 16:08:20 -07:00
|
|
|
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
|
|
|
|
/// size
|
2021-04-22 18:35:12 -07:00
|
|
|
pub fn send_transaction(&self, transaction: &Transaction) -> bool {
|
2022-08-09 05:33:14 -07:00
|
|
|
self.invoke(self.tpu_client.send_transaction(transaction))
|
2021-04-22 18:35:12 -07:00
|
|
|
}
|
|
|
|
|
2021-09-07 16:08:20 -07:00
|
|
|
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
|
2022-04-13 11:17:10 -07:00
|
|
|
pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
|
2022-08-09 05:33:14 -07:00
|
|
|
self.invoke(self.tpu_client.send_wire_transaction(wire_transaction))
|
2022-04-12 08:43:29 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
|
|
|
|
/// size
|
|
|
|
/// Returns the last error if all sends fail
|
2022-04-13 11:17:10 -07:00
|
|
|
pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
|
2022-08-09 05:33:14 -07:00
|
|
|
self.invoke(self.tpu_client.try_send_transaction(transaction))
|
2022-04-12 08:43:29 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
|
|
|
|
/// Returns the last error if all sends fail
|
2022-08-09 05:33:14 -07:00
|
|
|
pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
|
|
|
|
self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction))
|
2021-04-22 18:35:12 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Create a new client that disconnects when dropped
|
|
|
|
pub fn new(
|
|
|
|
rpc_client: Arc<RpcClient>,
|
|
|
|
websocket_url: &str,
|
|
|
|
config: TpuClientConfig,
|
2022-06-08 04:57:12 -07:00
|
|
|
) -> Result<Self> {
|
2022-08-09 05:33:14 -07:00
|
|
|
let create_tpu_client =
|
|
|
|
NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config);
|
|
|
|
let tpu_client =
|
|
|
|
tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
|
|
|
|
|
|
|
|
Ok(Self {
|
|
|
|
_deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
|
|
|
rpc_client,
|
|
|
|
tpu_client: Arc::new(tpu_client),
|
|
|
|
})
|
2022-06-08 04:57:12 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Create a new client that disconnects when dropped
|
|
|
|
pub fn new_with_connection_cache(
|
|
|
|
rpc_client: Arc<RpcClient>,
|
|
|
|
websocket_url: &str,
|
|
|
|
config: TpuClientConfig,
|
|
|
|
connection_cache: Arc<ConnectionCache>,
|
2021-04-22 18:35:12 -07:00
|
|
|
) -> Result<Self> {
|
2022-08-09 05:33:14 -07:00
|
|
|
let create_tpu_client = NonblockingTpuClient::new_with_connection_cache(
|
|
|
|
rpc_client.get_inner_client().clone(),
|
|
|
|
websocket_url,
|
|
|
|
config,
|
|
|
|
connection_cache,
|
|
|
|
);
|
|
|
|
let tpu_client =
|
|
|
|
tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
|
2021-04-22 18:35:12 -07:00
|
|
|
|
|
|
|
Ok(Self {
|
2022-04-13 11:17:10 -07:00
|
|
|
_deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
|
2021-10-26 13:54:26 -07:00
|
|
|
rpc_client,
|
2022-08-09 05:33:14 -07:00
|
|
|
tpu_client: Arc::new(tpu_client),
|
2021-04-22 18:35:12 -07:00
|
|
|
})
|
|
|
|
}
|
2021-10-26 13:54:26 -07:00
|
|
|
|
|
|
|
pub fn send_and_confirm_messages_with_spinner<T: Signers>(
|
|
|
|
&self,
|
|
|
|
messages: &[Message],
|
|
|
|
signers: &T,
|
|
|
|
) -> Result<Vec<Option<TransactionError>>> {
|
2022-08-09 05:33:14 -07:00
|
|
|
self.invoke(
|
|
|
|
self.tpu_client
|
|
|
|
.send_and_confirm_messages_with_spinner(messages, signers),
|
|
|
|
)
|
2021-10-26 13:54:26 -07:00
|
|
|
}
|
2022-04-12 08:43:29 -07:00
|
|
|
|
|
|
|
pub fn rpc_client(&self) -> &RpcClient {
|
|
|
|
&self.rpc_client
|
|
|
|
}
|
2021-04-22 18:35:12 -07:00
|
|
|
|
2022-08-09 05:33:14 -07:00
|
|
|
fn invoke<T, F: std::future::Future<Output = T>>(&self, f: F) -> T {
|
|
|
|
// `block_on()` panics if called within an asynchronous execution context. Whereas
|
|
|
|
// `block_in_place()` only panics if called from a current_thread runtime, which is the
|
|
|
|
// lesser evil.
|
|
|
|
tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f))
|
2021-04-22 18:35:12 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Largest distance past the expected current slot at which a reported slot is still
// considered reasonable.
// 48 chosen because it's unlikely that 12 leaders in a row will miss their slots
const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
|
|
|
|
|
|
|
|
#[derive(Clone, Debug)]
|
2022-05-21 11:22:30 -07:00
|
|
|
pub(crate) struct RecentLeaderSlots(Arc<RwLock<VecDeque<Slot>>>);
|
2021-04-22 18:35:12 -07:00
|
|
|
impl RecentLeaderSlots {
|
2022-05-21 11:22:30 -07:00
|
|
|
pub(crate) fn new(current_slot: Slot) -> Self {
|
2021-04-22 18:35:12 -07:00
|
|
|
let mut recent_slots = VecDeque::new();
|
|
|
|
recent_slots.push_back(current_slot);
|
|
|
|
Self(Arc::new(RwLock::new(recent_slots)))
|
|
|
|
}
|
|
|
|
|
2022-05-21 11:22:30 -07:00
|
|
|
pub(crate) fn record_slot(&self, current_slot: Slot) {
|
2021-04-22 18:35:12 -07:00
|
|
|
let mut recent_slots = self.0.write().unwrap();
|
|
|
|
recent_slots.push_back(current_slot);
|
|
|
|
// 12 recent slots should be large enough to avoid a misbehaving
|
|
|
|
// validator from affecting the median recent slot
|
|
|
|
while recent_slots.len() > 12 {
|
|
|
|
recent_slots.pop_front();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Estimate the current slot from recent slot notifications.
|
2022-05-21 11:22:30 -07:00
|
|
|
pub(crate) fn estimated_current_slot(&self) -> Slot {
|
2021-04-22 18:35:12 -07:00
|
|
|
let mut recent_slots: Vec<Slot> = self.0.read().unwrap().iter().cloned().collect();
|
|
|
|
assert!(!recent_slots.is_empty());
|
|
|
|
recent_slots.sort_unstable();
|
|
|
|
|
|
|
|
// Validators can broadcast invalid blocks that are far in the future
|
|
|
|
// so check if the current slot is in line with the recent progression.
|
|
|
|
let max_index = recent_slots.len() - 1;
|
|
|
|
let median_index = max_index / 2;
|
|
|
|
let median_recent_slot = recent_slots[median_index];
|
|
|
|
let expected_current_slot = median_recent_slot + (max_index - median_index) as u64;
|
|
|
|
let max_reasonable_current_slot = expected_current_slot + MAX_SLOT_SKIP_DISTANCE;
|
|
|
|
|
|
|
|
// Return the highest slot that doesn't exceed what we believe is a
|
|
|
|
// reasonable slot.
|
|
|
|
recent_slots
|
|
|
|
.into_iter()
|
|
|
|
.rev()
|
|
|
|
.find(|slot| *slot <= max_reasonable_current_slot)
|
|
|
|
.unwrap()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
impl From<Vec<Slot>> for RecentLeaderSlots {
    /// Test-only constructor that wraps `recent_slots`, in the given order, as the window.
    ///
    /// # Panics
    ///
    /// Panics if `recent_slots` is empty.
    fn from(recent_slots: Vec<Slot>) -> Self {
        assert!(!recent_slots.is_empty());
        Self(Arc::new(RwLock::new(VecDeque::from(recent_slots))))
    }
}
|
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that the window estimates the current slot as `expected_slot`.
    fn assert_slot(recent_slots: RecentLeaderSlots, expected_slot: Slot) {
        assert_eq!(recent_slots.estimated_current_slot(), expected_slot);
    }

    #[test]
    fn test_recent_leader_slots_sequential() {
        assert_slot(RecentLeaderSlots::new(0), 0);

        // The order in which slots were recorded must not affect the estimate.
        let mut recent_slots: Vec<Slot> = (1..=12).collect();
        assert_slot(RecentLeaderSlots::from(recent_slots.clone()), 12);

        recent_slots.reverse();
        assert_slot(RecentLeaderSlots::from(recent_slots), 12);
    }

    #[test]
    fn test_recent_leader_slots_skip_distance_boundary() {
        // Median 0 with one slot after it gives an expected slot of 1, so the highest
        // acceptable slot is exactly `1 + MAX_SLOT_SKIP_DISTANCE`.
        assert_slot(
            RecentLeaderSlots::from(vec![0, 1 + MAX_SLOT_SKIP_DISTANCE]),
            1 + MAX_SLOT_SKIP_DISTANCE,
        );
        // One slot further is rejected and the estimate falls back to the median.
        assert_slot(
            RecentLeaderSlots::from(vec![0, 2 + MAX_SLOT_SKIP_DISTANCE]),
            0,
        );
    }

    #[test]
    fn test_recent_leader_slots_outliers() {
        // Slots far ahead of the recent progression are ignored.
        assert_slot(RecentLeaderSlots::from(vec![1]), 1);
        assert_slot(RecentLeaderSlots::from(vec![1, 100]), 1);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 100]), 2);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 100]), 3);
        assert_slot(RecentLeaderSlots::from(vec![1, 2, 3, 99, 100]), 3);
    }
}
|