2023-08-31 03:34:13 -07:00
|
|
|
use anyhow::Context;
|
2023-06-11 17:41:49 -07:00
|
|
|
|
2023-06-12 12:30:47 -07:00
|
|
|
use super::tpu_connection_manager::TpuConnectionManager;
|
2023-08-02 06:22:59 -07:00
|
|
|
use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
|
|
|
|
use crate::tpu_utils::tpu_connection_path::TpuConnectionPath;
|
|
|
|
use crate::tpu_utils::tpu_service::ConnectionManager::{DirectTpu, QuicProxy};
|
2023-08-31 03:34:13 -07:00
|
|
|
use solana_lite_rpc_core::data_cache::DataCache;
|
|
|
|
use solana_lite_rpc_core::leaders_fetcher_trait::LeaderFetcherInterface;
|
|
|
|
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
|
|
|
|
use solana_lite_rpc_core::streams::SlotStream;
|
|
|
|
use solana_lite_rpc_core::AnyhowJoinHandle;
|
|
|
|
use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
|
2023-06-08 02:27:02 -07:00
|
|
|
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
|
2023-04-03 02:13:01 -07:00
|
|
|
use std::{
|
|
|
|
net::{IpAddr, Ipv4Addr},
|
2023-08-31 03:34:13 -07:00
|
|
|
sync::Arc,
|
2023-04-03 02:13:01 -07:00
|
|
|
};
|
2023-08-31 03:34:13 -07:00
|
|
|
use tokio::time::Duration;
|
2023-04-03 02:13:01 -07:00
|
|
|
|
2023-07-20 04:45:54 -07:00
|
|
|
#[derive(Clone, Copy)]
|
|
|
|
pub struct TpuServiceConfig {
|
|
|
|
pub fanout_slots: u64,
|
|
|
|
pub number_of_leaders_to_cache: usize,
|
|
|
|
pub clusterinfo_refresh_time: Duration,
|
|
|
|
pub leader_schedule_update_frequency: Duration,
|
|
|
|
pub maximum_transaction_in_queue: usize,
|
|
|
|
pub maximum_number_of_errors: usize,
|
|
|
|
pub quic_connection_params: QuicConnectionParameters,
|
2023-07-20 05:55:47 -07:00
|
|
|
pub tpu_connection_path: TpuConnectionPath,
|
2023-07-20 04:45:54 -07:00
|
|
|
}
|
|
|
|
|
2023-04-03 02:13:01 -07:00
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct TpuService {
|
2023-04-30 23:11:19 -07:00
|
|
|
broadcast_sender: Arc<tokio::sync::broadcast::Sender<(String, Vec<u8>)>>,
|
2023-07-20 08:54:26 -07:00
|
|
|
connection_manager: ConnectionManager,
|
2023-08-31 03:34:13 -07:00
|
|
|
leader_schedule: Arc<dyn LeaderFetcherInterface>,
|
2023-07-20 04:45:54 -07:00
|
|
|
config: TpuServiceConfig,
|
2023-08-31 03:34:13 -07:00
|
|
|
data_cache: DataCache,
|
2023-04-03 02:13:01 -07:00
|
|
|
}
|
|
|
|
|
2023-07-20 08:54:26 -07:00
|
|
|
#[derive(Clone)]
|
|
|
|
enum ConnectionManager {
|
2023-08-02 06:22:59 -07:00
|
|
|
DirectTpu {
|
|
|
|
tpu_connection_manager: Arc<TpuConnectionManager>,
|
|
|
|
},
|
|
|
|
QuicProxy {
|
|
|
|
quic_proxy_connection_manager: Arc<QuicProxyConnectionManager>,
|
|
|
|
},
|
2023-07-20 08:54:26 -07:00
|
|
|
}
|
|
|
|
|
2023-04-03 02:13:01 -07:00
|
|
|
impl TpuService {
|
|
|
|
pub async fn new(
|
2023-07-20 04:45:54 -07:00
|
|
|
config: TpuServiceConfig,
|
|
|
|
identity: Arc<Keypair>,
|
2023-08-31 03:34:13 -07:00
|
|
|
leader_schedule: Arc<dyn LeaderFetcherInterface>,
|
|
|
|
data_cache: DataCache,
|
2023-04-03 02:13:01 -07:00
|
|
|
) -> anyhow::Result<Self> {
|
2023-07-20 04:45:54 -07:00
|
|
|
let (sender, _) = tokio::sync::broadcast::channel(config.maximum_transaction_in_queue);
|
2023-04-03 10:07:50 -07:00
|
|
|
let (certificate, key) = new_self_signed_tls_certificate(
|
2023-07-20 04:45:54 -07:00
|
|
|
identity.as_ref(),
|
2023-04-03 10:07:50 -07:00
|
|
|
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
|
|
|
|
)
|
2023-07-20 04:45:54 -07:00
|
|
|
.expect("Failed to initialize QUIC client certificates");
|
2023-04-03 02:13:01 -07:00
|
|
|
|
2023-08-02 06:22:59 -07:00
|
|
|
let connection_manager = match config.tpu_connection_path {
|
|
|
|
TpuConnectionPath::QuicDirectPath => {
|
|
|
|
let tpu_connection_manager =
|
|
|
|
TpuConnectionManager::new(certificate, key, config.fanout_slots as usize).await;
|
|
|
|
DirectTpu {
|
|
|
|
tpu_connection_manager: Arc::new(tpu_connection_manager),
|
2023-07-20 08:54:26 -07:00
|
|
|
}
|
2023-08-02 06:22:59 -07:00
|
|
|
}
|
|
|
|
TpuConnectionPath::QuicForwardProxyPath {
|
|
|
|
forward_proxy_address,
|
|
|
|
} => {
|
|
|
|
let quic_proxy_connection_manager =
|
|
|
|
QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;
|
2023-07-20 08:54:26 -07:00
|
|
|
|
2023-08-02 06:22:59 -07:00
|
|
|
QuicProxy {
|
|
|
|
quic_proxy_connection_manager: Arc::new(quic_proxy_connection_manager),
|
2023-07-20 08:54:26 -07:00
|
|
|
}
|
2023-08-02 06:22:59 -07:00
|
|
|
}
|
|
|
|
};
|
2023-04-03 02:13:01 -07:00
|
|
|
|
|
|
|
Ok(Self {
|
2023-08-31 03:34:13 -07:00
|
|
|
leader_schedule,
|
2023-04-03 02:13:01 -07:00
|
|
|
broadcast_sender: Arc::new(sender),
|
2023-07-20 08:54:26 -07:00
|
|
|
connection_manager,
|
2023-07-20 04:45:54 -07:00
|
|
|
config,
|
2023-08-31 03:34:13 -07:00
|
|
|
data_cache,
|
2023-04-03 02:13:01 -07:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2023-04-30 23:11:19 -07:00
|
|
|
pub fn send_transaction(&self, signature: String, transaction: Vec<u8>) -> anyhow::Result<()> {
|
|
|
|
self.broadcast_sender.send((signature, transaction))?;
|
2023-04-03 02:13:01 -07:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-07-20 08:54:26 -07:00
|
|
|
// update/reconfigure connections on slot change
|
2023-08-31 03:34:13 -07:00
|
|
|
async fn update_quic_connections(
|
|
|
|
&self,
|
|
|
|
current_slot: Slot,
|
|
|
|
estimated_slot: Slot,
|
|
|
|
) -> anyhow::Result<()> {
|
2023-04-03 02:13:01 -07:00
|
|
|
let load_slot = if estimated_slot <= current_slot {
|
|
|
|
current_slot
|
2023-04-26 02:18:40 -07:00
|
|
|
} else if estimated_slot.saturating_sub(current_slot) > 8 {
|
2023-04-08 03:36:43 -07:00
|
|
|
estimated_slot - 8
|
2023-04-03 02:13:01 -07:00
|
|
|
} else {
|
2023-04-08 03:36:43 -07:00
|
|
|
current_slot
|
2023-04-03 02:13:01 -07:00
|
|
|
};
|
2023-04-08 03:36:43 -07:00
|
|
|
|
2023-07-20 04:45:54 -07:00
|
|
|
let fanout = self.config.fanout_slots;
|
2023-04-11 10:58:20 -07:00
|
|
|
let last_slot = estimated_slot + fanout;
|
2023-04-03 02:13:01 -07:00
|
|
|
|
2023-08-31 03:34:13 -07:00
|
|
|
let cluster_nodes = self.data_cache.cluster_info.cluster_nodes.clone();
|
|
|
|
|
|
|
|
let next_leaders = self
|
|
|
|
.leader_schedule
|
|
|
|
.get_slot_leaders(load_slot, last_slot)
|
|
|
|
.await?;
|
|
|
|
// get next leader with its tpu port
|
2023-04-03 02:13:01 -07:00
|
|
|
let connections_to_keep = next_leaders
|
2023-08-31 03:34:13 -07:00
|
|
|
.iter()
|
|
|
|
.map(|x| {
|
|
|
|
let contact_info = cluster_nodes.get(&x.pubkey);
|
|
|
|
let tpu_port = match contact_info {
|
|
|
|
Some(info) => info.tpu,
|
|
|
|
_ => None,
|
|
|
|
};
|
|
|
|
(x.pubkey, tpu_port)
|
|
|
|
})
|
|
|
|
.filter(|x| x.1.is_some())
|
2023-04-03 02:13:01 -07:00
|
|
|
.map(|x| {
|
2023-08-31 03:34:13 -07:00
|
|
|
let mut addr = x.1.unwrap();
|
2023-04-03 10:01:42 -07:00
|
|
|
// add quic port offset
|
|
|
|
addr.set_port(addr.port() + QUIC_PORT_OFFSET);
|
2023-08-31 03:34:13 -07:00
|
|
|
(x.0, addr)
|
2023-04-03 02:13:01 -07:00
|
|
|
})
|
|
|
|
.collect();
|
2023-04-13 07:18:43 -07:00
|
|
|
|
2023-07-20 08:54:26 -07:00
|
|
|
match &self.connection_manager {
|
|
|
|
DirectTpu {
|
|
|
|
tpu_connection_manager,
|
|
|
|
} => {
|
|
|
|
tpu_connection_manager
|
|
|
|
.update_connections(
|
|
|
|
self.broadcast_sender.clone(),
|
|
|
|
connections_to_keep,
|
2023-08-31 03:34:13 -07:00
|
|
|
self.data_cache.identity_stakes.get_stakes().await,
|
|
|
|
self.data_cache.txs.clone(),
|
2023-07-20 08:54:26 -07:00
|
|
|
self.config.quic_connection_params,
|
|
|
|
)
|
|
|
|
.await;
|
2023-08-02 06:22:59 -07:00
|
|
|
}
|
|
|
|
QuicProxy {
|
|
|
|
quic_proxy_connection_manager,
|
|
|
|
} => {
|
2023-08-10 01:31:31 -07:00
|
|
|
let transaction_receiver = self.broadcast_sender.subscribe();
|
2023-08-02 06:22:59 -07:00
|
|
|
quic_proxy_connection_manager
|
|
|
|
.update_connection(
|
2023-08-10 01:31:31 -07:00
|
|
|
transaction_receiver,
|
2023-08-02 06:22:59 -07:00
|
|
|
connections_to_keep,
|
|
|
|
self.config.quic_connection_params,
|
|
|
|
)
|
|
|
|
.await;
|
2023-07-20 08:54:26 -07:00
|
|
|
}
|
|
|
|
}
|
2023-08-31 03:34:13 -07:00
|
|
|
Ok(())
|
2023-04-03 02:13:01 -07:00
|
|
|
}
|
|
|
|
|
2023-08-31 03:34:13 -07:00
|
|
|
pub fn start(&self, slot_notifications: SlotStream) -> AnyhowJoinHandle {
|
2023-04-03 02:13:01 -07:00
|
|
|
let this = self.clone();
|
2023-08-31 03:34:13 -07:00
|
|
|
tokio::spawn(async move {
|
|
|
|
let mut slot_notifications = slot_notifications;
|
2023-04-03 02:13:01 -07:00
|
|
|
loop {
|
2023-08-31 03:34:13 -07:00
|
|
|
let notification = slot_notifications
|
|
|
|
.recv()
|
|
|
|
.await
|
|
|
|
.context("Tpu service cannot get slot notification")?;
|
|
|
|
this.update_quic_connections(
|
|
|
|
notification.processed_slot,
|
|
|
|
notification.estimated_processed_slot,
|
2023-04-03 02:13:01 -07:00
|
|
|
)
|
2023-08-31 03:34:13 -07:00
|
|
|
.await?;
|
2023-04-03 02:13:01 -07:00
|
|
|
}
|
2023-08-31 03:34:13 -07:00
|
|
|
})
|
2023-04-30 23:11:19 -07:00
|
|
|
}
|
2023-04-03 02:13:01 -07:00
|
|
|
}
|