client: Use async TPU client in sync TPU client by sharing tokio runtime (#26996)
* Make the sync tpu client use the async tpu client
* Try to fix CI errors
* Fix formatting
* Make rpc_client::get_nonblocking_client public only in the crate
* Save work
* Temporary hack to test sharing runtime between tpu_client and rpc_client
* [WIP] Copy rpc client
* Fix build
* Small refactoring
* Remove copies
* Refactor access to RPC client fields
* Change `clone_inner_client` to `get_inner_client`

Co-authored-by: Ryan Leung <ryan.leung@solana.com>
This commit is contained in:
parent 632752d2f9 · commit 04cac610cc
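The change in a nutshell: the blocking TpuClient becomes a thin facade that drives the nonblocking TpuClient on the tokio runtime already owned by the blocking RpcClient. A minimal self-contained sketch of that pattern (AsyncClient and BlockingClient are hypothetical stand-ins, not the actual solana-client types):

use std::sync::Arc;
use tokio::runtime::Runtime;

struct AsyncClient;

impl AsyncClient {
    // Stand-in for the nonblocking TpuClient's async send methods.
    async fn send(&self, _wire_transaction: Vec<u8>) -> Result<(), String> {
        Ok(())
    }
}

struct BlockingClient {
    runtime: Arc<Runtime>, // shared with the blocking RpcClient in the real code
    inner: Arc<AsyncClient>,
}

impl BlockingClient {
    // Mirrors the `invoke` helper added in this diff: drive a future to
    // completion on the shared runtime from a synchronous caller.
    fn invoke<T, F: std::future::Future<Output = T>>(&self, f: F) -> T {
        // `block_on()` panics inside an async context; `block_in_place()` only
        // panics on a current_thread runtime, which is the lesser evil.
        tokio::task::block_in_place(|| self.runtime.block_on(f))
    }

    fn send(&self, wire_transaction: Vec<u8>) -> Result<(), String> {
        self.invoke(self.inner.send(wire_transaction))
    }
}

fn main() {
    let runtime = Arc::new(Runtime::new().unwrap());
    let client = BlockingClient {
        runtime,
        inner: Arc::new(AsyncClient),
    };
    client.send(vec![1, 2, 3]).unwrap();
}

block_in_place moves the current worker thread into blocking mode so block_on can park it without starving the runtime, which is why this pattern requires a multi-thread runtime.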
client/src/nonblocking/pubsub_client.rs

@@ -64,8 +64,14 @@ pub enum PubsubClientError {
     #[error("subscribe failed: {reason}")]
     SubscribeFailed { reason: String, message: String },
 
+    #[error("unexpected message format: {0}")]
+    UnexpectedMessageError(String),
+
     #[error("request failed: {reason}")]
     RequestFailed { reason: String, message: String },
+
+    #[error("request error: {0}")]
+    RequestError(String),
 }
 
 type UnsubscribeFn = Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send>;
client/src/nonblocking/tpu_client.rs

@@ -1,6 +1,6 @@
 use {
     crate::{
-        client_error::ClientError,
+        client_error::{ClientError, Result as ClientResult},
         connection_cache::ConnectionCache,
         nonblocking::{
             pubsub_client::{PubsubClient, PubsubClientError},
@@ -8,11 +8,11 @@ use {
             tpu_connection::TpuConnection,
         },
         rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
-        rpc_response::SlotUpdate,
+        rpc_response::{RpcContactInfo, SlotUpdate},
         spinner,
         tpu_client::{
-            LeaderTpuCache, LeaderTpuCacheUpdateInfo, RecentLeaderSlots, TpuClientConfig,
-            MAX_FANOUT_SLOTS, SEND_TRANSACTION_INTERVAL, TRANSACTION_RESEND_INTERVAL,
+            RecentLeaderSlots, TpuClientConfig, MAX_FANOUT_SLOTS, SEND_TRANSACTION_INTERVAL,
+            TRANSACTION_RESEND_INTERVAL,
         },
     },
     bincode::serialize,
@@ -21,15 +21,18 @@ use {
     solana_sdk::{
         clock::Slot,
         commitment_config::CommitmentConfig,
+        epoch_info::EpochInfo,
         message::Message,
+        pubkey::Pubkey,
         signature::SignerError,
         signers::Signers,
         transaction::{Transaction, TransactionError},
         transport::{Result as TransportResult, TransportError},
     },
     std::{
-        collections::HashMap,
+        collections::{HashMap, HashSet},
         net::SocketAddr,
+        str::FromStr,
         sync::{
             atomic::{AtomicBool, Ordering},
             Arc, RwLock,
@@ -56,6 +59,156 @@ pub enum TpuSenderError {
     Custom(String),
 }
 
+struct LeaderTpuCacheUpdateInfo {
+    maybe_cluster_nodes: Option<ClientResult<Vec<RpcContactInfo>>>,
+    maybe_epoch_info: Option<ClientResult<EpochInfo>>,
+    maybe_slot_leaders: Option<ClientResult<Vec<Pubkey>>>,
+}
+impl LeaderTpuCacheUpdateInfo {
+    pub fn has_some(&self) -> bool {
+        self.maybe_cluster_nodes.is_some()
+            || self.maybe_epoch_info.is_some()
+            || self.maybe_slot_leaders.is_some()
+    }
+}
+
+struct LeaderTpuCache {
+    first_slot: Slot,
+    leaders: Vec<Pubkey>,
+    leader_tpu_map: HashMap<Pubkey, SocketAddr>,
+    slots_in_epoch: Slot,
+    last_epoch_info_slot: Slot,
+}
+
+impl LeaderTpuCache {
+    pub fn new(
+        first_slot: Slot,
+        slots_in_epoch: Slot,
+        leaders: Vec<Pubkey>,
+        cluster_nodes: Vec<RpcContactInfo>,
+    ) -> Self {
+        let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes);
+        Self {
+            first_slot,
+            leaders,
+            leader_tpu_map,
+            slots_in_epoch,
+            last_epoch_info_slot: first_slot,
+        }
+    }
+
+    // Last slot that has a cached leader pubkey
+    pub fn last_slot(&self) -> Slot {
+        self.first_slot + self.leaders.len().saturating_sub(1) as u64
+    }
+
+    pub fn slot_info(&self) -> (Slot, Slot, Slot) {
+        (
+            self.last_slot(),
+            self.last_epoch_info_slot,
+            self.slots_in_epoch,
+        )
+    }
+
+    // Get the TPU sockets for the current leader and upcoming leaders according to fanout size
+    pub fn get_leader_sockets(&self, current_slot: Slot, fanout_slots: u64) -> Vec<SocketAddr> {
+        let mut leader_set = HashSet::new();
+        let mut leader_sockets = Vec::new();
+        for leader_slot in current_slot..current_slot + fanout_slots {
+            if let Some(leader) = self.get_slot_leader(leader_slot) {
+                if let Some(tpu_socket) = self.leader_tpu_map.get(leader) {
+                    if leader_set.insert(*leader) {
+                        leader_sockets.push(*tpu_socket);
+                    }
+                } else {
+                    // The leader is probably delinquent
+                    trace!("TPU not available for leader {}", leader);
+                }
+            } else {
+                // Overran the local leader schedule cache
+                warn!(
+                    "Leader not known for slot {}; cache holds slots [{},{}]",
+                    leader_slot,
+                    self.first_slot,
+                    self.last_slot()
+                );
+            }
+        }
+        leader_sockets
+    }
+
+    pub fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
+        if slot >= self.first_slot {
+            let index = slot - self.first_slot;
+            self.leaders.get(index as usize)
+        } else {
+            None
+        }
+    }
+
+    pub fn extract_cluster_tpu_sockets(
+        cluster_contact_info: Vec<RpcContactInfo>,
+    ) -> HashMap<Pubkey, SocketAddr> {
+        cluster_contact_info
+            .into_iter()
+            .filter_map(|contact_info| {
+                Some((
+                    Pubkey::from_str(&contact_info.pubkey).ok()?,
+                    contact_info.tpu?,
+                ))
+            })
+            .collect()
+    }
+
+    pub fn fanout(slots_in_epoch: Slot) -> Slot {
+        (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch)
+    }
+
+    pub fn update_all(
+        &mut self,
+        estimated_current_slot: Slot,
+        cache_update_info: LeaderTpuCacheUpdateInfo,
+    ) -> (bool, bool) {
+        let mut has_error = false;
+        let mut cluster_refreshed = false;
+        if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes {
+            match cluster_nodes {
+                Ok(cluster_nodes) => {
+                    let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes);
+                    self.leader_tpu_map = leader_tpu_map;
+                    cluster_refreshed = true;
+                }
+                Err(err) => {
+                    warn!("Failed to fetch cluster tpu sockets: {}", err);
+                    has_error = true;
+                }
+            }
+        }
+
+        if let Some(Ok(epoch_info)) = cache_update_info.maybe_epoch_info {
+            self.slots_in_epoch = epoch_info.slots_in_epoch;
+            self.last_epoch_info_slot = estimated_current_slot;
+        }
+
+        if let Some(slot_leaders) = cache_update_info.maybe_slot_leaders {
+            match slot_leaders {
+                Ok(slot_leaders) => {
+                    self.first_slot = estimated_current_slot;
+                    self.leaders = slot_leaders;
+                }
+                Err(err) => {
+                    warn!(
+                        "Failed to fetch slot leaders (current estimated slot: {}): {}",
+                        estimated_current_slot, err
+                    );
+                    has_error = true;
+                }
+            }
+        }
+        (has_error, cluster_refreshed)
+    }
+}
+
 type Result<T> = std::result::Result<T, TpuSenderError>;
 
 /// Client which sends transactions directly to the current leader's TPU port over UDP.
@@ -102,7 +255,10 @@ impl TpuClient {
 
     /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
     /// Returns the last error if all sends fail
-    async fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
+    pub async fn try_send_wire_transaction(
+        &self,
+        wire_transaction: Vec<u8>,
+    ) -> TransportResult<()> {
         let leaders = self
             .leader_tpu_service
             .leader_tpu_sockets(self.fanout_slots);
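The LeaderTpuCache added above is an offset-indexed window over the leader schedule: leaders[i] holds the leader for slot first_slot + i, and lookups outside the window miss. A toy sketch of that lookup and of the fanout bound (u8 values stand in for leader pubkeys; the MAX_FANOUT_SLOTS value of 100 is assumed here, mirroring the tpu_client module):

const MAX_FANOUT_SLOTS: u64 = 100; // assumed value, mirroring tpu_client

// Same indexing as LeaderTpuCache::get_slot_leader.
fn slot_leader(first_slot: u64, leaders: &[u8], slot: u64) -> Option<&u8> {
    slot.checked_sub(first_slot)
        .and_then(|index| leaders.get(index as usize))
}

// Same bound as LeaderTpuCache::fanout: fetch two fanouts' worth of leaders,
// but never more than one epoch.
fn fanout(slots_in_epoch: u64) -> u64 {
    (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch)
}

fn main() {
    let leaders = [10, 11, 12];
    assert_eq!(slot_leader(100, &leaders, 101), Some(&11));
    assert_eq!(slot_leader(100, &leaders, 99), None); // before the cached window
    assert_eq!(slot_leader(100, &leaders, 103), None); // overran the schedule cache
    assert_eq!(fanout(432_000), 200);
    assert_eq!(fanout(32), 32); // tiny test epochs cap the fetch
}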
client/src/pubsub_client.rs

@@ -1,3 +1,4 @@
+pub use crate::nonblocking::pubsub_client::PubsubClientError;
 use {
     crate::{
         rpc_config::{
@@ -31,29 +32,10 @@ use {
         thread::{sleep, JoinHandle},
         time::Duration,
     },
-    thiserror::Error,
     tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket},
-    url::{ParseError, Url},
+    url::Url,
 };
 
-#[derive(Debug, Error)]
-pub enum PubsubClientError {
-    #[error("url parse error")]
-    UrlParseError(#[from] ParseError),
-
-    #[error("unable to connect to server")]
-    ConnectionError(#[from] tungstenite::Error),
-
-    #[error("json parse error")]
-    JsonParseError(#[from] serde_json::error::Error),
-
-    #[error("unexpected message format: {0}")]
-    UnexpectedMessageError(String),
-
-    #[error("request error: {0}")]
-    RequestError(String),
-}
-
 pub struct PubsubClientSubscription<T>
 where
     T: DeserializeOwned,
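Because the blocking module above now just re-exports the nonblocking error type, existing downstream imports keep compiling against one shared enum. A hypothetical caller:

// Hypothetical downstream code: the old import path still works because of
// the re-export added above.
use solana_client::pubsub_client::PubsubClientError;

fn main() {
    // One of the variants shared between the blocking and nonblocking clients.
    let err = PubsubClientError::RequestError("request timed out".to_string());
    eprintln!("pubsub error: {}", err); // thiserror's derive supplies Display
}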
(File diff suppressed because it is too large.)
client/src/tpu_client.rs

@@ -1,55 +1,24 @@
+pub use crate::nonblocking::tpu_client::TpuSenderError;
 use {
     crate::{
-        client_error::{ClientError, Result as ClientResult},
         connection_cache::ConnectionCache,
-        pubsub_client::{PubsubClient, PubsubClientError, PubsubClientSubscription},
-        rpc_client::RpcClient,
-        rpc_request::MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS,
-        rpc_response::{RpcContactInfo, SlotUpdate},
-        spinner,
-        tpu_connection::TpuConnection,
+        nonblocking::tpu_client::TpuClient as NonblockingTpuClient, rpc_client::RpcClient,
     },
-    bincode::serialize,
-    log::*,
     solana_sdk::{
         clock::Slot,
-        commitment_config::CommitmentConfig,
-        epoch_info::EpochInfo,
         message::Message,
-        pubkey::Pubkey,
-        signature::SignerError,
         signers::Signers,
         transaction::{Transaction, TransactionError},
-        transport::{Result as TransportResult, TransportError},
+        transport::Result as TransportResult,
     },
     std::{
-        collections::{HashMap, HashSet, VecDeque},
-        net::{SocketAddr, UdpSocket},
-        str::FromStr,
-        sync::{
-            atomic::{AtomicBool, Ordering},
-            Arc, RwLock,
-        },
-        thread::{sleep, JoinHandle},
+        collections::VecDeque,
+        net::UdpSocket,
+        sync::{Arc, RwLock},
     },
-    thiserror::Error,
-    tokio::time::{Duration, Instant},
+    tokio::time::Duration,
 };
 
-#[derive(Error, Debug)]
-pub enum TpuSenderError {
-    #[error("Pubsub error: {0:?}")]
-    PubsubError(#[from] PubsubClientError),
-    #[error("RPC error: {0:?}")]
-    RpcError(#[from] ClientError),
-    #[error("IO error: {0:?}")]
-    IoError(#[from] std::io::Error),
-    #[error("Signer error: {0:?}")]
-    SignerError(#[from] SignerError),
-    #[error("Custom error: {0}")]
-    Custom(String),
-}
-
 type Result<T> = std::result::Result<T, TpuSenderError>;
 
 /// Default number of slots used to build TPU socket fanout set
@@ -83,61 +52,34 @@ impl Default for TpuClientConfig {
 /// The client uses RPC to determine the current leader and fetch node contact info
 pub struct TpuClient {
     _deprecated: UdpSocket, // TpuClient now uses the connection_cache to choose a send_socket
-    fanout_slots: u64,
-    leader_tpu_service: LeaderTpuService,
-    exit: Arc<AtomicBool>,
+    //todo: get rid of this field
     rpc_client: Arc<RpcClient>,
-    connection_cache: Arc<ConnectionCache>,
+    tpu_client: Arc<NonblockingTpuClient>,
 }
 
 impl TpuClient {
     /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
     /// size
     pub fn send_transaction(&self, transaction: &Transaction) -> bool {
-        let wire_transaction = serialize(transaction).expect("serialization should succeed");
-        self.send_wire_transaction(wire_transaction)
+        self.invoke(self.tpu_client.send_transaction(transaction))
     }
 
     /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
     pub fn send_wire_transaction(&self, wire_transaction: Vec<u8>) -> bool {
-        self.try_send_wire_transaction(wire_transaction).is_ok()
+        self.invoke(self.tpu_client.send_wire_transaction(wire_transaction))
     }
 
     /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
     /// size
     /// Returns the last error if all sends fail
     pub fn try_send_transaction(&self, transaction: &Transaction) -> TransportResult<()> {
-        let wire_transaction = serialize(transaction).expect("serialization should succeed");
-        self.try_send_wire_transaction(wire_transaction)
+        self.invoke(self.tpu_client.try_send_transaction(transaction))
     }
 
     /// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
     /// Returns the last error if all sends fail
-    fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
-        let mut last_error: Option<TransportError> = None;
-        let mut some_success = false;
-
-        for tpu_address in self
-            .leader_tpu_service
-            .leader_tpu_sockets(self.fanout_slots)
-        {
-            let conn = self.connection_cache.get_connection(&tpu_address);
-            let result = conn.send_wire_transaction_async(wire_transaction.clone());
-            if let Err(err) = result {
-                last_error = Some(err);
-            } else {
-                some_success = true;
-            }
-        }
-        if !some_success {
-            Err(if let Some(err) = last_error {
-                err
-            } else {
-                std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
-            })
-        } else {
-            Ok(())
-        }
-    }
+    pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {
+        self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction))
+    }
 
     /// Create a new client that disconnects when dropped
@@ -146,8 +88,16 @@ impl TpuClient {
         websocket_url: &str,
         config: TpuClientConfig,
     ) -> Result<Self> {
-        let connection_cache = Arc::new(ConnectionCache::default());
-        Self::new_with_connection_cache(rpc_client, websocket_url, config, connection_cache)
+        let create_tpu_client =
+            NonblockingTpuClient::new(rpc_client.get_inner_client().clone(), websocket_url, config);
+        let tpu_client =
+            tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
+
+        Ok(Self {
+            _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
+            rpc_client,
+            tpu_client: Arc::new(tpu_client),
+        })
     }
 
     /// Create a new client that disconnects when dropped
@@ -157,17 +107,19 @@ impl TpuClient {
         config: TpuClientConfig,
         connection_cache: Arc<ConnectionCache>,
     ) -> Result<Self> {
-        let exit = Arc::new(AtomicBool::new(false));
-        let leader_tpu_service =
-            LeaderTpuService::new(rpc_client.clone(), websocket_url, exit.clone())?;
+        let create_tpu_client = NonblockingTpuClient::new_with_connection_cache(
+            rpc_client.get_inner_client().clone(),
+            websocket_url,
+            config,
+            connection_cache,
+        );
+        let tpu_client =
+            tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?;
 
         Ok(Self {
             _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(),
-            fanout_slots: config.fanout_slots.min(MAX_FANOUT_SLOTS).max(1),
-            leader_tpu_service,
-            exit,
             rpc_client,
-            connection_cache,
+            tpu_client: Arc::new(tpu_client),
         })
     }
 
@@ -176,332 +128,21 @@ impl TpuClient {
         messages: &[Message],
         signers: &T,
     ) -> Result<Vec<Option<TransactionError>>> {
-        let mut expired_blockhash_retries = 5;
-
-        let progress_bar = spinner::new_progress_bar();
-        progress_bar.set_message("Setting up...");
-
-        let mut transactions = messages
-            .iter()
-            .enumerate()
-            .map(|(i, message)| (i, Transaction::new_unsigned(message.clone())))
-            .collect::<Vec<_>>();
-        let total_transactions = transactions.len();
-        let mut transaction_errors = vec![None; transactions.len()];
-        let mut confirmed_transactions = 0;
-        let mut block_height = self.rpc_client.get_block_height()?;
-
-        while expired_blockhash_retries > 0 {
-            let (blockhash, last_valid_block_height) = self
-                .rpc_client
-                .get_latest_blockhash_with_commitment(self.rpc_client.commitment())?;
-
-            let mut pending_transactions = HashMap::new();
-            for (i, mut transaction) in transactions {
-                transaction.try_sign(signers, blockhash)?;
-                pending_transactions.insert(transaction.signatures[0], (i, transaction));
-            }
-
-            let mut last_resend = Instant::now() - TRANSACTION_RESEND_INTERVAL;
-            while block_height <= last_valid_block_height {
-                let num_transactions = pending_transactions.len();
-
-                // Periodically re-send all pending transactions
-                if Instant::now().duration_since(last_resend) > TRANSACTION_RESEND_INTERVAL {
-                    for (index, (_i, transaction)) in pending_transactions.values().enumerate() {
-                        if !self.send_transaction(transaction) {
-                            let _result = self.rpc_client.send_transaction(transaction).ok();
-                        }
-                        spinner::set_message_for_confirmed_transactions(
-                            &progress_bar,
-                            confirmed_transactions,
-                            total_transactions,
-                            None, //block_height,
-                            last_valid_block_height,
-                            &format!("Sending {}/{} transactions", index + 1, num_transactions,),
-                        );
-                        sleep(SEND_TRANSACTION_INTERVAL);
-                    }
-                    last_resend = Instant::now();
-                }
-
-                // Wait for the next block before checking for transaction statuses
-                let mut block_height_refreshes = 10;
-                spinner::set_message_for_confirmed_transactions(
-                    &progress_bar,
-                    confirmed_transactions,
-                    total_transactions,
-                    Some(block_height),
-                    last_valid_block_height,
-                    &format!("Waiting for next block, {} pending...", num_transactions),
-                );
-                let mut new_block_height = block_height;
-                while block_height == new_block_height && block_height_refreshes > 0 {
-                    sleep(Duration::from_millis(500));
-                    new_block_height = self.rpc_client.get_block_height()?;
-                    block_height_refreshes -= 1;
-                }
-                block_height = new_block_height;
-
-                // Collect statuses for the transactions, drop those that are confirmed
-                let pending_signatures = pending_transactions.keys().cloned().collect::<Vec<_>>();
-                for pending_signatures_chunk in
-                    pending_signatures.chunks(MAX_GET_SIGNATURE_STATUSES_QUERY_ITEMS)
-                {
-                    if let Ok(result) = self
-                        .rpc_client
-                        .get_signature_statuses(pending_signatures_chunk)
-                    {
-                        let statuses = result.value;
-                        for (signature, status) in
-                            pending_signatures_chunk.iter().zip(statuses.into_iter())
-                        {
-                            if let Some(status) = status {
-                                if status.satisfies_commitment(self.rpc_client.commitment()) {
-                                    if let Some((i, _)) = pending_transactions.remove(signature) {
-                                        confirmed_transactions += 1;
-                                        if status.err.is_some() {
-                                            progress_bar.println(format!(
-                                                "Failed transaction: {:?}",
-                                                status
-                                            ));
-                                        }
-                                        transaction_errors[i] = status.err;
-                                    }
-                                }
-                            }
-                        }
-                    }
-                    spinner::set_message_for_confirmed_transactions(
-                        &progress_bar,
-                        confirmed_transactions,
-                        total_transactions,
-                        Some(block_height),
-                        last_valid_block_height,
-                        "Checking transaction status...",
-                    );
-                }
-
-                if pending_transactions.is_empty() {
-                    return Ok(transaction_errors);
-                }
-            }
-
-            transactions = pending_transactions.into_iter().map(|(_k, v)| v).collect();
-            progress_bar.println(format!(
-                "Blockhash expired. {} retries remaining",
-                expired_blockhash_retries
-            ));
-            expired_blockhash_retries -= 1;
-        }
-        Err(TpuSenderError::Custom("Max retries exceeded".into()))
+        self.invoke(
+            self.tpu_client
+                .send_and_confirm_messages_with_spinner(messages, signers),
+        )
     }
 
     pub fn rpc_client(&self) -> &RpcClient {
         &self.rpc_client
     }
+
+    fn invoke<T, F: std::future::Future<Output = T>>(&self, f: F) -> T {
+        // `block_on()` panics if called within an asynchronous execution context. Whereas
+        // `block_in_place()` only panics if called from a current_thread runtime, which is the
+        // lesser evil.
+        tokio::task::block_in_place(move || self.rpc_client.runtime().block_on(f))
+    }
 }
 
-impl Drop for TpuClient {
-    fn drop(&mut self) {
-        self.exit.store(true, Ordering::Relaxed);
-        self.leader_tpu_service.join();
-    }
-}
-
-pub(crate) struct LeaderTpuCacheUpdateInfo {
-    pub(crate) maybe_cluster_nodes: Option<ClientResult<Vec<RpcContactInfo>>>,
-    pub(crate) maybe_epoch_info: Option<ClientResult<EpochInfo>>,
-    pub(crate) maybe_slot_leaders: Option<ClientResult<Vec<Pubkey>>>,
-}
-impl LeaderTpuCacheUpdateInfo {
-    pub(crate) fn has_some(&self) -> bool {
-        self.maybe_cluster_nodes.is_some()
-            || self.maybe_epoch_info.is_some()
-            || self.maybe_slot_leaders.is_some()
-    }
-}
-
-pub(crate) struct LeaderTpuCache {
-    first_slot: Slot,
-    leaders: Vec<Pubkey>,
-    leader_tpu_map: HashMap<Pubkey, SocketAddr>,
-    slots_in_epoch: Slot,
-    last_epoch_info_slot: Slot,
-}
-
-impl LeaderTpuCache {
-    pub(crate) fn new(
-        first_slot: Slot,
-        slots_in_epoch: Slot,
-        leaders: Vec<Pubkey>,
-        cluster_nodes: Vec<RpcContactInfo>,
-    ) -> Self {
-        let leader_tpu_map = Self::extract_cluster_tpu_sockets(cluster_nodes);
-        Self {
-            first_slot,
-            leaders,
-            leader_tpu_map,
-            slots_in_epoch,
-            last_epoch_info_slot: first_slot,
-        }
-    }
-
-    // Last slot that has a cached leader pubkey
-    pub(crate) fn last_slot(&self) -> Slot {
-        self.first_slot + self.leaders.len().saturating_sub(1) as u64
-    }
-
-    pub(crate) fn slot_info(&self) -> (Slot, Slot, Slot) {
-        (
-            self.last_slot(),
-            self.last_epoch_info_slot,
-            self.slots_in_epoch,
-        )
-    }
-
-    // Get the TPU sockets for the current leader and upcoming leaders according to fanout size
-    pub(crate) fn get_leader_sockets(
-        &self,
-        current_slot: Slot,
-        fanout_slots: u64,
-    ) -> Vec<SocketAddr> {
-        let mut leader_set = HashSet::new();
-        let mut leader_sockets = Vec::new();
-        for leader_slot in current_slot..current_slot + fanout_slots {
-            if let Some(leader) = self.get_slot_leader(leader_slot) {
-                if let Some(tpu_socket) = self.leader_tpu_map.get(leader) {
-                    if leader_set.insert(*leader) {
-                        leader_sockets.push(*tpu_socket);
-                    }
-                } else {
-                    // The leader is probably delinquent
-                    trace!("TPU not available for leader {}", leader);
-                }
-            } else {
-                // Overran the local leader schedule cache
-                warn!(
-                    "Leader not known for slot {}; cache holds slots [{},{}]",
-                    leader_slot,
-                    self.first_slot,
-                    self.last_slot()
-                );
-            }
-        }
-        leader_sockets
-    }
-
-    pub(crate) fn get_slot_leader(&self, slot: Slot) -> Option<&Pubkey> {
-        if slot >= self.first_slot {
-            let index = slot - self.first_slot;
-            self.leaders.get(index as usize)
-        } else {
-            None
-        }
-    }
-
-    pub(crate) fn extract_cluster_tpu_sockets(
-        cluster_contact_info: Vec<RpcContactInfo>,
-    ) -> HashMap<Pubkey, SocketAddr> {
-        cluster_contact_info
-            .into_iter()
-            .filter_map(|contact_info| {
-                Some((
-                    Pubkey::from_str(&contact_info.pubkey).ok()?,
-                    contact_info.tpu?,
-                ))
-            })
-            .collect()
-    }
-
-    pub(crate) fn fanout(slots_in_epoch: Slot) -> Slot {
-        (2 * MAX_FANOUT_SLOTS).min(slots_in_epoch)
-    }
-
-    pub(crate) fn update_all(
-        &mut self,
-        estimated_current_slot: Slot,
-        cache_update_info: LeaderTpuCacheUpdateInfo,
-    ) -> (bool, bool) {
-        let mut has_error = false;
-        let mut cluster_refreshed = false;
-        if let Some(cluster_nodes) = cache_update_info.maybe_cluster_nodes {
-            match cluster_nodes {
-                Ok(cluster_nodes) => {
-                    let leader_tpu_map = LeaderTpuCache::extract_cluster_tpu_sockets(cluster_nodes);
-                    self.leader_tpu_map = leader_tpu_map;
-                    cluster_refreshed = true;
-                }
-                Err(err) => {
-                    warn!("Failed to fetch cluster tpu sockets: {}", err);
-                    has_error = true;
-                }
-            }
-        }
-
-        if let Some(Ok(epoch_info)) = cache_update_info.maybe_epoch_info {
-            self.slots_in_epoch = epoch_info.slots_in_epoch;
-            self.last_epoch_info_slot = estimated_current_slot;
-        }
-
-        if let Some(slot_leaders) = cache_update_info.maybe_slot_leaders {
-            match slot_leaders {
-                Ok(slot_leaders) => {
-                    self.first_slot = estimated_current_slot;
-                    self.leaders = slot_leaders;
-                }
-                Err(err) => {
-                    warn!(
-                        "Failed to fetch slot leaders (current estimated slot: {}): {}",
-                        estimated_current_slot, err
-                    );
-                    has_error = true;
-                }
-            }
-        }
-        (has_error, cluster_refreshed)
-    }
-}
-
-fn maybe_fetch_cache_info(
-    leader_tpu_cache: &Arc<RwLock<LeaderTpuCache>>,
-    last_cluster_refresh: Instant,
-    rpc_client: &RpcClient,
-    recent_slots: &RecentLeaderSlots,
-) -> LeaderTpuCacheUpdateInfo {
-    // Refresh cluster TPU ports every 5min in case validators restart with new port configuration
-    // or new validators come online
-    let maybe_cluster_nodes = if last_cluster_refresh.elapsed() > Duration::from_secs(5 * 60) {
-        Some(rpc_client.get_cluster_nodes())
-    } else {
-        None
-    };
-
-    let estimated_current_slot = recent_slots.estimated_current_slot();
-    let (last_slot, last_epoch_info_slot, slots_in_epoch) = {
-        let leader_tpu_cache = leader_tpu_cache.read().unwrap();
-        leader_tpu_cache.slot_info()
-    };
-    let maybe_epoch_info =
-        if estimated_current_slot >= last_epoch_info_slot.saturating_sub(slots_in_epoch) {
-            Some(rpc_client.get_epoch_info())
-        } else {
-            None
-        };
-
-    let maybe_slot_leaders = if estimated_current_slot >= last_slot.saturating_sub(MAX_FANOUT_SLOTS)
-    {
-        Some(rpc_client.get_slot_leaders(
-            estimated_current_slot,
-            LeaderTpuCache::fanout(slots_in_epoch),
-        ))
-    } else {
-        None
-    };
-    LeaderTpuCacheUpdateInfo {
-        maybe_cluster_nodes,
-        maybe_epoch_info,
-        maybe_slot_leaders,
-    }
-}
 
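The maybe_fetch_cache_info helper removed above (its role moves with the leader cache into the nonblocking module) encodes the refresh policy for the leader cache. A toy sketch of two of its three decisions, with illustrative constants (MAX_FANOUT_SLOTS assumed to be 100):

use std::time::Duration;

const CLUSTER_REFRESH: Duration = Duration::from_secs(5 * 60);
const MAX_FANOUT_SLOTS: u64 = 100; // assumed value, mirroring tpu_client

// Refresh cluster TPU ports every 5 minutes in case validators restart with
// a new port configuration or new validators come online.
fn should_refresh_cluster(elapsed: Duration) -> bool {
    elapsed > CLUSTER_REFRESH
}

// Re-fetch slot leaders once the estimated slot gets within one fanout of the
// end of the cached leader schedule.
fn should_refresh_leaders(estimated_current_slot: u64, last_cached_slot: u64) -> bool {
    estimated_current_slot >= last_cached_slot.saturating_sub(MAX_FANOUT_SLOTS)
}

fn main() {
    assert!(should_refresh_cluster(Duration::from_secs(301)));
    assert!(!should_refresh_leaders(1_000, 2_000)); // plenty of schedule left
    assert!(should_refresh_leaders(1_950, 2_000)); // within 100 slots of the end
}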
@@ -559,126 +200,6 @@ impl From<Vec<Slot>> for RecentLeaderSlots {
     }
 }
 
-/// Service that tracks upcoming leaders and maintains an up-to-date mapping
-/// of leader id to TPU socket address.
-struct LeaderTpuService {
-    recent_slots: RecentLeaderSlots,
-    leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
-    subscription: Option<PubsubClientSubscription<SlotUpdate>>,
-    t_leader_tpu_service: Option<JoinHandle<()>>,
-}
-
-impl LeaderTpuService {
-    fn new(rpc_client: Arc<RpcClient>, websocket_url: &str, exit: Arc<AtomicBool>) -> Result<Self> {
-        let start_slot = rpc_client.get_slot_with_commitment(CommitmentConfig::processed())?;
-
-        let recent_slots = RecentLeaderSlots::new(start_slot);
-        let slots_in_epoch = rpc_client.get_epoch_info()?.slots_in_epoch;
-        let leaders =
-            rpc_client.get_slot_leaders(start_slot, LeaderTpuCache::fanout(slots_in_epoch))?;
-        let cluster_nodes = rpc_client.get_cluster_nodes()?;
-        let leader_tpu_cache = Arc::new(RwLock::new(LeaderTpuCache::new(
-            start_slot,
-            slots_in_epoch,
-            leaders,
-            cluster_nodes,
-        )));
-
-        let subscription = if !websocket_url.is_empty() {
-            let recent_slots = recent_slots.clone();
-            Some(PubsubClient::slot_updates_subscribe(
-                websocket_url,
-                move |update| {
-                    let current_slot = match update {
-                        // This update indicates that a full slot was received by the connected
-                        // node so we can stop sending transactions to the leader for that slot
-                        SlotUpdate::Completed { slot, .. } => slot.saturating_add(1),
-                        // This update indicates that we have just received the first shred from
-                        // the leader for this slot and they are probably still accepting transactions.
-                        SlotUpdate::FirstShredReceived { slot, .. } => slot,
-                        _ => return,
-                    };
-                    recent_slots.record_slot(current_slot);
-                },
-            )?)
-        } else {
-            None
-        };
-
-        let t_leader_tpu_service = Some({
-            let recent_slots = recent_slots.clone();
-            let leader_tpu_cache = leader_tpu_cache.clone();
-            std::thread::Builder::new()
-                .name("ldr-tpu-srv".to_string())
-                .spawn(move || Self::run(rpc_client, recent_slots, leader_tpu_cache, exit))
-                .unwrap()
-        });
-
-        Ok(LeaderTpuService {
-            recent_slots,
-            leader_tpu_cache,
-            subscription,
-            t_leader_tpu_service,
-        })
-    }
-
-    fn join(&mut self) {
-        if let Some(mut subscription) = self.subscription.take() {
-            let _ = subscription.send_unsubscribe();
-            let _ = subscription.shutdown();
-        }
-        if let Some(t_handle) = self.t_leader_tpu_service.take() {
-            t_handle.join().unwrap();
-        }
-    }
-
-    fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
-        let current_slot = self.recent_slots.estimated_current_slot();
-        self.leader_tpu_cache
-            .read()
-            .unwrap()
-            .get_leader_sockets(current_slot, fanout_slots)
-    }
-
-    fn run(
-        rpc_client: Arc<RpcClient>,
-        recent_slots: RecentLeaderSlots,
-        leader_tpu_cache: Arc<RwLock<LeaderTpuCache>>,
-        exit: Arc<AtomicBool>,
-    ) {
-        let mut last_cluster_refresh = Instant::now();
-        let mut sleep_ms = 1000;
-        loop {
-            if exit.load(Ordering::Relaxed) {
-                break;
-            }
-
-            // Sleep a few slots before checking if leader cache needs to be refreshed again
-            sleep(Duration::from_millis(sleep_ms));
-            sleep_ms = 1000;
-
-            let cache_update_info = maybe_fetch_cache_info(
-                &leader_tpu_cache,
-                last_cluster_refresh,
-                &rpc_client,
-                &recent_slots,
-            );
-
-            if cache_update_info.has_some() {
-                let mut leader_tpu_cache = leader_tpu_cache.write().unwrap();
-                let (has_error, cluster_refreshed) = leader_tpu_cache
-                    .update_all(recent_slots.estimated_current_slot(), cache_update_info);
-                if has_error {
-                    sleep_ms = 100;
-                }
-                if cluster_refreshed {
-                    last_cluster_refresh = Instant::now();
-                }
-            }
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
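Putting it together, hypothetical caller code against the new blocking facade (endpoint URLs are placeholders, and the empty wire transaction is purely illustrative):

use {
    solana_client::{
        rpc_client::RpcClient,
        tpu_client::{TpuClient, TpuClientConfig},
    },
    std::sync::Arc,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The Arc'd blocking RpcClient owns the tokio runtime that the inner
    // nonblocking TpuClient will share.
    let rpc_client = Arc::new(RpcClient::new("http://127.0.0.1:8899".to_string()));
    let tpu_client = TpuClient::new(
        rpc_client,
        "ws://127.0.0.1:8900", // websocket endpoint for slot update subscriptions
        TpuClientConfig::default(),
    )?;

    // Each blocking call is `invoke`d: block_in_place + block_on drive the
    // inner async client on the shared runtime.
    let wire_transaction: Vec<u8> = vec![]; // placeholder bytes, not a real transaction
    let delivered = tpu_client.send_wire_transaction(wire_transaction);
    println!("delivered to leader TPUs: {delivered}");
    Ok(())
}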